SingingCat 0
application
streams.c
Go to the documentation of this file.
1#include "streams.h"
2
3
14typedef struct stbuf {
15 int tn;
16 int fd;
17 int size;
18 byte buf[300];
19} _stbuf;
20
21static struct stbuf streambuf[10];
22
23
27struct stream streams[2];
28static byte streams_enabled;
29static struct stream *get_stream(int fd);
30static void sendfail(int fd, int tn, long sender, int err_no, const char *err);
31static int get_max_open_streams() {
32 return sizeof(streams) / sizeof(streams[0]);
33}
static char errbuf[256]; // text of the most recent stream error, a bit like errstr()
35void stream_init() {
36 errbuf[0] = 0;
37 memset(&streams, 0, sizeof(streams));
38 memset(&streambuf, 0, sizeof(streambuf));
40}
41static int are_streams_enabled() {
42 return streams_enabled ? 1 : 0;
43}
48 streams_enabled = 0;
49 printf("Streams disabled\r\n");
50}
55 streams_enabled = 1;
56 printf("Streams enabled\r\n");
57}
58
59
60static void clear_tns(struct stream *stream) {
61 int elements = sizeof(streambuf) / sizeof(streambuf[0]);
62 struct stbuf *sb;
63 int i;
64
65 for (i = 0; i < elements; i++) {
66 sb = &streambuf[i];
67 if (sb->fd != stream->fd) {
68 continue;
69 }
70 sb->fd = 0;
71 }
72}
76static struct stbuf *get_tn(struct stream *stream, int tn) {
77 int elements = sizeof(streambuf) / sizeof(streambuf[0]);
78 struct stbuf *sb;
79 int i;
80
81 for (i = 0; i < elements; i++) {
82 sb = &streambuf[i];
83 if ((sb->fd == stream->fd) && (sb->tn == tn)) {
84 return sb;
85 }
86 }
87 return NULL;
88}
89
93static void save_tn(struct stream *stream, int tn, const byte *buf, const int size) {
94 if (size > sizeof(streambuf[0].buf)) {
95 return; // too big to cache
96 }
97 if (get_tn(stream, tn)) {
98 return; // already saved
99 }
100 int elements = sizeof(streambuf) / sizeof(streambuf[0]);
101 struct stbuf *sb;
102 int i;
103
104 for (i = 0; i < elements; i++) {
105 sb = &streambuf[i];
106 if ((sb->fd == 0)
107 || ((sb->fd == stream->fd) && (sb->tn <= stream->ackedtn))
108 ) {
109 // use it
110 sb->fd = stream->fd;
111 sb->tn = tn;
112 sb->size = size;
113 for (i = 0; i < size; i++) {
114 sb->buf[i] = buf[i];
115 }
116 return;
117 }
118 }
119}
120
124int stream_associate(int fd, const struct consumerinfo *ci, byte *buf, int bufsize, int packetsize) {
125 struct stream *st;
126
127 st = get_stream(fd);
128 if (st == NULL) {
129 return 1;
130 }
131 if (packetsize > bufsize) {
132 return 2;
133 }
134 memset(buf, 0, bufsize);
135 st->lastack = 0;
136 st->ack_open = 0;
137 st->inactivity_ack_ctr = 0;
138 st->lasttn = 0;
139 st->failed = 0;
140 st->bytesinbuf = 0;
141 st->consumer = ci;
142 st->bufsize = bufsize;
143 st->packetsize = packetsize;
144 st->last_activity = mculib_get_seconds_since_boot();
145 if (bufsize == 1) {
146 st->bytebuf = &st->onebytebuf;
147 } else {
148 st->bytebuf = buf;
149 }
150 return 0;
151}
152
157 int i;
158 long now = mculib_get_seconds_since_boot();
159
160 for (i = 0; i < get_max_open_streams(); i++) {
161 if (streams[i].fd == 0) {
162 memset(&streams[i], 0, sizeof(streams[i]));
163 streams[i].fd = ((i + 1) * (mculib_get_seconds_since_boot() & 0xFFF) + 1);
164 // some defaults:
165 streams[i].ackfreq = 4;
166 streams[i].lastack = now;
167 streams[i].last_activity = now;
168 streams[i].target[0] = 0;
169 return &streams[i];
170 }
171 }
172 return NULL;
173}
174
175static struct stream *get_stream(int fd) {
176 int i;
177
178 if (fd == 0) {
179 return NULL;
180 }
181
182 for (i = 0; i < get_max_open_streams(); i++) {
183 if (streams[i].fd == fd) {
184 return &streams[i];
185 }
186 }
187 return NULL;
188}
189static struct stream *get_stream_by_target(const char *target) {
190 int i;
191
192 if ((target == NULL) || (strlen(target) == 0) || (strlen(target) > 16)) {
193 return NULL;
194 }
195 for (i = 0; i < get_max_open_streams(); i++) {
196 if (strcmp(target, streams[i].target) == 0) {
197 printf("Found stream by target %s: fd=%i\r\n", target, streams[i].fd);
198 return &streams[i];
199 }
200 }
201 return NULL;
202}
203
204void close_stream(int fd, int reason) {
205 struct stream *st;
206
207 st = get_stream(fd);
208 if (st == NULL) {
209 printf("Warning attempt to close stream #%i which is not open atm.\r\n", fd);
210 return;
211 }
212 printf("Closing stream %i, reason %i\r\n", fd, reason);
213 clear_tns(st);
214 if ((st->consumer != NULL) && (st->consumer->close_consumer != NULL)) {
215 st->consumer->close_consumer(fd, reason);
216 }
217
218 memset(st, 0, sizeof(streams[0]));
219 printf("Stream #%i closed.\r\n", fd);
220}
221
225static void repeat_sendack(struct stream *stream) {
226 struct command *com;
227
228 if (!are_streams_enabled()) {
229 return;
230 }
231
232 com = alloc_command();
233 if (com == NULL) {
234 printf("No memory to allocate command to send ack\r\n");
235 return;
236 }
238 com->target = stream->peer;
239 com->encoding = 'a';
240 com->com = 29; // streamcontrol
241 com->flags = COMFLAGS_SUCCESS | COMFLAGS_ACK;
242 command_add_arg(com, "fd=%i", stream->fd);
243 command_add_arg(com, "acktn=%i", stream->lasttn);
246}
247static void sendack(struct stream *stream) {
248 repeat_sendack(stream);
249 stream->ackedtn = stream->lasttn;
250 stream->lastack = mculib_get_seconds_since_boot();
251 stream->ack_open = 1;
252}
256static void sendfail(int fd, int tn, long sender, int err_no, const char *err) {
257 struct command *com;
258
259 com = alloc_command();
260 if (com == NULL) {
261 printf("No memory to allocate command to send ack\r\n");
262 return;
263 }
264 struct stream *st = get_stream(fd);
265
266 if (st != NULL) {
267 command_init(com);
268 com->target = sender;
269 com->encoding = 'a';
270 com->com = 29;
271 command_add_arg(com, "fd=%i", fd);
272 command_add_arg(com, "fc=%i", st->failed);
273 if (st->failed != 0) {
274 command_add_arg(com, "msg=%s", errbuf);
275 }
276 send_command(com);
277 free_command(com);
278 com = alloc_command();
279 if (com == NULL) {
280 printf("No memory to allocate command to send ack\r\n");
281 return;
282 }
283 }
284 command_init(com);
285 com->target = sender;
286 com->encoding = 'a';
287 com->com = 29;
288 command_add_arg(com, "fd=%i", fd);
289 command_add_arg(com, "tn=%i", tn);
290 command_add_arg(com, "errno=%i", err_no);
291 command_add_arg(com, "err=%s", err);
292 if (st != NULL) {
293 command_add_arg(com, "fc=%i", st->failed);
294 if (st->failed != 0) {
295 command_add_arg(com, "msg=%s", errbuf);
296 }
297 }
298 send_command(com);
299 free_command(com);
300}
301
305void streamsetup(struct command *com) {
306 struct stream *st = NULL;
307 struct command *data;
308
309 data = NULL;
310
311 if (!are_streams_enabled()) {
312 return;
313 }
314 const char *tgt = namedarg(com, "tgt"); // get the unique target (idempotency - if a stream with that same tgt (target) is open already, return that instead)
315
316 if (tgt != NULL) {
317 if (strlen(tgt) >= 16) {
318 printf("stream target too long\r\n");
319 return;
320 }
321 st = get_stream_by_target(tgt);
322 }
323 if (st == NULL) {
324 st = alloc_stream();
325 if (st == NULL) {
326 if (namedarg(com, "streaminit") != NULL) {
327 stream_init();
328 }
329 st = alloc_stream();
330 }
331 }
332 if (st == NULL) {
333 printf("No more available streams!\r\n");
334 goto fail;
335 }
336 if ((tgt != NULL)) {
337 strncpy(st->target, tgt, 16);
338 }
339 st->peer = com->sender;
340 data = get_data_reply(com);
341 if (data == NULL) {
342 goto fail;
343 }
344 command_add_arg(data, "fd=%i", st->fd);
345 const char *reqack = namedarg(com, "ackfreq");
346
347 if (reqack != NULL) {
348 int ra = atoi(reqack);
349 if (ra == 0) {
350 goto fail;
351 }
352 st->ackfreq = ra;
353 }
354 command_add_arg(data, "ackfreq=%i", st->ackfreq);
355 if (send_command(data) != 0) {
356 goto fail;
357 }
358
359 if (data != NULL) {
360 free_command(data);
361 }
362 send_command_reply(com, COMFLAGS_ACK | COMFLAGS_SUCCESS);
363 return;
364fail:
365 if (data != NULL) {
366 free_command(data);
367 }
368 if (st != NULL) {
369 close_stream(st->fd, 1);
370 }
371 send_command_reply(com, COMFLAGS_ACK);
372}
377static int call_consumer(struct stream *stream, byte *newdata, int size) {
378 int i;
379 int z;
380
381 // this should NEVER happen, but if it does bail out big time
382 if (stream->bytesinbuf >= stream->packetsize) {
383 stream->bytesinbuf = 0;
384 return 109;
385 }
386 // stream bytes in packets to consumer...
387 for (i = 0; i < size; i++) {
388 stream->bytebuf[stream->bytesinbuf++] = newdata[i];
389 if (stream->bytesinbuf >= stream->packetsize) {
390 int s = stream->bytesinbuf;
391 stream->bytesinbuf = 0;
392 if ((z = stream->consumer->data_consumer(stream->fd, stream->bytebuf, s)) != 0) {
393 return z;
394 }
395 }
396 }
397 return 0;
398}
402void flush(struct stream *stream) {
403 // call consumer one last time with the remaining data
404 int fd = stream->fd;
405 int s = stream->bytesinbuf;
406 int tn = stream->ackedtn;
407
408 printf("Stream: flush()\r\n");
409 stream->bytesinbuf = 0;
410 int z;
411
412 if (s != 0) {
413 if ((z = stream->consumer->data_consumer(stream->fd, stream->bytebuf, s)) != 0) {
414 printf("\r\nError: Stream #%i processing failure code %i\r\n", fd, z);
415 stream->failed = z;
416 sendfail(fd, tn, stream->peer, z, "Stream processing failure");
417 return;
418 }
419 }
420}
425void check_close(struct stream *stream) {
426 printf("Check_close: ackedtn: %i, closetn: %i\r\n", stream->ackedtn, stream->closetn);
427 if (!stream->closetn) {
428 return; // no close packet received yet
429 }
430 if (stream->ackedtn == (stream->closetn - 1)) {
431 flush(stream);
432 stream->lasttn = stream->closetn;
433 int i = 0;
434 for (i = 0; i < 5; i++) {
435 sendack(stream);
436 Delay(5);
437 }
438 close_stream(stream->fd, 0);
439 }
440}
441
451void streamdata(struct command *com) {
452 const int data_arg = 3;
453 const char *para;
454 struct stream *stream;
455 int i;
456 int len = -1;
457 int tn = 0;
458 int fd = 0;
459
460 if (!are_streams_enabled()) {
461 return;
462 }
463 if ((para = namedarg(com, "fd")) != NULL) {
464 fd = atoi(para);
465 }
466 if ((para = namedarg(com, "ln")) != NULL) {
467 len = atoi(para);
468 }
469 if ((para = namedarg(com, "tn")) != NULL) {
470 tn = atoi(para);
471 }
472
473 stream = get_stream(fd);
474 if (stream == NULL) {
475 printf("\r\nError: Stream #%i not open\r\n", fd);
476 sendfail(fd, tn, com->sender, 1, "Stream not open");
477 return;
478 }
479 if ((para = namedarg(com, "request")) != NULL) {
480 if (strcmp("close", para) == 0) {
481 printf("Closing stream due to peer's request\r\n");
482 sendack(stream);
483 stream->closetn = tn;
485 return;
486 } else if (strcmp("syncreq", para) == 0) {
487 printf("Sync requested\r\n");
488 repeat_sendack(stream);
489 return;
490 } else {
491 printf("\r\nError: Invalid stream request %s\r\n", para);
492 sendfail(fd, tn, com->sender, 5, "invalid stream request");
493 }
494 }
495
496 if (len != get_arg_size(com, data_arg)) {
497 printf("\r\nWarn: length mismatch sent=%i vs received=%i\r\n", len, get_arg_size(com, data_arg));
498 // ignore error, wait for resent
499 //sendfail(fd, tn, com->sender, 6, "length mismatch");
500 return;
501 }
502
503 if (stream->failed) {
504 sendfail(fd, tn, com->sender, stream->failed, "Stream processing failure.");
505 return;
506 }
507 if (com->argctr < 4) {
508 printf("\r\nError: Not enough parameters %i\r\n", com->argctr);
509 sendfail(fd, tn, com->sender, 2, "Not enough parameters for stream data");
510 return;
511 }
512 if (stream->consumer == NULL) {
513 printf("\r\nError: Stream #%i not associated with consumer\r\n", fd);
514 sendfail(fd, tn, com->sender, 3, "Stream not associated with consumer");
515 return;
516 }
517 if (stream->consumer->data_consumer == NULL) {
518 printf("\r\nError: Stream #%i associated with invalid consumer\r\n", fd);
519 sendfail(fd, tn, com->sender, 3, "Stream association invalid");
520 return;
521 }
522 if (tn < (stream->lasttn + 1)) {
523 printf("Warning: #%i received, but we're ahead at %i\r\n", tn, stream->lasttn);
524 stream->outoforderctr++;
525 if (stream->outoforderctr > 5) {
526 stream->outoforderctr = 0;
527 repeat_sendack(stream);
528 }
529 // ignore we had this before...
530 return;
531 }
532 byte *cbuf = (byte *)get_arg(com, data_arg);
533
534 if (tn > (stream->lasttn + 1)) {
535 // too large
536 printf("Out of sync: #%i received, but we're still at %i\r\n", tn, stream->lasttn);
537 stream->outoforderctr++;
538 save_tn(stream, tn, cbuf, len);
539 if (stream->outoforderctr > 5) {
540 stream->outoforderctr = 0;
541 repeat_sendack(stream);
542 }
543 return;
544 }
545 stream->outoforderctr = 0;
546 // got valid and good data!
547 //printf("Good Streamdata received: fd=%i, ln=%i, tn=%i, p=%p\r\n", fd, len, tn, stream);
548
549 i = call_consumer(stream, cbuf, len);
550 if (i != 0) {
551 printf("\r\nError: Stream #%i processing failure code %i\r\n", fd, i);
552 stream->failed = i;
553 sendfail(fd, tn, com->sender, i, "Stream processing failure");
554 return;
555 }
556 stream->lasttn = tn;
557 tn++;
558
559 // do we have more in cache?
560 struct stbuf *st;
561
562 while ((st = get_tn(stream, tn))) {
563 printf("Using cached tn %i\r\n", tn);
564 cbuf = (byte *)&st->buf;
565 len = st->size;
566 i = call_consumer(stream, cbuf, len);
567 if (i != 0) {
568 printf("\r\nError: Stream #%i processing failure code %i\r\n", fd, i);
569 stream->failed = i;
570 sendfail(fd, tn, com->sender, i, "Stream processing failure");
571 return;
572 }
573 tn++;
574 }
575 tn--; // last one was not available!
576 stream->lasttn = tn;
577 stream->ack_open = 0;
578 stream->last_activity = mculib_get_seconds_since_boot();
579 stream->inactivity_ack_ctr = 0;
580 // if ((stream->lasttn - stream->ackedtn) >= stream->ackfreq) {
581 sendack(stream);
582 //}
584 return;
585}
586
591 int i;
592 struct stream *st;
593
594 if (!are_streams_enabled()) {
595 return;
596 }
597 long now = mculib_get_seconds_since_boot();
598
599 for (i = 0; i < get_max_open_streams(); i++) {
600 st = &streams[i];
601 if (st->fd == 0) {
602 continue;
603 }
604 long diff = now - st->lastack;
605 long diffms = systick_diff(st->lastackms);
606 if (
607 ((st->inactivity_ack_ctr < 2) && (st->ack_open) && (diff >= 2))
608 || ((st->inactivity_ack_ctr < 5) && (diffms > 10))
609 ) {
610 st->lastackms = get_systickctr();
611 st->inactivity_ack_ctr++;
612 //printf("Stream %i stalled? sending ack for %i...\r\n", st->fd,st-lastack);
613 //sendack(st);
614 }
615 if ((now - st->last_activity) > 600) {
616 printf("Closing stream #%i due to inactivity\r\n", st->fd);
617 close_stream(st->fd, 3);
618 }
619 }
620}
625 int res = 0;
626 int i;
627 struct stream *st;
628
629 for (i = 0; i < get_max_open_streams(); i++) {
630 st = &streams[i];
631 if (st->fd == 0) {
632 continue;
633 }
634 res++;
635 }
636 return res;
637}
638
643 int i;
644 long now = mculib_get_seconds_since_boot();
645 struct stream *st;
646
647 printf("Stream info (at %D):\r\n", now);
648 for (i = 0; i < get_max_open_streams(); i++) {
649 st = &streams[i];
650 if (st->fd == 0) {
651 continue;
652 }
653 printf("Stream #%i\r\n", st->fd);
654 printf(" Ack Frequency: %i\r\n", st->ackfreq);
655 printf(" Ack Open: %i\r\n", st->ack_open);
656 printf(" Inactivity ctr: %i\r\n", st->inactivity_ack_ctr);
657 printf(" acked tn: %i\r\n", st->ackedtn);
658 printf(" last tn: %i\r\n", st->lasttn);
659 printf(" peer: %N\r\n", st->peer);
660 printf(" lastack secs: %i\r\n", st->lastack);
661 printf(" last activity: %i\r\n", st->last_activity);
662 printf(" byte consumer: %s\r\n", (st->consumer == NULL) ? "NOT SET" : "SET");
663 printf(" packetsize: %i\r\n", st->packetsize);
664 printf(" bufsize: %i\r\n", st->bufsize);
665 printf(" bytes in buf: %i\r\n", st->bytesinbuf);
666 printf(" closing: %i\r\n", st->closetn);
667 printf(" target: %s\r\n", st->target);
668 }
669 printf("End Stream info\r\n");
670}
671
672void stream_seterror(int streamfd, const char *format, ...) {
673 char *buf = &errbuf[0];
674 size_t size = sizeof(errbuf);
675 struct stream *st = get_stream(streamfd);
676 va_list args;
677
678 va_start(args, format);
679 vsnprintf(buf, size, format, args);
680 va_end(args);
681 if (st == NULL) {
682 return;
683 }
684 if (st->failed == 0) {
685 st->failed = 1;
686 }
687 printf("stream failed: %s\r\n", errbuf);
688}
void streamdata(struct command *com)
called when we receive a stream data packet TODO: this is a really simplistic, memory-consumption opt...
Definition: streams.c:451
void stream_loop()
called periodically, checks for stale streams and/or hung ones
Definition: streams.c:590
int stream_associate(int fd, const struct consumerinfo *ci, byte *buf, int bufsize, int packetsize)
associate a stream with a consumer. return 0 if ok otherwise errorcode
Definition: streams.c:124
int send_command(struct command *com)
send a command to another module (or broadcast)
Definition: queue.c:374
void free_command(struct command *com)
free a command
Definition: queue.c:200
struct stream streams[2]
management for streams (the more RAM, the more simultaneous streams we can handle)
Definition: streams.c:27
void command_add_arg(struct command *com, const char *format,...)
adds an arg to a partially initialised command structure
void streamsetup(struct command *com)
called when we receive a stream setup packet
Definition: streams.c:305
const char * namedarg(struct command *com, const char *name)
get a named arg (key-value pair) or NULL
struct stream * alloc_stream()
find a free stream struct, alloc an fd and return struct
Definition: streams.c:156
void print_stream_info()
print summary of current streams to console
Definition: streams.c:642
int send_command_reply(struct command *com, byte flags)
send a reply to a command
Definition: queue.c:562
void flush(struct stream *stream)
send remaining data to consumer (might be < packetsize!)
Definition: streams.c:402
void check_close(struct stream *stream)
check if the closing flag is set and we received all the data up to the close packet if,...
Definition: streams.c:425
void command_init(struct command *com)
initialize a command structure with default values
int count_open_streams()
count number of open streams
Definition: streams.c:624
void stream_enable()
enable all stream handling (default)
Definition: streams.c:54
struct command * get_data_reply(struct command *com)
allocates and initializes a packet to be send as "data" to the command typically you'd add some data ...
Definition: queue.c:271
void stream_disable()
disable all stream handling
Definition: streams.c:47
struct command * alloc_command()
allocate a free command
Definition: queue.c:173
const char * get_arg(const struct command *com, int index)
given an argument by index[0..n], will return a pointer to the bytearray (excluding the fieldtype) th...
int get_arg_size(const struct command *com, int index)
given an argument by index [0..n], will return the size of the value in bytes. for a string/array it...
int com
Definition: command.h:22
long target
Definition: command.h:16
uint8_t encoding
Definition: command.h:11
uint8_t argctr
Definition: command.h:24
long sender
Definition: command.h:14
Definition: streams.c:14
Definition: streams.h:14