SingingCat 0
application
queue.c
Go to the documentation of this file.
1#include <main-header.h>
2#include <queue.h>
3#include <command-parser.h>
4#include <command-handler.h>
5#include <multiram.h>
6#include <routing.h>
7#include <config.h>
8#include "espressif/esp8266_cloud.h"
9#include <ti1101.h>
10#include <route_command.h>
11#include "metrics/metrics.h"
12
13#define MAX_COM_RETRIES 30
14
15/****************************
16 * command send failure codes
17 **************************/
18// permanent failures
19#define SEND_BUF_RESERVED1 1
20#define SEND_BUF_RESERVED2 2
21#define SEND_NO_CLOUD 3
22#define SEND_NO_TARGET 5
23#define SEND_NO_RECIPIENT 6
24#define SEND_COM_FAILED_TO_ENCODE 7
25#define SEND_COM_NOT_ENCODED 12
26#define SEND_COM_INVALID 13
27#define SEND_COM_STILL_INVALID 14
28#define SEND_FAILURE_NODEV 16
29// temporary failures
30#define SEND_DEVICE_BUSY -113
31
32
33
34//#include <bare-metal.h>
50static int command_index = 0;
51static struct command *foocom;
52static char queuebuf[256]; // limits the max size of an outbound packet
53// must be larger than max commands that can be allocated
54#define COM_QUEUE_SIZE 80
55typedef struct queue_entry {
56 struct command * command;
58 pkt_callback callback;
61
62static void callback(struct queue_entry *q, struct command *com);
63static struct queue_entry out_queue[COM_QUEUE_SIZE];
64static int send_command_internal(struct command *com, int print);
65
66static int broadcast_device();
67
78#define COMMAND_AGE_BEFORE_GC 120
79
80void queue_init() {
81 memset(&out_queue, 0, sizeof(out_queue));
82}
88 printf("Outbound command queue cleared.\r\n");
89 queue_init();
90}
91void print_outbound_queue() {
92 int i;
93 int elements = get_mpm()->max_commands;
94 struct comnode *p;
95
96 for (i = 0; i < elements; i++) {
97 p = &get_mpm()->commem[i];
98 if (p->allocated == 0) {
99 continue;
100 }
101 printf("------- Queue position %i: ------\r\n", i);
102 command_print(&p->command);
103 }
104}
105
106struct command *alloc_command_with_minfree(int minfree) {
107 int i;
108 struct comnode *p;
109 struct comnode *res;
110 int elements = get_mpm()->max_commands;
111
112 int free = 0;
113
114 res = NULL;
115 for (i = 0; i < elements; i++) {
116 p = &get_mpm()->commem[i];
117 if (p->allocated == 0) {
118 free++;
119 if ((free >= minfree) && (res != NULL)) {
120 break;
121 }
122
123 if (res == NULL) {
124 res = p;
125 }
126 }
127 }
128 if (res != NULL) {
129 command_init(&res->command);
130 res->command.index = -1;
131 res->allocated = mculib_get_seconds_since_boot();
132 return &res->command;
133 }
134
135 // we have now command buffers left to allocate
136 res = NULL;
137 printf("command buffers are all allocated (reserved %i):\n", minfree);
138 for (i = 0; i < elements; i++) {
139 p = &get_mpm()->commem[i];
140 long now = mculib_get_seconds_since_boot();
141 long age = (now - p->allocated);
142 if (p->allocated != 0) {
143 printf("%i. (age seconds: %i) (%s) \r\n", i, (int)age, command_get_name(p->command.com));
144 //command_print(&p->command);
145 }
146 if ((age > COMMAND_AGE_BEFORE_GC) || (age < -10)) {
147 //IPLOG("Garbage collecting above command due to its age and using it!\r\n");
148 if (res == NULL) {
149 res = p; // update result (reuse last command gc'ed)
150 } else {
151 p->allocated = 0;
152 }
153 break;
154 }
155 }
156 if (res == NULL) {
157 IPLOG("BIG FAT WARNING - No command left to allocate (%i) (reserved %i).\r\n", elements, minfree);
158 return NULL;
159 }
160 printf("Reallocated garbage collected command: %p\r\n", &res->command);
161 command_init(&res->command);
162 return &res->command;
163}
164
165
174 return alloc_command_with_minfree(0);
175}
176
177
182 struct comnode *p;
183 int i;
184 int elements = sizeof(get_mpm()->commem) / sizeof(get_mpm()->commem[0]);
185
186 if (index == -1) {
187 for (i = 0; i < elements; i++) {
188 p = &get_mpm()->commem[i];
189 if (p->allocated != 0) {
190 free_command(&p->command);
191 }
192 }
193 }
194}
200void free_command(struct command *com) {
201 struct comnode *p;
202
203 p = (struct comnode *)com;
204 if (p->allocated == 0) {
205 // printf("BIG WARNING: DUPLICATE FREE OF COMMAND! \r\n");
206 return;
207 }
208 p->allocated = 0;
209}
210
211/*
212 * free any commands that are too old
213 */
214static void garbage_collect_commands() {
215 int i;
216 struct comnode *p;
217 int elements = get_mpm()->max_commands;
218
219 long now = mculib_get_seconds_since_boot();
220
221 for (i = 0; i < elements; i++) {
222 p = &get_mpm()->commem[i];
223 if (p->allocated == 0) {
224 continue;
225 }
226 long age = (now - p->allocated);
227 if ((age > COMMAND_AGE_BEFORE_GC) || (age < -10)) {
228 free_command((struct command *)p);
229 }
230 }
231}
232/*
233 * free queue entries which point to commands that are no longer valid
234 */
235static void garbage_collect_queue_entries() {
236 int elements = sizeof(out_queue) / sizeof(out_queue[0]);
237 int i;
238 struct comnode *p;
239
240 for (i = 0; i < elements; i++) {
241 if (out_queue[i].command == NULL) {
242 continue;
243 }
244 p = (struct comnode *)out_queue[i].command;
245 if (p->allocated == 0) {
246 out_queue[i].command = NULL;
247 }
248 }
249}
250
251/*
252 * set or clear the POWERSAVE flag, depending on module config
253 */
254static void set_power_mode_on_com(struct command *com) {
255 if (config_get_flag(CONFIG_FLAGS_POWER_SAVE)) {
256 com->flags |= COMFLAGS_POWERSAVE;
257 } else {
258 com->flags &= ~COMFLAGS_POWERSAVE;
259 }
260}
261
262
263
264
265
272 struct command *dcom;
273
274 dcom = alloc_command();
275 if (dcom == NULL) {
276 return NULL;
277 }
278 command_init(dcom);
279
280 dcom->target = com->sender;
281 dcom->encoding = 'a';
282 dcom->index = com->index;
283 dcom->sourcedev = com->sourcedev;
284 dcom->connid = com->connid;
285 dcom->com = com->com;
286 dcom->flags = COMFLAGS_DATA | COMFLAGS_SUCCESS;
287 return dcom;
288}
/*
 * format the arguments into a data reply for "com", send it immediately
 * and release the reply buffer again.
 * return: 1 if no reply buffer could be obtained, otherwise 0
 *         (the result of send_command() is not reported)
 */
