1/*
2 * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *   1. Redistributions of source code must retain the above copyright
8 *      notice, this list of conditions and the following disclaimer.
9 *   2. Redistributions in binary form must reproduce the above copyright
10 *      notice, this list of conditions and the following disclaimer in the
11 *    documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23 * SUCH DAMAGE.
24 */
25#include <ctype.h>
26#include <errno.h>
27#include <inttypes.h>
28#include <libnetmap.h>
29#include <netinet/in.h>		/* htonl */
30#include <pthread.h>
31#include <signal.h>
32#include <stdbool.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <syslog.h>
37#include <sys/ioctl.h>
38#include <sys/poll.h>
39#include <unistd.h>
40
41#include "pkt_hash.h"
42#include "ctrs.h"
43
44
45/*
46 * use our version of header structs, rather than bringing in a ton
47 * of platform specific ones
48 */
49#ifndef ETH_ALEN
50#define ETH_ALEN 6
51#endif
52
/* Minimal Ethernet header: destination MAC, source MAC, EtherType
 * (h_proto is in network byte order on the wire).
 */
struct compact_eth_hdr {
	unsigned char h_dest[ETH_ALEN];
	unsigned char h_source[ETH_ALEN];
	u_int16_t h_proto;
};
58
/* Minimal IPv4 header.
 * NOTE(review): the ihl-before-version bitfield order matches the usual
 * little-endian layout (as in Linux <netinet/ip.h>); bitfield ordering is
 * implementation-defined, so confirm before using on big-endian targets.
 */
struct compact_ip_hdr {
	u_int8_t ihl:4, version:4;
	u_int8_t tos;
	u_int16_t tot_len;
	u_int16_t id;
	u_int16_t frag_off;
	u_int8_t ttl;
	u_int8_t protocol;
	u_int16_t check;
	u_int32_t saddr;
	u_int32_t daddr;
};
71
/* Minimal IPv6 header.
 * NOTE(review): same little-endian bitfield-order assumption as
 * compact_ip_hdr above applies to priority/version.
 */
struct compact_ipv6_hdr {
	u_int8_t priority:4, version:4;
	u_int8_t flow_lbl[3];
	u_int16_t payload_len;
	u_int8_t nexthdr;
	u_int8_t hop_limit;
	struct in6_addr saddr;
	struct in6_addr daddr;
};
81
#define MAX_IFNAMELEN 	64	/* max length of an interface/pipe name */
#define MAX_PORTNAMELEN	(MAX_IFNAMELEN + 40)	/* room for prefix + "{id/xT@memid" suffix */
#define DEF_OUT_PIPES 	2	/* default pipes per group (-p with no count) */
#define DEF_EXTRA_BUFS 	0	/* default number of extra buffers (-B) */
#define DEF_BATCH	2048	/* default rx sync batch size (-b) */
#define DEF_WAIT_LINK	2	/* default seconds to wait for link up (-w) */
#define DEF_STATS_INT	600	/* NOTE(review): apparently unused in this file */
#define BUF_REVOKE	150	/* buffers reclaimed at once from the longest overflow queue */
#define STAT_MSG_MAXSIZE 1024	/* size of the JSON stats message buffer */
91
/* global configuration, filled in by the command-line parser in main() */
static struct {
	char ifname[MAX_IFNAMELEN + 1];		/* input port spec ("netmap:..." or "vale...") */
	char base_name[MAX_IFNAMELEN + 1];	/* input port name as reported by nmport */
	int netmap_fd;
	uint16_t output_rings;			/* total number of output pipes (all groups) */
	uint16_t num_groups;			/* number of pipe groups (-p occurrences) */
	uint32_t extra_bufs;			/* extra buffers requested with -B */
	uint16_t batch;				/* rx packets between NIOCRXSYNC (-b) */
	int stdout_interval;			/* seconds between stdout stats, 0 = off (-o) */
	int syslog_interval;			/* seconds between syslog stats, 0 = off (-s) */
	int wait_link;				/* seconds to sleep before forwarding (-w) */
	bool busy_wait;				/* poll output pipes even with no tx pending (-W) */
} glob_arg;
105
106/*
107 * the overflow queue is a circular queue of buffers
108 */
109struct overflow_queue {
110	char name[MAX_IFNAMELEN + 16];
111	struct netmap_slot *slots;
112	uint32_t head;
113	uint32_t tail;
114	uint32_t n;
115	uint32_t size;
116};
117
/* queue of free extra buffers; NULL until extra buffers are set up in main() */
static struct overflow_queue *freeq;
119
120static inline int
121oq_full(struct overflow_queue *q)
122{
123	return q->n >= q->size;
124}
125
126static inline int
127oq_empty(struct overflow_queue *q)
128{
129	return q->n <= 0;
130}
131
132static inline void
133oq_enq(struct overflow_queue *q, const struct netmap_slot *s)
134{
135	if (unlikely(oq_full(q))) {
136		D("%s: queue full!", q->name);
137		abort();
138	}
139	q->slots[q->tail] = *s;
140	q->n++;
141	q->tail++;
142	if (q->tail >= q->size)
143		q->tail = 0;
144}
145
146static inline struct netmap_slot
147oq_deq(struct overflow_queue *q)
148{
149	struct netmap_slot s = q->slots[q->head];
150	if (unlikely(oq_empty(q))) {
151		D("%s: queue empty!", q->name);
152		abort();
153	}
154	q->n--;
155	q->head++;
156	if (q->head >= q->size)
157		q->head = 0;
158	return s;
159}
160
/* set by the SIGINT handler; checked by the main loop and the stats thread */
static volatile int do_abort = 0;

