SingingCat 0
application
streams.h
1#ifndef STREAMS_H
2
3#include "main-header.h"
4#include "command-parser.h"
5#include "command-handler.h"
6#include "queue.h"
7//#include "loader-api.h"
8//#include "userhooks.h"
9//#include "constants.h"
10//#include "function-instr.h"
11//#include "baseinfo.h"
12
13#define STREAMS_H 1
/*
 * Per-stream state: identity (fd, peer), ack bookkeeping, inactivity
 * tracking, and the buffer/consumer used to hand received bytes on.
 * NOTE(review): "tn" in the field names below presumably is the packet
 * sequence number - confirm against streams.c.
 */
14typedef struct stream {
15 byte fd; /* 0 -> NOT a valid stream yet */
16 long closetn; /* >0 received close request in this tn (but missing data) */
17 byte pad; /* NOTE(review): presumably padding only - confirm */
18 byte failed; /* if not 0 -> stream has failed */
19 byte ackfreq; /* after how many packets must we send an ack? */
20 byte ack_open; /* set to 1 by sending an ACK, set to 0 by receiving data >ack */
21 byte inactivity_ack_ctr; /* set to 0 by good data, increased if no activity */
22 byte outoforderctr; /* incremented each time an out-of-order packet arrives; reset to 0 when a packet is received in order, or after sending an ack due to too many out-of-order packets */
23 int ackedtn; /* most recent number we acked */
24 int lasttn; /* most recent number we received */
25 long peer; /* nodeid of the node this stream is connected to */
26 long lastack; /* secs since last ack */
27 long lastackms; /* ms since last ack */
28 long last_activity; /* secs since last received */
29 /* call consumer when we have this many bytes ready. if != 1, a buffer must be set as well */
30 int packetsize;
31 byte * bytebuf;
32 /* buffer for packetisation (must be non-null if packetisation >1 is requested) */
33 byte onebytebuf;
34 /* how many bytes are currently in buf? */
35 int bytesinbuf;
36 int bufsize; /* buffer must be at least as big as packet size - bigger is better ;) */
37 const struct consumerinfo * consumer; /* pointer to consumer of this stream */
38 char target[18]; /* NOTE(review): purpose not visible in this header - confirm */
40
41
/*
 * Callback type for consuming stream data. streamfd identifies the stream;
 * b and size are presumably the received bytes and their count - confirm.
 * The meaning of the int return value is not visible in this header.
 */
45typedef int (*stream_consume_data)(int streamfd, byte *b, int size);
/*
 * Callback type taking a stream fd and a reason code.
 * NOTE(review): presumably invoked when the stream is closed - confirm.
 */
49typedef int (*stream_consume_close)(int streamfd, int reason);
50
/* set of callbacks describing a stream consumer; passed to stream_associate() */
51typedef struct consumerinfo {
52 /* byte consumer if we have one */
53 stream_consume_data data_consumer;
 /* NOTE(review): presumably called on stream close with the reason - confirm */
54 stream_consume_close close_consumer;
56
57
/* NOTE(review): presumably one-time initialisation of the stream module - confirm */
58void stream_init();
/* called when we receive a stream data packet */
59void streamdata(struct command *com);
/* called when we receive a stream setup packet */
60void streamsetup(struct command *com);
/* called periodically, checks for stale streams and/or hung ones */
61void stream_loop();
/* disable all stream handling */
62void stream_disable();
/* enable all stream handling (default) */
63void stream_enable();
/* associate a stream with a consumer. returns 0 if ok, otherwise an error code */
65int stream_associate(int fd, const struct consumerinfo *c, byte *buf, int bufsize, int packetsize);
/* find a free stream struct, alloc an fd and return the struct */
66struct stream *alloc_stream();
/* NOTE(review): presumably closes the stream identified by fd, passing on reason - confirm */
67void close_stream(int fd, int reason);
69/*
 70 * a stream consumer might encounter an error - can enter short message here
 71 */
72void stream_seterror(int streamfd, const char *format, ...);
73#endif
void streamdata(struct command *com)
called when we receive a stream data packet TODO: this is a really simplistic, memory-consumption opt...
Definition: streams.c:451
void stream_loop()
called periodically, checks for stale streams and/or hung ones
Definition: streams.c:590
int stream_associate(int fd, const struct consumerinfo *c, byte *buf, int bufsize, int packetsize)
associate a stream with a consumer. return 0 if ok otherwise errorcode
Definition: streams.c:124
void streamsetup(struct command *com)
called when we receive a stream setup packet
Definition: streams.c:305
struct stream * alloc_stream()
find a free stream struct, alloc an fd and return struct
Definition: streams.c:156
void print_stream_info()
print summary of current streams to console
Definition: streams.c:642
int count_open_streams()
count number of open streams
Definition: streams.c:624
void stream_enable()
enable all stream handling (default)
Definition: streams.c:54
void stream_disable()
disable all stream handling
Definition: streams.c:47
Definition: streams.h:14