SingingCat  0
application
queue.c
Go to the documentation of this file.
1 #include <main-header.h>
2 #include <queue.h>
3 #include <command-parser.h>
4 #include <command-handler.h>
5 #include <multiram.h>
6 #include <routing.h>
7 #include <config.h>
8 #include "espressif/esp8266_cloud.h"
9 #include <ti1101.h>
10 #include <route_command.h>
11 #include "metrics/metrics.h"
12 
13 #define MAX_COM_RETRIES 30
14 
15 /****************************
16  * command send failure codes
17  **************************/
18 // permanent failures
19 #define SEND_BUF_RESERVED1 1
20 #define SEND_BUF_RESERVED2 2
21 #define SEND_NO_CLOUD 3
22 #define SEND_NO_TARGET 5
23 #define SEND_NO_RECIPIENT 6
24 #define SEND_COM_FAILED_TO_ENCODE 7
25 #define SEND_COM_NOT_ENCODED 12
26 #define SEND_COM_INVALID 13
27 #define SEND_COM_STILL_INVALID 14
28 #define SEND_FAILURE_NODEV 16
29 // temporary failures
30 #define SEND_DEVICE_BUSY -113
31 
32 
33 
34 //#include <bare-metal.h>
50 static int command_index = 0; // last index handed out by get_next_command_index()
51 static struct command *foocom; // debug aid: command pointer remembered before command_print() in send_command_internal()
52 static char queuebuf[256]; // limits the max size of an outbound packet
53 #define COM_QUEUE_SIZE 50 // number of slots in out_queue
54 typedef struct queue_entry { // one slot of the outbound queue; the slot is free while 'command' is NULL
55  struct command * command; // queued command, NULL if the slot is unused
56  int attemptctr; // incremented each time the command was actually sent
57  pkt_callback callback; // optional; called with (queued command, reply), reply is NULL when the command is given up
58  long last_attempt; // seconds since boot of the last send attempt, 0 if never attempted
59 } _queue_entry;
60 
61 static void callback(struct queue_entry *q, struct command *com);
62 static struct queue_entry out_queue[COM_QUEUE_SIZE]; // the outbound queue itself
63 static int send_command_internal(struct command *com, int print);
64 
65 static int broadcast_device();
66 
77 #define COMMAND_AGE_BEFORE_GC 120
78 
79 void queue_init() {
80  memset(&out_queue, 0, sizeof(out_queue));
81 }
87  printf("Outbound command queue cleared.\r\n");
88  queue_init();
89 }
90 void print_outbound_queue() {
91  int i;
92  int elements = get_mpm()->max_commands;
93  struct comnode *p;
94 
95  for (i = 0; i < elements; i++) {
96  p = &get_mpm()->commem[i];
97  if (p->allocated == 0) {
98  continue;
99  }
100  printf("------- Queue position %i: ------\r\n", i);
101  command_print(&p->command);
102  }
103 }
104 
105 struct command *alloc_command_with_minfree(int minfree) {
106  int i;
107  struct comnode *p;
108  struct comnode *res;
109  int elements = get_mpm()->max_commands;
110 
111  int free=0;
112  res=NULL;
113  for (i = 0; i < elements; i++) {
114  p = &get_mpm()->commem[i];
115  if (p->allocated == 0) {
116  free++;
117  if ( (free>=minfree) && (res!=NULL) ){
118  break;
119  }
120 
121  if (res==NULL) {
122  res = p;
123  }
124  }
125  }
126  if (res!=NULL) {
127  command_init(&res->command);
128  res->command.index = -1;
129  res->allocated = mculib_get_seconds_since_boot();
130  return &res->command;
131  }
132 
133  // we have now command buffers left to allocate
134  res = NULL;
135  printf("command buffers are all allocated (reserved %i):\n",minfree);
136  for (i = 0; i < elements; i++) {
137  p = &get_mpm()->commem[i];
138  long now = mculib_get_seconds_since_boot();
139  long age = (now - p->allocated);
140  if (p->allocated != 0) {
141  printf("%i. (age seconds: %i) (%s) \r\n", i, (int)age, command_get_name(p->command.com));
142  //command_print(&p->command);
143  }
144  if ((age > COMMAND_AGE_BEFORE_GC) || (age < -10)) {
145  //IPLOG("Garbage collecting above command due to its age and using it!\r\n");
146  if (res == NULL) {
147  res = p; // update result (reuse last command gc'ed)
148  } else {
149  p->allocated = 0;
150  }
151  break;
152  }
153  }
154  if (res == NULL) {
155  IPLOG("BIG FAT WARNING - No command left to allocate (%i) (reserved %i).\r\n", elements,minfree);
156  return NULL;
157  }
158  printf("Reallocated garbage collected command: %p\r\n", &res->command);
159  command_init(&res->command);
160  return &res->command;
161 }
162 
163 
172  return alloc_command_with_minfree(0);
173 }
174 
175 
179 void free_commands(int index) {
180  struct comnode *p;
181  int i;
182  int elements = sizeof(get_mpm()->commem) / sizeof(get_mpm()->commem[0]);
183 
184  if (index == -1) {
185  for (i = 0; i < elements; i++) {
186  p = &get_mpm()->commem[i];
187  if (p->allocated != 0) {
188  free_command(&p->command);
189  }
190  }
191  }
192 }
198 void free_command(struct command *com) {
199  struct comnode *p;
200 
201  p = (struct comnode *)com;
202  if (p->allocated == 0) {
203  // printf("BIG WARNING: DUPLICATE FREE OF COMMAND! \r\n");
204  return;
205  }
206  p->allocated = 0;
207 }
208 
209 /*
210  * free any commands that are too old
211  */
212 static void garbage_collect_commands() {
213  int i;
214  struct comnode *p;
215  int elements = get_mpm()->max_commands;
216 
217  long now = mculib_get_seconds_since_boot();
218 
219  for (i = 0; i < elements; i++) {
220  p = &get_mpm()->commem[i];
221  if (p->allocated == 0) {
222  continue;
223  }
224  long age = (now - p->allocated);
225  if ((age > COMMAND_AGE_BEFORE_GC) || (age < -10)) {
226  free_command((struct command *)p);
227  }
228  }
229 }
230 /*
231  * free queue entries which point to commands that are no longer valid
232  */
233 static void garbage_collect_queue_entries() {
234  int elements = sizeof(out_queue) / sizeof(out_queue[0]);
235  int i;
236  struct comnode *p;
237 
238  for (i = 0; i < elements; i++) {
239  if (out_queue[i].command == NULL) {
240  continue;
241  }
242  p = (struct comnode *)out_queue[i].command;
243  if (p->allocated == 0) {
244  out_queue[i].command = NULL;
245  }
246  }
247 }
248 
249 /*
250  * set or clear the POWERSAVE flag, depending on module config
251  */
252 static void set_power_mode_on_com(struct command *com) {
253  if (config_get_flag(CONFIG_FLAGS_POWER_SAVE)) {
254  com->flags |= COMFLAGS_POWERSAVE;
255  } else {
256  com->flags &= ~COMFLAGS_POWERSAVE;
257  }
258 }
259 
260 
261 
262 
263 
269 struct command *get_data_reply(struct command *com) {
270  struct command *dcom;
271 
272  dcom = alloc_command();
273  if (dcom == NULL) {
274  return NULL;
275  }
276  command_init(dcom);
277 
278  dcom->target = com->sender;
279  dcom->encoding = 'a';
280  dcom->index = com->index;
281  dcom->sourcedev = com->sourcedev;
282  dcom->connid = com->connid;
283  dcom->com = com->com;
284  dcom->flags = COMFLAGS_DATA | COMFLAGS_SUCCESS;
285  return dcom;
286 }
/*
 * Send a formatted string as data in response to command 'com'.
 *
 * A data reply is built with get_data_reply(), the formatted arguments are
 * appended, and the reply is sent immediately (not queued). The reply
 * buffer is freed again before returning.
 *
 * Returns 0 on success, 1 if no reply buffer could be allocated, otherwise
 * the non-zero code reported by send_command() (previously a failed send
 * was reported as success).
 */