/* global traffic counters, written by the main (forwarding) thread only */
static uint64_t dropped = 0;
static uint64_t forwarded = 0;
static uint64_t received_bytes = 0;
static uint64_t received_pkts = 0;
static uint64_t non_ip = 0;		/* packets whose header hash came out 0 */
static uint32_t freeq_n = 0;		/* final free-queue depth, set at shutdown */
169
/* per-port state: one for each output pipe, plus one for the input port */
struct port_des {
	char interface[MAX_PORTNAMELEN];	/* name passed to nmport_open() */
	struct my_ctrs ctr;			/* traffic counters for this port */
	unsigned int last_sync;
	uint32_t last_tail;			/* next tx slot to scan for released buffers */
	struct overflow_queue *oq;		/* overflow queue, NULL if disabled */
	struct nmport_d *nmd;			/* libnetmap port descriptor */
	struct netmap_ring *ring;		/* tx ring (rx ring for the input port) */
	struct group_des *group;		/* group this port belongs to */
};
180
/* array of output_rings + 1 ports; the last entry is the input (rx) port */
static struct port_des *ports;

/* each group of pipes receives all the packets */
struct group_des {
	char pipename[MAX_IFNAMELEN];	/* pipe name prefix for this group */
	struct port_des *ports;		/* first port of this group (slice of 'ports') */
	int first_id;			/* pipe id of the first port, within pipename */
	int nports;			/* number of pipes in this group */
	int last;			/* 1 for the last group in the chain */
	int custom_port;		/* 1 if the group was given its own prefix */
};

/* array of glob_arg.num_groups group descriptors (see parse_pipes) */
static struct group_des *groups;
194
/* statistics snapshot, handed from the main loop to the stats thread */
struct counters {
	struct timeval ts;		/* time the snapshot was taken */
	struct my_ctrs *ctrs;		/* one entry per output pipe */
	uint64_t received_pkts;
	uint64_t received_bytes;
	uint64_t non_ip;
	uint32_t freeq_n;		/* free-queue depth at snapshot time */
	/* producer/consumer handshake flag; aligned to its own cache line
	 * to avoid false sharing with the payload fields above.
	 */
	int status __attribute__((aligned(64)));
#define COUNTERS_EMPTY	0
#define COUNTERS_FULL	1
};

/* single-producer/single-consumer snapshot buffer */
static struct counters counters_buf;
209
/*
 * Stats thread body. Once per second it consumes the counters snapshot
 * published by the main loop in counters_buf (handshake via the 'status'
 * flag plus __sync_synchronize barriers) and, at the configured -o/-s
 * intervals, emits one JSON line per output pipe plus one aggregate line
 * to stdout and/or syslog. Returns when do_abort is set.
 */