int send_data(struct command *com, const char *format, ...) {
    va_list ap;
    struct command *reply = get_data_reply(com);

    if (reply == NULL) {
        return 1;
    }

    va_start(ap, format);
    command_add_varg(reply, format, ap);
    va_end(ap);

    send_command(reply);
    free_command(reply);
    return 0;
}
310
326int process_queue_reply(struct command *ack) {
327 int elements = sizeof(out_queue) / sizeof(out_queue[0]);
328 struct queue_entry *q;
329 int i;
330
331 for (i = 0; i < elements; i++) {
332 q = &out_queue[i];
333 if (q->command == NULL) {
334 continue;
335 }
336 // check if this is a reply to one of our commands
337 if ((q->command->index == ack->index)
338 && (q->command->target == ack->sender)
339 ) {
340 printf("That is my command at index %i\r\n", i);
341 callback(q, ack);
342 struct command *c = q->command;
343 if (c->target != 0xFFFFFFFF) {
344 q->command = NULL;
345 free_command(c);
346 }
347 return 1;
348 }
349 }
350 return 0;
351}
352
353static int get_next_command_index() {
354 if ((command_index > 32000) || (command_index < 0)) {
355 command_index = 0;
356 }
357 return ++command_index;
358}
375 printf("Sending command:\r\n");
376 int a = send_command_internal(com, 1);
377
378 if (a != 0) {
379 printf("Sending failed (code=%i)\r\n", a);
380 }
381 return a;
382}
383
/*
 * send a command without printing it first; returns whatever
 * send_command_internal() returns
 */