int send_data(struct command *com, const char *format, ...) {
    struct command *dcom;
    va_list args;
    int res;

    dcom = get_data_reply(com);
    if (dcom == NULL) {
        return 1;
    }
    va_start(args, format);
    command_add_varg(dcom, format, args);
    va_end(args);

    res = send_command(dcom);
    free_command(dcom);
    return res;
}
308 
324 int process_queue_reply(struct command *ack) {
325  int elements = sizeof(out_queue) / sizeof(out_queue[0]);
326  struct queue_entry *q;
327  int i;
328 
329  for (i = 0; i < elements; i++) {
330  q = &out_queue[i];
331  if (q->command == NULL) {
332  continue;
333  }
334  // check if this is a reply to one of our commands
335  if ((q->command->index == ack->index)
336  && (q->command->target == ack->sender)
337  ) {
338  printf("That is my command at index %i\r\n", i);
339  callback(q, ack);
340  struct command *c = q->command;
341  if (c->target != 0xFFFFFFFF) {
342  q->command = NULL;
343  free_command(c);
344  }
345  return 1;
346  }
347  }
348  return 0;
349 }
350 
351 static int get_next_command_index() {
352  if ((command_index > 32000) || (command_index < 0)) {
353  command_index = 0;
354  }
355  return ++command_index;
356 }
/*
 * Send a command right away, printing it (and any failure code) to the
 * console. Returns the result of send_command_internal(): 0 on success,
 * non-zero on failure.
 */