static void *
print_stats(void *arg)
{
	int npipes = glob_arg.output_rings;
	int sys_int = 0;	/* seconds elapsed, for interval modulo checks */
	(void)arg;
	struct my_ctrs cur, prev;
	struct my_ctrs *pipe_prev;	/* previous per-pipe counters, for rates */

	pipe_prev = calloc(npipes, sizeof(struct my_ctrs));
	if (pipe_prev == NULL) {
		D("out of memory");
		exit(1);
	}

	char stat_msg[STAT_MSG_MAXSIZE] = "";

	memset(&prev, 0, sizeof(prev));
	while (!do_abort) {
		int j, dosyslog = 0, dostdout = 0, newdata;
		uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0;
		struct my_ctrs x;

		/* mark the buffer consumed; the main loop refills it and
		 * sets COUNTERS_FULL when a fresh snapshot is ready
		 */
		counters_buf.status = COUNTERS_EMPTY;
		newdata = 0;
		memset(&cur, 0, sizeof(cur));
		sleep(1);
		if (counters_buf.status == COUNTERS_FULL) {
			__sync_synchronize();
			newdata = 1;
			cur.t = counters_buf.ts;
			/* elapsed microseconds since the previous snapshot */
			if (prev.t.tv_sec || prev.t.tv_usec) {
				usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 +
					cur.t.tv_usec - prev.t.tv_usec;
			}
		}

		++sys_int;
		if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0)
				dostdout = 1;
		if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0)
				dosyslog = 1;

		/* per-pipe statistics, accumulated into 'cur' for the totals */
		for (j = 0; j < npipes; ++j) {
			struct my_ctrs *c = &counters_buf.ctrs[j];
			cur.pkts += c->pkts;
			cur.drop += c->drop;
			cur.drop_bytes += c->drop_bytes;
			cur.bytes += c->bytes;

			/* rates over the last interval, rounded to nearest */
			if (usec) {
				x.pkts = c->pkts - pipe_prev[j].pkts;
				x.drop = c->drop - pipe_prev[j].drop;
				x.bytes = c->bytes - pipe_prev[j].bytes;
				x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes;
				pps = (x.pkts*1000000 + usec/2) / usec;
				dps = (x.drop*1000000 + usec/2) / usec;
				bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
				dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
			}
			pipe_prev[j] = *c;

			/* NOTE(review): j is an int passed for %PRIu16; benign
			 * after default promotion since 0 <= j < npipes
			 */
			if ( (dosyslog || dostdout) && newdata )
				snprintf(stat_msg, STAT_MSG_MAXSIZE,
				       "{"
				       "\"ts\":%.6f,"
				       "\"interface\":\"%s\","
				       "\"output_ring\":%" PRIu16 ","
				       "\"packets_forwarded\":%" PRIu64 ","
				       "\"packets_dropped\":%" PRIu64 ","
				       "\"data_forward_rate_Mbps\":%.4f,"
				       "\"data_drop_rate_Mbps\":%.4f,"
				       "\"packet_forward_rate_kpps\":%.4f,"
				       "\"packet_drop_rate_kpps\":%.4f,"
				       "\"overflow_queue_size\":%" PRIu32
				       "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
				            ports[j].interface,
				            j,
				            c->pkts,
				            c->drop,
				            (double)bps / 1024 / 1024,
				            (double)dbps / 1024 / 1024,
				            (double)pps / 1000,
				            (double)dps / 1000,
				            c->oq_n);

			if (dosyslog && stat_msg[0])
				syslog(LOG_INFO, "%s", stat_msg);
			if (dostdout && stat_msg[0])
				printf("%s\n", stat_msg);
		}
		/* aggregate rates across all pipes */
		if (usec) {
			x.pkts = cur.pkts - prev.pkts;
			x.drop = cur.drop - prev.drop;
			x.bytes = cur.bytes - prev.bytes;
			x.drop_bytes = cur.drop_bytes - prev.drop_bytes;
			pps = (x.pkts*1000000 + usec/2) / usec;
			dps = (x.drop*1000000 + usec/2) / usec;
			bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
			dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
		}

		/* aggregate JSON line (output_ring:null marks the totals) */
		if ( (dosyslog || dostdout) && newdata )
			snprintf(stat_msg, STAT_MSG_MAXSIZE,
			         "{"
			         "\"ts\":%.6f,"
			         "\"interface\":\"%s\","
			         "\"output_ring\":null,"
			         "\"packets_received\":%" PRIu64 ","
			         "\"packets_forwarded\":%" PRIu64 ","
			         "\"packets_dropped\":%" PRIu64 ","
			         "\"non_ip_packets\":%" PRIu64 ","
			         "\"data_forward_rate_Mbps\":%.4f,"
			         "\"data_drop_rate_Mbps\":%.4f,"
			         "\"packet_forward_rate_kpps\":%.4f,"
			         "\"packet_drop_rate_kpps\":%.4f,"
			         "\"free_buffer_slots\":%" PRIu32
			         "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
			              glob_arg.ifname,
			              received_pkts,
			              cur.pkts,
			              cur.drop,
			              counters_buf.non_ip,
			              (double)bps / 1024 / 1024,
			              (double)dbps / 1024 / 1024,
			              (double)pps / 1000,
			              (double)dps / 1000,
			              counters_buf.freeq_n);

		if (dosyslog && stat_msg[0])
			syslog(LOG_INFO, "%s", stat_msg);
		if (dostdout && stat_msg[0])
			printf("%s\n", stat_msg);

		prev = cur;
	}

	free(pipe_prev);

	return NULL;
}
351
/*
 * atexit() handler: return every buffer still parked in an overflow
 * queue to netmap's free list (a singly-linked list chained through the
 * first uint32_t of each buffer, headed at nifp->ni_bufs_head), then
 * close all ports.
 */
