1340279Svmaffione/*
2340279Svmaffione * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved.
3340279Svmaffione *
4340279Svmaffione * Redistribution and use in source and binary forms, with or without
5340279Svmaffione * modification, are permitted provided that the following conditions
6340279Svmaffione * are met:
7340279Svmaffione *   1. Redistributions of source code must retain the above copyright
8340279Svmaffione *      notice, this list of conditions and the following disclaimer.
9340279Svmaffione *   2. Redistributions in binary form must reproduce the above copyright
10340279Svmaffione *      notice, this list of conditions and the following disclaimer in the
11340279Svmaffione *    documentation and/or other materials provided with the distribution.
12340279Svmaffione *
13340279Svmaffione * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14340279Svmaffione * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15340279Svmaffione * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16340279Svmaffione * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17340279Svmaffione * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18340279Svmaffione * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19340279Svmaffione * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20340279Svmaffione * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21340279Svmaffione * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22340279Svmaffione * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23340279Svmaffione * SUCH DAMAGE.
24340279Svmaffione */
25340279Svmaffione/* $FreeBSD: stable/11/tools/tools/netmap/lb.c 341434 2018-12-03 17:51:22Z vmaffione $ */
26340279Svmaffione#include <stdio.h>
27340279Svmaffione#include <string.h>
28340279Svmaffione#include <ctype.h>
29340279Svmaffione#include <stdbool.h>
30340279Svmaffione#include <inttypes.h>
31340279Svmaffione#include <syslog.h>
32340279Svmaffione
33340279Svmaffione#define NETMAP_WITH_LIBS
34340279Svmaffione#include <net/netmap_user.h>
35340279Svmaffione#include <sys/poll.h>
36340279Svmaffione
37340279Svmaffione#include <netinet/in.h>		/* htonl */
38340279Svmaffione
39340279Svmaffione#include <pthread.h>
40340279Svmaffione
41340279Svmaffione#include "pkt_hash.h"
42340279Svmaffione#include "ctrs.h"
43340279Svmaffione
44340279Svmaffione
/*
 * Use our own compact versions of the protocol header structs, rather
 * than bringing in a ton of platform-specific ones.
 */
#ifndef ETH_ALEN
#define ETH_ALEN 6	/* bytes in each address field of the header below */
#endif

/*
 * Ethernet header: destination address, source address and a 16-bit
 * protocol field (14 bytes in total, no padding).
 * Fixed-width fields use the <stdint.h> spellings like the rest of the file.
 */
struct compact_eth_hdr {
	unsigned char h_dest[ETH_ALEN];
	unsigned char h_source[ETH_ALEN];
	uint16_t h_proto;	/* presumably in network byte order -- confirm at the use sites */
};
58340279Svmaffione
/*
 * Fixed part of an IPv4 header (20 bytes, options not included).
 *
 * The first byte is split into two 4-bit bit-fields, declared ihl first
 * and version second; which nibble each one lands in depends on how the
 * compiler allocates bit-fields.
 * NOTE(review): multi-byte fields are presumably left in network byte
 * order -- confirm at the use sites.
 */
struct compact_ip_hdr {
	uint8_t ihl:4;
	uint8_t version:4;
	uint8_t tos;
	uint16_t tot_len;
	uint16_t id;
	uint16_t frag_off;
	uint8_t ttl;
	uint8_t protocol;
	uint16_t check;
	uint32_t saddr;
	uint32_t daddr;
};
71340279Svmaffione
/*
 * Fixed IPv6 header (40 bytes).
 *
 * The first byte is split into two 4-bit bit-fields, declared priority
 * first and version second; which nibble each one lands in depends on
 * how the compiler allocates bit-fields.
 * NOTE(review): multi-byte fields are presumably left in network byte
 * order -- confirm at the use sites.
 */
struct compact_ipv6_hdr {
	uint8_t priority:4;
	uint8_t version:4;
	uint8_t flow_lbl[3];
	uint16_t payload_len;
	uint8_t nexthdr;
	uint8_t hop_limit;
	struct in6_addr saddr;
	struct in6_addr daddr;
};
81340279Svmaffione
/* buffer sizes */
#define MAX_IFNAMELEN		64			/* interface, base and pipe-prefix names */
#define MAX_PORTNAMELEN		(MAX_IFNAMELEN + 40)	/* full name of a port as given to nm_open() */
#define STAT_MSG_MAXSIZE	1024			/* one formatted statistics message */

/* defaults for the command line options */
#define DEF_OUT_PIPES		2	/* pipes in a group when no count is given */
#define DEF_EXTRA_BUFS		0	/* -B */
#define DEF_BATCH		2048	/* -b */
#define DEF_WAIT_LINK		2	/* -w, seconds */
#define DEF_STATS_INT		600

/* max number of buffers revoked in one go from the longest overflow queue */
#define BUF_REVOKE		100
91340279Svmaffione
92340279Svmaffionestruct {
93340279Svmaffione	char ifname[MAX_IFNAMELEN];
94340279Svmaffione	char base_name[MAX_IFNAMELEN];
95340279Svmaffione	int netmap_fd;
96340279Svmaffione	uint16_t output_rings;
97340279Svmaffione	uint16_t num_groups;
98340279Svmaffione	uint32_t extra_bufs;
99340279Svmaffione	uint16_t batch;
100340279Svmaffione	int stdout_interval;
101340279Svmaffione	int syslog_interval;
102340279Svmaffione	int wait_link;
103340279Svmaffione	bool busy_wait;
104340279Svmaffione} glob_arg;
105340279Svmaffione
106340279Svmaffione/*
107340279Svmaffione * the overflow queue is a circular queue of buffers
108340279Svmaffione */
109340279Svmaffionestruct overflow_queue {
110340279Svmaffione	char name[MAX_IFNAMELEN + 16];
111340279Svmaffione	struct netmap_slot *slots;
112340279Svmaffione	uint32_t head;
113340279Svmaffione	uint32_t tail;
114340279Svmaffione	uint32_t n;
115340279Svmaffione	uint32_t size;
116340279Svmaffione};
117340279Svmaffione
118340279Svmaffionestruct overflow_queue *freeq;
119340279Svmaffione
120340279Svmaffionestatic inline int
121340279Svmaffioneoq_full(struct overflow_queue *q)
122340279Svmaffione{
123340279Svmaffione	return q->n >= q->size;
124340279Svmaffione}
125340279Svmaffione
126340279Svmaffionestatic inline int
127340279Svmaffioneoq_empty(struct overflow_queue *q)
128340279Svmaffione{
129340279Svmaffione	return q->n <= 0;
130340279Svmaffione}
131340279Svmaffione
132340279Svmaffionestatic inline void
133340279Svmaffioneoq_enq(struct overflow_queue *q, const struct netmap_slot *s)
134340279Svmaffione{
135340279Svmaffione	if (unlikely(oq_full(q))) {
136340279Svmaffione		D("%s: queue full!", q->name);
137340279Svmaffione		abort();
138340279Svmaffione	}
139340279Svmaffione	q->slots[q->tail] = *s;
140340279Svmaffione	q->n++;
141340279Svmaffione	q->tail++;
142340279Svmaffione	if (q->tail >= q->size)
143340279Svmaffione		q->tail = 0;
144340279Svmaffione}
145340279Svmaffione
146340279Svmaffionestatic inline struct netmap_slot
147340279Svmaffioneoq_deq(struct overflow_queue *q)
148340279Svmaffione{
149340279Svmaffione	struct netmap_slot s = q->slots[q->head];
150340279Svmaffione	if (unlikely(oq_empty(q))) {
151340279Svmaffione		D("%s: queue empty!", q->name);
152340279Svmaffione		abort();
153340279Svmaffione	}
154340279Svmaffione	q->n--;
155340279Svmaffione	q->head++;
156340279Svmaffione	if (q->head >= q->size)
157340279Svmaffione		q->head = 0;
158340279Svmaffione	return s;
159340279Svmaffione}
160340279Svmaffione
/* set to 1 by the SIGINT handler; the statistics loop exits when it is set */
static volatile int do_abort = 0;

