netflow.c revision 143923
/*-
 * Copyright (c) 2004-2005 Gleb Smirnoff <glebius@FreeBSD.org>
 * Copyright (c) 2001-2003 Roman V. Palagin <romanp@unshadow.net>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $SourceForge: netflow.c,v 1.41 2004/09/05 11:41:10 glebius Exp $
 */

static const char rcs_id[] =
    "@(#) $FreeBSD: head/sys/netgraph/netflow/netflow.c 143923 2005-03-21 15:34:03Z glebius $";

#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/limits.h>
#include <sys/mbuf.h>
#include <sys/syslog.h>
#include <sys/systm.h>
#include <sys/socket.h>

#include <net/if.h>
#include <net/if_var.h>
#include <net/if_dl.h>
#include <net/route.h>
#include <netinet/in.h>
#include <netinet/in_systm.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>

#include <netgraph/ng_message.h>
#include <netgraph/netgraph.h>

#include <netgraph/netflow/netflow.h>
#include <netgraph/netflow/ng_netflow.h>

#define	NBUCKETS	(4096)	/* must be power of 2 */

/* This hash is for TCP or UDP packets */
#define FULL_HASH(addr1,addr2,port1,port2)\
	(((addr1 >> 16) ^		\
	  (addr2 & 0x00FF) ^		\
	  ((port1 ^ port2) << 8) )&	\
	 (NBUCKETS - 1))

/* This hash is for all other IP packets */
#define ADDR_HASH(addr1,addr2)\
	(((addr1 >> 16) ^		\
	  (addr2 & 0x00FF) )&		\
	 (NBUCKETS - 1))

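/*
 * Both hashes above mix bits 16..27 of the source address with the low
 * byte of the destination address (and, for TCP/UDP, the XOR of the two
 * ports shifted left by 8), then mask the result with NBUCKETS - 1.
 * The mask works as a cheap modulo because NBUCKETS is a power of 2.
 */
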
/* Macros to shorten logical constructions */
/* XXX: priv must exist in namespace */
#define	INACTIVE(fle)	(time_uptime - fle->f.last > priv->info.nfinfo_inact_t)
#define	AGED(fle)	(time_uptime - fle->f.first > priv->info.nfinfo_act_t)
#define	ISFREE(fle)	(fle->f.packets == 0)

/*
 * 4 is a magical number: statistically, the number of 4-packet flows is
 * bigger than that of 5-, 6-, 7-...-packet flows by an order of magnitude.
 * Most UDP/ICMP scans are 1 packet (~90% of the flow cache). TCP scans are
 * 2 packets for a reachable host and 4 packets otherwise.
 */
#define	SMALL(fle)	(fle->f.packets <= 4)

/*
 * Cisco uses milliseconds for uptime. Bad idea, since it overflows
 * every 48+ days. But we will do the same to keep compatibility. This
 * macro does the overflow-prone multiplication by 1000.
 */
#define	MILLIUPTIME(t)	(((t) << 9) +	/* 512 */	\
			 ((t) << 8) +	/* 256 */	\
			 ((t) << 7) +	/* 128 */	\
			 ((t) << 6) +	/* 64  */	\
			 ((t) << 5) +	/* 32  */	\
			 ((t) << 3))	/* 8   */

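/*
 * The shift sum above is 512 + 256 + 128 + 64 + 32 + 8 = 1000, so
 * MILLIUPTIME(t) is simply t * 1000, wrapping when it overflows.
 */
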
MALLOC_DECLARE(M_NETFLOW);
MALLOC_DEFINE(M_NETFLOW, "NetFlow", "flow cache");

static int export_add(priv_p , struct flow_entry *);
static int export_send(priv_p );

/* Generate hash for a given flow record */
static __inline uint32_t
ip_hash(struct flow_rec *r)
{
	switch (r->r_ip_p) {
	case IPPROTO_TCP:
	case IPPROTO_UDP:
		return FULL_HASH(r->r_src.s_addr, r->r_dst.s_addr,
		    r->r_sport, r->r_dport);
	default:
		return ADDR_HASH(r->r_src.s_addr, r->r_dst.s_addr);
	}
}

/* Look up a record in the given slot */
static __inline struct flow_entry *
hash_lookup(struct flow_hash_entry *h, int slot, struct flow_rec *r)
{
	struct flow_entry *fle;

	LIST_FOREACH(fle, &(h[slot].head), fle_hash)
		if (bcmp(r, &fle->f.r, sizeof(struct flow_rec)) == 0)
			return (fle);

	return (NULL);
}

/* Get a flow entry from free list */
static __inline struct flow_entry *
alloc_flow(priv_p priv, int *flows)
{
	register struct flow_entry	*fle;

	mtx_lock(&priv->free_mtx);

	if (SLIST_EMPTY(&priv->free_list)) {
		mtx_unlock(&priv->free_mtx);
		return(NULL);
	}

	fle = SLIST_FIRST(&priv->free_list);
	SLIST_REMOVE_HEAD(&priv->free_list, fle_free);

	priv->info.nfinfo_used++;
	priv->info.nfinfo_free--;

	if (flows != NULL)
		*flows = priv->info.nfinfo_used;

	mtx_unlock(&priv->free_mtx);

	return (fle);
}

/*
 * Return a flow entry to the free list. Returns the number of entries
 * that were in use, sampled before the decrement.
 */
static __inline int
free_flow(priv_p priv, struct flow_entry *fle)
{
	int flows;

	mtx_lock(&priv->free_mtx);
	fle->f.packets = 0;
	SLIST_INSERT_HEAD(&priv->free_list, fle, fle_free);
	flows = priv->info.nfinfo_used--;
	priv->info.nfinfo_free++;
	mtx_unlock(&priv->free_mtx);

	return flows;
}

#define	NGNF_GETUSED(priv, rval)	do {	\
	mtx_lock(&priv->free_mtx);		\
	rval = priv->info.nfinfo_used;		\
	mtx_unlock(&priv->free_mtx);		\
	} while (0)

/* Insert flow entry into expire list. */
/* XXX: Flow must be detached from work queue, but not from cache */
static __inline void
expire_flow(priv_p priv, struct flow_entry *fle)
{
	mtx_assert(&priv->work_mtx, MA_OWNED);
	LIST_REMOVE(fle, fle_hash);

	mtx_lock(&priv->expire_mtx);
	SLIST_INSERT_HEAD(&priv->expire_list, fle, fle_free);
	mtx_unlock(&priv->expire_mtx);
}

/* Get a snapshot of node statistics */
void
ng_netflow_copyinfo(priv_p priv, struct ng_netflow_info *i)
{
	mtx_lock(&priv->free_mtx);
	memcpy((void *)i, (void *)&priv->info, sizeof(priv->info));
	mtx_unlock(&priv->free_mtx);
}

/* Calculate number of bits in netmask */
#define	g21	0x55555555ul	/* = 0101_0101_0101_0101_0101_0101_0101_0101 */
#define	g22	0x33333333ul	/* = 0011_0011_0011_0011_0011_0011_0011_0011 */
#define	g23	0x0f0f0f0ful	/* = 0000_1111_0000_1111_0000_1111_0000_1111 */
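/*
 * Classic SWAR population count: sum adjacent bits into 2-bit fields,
 * then 4-bit fields, then bytes.  A contiguous netmask thus maps to its
 * prefix length, e.g. bit_count(0xffffff00) == 24.
 */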
static __inline u_char
bit_count(uint32_t v)
{
	v = (v & g21) + ((v >> 1) & g21);
	v = (v & g22) + ((v >> 2) & g22);
	v = (v + (v >> 4)) & g23;
	return (v + (v >> 8) + (v >> 16) + (v >> 24)) & 0x3f;
}

/*
 * Insert a record into the defined slot.
 *
 * First we get ourselves a free flow entry and fill in all the fields
 * we can. Then we take the flow cache lock and insert the flow entry.
 */
static __inline int
hash_insert(priv_p priv, int slot, struct flow_rec *r, int plen,
	uint8_t tcp_flags)
{
	struct flow_hash_entry	*h = priv->hash;
	struct flow_entry	*fle;
	struct route ro;
	struct sockaddr_in *sin;

	fle = alloc_flow(priv, NULL);
	if (fle == NULL)
		return (ENOMEM);

	/*
	 * Now fle is totally ours. It is detached from all lists,
	 * so we can safely edit it.
	 */

	bcopy(r, &fle->f.r, sizeof(struct flow_rec));
	fle->f.bytes = plen;
	fle->f.packets = 1;
	fle->f.tcp_flags = tcp_flags;

	fle->f.first = fle->f.last = time_uptime;

	/*
	 * First we do a routing table lookup on the destination address, so
	 * we can fill in out_ifx, dst_mask, nexthop (and dst_as in future
	 * releases).
	 */
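	/*
	 * Passing RTF_CLONING to rtalloc_ign() tells it not to clone a
	 * host route for this lookup, so the routing table is left as is.
	 */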
	bzero((caddr_t)&ro, sizeof(ro));
	sin = (struct sockaddr_in *)&ro.ro_dst;
	sin->sin_len = sizeof(*sin);
	sin->sin_family = AF_INET;
	sin->sin_addr = fle->f.r.r_dst;
	rtalloc_ign(&ro, RTF_CLONING);
	if (ro.ro_rt != NULL) {
		struct rtentry *rt = ro.ro_rt;

		fle->f.fle_o_ifx = rt->rt_ifp->if_index;

		if (rt->rt_flags & RTF_GATEWAY &&
		    rt->rt_gateway->sa_family == AF_INET)
			fle->f.next_hop =
			    ((struct sockaddr_in *)(rt->rt_gateway))->sin_addr;

		if (rt_mask(rt))
			fle->f.dst_mask =
			    bit_count(((struct sockaddr_in *)rt_mask(rt))->sin_addr.s_addr);
		else if (rt->rt_flags & RTF_HOST)
			/* Give up. We can't determine mask :( */
			fle->f.dst_mask = 32;

		RTFREE(ro.ro_rt);
	}

	/* Do a route lookup on the source address to fill in src_mask. */

	bzero((caddr_t)&ro, sizeof(ro));
	sin = (struct sockaddr_in *)&ro.ro_dst;
	sin->sin_len = sizeof(*sin);
	sin->sin_family = AF_INET;
	sin->sin_addr = fle->f.r.r_src;
	rtalloc_ign(&ro, RTF_CLONING);
	if (ro.ro_rt != NULL) {
		struct rtentry *rt = ro.ro_rt;

		if (rt_mask(rt))
			fle->f.src_mask =
			    bit_count(((struct sockaddr_in *)rt_mask(rt))->sin_addr.s_addr);
		else if (rt->rt_flags & RTF_HOST)
			/* Give up. We can't determine mask :( */
			fle->f.src_mask = 32;

		RTFREE(ro.ro_rt);
	}

	/* Push new flow entry into flow cache */
	mtx_lock(&priv->work_mtx);
	LIST_INSERT_HEAD(&(h[slot].head), fle, fle_hash);
	TAILQ_INSERT_TAIL(&priv->work_queue, fle, fle_work);
	mtx_unlock(&priv->work_mtx);

	return (0);
}


/*
 * Non-static functions called from ng_netflow.c
 */

/* Allocate memory and set up flow cache */
int
ng_netflow_cache_init(priv_p priv)
{
	struct flow_entry *fle;
	int i;

	/* allocate cache */
	MALLOC(priv->cache, struct flow_entry *,
	    CACHESIZE * sizeof(struct flow_entry),
	    M_NETFLOW, M_WAITOK | M_ZERO);

	if (priv->cache == NULL)
		return (ENOMEM);

	/* allocate hash */
	MALLOC(priv->hash, struct flow_hash_entry *,
	    NBUCKETS * sizeof(struct flow_hash_entry),
	    M_NETFLOW, M_WAITOK | M_ZERO);

	if (priv->hash == NULL) {
		FREE(priv->cache, M_NETFLOW);
		return (ENOMEM);
	}

	TAILQ_INIT(&priv->work_queue);
	SLIST_INIT(&priv->free_list);
	SLIST_INIT(&priv->expire_list);

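	/*
	 * Lock roles, as used throughout this file: work_mtx protects the
	 * hash buckets and the work queue, free_mtx protects the free list
	 * and the nfinfo counters, expire_mtx protects the expire list.
	 */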
	mtx_init(&priv->work_mtx, "ng_netflow cache mutex", NULL, MTX_DEF);
	mtx_init(&priv->free_mtx, "ng_netflow free mutex", NULL, MTX_DEF);
	mtx_init(&priv->expire_mtx, "ng_netflow expire mutex", NULL, MTX_DEF);

	/* build free list */
	for (i = 0, fle = priv->cache; i < CACHESIZE; i++, fle++)
		SLIST_INSERT_HEAD(&priv->free_list, fle, fle_free);

	priv->info.nfinfo_free = CACHESIZE;

	return (0);
}

/* Free all flow cache memory. Called from node close method. */
void
ng_netflow_cache_flush(priv_p priv)
{
	register struct flow_entry	*fle;
	int i;

	/*
	 * We are about to free probably billable data.
	 * Expire everything before freeing it.
	 * No locking is required since the callout has already been drained.
	 */

	for (i = 0, fle = priv->cache; i < CACHESIZE; i++, fle++)
		if (!ISFREE(fle))
			/* ignore errors now */
			(void )export_add(priv, fle);

	mtx_destroy(&priv->work_mtx);
	mtx_destroy(&priv->free_mtx);
	mtx_destroy(&priv->expire_mtx);

	/* free hash memory */
	if (priv->hash)
		FREE(priv->hash, M_NETFLOW);

	/* free flow cache */
	if (priv->cache)
		FREE(priv->cache, M_NETFLOW);

}

/* Insert a packet into the flow cache. */
int
ng_netflow_flow_add(priv_p priv, struct ip *ip, iface_p iface,
	struct ifnet *ifp)
{
	struct flow_hash_entry		*h = priv->hash;
	register struct flow_entry	*fle;
	struct flow_rec		r;
	int			hlen, plen;
	uint32_t		slot;
	uint8_t			tcp_flags = 0;

	/* Try to fill flow_rec r */
	bzero(&r, sizeof(r));
	/* check version */
	if (ip->ip_v != IPVERSION)
		return (EINVAL);

	/* verify minimum header length */
	hlen = ip->ip_hl << 2;

	if (hlen < sizeof(struct ip))
		return (EINVAL);

	r.r_src = ip->ip_src;
	r.r_dst = ip->ip_dst;

	/* save packet length */
	plen = ntohs(ip->ip_len);

	r.r_ip_p = ip->ip_p;
	r.r_tos = ip->ip_tos;

	/* Configured in_ifx overrides mbuf's */
	if (iface->info.ifinfo_index == 0) {
		if (ifp != NULL)
			r.r_i_ifx = ifp->if_index;
	} else
		r.r_i_ifx = iface->info.ifinfo_index;

	/*
	 * XXX NOTE: only the first fragment of a fragmented TCP, UDP or
	 * ICMP packet will be recorded with proper s_port and d_port.
	 * Subsequent fragments will be recorded simply as IP packets with
	 * ip_proto = ip->ip_p and s_port, d_port set to zero.
	 * I know it looks like a bug, but I don't want to re-implement
	 * IP packet reassembly here. Anyway, the (in)famous trafd works
	 * this way - and nobody has complained yet :)
	 */
	if (ip->ip_off & htons(IP_OFFMASK))
		goto flow_rec_done;

	switch (r.r_ip_p) {
	case IPPROTO_TCP:
	{
		register struct tcphdr *tcp;

		tcp = (struct tcphdr *)((caddr_t )ip + hlen);
		r.r_sport = tcp->th_sport;
		r.r_dport = tcp->th_dport;
		tcp_flags = tcp->th_flags;
		break;
	}
	case IPPROTO_UDP:
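		/*
		 * The 16-bit source and destination ports sit next to each
		 * other at the start of the UDP header, so one 32-bit copy
		 * fills both port fields of the flow record at once.
		 */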
		r.r_ports = *(uint32_t *)((caddr_t )ip + hlen);
		break;
	}

flow_rec_done:

	slot = ip_hash(&r);

	mtx_lock(&priv->work_mtx);

	/* Update node statistics. */
	priv->info.nfinfo_packets ++;
	priv->info.nfinfo_bytes += plen;

	fle = hash_lookup(h, slot, &r); /* New flow entry or an existing one? */

	if (fle) {	/* an existing entry */

		TAILQ_REMOVE(&priv->work_queue, fle, fle_work);

		fle->f.bytes += plen;
		fle->f.packets ++;
		fle->f.tcp_flags |= tcp_flags;
		fle->f.last = time_uptime;

		/*
		 * We have the following reasons to expire a flow actively:
		 * - it hit the active timeout
		 * - a TCP connection was closed
		 * - its byte counter is about to overflow
		 */
		if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle) ||
		    (fle->f.bytes >= (UINT_MAX - IF_MAXMTU)) )
			expire_flow(priv, fle);
		else
			TAILQ_INSERT_TAIL(&priv->work_queue, fle, fle_work);

		mtx_unlock(&priv->work_mtx);

	} else {	/* a new flow entry */

		mtx_unlock(&priv->work_mtx);
		return hash_insert(priv, slot, &r, plen, tcp_flags);

	}

	mtx_assert(&priv->work_mtx, MA_NOTOWNED);
	mtx_assert(&priv->expire_mtx, MA_NOTOWNED);
	mtx_assert(&priv->free_mtx, MA_NOTOWNED);

	return (0);
}

/*
 * Return records from the cache. netgraph(4) guarantees that we
 * are locked against ng_netflow_rcvdata(). However, we may run in
 * parallel with ng_netflow_expire(). XXX: Is that dangerous?
 *
 * TODO: matching a particular IP should be done in kernel, here.
 */
int
ng_netflow_flow_show(priv_p priv, uint32_t last, struct ng_mesg *resp)
{
	struct flow_entry *fle;
	struct ngnf_flows *data;

	data = (struct ngnf_flows *)resp->data;
	data->last = 0;
	data->nentries = 0;

	/* Check if this is the first run */
	if (last == 0)
		fle = priv->cache;
	else {
		if (last > CACHESIZE-1)
			return (EINVAL);
		fle = priv->cache + last;
	}

	/*
	 * We will transfer no more than NREC_AT_ONCE entries; more data
	 * will come in the next message.
	 * We send the current stop point to userland, and userland should
	 * return it back to us.
	 */
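	/*
	 * A data->last of 0 in the reply means the whole cache has been
	 * walked; a non-zero value is the index to pass back as 'last' in
	 * the next request to continue from where we stopped.
	 */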
	for (; last < CACHESIZE; fle++, last++) {
		if (ISFREE(fle))
			continue;
		bcopy(&fle->f, &(data->entries[data->nentries]),
		    sizeof(fle->f));
		data->nentries ++;
		if (data->nentries == NREC_AT_ONCE) {
			if (++last < CACHESIZE)
				data->last = (++fle - priv->cache);
			return (0);
		}
	}

	return (0);
}

/* We have a full datagram in the private data. Send it to the export hook. */
static int
export_send(priv_p priv)
{
	struct netflow_v5_header *header = &priv->dgram.header;
	struct timespec ts;
	struct mbuf *m;
	int error = 0;
	int mlen;

	header->sys_uptime = htonl(MILLIUPTIME(time_uptime));

	getnanotime(&ts);
	header->unix_secs  = htonl(ts.tv_sec);
	header->unix_nsecs = htonl(ts.tv_nsec);

	/* The flow sequence number is that of the first record in this datagram */
	header->flow_seq = htonl(priv->flow_seq - header->count);

	mlen = sizeof(struct netflow_v5_header) +
	    sizeof(struct netflow_v5_record) * header->count;

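	/*
	 * header->count is kept in host byte order while the datagram is
	 * being filled; convert it only for the copy that goes on the wire,
	 * then reset it to start a new datagram.
	 */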
	header->count = htons(header->count);
	if ((m = m_devget((caddr_t)header, mlen, 0, NULL, NULL)) == NULL) {
		log(LOG_CRIT, "ng_netflow: m_devget() failed, losing export "
		    "dgram\n");
		header->count = 0;
		return(ENOBUFS);
	}

	header->count = 0;

	/* Giant is required in sosend() at this moment. */
	NET_LOCK_GIANT();
	NG_SEND_DATA_ONLY(error, priv->export, m);
	NET_UNLOCK_GIANT();

	if (error)
		NG_FREE_M(m);

	return (error);
}


/* Add a flow record to the export datagram; send it when the datagram is full. */
static int
export_add(priv_p priv, struct flow_entry *fle)
{
	struct netflow_v5_header *header = &priv->dgram.header;
	struct netflow_v5_record *rec;

	if (header->count == 0 ) {	/* first record */
		rec = &priv->dgram.r[0];
		header->count = 1;
	} else {			/* continue filling datagram */
		rec = &priv->dgram.r[header->count];
		header->count ++;
	}

	/* Fill in export record */
	rec->src_addr = fle->f.r.r_src.s_addr;
	rec->dst_addr = fle->f.r.r_dst.s_addr;
	rec->next_hop = fle->f.next_hop.s_addr;
	rec->i_ifx    = htons(fle->f.fle_i_ifx);
	rec->o_ifx    = htons(fle->f.fle_o_ifx);
	rec->packets  = htonl(fle->f.packets);
	rec->octets   = htonl(fle->f.bytes);
	rec->first    = htonl(MILLIUPTIME(fle->f.first));
	rec->last     = htonl(MILLIUPTIME(fle->f.last));
	rec->s_port   = fle->f.r.r_sport;
	rec->d_port   = fle->f.r.r_dport;
	rec->flags    = fle->f.tcp_flags;
	rec->prot     = fle->f.r.r_ip_p;
	rec->tos      = fle->f.r.r_tos;
	rec->dst_mask = fle->f.dst_mask;
	rec->src_mask = fle->f.src_mask;

	priv->flow_seq++;

	if (header->count == NETFLOW_V5_MAX_RECORDS) /* end of datagram */
		return export_send(priv);

	return (0);
}

/* Periodic flow expiry run. */
void
ng_netflow_expire(void *arg)
{
	register struct flow_entry	*fle, *fle1;
	priv_p priv = (priv_p )arg;
	uint32_t used;
	int error = 0;

	/* First, pack actively expired entries */
	mtx_lock(&priv->expire_mtx);
	while (!SLIST_EMPTY(&(priv->expire_list))) {
		fle = SLIST_FIRST(&(priv->expire_list));
		SLIST_REMOVE_HEAD(&(priv->expire_list), fle_free);
		mtx_unlock(&priv->expire_mtx);

		/*
		 * While we have the lock dropped, expire_flow() may
		 * insert another flow at the head of the list.
		 * That does us no harm, since we have already
		 * detached our own entry.
		 */

		if ((error = export_add(priv, fle)) != 0)
			log(LOG_CRIT, "ng_netflow: export_add() failed: %u\n",
			    error);
		(void )free_flow(priv, fle);

		mtx_lock(&priv->expire_mtx);
	}
	mtx_unlock(&priv->expire_mtx);

	NGNF_GETUSED(priv, used);
	mtx_lock(&priv->work_mtx);
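	/*
	 * The work queue is kept in LRU order: ng_netflow_flow_add() moves
	 * updated entries to its tail, so the least recently updated flows
	 * are examined first.
	 */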
	TAILQ_FOREACH_SAFE(fle, &(priv->work_queue), fle_work, fle1) {
		/*
		 * While the cache size has not yet reached CACHELOWAT, we
		 * keep both inactive and active flows in the cache. Doing
		 * this, we reduce the number of exports, since many inactive
		 * flows may wake up and continue their life. However, we
		 * make an exception for scans: it is a very rare situation
		 * that an inactive 1-packet flow wakes up.
		 * Once the cache has reached CACHELOWAT, we expire all
		 * inactive flows until the cache gets back to a sane size.
		 */
		if (used <= CACHELOWAT && !INACTIVE(fle))
			goto finish;

		if ((INACTIVE(fle) && (SMALL(fle) || (used > CACHELOWAT))) ||
		    AGED(fle)) {

			/* Detach flow entry from cache */
			LIST_REMOVE(fle, fle_hash);
			TAILQ_REMOVE(&priv->work_queue, fle, fle_work);

			/*
			 * Unlock the cache while we are sending to the
			 * collector. XXX: with a small probability the item
			 * we are holding now can be moved to the top of the
			 * flow cache by the node thread. In that case our
			 * expire thread stops checking early. Since this is
			 * not fatal, we just ignore it for now.
			 */
			mtx_unlock(&priv->work_mtx);

			if ((error = export_add(priv, fle)) != 0)
				log(LOG_CRIT, "ng_netflow: export_add() "
				    "failed: %u\n", error);

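			/* free_flow() reports the number of used entries. */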
			used = free_flow(priv, fle);

			mtx_lock(&priv->work_mtx);
		}
	}

finish:
	mtx_unlock(&priv->work_mtx);

	mtx_assert(&priv->expire_mtx, MA_NOTOWNED);
	mtx_assert(&priv->free_mtx, MA_NOTOWNED);

	/* schedule the next expiry run */
	callout_reset(&priv->exp_callout, (1*hz), &ng_netflow_expire,
	    (void *)priv);

}
