ng_pipe.c revision 209723
1/*-
2 * Copyright (c) 2004-2008 University of Zagreb
3 * Copyright (c) 2007-2008 FreeBSD Foundation
4 *
5 * This software was developed by the University of Zagreb and the
6 * FreeBSD Foundation under sponsorship by the Stichting NLnet and the
7 * FreeBSD Foundation.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
11 * are met:
12 * 1. Redistributions of source code must retain the above copyright
13 *    notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 *    notice, this list of conditions and the following disclaimer in the
16 *    documentation and/or other materials provided with the distribution.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * SUCH DAMAGE.
29 *
30 * $FreeBSD: head/sys/netgraph/ng_pipe.c 209723 2010-07-06 12:13:15Z zec $
31 */
32
33/*
34 * This node permits simple traffic shaping by emulating bandwidth
35 * and delay, as well as random packet losses.
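 * The node has two hooks, upper and lower. Traffic flowing from the upper
 * to the lower hook is referred to as downstream, and traffic flowing in
 * the opposite direction as upstream. Parameters for both directions can
 * be set separately, except for the delay, the per-frame overhead and the
 * IP header offset, which are shared by both directions.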
39 */
40
41
42#include <sys/param.h>
43#include <sys/errno.h>
44#include <sys/systm.h>
45#include <sys/kernel.h>
46#include <sys/malloc.h>
47#include <sys/mbuf.h>
48#include <sys/time.h>
49
50#include <vm/uma.h>
51
52#include <net/vnet.h>
53
54#include <netinet/in.h>
55#include <netinet/in_systm.h>
56#include <netinet/ip.h>
57
58#include <netgraph/ng_message.h>
59#include <netgraph/netgraph.h>
60#include <netgraph/ng_parse.h>
61#include <netgraph/ng_pipe.h>
62
63static MALLOC_DEFINE(M_NG_PIPE, "ng_pipe", "ng_pipe");
64
65struct mtx ng_pipe_giant;
66
67/* Packet header struct */
68struct ngp_hdr {
69	TAILQ_ENTRY(ngp_hdr)	ngp_link;	/* next pkt in queue */
70	struct timeval		when;		/* this packet's due time */
71	struct mbuf		*m;		/* ptr to the packet data */
72};
73TAILQ_HEAD(p_head, ngp_hdr);
74
75/* FIFO queue struct */
76struct ngp_fifo {
77	TAILQ_ENTRY(ngp_fifo)	fifo_le;	/* list of active queues only */
78	struct p_head		packet_head;	/* FIFO queue head */
79	u_int32_t		hash;		/* flow signature */
80	struct timeval		vtime;		/* virtual time, for WFQ */
81	u_int32_t		rr_deficit;	/* for DRR */
82	u_int32_t		packets;	/* # of packets in this queue */
83};
84
85/* Per hook info */
86struct hookinfo {
87	hook_p			hook;
88	int			noqueue;	/* bypass any processing */
89	TAILQ_HEAD(, ngp_fifo)	fifo_head;	/* FIFO queues */
90	TAILQ_HEAD(, ngp_hdr)	qout_head;	/* delay queue head */
91	LIST_ENTRY(hookinfo)	active_le;	/* active hooks */
92	struct timeval		qin_utime;
93	struct ng_pipe_hookcfg	cfg;
94	struct ng_pipe_hookrun	run;
95	struct ng_pipe_hookstat	stats;
96	uint64_t		*ber_p;		/* loss_p(BER,psize) map */
97};
98
99/* Per node info */
100struct node_priv {
101	u_int64_t		delay;
102	u_int32_t		overhead;
103	u_int32_t		header_offset;
104	struct hookinfo		lower;
105	struct hookinfo		upper;
106};
107typedef struct node_priv *priv_p;
108
/*
 * FIFO_VTIME_SORT(plen): link the FIFO queue ngp_f into hinfo->fifo_head.
 *
 * When both WFQ and a bandwidth limit are configured, ngp_f is stamped
 * with a virtual time of "now" plus the serialization time of a plen-byte
 * frame (plus the per-frame overhead), scaled by the number of FIFO
 * queues.  It is then inserted in front of the first queue whose virtual
 * time is later.  Otherwise ngp_f is simply appended.
 *
 * The macro expands in the caller's scope and uses the caller's hinfo,
 * priv, now, ngp_f and ngp_f1.  It ends with a semicolon of its own.
 */
#define FIFO_VTIME_SORT(plen)						\
	if (hinfo->cfg.bandwidth && hinfo->cfg.wfq) {			\
		ngp_f->vtime.tv_usec = now->tv_usec +			\
		    ((uint64_t)(plen) + priv->overhead) *		\
		    hinfo->run.fifo_queues * 8000000 /			\
		    hinfo->cfg.bandwidth;				\
		ngp_f->vtime.tv_sec = now->tv_sec +			\
		    ngp_f->vtime.tv_usec / 1000000;			\
		ngp_f->vtime.tv_usec %= 1000000;			\
		for (ngp_f1 = TAILQ_FIRST(&hinfo->fifo_head);		\
		    ngp_f1 != NULL &&					\
		    ngp_f1->vtime.tv_sec <= ngp_f->vtime.tv_sec &&	\
		    (ngp_f1->vtime.tv_sec != ngp_f->vtime.tv_sec ||	\
		    ngp_f1->vtime.tv_usec <= ngp_f->vtime.tv_usec);	\
		    ngp_f1 = TAILQ_NEXT(ngp_f1, fifo_le))		\
			;						\
		if (ngp_f1 != NULL)					\
			TAILQ_INSERT_BEFORE(ngp_f1, ngp_f, fifo_le);	\
		else							\
			TAILQ_INSERT_TAIL(&hinfo->fifo_head, ngp_f, fifo_le); \
	} else								\
		TAILQ_INSERT_TAIL(&hinfo->fifo_head, ngp_f, fifo_le);
130
131static void	parse_cfg(struct ng_pipe_hookcfg *, struct ng_pipe_hookcfg *,
132			struct hookinfo *, priv_p);
133static void	pipe_dequeue(struct hookinfo *, struct timeval *);
134static void	pipe_scheduler(void *);
135static void	pipe_poll(void);
136static int	ngp_modevent(module_t, int, void *);
137
138/* linked list of active "pipe" hooks */
139static LIST_HEAD(, hookinfo) active_head;
140static int active_gen_id = 0;
141
142/* timeout handle for pipe_scheduler */
143static struct callout polling_timer;
144
145/* zone for storing ngp_hdr-s */
146static uma_zone_t ngp_zone;
147
148/* Netgraph methods */
149static ng_constructor_t	ngp_constructor;
150static ng_rcvmsg_t	ngp_rcvmsg;
151static ng_shutdown_t	ngp_shutdown;
152static ng_newhook_t	ngp_newhook;
153static ng_rcvdata_t	ngp_rcvdata;
154static ng_disconnect_t	ngp_disconnect;
155
156/* Parse type for struct ng_pipe_hookstat */
157static const struct ng_parse_struct_field
158	ng_pipe_hookstat_type_fields[] = NG_PIPE_HOOKSTAT_INFO;
159static const struct ng_parse_type ng_pipe_hookstat_type = {
160	&ng_parse_struct_type,
161	&ng_pipe_hookstat_type_fields
162};
163
164/* Parse type for struct ng_pipe_stats */
165static const struct ng_parse_struct_field ng_pipe_stats_type_fields[] =
166	NG_PIPE_STATS_INFO(&ng_pipe_hookstat_type);
167static const struct ng_parse_type ng_pipe_stats_type = {
168	&ng_parse_struct_type,
169	&ng_pipe_stats_type_fields
170};
171
172/* Parse type for struct ng_pipe_hookrun */
173static const struct ng_parse_struct_field
174	ng_pipe_hookrun_type_fields[] = NG_PIPE_HOOKRUN_INFO;
175static const struct ng_parse_type ng_pipe_hookrun_type = {
176	&ng_parse_struct_type,
177	&ng_pipe_hookrun_type_fields
178};
179
180/* Parse type for struct ng_pipe_run */
181static const struct ng_parse_struct_field
182	ng_pipe_run_type_fields[] = NG_PIPE_RUN_INFO(&ng_pipe_hookrun_type);
183static const struct ng_parse_type ng_pipe_run_type = {
184	&ng_parse_struct_type,
185	&ng_pipe_run_type_fields
186};
187
188/* Parse type for struct ng_pipe_hookcfg */
189static const struct ng_parse_struct_field
190	ng_pipe_hookcfg_type_fields[] = NG_PIPE_HOOKCFG_INFO;
191static const struct ng_parse_type ng_pipe_hookcfg_type = {
192	&ng_parse_struct_type,
193	&ng_pipe_hookcfg_type_fields
194};
195
196/* Parse type for struct ng_pipe_cfg */
197static const struct ng_parse_struct_field
198	ng_pipe_cfg_type_fields[] = NG_PIPE_CFG_INFO(&ng_pipe_hookcfg_type);
199static const struct ng_parse_type ng_pipe_cfg_type = {
200	&ng_parse_struct_type,
201	&ng_pipe_cfg_type_fields
202};
203
204/* List of commands and how to convert arguments to/from ASCII */
205static const struct ng_cmdlist ngp_cmds[] = {
206	{
207		.cookie =	NGM_PIPE_COOKIE,
208		.cmd =		NGM_PIPE_GET_STATS,
209		.name = 	"getstats",
210		.respType =	 &ng_pipe_stats_type
211	},
212	{
213		.cookie =	NGM_PIPE_COOKIE,
214		.cmd =		NGM_PIPE_CLR_STATS,
215		.name =		"clrstats"
216	},
217	{
218		.cookie =	NGM_PIPE_COOKIE,
219		.cmd =		NGM_PIPE_GETCLR_STATS,
220		.name =		"getclrstats",
221		.respType =	&ng_pipe_stats_type
222	},
223	{
224		.cookie =	NGM_PIPE_COOKIE,
225		.cmd =		NGM_PIPE_GET_RUN,
226		.name =		"getrun",
227		.respType =	&ng_pipe_run_type
228	},
229	{
230		.cookie =	NGM_PIPE_COOKIE,
231		.cmd =		NGM_PIPE_GET_CFG,
232		.name =		"getcfg",
233		.respType =	&ng_pipe_cfg_type
234	},
235	{
236		.cookie =	NGM_PIPE_COOKIE,
237		.cmd =		NGM_PIPE_SET_CFG,
238		.name =		"setcfg",
239		.mesgType =	&ng_pipe_cfg_type,
240	},
241	{ 0 }
242};
243
244/* Netgraph type descriptor */
245static struct ng_type ng_pipe_typestruct = {
246	.version =	NG_ABI_VERSION,
247	.name =		NG_PIPE_NODE_TYPE,
248	.mod_event =	ngp_modevent,
249	.constructor =	ngp_constructor,
250	.shutdown =	ngp_shutdown,
251	.rcvmsg =	ngp_rcvmsg,
252	.newhook =	ngp_newhook,
253	.rcvdata =	ngp_rcvdata,
254	.disconnect =	ngp_disconnect,
255	.cmdlist =	ngp_cmds
256};
257NETGRAPH_INIT(pipe, &ng_pipe_typestruct);
258
259/* Node constructor */
260static int
261ngp_constructor(node_p node)
262{
263	priv_p priv;
264
265	priv = malloc(sizeof(*priv), M_NG_PIPE, M_ZERO | M_NOWAIT);
266	if (priv == NULL)
267		return (ENOMEM);
268	NG_NODE_SET_PRIVATE(node, priv);
269
270	return (0);
271}
272
273/* Add a hook */
274static int
275ngp_newhook(node_p node, hook_p hook, const char *name)
276{
277	const priv_p priv = NG_NODE_PRIVATE(node);
278	struct hookinfo *hinfo;
279
280	if (strcmp(name, NG_PIPE_HOOK_UPPER) == 0) {
281		bzero(&priv->upper, sizeof(priv->upper));
282		priv->upper.hook = hook;
283		NG_HOOK_SET_PRIVATE(hook, &priv->upper);
284	} else if (strcmp(name, NG_PIPE_HOOK_LOWER) == 0) {
285		bzero(&priv->lower, sizeof(priv->lower));
286		priv->lower.hook = hook;
287		NG_HOOK_SET_PRIVATE(hook, &priv->lower);
288	} else
289		return (EINVAL);
290
291	/* Load non-zero initial cfg values */
292	hinfo = NG_HOOK_PRIVATE(hook);
293	hinfo->cfg.qin_size_limit = 50;
294	hinfo->cfg.fifo = 1;
295	hinfo->cfg.droptail = 1;
296	TAILQ_INIT(&hinfo->fifo_head);
297	TAILQ_INIT(&hinfo->qout_head);
298	return (0);
299}
300
301/* Receive a control message */
302static int
303ngp_rcvmsg(node_p node, item_p item, hook_p lasthook)
304{
305	const priv_p priv = NG_NODE_PRIVATE(node);
306	struct ng_mesg *resp = NULL;
307	struct ng_mesg *msg;
308	struct ng_pipe_stats *stats;
309	struct ng_pipe_run *run;
310	struct ng_pipe_cfg *cfg;
311	int error = 0;
312
313	mtx_lock(&ng_pipe_giant);
314
315	NGI_GET_MSG(item, msg);
316	switch (msg->header.typecookie) {
317	case NGM_PIPE_COOKIE:
318		switch (msg->header.cmd) {
319		case NGM_PIPE_GET_STATS:
320		case NGM_PIPE_CLR_STATS:
321		case NGM_PIPE_GETCLR_STATS:
322			if (msg->header.cmd != NGM_PIPE_CLR_STATS) {
323				NG_MKRESPONSE(resp, msg,
324				    sizeof(*stats), M_NOWAIT);
325				if (resp == NULL) {
326					error = ENOMEM;
327					break;
328				}
329				stats = (struct ng_pipe_stats *)resp->data;
330				bcopy(&priv->upper.stats, &stats->downstream,
331				    sizeof(stats->downstream));
332				bcopy(&priv->lower.stats, &stats->upstream,
333				    sizeof(stats->upstream));
334			}
335			if (msg->header.cmd != NGM_PIPE_GET_STATS) {
336				bzero(&priv->upper.stats,
337				    sizeof(priv->upper.stats));
338				bzero(&priv->lower.stats,
339				    sizeof(priv->lower.stats));
340			}
341			break;
342		case NGM_PIPE_GET_RUN:
343			NG_MKRESPONSE(resp, msg, sizeof(*run), M_NOWAIT);
344			if (resp == NULL) {
345				error = ENOMEM;
346				break;
347			}
348			run = (struct ng_pipe_run *)resp->data;
349			bcopy(&priv->upper.run, &run->downstream,
350				sizeof(run->downstream));
351			bcopy(&priv->lower.run, &run->upstream,
352				sizeof(run->upstream));
353			break;
354		case NGM_PIPE_GET_CFG:
355			NG_MKRESPONSE(resp, msg, sizeof(*cfg), M_NOWAIT);
356			if (resp == NULL) {
357				error = ENOMEM;
358				break;
359			}
360			cfg = (struct ng_pipe_cfg *)resp->data;
361			bcopy(&priv->upper.cfg, &cfg->downstream,
362				sizeof(cfg->downstream));
363			bcopy(&priv->lower.cfg, &cfg->upstream,
364				sizeof(cfg->upstream));
365			cfg->delay = priv->delay;
366			cfg->overhead = priv->overhead;
367			cfg->header_offset = priv->header_offset;
368			if (cfg->upstream.bandwidth ==
369			    cfg->downstream.bandwidth) {
370				cfg->bandwidth = cfg->upstream.bandwidth;
371				cfg->upstream.bandwidth = 0;
372				cfg->downstream.bandwidth = 0;
373			} else
374				cfg->bandwidth = 0;
375			break;
376		case NGM_PIPE_SET_CFG:
377			cfg = (struct ng_pipe_cfg *)msg->data;
378			if (msg->header.arglen != sizeof(*cfg)) {
379				error = EINVAL;
380				break;
381			}
382
383			if (cfg->delay == -1)
384				priv->delay = 0;
385			else if (cfg->delay > 0 && cfg->delay < 10000000)
386				priv->delay = cfg->delay;
387
388			if (cfg->bandwidth == -1) {
389				priv->upper.cfg.bandwidth = 0;
390				priv->lower.cfg.bandwidth = 0;
391				priv->overhead = 0;
392			} else if (cfg->bandwidth >= 100 &&
393			    cfg->bandwidth <= 1000000000) {
394				priv->upper.cfg.bandwidth = cfg->bandwidth;
395				priv->lower.cfg.bandwidth = cfg->bandwidth;
396				if (cfg->bandwidth >= 10000000)
397					priv->overhead = 8+4+12; /* Ethernet */
398				else
399					priv->overhead = 10; /* HDLC */
400			}
401
402			if (cfg->overhead == -1)
403				priv->overhead = 0;
404			else if (cfg->overhead > 0 && cfg->overhead < 256)
405				priv->overhead = cfg->overhead;
406
407			if (cfg->header_offset == -1)
408				priv->header_offset = 0;
409			else if (cfg->header_offset > 0 &&
410			    cfg->header_offset < 64)
411				priv->header_offset = cfg->header_offset;
412
413			parse_cfg(&priv->upper.cfg, &cfg->downstream,
414				  &priv->upper, priv);
415			parse_cfg(&priv->lower.cfg, &cfg->upstream,
416				  &priv->lower, priv);
417			break;
418		default:
419			error = EINVAL;
420			break;
421		}
422		break;
423	default:
424		error = EINVAL;
425		break;
426	}
427	NG_RESPOND_MSG(error, node, item, resp);
428	NG_FREE_MSG(msg);
429
430	mtx_unlock(&ng_pipe_giant);
431
432	return (error);
433}
434
435static void
436parse_cfg(struct ng_pipe_hookcfg *current, struct ng_pipe_hookcfg *new,
437	struct hookinfo *hinfo, priv_p priv)
438{
439
440	if (new->ber == -1) {
441		current->ber = 0;
442		if (hinfo->ber_p) {
443			free(hinfo->ber_p, M_NG_PIPE);
444			hinfo->ber_p = NULL;
445		}
446	} else if (new->ber >= 1 && new->ber <= 1000000000000) {
447		static const uint64_t one = 0x1000000000000; /* = 2^48 */
448		uint64_t p0, p;
449		uint32_t fsize, i;
450
451		if (hinfo->ber_p == NULL)
452			hinfo->ber_p = malloc(\
453				(MAX_FSIZE + MAX_OHSIZE)*sizeof(uint64_t), \
454				M_NG_PIPE, M_NOWAIT);
455		current->ber = new->ber;
456
457		/*
458		 * For given BER and each frame size N (in bytes) calculate
459		 * the probability P_OK that the frame is clean:
460		 *
461		 * P_OK(BER,N) = (1 - 1/BER)^(N*8)
462		 *
463		 * We use a 64-bit fixed-point format with decimal point
464		 * positioned between bits 47 and 48.
465		 */
466		p0 = one - one / new->ber;
467		p = one;
468		for (fsize = 0; fsize < MAX_FSIZE + MAX_OHSIZE; fsize++) {
469			hinfo->ber_p[fsize] = p;
470			for (i=0; i<8; i++)
471				p = (p*(p0&0xffff)>>48) + \
472				    (p*((p0>>16)&0xffff)>>32) + \
473				    (p*(p0>>32)>>16);
474		}
475	}
476
477	if (new->qin_size_limit == -1)
478		current->qin_size_limit = 0;
479	else if (new->qin_size_limit >= 5)
480		current->qin_size_limit = new->qin_size_limit;
481
482	if (new->qout_size_limit == -1)
483		current->qout_size_limit = 0;
484	else if (new->qout_size_limit >= 5)
485		current->qout_size_limit = new->qout_size_limit;
486
487	if (new->duplicate == -1)
488		current->duplicate = 0;
489	else if (new->duplicate > 0 && new->duplicate <= 50)
490		current->duplicate = new->duplicate;
491
492	if (new->fifo) {
493		current->fifo = 1;
494		current->wfq = 0;
495		current->drr = 0;
496	}
497
498	if (new->wfq) {
499		current->fifo = 0;
500		current->wfq = 1;
501		current->drr = 0;
502	}
503
504	if (new->drr) {
505		current->fifo = 0;
506		current->wfq = 0;
507		/* DRR quantum */
508		if (new->drr >= 32)
509			current->drr = new->drr;
510		else
511			current->drr = 2048;		/* default quantum */
512	}
513
514	if (new->droptail) {
515		current->droptail = 1;
516		current->drophead = 0;
517	}
518
519	if (new->drophead) {
520		current->droptail = 0;
521		current->drophead = 1;
522	}
523
524	if (new->bandwidth == -1) {
525		current->bandwidth = 0;
526		current->fifo = 1;
527		current->wfq = 0;
528		current->drr = 0;
529	} else if (new->bandwidth >= 100 && new->bandwidth <= 1000000000)
530		current->bandwidth = new->bandwidth;
531
532	if (current->bandwidth | priv->delay |
533	    current->duplicate | current->ber)
534		hinfo->noqueue = 0;
535	else
536		hinfo->noqueue = 1;
537}
538
539/*
540 * Compute a hash signature for a packet. This function suffers from the
541 * NIH sindrome, so probably it would be wise to look around what other
542 * folks have found out to be a good and efficient IP hash function...
543 */
544static int
545ip_hash(struct mbuf *m, int offset)
546{
547	u_int64_t i;
548	struct ip *ip = (struct ip *)(mtod(m, u_char *) + offset);
549
550	if (m->m_len < sizeof(struct ip) + offset ||
551	    ip->ip_v != 4 || ip->ip_hl << 2 != sizeof(struct ip))
552		return 0;
553
554	i = ((u_int64_t) ip->ip_src.s_addr ^
555	    ((u_int64_t) ip->ip_src.s_addr << 13) ^
556	    ((u_int64_t) ip->ip_dst.s_addr << 7) ^
557	    ((u_int64_t) ip->ip_dst.s_addr << 19));
558	return (i ^ (i >> 32));
559}
560
561/*
562 * Receive data on a hook - both in upstream and downstream direction.
563 * We put the frame on the inbound queue, and try to initiate dequeuing
564 * sequence immediately. If inbound queue is full, discard one frame
565 * depending on dropping policy (from the head or from the tail of the
566 * queue).
567 */
568static int
569ngp_rcvdata(hook_p hook, item_p item)
570{
571	struct hookinfo *const hinfo = NG_HOOK_PRIVATE(hook);
572	const priv_p priv = NG_NODE_PRIVATE(NG_HOOK_NODE(hook));
573	struct timeval uuptime;
574	struct timeval *now = &uuptime;
575	struct ngp_fifo *ngp_f = NULL, *ngp_f1;
576	struct ngp_hdr *ngp_h = NULL;
577	struct mbuf *m;
578	int hash;
579	int error = 0;
580
581	if (hinfo->noqueue) {
582		struct hookinfo *dest;
583		if (hinfo == &priv->lower)
584			dest = &priv->upper;
585		else
586			dest = &priv->lower;
587		NG_FWD_ITEM_HOOK(error, item, dest->hook);
588		return error;
589	}
590
591	mtx_lock(&ng_pipe_giant);
592	microuptime(now);
593
594	/*
595	 * Attach us to the list of active ng_pipes if this was an empty
596	 * one before, and also update the queue service deadline time.
597	 */
598	if (hinfo->run.qin_frames == 0) {
599		struct timeval *when = &hinfo->qin_utime;
600		if (when->tv_sec < now->tv_sec || (when->tv_sec == now->tv_sec
601		    && when->tv_usec < now->tv_usec)) {
602			when->tv_sec = now->tv_sec;
603			when->tv_usec = now->tv_usec;
604		}
605		if (hinfo->run.qout_frames == 0)
606			LIST_INSERT_HEAD(&active_head, hinfo, active_le);
607	}
608
609	/* Populate the packet header */
610	ngp_h = uma_zalloc(ngp_zone, M_NOWAIT);
611	KASSERT((ngp_h != NULL), ("ngp_h zalloc failed (1)"));
612	NGI_GET_M(item, m);
613	KASSERT(m != NULL, ("NGI_GET_M failed"));
614	ngp_h->m = m;
615	NG_FREE_ITEM(item);
616
617	if (hinfo->cfg.fifo)
618		hash = 0;	/* all packets go into a single FIFO queue */
619	else
620		hash = ip_hash(m, priv->header_offset);
621
622	/* Find the appropriate FIFO queue for the packet and enqueue it*/
623	TAILQ_FOREACH(ngp_f, &hinfo->fifo_head, fifo_le)
624		if (hash == ngp_f->hash)
625			break;
626	if (ngp_f == NULL) {
627		ngp_f = uma_zalloc(ngp_zone, M_NOWAIT);
628		KASSERT(ngp_h != NULL, ("ngp_h zalloc failed (2)"));
629		TAILQ_INIT(&ngp_f->packet_head);
630		ngp_f->hash = hash;
631		ngp_f->packets = 1;
632		ngp_f->rr_deficit = hinfo->cfg.drr;	/* DRR quantum */
633		hinfo->run.fifo_queues++;
634		TAILQ_INSERT_TAIL(&ngp_f->packet_head, ngp_h, ngp_link);
635		FIFO_VTIME_SORT(m->m_pkthdr.len);
636	} else {
637		TAILQ_INSERT_TAIL(&ngp_f->packet_head, ngp_h, ngp_link);
638		ngp_f->packets++;
639	}
640	hinfo->run.qin_frames++;
641	hinfo->run.qin_octets += m->m_pkthdr.len;
642
643	/* Discard a frame if inbound queue limit has been reached */
644	if (hinfo->run.qin_frames > hinfo->cfg.qin_size_limit) {
645		struct mbuf *m1;
646		int longest = 0;
647
648		/* Find the longest queue */
649		TAILQ_FOREACH(ngp_f1, &hinfo->fifo_head, fifo_le)
650			if (ngp_f1->packets > longest) {
651				longest = ngp_f1->packets;
652				ngp_f = ngp_f1;
653			}
654
655		/* Drop a frame from the queue head/tail, depending on cfg */
656		if (hinfo->cfg.drophead)
657			ngp_h = TAILQ_FIRST(&ngp_f->packet_head);
658		else
659			ngp_h = TAILQ_LAST(&ngp_f->packet_head, p_head);
660		TAILQ_REMOVE(&ngp_f->packet_head, ngp_h, ngp_link);
661		m1 = ngp_h->m;
662		uma_zfree(ngp_zone, ngp_h);
663		hinfo->run.qin_octets -= m1->m_pkthdr.len;
664		hinfo->stats.in_disc_octets += m1->m_pkthdr.len;
665		m_freem(m1);
666		if (--(ngp_f->packets) == 0) {
667			TAILQ_REMOVE(&hinfo->fifo_head, ngp_f, fifo_le);
668			uma_zfree(ngp_zone, ngp_f);
669			hinfo->run.fifo_queues--;
670		}
671		hinfo->run.qin_frames--;
672		hinfo->stats.in_disc_frames++;
673	} else if (hinfo->run.qin_frames > hinfo->cfg.qin_size_limit) {
674		struct mbuf *m1;
675		int longest = 0;
676
677		/* Find the longest queue */
678		TAILQ_FOREACH(ngp_f1, &hinfo->fifo_head, fifo_le)
679			if (ngp_f1->packets > longest) {
680				longest = ngp_f1->packets;
681				ngp_f = ngp_f1;
682			}
683
684		/* Drop a frame from the queue head/tail, depending on cfg */
685		if (hinfo->cfg.drophead)
686			ngp_h = TAILQ_FIRST(&ngp_f->packet_head);
687		else
688			ngp_h = TAILQ_LAST(&ngp_f->packet_head, p_head);
689		TAILQ_REMOVE(&ngp_f->packet_head, ngp_h, ngp_link);
690		m1 = ngp_h->m;
691		uma_zfree(ngp_zone, ngp_h);
692		hinfo->run.qin_octets -= m1->m_pkthdr.len;
693		hinfo->stats.in_disc_octets += m1->m_pkthdr.len;
694		m_freem(m1);
695		if (--(ngp_f->packets) == 0) {
696			TAILQ_REMOVE(&hinfo->fifo_head, ngp_f, fifo_le);
697			uma_zfree(ngp_zone, ngp_f);
698			hinfo->run.fifo_queues--;
699		}
700		hinfo->run.qin_frames--;
701		hinfo->stats.in_disc_frames++;
702	}
703
704	/*
705	 * Try to start the dequeuing process immediately.  We must
706	 * hold the ng_pipe_giant lock here and pipe_dequeue() will
707	 * release it
708	 */
709	pipe_dequeue(hinfo, now);
710
711	return (0);
712}
713
714
715/*
716 * Dequeueing sequence - we basically do the following:
717 *  1) Try to extract the frame from the inbound (bandwidth) queue;
718 *  2) In accordance to BER specified, discard the frame randomly;
719 *  3) If the frame survives BER, prepend it with delay info and move it
720 *     to outbound (delay) queue;
721 *  4) Loop to 2) until bandwidth quota for this timeslice is reached, or
722 *     inbound queue is flushed completely;
723 *  5) Extract the first frame from the outbound queue, if it's time has
724 *     come.  Queue the frame for transmission on the outbound hook;
725 *  6) Loop to 5) until outbound queue is flushed completely, or the next
726 *     frame in the queue is not scheduled to be dequeued yet;
727 *  7) Transimit all frames queued in 5)
728 *
729 * Note: the caller must hold the ng_pipe_giant lock; this function
730 * returns with the lock released.
731 */
732static void
733pipe_dequeue(struct hookinfo *hinfo, struct timeval *now) {
734	static uint64_t rand, oldrand;
735	const priv_p priv = NG_NODE_PRIVATE(NG_HOOK_NODE(hinfo->hook));
736	struct hookinfo *dest;
737	struct ngp_fifo *ngp_f, *ngp_f1;
738	struct ngp_hdr *ngp_h;
739	struct timeval *when;
740	struct mbuf *q_head = NULL;
741	struct mbuf *q_tail = NULL;
742	struct mbuf *m;
743	int error = 0;
744
745	/* Which one is the destination hook? */
746	if (hinfo == &priv->lower)
747		dest = &priv->upper;
748	else
749		dest = &priv->lower;
750
751	/* Bandwidth queue processing */
752	while ((ngp_f = TAILQ_FIRST(&hinfo->fifo_head))) {
753		when = &hinfo->qin_utime;
754		if (when->tv_sec > now->tv_sec || (when->tv_sec == now->tv_sec
755		    && when->tv_usec > now->tv_usec))
756			break;
757
758		ngp_h = TAILQ_FIRST(&ngp_f->packet_head);
759		m = ngp_h->m;
760
761		/* Deficit Round Robin (DRR) processing */
762		if (hinfo->cfg.drr) {
763			if (ngp_f->rr_deficit >= m->m_pkthdr.len) {
764				ngp_f->rr_deficit -= m->m_pkthdr.len;
765			} else {
766				ngp_f->rr_deficit += hinfo->cfg.drr;
767				TAILQ_REMOVE(&hinfo->fifo_head, ngp_f, fifo_le);
768				TAILQ_INSERT_TAIL(&hinfo->fifo_head,
769				    ngp_f, fifo_le);
770				continue;
771			}
772		}
773
774		/*
775		 * Either create a duplicate and pass it on, or dequeue
776		 * the original packet...
777		 */
778		if (hinfo->cfg.duplicate &&
779		    random() % 100 <= hinfo->cfg.duplicate) {
780			ngp_h = uma_zalloc(ngp_zone, M_NOWAIT);
781			KASSERT(ngp_h != NULL, ("ngp_h zalloc failed (3)"));
782			m = m_dup(m, M_NOWAIT);
783			KASSERT(m != NULL, ("m_dup failed"));
784			ngp_h->m = m;
785		} else {
786			TAILQ_REMOVE(&ngp_f->packet_head, ngp_h, ngp_link);
787			hinfo->run.qin_frames--;
788			hinfo->run.qin_octets -= m->m_pkthdr.len;
789			ngp_f->packets--;
790		}
791
792		/* Calculate the serialization delay */
793		if (hinfo->cfg.bandwidth) {
794			hinfo->qin_utime.tv_usec += ((uint64_t) m->m_pkthdr.len
795				+ priv->overhead ) *
796				8000000 / hinfo->cfg.bandwidth;
797			hinfo->qin_utime.tv_sec +=
798				hinfo->qin_utime.tv_usec / 1000000;
799			hinfo->qin_utime.tv_usec =
800				hinfo->qin_utime.tv_usec % 1000000;
801		}
802		when = &ngp_h->when;
803		when->tv_sec = hinfo->qin_utime.tv_sec;
804		when->tv_usec = hinfo->qin_utime.tv_usec;
805
806		/* Sort / rearrange inbound queues */
807		if (ngp_f->packets) {
808			if (hinfo->cfg.wfq) {
809				TAILQ_REMOVE(&hinfo->fifo_head, ngp_f, fifo_le);
810				FIFO_VTIME_SORT(TAILQ_FIRST(
811				    &ngp_f->packet_head)->m->m_pkthdr.len)
812			}
813		} else {
814			TAILQ_REMOVE(&hinfo->fifo_head, ngp_f, fifo_le);
815			uma_zfree(ngp_zone, ngp_f);
816			hinfo->run.fifo_queues--;
817		}
818
819		/* Randomly discard the frame, according to BER setting */
820		if (hinfo->cfg.ber) {
821			oldrand = rand;
822			rand = random();
823			if (((oldrand ^ rand) << 17) >=
824			    hinfo->ber_p[priv->overhead + m->m_pkthdr.len]) {
825				hinfo->stats.out_disc_frames++;
826				hinfo->stats.out_disc_octets += m->m_pkthdr.len;
827				uma_zfree(ngp_zone, ngp_h);
828				m_freem(m);
829				continue;
830			}
831		}
832
833		/* Discard frame if outbound queue size limit exceeded */
834		if (hinfo->cfg.qout_size_limit &&
835		    hinfo->run.qout_frames>=hinfo->cfg.qout_size_limit) {
836			hinfo->stats.out_disc_frames++;
837			hinfo->stats.out_disc_octets += m->m_pkthdr.len;
838			uma_zfree(ngp_zone, ngp_h);
839			m_freem(m);
840			continue;
841		}
842
843		/* Calculate the propagation delay */
844		when->tv_usec += priv->delay;
845		when->tv_sec += when->tv_usec / 1000000;
846		when->tv_usec = when->tv_usec % 1000000;
847
848		/* Put the frame into the delay queue */
849		TAILQ_INSERT_TAIL(&hinfo->qout_head, ngp_h, ngp_link);
850		hinfo->run.qout_frames++;
851		hinfo->run.qout_octets += m->m_pkthdr.len;
852	}
853
854	/* Delay queue processing */
855	while ((ngp_h = TAILQ_FIRST(&hinfo->qout_head))) {
856		struct mbuf *m = ngp_h->m;
857
858		when = &ngp_h->when;
859		if (when->tv_sec > now->tv_sec ||
860		    (when->tv_sec == now->tv_sec &&
861		    when->tv_usec > now->tv_usec))
862			break;
863
864		/* Update outbound queue stats */
865		hinfo->stats.fwd_frames++;
866		hinfo->stats.fwd_octets += m->m_pkthdr.len;
867		hinfo->run.qout_frames--;
868		hinfo->run.qout_octets -= m->m_pkthdr.len;
869
870		/* Dequeue the packet from qout */
871		TAILQ_REMOVE(&hinfo->qout_head, ngp_h, ngp_link);
872		uma_zfree(ngp_zone, ngp_h);
873
874		/* Enqueue locally for sending downstream */
875		if (q_head == NULL)
876			q_head = m;
877		if (q_tail)
878			q_tail->m_nextpkt = m;
879		q_tail = m;
880		m->m_nextpkt = NULL;
881	}
882
883	/* If both queues are empty detach us from the list of active queues */
884	if (hinfo->run.qin_frames + hinfo->run.qout_frames == 0) {
885		LIST_REMOVE(hinfo, active_le);
886		active_gen_id++;
887	}
888
889	mtx_unlock(&ng_pipe_giant);
890
891	while ((m = q_head) != NULL) {
892		q_head = m->m_nextpkt;
893		m->m_nextpkt = NULL;
894		NG_SEND_DATA(error, dest->hook, m, meta);
895	}
896}
897
898
899/*
900 * This routine is called on every clock tick. We poll all nodes/hooks
901 * for queued frames by calling pipe_dequeue().
902 */
903static void
904pipe_scheduler(void *arg)
905{
906	pipe_poll();
907
908	/* Reschedule  */
909	callout_reset(&polling_timer, 1, &pipe_scheduler, NULL);
910}
911
912
913/*
914 * Traverse the list of all active hooks and attempt to dequeue
915 * some packets.  Hooks with empty queues are not traversed since
916 * they are not linked into this list.
917 */
918static void
919pipe_poll(void)
920{
921	struct hookinfo *hinfo;
922	struct timeval now;
923	int old_gen_id = active_gen_id;
924
925	mtx_lock(&ng_pipe_giant);
926	microuptime(&now);
927	LIST_FOREACH(hinfo, &active_head, active_le) {
928		CURVNET_SET(NG_HOOK_NODE(hinfo->hook)->nd_vnet);
929		pipe_dequeue(hinfo, &now);
930		CURVNET_RESTORE();
931		mtx_lock(&ng_pipe_giant);
932		if (old_gen_id != active_gen_id) {
933			/* the list was updated; restart traversing */
934			hinfo = LIST_FIRST(&active_head);
935			if (hinfo == NULL)
936				break;
937			old_gen_id = active_gen_id;
938			continue;
939		}
940	}
941	mtx_unlock(&ng_pipe_giant);
942}
943
944
945/*
946 * Shutdown processing
947 *
948 * This is tricky. If we have both a lower and upper hook, then we
949 * probably want to extricate ourselves and leave the two peers
950 * still linked to each other. Otherwise we should just shut down as
951 * a normal node would.
952 */
953static int
954ngp_shutdown(node_p node)
955{
956	const priv_p priv = NG_NODE_PRIVATE(node);
957
958	if (priv->lower.hook && priv->upper.hook)
959		ng_bypass(priv->lower.hook, priv->upper.hook);
960	else {
961		if (priv->upper.hook != NULL)
962			ng_rmhook_self(priv->upper.hook);
963		if (priv->lower.hook != NULL)
964			ng_rmhook_self(priv->lower.hook);
965	}
966	NG_NODE_UNREF(node);
967	free(priv, M_NG_PIPE);
968	return (0);
969}
970
971
972/*
973 * Hook disconnection
974 */
975static int
976ngp_disconnect(hook_p hook)
977{
978	struct hookinfo *const hinfo = NG_HOOK_PRIVATE(hook);
979	struct ngp_fifo *ngp_f;
980	struct ngp_hdr *ngp_h;
981	int removed = 0;
982
983	mtx_lock(&ng_pipe_giant);
984
985	KASSERT(hinfo != NULL, ("%s: null info", __FUNCTION__));
986	hinfo->hook = NULL;
987
988	/* Flush all fifo queues associated with the hook */
989	while ((ngp_f = TAILQ_FIRST(&hinfo->fifo_head))) {
990		while ((ngp_h = TAILQ_FIRST(&ngp_f->packet_head))) {
991			TAILQ_REMOVE(&ngp_f->packet_head, ngp_h, ngp_link);
992			m_freem(ngp_h->m);
993			uma_zfree(ngp_zone, ngp_h);
994			removed++;
995		}
996		TAILQ_REMOVE(&hinfo->fifo_head, ngp_f, fifo_le);
997		uma_zfree(ngp_zone, ngp_f);
998	}
999
1000	/* Flush the delay queue */
1001	while ((ngp_h = TAILQ_FIRST(&hinfo->qout_head))) {
1002		TAILQ_REMOVE(&hinfo->qout_head, ngp_h, ngp_link);
1003		m_freem(ngp_h->m);
1004		uma_zfree(ngp_zone, ngp_h);
1005		removed++;
1006	}
1007
1008	/*
1009	 * Both queues should be empty by now, so detach us from
1010	 * the list of active queues
1011	 */
1012	if (removed) {
1013		LIST_REMOVE(hinfo, active_le);
1014		active_gen_id++;
1015	}
1016	if (hinfo->run.qin_frames + hinfo->run.qout_frames != removed)
1017		printf("Mismatch: queued=%d but removed=%d !?!",
1018		    hinfo->run.qin_frames + hinfo->run.qout_frames, removed);
1019
1020	/* Release the packet loss probability table (BER) */
1021	if (hinfo->ber_p)
1022		free(hinfo->ber_p, M_NG_PIPE);
1023
1024	mtx_unlock(&ng_pipe_giant);
1025
1026	return (0);
1027}
1028
1029static int
1030ngp_modevent(module_t mod, int type, void *unused)
1031{
1032	int error = 0;
1033
1034	switch (type) {
1035	case MOD_LOAD:
1036		ngp_zone = uma_zcreate("ng_pipe", max(sizeof(struct ngp_hdr),
1037		    sizeof (struct ngp_fifo)), NULL, NULL, NULL, NULL,
1038		    UMA_ALIGN_PTR, 0);
1039		if (ngp_zone == NULL)
1040			panic("ng_pipe: couldn't allocate descriptor zone");
1041
1042		mtx_init(&ng_pipe_giant, "ng_pipe_giant", NULL, MTX_DEF);
1043		LIST_INIT(&active_head);
1044		callout_init(&polling_timer, CALLOUT_MPSAFE);
1045		callout_reset(&polling_timer, 1, &pipe_scheduler, NULL);
1046		break;
1047	case MOD_UNLOAD:
1048		callout_drain(&polling_timer);
1049		uma_zdestroy(ngp_zone);
1050		mtx_destroy(&ng_pipe_giant);
1051		break;
1052	default:
1053		error = EOPNOTSUPP;
1054		break;
1055	}
1056
1057	return (error);
1058}
1059