/* global packet counters */
uint64_t dropped = 0;		/* packets dropped, all ports */
uint64_t forwarded = 0;		/* packets moved to an output ring */
uint64_t received_bytes = 0;
uint64_t received_pkts = 0;
uint64_t non_ip = 0;
uint32_t freeq_n = 0;
169340279Svmaffione
170340279Svmaffionestruct port_des {
171340279Svmaffione	char interface[MAX_PORTNAMELEN];
172340279Svmaffione	struct my_ctrs ctr;
173340279Svmaffione	unsigned int last_sync;
174340279Svmaffione	uint32_t last_tail;
175340279Svmaffione	struct overflow_queue *oq;
176340279Svmaffione	struct nm_desc *nmd;
177340279Svmaffione	struct netmap_ring *ring;
178340279Svmaffione	struct group_des *group;
179340279Svmaffione};
180340279Svmaffione
181340279Svmaffionestruct port_des *ports;
182340279Svmaffione
183340279Svmaffione/* each group of pipes receives all the packets */
184340279Svmaffionestruct group_des {
185340279Svmaffione	char pipename[MAX_IFNAMELEN];
186340279Svmaffione	struct port_des *ports;
187340279Svmaffione	int first_id;
188340279Svmaffione	int nports;
189340279Svmaffione	int last;
190340279Svmaffione	int custom_port;
191340279Svmaffione};
192340279Svmaffione
193340279Svmaffionestruct group_des *groups;
194340279Svmaffione
/*
 * Statistics: snapshot of the counters handed over to the statistics
 * thread. 'status' tells whether the snapshot has been consumed
 * (COUNTERS_EMPTY) or holds fresh data (COUNTERS_FULL).
 */
struct counters {
	struct timeval ts;		/* time of the snapshot */
	struct my_ctrs *ctrs;		/* per-pipe counters, one entry per output pipe */
	uint64_t received_pkts;
	uint64_t received_bytes;
	uint64_t non_ip;
	uint32_t freeq_n;
	/* kept on its own 64-byte aligned boundary, away from the data above */
	int status __attribute__((aligned(64)));
#define COUNTERS_EMPTY	0
#define COUNTERS_FULL	1
};

