21static struct stbuf streambuf[10];
28static byte streams_enabled;
29static struct stream *get_stream(
int fd);
30static void sendfail(
int fd,
int tn,
long sender,
int err_no,
const char *err);
31static int get_max_open_streams() {
34static char errbuf[256];
38 memset(&streambuf, 0,
sizeof(streambuf));
41static int are_streams_enabled() {
42 return streams_enabled ? 1 : 0;
49 printf(
"Streams disabled\r\n");
56 printf(
"Streams enabled\r\n");
61 int elements =
sizeof(streambuf) /
sizeof(streambuf[0]);
65 for (i = 0; i < elements; i++) {
67 if (sb->fd !=
stream->fd) {
77 int elements =
sizeof(streambuf) /
sizeof(streambuf[0]);
81 for (i = 0; i < elements; i++) {
83 if ((sb->fd ==
stream->fd) && (sb->tn == tn)) {
93static void save_tn(
struct stream *
stream,
int tn,
const byte *buf,
const int size) {
94 if (size >
sizeof(streambuf[0].buf)) {
100 int elements =
sizeof(streambuf) /
sizeof(streambuf[0]);
104 for (i = 0; i < elements; i++) {
107 || ((sb->fd ==
stream->fd) && (sb->tn <=
stream->ackedtn))
113 for (i = 0; i < size; i++) {
131 if (packetsize > bufsize) {
134 memset(buf, 0, bufsize);
137 st->inactivity_ack_ctr = 0;
142 st->bufsize = bufsize;
143 st->packetsize = packetsize;
144 st->last_activity = mculib_get_seconds_since_boot();
146 st->bytebuf = &st->onebytebuf;
158 long now = mculib_get_seconds_since_boot();
160 for (i = 0; i < get_max_open_streams(); i++) {
163 streams[i].fd = ((i + 1) * (mculib_get_seconds_since_boot() & 0xFFF) + 1);
167 streams[i].last_activity = now;
175static struct stream *get_stream(
int fd) {
182 for (i = 0; i < get_max_open_streams(); i++) {
189static struct stream *get_stream_by_target(
const char *target) {
192 if ((target == NULL) || (strlen(target) == 0) || (strlen(target) > 16)) {
195 for (i = 0; i < get_max_open_streams(); i++) {
196 if (strcmp(target,
streams[i].target) == 0) {
197 printf(
"Found stream by target %s: fd=%i\r\n", target,
streams[i].fd);
204void close_stream(
int fd,
int reason) {
209 printf(
"Warning attempt to close stream #%i which is not open atm.\r\n", fd);
212 printf(
"Closing stream %i, reason %i\r\n", fd, reason);
214 if ((st->consumer != NULL) && (st->consumer->close_consumer != NULL)) {
215 st->consumer->close_consumer(fd, reason);
218 memset(st, 0,
sizeof(
streams[0]));
219 printf(
"Stream #%i closed.\r\n", fd);
228 if (!are_streams_enabled()) {
234 printf(
"No memory to allocate command to send ack\r\n");
241 com->flags = COMFLAGS_SUCCESS | COMFLAGS_ACK;
250 stream->lastack = mculib_get_seconds_since_boot();
256static void sendfail(
int fd,
int tn,
long sender,
int err_no,
const char *err) {
261 printf(
"No memory to allocate command to send ack\r\n");
264 struct stream *st = get_stream(fd);
273 if (st->failed != 0) {
280 printf(
"No memory to allocate command to send ack\r\n");
294 if (st->failed != 0) {
311 if (!are_streams_enabled()) {
317 if (strlen(tgt) >= 16) {
318 printf(
"stream target too long\r\n");
321 st = get_stream_by_target(tgt);
333 printf(
"No more available streams!\r\n");
337 strncpy(st->target, tgt, 16);
339 st->peer =
com->sender;
347 if (reqack != NULL) {
348 int ra = atoi(reqack);
369 close_stream(st->fd, 1);
377static int call_consumer(
struct stream *
stream,
byte *newdata,
int size) {
387 for (i = 0; i < size; i++) {
390 int s =
stream->bytesinbuf;
405 int s =
stream->bytesinbuf;
408 printf(
"Stream: flush()\r\n");
414 printf(
"\r\nError: Stream #%i processing failure code %i\r\n", fd, z);
416 sendfail(fd, tn,
stream->peer, z,
"Stream processing failure");
426 printf(
"Check_close: ackedtn: %i, closetn: %i\r\n",
stream->ackedtn,
stream->closetn);
434 for (i = 0; i < 5; i++) {
438 close_stream(
stream->fd, 0);
452 const int data_arg = 3;
460 if (!are_streams_enabled()) {
463 if ((para =
namedarg(com,
"fd")) != NULL) {
466 if ((para =
namedarg(com,
"ln")) != NULL) {
469 if ((para =
namedarg(com,
"tn")) != NULL) {
475 printf(
"\r\nError: Stream #%i not open\r\n", fd);
476 sendfail(fd, tn, com->
sender, 1,
"Stream not open");
479 if ((para =
namedarg(com,
"request")) != NULL) {
480 if (strcmp(
"close", para) == 0) {
481 printf(
"Closing stream due to peer's request\r\n");
486 }
else if (strcmp(
"syncreq", para) == 0) {
487 printf(
"Sync requested\r\n");
491 printf(
"\r\nError: Invalid stream request %s\r\n", para);
492 sendfail(fd, tn, com->
sender, 5,
"invalid stream request");
497 printf(
"\r\nWarn: length mismatch sent=%i vs received=%i\r\n", len,
get_arg_size(com, data_arg));
504 sendfail(fd, tn, com->
sender,
stream->failed,
"Stream processing failure.");
508 printf(
"\r\nError: Not enough parameters %i\r\n", com->
argctr);
509 sendfail(fd, tn, com->
sender, 2,
"Not enough parameters for stream data");
512 if (
stream->consumer == NULL) {
513 printf(
"\r\nError: Stream #%i not associated with consumer\r\n", fd);
514 sendfail(fd, tn, com->
sender, 3,
"Stream not associated with consumer");
517 if (
stream->consumer->data_consumer == NULL) {
518 printf(
"\r\nError: Stream #%i associated with invalid consumer\r\n", fd);
519 sendfail(fd, tn, com->
sender, 3,
"Stream association invalid");
522 if (tn < (
stream->lasttn + 1)) {
523 printf(
"Warning: #%i received, but we're ahead at %i\r\n", tn,
stream->lasttn);
525 if (
stream->outoforderctr > 5) {
526 stream->outoforderctr = 0;
532 byte *cbuf = (
byte *)
get_arg(com, data_arg);
534 if (tn > (
stream->lasttn + 1)) {
536 printf(
"Out of sync: #%i received, but we're still at %i\r\n", tn,
stream->lasttn);
538 save_tn(
stream, tn, cbuf, len);
539 if (
stream->outoforderctr > 5) {
540 stream->outoforderctr = 0;
545 stream->outoforderctr = 0;
549 i = call_consumer(
stream, cbuf, len);
551 printf(
"\r\nError: Stream #%i processing failure code %i\r\n", fd, i);
553 sendfail(fd, tn, com->
sender, i,
"Stream processing failure");
562 while ((st = get_tn(
stream, tn))) {
563 printf(
"Using cached tn %i\r\n", tn);
564 cbuf = (
byte *)&st->buf;
566 i = call_consumer(
stream, cbuf, len);
568 printf(
"\r\nError: Stream #%i processing failure code %i\r\n", fd, i);
570 sendfail(fd, tn, com->
sender, i,
"Stream processing failure");
578 stream->last_activity = mculib_get_seconds_since_boot();
579 stream->inactivity_ack_ctr = 0;
594 if (!are_streams_enabled()) {
597 long now = mculib_get_seconds_since_boot();
599 for (i = 0; i < get_max_open_streams(); i++) {
604 long diff = now - st->lastack;
605 long diffms = systick_diff(st->lastackms);
607 ((st->inactivity_ack_ctr < 2) && (st->ack_open) && (diff >= 2))
608 || ((st->inactivity_ack_ctr < 5) && (diffms > 10))
610 st->lastackms = get_systickctr();
611 st->inactivity_ack_ctr++;
615 if ((now - st->last_activity) > 600) {
616 printf(
"Closing stream #%i due to inactivity\r\n", st->fd);
617 close_stream(st->fd, 3);
629 for (i = 0; i < get_max_open_streams(); i++) {
644 long now = mculib_get_seconds_since_boot();
647 printf(
"Stream info (at %D):\r\n", now);
648 for (i = 0; i < get_max_open_streams(); i++) {
653 printf(
"Stream #%i\r\n", st->fd);
654 printf(
" Ack Frequency: %i\r\n", st->ackfreq);
655 printf(
" Ack Open: %i\r\n", st->ack_open);
656 printf(
" Inactivity ctr: %i\r\n", st->inactivity_ack_ctr);
657 printf(
" acked tn: %i\r\n", st->ackedtn);
658 printf(
" last tn: %i\r\n", st->lasttn);
659 printf(
" peer: %N\r\n", st->peer);
660 printf(
" lastack secs: %i\r\n", st->lastack);
661 printf(
" last activity: %i\r\n", st->last_activity);
662 printf(
" byte consumer: %s\r\n", (st->consumer == NULL) ?
"NOT SET" :
"SET");
663 printf(
" packetsize: %i\r\n", st->packetsize);
664 printf(
" bufsize: %i\r\n", st->bufsize);
665 printf(
" bytes in buf: %i\r\n", st->bytesinbuf);
666 printf(
" closing: %i\r\n", st->closetn);
667 printf(
" target: %s\r\n", st->target);
669 printf(
"End Stream info\r\n");
672void stream_seterror(
int streamfd,
const char *format, ...) {
673 char *buf = &errbuf[0];
674 size_t size =
sizeof(errbuf);
675 struct stream *st = get_stream(streamfd);
678 va_start(args, format);
679 vsnprintf(buf, size, format, args);
684 if (st->failed == 0) {
687 printf(
"stream failed: %s\r\n", errbuf);
void streamdata(struct command *com)
called when we receive a stream data packet TODO: this is a really simplistic, memory-consumption opt...
void stream_loop()
called periodically, checks for stale streams and/or hung ones
int stream_associate(int fd, const struct consumerinfo *ci, byte *buf, int bufsize, int packetsize)
associate a stream with a consumer. return 0 if ok otherwise errorcode
int send_command(struct command *com)
send a command to another module (or broadcast)
void free_command(struct command *com)
free a command
struct stream streams[2]
management for streams (the more RAM, the more simultaneous streams we can handle)
void command_add_arg(struct command *com, const char *format,...)
adds an arg to a partially initialised command structure
void streamsetup(struct command *com)
called when we receive a stream setup packet
const char * namedarg(struct command *com, const char *name)
get a named arg (key-value pair) or NULL
struct stream * alloc_stream()
find a free stream struct, alloc an fd and return struct
void print_stream_info()
print summary of current streams to console
int send_command_reply(struct command *com, byte flags)
send a reply to a command
void flush(struct stream *stream)
send remaining data to consumer (might be < packetsize!)
void check_close(struct stream *stream)
check if closing flag is set and we received all the data up to close packet if,...
void command_init(struct command *com)
initialize a command structure with default values
int count_open_streams()
count number of open streams
void stream_enable()
enable all stream handling (default)
struct command * get_data_reply(struct command *com)
allocates and initializes a packet to be sent as "data" to the command typically you'd add some data ...
void stream_disable()
disable all stream handling
struct command * alloc_command()
allocate a free command
const char * get_arg(const struct command *com, int index)
given an argument by index[0..n], will return a pointer to the bytearray (excluding the fieldtype) th...
int get_arg_size(const struct command *com, int index)
given an argument by index [0..n], will return the size of the value in bytes. for a string/array it...