/*-
 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
 *
 * Copyright (c) 2010-2011 Alexander V. Chernikov <melifaro@ipfw.ru>
 * Copyright (c) 2004-2005 Gleb Smirnoff <glebius@FreeBSD.org>
 * Copyright (c) 2001-2003 Roman V. Palagin <romanp@unshadow.net>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $SourceForge: netflow.c,v 1.41 2004/09/05 11:41:10 glebius Exp $
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include "opt_inet6.h"
#include "opt_route.h"
#include <sys/param.h>
#include <sys/bitstring.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/kernel.h>
#include <sys/ktr.h>
#include <sys/limits.h>
#include <sys/mbuf.h>
#include <sys/syslog.h>
#include <sys/socket.h>
#include <vm/uma.h>

#include <net/if.h>
#include <net/if_dl.h>
#include <net/if_var.h>
#include <net/route.h>
#include <net/ethernet.h>
#include <netinet/in.h>
#include <netinet/in_systm.h>
#include <netinet/ip.h>
#include <netinet/ip6.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>

#include <netgraph/ng_message.h>
#include <netgraph/netgraph.h>

#include <netgraph/netflow/netflow.h>
#include <netgraph/netflow/netflow_v9.h>
#include <netgraph/netflow/ng_netflow.h>

#define	NBUCKETS	(65536)		/* must be power of 2 */

/* This hash is for TCP or UDP packets. */
#define FULL_HASH(addr1, addr2, port1, port2)	\
	(((addr1 ^ (addr1 >> 16) ^		\
	htons(addr2 ^ (addr2 >> 16))) ^		\
	port1 ^ htons(port2)) &			\
	(NBUCKETS - 1))

/* This hash is for all other IP packets. */
#define ADDR_HASH(addr1, addr2)			\
	((addr1 ^ (addr1 >> 16) ^		\
	htons(addr2 ^ (addr2 >> 16))) &		\
	(NBUCKETS - 1))

/* Macros to shorten logical constructions */
/* XXX: priv must exist in namespace */
#define	INACTIVE(fle)	(time_uptime - fle->f.last > priv->nfinfo_inact_t)
#define	AGED(fle)	(time_uptime - fle->f.first > priv->nfinfo_act_t)
#define	ISFREE(fle)	(fle->f.packets == 0)
/*
 * 4 is a magical number: statistically, the number of 4-packet flows is
 * bigger than that of 5-, 6-, 7-...packet flows by an order of magnitude.
 * Most UDP/ICMP scans are 1 packet (~90% of the flow cache).  TCP scans are
 * 2 packets for a reachable host and 4 packets otherwise.
 */
#define	SMALL(fle)	(fle->f.packets <= 4)

MALLOC_DEFINE(M_NETFLOW_HASH, "netflow_hash", "NetFlow hash");

static int export_add(item_p, struct flow_entry *);
static int export_send(priv_p, fib_export_p, item_p, int);

static int hash_insert(priv_p, struct flow_hash_entry *, struct flow_rec *,
    int, uint8_t, uint8_t);
#ifdef INET6
static int hash6_insert(priv_p, struct flow_hash_entry *, struct flow6_rec *,
    int, uint8_t, uint8_t);
#endif

static void expire_flow(priv_p, fib_export_p, struct flow_entry *, int);

/*
 * Generate hash for a given flow record.
 *
 * The FIB is not used here because most VRFs carry public IPv4 addresses,
 * which are unique even without a FIB.  Private addresses can overlap, but
 * that is handled by the flow_rec bcmp(), which includes the fib id.  In the
 * IPv6 world addresses are all globally unique (not strictly true, there is
 * FC00::/7 for example, but the chances of address overlap are MUCH smaller).
 */
static inline uint32_t
ip_hash(struct flow_rec *r)
{

	switch (r->r_ip_p) {
	case IPPROTO_TCP:
	case IPPROTO_UDP:
		return FULL_HASH(r->r_src.s_addr, r->r_dst.s_addr,
		    r->r_sport, r->r_dport);
	default:
		return ADDR_HASH(r->r_src.s_addr, r->r_dst.s_addr);
	}
}

#ifdef INET6
/* Generate hash for a given flow6 record. Use lower 4 octets from v6 addresses */
static inline uint32_t
ip6_hash(struct flow6_rec *r)
{

	switch (r->r_ip_p) {
	case IPPROTO_TCP:
	case IPPROTO_UDP:
		return FULL_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
		    r->dst.r_dst6.__u6_addr.__u6_addr32[3], r->r_sport,
		    r->r_dport);
	default:
		return ADDR_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
		    r->dst.r_dst6.__u6_addr.__u6_addr32[3]);
	}
}

static inline int
ip6_masklen(struct in6_addr *saddr, struct rt_addrinfo *info)
{
	const int nbits = sizeof(*saddr) * NBBY;
	int mlen;

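	/*
	 * If the lookup returned a netmask, count its set bits to get the
	 * prefix length; otherwise assume a host route (/128).
	 */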
	if (info->rti_addrs & RTA_NETMASK)
		bit_count((bitstr_t *)saddr, 0, nbits, &mlen);
	else
		mlen = nbits;
	return (mlen);
}
#endif

/*
 * Detach the export datagram from priv, if there is one.
 * If there is none, allocate a new one.
 */
static item_p
get_export_dgram(priv_p priv, fib_export_p fe)
{
	item_p	item = NULL;

	mtx_lock(&fe->export_mtx);
	if (fe->exp.item != NULL) {
		item = fe->exp.item;
		fe->exp.item = NULL;
	}
	mtx_unlock(&fe->export_mtx);

	if (item == NULL) {
		struct netflow_v5_export_dgram *dgram;
		struct mbuf *m;

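		/* Allocate an mbuf cluster for a fresh export datagram. */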
		m = m_getcl(M_NOWAIT, MT_DATA, M_PKTHDR);
		if (m == NULL)
			return (NULL);
		item = ng_package_data(m, NG_NOFLAGS);
		if (item == NULL)
			return (NULL);
		dgram = mtod(m, struct netflow_v5_export_dgram *);
		dgram->header.count = 0;
		dgram->header.version = htons(NETFLOW_V5);
		dgram->header.pad = 0;
	}

	return (item);
}

/*
 * Re-attach an incomplete datagram back to priv.
 * If there is already another one, send the incomplete one now. */
static void
return_export_dgram(priv_p priv, fib_export_p fe, item_p item, int flags)
{

	/*
	 * It may happen on SMP that some other thread has already
	 * put its item there; in this case we bail out and
	 * send what we have to the collector.
	 */
	mtx_lock(&fe->export_mtx);
	if (fe->exp.item == NULL) {
		fe->exp.item = item;
		mtx_unlock(&fe->export_mtx);
	} else {
		mtx_unlock(&fe->export_mtx);
		export_send(priv, fe, item, flags);
	}
}

/*
 * The flow is over. Call export_add() and free it. If the datagram is
 * full, then call export_send().
 */
static void
expire_flow(priv_p priv, fib_export_p fe, struct flow_entry *fle, int flags)
{
	struct netflow_export_item exp;
	uint16_t version = fle->f.version;

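	/* NetFlow v5 can carry IPv4 flows only. */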
	if ((priv->export != NULL) && (version == IPVERSION)) {
		exp.item = get_export_dgram(priv, fe);
		if (exp.item == NULL) {
			priv->nfinfo_export_failed++;
			if (priv->export9 != NULL)
				priv->nfinfo_export9_failed++;
			/* fle definitely contains IPv4 flow. */
			uma_zfree_arg(priv->zone, fle, priv);
			return;
		}

		if (export_add(exp.item, fle) > 0)
			export_send(priv, fe, exp.item, flags);
		else
			return_export_dgram(priv, fe, exp.item, NG_QUEUE);
	}

	if (priv->export9 != NULL) {
		exp.item9 = get_export9_dgram(priv, fe, &exp.item9_opt);
		if (exp.item9 == NULL) {
			priv->nfinfo_export9_failed++;
			if (version == IPVERSION)
				uma_zfree_arg(priv->zone, fle, priv);
#ifdef INET6
			else if (version == IP6VERSION)
				uma_zfree_arg(priv->zone6, fle, priv);
#endif
			else
				panic("ng_netflow: Unknown IP proto: %d",
				    version);
			return;
		}

		if (export9_add(exp.item9, exp.item9_opt, fle) > 0)
			export9_send(priv, fe, exp.item9, exp.item9_opt, flags);
		else
			return_export9_dgram(priv, fe, exp.item9,
			    exp.item9_opt, NG_QUEUE);
	}

	if (version == IPVERSION)
		uma_zfree_arg(priv->zone, fle, priv);
#ifdef INET6
	else if (version == IP6VERSION)
		uma_zfree_arg(priv->zone6, fle, priv);
#endif
}

/* Get a snapshot of node statistics */
void
ng_netflow_copyinfo(priv_p priv, struct ng_netflow_info *i)
{

	i->nfinfo_bytes = counter_u64_fetch(priv->nfinfo_bytes);
	i->nfinfo_packets = counter_u64_fetch(priv->nfinfo_packets);
	i->nfinfo_bytes6 = counter_u64_fetch(priv->nfinfo_bytes6);
	i->nfinfo_packets6 = counter_u64_fetch(priv->nfinfo_packets6);
	i->nfinfo_sbytes = counter_u64_fetch(priv->nfinfo_sbytes);
	i->nfinfo_spackets = counter_u64_fetch(priv->nfinfo_spackets);
	i->nfinfo_sbytes6 = counter_u64_fetch(priv->nfinfo_sbytes6);
	i->nfinfo_spackets6 = counter_u64_fetch(priv->nfinfo_spackets6);
	i->nfinfo_act_exp = counter_u64_fetch(priv->nfinfo_act_exp);
	i->nfinfo_inact_exp = counter_u64_fetch(priv->nfinfo_inact_exp);

	i->nfinfo_used = uma_zone_get_cur(priv->zone);
#ifdef INET6
	i->nfinfo_used6 = uma_zone_get_cur(priv->zone6);
#endif

	i->nfinfo_alloc_failed = priv->nfinfo_alloc_failed;
	i->nfinfo_export_failed = priv->nfinfo_export_failed;
	i->nfinfo_export9_failed = priv->nfinfo_export9_failed;
	i->nfinfo_realloc_mbuf = priv->nfinfo_realloc_mbuf;
	i->nfinfo_alloc_fibs = priv->nfinfo_alloc_fibs;
	i->nfinfo_inact_t = priv->nfinfo_inact_t;
	i->nfinfo_act_t = priv->nfinfo_act_t;
}

/*
 * Insert a record into the defined slot.
 *
 * First we get a free flow entry for ourselves, then fill in all
 * possible fields in it.
 *
 * TODO: consider dropping the hash mutex while filling in the datagram,
 * as was done in a previous version. Need to test & profile
 * to be sure.
 */
static int
hash_insert(priv_p priv, struct flow_hash_entry *hsh, struct flow_rec *r,
	int plen, uint8_t flags, uint8_t tcp_flags)
{
	struct flow_entry *fle;
	struct sockaddr_in sin, sin_mask;
	struct sockaddr_dl rt_gateway;
	struct rt_addrinfo info;

	mtx_assert(&hsh->mtx, MA_OWNED);

	fle = uma_zalloc_arg(priv->zone, priv, M_NOWAIT);
	if (fle == NULL) {
		priv->nfinfo_alloc_failed++;
		return (ENOMEM);
	}

	/*
	 * Now fle is totally ours. It is detached from all lists,
	 * so we can safely edit it.
	 */
	fle->f.version = IPVERSION;
	bcopy(r, &fle->f.r, sizeof(struct flow_rec));
	fle->f.bytes = plen;
	fle->f.packets = 1;
	fle->f.tcp_flags = tcp_flags;

	fle->f.first = fle->f.last = time_uptime;

	/*
	 * First we do a route table lookup on the destination address, so
	 * that we can fill in out_ifx, dst_mask, nexthop, and (in future
	 * releases) dst_as.
	 */
	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
		bzero(&sin, sizeof(sin));
		sin.sin_len = sizeof(struct sockaddr_in);
		sin.sin_family = AF_INET;
		sin.sin_addr = fle->f.r.r_dst;

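		/* Storage for the gateway and netmask returned by the lookup. */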
		rt_gateway.sdl_len = sizeof(rt_gateway);
		sin_mask.sin_len = sizeof(struct sockaddr_in);
		bzero(&info, sizeof(info));

		info.rti_info[RTAX_GATEWAY] = (struct sockaddr *)&rt_gateway;
		info.rti_info[RTAX_NETMASK] = (struct sockaddr *)&sin_mask;

		if (rib_lookup_info(r->fib, (struct sockaddr *)&sin, NHR_REF, 0,
		    &info) == 0) {
			fle->f.fle_o_ifx = info.rti_ifp->if_index;

			if (info.rti_flags & RTF_GATEWAY &&
			    rt_gateway.sdl_family == AF_INET)
				fle->f.next_hop =
				    ((struct sockaddr_in *)&rt_gateway)->sin_addr;

			if (info.rti_addrs & RTA_NETMASK)
				fle->f.dst_mask = bitcount32(sin_mask.sin_addr.s_addr);
			else if (info.rti_flags & RTF_HOST)
				/* Give up. We can't determine mask :( */
				fle->f.dst_mask = 32;

			rib_free_info(&info);
		}
	}

	/* Do route lookup on source address, to fill in src_mask. */
	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
		bzero(&sin, sizeof(sin));
		sin.sin_len = sizeof(struct sockaddr_in);
		sin.sin_family = AF_INET;
		sin.sin_addr = fle->f.r.r_src;

		sin_mask.sin_len = sizeof(struct sockaddr_in);
		bzero(&info, sizeof(info));

		info.rti_info[RTAX_NETMASK] = (struct sockaddr *)&sin_mask;

		if (rib_lookup_info(r->fib, (struct sockaddr *)&sin, 0, 0,
		    &info) == 0) {
			if (info.rti_addrs & RTA_NETMASK)
				fle->f.src_mask =
				    bitcount32(sin_mask.sin_addr.s_addr);
			else if (info.rti_flags & RTF_HOST)
				/* Give up. We can't determine mask :( */
				fle->f.src_mask = 32;
		}
	}

	/* Push the new flow onto the end of the hash chain. */
	TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);

	return (0);
}
#ifdef INET6
static int
hash6_insert(priv_p priv, struct flow_hash_entry *hsh6, struct flow6_rec *r,
	int plen, uint8_t flags, uint8_t tcp_flags)
{
	struct flow6_entry *fle6;
	struct sockaddr_in6 sin6, sin6_mask;
	struct sockaddr_dl rt_gateway;
	struct rt_addrinfo info;

	mtx_assert(&hsh6->mtx, MA_OWNED);

	fle6 = uma_zalloc_arg(priv->zone6, priv, M_NOWAIT);
	if (fle6 == NULL) {
		priv->nfinfo_alloc_failed++;
		return (ENOMEM);
	}

	/*
	 * Now fle6 is totally ours. It is detached from all lists,
	 * so we can safely edit it.
	 */

	fle6->f.version = IP6VERSION;
	bcopy(r, &fle6->f.r, sizeof(struct flow6_rec));
	fle6->f.bytes = plen;
	fle6->f.packets = 1;
	fle6->f.tcp_flags = tcp_flags;

	fle6->f.first = fle6->f.last = time_uptime;

	/*
	 * First we do a route table lookup on the destination address, so
	 * that we can fill in out_ifx, dst_mask, nexthop, and (in future
	 * releases) dst_as.
	 */
	if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
		bzero(&sin6, sizeof(struct sockaddr_in6));
		sin6.sin6_len = sizeof(struct sockaddr_in6);
		sin6.sin6_family = AF_INET6;
		sin6.sin6_addr = r->dst.r_dst6;

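		/* Storage for the gateway and netmask returned by the lookup. */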
		rt_gateway.sdl_len = sizeof(rt_gateway);
		sin6_mask.sin6_len = sizeof(struct sockaddr_in6);
		bzero(&info, sizeof(info));

		info.rti_info[RTAX_GATEWAY] = (struct sockaddr *)&rt_gateway;
		info.rti_info[RTAX_NETMASK] = (struct sockaddr *)&sin6_mask;

		if (rib_lookup_info(r->fib, (struct sockaddr *)&sin6, NHR_REF,
		    0, &info) == 0) {
			fle6->f.fle_o_ifx = info.rti_ifp->if_index;

			if (info.rti_flags & RTF_GATEWAY &&
			    rt_gateway.sdl_family == AF_INET6)
				fle6->f.n.next_hop6 =
				    ((struct sockaddr_in6 *)&rt_gateway)->sin6_addr;

			fle6->f.dst_mask =
			    ip6_masklen(&sin6_mask.sin6_addr, &info);

			rib_free_info(&info);
		}
	}

	if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
		/* Do route lookup on source address, to fill in src_mask. */
		bzero(&sin6, sizeof(struct sockaddr_in6));
		sin6.sin6_len = sizeof(struct sockaddr_in6);
		sin6.sin6_family = AF_INET6;
		sin6.sin6_addr = r->src.r_src6;

		sin6_mask.sin6_len = sizeof(struct sockaddr_in6);
		bzero(&info, sizeof(info));

		info.rti_info[RTAX_NETMASK] = (struct sockaddr *)&sin6_mask;

		if (rib_lookup_info(r->fib, (struct sockaddr *)&sin6, 0, 0,
		    &info) == 0)
			fle6->f.src_mask =
			    ip6_masklen(&sin6_mask.sin6_addr, &info);
	}

	/* Push the new flow onto the end of the hash chain. */
	TAILQ_INSERT_TAIL(&hsh6->head, (struct flow_entry *)fle6, fle_hash);

	return (0);
}
#endif

/*
 * Non-static functions called from ng_netflow.c
 */

/* Allocate memory and set up flow cache */
void
ng_netflow_cache_init(priv_p priv)
{
	struct flow_hash_entry *hsh;
	int i;

	/* Initialize cache UMA zone. */
	priv->zone = uma_zcreate("NetFlow IPv4 cache",
	    sizeof(struct flow_entry), NULL, NULL, NULL, NULL,
	    UMA_ALIGN_CACHE, 0);
	uma_zone_set_max(priv->zone, CACHESIZE);
#ifdef INET6
	priv->zone6 = uma_zcreate("NetFlow IPv6 cache",
	    sizeof(struct flow6_entry), NULL, NULL, NULL, NULL,
	    UMA_ALIGN_CACHE, 0);
	uma_zone_set_max(priv->zone6, CACHESIZE);
#endif

	/* Allocate hash. */
	priv->hash = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);

	/* Initialize hash. */
	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) {
		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
		TAILQ_INIT(&hsh->head);
	}

#ifdef INET6
	/* Allocate hash. */
	priv->hash6 = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
	    M_NETFLOW_HASH, M_WAITOK | M_ZERO);

	/* Initialize hash. */
	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++) {
		mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
		TAILQ_INIT(&hsh->head);
	}
#endif

	priv->nfinfo_bytes = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_packets = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_bytes6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_packets6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_sbytes = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_spackets = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_sbytes6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_spackets6 = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_act_exp = counter_u64_alloc(M_WAITOK);
	priv->nfinfo_inact_exp = counter_u64_alloc(M_WAITOK);

	ng_netflow_v9_cache_init(priv);
	CTR0(KTR_NET, "ng_netflow startup()");
}

/* Initialize new FIB table for v5 and v9 */
int
ng_netflow_fib_init(priv_p priv, int fib)
{
	fib_export_p	fe = priv_to_fib(priv, fib);

	CTR1(KTR_NET, "ng_netflow(): fib init: %d", fib);

	if (fe != NULL)
		return (0);

	if ((fe = malloc(sizeof(struct fib_export), M_NETGRAPH,
	    M_NOWAIT | M_ZERO)) == NULL)
		return (ENOMEM);

	mtx_init(&fe->export_mtx, "export dgram lock", NULL, MTX_DEF);
	mtx_init(&fe->export9_mtx, "export9 dgram lock", NULL, MTX_DEF);
	fe->fib = fib;
	fe->domain_id = fib;

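	/* Publish the new fib_export atomically; another thread may race us. */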
	if (atomic_cmpset_ptr((volatile uintptr_t *)&priv->fib_data[fib],
	    (uintptr_t)NULL, (uintptr_t)fe) == 0) {
		/* FIB already set up by other ISR */
		CTR3(KTR_NET, "ng_netflow(): fib init: %d setup %p but got %p",
		    fib, fe, priv_to_fib(priv, fib));
		mtx_destroy(&fe->export_mtx);
		mtx_destroy(&fe->export9_mtx);
		free(fe, M_NETGRAPH);
	} else {
		/* Increase counter for statistics */
		CTR3(KTR_NET, "ng_netflow(): fib %d setup to %p (%p)",
		    fib, fe, priv_to_fib(priv, fib));
		priv->nfinfo_alloc_fibs++;
	}

	return (0);
}

/* Free all flow cache memory. Called from the node close method. */
void
ng_netflow_cache_flush(priv_p priv)
{
	struct flow_entry	*fle, *fle1;
	struct flow_hash_entry	*hsh;
	struct netflow_export_item exp;
	fib_export_p fe;
	int i;

	bzero(&exp, sizeof(exp));

	/*
	 * We are going to free probably billable data.
	 * Expire everything before freeing it.
	 * No locking is required since the callout is already drained.
	 */
	for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++)
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			fe = priv_to_fib(priv, fle->f.r.fib);
			expire_flow(priv, fe, fle, NG_QUEUE);
		}
#ifdef INET6
	for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++)
		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			fe = priv_to_fib(priv, fle->f.r.fib);
			expire_flow(priv, fe, fle, NG_QUEUE);
		}
#endif

	uma_zdestroy(priv->zone);
	/* Destroy hash mutexes. */
	for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++)
		mtx_destroy(&hsh->mtx);

	/* Free hash memory. */
	if (priv->hash != NULL)
		free(priv->hash, M_NETFLOW_HASH);
#ifdef INET6
	uma_zdestroy(priv->zone6);
	/* Destroy hash mutexes. */
	for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++)
		mtx_destroy(&hsh->mtx);

	/* Free hash memory. */
	if (priv->hash6 != NULL)
		free(priv->hash6, M_NETFLOW_HASH);
#endif

	for (i = 0; i < priv->maxfibs; i++) {
		if ((fe = priv_to_fib(priv, i)) == NULL)
			continue;

		if (fe->exp.item != NULL)
			export_send(priv, fe, fe->exp.item, NG_QUEUE);

		if (fe->exp.item9 != NULL)
			export9_send(priv, fe, fe->exp.item9,
			    fe->exp.item9_opt, NG_QUEUE);

		mtx_destroy(&fe->export_mtx);
		mtx_destroy(&fe->export9_mtx);
		free(fe, M_NETGRAPH);
	}

	counter_u64_free(priv->nfinfo_bytes);
	counter_u64_free(priv->nfinfo_packets);
	counter_u64_free(priv->nfinfo_bytes6);
	counter_u64_free(priv->nfinfo_packets6);
	counter_u64_free(priv->nfinfo_sbytes);
	counter_u64_free(priv->nfinfo_spackets);
	counter_u64_free(priv->nfinfo_sbytes6);
	counter_u64_free(priv->nfinfo_spackets6);
	counter_u64_free(priv->nfinfo_act_exp);
	counter_u64_free(priv->nfinfo_inact_exp);

	ng_netflow_v9_cache_flush(priv);
}

/* Insert an IPv4 packet into the flow cache. */
int
ng_netflow_flow_add(priv_p priv, fib_export_p fe, struct ip *ip,
    caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
    unsigned int src_if_index)
{
	struct flow_entry	*fle, *fle1;
	struct flow_hash_entry	*hsh;
	struct flow_rec		r;
	int			hlen, plen;
	int			error = 0;
	uint16_t		eproto;
	uint8_t			tcp_flags = 0;

	bzero(&r, sizeof(r));

	if (ip->ip_v != IPVERSION)
		return (EINVAL);

	hlen = ip->ip_hl << 2;
	if (hlen < sizeof(struct ip))
		return (EINVAL);

	eproto = ETHERTYPE_IP;
	/* Assume L4 template by default */
	r.flow_type = NETFLOW_V9_FLOW_V4_L4;

	r.r_src = ip->ip_src;
	r.r_dst = ip->ip_dst;
	r.fib = fe->fib;

	plen = ntohs(ip->ip_len);

	r.r_ip_p = ip->ip_p;
	r.r_tos = ip->ip_tos;

	r.r_i_ifx = src_if_index;

	/*
	 * XXX NOTE: only the first fragment of a fragmented TCP, UDP or
	 * ICMP packet will be recorded with proper s_port and d_port.
	 * The following fragments will be recorded simply as an IP packet
	 * with ip_proto = ip->ip_p and s_port, d_port set to zero.
	 * I know it looks like a bug, but I don't want to re-implement
	 * IP packet reassembly here. Anyway, the (in)famous trafd works
	 * this way - and nobody has complained yet :)
	 */
	if ((ip->ip_off & htons(IP_OFFMASK)) == 0)
		switch(r.r_ip_p) {
		case IPPROTO_TCP:
		    {
			struct tcphdr *tcp;

			tcp = (struct tcphdr *)((caddr_t )ip + hlen);
			r.r_sport = tcp->th_sport;
			r.r_dport = tcp->th_dport;
			tcp_flags = tcp->th_flags;
			break;
		    }
		case IPPROTO_UDP:
			r.r_ports = *(uint32_t *)((caddr_t )ip + hlen);
			break;
		}

	counter_u64_add(priv->nfinfo_packets, 1);
	counter_u64_add(priv->nfinfo_bytes, plen);

	/* Find hash slot. */
	hsh = &priv->hash[ip_hash(&r)];

	mtx_lock(&hsh->mtx);

	/*
	 * Go through the hash and find our entry. If we encounter an
	 * entry that should be expired, purge it. We do a reverse
	 * search since most active entries are first, and most
	 * searches are done on most active entries.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
		if (bcmp(&r, &fle->f.r, sizeof(struct flow_rec)) == 0)
			break;
		if ((INACTIVE(fle) && SMALL(fle)) || AGED(fle)) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
			    fle, NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		}
	}

	if (fle) {			/* An existing entry. */

		fle->f.bytes += plen;
		fle->f.packets ++;
		fle->f.tcp_flags |= tcp_flags;
		fle->f.last = time_uptime;

		/*
		 * We have the following reasons to expire a flow actively:
		 * - it hit the active timeout
		 * - a TCP connection closed
		 * - its byte counter is about to overflow
		 */
		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle) ||
		    (fle->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
			    fle, NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		} else {
			/*
			 * It is the newest, so move it to the tail
			 * if it isn't there already. The next search will
			 * locate it more quickly.
			 */
			if (fle != TAILQ_LAST(&hsh->head, fhead)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
			}
		}
	} else				/* A new flow entry. */
		error = hash_insert(priv, hsh, &r, plen, flags, tcp_flags);

	mtx_unlock(&hsh->mtx);

	return (error);
}

#ifdef INET6
/* Insert an IPv6 packet into the flow cache. */
int
ng_netflow_flow6_add(priv_p priv, fib_export_p fe, struct ip6_hdr *ip6,
    caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
    unsigned int src_if_index)
{
	struct flow_entry	*fle = NULL, *fle1;
	struct flow6_entry	*fle6;
	struct flow_hash_entry	*hsh;
	struct flow6_rec	r;
	int			plen;
	int			error = 0;
	uint8_t			tcp_flags = 0;

	/* check version */
	if ((ip6->ip6_vfc & IPV6_VERSION_MASK) != IPV6_VERSION)
		return (EINVAL);

	bzero(&r, sizeof(r));

	r.src.r_src6 = ip6->ip6_src;
	r.dst.r_dst6 = ip6->ip6_dst;
	r.fib = fe->fib;

	/* Assume L4 template by default */
	r.flow_type = NETFLOW_V9_FLOW_V6_L4;

	plen = ntohs(ip6->ip6_plen) + sizeof(struct ip6_hdr);

#if 0
	/* XXX: set DSCP/CoS value */
	r.r_tos = ip->ip_tos;
#endif
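	/*
	 * Ports are taken from the first fragment only; see the IPv4
	 * fragment note in ng_netflow_flow_add() above.
	 */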
	if ((flags & NG_NETFLOW_IS_FRAG) == 0) {
		switch(upper_proto) {
		case IPPROTO_TCP:
		    {
			struct tcphdr *tcp;

			tcp = (struct tcphdr *)upper_ptr;
			r.r_ports = *(uint32_t *)upper_ptr;
			tcp_flags = tcp->th_flags;
			break;
		    }
		case IPPROTO_UDP:
		case IPPROTO_SCTP:
			r.r_ports = *(uint32_t *)upper_ptr;
			break;
		}
	}

	r.r_ip_p = upper_proto;
	r.r_i_ifx = src_if_index;

	counter_u64_add(priv->nfinfo_packets6, 1);
	counter_u64_add(priv->nfinfo_bytes6, plen);

	/* Find hash slot. */
	hsh = &priv->hash6[ip6_hash(&r)];

	mtx_lock(&hsh->mtx);

	/*
	 * Go through the hash and find our entry. If we encounter an
	 * entry that should be expired, purge it. We do a reverse
	 * search since most active entries are first, and most
	 * searches are done on most active entries.
	 */
	TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
		if (fle->f.version != IP6VERSION)
			continue;
		fle6 = (struct flow6_entry *)fle;
		if (bcmp(&r, &fle6->f.r, sizeof(struct flow6_rec)) == 0)
			break;
		if ((INACTIVE(fle6) && SMALL(fle6)) || AGED(fle6)) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
			    NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		}
	}

	if (fle != NULL) {			/* An existing entry. */
		fle6 = (struct flow6_entry *)fle;

		fle6->f.bytes += plen;
		fle6->f.packets ++;
		fle6->f.tcp_flags |= tcp_flags;
		fle6->f.last = time_uptime;

		/*
		 * We have the following reasons to expire a flow actively:
		 * - it hit the active timeout
		 * - a TCP connection closed
		 * - its byte counter is about to overflow
		 */
		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle6) ||
		    (fle6->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) {
			TAILQ_REMOVE(&hsh->head, fle, fle_hash);
			expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
			    NG_QUEUE);
			counter_u64_add(priv->nfinfo_act_exp, 1);
		} else {
			/*
			 * It is the newest, so move it to the tail
			 * if it isn't there already. The next search will
			 * locate it more quickly.
			 */
			if (fle != TAILQ_LAST(&hsh->head, fhead)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
			}
		}
	} else				/* A new flow entry. */
		error = hash6_insert(priv, hsh, &r, plen, flags, tcp_flags);

	mtx_unlock(&hsh->mtx);

	return (error);
}
#endif

/*
 * Return records from the cache to userland.
 *
 * TODO: matching a particular IP should be done here, in the kernel.
 */
int
ng_netflow_flow_show(priv_p priv, struct ngnf_show_header *req,
    struct ngnf_show_header *resp)
{
	struct flow_hash_entry	*hsh;
	struct flow_entry	*fle;
	struct flow_entry_data	*data = (struct flow_entry_data *)(resp + 1);
#ifdef INET6
	struct flow6_entry_data	*data6 = (struct flow6_entry_data *)(resp + 1);
#endif
	int	i, max;

	i = req->hash_id;
	if (i > NBUCKETS-1)
		return (EINVAL);

#ifdef INET6
	if (req->version == 6) {
		resp->version = 6;
		hsh = priv->hash6 + i;
		max = NREC6_AT_ONCE;
	} else
#endif
	if (req->version == 4) {
		resp->version = 4;
		hsh = priv->hash + i;
		max = NREC_AT_ONCE;
	} else
		return (EINVAL);

	/*
	 * We will transfer no more than NREC_AT_ONCE records; more data
	 * will come in the next message.
	 * We send the current hash index and the current record number in
	 * the list to userland, and userland should return them back to us.
	 * Then, we will restart with the new entry.
	 *
	 * The resulting cache snapshot can be inaccurate if flow expiration
	 * is taking place on the hash item between userland data requests for
	 * this hash item id.
	 */
	resp->nentries = 0;
	for (; i < NBUCKETS; hsh++, i++) {
		int list_id;

		if (mtx_trylock(&hsh->mtx) == 0) {
			/*
			 * The requested hash index is not available;
			 * relay the decision to skip or re-request the data
			 * to userland.
			 */
			resp->hash_id = i;
			resp->list_id = 0;
			return (0);
		}

		list_id = 0;
		TAILQ_FOREACH(fle, &hsh->head, fle_hash) {
			if (hsh->mtx.mtx_lock & MTX_CONTESTED) {
				resp->hash_id = i;
				resp->list_id = list_id;
				mtx_unlock(&hsh->mtx);
				return (0);
			}

			list_id++;
			/* Search for a particular record in the list. */
			if (req->list_id > 0) {
				if (list_id < req->list_id)
					continue;

				/* Requested list position found. */
				req->list_id = 0;
			}
#ifdef INET6
			if (req->version == 6) {
				struct flow6_entry *fle6;

				fle6 = (struct flow6_entry *)fle;
				bcopy(&fle6->f, data6 + resp->nentries,
				    sizeof(fle6->f));
			} else
#endif
				bcopy(&fle->f, data + resp->nentries,
				    sizeof(fle->f));
			resp->nentries++;
			if (resp->nentries == max) {
				resp->hash_id = i;
				/*
				 * If it was the last item in the list
				 * we simply skip to the next hash_id.
				 */
				resp->list_id = list_id + 1;
				mtx_unlock(&hsh->mtx);
				return (0);
			}
		}
		mtx_unlock(&hsh->mtx);
	}

	resp->hash_id = resp->list_id = 0;

	return (0);
}

/* We have a full datagram in privdata. Send it to the export hook. */
static int
export_send(priv_p priv, fib_export_p fe, item_p item, int flags)
{
	struct mbuf *m = NGI_M(item);
	struct netflow_v5_export_dgram *dgram = mtod(m,
					struct netflow_v5_export_dgram *);
	struct netflow_v5_header *header = &dgram->header;
	struct timespec ts;
	int error = 0;

	/* Fill mbuf header. */
	m->m_len = m->m_pkthdr.len = sizeof(struct netflow_v5_record) *
	   header->count + sizeof(struct netflow_v5_header);

	/* Fill export header. */
	header->sys_uptime = htonl(MILLIUPTIME(time_uptime));
	getnanotime(&ts);
	header->unix_secs  = htonl(ts.tv_sec);
	header->unix_nsecs = htonl(ts.tv_nsec);
	header->engine_type = 0;
	header->engine_id = fe->domain_id;
	header->pad = 0;
	header->flow_seq = htonl(atomic_fetchadd_32(&fe->flow_seq,
	    header->count));
	header->count = htons(header->count);

	if (priv->export != NULL)
		NG_FWD_ITEM_HOOK_FLAGS(error, item, priv->export, flags);
	else
		NG_FREE_ITEM(item);

	return (error);
}

/* Add an export record to the dgram. */
static int
export_add(item_p item, struct flow_entry *fle)
{
	struct netflow_v5_export_dgram *dgram = mtod(NGI_M(item),
					struct netflow_v5_export_dgram *);
	struct netflow_v5_header *header = &dgram->header;
	struct netflow_v5_record *rec;

	rec = &dgram->r[header->count];
	header->count ++;

	KASSERT(header->count <= NETFLOW_V5_MAX_RECORDS,
	    ("ng_netflow: export too big"));

	/* Fill in export record. */
	rec->src_addr = fle->f.r.r_src.s_addr;
	rec->dst_addr = fle->f.r.r_dst.s_addr;
	rec->next_hop = fle->f.next_hop.s_addr;
	rec->i_ifx    = htons(fle->f.fle_i_ifx);
	rec->o_ifx    = htons(fle->f.fle_o_ifx);
	rec->packets  = htonl(fle->f.packets);
	rec->octets   = htonl(fle->f.bytes);
	rec->first    = htonl(MILLIUPTIME(fle->f.first));
	rec->last     = htonl(MILLIUPTIME(fle->f.last));
	rec->s_port   = fle->f.r.r_sport;
	rec->d_port   = fle->f.r.r_dport;
	rec->flags    = fle->f.tcp_flags;
	rec->prot     = fle->f.r.r_ip_p;
	rec->tos      = fle->f.r.r_tos;
	rec->dst_mask = fle->f.dst_mask;
	rec->src_mask = fle->f.src_mask;
	rec->pad1     = 0;
	rec->pad2     = 0;

	/* Unsupported fields. */
	rec->src_as = rec->dst_as = 0;

	if (header->count == NETFLOW_V5_MAX_RECORDS)
		return (1); /* end of datagram */
	else
		return (0);
}

/* Periodic flow expiry run. */
void
ng_netflow_expire(void *arg)
{
	struct flow_entry	*fle, *fle1;
	struct flow_hash_entry	*hsh;
	priv_p			priv = (priv_p )arg;
	int			used, i;

	/*
	 * Go through the whole cache.
	 */
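	/* The number of cached flows decides how aggressively we expire below. */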
	used = uma_zone_get_cur(priv->zone);
	for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++) {
		/*
		 * Skip entries that are already being worked on.
		 */
		if (mtx_trylock(&hsh->mtx) == 0)
			continue;

		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			/*
			 * Interrupt thread wants this entry!
			 * Quick! Quick! Bail out!
			 */
			if (hsh->mtx.mtx_lock & MTX_CONTESTED)
				break;

			/*
			 * Don't expire aggressively while the hash collision
			 * ratio is predicted to be small.
			 */
			if (used <= (NBUCKETS*2) && !INACTIVE(fle))
				break;

			if ((INACTIVE(fle) && (SMALL(fle) ||
			    (used > (NBUCKETS*2)))) || AGED(fle)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				expire_flow(priv, priv_to_fib(priv,
				    fle->f.r.fib), fle, NG_NOFLAGS);
				used--;
				counter_u64_add(priv->nfinfo_inact_exp, 1);
			}
		}
		mtx_unlock(&hsh->mtx);
	}

#ifdef INET6
	used = uma_zone_get_cur(priv->zone6);
	for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++) {
		struct flow6_entry	*fle6;

		/*
		 * Skip entries that are already being worked on.
		 */
		if (mtx_trylock(&hsh->mtx) == 0)
			continue;

		TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
			fle6 = (struct flow6_entry *)fle;
			/*
			 * Interrupt thread wants this entry!
			 * Quick! Quick! Bail out!
			 */
			if (hsh->mtx.mtx_lock & MTX_CONTESTED)
				break;

			/*
			 * Don't expire aggressively while the hash collision
			 * ratio is predicted to be small.
			 */
			if (used <= (NBUCKETS*2) && !INACTIVE(fle6))
				break;

			if ((INACTIVE(fle6) && (SMALL(fle6) ||
			    (used > (NBUCKETS*2)))) || AGED(fle6)) {
				TAILQ_REMOVE(&hsh->head, fle, fle_hash);
				expire_flow(priv, priv_to_fib(priv,
				    fle->f.r.fib), fle, NG_NOFLAGS);
				used--;
				counter_u64_add(priv->nfinfo_inact_exp, 1);
			}
		}
		mtx_unlock(&hsh->mtx);
	}
#endif

	/* Schedule next expire. */
	callout_reset(&priv->exp_callout, (1*hz), &ng_netflow_expire,
	    (void *)priv);
}