1/*
2 * (C) 2006-2012 by Pablo Neira Ayuso <pablo@netfilter.org>
3 * (C) 2011 by Vyatta Inc. <http://www.vyatta.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19
20#include "sync.h"
21#include "netlink.h"
22#include "traffic_stats.h"
23#include "log.h"
24#include "cache.h"
25#include "conntrackd.h"
26#include "network.h"
27#include "fds.h"
28#include "event.h"
29#include "queue.h"
30#include "process.h"
31#include "origin.h"
32#include "internal.h"
33#include "external.h"
34
35#include <errno.h>
36#include <unistd.h>
37#include <time.h>
38#include <string.h>
39#include <stdlib.h>
40#include <limits.h>
41#include <net/if.h>
42#include <fcntl.h>
43
44static struct nf_conntrack *msg2ct_alloc(struct nethdr *net, size_t remain)
45{
46	struct nf_conntrack *ct;
47
48	/* TODO: add stats on ENOMEM errors in the future. */
49	ct = nfct_new();
50	if (ct == NULL)
51		return NULL;
52
53	if (msg2ct(ct, net, remain) == -1) {
54		STATE_SYNC(error).msg_rcv_malformed++;
55		STATE_SYNC(error).msg_rcv_bad_payload++;
56		nfct_destroy(ct);
57		return NULL;
58	}
59	return ct;
60}
61
62static struct nf_expect *msg2exp_alloc(struct nethdr *net, size_t remain)
63{
64	struct nf_expect *exp;
65
66	/* TODO: add stats on ENOMEM errors in the future. */
67	exp = nfexp_new();
68	if (exp == NULL)
69		return NULL;
70
71	if (msg2exp(exp, net, remain) == -1) {
72		STATE_SYNC(error).msg_rcv_malformed++;
73		STATE_SYNC(error).msg_rcv_bad_payload++;
74		nfexp_destroy(exp);
75		return NULL;
76	}
77	return exp;
78}
79
80static void
81do_channel_handler_step(struct channel *c, struct nethdr *net, size_t remain)
82{
83	struct nf_conntrack *ct = NULL;
84	struct nf_expect *exp = NULL;
85
86	if (net->version != CONNTRACKD_PROTOCOL_VERSION) {
87		STATE_SYNC(error).msg_rcv_malformed++;
88		STATE_SYNC(error).msg_rcv_bad_version++;
89		return;
90	}
91
92	switch (STATE_SYNC(sync)->recv(net)) {
93	case MSG_DATA:
94		multichannel_change_current_channel(STATE_SYNC(channel), c);
95		break;
96	case MSG_CTL:
97		multichannel_change_current_channel(STATE_SYNC(channel), c);
98		return;
99	case MSG_BAD:
100		STATE_SYNC(error).msg_rcv_malformed++;
101		STATE_SYNC(error).msg_rcv_bad_header++;
102		return;
103	case MSG_DROP:
104		return;
105	default:
106		break;
107	}
108
109	if (net->type > NET_T_STATE_MAX) {
110		STATE_SYNC(error).msg_rcv_malformed++;
111		STATE_SYNC(error).msg_rcv_bad_type++;
112		return;
113	}
114
115	switch(net->type) {
116	case NET_T_STATE_CT_NEW:
117		ct = msg2ct_alloc(net, remain);
118		if (ct == NULL)
119			return;
120		STATE_SYNC(external)->ct.new(ct);
121		break;
122	case NET_T_STATE_CT_UPD:
123		ct = msg2ct_alloc(net, remain);
124		if (ct == NULL)
125			return;
126		STATE_SYNC(external)->ct.upd(ct);
127		break;
128	case NET_T_STATE_CT_DEL:
129		ct = msg2ct_alloc(net, remain);
130		if (ct == NULL)
131			return;
132		STATE_SYNC(external)->ct.del(ct);
133		break;
134	case NET_T_STATE_EXP_NEW:
135		exp = msg2exp_alloc(net, remain);
136		if (exp == NULL)
137			return;
138		STATE_SYNC(external)->exp.new(exp);
139		break;
140	case NET_T_STATE_EXP_UPD:
141		exp = msg2exp_alloc(net, remain);
142		if (exp == NULL)
143			return;
144		STATE_SYNC(external)->exp.upd(exp);
145		break;
146	case NET_T_STATE_EXP_DEL:
147		exp = msg2exp_alloc(net, remain);
148		if (exp == NULL)
149			return;
150		STATE_SYNC(external)->exp.del(exp);
151		break;
152	default:
153		STATE_SYNC(error).msg_rcv_malformed++;
154		STATE_SYNC(error).msg_rcv_bad_type++;
155		break;
156	}
157	if (ct != NULL)
158		nfct_destroy(ct);
159	if (exp != NULL)
160		nfexp_destroy(exp);
161}
162
163static char __net[65536];		/* XXX: maximum MTU for IPv4 */
164static char *cur = __net;
165
166static int channel_stream(struct channel *m, const char *ptr, ssize_t remain)
167{
168	if (m->channel_flags & CHANNEL_F_STREAM) {
169		/* truncated data. */
170		memcpy(__net, ptr, remain);
171		cur = __net + remain;
172		return 1;
173	}
174	return 0;
175}
176
177/* handler for messages received */
178static int channel_handler_routine(struct channel *m)
179{
180	ssize_t numbytes;
181	ssize_t remain, pending = cur - __net;
182	char *ptr = __net;
183
184	numbytes = channel_recv(m, cur, sizeof(__net) - pending);
185	if (numbytes <= 0)
186		return -1;
187
188	remain = numbytes;
189	if (pending) {
190		remain += pending;
191		cur = __net;
192	}
193
194	while (remain > 0) {
195		struct nethdr *net = (struct nethdr *) ptr;
196		int len;
197
198		if (remain < NETHDR_SIZ) {
199			if (!channel_stream(m, ptr, remain)) {
200				STATE_SYNC(error).msg_rcv_malformed++;
201				STATE_SYNC(error).msg_rcv_truncated++;
202			}
203			break;
204		}
205
206		len = ntohs(net->len);
207		if (len <= 0) {
208			STATE_SYNC(error).msg_rcv_malformed++;
209			STATE_SYNC(error).msg_rcv_bad_size++;
210			break;
211		}
212
213		if (len > remain) {
214			if (!channel_stream(m, ptr, remain)) {
215				STATE_SYNC(error).msg_rcv_malformed++;
216				STATE_SYNC(error).msg_rcv_bad_size++;
217			}
218			break;
219		}
220
221		if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
222			if (remain < NETHDR_ACK_SIZ) {
223				if (!channel_stream(m, ptr, remain)) {
224					STATE_SYNC(error).msg_rcv_malformed++;
225					STATE_SYNC(error).msg_rcv_truncated++;
226				}
227				break;
228			}
229
230			if (len < NETHDR_ACK_SIZ) {
231				STATE_SYNC(error).msg_rcv_malformed++;
232				STATE_SYNC(error).msg_rcv_bad_size++;
233				break;
234			}
235		} else {
236			if (len < NETHDR_SIZ) {
237				STATE_SYNC(error).msg_rcv_malformed++;
238				STATE_SYNC(error).msg_rcv_bad_size++;
239				break;
240			}
241		}
242
243		HDR_NETWORK2HOST(net);
244
245		do_channel_handler_step(m, net, remain);
246		ptr += net->len;
247		remain -= net->len;
248	}
249	return 0;
250}
251
252/* handler for messages received */
253static void channel_handler(void *data)
254{
255	struct channel *c = data;
256	int k;
257
258	for (k=0; k<CONFIG(event_iterations_limit); k++) {
259		if (channel_handler_routine(c) == -1) {
260			break;
261		}
262	}
263}
264
265/* select a new interface candidate in a round robin basis */
266static void interface_candidate(void)
267{
268	int i, idx;
269	unsigned int flags;
270	char buf[IFNAMSIZ];
271
272	for (i=0; i<STATE_SYNC(channel)->channel_num; i++) {
273		idx = multichannel_get_ifindex(STATE_SYNC(channel), i);
274		if (idx == multichannel_get_current_ifindex(STATE_SYNC(channel)))
275			continue;
276		nlif_get_ifflags(STATE_SYNC(interface), idx, &flags);
277		if (flags & (IFF_RUNNING | IFF_UP)) {
278			multichannel_set_current_channel(STATE_SYNC(channel), i);
279			dlog(LOG_NOTICE, "device `%s' becomes "
280					 "dedicated link",
281					 if_indextoname(idx, buf));
282			return;
283		}
284	}
285	dlog(LOG_ERR, "no dedicated links available!");
286}
287
288static void interface_handler(void *data)
289{
290	int idx = multichannel_get_current_ifindex(STATE_SYNC(channel));
291	unsigned int flags;
292
293	nlif_catch(STATE_SYNC(interface));
294	nlif_get_ifflags(STATE_SYNC(interface), idx, &flags);
295	if (!(flags & IFF_RUNNING) || !(flags & IFF_UP))
296		interface_candidate();
297}
298
299static void do_reset_cache_alarm(struct alarm_block *a, void *data)
300{
301	STATE(stats).nl_kernel_table_flush++;
302	dlog(LOG_NOTICE, "flushing kernel conntrack table (scheduled)");
303
304	/* fork a child process that performs the flush operation,
305	 * meanwhile the parent process handles events. */
306	if (fork_process_new(CTD_PROC_FLUSH, CTD_PROC_F_EXCL,
307			     NULL, NULL) == 0) {
308		nl_flush_conntrack_table_selective();
309		exit(EXIT_SUCCESS);
310	}
311	/* this is not required if events don't get lost */
312	STATE(mode)->internal->ct.flush();
313}
314
315static void commit_cb(void *data)
316{
317	int ret;
318
319	read_evfd(STATE_SYNC(commit).evfd);
320
321	ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0);
322	if (ret == 0) {
323		/* we still have things in the callback queue. */
324		if (STATE_SYNC(commit).rq[1].cb) {
325			int fd = STATE_SYNC(commit).clientfd;
326
327			STATE_SYNC(commit).rq[0].cb =
328				STATE_SYNC(commit).rq[1].cb;
329
330			STATE_SYNC(commit).rq[1].cb = NULL;
331
332			STATE_SYNC(commit).clientfd = -1;
333			STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, fd);
334		} else {
335			/* Close the client socket now, we're done. */
336			close(STATE_SYNC(commit).clientfd);
337			STATE_SYNC(commit).clientfd = -1;
338		}
339	}
340}
341
342static void channel_accept_cb(void *data)
343{
344	struct channel *c = data;
345	int fd;
346
347	fd = channel_accept(data);
348	if (fd < 0)
349		return;
350
351	register_fd(fd, channel_handler, c, STATE(fds));
352}
353
354static void tx_queue_cb(void *data)
355{
356	STATE_SYNC(sync)->xmit();
357
358	/* flush pending messages */
359	multichannel_send_flush(STATE_SYNC(channel));
360}
361
362static int init_sync(void)
363{
364	int i;
365
366	state.sync = malloc(sizeof(struct ct_sync_state));
367	if (!state.sync) {
368		dlog(LOG_ERR, "can't allocate memory for sync");
369		return -1;
370	}
371	memset(state.sync, 0, sizeof(struct ct_sync_state));
372
373	if (CONFIG(flags) & CTD_SYNC_FTFW)
374		STATE_SYNC(sync) = &sync_ftfw;
375	else if (CONFIG(flags) & CTD_SYNC_ALARM)
376		STATE_SYNC(sync) = &sync_alarm;
377	else if (CONFIG(flags) & CTD_SYNC_NOTRACK)
378		STATE_SYNC(sync) = &sync_notrack;
379	else {
380		fprintf(stderr, "WARNING: No synchronization mode specified. "
381				"Defaulting to FT-FW mode.\n");
382		CONFIG(flags) |= CTD_SYNC_FTFW;
383		STATE_SYNC(sync) = &sync_ftfw;
384	}
385
386	if (STATE_SYNC(sync)->init)
387		STATE_SYNC(sync)->init();
388
389	if (CONFIG(sync).internal_cache_disable == 0) {
390		STATE(mode)->internal = &internal_cache;
391	} else {
392		STATE(mode)->internal = &internal_bypass;
393		dlog(LOG_NOTICE, "disabling internal cache");
394
395	}
396	if (STATE(mode)->internal->init() == -1)
397		return -1;
398
399	if (CONFIG(sync).external_cache_disable == 0) {
400		STATE_SYNC(external) = &external_cache;
401	} else {
402		STATE_SYNC(external) = &external_inject;
403		dlog(LOG_NOTICE, "disabling external cache");
404	}
405	if (STATE_SYNC(external)->init() == -1)
406		return -1;
407
408	if (channel_init() == -1)
409		return -1;
410
411	/* channel to send events on the wire */
412	STATE_SYNC(channel) =
413		multichannel_open(CONFIG(channel), CONFIG(channel_num));
414	if (STATE_SYNC(channel) == NULL) {
415		dlog(LOG_ERR, "can't open channel socket");
416		return -1;
417	}
418	for (i=0; i<STATE_SYNC(channel)->channel_num; i++) {
419		int fd = channel_get_fd(STATE_SYNC(channel)->channel[i]);
420		fcntl(fd, F_SETFL, O_NONBLOCK);
421
422		switch(channel_type(STATE_SYNC(channel)->channel[i])) {
423		case CHANNEL_T_STREAM:
424			register_fd(fd, channel_accept_cb,
425					STATE_SYNC(channel)->channel[i],
426					STATE(fds));
427			break;
428		case CHANNEL_T_DATAGRAM:
429			register_fd(fd, channel_handler,
430					STATE_SYNC(channel)->channel[i],
431					STATE(fds));
432			break;
433		}
434	}
435
436	STATE_SYNC(interface) = nl_init_interface_handler();
437	if (!STATE_SYNC(interface)) {
438		dlog(LOG_ERR, "can't open interface watcher");
439		return -1;
440	}
441	if (register_fd(nlif_fd(STATE_SYNC(interface)),
442			interface_handler, NULL, STATE(fds)) == -1)
443		return -1;
444
445	STATE_SYNC(tx_queue) = queue_create("txqueue", INT_MAX, QUEUE_F_EVFD);
446	if (STATE_SYNC(tx_queue) == NULL) {
447		dlog(LOG_ERR, "cannot create tx queue");
448		return -1;
449	}
450	if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)),
451			tx_queue_cb, NULL, STATE(fds)) == -1)
452		return -1;
453
454	STATE_SYNC(commit).h = nfct_open(CONFIG(netlink).subsys_id, 0);
455	if (STATE_SYNC(commit).h == NULL) {
456		dlog(LOG_ERR, "can't create handler to commit");
457		return -1;
458	}
459	origin_register(STATE_SYNC(commit).h, CTD_ORIGIN_COMMIT);
460
461	STATE_SYNC(commit).evfd = create_evfd();
462	if (STATE_SYNC(commit).evfd == NULL) {
463		dlog(LOG_ERR, "can't create eventfd to commit");
464		return -1;
465	}
466	if (register_fd(get_read_evfd(STATE_SYNC(commit).evfd),
467				commit_cb, NULL, STATE(fds)) == -1) {
468		return -1;
469	}
470	STATE_SYNC(commit).clientfd = -1;
471
472	init_alarm(&STATE_SYNC(reset_cache_alarm), NULL, do_reset_cache_alarm);
473
474	/* initialization of message sequence generation */
475	STATE_SYNC(last_seq_sent) = time(NULL);
476
477	return 0;
478}
479
480static void kill_sync(void)
481{
482	STATE(mode)->internal->close();
483	STATE_SYNC(external)->close();
484
485	multichannel_close(STATE_SYNC(channel));
486
487	nlif_close(STATE_SYNC(interface));
488
489	queue_destroy(STATE_SYNC(tx_queue));
490
491	channel_end();
492
493	origin_unregister(STATE_SYNC(commit).h);
494	nfct_close(STATE_SYNC(commit).h);
495	destroy_evfd(STATE_SYNC(commit).evfd);
496
497	if (STATE_SYNC(sync)->kill)
498		STATE_SYNC(sync)->kill();
499}
500
501static void dump_stats_sync(int fd)
502{
503	char buf[512];
504	int size;
505
506	size = sprintf(buf, "message tracking:\n"
507			    "%20llu Malformed msgs "
508			    "%20llu Lost msgs\n\n",
509			(unsigned long long)STATE_SYNC(error).msg_rcv_malformed,
510			(unsigned long long)STATE_SYNC(error).msg_rcv_lost);
511
512	send(fd, buf, size, 0);
513}
514
515static void dump_stats_sync_extended(int fd)
516{
517	char buf[512];
518	int size;
519
520	size = snprintf(buf, sizeof(buf),
521			"network statistics:\n"
522			"\trecv:\n"
523			"\t\tMalformed messages:\t%20llu\n"
524			"\t\tWrong protocol version:\t%20u\n"
525			"\t\tMalformed header:\t%20u\n"
526			"\t\tMalformed payload:\t%20u\n"
527			"\t\tBad message type:\t%20u\n"
528			"\t\tTruncated message:\t%20u\n"
529			"\t\tBad message size:\t%20u\n"
530			"\tsend:\n"
531			"\t\tMalformed messages:\t%20u\n\n"
532			"sequence tracking statistics:\n"
533			"\trecv:\n"
534			"\t\tPackets lost:\t\t%20llu\n"
535			"\t\tPackets before:\t\t%20llu\n\n",
536			(unsigned long long)STATE_SYNC(error).msg_rcv_malformed,
537			STATE_SYNC(error).msg_rcv_bad_version,
538			STATE_SYNC(error).msg_rcv_bad_header,
539			STATE_SYNC(error).msg_rcv_bad_payload,
540			STATE_SYNC(error).msg_rcv_bad_type,
541			STATE_SYNC(error).msg_rcv_truncated,
542			STATE_SYNC(error).msg_rcv_bad_size,
543			STATE_SYNC(error).msg_snd_malformed,
544			(unsigned long long)STATE_SYNC(error).msg_rcv_lost,
545			(unsigned long long)STATE_SYNC(error).msg_rcv_before);
546
547	send(fd, buf, size, 0);
548}
549
550static int local_commit(int fd)
551{
552	int ret;
553
554	/* delete the reset alarm if any before committing */
555	del_alarm(&STATE_SYNC(reset_cache_alarm));
556
557	ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, fd);
558	if (ret == -1) {
559		dlog(LOG_NOTICE, "commit already in progress, skipping");
560		ret = LOCAL_RET_OK;
561	} else if (ret == 0) {
562		/* we've finished the commit. */
563		ret = LOCAL_RET_OK;
564	} else {
565		/* Keep open the client, we want synchronous commit. */
566		ret = LOCAL_RET_STOLEN;
567	}
568	return ret;
569}
570
571/* handler for requests coming via UNIX socket */
572static int local_handler_sync(int fd, int type, void *data)
573{
574	int ret = LOCAL_RET_OK;
575
576	switch(type) {
577	case CT_DUMP_INTERNAL:
578		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
579			STATE(mode)->internal->ct.dump(fd, NFCT_O_PLAIN);
580			exit(EXIT_SUCCESS);
581		}
582		break;
583	case CT_DUMP_EXTERNAL:
584		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
585			STATE_SYNC(external)->ct.dump(fd, NFCT_O_PLAIN);
586			exit(EXIT_SUCCESS);
587		}
588		break;
589	case CT_DUMP_INT_XML:
590		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
591			STATE(mode)->internal->ct.dump(fd, NFCT_O_XML);
592			exit(EXIT_SUCCESS);
593		}
594		break;
595	case CT_DUMP_EXT_XML:
596		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
597			STATE_SYNC(external)->ct.dump(fd, NFCT_O_XML);
598			exit(EXIT_SUCCESS);
599		}
600		break;
601	case CT_COMMIT:
602		dlog(LOG_NOTICE, "committing conntrack cache");
603		STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->ct.commit;
604		STATE_SYNC(commit).rq[1].cb = NULL;
605		ret = local_commit(fd);
606		break;
607	case RESET_TIMERS:
608		if (!alarm_pending(&STATE_SYNC(reset_cache_alarm))) {
609			dlog(LOG_NOTICE, "flushing conntrack table in %d secs",
610					 CONFIG(purge_timeout));
611			add_alarm(&STATE_SYNC(reset_cache_alarm),
612				  CONFIG(purge_timeout), 0);
613		}
614		break;
615	case CT_FLUSH_CACHE:
616		/* if we're still committing, abort this command */
617		if (STATE_SYNC(commit).clientfd != -1) {
618			dlog(LOG_ERR, "ignoring flush command, "
619				      "commit still in progress");
620			break;
621		}
622		/* inmediate flush, remove pending flush scheduled if any */
623		del_alarm(&STATE_SYNC(reset_cache_alarm));
624		dlog(LOG_NOTICE, "flushing caches");
625		STATE(mode)->internal->ct.flush();
626		STATE_SYNC(external)->ct.flush();
627		break;
628	case CT_FLUSH_INT_CACHE:
629		/* inmediate flush, remove pending flush scheduled if any */
630		del_alarm(&STATE_SYNC(reset_cache_alarm));
631		dlog(LOG_NOTICE, "flushing internal cache");
632		STATE(mode)->internal->ct.flush();
633		break;
634	case CT_FLUSH_EXT_CACHE:
635		/* if we're still committing, abort this command */
636		if (STATE_SYNC(commit).clientfd != -1) {
637			dlog(LOG_ERR, "ignoring flush command, "
638				      "commit still in progress");
639			break;
640		}
641		dlog(LOG_NOTICE, "flushing external cache");
642		STATE_SYNC(external)->ct.flush();
643		break;
644	case STATS:
645		STATE(mode)->internal->ct.stats(fd);
646		STATE_SYNC(external)->ct.stats(fd);
647		dump_traffic_stats(fd);
648		multichannel_stats(STATE_SYNC(channel), fd);
649		dump_stats_sync(fd);
650		break;
651	case STATS_NETWORK:
652		dump_stats_sync_extended(fd);
653		multichannel_stats(STATE_SYNC(channel), fd);
654		break;
655	case STATS_CACHE:
656		STATE(mode)->internal->ct.stats_ext(fd);
657		STATE_SYNC(external)->ct.stats_ext(fd);
658		break;
659	case STATS_LINK:
660		multichannel_stats_extended(STATE_SYNC(channel),
661					    STATE_SYNC(interface), fd);
662		break;
663	case STATS_QUEUE:
664		queue_stats_show(fd);
665		break;
666	case EXP_STATS:
667		if (!(CONFIG(flags) & CTD_EXPECT))
668			break;
669
670		STATE(mode)->internal->exp.stats(fd);
671		STATE_SYNC(external)->exp.stats(fd);
672		dump_traffic_stats(fd);
673		multichannel_stats(STATE_SYNC(channel), fd);
674		dump_stats_sync(fd);
675		break;
676	case EXP_DUMP_INTERNAL:
677		if (!(CONFIG(flags) & CTD_EXPECT))
678			break;
679
680		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
681			STATE(mode)->internal->exp.dump(fd, NFCT_O_PLAIN);
682			exit(EXIT_SUCCESS);
683		}
684		break;
685	case EXP_DUMP_EXTERNAL:
686		if (!(CONFIG(flags) & CTD_EXPECT))
687			break;
688
689		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
690			STATE_SYNC(external)->exp.dump(fd, NFCT_O_PLAIN);
691			exit(EXIT_SUCCESS);
692		}
693		break;
694	case EXP_COMMIT:
695		if (!(CONFIG(flags) & CTD_EXPECT))
696			break;
697
698		dlog(LOG_NOTICE, "committing expectation cache");
699		STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->exp.commit;
700		STATE_SYNC(commit).rq[1].cb = NULL;
701		ret = local_commit(fd);
702		break;
703	case ALL_FLUSH_CACHE:
704		/* if we're still committing, abort this command */
705		if (STATE_SYNC(commit).clientfd != -1) {
706			dlog(LOG_ERR, "ignoring flush command, "
707				      "commit still in progress");
708			break;
709		}
710		dlog(LOG_NOTICE, "flushing caches");
711		STATE(mode)->internal->ct.flush();
712		STATE_SYNC(external)->ct.flush();
713		if (CONFIG(flags) & CTD_EXPECT) {
714			STATE(mode)->internal->exp.flush();
715			STATE_SYNC(external)->exp.flush();
716		}
717		break;
718	case ALL_COMMIT:
719		dlog(LOG_NOTICE, "committing all external caches");
720		STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->ct.commit;
721		if (CONFIG(flags) & CTD_EXPECT) {
722			STATE_SYNC(commit).rq[1].cb =
723				STATE_SYNC(external)->exp.commit;
724		} else {
725			STATE_SYNC(commit).rq[1].cb = NULL;
726		}
727		ret = local_commit(fd);
728		break;
729	case EXP_DUMP_INT_XML:
730		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
731			STATE(mode)->internal->exp.dump(fd, NFCT_O_XML);
732			exit(EXIT_SUCCESS);
733		}
734		break;
735	case EXP_DUMP_EXT_XML:
736		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
737			STATE_SYNC(external)->exp.dump(fd, NFCT_O_XML);
738			exit(EXIT_SUCCESS);
739		}
740		break;
741	default:
742		if (STATE_SYNC(sync)->local)
743			ret = STATE_SYNC(sync)->local(fd, type, data);
744		break;
745	}
746
747	return ret;
748}
749
750struct ct_mode sync_mode = {
751	.init 			= init_sync,
752	.local			= local_handler_sync,
753	.kill			= kill_sync,
754	/* the internal handler is set in run-time. */
755};
756