Update Copyright
[rawv.git] / net.cpp
blob6d4095fccde5c7701296a6cb95827f39980a299e
1 /* network submodule for rawv
2 * Copyright (C) 2010 Kirill Smelkov <kirr@navytux.spb.ru>
3 * Copyright (C) 2011 Marine Bridge and Navigation Systems (http://mns.spb.ru/)
5 * This library is free software: you can Use, Study, Modify and Redistribute
6 * it under the terms of the GNU Lesser General Public License version 2.1, or
7 * any later version. This library is distributed WITHOUT ANY WARRANTY. See
8 * COPYING.LIB file for full License terms.
9 */
11 #include "rawv.h"
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <unistd.h>
16 #include <fcntl.h>
17 #include <errno.h>
18 #include <string.h>
20 using std::min;
22 namespace rawv {
24 /******** NetTx ********/
26 /** whether an `addr` is IPv4 multicast */
27 int in_multicast(const struct sockaddr_in *addr)
29 /* class D 224.0.0.0 - 239.255.255.255 */
30 return ((ntohl(addr->sin_addr.s_addr) & 0xf0000000) == 0xe0000000);
33 /** whether an `addr` is IPv4 broadcast
35 * http://en.wikipedia.org/wiki/IPv4_subnetting_reference
37 int in_broadcast(const struct sockaddr_in *addr)
39 unsigned long ip = ntohl(addr->sin_addr.s_addr);
41 /* class A 0.0.0.0 - 127.255.255.255 /8 */
42 if ((ip & 0x80000000) == 0x00000000)
43 return (ip & 0x00ffffff) == 0x00ffffff;
45 /* class B 128.0.0.0 - 191.255.255.255 /16 */
46 if ((ip & 0xc0000000) == 0x80000000)
47 return (ip & 0x0000ffff) == 0x0000ffff;
49 /* class C 192.0.0.0 - 223.255.255.255 /24 */
50 if ((ip & 0xe0000000) == 0xc0000000)
51 return (ip & 0x000000ff) == 0x000000ff;
53 return 0;
57 NetTx::NetTx(const char *dest, int port, int mtu)
59 int one=1; /* XXX has to be uint8_t on win32 */
61 sk = socket(PF_INET, SOCK_DGRAM, 0);
62 if (sk == -1)
63 die_errno("socket");
65 tx_addr.sin_family = AF_INET;
66 tx_addr.sin_port = htons(port);
67 tx_addr.sin_addr.s_addr = inet_addr(dest);
68 if (tx_addr.sin_addr.s_addr == INADDR_NONE)
69 die("E: inet_addr(%s) fail", dest);
71 if (in_broadcast(&tx_addr)) {
72 fprintf(stderr, "tx: broadcasting...\n");
74 if (setsockopt(sk, SOL_SOCKET, SO_BROADCAST, &one, sizeof(one)) == -1)
75 die_errno("setsockopt(SO_BROADCAST)");
78 if (in_multicast(&tx_addr)) {
79 fprintf(stderr, "tx: multicasting...\n");
81 if (setsockopt(sk, IPPROTO_IP, IP_MULTICAST_TTL, &one, sizeof(one)) == -1)
82 die_errno("setsockopt(IP_MULTICAST_TTL)");
84 if (setsockopt(sk, IPPROTO_IP, IP_MULTICAST_LOOP, &one, sizeof(one)) == -1)
85 die_errno("setsockopt(IP_MULTICAST_LOOP)");
88 this->mtu = mtu;
92 NetTx::~NetTx()
94 close(sk);
98 void NetTx::tx_frame(const Frame *f)
100 #define RAWV_MAX_FRAG_NLINES 16 /* there is no limit, it's just me being lazy */
101 struct iovec iov[1+RAWV_MAX_FRAG_NLINES]; /* header + payload */
102 int iovlen;
103 struct msghdr msg;
105 int pixsize, frag_nlines, nfragment, line, i;
106 int fheight = f->height;
107 int bt = f->interlace_tb_swapped;
109 struct rawv_header h;
111 switch (f->pixfmt_4cc) {
112 case MKTAG32('Y','U','Y','V'):
113 case MKTAG32('Y','U','Y','2'):
114 pixsize = 2;
115 break;
117 // TODO GREY
119 default:
120 die("tx_frame: don't know pixfmt 0x%08x", f->pixfmt_4cc);
123 frag_nlines = (mtu - sizeof(struct rawv_header)) / (f->width * pixsize);
124 if (bt) {
125 if (bt < 0)
126 die("TODO bt=-1 in NetTx");
128 /* make frag_nlines and fheight even, if lines are swapped in captured frame */
129 frag_nlines &= ~1U;
130 fheight &= ~1U;
133 if (frag_nlines > RAWV_MAX_FRAG_NLINES)
134 die("FIXME frag_nlines > RAWV_MAX_FRAG_NLINES (%i > %i)", frag_nlines, RAWV_MAX_FRAG_NLINES);
136 nfragment = 0;
138 for (line=0; line<fheight; line+= frag_nlines, nfragment++) {
139 h.magic = MKTAG32('R','A','W','V');
140 h.version = 1;
141 h.__reserved_for_flags = 0x00;
142 h.nframe = f->sequence;
143 h.nfragment = nfragment;
144 h.fragments_total = (fheight + frag_nlines-1) / frag_nlines;
145 h.width = f->width;
146 h.height = fheight;
147 h.pixfmt = f->pixfmt_4cc; // XXX endian
149 h.frag_startline = line;
150 h.frag_nlines = min(frag_nlines, fheight - line);
152 /* temp: helps debugging bitstream on the wire */
153 h.__reserved[0] = 0xaaaaaaaa;
154 h.__reserved[1] = 0xffffffff;
156 iov[0].iov_base = &h;
157 iov[0].iov_len = sizeof(h);
158 iovlen = 1;
160 if ((f->width * pixsize == f->bytesperline) &&
161 !bt) {
162 /* fastpath for stride=width, progressive */
163 iov[iovlen].iov_base = f->start + line*f->bytesperline;
164 iov[iovlen].iov_len = frag_nlines*f->bytesperline;
166 ++iovlen;
168 else {
169 /* strided and/or top/bottom swapped frame */
170 for (i=0; i<frag_nlines; ++i) {
171 iov[iovlen].iov_base = f->start + (line+i+bt)*f->bytesperline;
172 iov[iovlen].iov_len = f->width * pixsize;
174 ++iovlen;
175 bt = -bt;
179 msg.msg_name = &tx_addr;
180 msg.msg_namelen = sizeof(tx_addr);
182 msg.msg_iov = iov;
183 msg.msg_iovlen = iovlen;
185 msg.msg_control = NULL;
186 msg.msg_controllen = 0;
187 msg.msg_flags = 0;
189 /* XXX what to do if not whole pkt sent? */
190 if (-1 == sendmsg(sk, &msg, 0 /*flags*/))
191 die_errno("sendmsg");
196 void NetTx::__tx_frame(const Frame *f, void *self)
198 ((NetTx *)self)->tx_frame(f);
203 /******** NetRx ********/
205 /* recommeneded minimum for socket rx buffer */
206 #define SK_RCVBUF_MIN (1024*1024)
208 NetRx::NetRx(const char *mcast_group, int port, int mtu)
210 int one=1; /* XXX has to be uint8_t on win32 */
211 int rcvbuf;
212 socklen_t rcvbuf_len;
213 struct ip_mreqn mreq;
215 sk = socket(PF_INET, SOCK_DGRAM, 0);
216 if (sk == -1)
217 die_errno("socket");
219 /* ensure rx buffer is big enough */
220 rcvbuf_len = sizeof(rcvbuf);
221 if (-1 == getsockopt(sk, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &rcvbuf_len))
222 die_errno("getsockopt(SO_RCVBUF)");
224 if (rcvbuf < SK_RCVBUF_MIN) {
225 rcvbuf = SK_RCVBUF_MIN;
227 if (-1 == setsockopt(sk, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)))
228 warn("W: can't increase sk rx buffer to %i bytes\n", rcvbuf);
230 rcvbuf_len = sizeof(rcvbuf);
231 if (-1 == getsockopt(sk, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &rcvbuf_len))
232 die_errno("getsockopt(SO_RCVBUF)");
234 if (rcvbuf < SK_RCVBUF_MIN)
235 warn("W: can't increase sk rx buffer to %i bytes (got only %i)\n",
236 SK_RCVBUF_MIN, rcvbuf);
240 /* allow multiple receivers */
241 if (-1 == setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)))
242 die_errno("setsockopt(SO_REUSEADDR)");
244 rx_addr.sin_family = AF_INET;
245 rx_addr.sin_port = htons(port);
246 rx_addr.sin_addr.s_addr = INADDR_ANY;
248 if (-1 == bind(sk, (sockaddr *)&rx_addr, sizeof(rx_addr)))
249 die_errno("bind");
251 // XXX is_broadcast ?
252 if (mcast_group) { // XXX or is_multicast(mcast_group) ?
253 mreq.imr_multiaddr.s_addr = inet_addr(mcast_group);
254 mreq.imr_address.s_addr = INADDR_ANY;
255 mreq.imr_ifindex = 0; /* any */
257 if (mreq.imr_multiaddr.s_addr == INADDR_NONE)
258 die("E: inet_addr(%s) fail", mcast_group);
260 if (-1 == setsockopt(sk, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)))
261 die_errno("setsockopt(IP_ADD_MEMBERSHIP)");
265 /* set socket non-blocking, so that we could avoid tons of select and just
266 * loop on read until EAGAIN */
267 if (-1 == fcntl(sk, F_SETFL, O_NONBLOCK))
268 die_errno("fcntl(F_SETFL, O_NONBLOCK)");
271 fragbuf.resize(mtu);
273 f.start = NULL;
274 f.length = 0;
275 f.width = f.height = f.bytesperline = 0;
276 f.pixfmt_4cc = 0;
277 f.sequence = 0;
278 f.interlace_tb_swapped = 0; /* always progressive on the wire */
280 nframe = 0; // XXX better -1 ?
281 nfragment = 0;
282 fragments_total = 0;
283 frag_received = 0;
284 frag_dropped_total = 0;
288 NetRx::~NetRx()
290 close(sk); /* this will also leave multicast group */
295 void NetRx::subscribe(void (*func)(const Frame *, void *), void *self)
297 FrameSubscription fs;
298 fs.func = func;
299 fs.self = self;
301 subscribers.push_back(fs);
305 int NetRx::handle_recv()
307 int err;
308 int count = 0;
310 /* loop till we get EAGAIN. This way we avoid lots of unneeded select
311 calls */
312 while (1) {
313 err = recv(sk, &fragbuf[0], fragbuf.size(), /*flags*/ 0);
314 if (err == -1)
315 switch (errno) {
316 case EAGAIN:
317 //fprintf(stderr, "Looped %i\n", count);
318 return 0;
320 default:
321 die_errno("recv");
324 ++count;
325 __handle_recv(err);
330 #define NFRAME_RESYNC 100
332 void NetRx::__handle_recv(unsigned len)
334 struct rawv_header *h;
335 unsigned pixsize, fragsize, framesize;
337 /* TODO die is not appropriate on RX path */
339 if (len < sizeof(h))
340 die("recv: len(pkt) < header (%i < %i)", len, sizeof(h));
342 // XXX endianness!!!
343 h = (struct rawv_header *)&fragbuf[0];
346 if (h->magic != MKTAG32('R','A','W','V'))
347 die("wrong magic (0x%08x)", h->magic);
349 if (h->version != 1)
350 die("unsupported version (%i)", h->version);
352 /* ignoring flags for now */
354 // fprintf(stderr, "nframe: %i, frag: %i / %i\r", h->nframe, h->nfragment, h->fragments_total);
356 if (h->nframe != nframe) {
357 if (abs(h->nframe - nframe) > NFRAME_RESYNC) {
358 /* too big nframe jump means we have to re-sync this stream */
359 fprintf(stderr, "Resync...\n");
361 else {
362 if ((h->nframe - nframe) % NFRAME_MAX < NFRAME_MAX/2)
363 /* newer frame */;
364 else {
365 /* old-frame fragment. Too late, sorry */
366 fprintf(stderr, "old (nframe=%i frag: %i / %i)\n",
367 h->nframe, h->nfragment, h->fragments_total);
368 return;
371 /* start of new frame. We have to flush current one first */
372 if (f.pixfmt_4cc)
373 flush_frame();
375 //fprintf(stderr, "\n F-%i", h->nframe);
378 //fprintf(stderr, " %i", h->nfragment);
380 // XXX endiannes
381 switch (h->pixfmt) {
382 case MKTAG32('Y','U','Y','V'):
383 case MKTAG32('Y','U','Y','2'):
384 pixsize = 2;
385 break;
387 case MKTAG32('Y','8','0','0'):
388 case MKTAG32('Y','8',' ',' '):
389 case MKTAG32('G','R','E','Y'):
390 pixsize = 1;
391 break;
393 default:
394 pixsize = 0; // make gcc happy
395 // XXX endianness
396 die("recv: W: unsupported pixfmt 0x%08x", h->pixfmt);
399 /* ensure we have enough data in payload */
400 fragsize = pixsize * h->width * h->frag_nlines;
401 if (len < sizeof(h) + fragsize)
402 die("recv: len(pkt) < header + fragsize (%i < %i + %i)",
403 len, sizeof(h), fragsize);
405 /* ensure header is self-consistent */
406 if (h->frag_startline + h->frag_nlines > h->height)
407 die("recv: frag_startline + frag_nlines > height (%i + %i > %i)",
408 h->frag_startline, h->frag_nlines, h->height);
410 if (!f.pixfmt_4cc) {
411 /* first fragment of a new frame */
412 f.pixfmt_4cc = h->pixfmt;
413 f.width = h->width;
414 f.height = h->height;
415 f.bytesperline = h->width * pixsize;
417 nframe = h->nframe;
418 nfragment = h->nfragment;
419 fragments_total = h->fragments_total;
421 f.sequence = nframe;
423 else {
424 /* check that frame parameters has not changed */
425 if ( !(fragments_total == h->fragments_total &&
426 f.pixfmt_4cc == h->pixfmt &&
427 f.width == h->width &&
428 f.height == h->height)
430 die("recv: frame params changed inside one frame: "
431 "(%iF 0x%08x %ix%i) and (%iF 0x%08x %ix%i)",
432 fragments_total, f.pixfmt_4cc, f.width, f.height,
433 h->fragments_total, h->pixfmt, h->width, h->height);
436 /* this fragment ok */
437 frag_received += 1;
439 /* reallocate framebuf if needed */
440 framesize = pixsize * h->width * h->height;
441 if (framebuf.size() < framesize)
442 framebuf.resize(framesize);
445 /* put fragment video data into frame */
446 memcpy(&framebuf[0] + h->frag_startline * h->width * pixsize,
447 &fragbuf[0] + sizeof(*h),
448 fragsize);
451 /* whole-frame received detection.
453 * TODO this can be reworked so we also support intra-frame fragments *
454 * reordering on the wire.
456 if (h->frag_startline + h->frag_nlines == h->height)
457 flush_frame();
461 void NetRx::flush_frame()
463 unsigned i, dropped;
465 f.start = &framebuf[0];
466 f.length = f.height * f.bytesperline;
468 /* flush currently-accumulated frame to subscribers */
469 for (i=0; i<subscribers.size(); ++i) {
470 FrameSubscription fs = subscribers[i];
472 fs.func(&f, fs.self);
475 /* update loss statistic */
476 dropped = (fragments_total - frag_received);
477 frag_dropped_total += dropped;
478 if (dropped)
479 fprintf(stderr, "Dropped %u fragments\n", dropped);
481 /* bump sequence number */
482 nframe += 1;
483 nfragment = 0;
484 fragments_total = 0;
485 frag_received = 0;
487 /* and clear current frame */
488 f.start = NULL; /* XXX ok ? */
489 f.length = 0;
490 f.width = f.height = f.bytesperline = 0;
491 f.pixfmt_4cc = 0;
492 f.sequence = 0;
495 } // rawv::