/* the single snapshot buffer read by print_stats() */
struct counters counters_buf;
209340279Svmaffione
210340279Svmaffionestatic void *
211340279Svmaffioneprint_stats(void *arg)
212340279Svmaffione{
213340279Svmaffione	int npipes = glob_arg.output_rings;
214340279Svmaffione	int sys_int = 0;
215340279Svmaffione	(void)arg;
216340279Svmaffione	struct my_ctrs cur, prev;
217340279Svmaffione	struct my_ctrs *pipe_prev;
218340279Svmaffione
219340279Svmaffione	pipe_prev = calloc(npipes, sizeof(struct my_ctrs));
220340279Svmaffione	if (pipe_prev == NULL) {
221340279Svmaffione		D("out of memory");
222340279Svmaffione		exit(1);
223340279Svmaffione	}
224340279Svmaffione
225340279Svmaffione	char stat_msg[STAT_MSG_MAXSIZE] = "";
226340279Svmaffione
227340279Svmaffione	memset(&prev, 0, sizeof(prev));
228340279Svmaffione	while (!do_abort) {
229340279Svmaffione		int j, dosyslog = 0, dostdout = 0, newdata;
230340279Svmaffione		uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0;
231340279Svmaffione		struct my_ctrs x;
232340279Svmaffione
233340279Svmaffione		counters_buf.status = COUNTERS_EMPTY;
234340279Svmaffione		newdata = 0;
235340279Svmaffione		memset(&cur, 0, sizeof(cur));
236340279Svmaffione		sleep(1);
237340279Svmaffione		if (counters_buf.status == COUNTERS_FULL) {
238340279Svmaffione			__sync_synchronize();
239340279Svmaffione			newdata = 1;
240340279Svmaffione			cur.t = counters_buf.ts;
241340279Svmaffione			if (prev.t.tv_sec || prev.t.tv_usec) {
242340279Svmaffione				usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 +
243340279Svmaffione					cur.t.tv_usec - prev.t.tv_usec;
244340279Svmaffione			}
245340279Svmaffione		}
246340279Svmaffione
247340279Svmaffione		++sys_int;
248340279Svmaffione		if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0)
249340279Svmaffione				dostdout = 1;
250340279Svmaffione		if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0)
251340279Svmaffione				dosyslog = 1;
252340279Svmaffione
253340279Svmaffione		for (j = 0; j < npipes; ++j) {
254340279Svmaffione			struct my_ctrs *c = &counters_buf.ctrs[j];
255340279Svmaffione			cur.pkts += c->pkts;
256340279Svmaffione			cur.drop += c->drop;
257340279Svmaffione			cur.drop_bytes += c->drop_bytes;
258340279Svmaffione			cur.bytes += c->bytes;
259340279Svmaffione
260340279Svmaffione			if (usec) {
261340279Svmaffione				x.pkts = c->pkts - pipe_prev[j].pkts;
262340279Svmaffione				x.drop = c->drop - pipe_prev[j].drop;
263340279Svmaffione				x.bytes = c->bytes - pipe_prev[j].bytes;
264340279Svmaffione				x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes;
265340279Svmaffione				pps = (x.pkts*1000000 + usec/2) / usec;
266340279Svmaffione				dps = (x.drop*1000000 + usec/2) / usec;
267340279Svmaffione				bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
268340279Svmaffione				dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
269340279Svmaffione			}
270340279Svmaffione			pipe_prev[j] = *c;
271340279Svmaffione
272340279Svmaffione			if ( (dosyslog || dostdout) && newdata )
273340279Svmaffione				snprintf(stat_msg, STAT_MSG_MAXSIZE,
274340279Svmaffione				       "{"
275340279Svmaffione				       "\"ts\":%.6f,"
276340279Svmaffione				       "\"interface\":\"%s\","
277340279Svmaffione				       "\"output_ring\":%" PRIu16 ","
278340279Svmaffione				       "\"packets_forwarded\":%" PRIu64 ","
279340279Svmaffione				       "\"packets_dropped\":%" PRIu64 ","
280340279Svmaffione				       "\"data_forward_rate_Mbps\":%.4f,"
281340279Svmaffione				       "\"data_drop_rate_Mbps\":%.4f,"
282340279Svmaffione				       "\"packet_forward_rate_kpps\":%.4f,"
283340279Svmaffione				       "\"packet_drop_rate_kpps\":%.4f,"
284340279Svmaffione				       "\"overflow_queue_size\":%" PRIu32
285340279Svmaffione				       "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
286340279Svmaffione				            ports[j].interface,
287340279Svmaffione				            j,
288340279Svmaffione				            c->pkts,
289340279Svmaffione				            c->drop,
290340279Svmaffione				            (double)bps / 1024 / 1024,
291340279Svmaffione				            (double)dbps / 1024 / 1024,
292340279Svmaffione				            (double)pps / 1000,
293340279Svmaffione				            (double)dps / 1000,
294340279Svmaffione				            c->oq_n);
295340279Svmaffione
296340279Svmaffione			if (dosyslog && stat_msg[0])
297340279Svmaffione				syslog(LOG_INFO, "%s", stat_msg);
298340279Svmaffione			if (dostdout && stat_msg[0])
299340279Svmaffione				printf("%s\n", stat_msg);
300340279Svmaffione		}
301340279Svmaffione		if (usec) {
302340279Svmaffione			x.pkts = cur.pkts - prev.pkts;
303340279Svmaffione			x.drop = cur.drop - prev.drop;
304340279Svmaffione			x.bytes = cur.bytes - prev.bytes;
305340279Svmaffione			x.drop_bytes = cur.drop_bytes - prev.drop_bytes;
306340279Svmaffione			pps = (x.pkts*1000000 + usec/2) / usec;
307340279Svmaffione			dps = (x.drop*1000000 + usec/2) / usec;
308340279Svmaffione			bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
309340279Svmaffione			dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
310340279Svmaffione		}
311340279Svmaffione
312340279Svmaffione		if ( (dosyslog || dostdout) && newdata )
313340279Svmaffione			snprintf(stat_msg, STAT_MSG_MAXSIZE,
314340279Svmaffione			         "{"
315340279Svmaffione			         "\"ts\":%.6f,"
316340279Svmaffione			         "\"interface\":\"%s\","
317340279Svmaffione			         "\"output_ring\":null,"
318340279Svmaffione			         "\"packets_received\":%" PRIu64 ","
319340279Svmaffione			         "\"packets_forwarded\":%" PRIu64 ","
320340279Svmaffione			         "\"packets_dropped\":%" PRIu64 ","
321340279Svmaffione			         "\"non_ip_packets\":%" PRIu64 ","
322340279Svmaffione			         "\"data_forward_rate_Mbps\":%.4f,"
323340279Svmaffione			         "\"data_drop_rate_Mbps\":%.4f,"
324340279Svmaffione			         "\"packet_forward_rate_kpps\":%.4f,"
325340279Svmaffione			         "\"packet_drop_rate_kpps\":%.4f,"
326340279Svmaffione			         "\"free_buffer_slots\":%" PRIu32
327340279Svmaffione			         "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
328340279Svmaffione			              glob_arg.ifname,
329340279Svmaffione			              received_pkts,
330340279Svmaffione			              cur.pkts,
331340279Svmaffione			              cur.drop,
332340279Svmaffione			              counters_buf.non_ip,
333340279Svmaffione			              (double)bps / 1024 / 1024,
334340279Svmaffione			              (double)dbps / 1024 / 1024,
335340279Svmaffione			              (double)pps / 1000,
336340279Svmaffione			              (double)dps / 1000,
337340279Svmaffione			              counters_buf.freeq_n);
338340279Svmaffione
339340279Svmaffione		if (dosyslog && stat_msg[0])
340340279Svmaffione			syslog(LOG_INFO, "%s", stat_msg);
341340279Svmaffione		if (dostdout && stat_msg[0])
342340279Svmaffione			printf("%s\n", stat_msg);
343340279Svmaffione
344340279Svmaffione		prev = cur;
345340279Svmaffione	}
346340279Svmaffione
347340279Svmaffione	free(pipe_prev);
348340279Svmaffione
349340279Svmaffione	return NULL;
350340279Svmaffione}
351340279Svmaffione
352340279Svmaffionestatic void
353340279Svmaffionefree_buffers(void)
354340279Svmaffione{
355340279Svmaffione	int i, tot = 0;
356340279Svmaffione	struct port_des *rxport = &ports[glob_arg.output_rings];
357340279Svmaffione
358340279Svmaffione	/* build a netmap free list with the buffers in all the overflow queues */
359340279Svmaffione	for (i = 0; i < glob_arg.output_rings + 1; i++) {
360340279Svmaffione		struct port_des *cp = &ports[i];
361340279Svmaffione		struct overflow_queue *q = cp->oq;
362340279Svmaffione
363340279Svmaffione		if (!q)
364340279Svmaffione			continue;
365340279Svmaffione
366340279Svmaffione		while (q->n) {
367340279Svmaffione			struct netmap_slot s = oq_deq(q);
368340279Svmaffione			uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx);
369340279Svmaffione
370340279Svmaffione			*b = rxport->nmd->nifp->ni_bufs_head;
371340279Svmaffione			rxport->nmd->nifp->ni_bufs_head = s.buf_idx;
372340279Svmaffione			tot++;
373340279Svmaffione		}
374340279Svmaffione	}
375340279Svmaffione	D("added %d buffers to netmap free list", tot);
376340279Svmaffione
377340279Svmaffione	for (i = 0; i < glob_arg.output_rings + 1; ++i) {
378340279Svmaffione		nm_close(ports[i].nmd);
379340279Svmaffione	}
380340279Svmaffione}
381340279Svmaffione
382340279Svmaffione
383340279Svmaffionestatic void sigint_h(int sig)
384340279Svmaffione{
385340279Svmaffione	(void)sig;		/* UNUSED */
386340279Svmaffione	do_abort = 1;
387340279Svmaffione	signal(SIGINT, SIG_DFL);
388340279Svmaffione}
389340279Svmaffione
390340279Svmaffionevoid usage()
391340279Svmaffione{
392340279Svmaffione	printf("usage: lb [options]\n");
393340279Svmaffione	printf("where options are:\n");
394340279Svmaffione	printf("  -h              	view help text\n");
395340279Svmaffione	printf("  -i iface        	interface name (required)\n");
396340279Svmaffione	printf("  -p [prefix:]npipes	add a new group of output pipes\n");
397340279Svmaffione	printf("  -B nbufs        	number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS);
398340279Svmaffione	printf("  -b batch        	batch size (default: %d)\n", DEF_BATCH);
399340279Svmaffione	printf("  -w seconds        	wait for link up (default: %d)\n", DEF_WAIT_LINK);
400340279Svmaffione	printf("  -W                    enable busy waiting. this will run your CPU at 100%%\n");
401340279Svmaffione	printf("  -s seconds      	seconds between syslog stats messages (default: 0)\n");
402340279Svmaffione	printf("  -o seconds      	seconds between stdout stats messages (default: 0)\n");
403340279Svmaffione	exit(0);
404340279Svmaffione}
405340279Svmaffione
406340279Svmaffionestatic int
407340279Svmaffioneparse_pipes(char *spec)
408340279Svmaffione{
409340279Svmaffione	char *end = index(spec, ':');
410340279Svmaffione	static int max_groups = 0;
411340279Svmaffione	struct group_des *g;
412340279Svmaffione
413340279Svmaffione	ND("spec %s num_groups %d", spec, glob_arg.num_groups);
414340279Svmaffione	if (max_groups < glob_arg.num_groups + 1) {
415340279Svmaffione		size_t size = sizeof(*g) * (glob_arg.num_groups + 1);
416340279Svmaffione		groups = realloc(groups, size);
417340279Svmaffione		if (groups == NULL) {
418340279Svmaffione			D("out of memory");
419340279Svmaffione			return 1;
420340279Svmaffione		}
421340279Svmaffione	}
422340279Svmaffione	g = &groups[glob_arg.num_groups];
423340279Svmaffione	memset(g, 0, sizeof(*g));
424340279Svmaffione
425340279Svmaffione	if (end != NULL) {
426340279Svmaffione		if (end - spec > MAX_IFNAMELEN - 8) {
427340279Svmaffione			D("name '%s' too long", spec);
428340279Svmaffione			return 1;
429340279Svmaffione		}
430340279Svmaffione		if (end == spec) {
431340279Svmaffione			D("missing prefix before ':' in '%s'", spec);
432340279Svmaffione			return 1;
433340279Svmaffione		}
434340279Svmaffione		strncpy(g->pipename, spec, end - spec);
435340279Svmaffione		g->custom_port = 1;
436340279Svmaffione		end++;
437340279Svmaffione	} else {
438340279Svmaffione		/* no prefix, this group will use the
439340279Svmaffione		 * name of the input port.
440340279Svmaffione		 * This will be set in init_groups(),
441340279Svmaffione		 * since here the input port may still
442340279Svmaffione		 * be uninitialized
443340279Svmaffione		 */
444340279Svmaffione		end = spec;
445340279Svmaffione	}
446340279Svmaffione	if (*end == '\0') {
447340279Svmaffione		g->nports = DEF_OUT_PIPES;
448340279Svmaffione	} else {
449340279Svmaffione		g->nports = atoi(end);
450340279Svmaffione		if (g->nports < 1) {
451340279Svmaffione			D("invalid number of pipes '%s' (must be at least 1)", end);
452340279Svmaffione			return 1;
453340279Svmaffione		}
454340279Svmaffione	}
455340279Svmaffione	glob_arg.output_rings += g->nports;
456340279Svmaffione	glob_arg.num_groups++;
457340279Svmaffione	return 0;
458340279Svmaffione}
459340279Svmaffione
460340279Svmaffione/* complete the initialization of the groups data structure */
461340279Svmaffionevoid init_groups(void)
462340279Svmaffione{
463340279Svmaffione	int i, j, t = 0;
464340279Svmaffione	struct group_des *g = NULL;
465340279Svmaffione	for (i = 0; i < glob_arg.num_groups; i++) {
466340279Svmaffione		g = &groups[i];
467340279Svmaffione		g->ports = &ports[t];
468340279Svmaffione		for (j = 0; j < g->nports; j++)
469340279Svmaffione			g->ports[j].group = g;
470340279Svmaffione		t += g->nports;
471340279Svmaffione		if (!g->custom_port)
472340279Svmaffione			strcpy(g->pipename, glob_arg.base_name);
473340279Svmaffione		for (j = 0; j < i; j++) {
474340279Svmaffione			struct group_des *h = &groups[j];
475340279Svmaffione			if (!strcmp(h->pipename, g->pipename))
476340279Svmaffione				g->first_id += h->nports;
477340279Svmaffione		}
478340279Svmaffione	}
479340279Svmaffione	g->last = 1;
480340279Svmaffione}
481340279Svmaffione
482340279Svmaffione/* push the packet described by slot rs to the group g.
483340279Svmaffione * This may cause other buffers to be pushed down the
484340279Svmaffione * chain headed by g.
485340279Svmaffione * Return a free buffer.
486340279Svmaffione */
487340279Svmaffioneuint32_t forward_packet(struct group_des *g, struct netmap_slot *rs)
488340279Svmaffione{
489340279Svmaffione	uint32_t hash = rs->ptr;
490340279Svmaffione	uint32_t output_port = hash % g->nports;
491340279Svmaffione	struct port_des *port = &g->ports[output_port];
492340279Svmaffione	struct netmap_ring *ring = port->ring;
493340279Svmaffione	struct overflow_queue *q = port->oq;
494340279Svmaffione
495340279Svmaffione	/* Move the packet to the output pipe, unless there is
496340279Svmaffione	 * either no space left on the ring, or there is some
497340279Svmaffione	 * packet still in the overflow queue (since those must
498340279Svmaffione	 * take precedence over the new one)
499340279Svmaffione	*/
500340279Svmaffione	if (ring->head != ring->tail && (q == NULL || oq_empty(q))) {
501340279Svmaffione		struct netmap_slot *ts = &ring->slot[ring->head];
502340279Svmaffione		struct netmap_slot old_slot = *ts;
503340279Svmaffione
504340279Svmaffione		ts->buf_idx = rs->buf_idx;
505340279Svmaffione		ts->len = rs->len;
506340279Svmaffione		ts->flags |= NS_BUF_CHANGED;
507340279Svmaffione		ts->ptr = rs->ptr;
508340279Svmaffione		ring->head = nm_ring_next(ring, ring->head);
509340279Svmaffione		port->ctr.bytes += rs->len;
510340279Svmaffione		port->ctr.pkts++;
511340279Svmaffione		forwarded++;
512340279Svmaffione		return old_slot.buf_idx;
513340279Svmaffione	}
514340279Svmaffione
515340279Svmaffione	/* use the overflow queue, if available */
516340279Svmaffione	if (q == NULL || oq_full(q)) {
517340279Svmaffione		/* no space left on the ring and no overflow queue
518340279Svmaffione		 * available: we are forced to drop the packet
519340279Svmaffione		 */
520340279Svmaffione		dropped++;
521340279Svmaffione		port->ctr.drop++;
522340279Svmaffione		port->ctr.drop_bytes += rs->len;
523340279Svmaffione		return rs->buf_idx;
524340279Svmaffione	}
525340279Svmaffione
526340279Svmaffione	oq_enq(q, rs);
527340279Svmaffione
528340279Svmaffione	/*
529340279Svmaffione	 * we cannot continue down the chain and we need to
530340279Svmaffione	 * return a free buffer now. We take it from the free queue.
531340279Svmaffione	 */
532340279Svmaffione	if (oq_empty(freeq)) {
533340279Svmaffione		/* the free queue is empty. Revoke some buffers
534340279Svmaffione		 * from the longest overflow queue
535340279Svmaffione		 */
536340279Svmaffione		uint32_t j;
537340279Svmaffione		struct port_des *lp = &ports[0];
538340279Svmaffione		uint32_t max = lp->oq->n;
539340279Svmaffione
540340279Svmaffione		/* let lp point to the port with the longest queue */
541340279Svmaffione		for (j = 1; j < glob_arg.output_rings; j++) {
542340279Svmaffione			struct port_des *cp = &ports[j];
543340279Svmaffione			if (cp->oq->n > max) {
544340279Svmaffione				lp = cp;
545340279Svmaffione				max = cp->oq->n;
546340279Svmaffione			}
547340279Svmaffione		}
548340279Svmaffione
549340279Svmaffione		/* move the oldest BUF_REVOKE buffers from the
550340279Svmaffione		 * lp queue to the free queue
551340279Svmaffione		 */
552340279Svmaffione		// XXX optimize this cycle
553340279Svmaffione		for (j = 0; lp->oq->n && j < BUF_REVOKE; j++) {
554340279Svmaffione			struct netmap_slot tmp = oq_deq(lp->oq);
555340279Svmaffione
556340279Svmaffione			dropped++;
557340279Svmaffione			lp->ctr.drop++;
558340279Svmaffione			lp->ctr.drop_bytes += tmp.len;
559340279Svmaffione
560340279Svmaffione			oq_enq(freeq, &tmp);
561340279Svmaffione		}
562340279Svmaffione
563340279Svmaffione		ND(1, "revoked %d buffers from %s", j, lq->name);
564340279Svmaffione	}
565340279Svmaffione
566340279Svmaffione	return oq_deq(freeq).buf_idx;
567340279Svmaffione}
568340279Svmaffione
569340279Svmaffioneint main(int argc, char **argv)
570340279Svmaffione{
571340279Svmaffione	int ch;
572340279Svmaffione	uint32_t i;
573340279Svmaffione	int rv;
574340279Svmaffione	unsigned int iter = 0;
575340279Svmaffione	int poll_timeout = 10; /* default */
576340279Svmaffione
577340279Svmaffione	glob_arg.ifname[0] = '\0';
578340279Svmaffione	glob_arg.output_rings = 0;
579340279Svmaffione	glob_arg.batch = DEF_BATCH;
580340279Svmaffione	glob_arg.wait_link = DEF_WAIT_LINK;
581340279Svmaffione	glob_arg.busy_wait = false;
582340279Svmaffione	glob_arg.syslog_interval = 0;
583340279Svmaffione	glob_arg.stdout_interval = 0;
584340279Svmaffione
585340279Svmaffione	while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) {
586340279Svmaffione		switch (ch) {
587340279Svmaffione		case 'i':
588340279Svmaffione			D("interface is %s", optarg);
589340279Svmaffione			if (strlen(optarg) > MAX_IFNAMELEN - 8) {
590340279Svmaffione				D("ifname too long %s", optarg);
591340279Svmaffione				return 1;
592340279Svmaffione			}
593340279Svmaffione			if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) {
594340279Svmaffione				sprintf(glob_arg.ifname, "netmap:%s", optarg);
595340279Svmaffione			} else {
596340279Svmaffione				strcpy(glob_arg.ifname, optarg);
597340279Svmaffione			}
598340279Svmaffione			break;
599340279Svmaffione
600340279Svmaffione		case 'p':
601340279Svmaffione			if (parse_pipes(optarg)) {
602340279Svmaffione				usage();
603340279Svmaffione				return 1;
604340279Svmaffione			}
605340279Svmaffione			break;
606340279Svmaffione
607340279Svmaffione		case 'B':
608340279Svmaffione			glob_arg.extra_bufs = atoi(optarg);
609340279Svmaffione			D("requested %d extra buffers", glob_arg.extra_bufs);
610340279Svmaffione			break;
611340279Svmaffione
612340279Svmaffione		case 'b':
613340279Svmaffione			glob_arg.batch = atoi(optarg);
614340279Svmaffione			D("batch is %d", glob_arg.batch);
615340279Svmaffione			break;
616340279Svmaffione
617340279Svmaffione		case 'w':
618340279Svmaffione			glob_arg.wait_link = atoi(optarg);
619340279Svmaffione			D("link wait for up time is %d", glob_arg.wait_link);
620340279Svmaffione			break;
621340279Svmaffione
622340279Svmaffione		case 'W':
623340279Svmaffione			glob_arg.busy_wait = true;
624340279Svmaffione			break;
625340279Svmaffione
626340279Svmaffione		case 'o':
627340279Svmaffione			glob_arg.stdout_interval = atoi(optarg);
628340279Svmaffione			break;
629340279Svmaffione
630340279Svmaffione		case 's':
631340279Svmaffione			glob_arg.syslog_interval = atoi(optarg);
632340279Svmaffione			break;
633340279Svmaffione
634340279Svmaffione		case 'h':
635340279Svmaffione			usage();
636340279Svmaffione			return 0;
637340279Svmaffione			break;
638340279Svmaffione
639340279Svmaffione		default:
640340279Svmaffione			D("bad option %c %s", ch, optarg);
641340279Svmaffione			usage();
642340279Svmaffione			return 1;
643340279Svmaffione		}
644340279Svmaffione	}
645340279Svmaffione
646340279Svmaffione	if (glob_arg.ifname[0] == '\0') {
647340279Svmaffione		D("missing interface name");
648340279Svmaffione		usage();
649340279Svmaffione		return 1;
650340279Svmaffione	}
651340279Svmaffione
652340279Svmaffione	/* extract the base name */
653340279Svmaffione	char *nscan = strncmp(glob_arg.ifname, "netmap:", 7) ?
654340279Svmaffione			glob_arg.ifname : glob_arg.ifname + 7;
655340279Svmaffione	strncpy(glob_arg.base_name, nscan, MAX_IFNAMELEN-1);
656340279Svmaffione	for (nscan = glob_arg.base_name; *nscan && !index("-*^{}/@", *nscan); nscan++)
657340279Svmaffione		;
658340279Svmaffione	*nscan = '\0';
659340279Svmaffione
660340279Svmaffione	if (glob_arg.num_groups == 0)
661340279Svmaffione		parse_pipes("");
662340279Svmaffione
663340279Svmaffione	if (glob_arg.syslog_interval) {
664340279Svmaffione		setlogmask(LOG_UPTO(LOG_INFO));
665340279Svmaffione		openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1);
666340279Svmaffione	}
667340279Svmaffione
668340279Svmaffione	uint32_t npipes = glob_arg.output_rings;
669340279Svmaffione
670340279Svmaffione
671340279Svmaffione	pthread_t stat_thread;
672340279Svmaffione
673340279Svmaffione	ports = calloc(npipes + 1, sizeof(struct port_des));
674340279Svmaffione	if (!ports) {
675340279Svmaffione		D("failed to allocate the stats array");
676340279Svmaffione		return 1;
677340279Svmaffione	}
678340279Svmaffione	struct port_des *rxport = &ports[npipes];
679340279Svmaffione	init_groups();
680340279Svmaffione
681340279Svmaffione	memset(&counters_buf, 0, sizeof(counters_buf));
682340279Svmaffione	counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs));
683340279Svmaffione	if (!counters_buf.ctrs) {
684340279Svmaffione		D("failed to allocate the counters snapshot buffer");
685340279Svmaffione		return 1;
686340279Svmaffione	}
687340279Svmaffione
688340279Svmaffione	/* we need base_req to specify pipes and extra bufs */
689340279Svmaffione	struct nmreq base_req;
690340279Svmaffione	memset(&base_req, 0, sizeof(base_req));
691340279Svmaffione
692340279Svmaffione	base_req.nr_arg1 = npipes;
693340279Svmaffione	base_req.nr_arg3 = glob_arg.extra_bufs;
694340279Svmaffione
695340279Svmaffione	rxport->nmd = nm_open(glob_arg.ifname, &base_req, 0, NULL);
696340279Svmaffione
697340279Svmaffione	if (rxport->nmd == NULL) {
698340279Svmaffione		D("cannot open %s", glob_arg.ifname);
699340279Svmaffione		return (1);
700340279Svmaffione	} else {
701340279Svmaffione		D("successfully opened %s (tx rings: %u)", glob_arg.ifname,
702340279Svmaffione		  rxport->nmd->req.nr_tx_slots);
703340279Svmaffione	}
704340279Svmaffione
705340279Svmaffione	uint32_t extra_bufs = rxport->nmd->req.nr_arg3;
706340279Svmaffione	struct overflow_queue *oq = NULL;
707340279Svmaffione	/* reference ring to access the buffers */
708340279Svmaffione	rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0);
709340279Svmaffione
710340279Svmaffione	if (!glob_arg.extra_bufs)
711340279Svmaffione		goto run;
712340279Svmaffione
713340279Svmaffione	D("obtained %d extra buffers", extra_bufs);
714340279Svmaffione	if (!extra_bufs)
715340279Svmaffione		goto run;
716340279Svmaffione
717340279Svmaffione	/* one overflow queue for each output pipe, plus one for the
718340279Svmaffione	 * free extra buffers
719340279Svmaffione	 */
720340279Svmaffione	oq = calloc(npipes + 1, sizeof(struct overflow_queue));
721340279Svmaffione	if (!oq) {
722340279Svmaffione		D("failed to allocated overflow queues descriptors");
723340279Svmaffione		goto run;
724340279Svmaffione	}
725340279Svmaffione
726340279Svmaffione	freeq = &oq[npipes];
727340279Svmaffione	rxport->oq = freeq;
728340279Svmaffione
729340279Svmaffione	freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
730340279Svmaffione	if (!freeq->slots) {
731340279Svmaffione		D("failed to allocate the free list");
732340279Svmaffione	}
733340279Svmaffione	freeq->size = extra_bufs;
734340279Svmaffione	snprintf(freeq->name, MAX_IFNAMELEN, "free queue");
735340279Svmaffione
736340279Svmaffione	/*
737340279Svmaffione	 * the list of buffers uses the first uint32_t in each buffer
738340279Svmaffione	 * as the index of the next buffer.
739340279Svmaffione	 */
740340279Svmaffione	uint32_t scan;
741340279Svmaffione	for (scan = rxport->nmd->nifp->ni_bufs_head;
742340279Svmaffione	     scan;
743340279Svmaffione	     scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan))
744340279Svmaffione	{
745340279Svmaffione		struct netmap_slot s;
746340279Svmaffione		s.len = s.flags = 0;
747340279Svmaffione		s.ptr = 0;
748340279Svmaffione		s.buf_idx = scan;
749340279Svmaffione		ND("freeq <- %d", s.buf_idx);
750340279Svmaffione		oq_enq(freeq, &s);
751340279Svmaffione	}
752340279Svmaffione
753340279Svmaffione
754340279Svmaffione	if (freeq->n != extra_bufs) {
755340279Svmaffione		D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d",
756340279Svmaffione				extra_bufs, freeq->n);
757340279Svmaffione		return 1;
758340279Svmaffione	}
759340279Svmaffione	rxport->nmd->nifp->ni_bufs_head = 0;
760340279Svmaffione
761340279Svmaffionerun:
762340279Svmaffione	atexit(free_buffers);
763340279Svmaffione
764340279Svmaffione	int j, t = 0;
765340279Svmaffione	for (j = 0; j < glob_arg.num_groups; j++) {
766340279Svmaffione		struct group_des *g = &groups[j];
767340279Svmaffione		int k;
768340279Svmaffione		for (k = 0; k < g->nports; ++k) {
769340279Svmaffione			struct port_des *p = &g->ports[k];
770340279Svmaffione			snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d",
771340279Svmaffione					(strncmp(g->pipename, "vale", 4) ? "netmap:" : ""),
772340279Svmaffione					g->pipename, g->first_id + k,
773340279Svmaffione					rxport->nmd->req.nr_arg2);
774340279Svmaffione			D("opening pipe named %s", p->interface);
775340279Svmaffione
776340279Svmaffione			p->nmd = nm_open(p->interface, NULL, 0, rxport->nmd);
777340279Svmaffione
778340279Svmaffione			if (p->nmd == NULL) {
779340279Svmaffione				D("cannot open %s", p->interface);
780340279Svmaffione				return (1);
781340279Svmaffione			} else if (p->nmd->req.nr_arg2 != rxport->nmd->req.nr_arg2) {
782340279Svmaffione				D("failed to open pipe #%d in zero-copy mode, "
783340279Svmaffione					"please close any application that uses either pipe %s}%d, "
784340279Svmaffione				        "or %s{%d, and retry",
785340279Svmaffione					k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k);
786340279Svmaffione				return (1);
787340279Svmaffione			} else {
788340279Svmaffione				D("successfully opened pipe #%d %s (tx slots: %d)",
789340279Svmaffione				  k + 1, p->interface, p->nmd->req.nr_tx_slots);
790340279Svmaffione				p->ring = NETMAP_TXRING(p->nmd->nifp, 0);
791340279Svmaffione				p->last_tail = nm_ring_next(p->ring, p->ring->tail);
792340279Svmaffione			}
793340279Svmaffione			D("zerocopy %s",
794340279Svmaffione			  (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled");
795340279Svmaffione
796340279Svmaffione			if (extra_bufs) {
797340279Svmaffione				struct overflow_queue *q = &oq[t + k];
798340279Svmaffione				q->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
799340279Svmaffione				if (!q->slots) {
800340279Svmaffione					D("failed to allocate overflow queue for pipe %d", k);
801340279Svmaffione					/* make all overflow queue management fail */
802340279Svmaffione					extra_bufs = 0;
803340279Svmaffione				}
804340279Svmaffione				q->size = extra_bufs;
805340279Svmaffione				snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k);
806340279Svmaffione				p->oq = q;
807340279Svmaffione			}
808340279Svmaffione		}
809340279Svmaffione		t += g->nports;
810340279Svmaffione	}
811340279Svmaffione
812340279Svmaffione	if (glob_arg.extra_bufs && !extra_bufs) {
813340279Svmaffione		if (oq) {
814340279Svmaffione			for (i = 0; i < npipes + 1; i++) {
815340279Svmaffione				free(oq[i].slots);
816340279Svmaffione				oq[i].slots = NULL;
817340279Svmaffione			}
818340279Svmaffione			free(oq);
819340279Svmaffione			oq = NULL;
820340279Svmaffione		}
821340279Svmaffione		D("*** overflow queues disabled ***");
822340279Svmaffione	}
823340279Svmaffione
824340279Svmaffione	sleep(glob_arg.wait_link);
825340279Svmaffione
826340279Svmaffione	/* start stats thread after wait_link */
827340279Svmaffione	if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) {
828340279Svmaffione		D("unable to create the stats thread: %s", strerror(errno));
829340279Svmaffione		return 1;
830340279Svmaffione	}
831340279Svmaffione
832340279Svmaffione	struct pollfd pollfd[npipes + 1];
833340279Svmaffione	memset(&pollfd, 0, sizeof(pollfd));
834340279Svmaffione	signal(SIGINT, sigint_h);
835340279Svmaffione
836340279Svmaffione	/* make sure we wake up as often as needed, even when there are no
837340279Svmaffione	 * packets coming in
838340279Svmaffione	 */
839340279Svmaffione	if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout)
840340279Svmaffione		poll_timeout = glob_arg.syslog_interval;
841340279Svmaffione	if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout)
842340279Svmaffione		poll_timeout = glob_arg.stdout_interval;
843340279Svmaffione
844340279Svmaffione	while (!do_abort) {
845340279Svmaffione		u_int polli = 0;
846340279Svmaffione		iter++;
847340279Svmaffione
848340279Svmaffione		for (i = 0; i < npipes; ++i) {
849340279Svmaffione			struct netmap_ring *ring = ports[i].ring;
850340279Svmaffione			int pending = nm_tx_pending(ring);
851340279Svmaffione
852340279Svmaffione			/* if there are packets pending, we want to be notified when
853340279Svmaffione			 * tail moves, so we let cur=tail
854340279Svmaffione			 */
855340279Svmaffione			ring->cur = pending ? ring->tail : ring->head;
856340279Svmaffione
857340279Svmaffione			if (!glob_arg.busy_wait && !pending) {
858340279Svmaffione				/* no need to poll, there are no packets pending */
859340279Svmaffione				continue;
860340279Svmaffione			}
861340279Svmaffione			pollfd[polli].fd = ports[i].nmd->fd;
862340279Svmaffione			pollfd[polli].events = POLLOUT;
863340279Svmaffione			pollfd[polli].revents = 0;
864340279Svmaffione			++polli;
865340279Svmaffione		}
866340279Svmaffione
867340279Svmaffione		pollfd[polli].fd = rxport->nmd->fd;
868340279Svmaffione		pollfd[polli].events = POLLIN;
869340279Svmaffione		pollfd[polli].revents = 0;
870340279Svmaffione		++polli;
871340279Svmaffione
872340279Svmaffione		//RD(5, "polling %d file descriptors", polli+1);
873340279Svmaffione		rv = poll(pollfd, polli, poll_timeout);
874340279Svmaffione		if (rv <= 0) {
875340279Svmaffione			if (rv < 0 && errno != EAGAIN && errno != EINTR)
876340279Svmaffione				RD(1, "poll error %s", strerror(errno));
877340279Svmaffione			goto send_stats;
878340279Svmaffione		}
879340279Svmaffione
880340279Svmaffione		/* if there are several groups, try pushing released packets from
881340279Svmaffione		 * upstream groups to the downstream ones.
882340279Svmaffione		 *
883340279Svmaffione		 * It is important to do this before returned slots are reused
884340279Svmaffione		 * for new transmissions. For the same reason, this must be
885340279Svmaffione		 * done starting from the last group going backwards.
886340279Svmaffione		 */
887340279Svmaffione		for (i = glob_arg.num_groups - 1U; i > 0; i--) {
888340279Svmaffione			struct group_des *g = &groups[i - 1];
889340279Svmaffione			int j;
890340279Svmaffione
891340279Svmaffione			for (j = 0; j < g->nports; j++) {
892340279Svmaffione				struct port_des *p = &g->ports[j];
893340279Svmaffione				struct netmap_ring *ring = p->ring;
894340279Svmaffione				uint32_t last = p->last_tail,
895340279Svmaffione					 stop = nm_ring_next(ring, ring->tail);
896340279Svmaffione
897340279Svmaffione				/* slight abuse of the API here: we touch the slot
898340279Svmaffione				 * pointed to by tail
899340279Svmaffione				 */
900340279Svmaffione				for ( ; last != stop; last = nm_ring_next(ring, last)) {
901340279Svmaffione					struct netmap_slot *rs = &ring->slot[last];
902340279Svmaffione					// XXX less aggressive?
903340279Svmaffione					rs->buf_idx = forward_packet(g + 1, rs);
904340279Svmaffione					rs->flags |= NS_BUF_CHANGED;
905340279Svmaffione					rs->ptr = 0;
906340279Svmaffione				}
907340279Svmaffione				p->last_tail = last;
908340279Svmaffione			}
909340279Svmaffione		}
910340279Svmaffione
911340279Svmaffione
912340279Svmaffione
913340279Svmaffione		if (oq) {
914340279Svmaffione			/* try to push packets from the overflow queues
915340279Svmaffione			 * to the corresponding pipes
916340279Svmaffione			 */
917340279Svmaffione			for (i = 0; i < npipes; i++) {
918340279Svmaffione				struct port_des *p = &ports[i];
919340279Svmaffione				struct overflow_queue *q = p->oq;
920340279Svmaffione				uint32_t j, lim;
921340279Svmaffione				struct netmap_ring *ring;
922340279Svmaffione				struct netmap_slot *slot;
923340279Svmaffione
924340279Svmaffione				if (oq_empty(q))
925340279Svmaffione					continue;
926340279Svmaffione				ring = p->ring;
927340279Svmaffione				lim = nm_ring_space(ring);
928340279Svmaffione				if (!lim)
929340279Svmaffione					continue;
930340279Svmaffione				if (q->n < lim)
931340279Svmaffione					lim = q->n;
932340279Svmaffione				for (j = 0; j < lim; j++) {
933340279Svmaffione					struct netmap_slot s = oq_deq(q), tmp;
934340279Svmaffione					tmp.ptr = 0;
935340279Svmaffione					slot = &ring->slot[ring->head];
936340279Svmaffione					tmp.buf_idx = slot->buf_idx;
937340279Svmaffione					oq_enq(freeq, &tmp);
938340279Svmaffione					*slot = s;
939340279Svmaffione					slot->flags |= NS_BUF_CHANGED;
940340279Svmaffione					ring->head = nm_ring_next(ring, ring->head);
941340279Svmaffione				}
942340279Svmaffione			}
943340279Svmaffione		}
944340279Svmaffione
945340279Svmaffione		/* push any new packets from the input port to the first group */
946340279Svmaffione		int batch = 0;
947340279Svmaffione		for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
948340279Svmaffione			struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
949340279Svmaffione
950340279Svmaffione			//D("prepare to scan rings");
951340279Svmaffione			int next_cur = rxring->cur;
952340279Svmaffione			struct netmap_slot *next_slot = &rxring->slot[next_cur];
953340279Svmaffione			const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
954340279Svmaffione			while (!nm_ring_empty(rxring)) {
955340279Svmaffione				struct netmap_slot *rs = next_slot;
956340279Svmaffione				struct group_des *g = &groups[0];
957340279Svmaffione				++received_pkts;
958340279Svmaffione				received_bytes += rs->len;
959340279Svmaffione
960340279Svmaffione				// CHOOSE THE CORRECT OUTPUT PIPE
961340279Svmaffione				rs->ptr = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B');
962340279Svmaffione				if (rs->ptr == 0) {
963340279Svmaffione					non_ip++; // XXX ??
964340279Svmaffione				}
965340279Svmaffione				// prefetch the buffer for the next round
966340279Svmaffione				next_cur = nm_ring_next(rxring, next_cur);
967340279Svmaffione				next_slot = &rxring->slot[next_cur];
968340279Svmaffione				next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
969340279Svmaffione				__builtin_prefetch(next_buf);
970340279Svmaffione				// 'B' is just a hashing seed
971340279Svmaffione				rs->buf_idx = forward_packet(g, rs);
972340279Svmaffione				rs->flags |= NS_BUF_CHANGED;
973340279Svmaffione				rxring->head = rxring->cur = next_cur;
974340279Svmaffione
975340279Svmaffione				batch++;
976340279Svmaffione				if (unlikely(batch >= glob_arg.batch)) {
977340279Svmaffione					ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL);
978340279Svmaffione					batch = 0;
979340279Svmaffione				}
980340279Svmaffione				ND(1,
981340279Svmaffione				   "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64"   Percent: %.2f",
982340279Svmaffione				   forwarded, dropped,
983340279Svmaffione				   ((float)dropped / (float)forwarded * 100));
984340279Svmaffione			}
985340279Svmaffione
986340279Svmaffione		}
987340279Svmaffione
988340279Svmaffione	send_stats:
989340279Svmaffione		if (counters_buf.status == COUNTERS_FULL)
990340279Svmaffione			continue;
991340279Svmaffione		/* take a new snapshot of the counters */
992340279Svmaffione		gettimeofday(&counters_buf.ts, NULL);
993340279Svmaffione		for (i = 0; i < npipes; i++) {
994340279Svmaffione			struct my_ctrs *c = &counters_buf.ctrs[i];
995340279Svmaffione			*c = ports[i].ctr;
996340279Svmaffione			/*
997340279Svmaffione			 * If there are overflow queues, copy the number of them for each
998340279Svmaffione			 * port to the ctrs.oq_n variable for each port.
999340279Svmaffione			 */
1000340279Svmaffione			if (ports[i].oq != NULL)
1001340279Svmaffione				c->oq_n = ports[i].oq->n;
1002340279Svmaffione		}
1003340279Svmaffione		counters_buf.received_pkts = received_pkts;
1004340279Svmaffione		counters_buf.received_bytes = received_bytes;
1005340279Svmaffione		counters_buf.non_ip = non_ip;
1006340279Svmaffione		if (freeq != NULL)
1007340279Svmaffione			counters_buf.freeq_n = freeq->n;
1008340279Svmaffione		__sync_synchronize();
1009340279Svmaffione		counters_buf.status = COUNTERS_FULL;
1010340279Svmaffione	}
1011340279Svmaffione
1012340279Svmaffione	/*
1013340279Svmaffione	 * If freeq exists, copy the number to the freeq_n member of the
1014340279Svmaffione	 * message struct, otherwise set it to 0.
1015340279Svmaffione	 */
1016340279Svmaffione	if (freeq != NULL) {
1017340279Svmaffione		freeq_n = freeq->n;
1018340279Svmaffione	} else {
1019340279Svmaffione		freeq_n = 0;
1020340279Svmaffione	}
1021340279Svmaffione
1022340279Svmaffione	pthread_join(stat_thread, NULL);
1023340279Svmaffione
1024340279Svmaffione	printf("%"PRIu64" packets forwarded.  %"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded,
1025340279Svmaffione	       dropped, forwarded + dropped);
1026340279Svmaffione	return 0;
1027340279Svmaffione}
1028