int send_command_quietly(struct command *com) {
    const int print_command = 0;

    return send_command_internal(com, print_command);
}
387
388/*
389 * \brief same as send_command plus flag wether to print command or not
390 * return: 0 ok, >0 on permanent errors, <0 on temporary errors
391 */
392static int send_command_internal(struct command *com, int p) {
393 struct hostroute *host;
394
395 // I cannot do this, because we might be FORWARDING a packet.
396 //com->sender = get_my_node_id();
397
398 //printf("Sending command to device %i: %s\r\n", com->sourcedev, queuebuf);
399 if (com->index == -1) {
400 com->index = get_next_command_index();
401 }
402 if (com->target == 0) {
403 return SEND_NO_TARGET;
404 }
405
406 // special case: send to server
407 if (com->target == CLOUD_SERVER) {
408 if ((esp_cloud_is_connected()) || ((com->local_flags & (1 << COM_LOCAL_FLAG_FORCE_DEVICE)) && (com->sourcedev == SOURCE_WIFI))) {
409 // send to server
410 com->sourcedev = 1;
411 com->recipient = CLOUD_SERVER;
412 } else {
413 // no direct route to target and no server connection:
414 // try to use a configured route
415 struct route *r = get_configured_route(com);
416 if (r != NULL) {
417 com->recipient = r->sendto;
418 com->sourcedev = r->out_device;
419 }
420 }
421 }
422
423 if (com->recipient == 0) {
424 if (com->target == BROADCAST) {
425 com->sourcedev = broadcast_device();
426 com->recipient = BROADCAST;
427 goto gotdev_no_recipt;
428 }
429 // forward to "best" device
430 host = routing_find_host(com->target);
431 if (host == NULL) {
432 // use server as default gateway
433 request_route(com->target);
434 if (esp_cloud_is_connected()) {
435 com->sourcedev = SOURCE_WIFI;
436 com->recipient = CLOUD_SERVER;
437 } else {
438 return SEND_NO_CLOUD; // cloud not connected
439 }
440 } else {
441 com->sourcedev = host->device;
442 com->recipient = host->nexthop;
443 }
444 }
445gotdev_no_recipt:
446
447 if (com->recipient == 0) {
448 return SEND_NO_RECIPIENT;
449 }
450
451 if ((com->sourcedev == 0) && (com->recipient = BROADCAST)) {
452 struct network_context *nc = nic_by_index(broadcast_device());
453 if (nc != NULL) {
454 com->sourcedev = nc->source;
455 }
456 }
457
458 if (com->sourcedev == 0) {
459 return SEND_FAILURE_NODEV;
460 }
461 // TODO: fix wifi netdev
462 if ((com->sourcedev != SOURCE_WIFI) && (nic_is_busy(com->sourcedev))) {
463 // device busy? return negative (temporary response)
464 return SEND_DEVICE_BUSY;
465 }
466 if (p) {
467 if (!is_command_valid(com)) {
468 printf("Cannot print at %p\r\n", com);
469 return SEND_COM_INVALID;
470 }
471 foocom = com;
472 command_print(com);
473 if (!is_command_valid(com)) {
474 printf("print broke stuff at %p (was: %p)\r\n", com, foocom);
475 return SEND_COM_STILL_INVALID;
476 }
477 }
478 if (!is_command_valid(com)) {
479 printf("Cannot encode at %p\r\n", com);
480 return SEND_COM_NOT_ENCODED;
481 }
482 set_power_mode_on_com(com);
483 int len = command_encode_ascii(com, 256, (char *)&queuebuf);
484
485 if (len < 0) {
486 printf("Failed to encode packet (%i) at %p.\r\n", len, com);
487 return SEND_COM_FAILED_TO_ENCODE;
488 }
489 printf("Sending via %s: %i bytes (%s)\r\n", command_get_source_name(com->sourcedev), len, &queuebuf);
490
491 int i = send_buf_via_device(com->sourcedev, (const byte *)&queuebuf, len);
492
493 if (i == 0) {
494 return 0;
495 }
496 printf("Failed to sent packet:\r\n");
497 command_print(com);
498 return i;
499 /*
500 * if (com->sourcedev == 1) {
501 * wifi_send_data(PKT_COMMAND, (const byte *)&queuebuf, len); // wifi
502 * return 0;
503 * } else if (com->sourcedev == 3) {
504 * printf("REPLY: %s\r\n", (char *)&queuebuf); // local tty
505 * return 2;
506 * } else if (com->sourcedev == 2) { // radio
507 * if (!ti1101_send(len, (const byte *)&queuebuf)) {
508 * printf("Not sent\r\n");
509 * return -1;
510 * }
511 * printf("Sent\r\n");
512 * return 0;
513 * } else {
514 * printf("******************\r\n");
515 * printf("Unable to send packet! Unknown source: %i\r\n", com->sourcedev);
516 * if (p) {
517 * command_print(com);
518 * }
519 * printf("******************\r\n");
520 * return 1;
521 * }
522 */
523}
524
533int send_command_fw_info(struct command *com, int err) {
534 struct command *reply;
535
536 reply = alloc_command();
537 if (reply == NULL) {
538 return 2;
539 }
540 command_init(reply);
541
542 reply->target = com->sender;
543 reply->encoding = 'a';
544 reply->index = com->index;
545 reply->sourcedev = com->sourcedev;
546 reply->connid = com->connid;
547 reply->com = com->com;
548 reply->flags = COMFLAGS_FORWARDED;
549 if (err == 0) {
550 reply->flags = reply->flags | COMFLAGS_ACK;
551 }
552 return deliver_command(reply, NULL);
553}
563 struct command *reply;
564
565 reply = alloc_command();
566 if (reply == NULL) {
567 return 2;
568 }
569 command_init(reply);
570
571 reply->target = com->sender;
572 reply->encoding = 'a';
573 reply->index = com->index;
574 reply->sourcedev = com->sourcedev;
575 reply->connid = com->connid;
576 reply->com = com->com;
577 reply->flags = flags & ~(COMFLAGS_FORWARDED); // clear forwarded flag. it is _not_ yet forwarded
578 return deliver_command(reply, NULL);
579}
588int send_command_reply_with_args(struct command *com, byte flags, const char *format, ...) {
589 va_list args;
590
591 struct command *reply;
592
593 reply = alloc_command();
594 if (reply == NULL) {
595 return 2;
596 }
597 command_init(reply);
598
599 reply->target = com->sender;
600 reply->encoding = 'a';
601 reply->index = com->index;
602 reply->sourcedev = com->sourcedev;
603 reply->connid = com->connid;
604 reply->com = com->com;
605 reply->flags = flags & ~(COMFLAGS_FORWARDED); // clear forwarded flag. it is _not_ yet forwarded
606 va_start(args, format);
607 command_add_varg(reply, format, args);
608 va_end(args);
609
610 return deliver_command(reply, NULL);
611}
615static struct queue_entry *alloc_free_queue_entry(struct command *com) {
616 int elements = sizeof(out_queue) / sizeof(out_queue[0]);
617 int i;
618
619 for (i = 0; i < elements; i++) {
620 if (out_queue[i].command == NULL) {
621 memset(&out_queue[i], 0, sizeof(out_queue[0]));
622 out_queue[i].command = com;
623 return &out_queue[i];
624 }
625 }
626 printf("BIG FAT WARNING - no free queue slot found!\r\n");
627 return NULL;
628}
629
651int deliver_command(struct command *com, pkt_callback cb) {
652 struct queue_entry *q;
653
654 if (!is_command_valid(com)) {
655 error_com(com);
656 return 100;
657 }
658 q = alloc_free_queue_entry(com);
659 if (q == NULL) {
660 printf("No available queue slot\r\n");
661 return 1;
662 }
663 q->callback = cb;
664 IncMetric_COM_QUEUE_ADDED;
665 return 0;
666}
667
668static void callback(struct queue_entry *q, struct command *com) {
669 if ((q == NULL)) {
670 return;
671 }
672 if (q->callback == NULL) {
673 return;
674 }
675 q->callback(q->command, com);
676}
677
683 int res;
684 int x;
685 int elements = sizeof(out_queue) / sizeof(out_queue[0]);
686 struct queue_entry *q;
687 struct command *c;
688 int i;
689 long now = mculib_get_seconds_since_boot();
690 char t[16];
691
692 garbage_collect_commands();
693 garbage_collect_queue_entries();
694 res = 0;
695 x = 0;
696 for (i = 0; i < elements; i++) {
697 q = &out_queue[i];
698 if (q->command == NULL) {
699 continue;
700 }
701 if (!is_command_valid(q->command)) {
702 printf("Command %i in queue is invalid\r\n", i);
703 continue;
704 }
705 res++;
706 if ((q->last_attempt != 0) && (now - q->last_attempt) <= 1) {
707 continue;
708 }
709 x++;
710 }
711 if (x == 0) {
712 return; // no elements are due
713 }
714 IPLOG("Processing queue with %i elements, due elements: %i\r\n", res, x);
715 for (i = 0; i < elements; i++) {
716 q = &out_queue[i];
717 if (q->command == NULL) {
718 continue;
719 }
720 if (q->last_attempt > now) {
721 q->last_attempt = now; // overflow fix?
722 continue;
723 }
724 if ((q->last_attempt != 0) && (now - q->last_attempt) <= 1) {
725 continue;
726 }
727 q->last_attempt = now;
728 c = q->command;
729 pretty_node_to_str(c->target, (char *)&t);
730 // try at most 5 times //
731 if (q->attemptctr > MAX_COM_RETRIES) {
732 callback(q, NULL);
733 q->command = NULL;
734 free_command(c);
735 printf("Discarded command, No response to command %i (target: %s)\r\n", i, t);
736 break;
737 }
738 printf("sending com %i (attempt #%i) to %s\r\n", i, q->attemptctr, t);
739 res = send_command(q->command);
740 if (res < 0) {
741 // temporarily failed to send
742 // such as, wifi AP is offline
743 // or radio is busy
744 printf("Queue: temporarily unable to send command %i\r\n", i);
745 } else if (res > 0) {
746 // permanently failed to send
747 // such as Invalid Address or so
748 printf("Queue: command #%i (type %i) unsendable (given up) reason: %i\r\n", i, q->command->com, res);
749 callback(q, NULL);
750 q->command = NULL;
751 free_command(c);
752 continue;
753 } else {
754 // we did _send_ it, so increase attemptctr
755 q->attemptctr++;
756 }
757
758 // any combination of these flags will only be retried if the send error was temporary, otherwise freed
759 if (res >= 0) {
760 if (c->flags & (COMFLAGS_ACK | COMFLAGS_FORWARDED | COMFLAGS_DATA)) {
761 callback(q, NULL);
762 q->command = NULL;
763 free_command(c);
764 }
765 }
766 // break; // we only process one command at a time to give time
767 // to process responses, if any
768 // cnw: I think this is non-sense, this method is being executed immediately
769 // after it exits anyways, so we might as well process them all ;-)
770 }
771}
772
776void request_route(long nodeid) {
777 struct command *com;
778
779 printf("Requesting route for %N\r\n", nodeid);
780 // wifi
781 com = alloc_command();
782 if (com == NULL) {
783 return;
784 }
785 com->sourcedev = SOURCE_WIFI;
786 com->com = 12; // announce
787 com->target = 0xFFFFFFFF;
788 com->recipient = 0xFFFFFFFF;
789 com->flags = 0;
790 command_add_arg(com, "%N", nodeid);
791
792 if (!config_get_wifidisable()) {
793 com->sourcedev = SOURCE_WIFI;
795 }
796
797 com->flags = COMFLAGS_ACK | COMFLAGS_SUCCESS;
798 // TODO: make this uniform for all nics
799 if (isready_by_type(SOURCE_RADIO)) {
800 com->sourcedev = SOURCE_RADIO;
802 }
803 if (isready_by_type(SOURCE_LORA)) {
804 com->sourcedev = SOURCE_LORA;
806 }
808}
809
814 int i;
815 int coms = 0;
816 int elements = sizeof(out_queue) / sizeof(out_queue[0]);
817 struct queue_entry *q;
818
819 for (i = 0; i < elements; i++) {
820 q = &out_queue[i];
821 if (q->command == NULL) {
822 continue;
823 }
824 if ((q->command->com == 1) || (q->command->com == 23)) {
825 continue;
826 }
827 coms++;
828 }
829 return coms;
830}
831
832static int broadcast_device() {
833 int i = 0;
834 int somewhat_good = 0;
835 int verygood = 0;
836
837 for (;;) {
838 struct network_context *nc = nic_by_index(i);
839 i++;
840 if (nc == NULL) {
841 break;
842 }
843 uint8_t type = nc->source;
844 if (!isonline_by_type(type)) {
845 continue;
846 }
847 somewhat_good = type;
848 if ((isready_by_type(type))) {
849 verygood = type;
850 }
851 }
852 if (verygood != 0) {
853 return verygood;
854 }
855 return somewhat_good;
856}
char * command_get_source_name(byte sourcedev)
returns a human readable text identifying a source device
int process_queue_reply(struct command *ack)
process a reply
Definition: queue.c:326
int command_encode_ascii(struct command *com, int bufsize, char *buf)
encode a command to an ascii blob properly surrounded by '{' and '}' and escaped returns length if ok...
long nexthop
here the nodeid of the intermediary hop (the proxy)
Definition: routing.h:27
int send_command_reply_with_args(struct command *com, byte flags, const char *format,...)
send a reply to a command
Definition: queue.c:588
void free_commands(int index)
free commands by index (-1 for all)
Definition: queue.c:181
int send_command(struct command *com)
send a command to another module (or broadcast)
Definition: queue.c:374
void free_command(struct command *com)
free a command
Definition: queue.c:200
int deliver_command(struct command *com, pkt_callback cb)
deliver a command to a module
Definition: queue.c:651
void command_add_arg(struct command *com, const char *format,...)
adds an arg to a partially initialised command structure
int get_outbound_command_count_important()
return number of commands to be delivered (apart from announce/noop)
Definition: queue.c:813
struct hostroute * routing_find_host(const long nodeid)
find route to host or NULL if none known
Definition: routing.c:315
int attemptctr
Definition: queue.c:57
long host
Definition: routing.h:21
void command_add_varg(struct command *com, const char *format, va_list args)
adds a varg list of parameters to a command
int send_command_reply(struct command *com, byte flags)
send a reply to a command
Definition: queue.c:562
const char * command_get_name(int num)
given a command number returns its name
void command_init(struct command *com)
initialize a command structure with default values
long last_attempt
Definition: queue.c:59
void clear_outbound_queue()
clear the outbound queue any commands within the queue are silently discarded
Definition: queue.c:87
#define COMMAND_AGE_BEFORE_GC
maximum amount of time a packet may remain allocated
Definition: queue.c:78
struct command * get_data_reply(struct command *com)
allocates and initializes a packet to be sent as "data" in reply to the command; typically you'd add some data ...
Definition: queue.c:271
int send_data(struct command *com, const char *format,...)
send the format string as data in response to command "com"
Definition: queue.c:294
void command_print(struct command *com)
prints a command in human readable format to serial console
void process_command_queue()
this gets called when we got some cpu cycles spare we then send out commands and timeout other comman...
Definition: queue.c:682
struct command * alloc_command()
allocate a free command
Definition: queue.c:173
pkt_callback callback
Definition: queue.c:58
void request_route(long host)
send an arp request (equivalent)
Definition: queue.c:776
int send_command_fw_info(struct command *com, int err)
send a reply to a command
Definition: queue.c:533
definitions of routing table structures
int com
Definition: command.h:22
long target
Definition: command.h:16
uint8_t connid
Definition: command.h:18
uint8_t sourcedev
Definition: command.h:17
uint8_t encoding
Definition: command.h:11
long recipient
Definition: command.h:15
int index
Definition: command.h:13
uint8_t flags
Definition: command.h:23
long sender
Definition: command.h:14
Definition: queue.c:55