netflow.c revision 260278
1/*-
2 * Copyright (c) 2010-2011 Alexander V. Chernikov <melifaro@ipfw.ru>
3 * Copyright (c) 2004-2005 Gleb Smirnoff <glebius@FreeBSD.org>
4 * Copyright (c) 2001-2003 Roman V. Palagin <romanp@unshadow.net>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 *    notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 *    notice, this list of conditions and the following disclaimer in the
14 *    documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * SUCH DAMAGE.
27 *
28 * $SourceForge: netflow.c,v 1.41 2004/09/05 11:41:10 glebius Exp $
29 */
30
31#include <sys/cdefs.h>
32__FBSDID("$FreeBSD: stable/10/sys/netgraph/netflow/netflow.c 260278 2014-01-04 19:04:53Z dim $");
33
34#include "opt_inet6.h"
35#include "opt_route.h"
36#include <sys/param.h>
37#include <sys/kernel.h>
38#include <sys/limits.h>
39#include <sys/mbuf.h>
40#include <sys/syslog.h>
41#include <sys/systm.h>
42#include <sys/socket.h>
43#include <sys/endian.h>
44
45#include <machine/atomic.h>
46#include <machine/stdarg.h>
47
48#include <net/if.h>
49#include <net/route.h>
50#include <net/ethernet.h>
51#include <netinet/in.h>
52#include <netinet/in_systm.h>
53#include <netinet/ip.h>
54#include <netinet/ip6.h>
55#include <netinet/tcp.h>
56#include <netinet/udp.h>
57
58#include <netgraph/ng_message.h>
59#include <netgraph/netgraph.h>
60
61#include <netgraph/netflow/netflow.h>
62#include <netgraph/netflow/netflow_v9.h>
63#include <netgraph/netflow/ng_netflow.h>
64
65#define	NBUCKETS	(65536)		/* must be power of 2 */
66
67/* This hash is for TCP or UDP packets. */
68#define FULL_HASH(addr1, addr2, port1, port2)	\
69	(((addr1 ^ (addr1 >> 16) ^ 		\
70	htons(addr2 ^ (addr2 >> 16))) ^ 	\
71	port1 ^ htons(port2)) &			\
72	(NBUCKETS - 1))
73
74/* This hash is for all other IP packets. */
75#define ADDR_HASH(addr1, addr2)			\
76	((addr1 ^ (addr1 >> 16) ^ 		\
77	htons(addr2 ^ (addr2 >> 16))) &		\
78	(NBUCKETS - 1))
79
80/* Macros to shorten logical constructions */
81/* XXX: priv must exist in namespace */
82#define	INACTIVE(fle)	(time_uptime - fle->f.last > priv->info.nfinfo_inact_t)
83#define	AGED(fle)	(time_uptime - fle->f.first > priv->info.nfinfo_act_t)
84#define	ISFREE(fle)	(fle->f.packets == 0)
85
86/*
87 * 4 is a magical number: statistically, the number of 4-packet flows is
88 * bigger than that of 5-, 6-, 7-...-packet flows by an order of magnitude.
89 * Most UDP/ICMP scans are 1 packet (~90% of the flow cache). TCP scans are
90 * 2 packets for a reachable host and 4 packets otherwise.
91 */
92#define	SMALL(fle)	(fle->f.packets <= 4)
93
94MALLOC_DEFINE(M_NETFLOW_HASH, "netflow_hash", "NetFlow hash");
95
96static int export_add(item_p, struct flow_entry *);
97static int export_send(priv_p, fib_export_p, item_p, int);
98
99static int hash_insert(priv_p, struct flow_hash_entry *, struct flow_rec *,
100    int, uint8_t, uint8_t);
101#ifdef INET6
102static int hash6_insert(priv_p, struct flow_hash_entry *, struct flow6_rec *,
103    int, uint8_t, uint8_t);
104#endif
105
106static void expire_flow(priv_p, fib_export_p, struct flow_entry *, int);
107
108/*
109 * Generate hash for a given flow record.
110 *
111 * FIB is not used here, because most VRFs will carry public IPv4
112 * addresses, which are unique even without a FIB. Private addresses can
113 * overlap, but this is worked out via the flow_rec bcmp(), which includes
114 * the fib id. In the IPv6 world addresses are all globally unique (not
115 * fully true, there is FC00::/7 for example, but the chances of address
116 * overlap are MUCH smaller).
117 */
118static inline uint32_t
119ip_hash(struct flow_rec *r)
120{
121
122	switch (r->r_ip_p) {
123	case IPPROTO_TCP:
124	case IPPROTO_UDP:
125		return FULL_HASH(r->r_src.s_addr, r->r_dst.s_addr,
126		    r->r_sport, r->r_dport);
127	default:
128		return ADDR_HASH(r->r_src.s_addr, r->r_dst.s_addr);
129	}
130}
131
132#ifdef INET6
133/* Generate hash for a given flow6 record. Use the lower 4 octets of the v6 addresses. */
134static inline uint32_t
135ip6_hash(struct flow6_rec *r)
136{
137
138	switch (r->r_ip_p) {
139	case IPPROTO_TCP:
140	case IPPROTO_UDP:
141		return FULL_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
142		    r->dst.r_dst6.__u6_addr.__u6_addr32[3], r->r_sport,
143		    r->r_dport);
144	default:
145		return ADDR_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
146		    r->dst.r_dst6.__u6_addr.__u6_addr32[3]);
147	}
148}
149#endif
150
151/* This is a callback from uma(9), called on alloc. */
152static int
153uma_ctor_flow(void *mem, int size, void *arg, int how)
154{
155	priv_p priv = (priv_p )arg;
156
157	if (atomic_load_acq_32(&priv->info.nfinfo_used) >= CACHESIZE)
158		return (ENOMEM);
159
160	atomic_add_32(&priv->info.nfinfo_used, 1);
161
162	return (0);
163}
164
165/* This is a callback from uma(9), called on free. */
166static void
167uma_dtor_flow(void *mem, int size, void *arg)
168{
169	priv_p priv = (priv_p )arg;
170
171	atomic_subtract_32(&priv->info.nfinfo_used, 1);
172}
173
174#ifdef INET6
175/* This is a callback from uma(9), called on alloc. */
176static int
177uma_ctor_flow6(void *mem, int size, void *arg, int how)
178{
179	priv_p priv = (priv_p )arg;
180
181	if (atomic_load_acq_32(&priv->info.nfinfo_used6) >= CACHESIZE)
182		return (ENOMEM);
183
184	atomic_add_32(&priv->info.nfinfo_used6, 1);
185
186	return (0);
187}
188
189/* This is a callback from uma(9), called on free. */
190static void
191uma_dtor_flow6(void *mem, int size, void *arg)
192{
193	priv_p priv = (priv_p )arg;
194
195	atomic_subtract_32(&priv->info.nfinfo_used6, 1);
196}
197#endif
198
199/*
200 * Detach the export datagram from priv, if there is any.
201 * If there is none, allocate a new one.
202 */
203static item_p
204get_export_dgram(priv_p priv, fib_export_p fe)
205{
206	item_p	item = NULL;
207
208	mtx_lock(&fe->export_mtx);
209	if (fe->exp.item != NULL) {
210		item = fe->exp.item;
211		fe->exp.item = NULL;
212	}
213	mtx_unlock(&fe->export_mtx);
214
215	if (item == NULL) {
216		struct netflow_v5_export_dgram *dgram;
217		struct mbuf *m;
218
219		m = m_getcl(M_NOWAIT, MT_DATA, M_PKTHDR);
220		if (m == NULL)
221			return (NULL);
222		item = ng_package_data(m, NG_NOFLAGS);
223		if (item == NULL)
224			return (NULL);
225		dgram = mtod(m, struct netflow_v5_export_dgram *);
226		dgram->header.count = 0;
227		dgram->header.version = htons(NETFLOW_V5);
228		dgram->header.pad = 0;
229	}
230
231	return (item);
232}
233
234/*
235 * Re-attach an incomplete datagram back to priv.
236 * If another one is already there, send the incomplete one. */
237static void
238return_export_dgram(priv_p priv, fib_export_p fe, item_p item, int flags)
239{
240
241	/*
242	 * It may happen on SMP that some other thread has already
243	 * put its item there; in this case we bail out and
244	 * send what we have to the collector.
245	 */
246	mtx_lock(&fe->export_mtx);
247	if (fe->exp.item == NULL) {
248		fe->exp.item = item;
249		mtx_unlock(&fe->export_mtx);
250	} else {
251		mtx_unlock(&fe->export_mtx);
252		export_send(priv, fe, item, flags);
253	}
254}
255
256/*
257 * The flow is over. Call export_add() and free it. If the datagram is
258 * full, then call export_send().
259 */
260static void
261expire_flow(priv_p priv, fib_export_p fe, struct flow_entry *fle, int flags)
262{
263	struct netflow_export_item exp;
264	uint16_t version = fle->f.version;
265
266	if ((priv->export != NULL) && (version == IPVERSION)) {
267		exp.item = get_export_dgram(priv, fe);
268		if (exp.item == NULL) {
269			atomic_add_32(&priv->info.nfinfo_export_failed, 1);
270			if (priv->export9 != NULL)
271				atomic_add_32(&priv->info.nfinfo_export9_failed, 1);
272			/* fle definitely contains IPv4 flow. */
273			uma_zfree_arg(priv->zone, fle, priv);
274			return;
275		}
276
277		if (export_add(exp.item, fle) > 0)
278			export_send(priv, fe, exp.item, flags);
279		else
280			return_export_dgram(priv, fe, exp.item, NG_QUEUE);
281	}
282
283	if (priv->export9 != NULL) {
284		exp.item9 = get_export9_dgram(priv, fe, &exp.item9_opt);
285		if (exp.item9 == NULL) {
286			atomic_add_32(&priv->info.nfinfo_export9_failed, 1);
287			if (version == IPVERSION)
288				uma_zfree_arg(priv->zone, fle, priv);
289#ifdef INET6
290			else if (version == IP6VERSION)
291				uma_zfree_arg(priv->zone6, fle, priv);
292#endif
293			else
294				panic("ng_netflow: Unknown IP version: %d",
295				    version);
296			return;
297		}
298
299		if (export9_add(exp.item9, exp.item9_opt, fle) > 0)
300			export9_send(priv, fe, exp.item9, exp.item9_opt, flags);
301		else
302			return_export9_dgram(priv, fe, exp.item9,
303			    exp.item9_opt, NG_QUEUE);
304	}
305
306	if (version == IPVERSION)
307		uma_zfree_arg(priv->zone, fle, priv);
308#ifdef INET6
309	else if (version == IP6VERSION)
310		uma_zfree_arg(priv->zone6, fle, priv);
311#endif
312}
313
314/* Get a snapshot of node statistics */
315void
316ng_netflow_copyinfo(priv_p priv, struct ng_netflow_info *i)
317{
318
319	/* XXX: atomic */
320	memcpy((void *)i, (void *)&priv->info, sizeof(priv->info));
321}
322
323/*
324 * Insert a record into the defined slot.
325 *
326 * First we get ourselves a free flow entry, then fill in all
327 * possible fields in it.
328 *
329 * TODO: consider dropping hash mutex while filling in datagram,
330 * as this was done in previous version. Need to test & profile
331 * to be sure.
332 */
333static int
334hash_insert(priv_p priv, struct flow_hash_entry *hsh, struct flow_rec *r,
335	int plen, uint8_t flags, uint8_t tcp_flags)
336{
337	struct flow_entry *fle;
338	struct sockaddr_in sin;
339	struct rtentry *rt;
340
341	mtx_assert(&hsh->mtx, MA_OWNED);
342
343	fle = uma_zalloc_arg(priv->zone, priv, M_NOWAIT);
344	if (fle == NULL) {
345		atomic_add_32(&priv->info.nfinfo_alloc_failed, 1);
346		return (ENOMEM);
347	}
348
349	/*
350	 * Now fle is totally ours. It is detached from all lists,
351	 * so we can safely edit it.
352	 */
353	fle->f.version = IPVERSION;
354	bcopy(r, &fle->f.r, sizeof(struct flow_rec));
355	fle->f.bytes = plen;
356	fle->f.packets = 1;
357	fle->f.tcp_flags = tcp_flags;
358
359	fle->f.first = fle->f.last = time_uptime;
360
361	/*
362	 * First we do a route table lookup on the destination address, so we
363	 * can fill in out_ifx, dst_mask, nexthop, and dst_as in future releases.
364	 */
365	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
366		bzero(&sin, sizeof(sin));
367		sin.sin_len = sizeof(struct sockaddr_in);
368		sin.sin_family = AF_INET;
369		sin.sin_addr = fle->f.r.r_dst;
370		rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib);
371		if (rt != NULL) {
372			fle->f.fle_o_ifx = rt->rt_ifp->if_index;
373
374			if (rt->rt_flags & RTF_GATEWAY &&
375			    rt->rt_gateway->sa_family == AF_INET)
376				fle->f.next_hop =
377				    ((struct sockaddr_in *)(rt->rt_gateway))->sin_addr;
378
379			if (rt_mask(rt))
380				fle->f.dst_mask =
381				    bitcount32(((struct sockaddr_in *)rt_mask(rt))->sin_addr.s_addr);
382			else if (rt->rt_flags & RTF_HOST)
383				/* Give up. We can't determine mask :( */
384				fle->f.dst_mask = 32;
385
386			RTFREE_LOCKED(rt);
387		}
388	}
389
390	/* Do route lookup on source address, to fill in src_mask. */
391	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
392		bzero(&sin, sizeof(sin));
393		sin.sin_len = sizeof(struct sockaddr_in);
394		sin.sin_family = AF_INET;
395		sin.sin_addr = fle->f.r.r_src;
396		rt = rtalloc1_fib((struct sockaddr *)&sin, 0, 0, r->fib);
397		if (rt != NULL) {
398			if (rt_mask(rt))
399				fle->f.src_mask =
400				    bitcount32(((struct sockaddr_in *)rt_mask(rt))->sin_addr.s_addr);
401			else if (rt->rt_flags & RTF_HOST)
402				/* Give up. We can't determine mask :( */
403				fle->f.src_mask = 32;
404
405			RTFREE_LOCKED(rt);
406		}
407	}
408
409	/* Push the new flow onto the end of the hash chain. */
410	TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
411
412	return (0);
413}
414
415#ifdef INET6
416/* XXX: make this a normal function instead of a macro. */
417#define ipv6_masklen(x)		bitcount32((x).__u6_addr.__u6_addr32[0]) + \
418				bitcount32((x).__u6_addr.__u6_addr32[1]) + \
419				bitcount32((x).__u6_addr.__u6_addr32[2]) + \
420				bitcount32((x).__u6_addr.__u6_addr32[3])
421#define RT_MASK6(x)	(ipv6_masklen(((struct sockaddr_in6 *)rt_mask(x))->sin6_addr))
422static int
423hash6_insert(priv_p priv, struct flow_hash_entry *hsh6, struct flow6_rec *r,
424	int plen, uint8_t flags, uint8_t tcp_flags)
425{
426	struct flow6_entry *fle6;
427	struct sockaddr_in6 *src, *dst;
428	struct rtentry *rt;
429	struct route_in6 rin6;
430
431	mtx_assert(&hsh6->mtx, MA_OWNED);
432
433	fle6 = uma_zalloc_arg(priv->zone6, priv, M_NOWAIT);
434	if (fle6 == NULL) {
435		atomic_add_32(&priv->info.nfinfo_alloc_failed, 1);
436		return (ENOMEM);
437	}
438
439	/*
440	 * Now fle is totally ours. It is detached from all lists,
441	 * so we can safely edit it.
442	 */
443
444	fle6->f.version = IP6VERSION;
445	bcopy(r, &fle6->f.r, sizeof(struct flow6_rec));
446	fle6->f.bytes = plen;
447	fle6->f.packets = 1;
448	fle6->f.tcp_flags = tcp_flags;
449
450	fle6->f.first = fle6->f.last = time_uptime;
451
452	/*
453	 * First we do a route table lookup on the destination address, so we
454	 * can fill in out_ifx, dst_mask, nexthop, and dst_as in future releases.
455	 */
456	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
457		bzero(&rin6, sizeof(struct route_in6));
458		dst = (struct sockaddr_in6 *)&rin6.ro_dst;
459		dst->sin6_len = sizeof(struct sockaddr_in6);
460		dst->sin6_family = AF_INET6;
461		dst->sin6_addr = r->dst.r_dst6;
462
463		rin6.ro_rt = rtalloc1_fib((struct sockaddr *)dst, 0, 0, r->fib);
464
465		if (rin6.ro_rt != NULL) {
466			rt = rin6.ro_rt;
467			fle6->f.fle_o_ifx = rt->rt_ifp->if_index;
468
469			if (rt->rt_flags & RTF_GATEWAY &&
470			    rt->rt_gateway->sa_family == AF_INET6)
471				fle6->f.n.next_hop6 =
472				    ((struct sockaddr_in6 *)(rt->rt_gateway))->sin6_addr;
473
474			if (rt_mask(rt))
475				fle6->f.dst_mask = RT_MASK6(rt);
476			else
477				fle6->f.dst_mask = 128;
478
479			RTFREE_LOCKED(rt);
480		}
481	}
482
483	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
484		/* Do route lookup on source address, to fill in src_mask. */
485		bzero(&rin6, sizeof(struct route_in6));
486		src = (struct sockaddr_in6 *)&rin6.ro_dst;
487		src->sin6_len = sizeof(struct sockaddr_in6);
488		src->sin6_family = AF_INET6;
489		src->sin6_addr = r->src.r_src6;
490
491		rin6.ro_rt = rtalloc1_fib((struct sockaddr *)src, 0, 0, r->fib);
492
493		if (rin6.ro_rt != NULL) {
494			rt = rin6.ro_rt;
495
496			if (rt_mask(rt))
497				fle6->f.src_mask = RT_MASK6(rt);
498			else
499				fle6->f.src_mask = 128;
500
501			RTFREE_LOCKED(rt);
502		}
503	}
504
505	/* Push the new flow onto the end of the hash chain. */
506	TAILQ_INSERT_TAIL(&hsh6->head, (struct flow_entry *)fle6, fle_hash);
507
508	return (0);
509}
510#undef ipv6_masklen
511#undef RT_MASK6
512#endif
513
514
515/*
516 * Non-static functions called from ng_netflow.c
517 */
518
519/* Allocate memory and set up flow cache */
520void
521ng_netflow_cache_init(priv_p priv)
522{
523	struct flow_hash_entry *hsh;
524	int i;
525
526	/* Initialize cache UMA zone. */
527	priv->zone = uma_zcreate("NetFlow IPv4 cache",
528	    sizeof(struct flow_entry), uma_ctor_flow, uma_dtor_flow, NULL,
529	    NULL, UMA_ALIGN_CACHE, 0);
530	uma_zone_set_max(priv->zone, CACHESIZE);
531#ifdef INET6
532	priv->zone6 = uma_zcreate("NetFlow IPv6 cache",
533	    sizeof(struct flow6_entry), uma_ctor_flow6, uma_dtor_flow6, NULL,
534	    NULL, UMA_ALIGN_CACHE, 0);
535	uma_zone_set_max(priv->zone6, CACHESIZE);
536#endif
537
538	/* Allocate hash. */
539	priv->hash = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
540	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);
541
542	/* Initialize hash. */
543	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) {
544		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
545		TAILQ_INIT(&hsh->head);
546	}
547
548#ifdef INET6
549	/* Allocate hash. */
550	priv->hash6 = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
551	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);
552
553	/* Initialize hash. */
554	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++) {
555		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
556		TAILQ_INIT(&hsh->head);
557	}
558#endif
559
560	ng_netflow_v9_cache_init(priv);
561	CTR0(KTR_NET, "ng_netflow startup()");
562}
563
564/* Initialize new FIB table for v5 and v9 */
565int
566ng_netflow_fib_init(priv_p priv, int fib)
567{
568	fib_export_p	fe = priv_to_fib(priv, fib);
569
570	CTR1(KTR_NET, "ng_netflow(): fib init: %d", fib);
571
572	if (fe != NULL)
573		return (0);
574
575	if ((fe = malloc(sizeof(struct fib_export), M_NETGRAPH,
576	    M_NOWAIT | M_ZERO)) == NULL)
577		return (ENOMEM);
578
579	mtx_init(&fe->export_mtx, "export dgram lock", NULL, MTX_DEF);
580	mtx_init(&fe->export9_mtx, "export9 dgram lock", NULL, MTX_DEF);
581	fe->fib = fib;
582	fe->domain_id = fib;
583
584	if (atomic_cmpset_ptr((volatile uintptr_t *)&priv->fib_data[fib],
585	    (uintptr_t)NULL, (uintptr_t)fe) == 0) {
586		/* FIB already set up by other ISR */
587		CTR3(KTR_NET, "ng_netflow(): fib init: %d setup %p but got %p",
588		    fib, fe, priv_to_fib(priv, fib));
589		mtx_destroy(&fe->export_mtx);
590		mtx_destroy(&fe->export9_mtx);
591		free(fe, M_NETGRAPH);
592	} else {
593		/* Increase counter for statistics */
594		CTR3(KTR_NET, "ng_netflow(): fib %d setup to %p (%p)",
595		    fib, fe, priv_to_fib(priv, fib));
596		atomic_fetchadd_32(&priv->info.nfinfo_alloc_fibs, 1);
597	}
598
599	return (0);
600}
601
602/* Free all flow cache memory. Called from node close method. */
603void
604ng_netflow_cache_flush(priv_p priv)
605{
606	struct flow_entry	*fle, *fle1;
607	struct flow_hash_entry	*hsh;
608	struct netflow_export_item exp;
609	fib_export_p fe;
610	int i;
611
612	bzero(&exp, sizeof(exp));
613
614	/*
615	 * We are going to free probably billable data.
616	 * Expire everything before freeing it.
617	 * No locking is required since callout is already drained.
618	 */
619	for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++)
620		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
621			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
622			fe = priv_to_fib(priv, fle->f.r.fib);
623			expire_flow(priv, fe, fle, NG_QUEUE);
624		}
625#ifdef INET6
626	for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++)
627		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
628			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
629			fe = priv_to_fib(priv, fle->f.r.fib);
630			expire_flow(priv, fe, fle, NG_QUEUE);
631		}
632#endif
633
634	uma_zdestroy(priv->zone);
635	/* Destroy hash mutexes. */
636	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++)
637		mtx_destroy(&hsh->mtx);
638
639	/* Free hash memory. */
640	if (priv->hash != NULL)
641		free(priv->hash, M_NETFLOW_HASH);
642#ifdef INET6
643	uma_zdestroy(priv->zone6);
644	/* Destroy hash mutexes. */
645	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++)
646		mtx_destroy(&hsh->mtx);
647
648	/* Free hash memory. */
649	if (priv->hash6 != NULL)
650		free(priv->hash6, M_NETFLOW_HASH);
651#endif
652
653	for (i = 0; i < priv->maxfibs; i++) {
654		if ((fe = priv_to_fib(priv, i)) == NULL)
655			continue;
656
657		if (fe->exp.item != NULL)
658			export_send(priv, fe, fe->exp.item, NG_QUEUE);
659
660		if (fe->exp.item9 != NULL)
661			export9_send(priv, fe, fe->exp.item9,
662			    fe->exp.item9_opt, NG_QUEUE);
663
664		mtx_destroy(&fe->export_mtx);
665		mtx_destroy(&fe->export9_mtx);
666		free(fe, M_NETGRAPH);
667	}
668
669	ng_netflow_v9_cache_flush(priv);
670}
671
672/* Insert a packet into the flow cache. */
673int
674ng_netflow_flow_add(priv_p priv, fib_export_p fe, struct ip *ip,
675    caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
676    unsigned int src_if_index)
677{
678	struct flow_entry	*fle, *fle1;
679	struct flow_hash_entry	*hsh;
680	struct flow_rec		r;
681	int			hlen, plen;
682	int			error = 0;
683	uint16_t		eproto;
684	uint8_t			tcp_flags = 0;
685
686	bzero(&r, sizeof(r));
687
688	if (ip->ip_v != IPVERSION)
689		return (EINVAL);
690
691	hlen = ip->ip_hl << 2;
692	if (hlen < sizeof(struct ip))
693		return (EINVAL);
694
695	eproto = ETHERTYPE_IP;
696	/* Assume L4 template by default */
697	r.flow_type = NETFLOW_V9_FLOW_V4_L4;
698
699	r.r_src = ip->ip_src;
700	r.r_dst = ip->ip_dst;
701	r.fib = fe->fib;
702
703	plen = ntohs(ip->ip_len);
704
705	r.r_ip_p = ip->ip_p;
706	r.r_tos = ip->ip_tos;
707
708	r.r_i_ifx = src_if_index;
709
710	/*
711	 * XXX NOTE: only the first fragment of a fragmented TCP, UDP or
712	 * ICMP packet will be recorded with proper s_port and d_port.
713	 * Following fragments will be recorded simply as IP packets with
714	 * ip_proto = ip->ip_p and s_port, d_port set to zero.
715	 * I know it looks like a bug, but I don't want to re-implement
716	 * IP packet reassembly here. Anyway, the (in)famous trafd works this
717	 * way and nobody has complained yet :)
718	 */
719	if ((ip->ip_off & htons(IP_OFFMASK)) == 0)
720		switch(r.r_ip_p) {
721		case IPPROTO_TCP:
722		    {
723			struct tcphdr *tcp;
724
725			tcp = (struct tcphdr *)((caddr_t )ip + hlen);
726			r.r_sport = tcp->th_sport;
727			r.r_dport = tcp->th_dport;
728			tcp_flags = tcp->th_flags;
729			break;
730		    }
731		case IPPROTO_UDP:
732			r.r_ports = *(uint32_t *)((caddr_t )ip + hlen);
733			break;
734		}
735
736	atomic_fetchadd_32(&priv->info.nfinfo_packets, 1);
737	/* XXX: atomic */
738	priv->info.nfinfo_bytes += plen;
739
740	/* Find hash slot. */
741	hsh = &priv->hash[ip_hash(&r)];
742
743	mtx_lock(&hsh->mtx);
744
745	/*
746	 * Go through the hash and find our entry. If we encounter an
747	 * entry that should be expired, purge it. We do a reverse
748	 * search since the most active entries are first, and most
749	 * searches are done on the most active entries.
750	 */
751	TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
752		if (bcmp(&r, &fle->f.r, sizeof(struct flow_rec)) == 0)
753			break;
754		if ((INACTIVE(fle) && SMALL(fle)) || AGED(fle)) {
755			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
756			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
757			    fle, NG_QUEUE);
758			atomic_add_32(&priv->info.nfinfo_act_exp, 1);
759		}
760	}
761
762	if (fle) {			/* An existing entry. */
763
764		fle->f.bytes += plen;
765		fle->f.packets ++;
766		fle->f.tcp_flags |= tcp_flags;
767		fle->f.last = time_uptime;
768
769		/*
770		 * We have the following reasons to expire a flow actively:
771		 * - it hit the active timeout
772		 * - a TCP connection has closed
773		 * - it is about to overflow its counter
774		 */
775		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle) ||
776		    (fle->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) {
777			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
778			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
779			    fle, NG_QUEUE);
780			atomic_add_32(&priv->info.nfinfo_act_exp, 1);
781		} else {
782			/*
783			 * It is the newest; move it to the tail
784			 * if it isn't there already, so the next search
785			 * will locate it more quickly.
786			 */
787			if (fle != TAILQ_LAST(&hsh->head, fhead)) {
788				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
789				TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
790			}
791		}
792	} else				/* A new flow entry. */
793		error = hash_insert(priv, hsh, &r, plen, flags, tcp_flags);
794
795	mtx_unlock(&hsh->mtx);
796
797	return (error);
798}
799
800#ifdef INET6
801/* Insert an IPv6 packet into the flow cache. */
802int
803ng_netflow_flow6_add(priv_p priv, fib_export_p fe, struct ip6_hdr *ip6,
804    caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
805    unsigned int src_if_index)
806{
807	struct flow_entry	*fle = NULL, *fle1;
808	struct flow6_entry	*fle6;
809	struct flow_hash_entry	*hsh;
810	struct flow6_rec	r;
811	int			plen;
812	int			error = 0;
813	uint8_t			tcp_flags = 0;
814
815	/* check version */
816	if ((ip6->ip6_vfc & IPV6_VERSION_MASK) != IPV6_VERSION)
817		return (EINVAL);
818
819	bzero(&r, sizeof(r));
820
821	r.src.r_src6 = ip6->ip6_src;
822	r.dst.r_dst6 = ip6->ip6_dst;
823	r.fib = fe->fib;
824
825	/* Assume L4 template by default */
826	r.flow_type = NETFLOW_V9_FLOW_V6_L4;
827
828	plen = ntohs(ip6->ip6_plen) + sizeof(struct ip6_hdr);
829
830#if 0
831	/* XXX: set DSCP/CoS value */
832	r.r_tos = ip->ip_tos;
833#endif
834	if ((flags & NG_NETFLOW_IS_FRAG) == 0) {
835		switch(upper_proto) {
836		case IPPROTO_TCP:
837		    {
838			struct tcphdr *tcp;
839
840			tcp = (struct tcphdr *)upper_ptr;
841			r.r_ports = *(uint32_t *)upper_ptr;
842			tcp_flags = tcp->th_flags;
843			break;
844		    }
845		case IPPROTO_UDP:
846		case IPPROTO_SCTP:
847			r.r_ports = *(uint32_t *)upper_ptr;
848			break;
849		}
850	}
851
852	r.r_ip_p = upper_proto;
853	r.r_i_ifx = src_if_index;
854
855	atomic_fetchadd_32(&priv->info.nfinfo_packets6, 1);
856	/* XXX: atomic */
857	priv->info.nfinfo_bytes6 += plen;
858
859	/* Find hash slot. */
860	hsh = &priv->hash6[ip6_hash(&r)];
861
862	mtx_lock(&hsh->mtx);
863
864	/*
865	 * Go through the hash and find our entry. If we encounter an
866	 * entry that should be expired, purge it. We do a reverse
867	 * search since the most active entries are first, and most
868	 * searches are done on the most active entries.
869	 */
870	TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
871		if (fle->f.version != IP6VERSION)
872			continue;
873		fle6 = (struct flow6_entry *)fle;
874		if (bcmp(&r, &fle6->f.r, sizeof(struct flow6_rec)) == 0)
875			break;
876		if ((INACTIVE(fle6) && SMALL(fle6)) || AGED(fle6)) {
877			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
878			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
879			    NG_QUEUE);
880			atomic_add_32(&priv->info.nfinfo_act_exp, 1);
881		}
882	}
883
884	if (fle != NULL) {			/* An existing entry. */
885		fle6 = (struct flow6_entry *)fle;
886
887		fle6->f.bytes += plen;
888		fle6->f.packets ++;
889		fle6->f.tcp_flags |= tcp_flags;
890		fle6->f.last = time_uptime;
891
892		/*
893		 * We have the following reasons to expire a flow actively:
894		 * - it hit the active timeout
895		 * - a TCP connection has closed
896		 * - it is about to overflow its counter
897		 */
898		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle6) ||
899		    (fle6->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) {
900			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
901			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
902			    NG_QUEUE);
903			atomic_add_32(&priv->info.nfinfo_act_exp, 1);
904		} else {
905			/*
906			 * It is the newest; move it to the tail
907			 * if it isn't there already, so the next search
908			 * will locate it more quickly.
909			 */
910			if (fle != TAILQ_LAST(&hsh->head, fhead)) {
911				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
912				TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
913			}
914		}
915	} else				/* A new flow entry. */
916		error = hash6_insert(priv, hsh, &r, plen, flags, tcp_flags);
917
918	mtx_unlock(&hsh->mtx);
919
920	return (error);
921}
922#endif
923
924/*
925 * Return records from the cache to userland.
926 *
927 * TODO: matching a particular IP should be done in the kernel, here.
928 */
929int
930ng_netflow_flow_show(priv_p priv, struct ngnf_show_header *req,
931struct ngnf_show_header *resp)
932{
933	struct flow_hash_entry	*hsh;
934	struct flow_entry	*fle;
935	struct flow_entry_data	*data = (struct flow_entry_data *)(resp + 1);
936#ifdef INET6
937	struct flow6_entry_data	*data6 = (struct flow6_entry_data *)(resp + 1);
938#endif
939	int	i, max;
940
941	i = req->hash_id;
942	if (i > NBUCKETS-1)
943		return (EINVAL);
944
945#ifdef INET6
946	if (req->version == 6) {
947		resp->version = 6;
948		hsh = priv->hash6 + i;
949		max = NREC6_AT_ONCE;
950	} else
951#endif
952	if (req->version == 4) {
953		resp->version = 4;
954		hsh = priv->hash + i;
955		max = NREC_AT_ONCE;
956	} else
957		return (EINVAL);
958
959	/*
960	 * We will transfer no more than NREC_AT_ONCE records. More data
961	 * will come in the next message.
962	 * We send the current hash index and the current record number in the
963	 * list to userland, and userland should return them back to us.
964	 * Then we will restart with the new entry.
965	 *
966	 * The resulting cache snapshot can be inaccurate if flow expiration
967	 * is taking place on a hash item between userland data requests for
968	 * this hash item id.
969	 */
970	resp->nentries = 0;
971	for (; i < NBUCKETS; hsh++, i++) {
972		int list_id;
973
974		if (mtx_trylock(&hsh->mtx) == 0) {
975			/*
976			 * The requested hash index is not available;
977			 * relay the decision to skip or to re-request
978			 * the data to userland.
979			 */
980			resp->hash_id = i;
981			resp->list_id = 0;
982			return (0);
983		}
984
985		list_id = 0;
986		TAILQ_FOREACH(fle, &hsh->head, fle_hash) {
987			if (hsh->mtx.mtx_lock & MTX_CONTESTED) {
988				resp->hash_id = i;
989				resp->list_id = list_id;
990				mtx_unlock(&hsh->mtx);
991				return (0);
992			}
993
994			list_id++;
995			/* Search for particular record in list. */
996			if (req->list_id > 0) {
997				if (list_id < req->list_id)
998					continue;
999
1000				/* Requested list position found. */
1001				req->list_id = 0;
1002			}
1003#ifdef INET6
1004			if (req->version == 6) {
1005				struct flow6_entry *fle6;
1006
1007				fle6 = (struct flow6_entry *)fle;
1008				bcopy(&fle6->f, data6 + resp->nentries,
1009				    sizeof(fle6->f));
1010			} else
1011#endif
1012				bcopy(&fle->f, data + resp->nentries,
1013				    sizeof(fle->f));
1014			resp->nentries++;
1015			if (resp->nentries == max) {
1016				resp->hash_id = i;
1017				/*
1018				 * If it was the last item in the list,
1019				 * we simply skip to the next hash_id.
1020				 */
1021				resp->list_id = list_id + 1;
1022				mtx_unlock(&hsh->mtx);
1023				return (0);
1024			}
1025		}
1026		mtx_unlock(&hsh->mtx);
1027	}
1028
1029	resp->hash_id = resp->list_id = 0;
1030
1031	return (0);
1032}
1033
1034/* We have a full datagram in privdata. Send it to the export hook. */
1035static int
1036export_send(priv_p priv, fib_export_p fe, item_p item, int flags)
1037{
1038	struct mbuf *m = NGI_M(item);
1039	struct netflow_v5_export_dgram *dgram = mtod(m,
1040					struct netflow_v5_export_dgram *);
1041	struct netflow_v5_header *header = &dgram->header;
1042	struct timespec ts;
1043	int error = 0;
1044
1045	/* Fill mbuf header. */
1046	m->m_len = m->m_pkthdr.len = sizeof(struct netflow_v5_record) *
1047	   header->count + sizeof(struct netflow_v5_header);
1048
1049	/* Fill export header. */
1050	header->sys_uptime = htonl(MILLIUPTIME(time_uptime));
1051	getnanotime(&ts);
1052	header->unix_secs  = htonl(ts.tv_sec);
1053	header->unix_nsecs = htonl(ts.tv_nsec);
1054	header->engine_type = 0;
1055	header->engine_id = fe->domain_id;
1056	header->pad = 0;
1057	header->flow_seq = htonl(atomic_fetchadd_32(&fe->flow_seq,
1058	    header->count));
1059	header->count = htons(header->count);
1060
1061	if (priv->export != NULL)
1062		NG_FWD_ITEM_HOOK_FLAGS(error, item, priv->export, flags);
1063	else
1064		NG_FREE_ITEM(item);
1065
1066	return (error);
1067}
1068
1069
1070/* Add export record to dgram. */
1071static int
1072export_add(item_p item, struct flow_entry *fle)
1073{
1074	struct netflow_v5_export_dgram *dgram = mtod(NGI_M(item),
1075					struct netflow_v5_export_dgram *);
1076	struct netflow_v5_header *header = &dgram->header;
1077	struct netflow_v5_record *rec;
1078
1079	rec = &dgram->r[header->count];
1080	header->count ++;
1081
1082	KASSERT(header->count <= NETFLOW_V5_MAX_RECORDS,
1083	    ("ng_netflow: export too big"));
1084
1085	/* Fill in export record. */
1086	rec->src_addr = fle->f.r.r_src.s_addr;
1087	rec->dst_addr = fle->f.r.r_dst.s_addr;
1088	rec->next_hop = fle->f.next_hop.s_addr;
1089	rec->i_ifx    = htons(fle->f.fle_i_ifx);
1090	rec->o_ifx    = htons(fle->f.fle_o_ifx);
1091	rec->packets  = htonl(fle->f.packets);
1092	rec->octets   = htonl(fle->f.bytes);
1093	rec->first    = htonl(MILLIUPTIME(fle->f.first));
1094	rec->last     = htonl(MILLIUPTIME(fle->f.last));
1095	rec->s_port   = fle->f.r.r_sport;
1096	rec->d_port   = fle->f.r.r_dport;
1097	rec->flags    = fle->f.tcp_flags;
1098	rec->prot     = fle->f.r.r_ip_p;
1099	rec->tos      = fle->f.r.r_tos;
1100	rec->dst_mask = fle->f.dst_mask;
1101	rec->src_mask = fle->f.src_mask;
1102	rec->pad1     = 0;
1103	rec->pad2     = 0;
1104
1105	/* Fields that are not supported. */
1106	rec->src_as = rec->dst_as = 0;
1107
1108	if (header->count == NETFLOW_V5_MAX_RECORDS)
1109		return (1); /* end of datagram */
1110	else
1111		return (0);
1112}
1113
1114/* Periodic flow expiry run. */
1115void
1116ng_netflow_expire(void *arg)
1117{
1118	struct flow_entry	*fle, *fle1;
1119	struct flow_hash_entry	*hsh;
1120	priv_p			priv = (priv_p )arg;
1121	uint32_t		used;
1122	int			i;
1123
1124	/*
1125	 * Go through the whole cache.
1126	 */
1127	for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++) {
1128		/*
1129		 * Skip entries that are already being worked on.
1130		 */
1131		if (mtx_trylock(&hsh->mtx) == 0)
1132			continue;
1133
1134		used = atomic_load_acq_32(&priv->info.nfinfo_used);
1135		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
1136			/*
1137			 * Interrupt thread wants this entry!
1138			 * Quick! Quick! Bail out!
1139			 */
1140			if (hsh->mtx.mtx_lock & MTX_CONTESTED)
1141				break;
1142
1143			/*
1144			 * Don't expire aggressively while the hash collision
1145			 * ratio is predicted to be small.
1146			 */
1147			if (used <= (NBUCKETS*2) && !INACTIVE(fle))
1148				break;
1149
1150			if ((INACTIVE(fle) && (SMALL(fle) ||
1151			    (used > (NBUCKETS*2)))) || AGED(fle)) {
1152				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
1153				expire_flow(priv, priv_to_fib(priv,
1154				    fle->f.r.fib), fle, NG_NOFLAGS);
1155				used--;
1156				atomic_add_32(&priv->info.nfinfo_inact_exp, 1);
1157			}
1158		}
1159		mtx_unlock(&hsh->mtx);
1160	}
1161
1162#ifdef INET6
1163	for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++) {
1164		struct flow6_entry	*fle6;
1165
1166		/*
1167		 * Skip entries that are already being worked on.
1168		 */
1169		if (mtx_trylock(&hsh->mtx) == 0)
1170			continue;
1171
1172		used = atomic_load_acq_32(&priv->info.nfinfo_used6);
1173		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
1174			fle6 = (struct flow6_entry *)fle;
1175			/*
1176			 * Interrupt thread wants this entry!
1177			 * Quick! Quick! Bail out!
1178			 */
1179			if (hsh->mtx.mtx_lock & MTX_CONTESTED)
1180				break;
1181
1182			/*
1183			 * Don't expire aggressively while the hash collision
1184			 * ratio is predicted to be small.
1185			 */
1186			if (used <= (NBUCKETS*2) && !INACTIVE(fle6))
1187				break;
1188
1189			if ((INACTIVE(fle6) && (SMALL(fle6) ||
1190			    (used > (NBUCKETS*2)))) || AGED(fle6)) {
1191				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
1192				expire_flow(priv, priv_to_fib(priv,
1193				    fle->f.r.fib), fle, NG_NOFLAGS);
1194				used--;
1195				atomic_add_32(&priv->info.nfinfo_inact_exp, 1);
1196			}
1197		}
1198		mtx_unlock(&hsh->mtx);
1199	}
1200#endif
1201
1202	/* Schedule next expire. */
1203	callout_reset(&priv->exp_callout, (1*hz), &ng_netflow_expire,
1204	    (void *)priv);
1205}
1206