static void
free_buffers(void)
{
	int i, tot = 0;
	/* the input port is the last entry of the ports array */
	struct port_des *rxport = &ports[glob_arg.output_rings];

	/* build a netmap free list with the buffers in all the overflow queues */
	for (i = 0; i < glob_arg.output_rings + 1; i++) {
		struct port_des *cp = &ports[i];
		struct overflow_queue *q = cp->oq;

		if (!q)
			continue;

		while (q->n) {
			struct netmap_slot s = oq_deq(q);
			uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx);

			/* prepend this buffer to the kernel's free list */
			*b = rxport->nmd->nifp->ni_bufs_head;
			rxport->nmd->nifp->ni_bufs_head = s.buf_idx;
			tot++;
		}
	}
	D("added %d buffers to netmap free list", tot);

	for (i = 0; i < glob_arg.output_rings + 1; ++i) {
		nmport_close(ports[i].nmd);
	}
}
381
382
383static void sigint_h(int sig)
384{
385	(void)sig;		/* UNUSED */
386	do_abort = 1;
387	signal(SIGINT, SIG_DFL);
388}
389
390static void
391usage(void)
392{
393	printf("usage: lb [options]\n");
394	printf("where options are:\n");
395	printf("  -h              	view help text\n");
396	printf("  -i iface        	interface name (required)\n");
397	printf("  -p [prefix:]npipes	add a new group of output pipes\n");
398	printf("  -B nbufs        	number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS);
399	printf("  -b batch        	batch size (default: %d)\n", DEF_BATCH);
400	printf("  -w seconds        	wait for link up (default: %d)\n", DEF_WAIT_LINK);
401	printf("  -W                    enable busy waiting. this will run your CPU at 100%%\n");
402	printf("  -s seconds      	seconds between syslog stats messages (default: 0)\n");
403	printf("  -o seconds      	seconds between stdout stats messages (default: 0)\n");
404	exit(0);
405}
406
407static int
408parse_pipes(const char *spec)
409{
410	const char *end = index(spec, ':');
411	static int max_groups = 0;
412	struct group_des *g;
413
414	ND("spec %s num_groups %d", spec, glob_arg.num_groups);
415	if (max_groups < glob_arg.num_groups + 1) {
416		size_t size = sizeof(*g) * (glob_arg.num_groups + 1);
417		groups = realloc(groups, size);
418		if (groups == NULL) {
419			D("out of memory");
420			return 1;
421		}
422	}
423	g = &groups[glob_arg.num_groups];
424	memset(g, 0, sizeof(*g));
425
426	if (end != NULL) {
427		if (end - spec > MAX_IFNAMELEN - 8) {
428			D("name '%s' too long", spec);
429			return 1;
430		}
431		if (end == spec) {
432			D("missing prefix before ':' in '%s'", spec);
433			return 1;
434		}
435		strncpy(g->pipename, spec, end - spec);
436		g->custom_port = 1;
437		end++;
438	} else {
439		/* no prefix, this group will use the
440		 * name of the input port.
441		 * This will be set in init_groups(),
442		 * since here the input port may still
443		 * be uninitialized
444		 */
445		end = spec;
446	}
447	if (*end == '\0') {
448		g->nports = DEF_OUT_PIPES;
449	} else {
450		g->nports = atoi(end);
451		if (g->nports < 1) {
452			D("invalid number of pipes '%s' (must be at least 1)", end);
453			return 1;
454		}
455	}
456	glob_arg.output_rings += g->nports;
457	glob_arg.num_groups++;
458	return 0;
459}
460
461/* complete the initialization of the groups data structure */
462static void
463init_groups(void)
464{
465	int i, j, t = 0;
466	struct group_des *g = NULL;
467	for (i = 0; i < glob_arg.num_groups; i++) {
468		g = &groups[i];
469		g->ports = &ports[t];
470		for (j = 0; j < g->nports; j++)
471			g->ports[j].group = g;
472		t += g->nports;
473		if (!g->custom_port)
474			strcpy(g->pipename, glob_arg.base_name);
475		for (j = 0; j < i; j++) {
476			struct group_des *h = &groups[j];
477			if (!strcmp(h->pipename, g->pipename))
478				g->first_id += h->nports;
479		}
480	}
481	g->last = 1;
482}
483
484
/* To support packets that span multiple slots (NS_MOREFRAG) we
 * need to make sure of the following:
 *
 * - all fragments of the same packet must go to the same output pipe
 * - when dropping, all fragments of the same packet must be dropped
 *
 * For the former point we remember and reuse the last hash computed
 * in each input ring, and only update it when NS_MOREFRAG was not
 * set in the last received slot (this marks the start of a new packet).
 *
 * For the latter point, we only update the output ring head pointer
 * when an entire packet has been forwarded. We keep a shadow_head
 * pointer to know where to put the next partial fragment and,
 * when the need to drop arises, we roll it back to head.
 */
/* Per-ring fragment state, stashed in the ring's 'sem' scratch area. */
struct morefrag {
	uint16_t last_flag;	/* for input rings: NS_MOREFRAG of last slot */
	uint32_t last_hash;	/* for input rings: hash of current packet */
	uint32_t shadow_head;	/* for output rings: next unpublished slot */
};
505
506/* push the packet described by slot rs to the group g.
507 * This may cause other buffers to be pushed down the
508 * chain headed by g.
509 * Return a free buffer.
510 */
511static uint32_t
512forward_packet(struct group_des *g, struct netmap_slot *rs)
513{
514	uint32_t hash = rs->ptr;
515	uint32_t output_port = hash % g->nports;
516	struct port_des *port = &g->ports[output_port];
517	struct netmap_ring *ring = port->ring;
518	struct overflow_queue *q = port->oq;
519	struct morefrag *mf = (struct morefrag *)ring->sem;
520	uint16_t curmf = rs->flags & NS_MOREFRAG;
521
522	/* Move the packet to the output pipe, unless there is
523	 * either no space left on the ring, or there is some
524	 * packet still in the overflow queue (since those must
525	 * take precedence over the new one)
526	*/
527	if (mf->shadow_head != ring->tail && (q == NULL || oq_empty(q))) {
528		struct netmap_slot *ts = &ring->slot[mf->shadow_head];
529		struct netmap_slot old_slot = *ts;
530
531		ts->buf_idx = rs->buf_idx;
532		ts->len = rs->len;
533		ts->flags = rs->flags | NS_BUF_CHANGED;
534		ts->ptr = rs->ptr;
535		mf->shadow_head = nm_ring_next(ring, mf->shadow_head);
536		if (!curmf) {
537			ring->head = mf->shadow_head;
538		}
539		ND("curmf %2x ts->flags %2x shadow_head %3u head %3u tail %3u",
540				curmf, ts->flags, mf->shadow_head, ring->head, ring->tail);
541		port->ctr.bytes += rs->len;
542		port->ctr.pkts++;
543		forwarded++;
544		return old_slot.buf_idx;
545	}
546
547	/* use the overflow queue, if available */
548	if (q == NULL || oq_full(q)) {
549		uint32_t scan;
550		/* no space left on the ring and no overflow queue
551		 * available: we are forced to drop the packet
552		 */
553
554		/* drop previous fragments, if any */
555		for (scan = ring->head; scan != mf->shadow_head;
556				scan = nm_ring_next(ring, scan)) {
557			struct netmap_slot *ts = &ring->slot[scan];
558			dropped++;
559			port->ctr.drop_bytes += ts->len;
560		}
561		mf->shadow_head = ring->head;
562
563		dropped++;
564		port->ctr.drop++;
565		port->ctr.drop_bytes += rs->len;
566		return rs->buf_idx;
567	}
568
569	oq_enq(q, rs);
570
571	/*
572	 * we cannot continue down the chain and we need to
573	 * return a free buffer now. We take it from the free queue.
574	 */
575	if (oq_empty(freeq)) {
576		/* the free queue is empty. Revoke some buffers
577		 * from the longest overflow queue
578		 */
579		uint32_t j;
580		struct port_des *lp = &ports[0];
581		uint32_t max = lp->oq->n;
582
583		/* let lp point to the port with the longest queue */
584		for (j = 1; j < glob_arg.output_rings; j++) {
585			struct port_des *cp = &ports[j];
586			if (cp->oq->n > max) {
587				lp = cp;
588				max = cp->oq->n;
589			}
590		}
591
592		/* move the oldest BUF_REVOKE buffers from the
593		 * lp queue to the free queue
594		 *
595		 * We cannot revoke a partially received packet.
596		 * To make thinks simple we make sure to leave
597		 * at least NETMAP_MAX_FRAGS slots in the queue.
598		 */
599		for (j = 0; lp->oq->n > NETMAP_MAX_FRAGS && j < BUF_REVOKE; j++) {
600			struct netmap_slot tmp = oq_deq(lp->oq);
601
602			dropped++;
603			lp->ctr.drop++;
604			lp->ctr.drop_bytes += tmp.len;
605
606			oq_enq(freeq, &tmp);
607		}
608
609		ND(1, "revoked %d buffers from %s", j, lq->name);
610	}
611
612	return oq_deq(freeq).buf_idx;
613}
614
615int main(int argc, char **argv)
616{
617	int ch;
618	uint32_t i;
619	int rv;
620	int poll_timeout = 10; /* default */
621
622	glob_arg.ifname[0] = '\0';
623	glob_arg.output_rings = 0;
624	glob_arg.batch = DEF_BATCH;
625	glob_arg.wait_link = DEF_WAIT_LINK;
626	glob_arg.busy_wait = false;
627	glob_arg.syslog_interval = 0;
628	glob_arg.stdout_interval = 0;
629
630	while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) {
631		switch (ch) {
632		case 'i':
633			D("interface is %s", optarg);
634			if (strlen(optarg) > MAX_IFNAMELEN - 8) {
635				D("ifname too long %s", optarg);
636				return 1;
637			}
638			if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) {
639				sprintf(glob_arg.ifname, "netmap:%s", optarg);
640			} else {
641				strcpy(glob_arg.ifname, optarg);
642			}
643			break;
644
645		case 'p':
646			if (parse_pipes(optarg)) {
647				usage();
648				return 1;
649			}
650			break;
651
652		case 'B':
653			glob_arg.extra_bufs = atoi(optarg);
654			D("requested %d extra buffers", glob_arg.extra_bufs);
655			break;
656
657		case 'b':
658			glob_arg.batch = atoi(optarg);
659			D("batch is %d", glob_arg.batch);
660			break;
661
662		case 'w':
663			glob_arg.wait_link = atoi(optarg);
664			D("link wait for up time is %d", glob_arg.wait_link);
665			break;
666
667		case 'W':
668			glob_arg.busy_wait = true;
669			break;
670
671		case 'o':
672			glob_arg.stdout_interval = atoi(optarg);
673			break;
674
675		case 's':
676			glob_arg.syslog_interval = atoi(optarg);
677			break;
678
679		case 'h':
680			usage();
681			return 0;
682			break;
683
684		default:
685			D("bad option %c %s", ch, optarg);
686			usage();
687			return 1;
688		}
689	}
690
691	if (glob_arg.ifname[0] == '\0') {
692		D("missing interface name");
693		usage();
694		return 1;
695	}
696
697	if (glob_arg.num_groups == 0)
698		parse_pipes("");
699
700	if (glob_arg.syslog_interval) {
701		setlogmask(LOG_UPTO(LOG_INFO));
702		openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1);
703	}
704
705	uint32_t npipes = glob_arg.output_rings;
706
707
708	pthread_t stat_thread;
709
710	ports = calloc(npipes + 1, sizeof(struct port_des));
711	if (!ports) {
712		D("failed to allocate the stats array");
713		return 1;
714	}
715	struct port_des *rxport = &ports[npipes];
716
717	rxport->nmd = nmport_prepare(glob_arg.ifname);
718	if (rxport->nmd == NULL) {
719		D("cannot parse %s", glob_arg.ifname);
720		return (1);
721	}
722	/* extract the base name */
723	strncpy(glob_arg.base_name, rxport->nmd->hdr.nr_name, MAX_IFNAMELEN);
724
725	init_groups();
726
727	memset(&counters_buf, 0, sizeof(counters_buf));
728	counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs));
729	if (!counters_buf.ctrs) {
730		D("failed to allocate the counters snapshot buffer");
731		return 1;
732	}
733
734	rxport->nmd->reg.nr_extra_bufs = glob_arg.extra_bufs;
735
736	if (nmport_open_desc(rxport->nmd) < 0) {
737		D("cannot open %s", glob_arg.ifname);
738		return (1);
739	}
740	D("successfully opened %s", glob_arg.ifname);
741
742	uint32_t extra_bufs = rxport->nmd->reg.nr_extra_bufs;
743	struct overflow_queue *oq = NULL;
744	/* reference ring to access the buffers */
745	rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0);
746
747	if (!glob_arg.extra_bufs)
748		goto run;
749
750	D("obtained %d extra buffers", extra_bufs);
751	if (!extra_bufs)
752		goto run;
753
754	/* one overflow queue for each output pipe, plus one for the
755	 * free extra buffers
756	 */
757	oq = calloc(npipes + 1, sizeof(struct overflow_queue));
758	if (!oq) {
759		D("failed to allocated overflow queues descriptors");
760		goto run;
761	}
762
763	freeq = &oq[npipes];
764	rxport->oq = freeq;
765
766	freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
767	if (!freeq->slots) {
768		D("failed to allocate the free list");
769	}
770	freeq->size = extra_bufs;
771	snprintf(freeq->name, MAX_IFNAMELEN, "free queue");
772
773	/*
774	 * the list of buffers uses the first uint32_t in each buffer
775	 * as the index of the next buffer.
776	 */
777	uint32_t scan;
778	for (scan = rxport->nmd->nifp->ni_bufs_head;
779	     scan;
780	     scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan))
781	{
782		struct netmap_slot s;
783		s.len = s.flags = 0;
784		s.ptr = 0;
785		s.buf_idx = scan;
786		ND("freeq <- %d", s.buf_idx);
787		oq_enq(freeq, &s);
788	}
789
790
791	if (freeq->n != extra_bufs) {
792		D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d",
793				extra_bufs, freeq->n);
794		return 1;
795	}
796	rxport->nmd->nifp->ni_bufs_head = 0;
797
798run:
799	atexit(free_buffers);
800
801	int j, t = 0;
802	for (j = 0; j < glob_arg.num_groups; j++) {
803		struct group_des *g = &groups[j];
804		int k;
805		for (k = 0; k < g->nports; ++k) {
806			struct port_des *p = &g->ports[k];
807			snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d",
808					(strncmp(g->pipename, "vale", 4) ? "netmap:" : ""),
809					g->pipename, g->first_id + k,
810					rxport->nmd->reg.nr_mem_id);
811			D("opening pipe named %s", p->interface);
812
813			p->nmd = nmport_open(p->interface);
814
815			if (p->nmd == NULL) {
816				D("cannot open %s", p->interface);
817				return (1);
818			} else if (p->nmd->mem != rxport->nmd->mem) {
819				D("failed to open pipe #%d in zero-copy mode, "
820					"please close any application that uses either pipe %s}%d, "
821				        "or %s{%d, and retry",
822					k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k);
823				return (1);
824			} else {
825				struct morefrag *mf;
826
827				D("successfully opened pipe #%d %s (tx slots: %d)",
828				  k + 1, p->interface, p->nmd->reg.nr_tx_slots);
829				p->ring = NETMAP_TXRING(p->nmd->nifp, 0);
830				p->last_tail = nm_ring_next(p->ring, p->ring->tail);
831				mf = (struct morefrag *)p->ring->sem;
832				mf->last_flag = 0;	/* unused */
833				mf->last_hash = 0;	/* unused */
834				mf->shadow_head = p->ring->head;
835			}
836			D("zerocopy %s",
837			  (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled");
838
839			if (extra_bufs) {
840				struct overflow_queue *q = &oq[t + k];
841				q->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
842				if (!q->slots) {
843					D("failed to allocate overflow queue for pipe %d", k);
844					/* make all overflow queue management fail */
845					extra_bufs = 0;
846				}
847				q->size = extra_bufs;
848				snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k);
849				p->oq = q;
850			}
851		}
852		t += g->nports;
853	}
854
855	if (glob_arg.extra_bufs && !extra_bufs) {
856		if (oq) {
857			for (i = 0; i < npipes + 1; i++) {
858				free(oq[i].slots);
859				oq[i].slots = NULL;
860			}
861			free(oq);
862			oq = NULL;
863		}
864		D("*** overflow queues disabled ***");
865	}
866
867	sleep(glob_arg.wait_link);
868
869	/* start stats thread after wait_link */
870	if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) {
871		D("unable to create the stats thread: %s", strerror(errno));
872		return 1;
873	}
874
875	struct pollfd pollfd[npipes + 1];
876	memset(&pollfd, 0, sizeof(pollfd));
877	signal(SIGINT, sigint_h);
878
879	/* make sure we wake up as often as needed, even when there are no
880	 * packets coming in
881	 */
882	if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout)
883		poll_timeout = glob_arg.syslog_interval;
884	if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout)
885		poll_timeout = glob_arg.stdout_interval;
886
887	/* initialize the morefrag structures for the input rings */
888	for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
889		struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
890		struct morefrag *mf = (struct morefrag *)rxring->sem;
891
892		mf->last_flag = 0;
893		mf->last_hash = 0;
894		mf->shadow_head = 0; /* unused */
895	}
896
897	while (!do_abort) {
898		u_int polli = 0;
899
900		for (i = 0; i < npipes; ++i) {
901			struct netmap_ring *ring = ports[i].ring;
902			int pending = nm_tx_pending(ring);
903
904			/* if there are packets pending, we want to be notified when
905			 * tail moves, so we let cur=tail
906			 */
907			ring->cur = pending ? ring->tail : ring->head;
908
909			if (!glob_arg.busy_wait && !pending) {
910				/* no need to poll, there are no packets pending */
911				continue;
912			}
913			pollfd[polli].fd = ports[i].nmd->fd;
914			pollfd[polli].events = POLLOUT;
915			pollfd[polli].revents = 0;
916			++polli;
917		}
918
919		pollfd[polli].fd = rxport->nmd->fd;
920		pollfd[polli].events = POLLIN;
921		pollfd[polli].revents = 0;
922		++polli;
923
924		ND(5, "polling %d file descriptors", polli);
925		rv = poll(pollfd, polli, poll_timeout);
926		if (rv <= 0) {
927			if (rv < 0 && errno != EAGAIN && errno != EINTR)
928				RD(1, "poll error %s", strerror(errno));
929			goto send_stats;
930		}
931
932		/* if there are several groups, try pushing released packets from
933		 * upstream groups to the downstream ones.
934		 *
935		 * It is important to do this before returned slots are reused
936		 * for new transmissions. For the same reason, this must be
937		 * done starting from the last group going backwards.
938		 */
939		for (i = glob_arg.num_groups - 1U; i > 0; i--) {
940			struct group_des *g = &groups[i - 1];
941
942			for (j = 0; j < g->nports; j++) {
943				struct port_des *p = &g->ports[j];
944				struct netmap_ring *ring = p->ring;
945				uint32_t last = p->last_tail,
946					 stop = nm_ring_next(ring, ring->tail);
947
948				/* slight abuse of the API here: we touch the slot
949				 * pointed to by tail
950				 */
951				for ( ; last != stop; last = nm_ring_next(ring, last)) {
952					struct netmap_slot *rs = &ring->slot[last];
953					// XXX less aggressive?
954					rs->buf_idx = forward_packet(g + 1, rs);
955					rs->flags = NS_BUF_CHANGED;
956					rs->ptr = 0;
957				}
958				p->last_tail = last;
959			}
960		}
961
962
963
964		if (oq) {
965			/* try to push packets from the overflow queues
966			 * to the corresponding pipes
967			 */
968			for (i = 0; i < npipes; i++) {
969				struct port_des *p = &ports[i];
970				struct overflow_queue *q = p->oq;
971				uint32_t k;
972				int64_t lim;
973				struct netmap_ring *ring;
974				struct netmap_slot *slot;
975				struct morefrag *mf;
976
977				if (oq_empty(q))
978					continue;
979				ring = p->ring;
980				mf = (struct morefrag *)ring->sem;
981				lim = ring->tail - mf->shadow_head;
982				if (!lim)
983					continue;
984				if (lim < 0)
985					lim += ring->num_slots;
986				if (q->n < lim)
987					lim = q->n;
988				for (k = 0; k < lim; k++) {
989					struct netmap_slot s = oq_deq(q), tmp;
990					tmp.ptr = 0;
991					slot = &ring->slot[mf->shadow_head];
992					tmp.buf_idx = slot->buf_idx;
993					oq_enq(freeq, &tmp);
994					*slot = s;
995					slot->flags |= NS_BUF_CHANGED;
996					mf->shadow_head = nm_ring_next(ring, mf->shadow_head);
997					if (!(slot->flags & NS_MOREFRAG))
998						ring->head = mf->shadow_head;
999				}
1000			}
1001		}
1002
1003		/* push any new packets from the input port to the first group */
1004		int batch = 0;
1005		for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
1006			struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
1007			struct morefrag *mf = (struct morefrag *)rxring->sem;
1008
1009			//D("prepare to scan rings");
1010			int next_head = rxring->head;
1011			struct netmap_slot *next_slot = &rxring->slot[next_head];
1012			const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
1013			while (!nm_ring_empty(rxring)) {
1014				struct netmap_slot *rs = next_slot;
1015				struct group_des *g = &groups[0];
1016				++received_pkts;
1017				received_bytes += rs->len;
1018
1019				// CHOOSE THE CORRECT OUTPUT PIPE
1020				// If the previous slot had NS_MOREFRAG set, this is another
1021				// fragment of the last packet and it should go to the same
1022				// output pipe as before.
1023				if (!mf->last_flag) {
1024					// 'B' is just a hashing seed
1025					mf->last_hash = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B');
1026				}
1027				mf->last_flag = rs->flags & NS_MOREFRAG;
1028				rs->ptr = mf->last_hash;
1029				if (rs->ptr == 0) {
1030					non_ip++; // XXX ??
1031				}
1032				// prefetch the buffer for the next round
1033				next_head = nm_ring_next(rxring, next_head);
1034				next_slot = &rxring->slot[next_head];
1035				next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
1036				__builtin_prefetch(next_buf);
1037				rs->buf_idx = forward_packet(g, rs);
1038				rs->flags = NS_BUF_CHANGED;
1039				rxring->head = rxring->cur = next_head;
1040
1041				batch++;
1042				if (unlikely(batch >= glob_arg.batch)) {
1043					ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL);
1044					batch = 0;
1045				}
1046				ND(1,
1047				   "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64"   Percent: %.2f",
1048				   forwarded, dropped,
1049				   ((float)dropped / (float)forwarded * 100));
1050			}
1051
1052		}
1053
1054	send_stats:
1055		if (counters_buf.status == COUNTERS_FULL)
1056			continue;
1057		/* take a new snapshot of the counters */
1058		gettimeofday(&counters_buf.ts, NULL);
1059		for (i = 0; i < npipes; i++) {
1060			struct my_ctrs *c = &counters_buf.ctrs[i];
1061			*c = ports[i].ctr;
1062			/*
1063			 * If there are overflow queues, copy the number of them for each
1064			 * port to the ctrs.oq_n variable for each port.
1065			 */
1066			if (ports[i].oq != NULL)
1067				c->oq_n = ports[i].oq->n;
1068		}
1069		counters_buf.received_pkts = received_pkts;
1070		counters_buf.received_bytes = received_bytes;
1071		counters_buf.non_ip = non_ip;
1072		if (freeq != NULL)
1073			counters_buf.freeq_n = freeq->n;
1074		__sync_synchronize();
1075		counters_buf.status = COUNTERS_FULL;
1076	}
1077
1078	/*
1079	 * If freeq exists, copy the number to the freeq_n member of the
1080	 * message struct, otherwise set it to 0.
1081	 */
1082	if (freeq != NULL) {
1083		freeq_n = freeq->n;
1084	} else {
1085		freeq_n = 0;
1086	}
1087
1088	pthread_join(stat_thread, NULL);
1089
1090	printf("%"PRIu64" packets forwarded.  %"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded,
1091	       dropped, forwarded + dropped);
1092	return 0;
1093}
1094