int send_command(struct command *com) {
    int rc;

    printf("Sending command:\r\n");
    rc = send_command_internal(com, 1);
    if (rc == 0) {
        return 0;
    }
    printf("Sending failed (code=%i)\r\n", rc);
    return rc;
}
381 
/*
 * Same as send_command(), but asks send_command_internal() not to print
 * the command. Returns the result of send_command_internal().
 */
int send_command_quietly(struct command *com) {
    const int print_command = 0;

    return send_command_internal(com, print_command);
}
385 
386 /*
387  * \brief same as send_command plus flag wether to print command or not
388  * return: 0 ok, >0 on permanent errors, <0 on temporary errors
389  */
390 static int send_command_internal(struct command *com, int p) {
391  struct hostroute *host;
392 
393  // I cannot do this, because we might be FORWARDING a packet.
394  //com->sender = get_my_node_id();
395 
396  //printf("Sending command to device %i: %s\r\n", com->sourcedev, queuebuf);
397  if (com->index == -1) {
398  com->index = get_next_command_index();
399  }
400  if (com->target == 0) {
401  return SEND_NO_TARGET;
402  }
403 
404  // special case: send to server
405  if (com->target == CLOUD_SERVER) {
406  if ((esp_cloud_is_connected()) || ((com->local_flags & (1 << COM_LOCAL_FLAG_FORCE_DEVICE)) && (com->sourcedev == SOURCE_WIFI))) {
407  // send to server
408  com->sourcedev = 1;
409  com->recipient = CLOUD_SERVER;
410  } else {
411  // no direct route to target and no server connection:
412  // try to use a configured route
413  struct route *r = get_configured_route(com);
414  if (r != NULL) {
415  com->recipient = r->sendto;
416  com->sourcedev = r->out_device;
417  }
418  }
419  }
420 
421  if (com->recipient == 0) {
422  if (com->target == BROADCAST) {
423  com->sourcedev = broadcast_device();
424  com->recipient = BROADCAST;
425  goto gotdev_no_recipt;
426  }
427  // forward to "best" device
428  host = routing_find_host(com->target);
429  if (host == NULL) {
430  // use server as default gateway
431  request_route(com->target);
432  if (esp_cloud_is_connected()) {
433  com->sourcedev = SOURCE_WIFI;
434  com->recipient = CLOUD_SERVER;
435  } else {
436  return SEND_NO_CLOUD; // cloud not connected
437  }
438  } else {
439  com->sourcedev = host->device;
440  com->recipient = host->nexthop;
441  }
442  }
443 gotdev_no_recipt:
444 
445  if (com->recipient == 0) {
446  return SEND_NO_RECIPIENT;
447  }
448 
449  if ((com->sourcedev == 0) && (com->recipient = BROADCAST)) {
450  struct network_context *nc = nic_by_index(0);
451  if (nc != NULL) {
452  com->sourcedev = nc->source;
453  }
454  }
455 
456  if (com->sourcedev == 0) {
457  return SEND_FAILURE_NODEV;
458  }
459  // TODO: fix wifi netdev
460  if ((com->sourcedev != SOURCE_WIFI) && (nic_is_busy(com->sourcedev))) {
461  // device busy? return negative (temporary response)
462  return SEND_DEVICE_BUSY;
463  }
464  if (p) {
465  if (!is_command_valid(com)) {
466  printf("Cannot print at %p\r\n", com);
467  return SEND_COM_INVALID;
468  }
469  foocom = com;
470  command_print(com);
471  if (!is_command_valid(com)) {
472  printf("print broke stuff at %p (was: %p)\r\n", com, foocom);
473  return SEND_COM_STILL_INVALID;
474  }
475  }
476  if (!is_command_valid(com)) {
477  printf("Cannot encode at %p\r\n", com);
478  return SEND_COM_NOT_ENCODED;
479  }
480  set_power_mode_on_com(com);
481  int len = command_encode_ascii(com, 256, (char *)&queuebuf);
482 
483  if (len < 0) {
484  printf("Failed to encode packet (%i) at %p.\r\n", len, com);
485  return SEND_COM_FAILED_TO_ENCODE;
486  }
487  printf("Sending via %s: %i bytes (%s)\r\n", command_get_source_name(com->sourcedev), len, &queuebuf);
488 
489  int i = send_buf_via_device(com->sourcedev, (const byte *)&queuebuf, len);
490 
491  if (i == 0) {
492  return 0;
493  }
494  printf("Failed to sent packet:\r\n");
495  command_print(com);
496  return i;
497  /*
498  * if (com->sourcedev == 1) {
499  * wifi_send_data(PKT_COMMAND, (const byte *)&queuebuf, len); // wifi
500  * return 0;
501  * } else if (com->sourcedev == 3) {
502  * printf("REPLY: %s\r\n", (char *)&queuebuf); // local tty
503  * return 2;
504  * } else if (com->sourcedev == 2) { // radio
505  * if (!ti1101_send(len, (const byte *)&queuebuf)) {
506  * printf("Not sent\r\n");
507  * return -1;
508  * }
509  * printf("Sent\r\n");
510  * return 0;
511  * } else {
512  * printf("******************\r\n");
513  * printf("Unable to send packet! Unknown source: %i\r\n", com->sourcedev);
514  * if (p) {
515  * command_print(com);
516  * }
517  * printf("******************\r\n");
518  * return 1;
519  * }
520  */
521 }
522 
531 int send_command_fw_info(struct command *com, int err) {
532  struct command *reply;
533 
534  reply = alloc_command();
535  if (reply == NULL) {
536  return 2;
537  }
538  command_init(reply);
539 
540  reply->target = com->sender;
541  reply->encoding = 'a';
542  reply->index = com->index;
543  reply->sourcedev = com->sourcedev;
544  reply->connid = com->connid;
545  reply->com = com->com;
546  reply->flags = COMFLAGS_FORWARDED;
547  if (err == 0) {
548  reply->flags = reply->flags | COMFLAGS_ACK;
549  }
550  return deliver_command(reply, NULL);
551 }
560 int send_command_reply(struct command *com, byte flags) {
561  struct command *reply;
562 
563  reply = alloc_command();
564  if (reply == NULL) {
565  return 2;
566  }
567  command_init(reply);
568 
569  reply->target = com->sender;
570  reply->encoding = 'a';
571  reply->index = com->index;
572  reply->sourcedev = com->sourcedev;
573  reply->connid = com->connid;
574  reply->com = com->com;
575  reply->flags = flags & ~(COMFLAGS_FORWARDED); // clear forwarded flag. it is _not_ yet forwarded
576  return deliver_command(reply, NULL);
577 }
586 int send_command_reply_with_args(struct command *com, byte flags, const char *format, ...) {
587  va_list args;
588 
589  struct command *reply;
590 
591  reply = alloc_command();
592  if (reply == NULL) {
593  return 2;
594  }
595  command_init(reply);
596 
597  reply->target = com->sender;
598  reply->encoding = 'a';
599  reply->index = com->index;
600  reply->sourcedev = com->sourcedev;
601  reply->connid = com->connid;
602  reply->com = com->com;
603  reply->flags = flags & ~(COMFLAGS_FORWARDED); // clear forwarded flag. it is _not_ yet forwarded
604  va_start(args, format);
605  command_add_varg(reply, format, args);
606  va_end(args);
607 
608  return deliver_command(reply, NULL);
609 }
613 static struct queue_entry *alloc_free_queue_entry(struct command *com) {
614  int elements = sizeof(out_queue) / sizeof(out_queue[0]);
615  int i;
616 
617  for (i = 0; i < elements; i++) {
618  if (out_queue[i].command == NULL) {
619  memset(&out_queue[i], 0, sizeof(out_queue[0]));
620  out_queue[i].command = com;
621  return &out_queue[i];
622  }
623  }
624  printf("BIG FAT WARNING - no free queue slot found!\r\n");
625  return NULL;
626 }
627 
649 int deliver_command(struct command *com, pkt_callback cb) {
650  struct queue_entry *q;
651 
652  if (!is_command_valid(com)) {
653  error_com(com);
654  return 100;
655  }
656  q = alloc_free_queue_entry(com);
657  if (q == NULL) {
658  printf("No available queue slot\r\n");
659  return 1;
660  }
661  q->callback = cb;
662  IncMetric_COM_QUEUE_ADDED;
663  return 0;
664 }
665 
666 static void callback(struct queue_entry *q, struct command *com) {
667  if ((q == NULL)) {
668  return;
669  }
670  if (q->callback == NULL) {
671  return;
672  }
673  q->callback(q->command, com);
674 }
675 
681  int res;
682  int x;
683  int elements = sizeof(out_queue) / sizeof(out_queue[0]);
684  struct queue_entry *q;
685  struct command *c;
686  int i;
687  long now = mculib_get_seconds_since_boot();
688  char t[16];
689 
690  garbage_collect_commands();
691  garbage_collect_queue_entries();
692  res = 0;
693  x = 0;
694  for (i = 0; i < elements; i++) {
695  q = &out_queue[i];
696  if (q->command == NULL) {
697  continue;
698  }
699  if (!is_command_valid(q->command)) {
700  printf("Command %i in queue is invalid\r\n", i);
701  continue;
702  }
703  res++;
704  if ((q->last_attempt != 0) && (now - q->last_attempt) <= 1) {
705  continue;
706  }
707  x++;
708  }
709  if (x == 0) {
710  return; // no elements are due
711  }
712  IPLOG("Processing queue with %i elements, due elements: %i\r\n", res, x);
713  for (i = 0; i < elements; i++) {
714  q = &out_queue[i];
715  if (q->command == NULL) {
716  continue;
717  }
718  if (q->last_attempt > now) {
719  q->last_attempt = now; // overflow fix?
720  continue;
721  }
722  if ((q->last_attempt != 0) && (now - q->last_attempt) <= 1) {
723  continue;
724  }
725  q->last_attempt = now;
726  c = q->command;
727  pretty_node_to_str(c->target, (char *)&t);
728  // try at most 5 times //
729  if (q->attemptctr > MAX_COM_RETRIES) {
730  callback(q, NULL);
731  q->command = NULL;
732  free_command(c);
733  printf("Discarded command, No response to command %i (target: %s)\r\n", i, t);
734  break;
735  }
736  printf("sending com %i (attempt #%i) to %s\r\n", i, q->attemptctr, t);
737  res = send_command(q->command);
738  if (res < 0) {
739  // temporarily failed to send
740  // such as, wifi AP is offline
741  // or radio is busy
742  printf("Queue: temporarily unable to send command %i\r\n", i);
743  } else if (res > 0) {
744  // permanently failed to send
745  // such as Invalid Address or so
746  printf("Queue: command #%i (type %i) unsendable (given up) reason: %i\r\n", i, q->command->com, res);
747  callback(q, NULL);
748  q->command = NULL;
749  free_command(c);
750  continue;
751  } else {
752  // we did _send_ it, so increase attemptctr
753  q->attemptctr++;
754  }
755 
756  // any combination of these flags will only be retried if the send error was temporary, otherwise freed
757  if (res >= 0) {
758  if (c->flags & (COMFLAGS_ACK | COMFLAGS_FORWARDED | COMFLAGS_DATA)) {
759  callback(q, NULL);
760  q->command = NULL;
761  free_command(c);
762  }
763  }
764  // break; // we only process one command at a time to give time
765  // to process responses, if any
766  // cnw: I think this is non-sense, this method is being executed immediately
767  // after it exits anyways, so we might as well process them all ;-)
768  }
769 }
770 
774 void request_route(long nodeid) {
775  struct command *com;
776 
777  printf("Requesting route for %N\r\n", nodeid);
778  // wifi
779  com = alloc_command();
780  if (com == NULL) {
781  return;
782  }
783  com->sourcedev = SOURCE_WIFI;
784  com->com = 12; // announce
785  com->target = 0xFFFFFFFF;
786  com->recipient = 0xFFFFFFFF;
787  com->flags = 0;
788  command_add_arg(com, "%N", nodeid);
789 
790  if (!config_get_wifidisable()) {
791  com->sourcedev = SOURCE_WIFI;
792  send_command(com);
793  }
794 
795  com->flags = COMFLAGS_ACK | COMFLAGS_SUCCESS;
796  // TODO: make this uniform for all nics
797  if (isready_by_type(SOURCE_RADIO)) {
798  com->sourcedev = SOURCE_RADIO;
799  send_command(com);
800  }
801  if (isready_by_type(SOURCE_LORA)) {
802  com->sourcedev = SOURCE_LORA;
803  send_command(com);
804  }
805  free_command(com);
806 }
807 
812  int i;
813  int coms = 0;
814  int elements = sizeof(out_queue) / sizeof(out_queue[0]);
815  struct queue_entry *q;
816 
817  for (i = 0; i < elements; i++) {
818  q = &out_queue[i];
819  if (q->command == NULL) {
820  continue;
821  }
822  if ((q->command->com == 1) || (q->command->com == 23)) {
823  continue;
824  }
825  coms++;
826  }
827  return coms;
828 }
829 
830 static int broadcast_device() {
831  int i = 0;
832  int somewhat_good = 0;
833  int verygood = 0;
834 
835  for (;;) {
836  struct network_context *nc = nic_by_index(i);
837  i++;
838  if (nc == NULL) {
839  break;
840  }
841  uint8_t type = nc->source;
842  if (!isonline_by_type(type)) {
843  continue;
844  }
845  somewhat_good = type;
846  if ((isready_by_type(type))) {
847  verygood = type;
848  }
849  }
850  if (verygood != 0) {
851  return verygood;
852  }
853  return somewhat_good;
854 }
int process_queue_reply(struct command *ack)
process a reply
Definition: queue.c:324
int command_encode_ascii(struct command *com, int bufsize, char *buf)
encode a command to an ascii blob properly surrounded by '{' and '}' and escaped returns length if ok...
long nexthop
here the node id of the intermediary hop (the proxy)
Definition: routing.h:27
int send_command_reply_with_args(struct command *com, byte flags, const char *format,...)
send a reply to a command
Definition: queue.c:586
void free_commands(int index)
free commands by index (-1 for all)
Definition: queue.c:179
int send_command(struct command *com)
send a command to another module (or broadcast)
Definition: queue.c:372
void free_command(struct command *com)
free a command
Definition: queue.c:198
int deliver_command(struct command *com, pkt_callback cb)
deliver a command to a module
Definition: queue.c:649
struct command * get_data_reply(struct command *com)
allocates and initializes a packet to be sent as "data" in reply to the command; typically you'd add some data ...
Definition: queue.c:269
struct command * alloc_command()
allocate a free command
Definition: queue.c:171
const char * command_get_name(int num)
given a command number returns its name
void command_add_arg(struct command *com, const char *format,...)
adds an arg to a partially initialised command structure
int get_outbound_command_count_important()
return number of commands to be delivered (apart from announce/noop)
Definition: queue.c:811
struct hostroute * routing_find_host(const long nodeid)
find route to host or NULL if none known
Definition: routing.c:315
int attemptctr
Definition: queue.c:56
long host
Definition: routing.h:21
void command_add_varg(struct command *com, const char *format, va_list args)
adds a varg list of parameters to a command
int send_command_reply(struct command *com, byte flags)
send a reply to a command
Definition: queue.c:560
void command_init(struct command *com)
initialize a command structure with default values
long last_attempt
Definition: queue.c:58
void clear_outbound_queue()
clear the outbound queue; any commands within the queue are silently discarded
Definition: queue.c:86
#define COMMAND_AGE_BEFORE_GC
maximum amount of time a packet may remain allocated
Definition: queue.c:77
int send_data(struct command *com, const char *format,...)
send the format string as data in response to command "com"
Definition: queue.c:292
char * command_get_source_name(byte sourcedev)
returns a human readable text identifying a source device
void command_print(struct command *com)
prints a command in human readable format to serial console
void process_command_queue()
this gets called when we got some cpu cycles spare we then send out commands and timeout other comman...
Definition: queue.c:680
pkt_callback callback
Definition: queue.c:57
void request_route(long host)
send an arp request (equivalent)
Definition: queue.c:774
int send_command_fw_info(struct command *com, int err)
send a reply to a command
Definition: queue.c:531
definitions of routing table structures
int com
Definition: command.h:22
long target
Definition: command.h:16
uint8_t connid
Definition: command.h:18
uint8_t sourcedev
Definition: command.h:17
uint8_t encoding
Definition: command.h:11
long recipient
Definition: command.h:15
int index
Definition: command.h:13
uint8_t flags
Definition: command.h:23
long sender
Definition: command.h:14
Definition: queue.c:54