/*
 * (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org>
 * (C) 2011 by Vyatta Inc. <http://www.vyatta.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "conntrackd.h"
#include "sync.h"
#include "queue.h"
#include "network.h"
#include "alarm.h"
#include "log.h"
#include "cache.h"
#include "fds.h"

#include <string.h>
#include <errno.h>

#if 0
#define dp printf
#else
#define dp(...)
#endif

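/*
 * FT-FW state: rs_queue (the resend queue) holds every message that has
 * been sent but not yet acknowledged by the other node. exp_seq tracks
 * the next sequence number we expect to receive; ack_from/ack_from_set
 * and window drive the batched acknowledgments we send back.
 */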
struct queue *rs_queue;
static uint32_t exp_seq;
static uint32_t window;
static uint32_t ack_from;
static int ack_from_set = 0;
static struct alarm_block alive_alarm;

enum {
	HELLO_INIT,
	HELLO_SAY,
	HELLO_DONE,
};
static int hello_state = HELLO_INIT;
static int say_hello_back;

/* XXX: make the alive message interval configurable */
#define ALIVE_INT 1

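/*
 * Extra data attached to every object in the internal cache: the queue
 * node lets the object sit in the tx queue or in the resend queue, and
 * seq records the sequence number it was last transmitted with.
 */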
struct cache_ftfw {
	struct queue_node	qnode;
	struct cache_object	*obj;
	uint32_t		seq;
};

static void cache_ftfw_add(struct cache_object *obj, void *data)
{
	struct cache_ftfw *cn = data;
	cn->obj = obj;
	/* These nodes are not inserted in the list */
	queue_node_init(&cn->qnode, Q_ELEM_OBJ);
}

static void cache_ftfw_del(struct cache_object *obj, void *data)
{
	struct cache_ftfw *cn = data;
	queue_del(&cn->qnode);
}

static struct cache_extra cache_ftfw_extra = {
	.size		= sizeof(struct cache_ftfw),
	.add		= cache_ftfw_add,
	.destroy	= cache_ftfw_del
};

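/*
 * Tag an outgoing message with the hello handshake flags: we keep asking
 * for a hello-back until one arrives, and we answer a peer's hello once.
 */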
static void nethdr_set_hello(struct nethdr *net)
{
	switch(hello_state) {
	case HELLO_INIT:
		hello_state = HELLO_SAY;
		/* fall through */
	case HELLO_SAY:
		net->flags |= NET_F_HELLO;
		break;
	}
	if (say_hello_back) {
		net->flags |= NET_F_HELLO_BACK;
		say_hello_back = 0;
	}
}

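/* enqueue a control message (ack, nack or resync) covering the range [from, to] */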
static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
{
	struct queue_object *qobj;
	struct nethdr_ack *ack;

	qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
	if (qobj == NULL)
		return;

	ack		= (struct nethdr_ack *)qobj->data;
	ack->type	= NET_T_CTL;
	ack->flags	= flags;
	ack->from	= from;
	ack->to		= to;

	if (queue_add(STATE_SYNC(tx_queue), &qobj->qnode) < 0)
		queue_object_free(qobj);
}

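/* enqueue a control message that carries no sequence range, e.g. alive */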
static void tx_queue_add_ctlmsg2(uint32_t flags)
{
	struct queue_object *qobj;
	struct nethdr *ctl;

	qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
	if (qobj == NULL)
		return;

	ctl		= (struct nethdr *)qobj->data;
	ctl->type	= NET_T_CTL;
	ctl->flags	= flags;

	if (queue_add(STATE_SYNC(tx_queue), &qobj->qnode) < 0)
		queue_object_free(qobj);
}

/* this function is called from the alarm framework */
static void do_alive_alarm(struct alarm_block *a, void *data)
{
	if (ack_from_set && nethdr_track_is_seq_set()) {
		/* exp_seq contains the last update received */
		tx_queue_add_ctlmsg(NET_F_ACK,
				    ack_from,
				    STATE_SYNC(last_seq_recv));
		ack_from_set = 0;
	} else
		tx_queue_add_ctlmsg2(NET_F_ALIVE);

	add_alarm(&alive_alarm, ALIVE_INT, 0);
}

static int ftfw_init(void)
{
	rs_queue = queue_create("rsqueue", CONFIG(resend_queue_size), 0);
	if (rs_queue == NULL) {
		dlog(LOG_ERR, "cannot create rs queue");
		return -1;
	}

	init_alarm(&alive_alarm, NULL, do_alive_alarm);
	add_alarm(&alive_alarm, ALIVE_INT, 0);

	/* set ack window size */
	window = CONFIG(window_size);

	return 0;
}

static void ftfw_kill(void)
{
	queue_destroy(rs_queue);
}

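/*
 * Schedule one cache object for transmission: if it is waiting in the
 * resend queue, move it to the tx queue; otherwise add it there and take
 * a reference so the object stays alive until it has been acknowledged.
 */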
static int do_cache_to_tx(void *data1, void *data2)
{
	struct cache_object *obj = data2;
	struct cache_ftfw *cn = cache_get_extra(obj);

	if (queue_in(rs_queue, &cn->qnode)) {
		queue_del(&cn->qnode);
		queue_add(STATE_SYNC(tx_queue), &cn->qnode);
	} else {
		if (queue_add(STATE_SYNC(tx_queue), &cn->qnode) > 0)
			cache_object_get(obj);
	}
	return 0;
}

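/* dump one resend-queue entry to the descriptor of the local socket */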
static int rs_queue_dump(struct queue_node *n, const void *data2)
{
	const int *fd = data2;
	char buf[512];
	int size;

	switch(n->type) {
	case Q_ELEM_CTL: {
		struct nethdr *net = queue_node_data(n);
		size = sprintf(buf, "control -> seq:%u flags:%u\n",
			       net->seq, net->flags);
		break;
	}
	case Q_ELEM_OBJ: {
		struct cache_ftfw *cn = (struct cache_ftfw *) n;
		size = sprintf(buf, "object -> seq:%u\n", cn->seq);
		break;
	}
	default:
		return 0;
	}
	send(*fd, buf, size, 0);
	return 0;
}

static void ftfw_local_queue(int fd)
{
	char buf[512];
	int size;

	size = sprintf(buf, "resend queue (len=%u)\n", queue_len(rs_queue));
	send(fd, buf, size, 0);
	queue_iterate(rs_queue, &fd, rs_queue_dump);
}

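/* handle requests coming from the local control socket */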
static int ftfw_local(int fd, int type, void *data)
{
	int ret = LOCAL_RET_OK;

	switch(type) {
	case REQUEST_DUMP:
		dlog(LOG_NOTICE, "request resync");
		tx_queue_add_ctlmsg(NET_F_RESYNC, 0, 0);
		break;
	case SEND_BULK:
		dlog(LOG_NOTICE, "sending bulk update");
		cache_iterate(STATE(mode)->internal->ct.data,
			      NULL, do_cache_to_tx);
		cache_iterate(STATE(mode)->internal->exp.data,
			      NULL, do_cache_to_tx);
		break;
	case STATS_RSQUEUE:
		ftfw_local_queue(fd);
		break;
	}

	return ret;
}

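/*
 * queue_iterate() callback used on reception of a nack: move every
 * resend-queue entry whose sequence number falls in [from, to] back to
 * the tx queue so that it gets retransmitted.
 */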
static int rs_queue_to_tx(struct queue_node *n, const void *data)
{
	const struct nethdr_ack *nack = data;

	switch(n->type) {
	case Q_ELEM_CTL: {
		struct nethdr_ack *net = queue_node_data(n);

		if (before(net->seq, nack->from))
			return 0;	/* continue */
		else if (after(net->seq, nack->to))
			return 1;	/* break */

		dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
		   net->seq, net->flags, net->len);

		queue_del(n);
		queue_add(STATE_SYNC(tx_queue), n);
		break;
	}
	case Q_ELEM_OBJ: {
		struct cache_ftfw *cn;

		cn = (struct cache_ftfw *) n;
		if (before(cn->seq, nack->from))
			return 0;
		else if (after(cn->seq, nack->to))
			return 1;

		dp("resending nack'ed (oldseq=%u)\n", cn->seq);

		queue_del(n);
		queue_add(STATE_SYNC(tx_queue), n);
		break;
	}
	}
	return 0;
}

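/*
 * queue_iterate() callback used on reception of an ack: release the
 * resend-queue entries whose sequence number falls in [from, to]. With
 * a NULL header, it flushes the whole resend queue.
 */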
static int rs_queue_empty(struct queue_node *n, const void *data)
{
	const struct nethdr_ack *h = data;

	switch(n->type) {
	case Q_ELEM_CTL: {
		struct nethdr_ack *net = queue_node_data(n);

		if (h == NULL) {
			queue_del(n);
			queue_object_free((struct queue_object *)n);
			return 0;
		}
		if (before(net->seq, h->from))
			return 0;	/* continue */
		else if (after(net->seq, h->to))
			return 1;	/* break */

		dp("remove from queue (seq=%u)\n", net->seq);
		queue_del(n);
		queue_object_free((struct queue_object *)n);
		break;
	}
	case Q_ELEM_OBJ: {
		struct cache_ftfw *cn;

		cn = (struct cache_ftfw *) n;
		if (h == NULL) {
			queue_del(n);
			cache_object_put(cn->obj);
			return 0;
		}
		if (before(cn->seq, h->from))
			return 0;
		else if (after(cn->seq, h->to))
			return 1;

		dp("queue: deleting from queue (seq=%u)\n", cn->seq);
		queue_del(n);
		cache_object_put(cn->obj);
		break;
	}
	}
	return 0;
}

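/*
 * Process the control part of an incoming message: acks empty the
 * resend queue, nacks trigger retransmission, and a resync requests a
 * full bulk send of the internal caches.
 */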
static int digest_msg(const struct nethdr *net)
{
	if (IS_DATA(net))
		return MSG_DATA;

	else if (IS_ACK(net)) {
		const struct nethdr_ack *h = (const struct nethdr_ack *) net;

		if (before(h->to, h->from))
			return MSG_BAD;

		queue_iterate(rs_queue, h, rs_queue_empty);
		return MSG_CTL;

	} else if (IS_NACK(net)) {
		const struct nethdr_ack *nack = (const struct nethdr_ack *) net;

		if (before(nack->to, nack->from))
			return MSG_BAD;

		queue_iterate(rs_queue, nack, rs_queue_to_tx);
		return MSG_CTL;

	} else if (IS_RESYNC(net)) {
		dp("RESYNC ALL\n");
		cache_iterate(STATE(mode)->internal->ct.data, NULL,
			      do_cache_to_tx);
		cache_iterate(STATE(mode)->internal->exp.data, NULL,
			      do_cache_to_tx);
		return MSG_CTL;

	} else if (IS_ALIVE(net))
		return MSG_CTL;

	return MSG_BAD;
}

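/*
 * Process the hello flags of an incoming message. Returns 1 if the
 * other node is asking for a hello-back, i.e. it keeps no state about
 * what we have sent so far.
 */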
static int digest_hello(const struct nethdr *net)
{
	int ret = 0;

	if (IS_HELLO(net)) {
		say_hello_back = 1;
		ret = 1;
	}
	if (IS_HELLO_BACK(net)) {
		/* this is a hello back for a requested hello */
		if (hello_state == HELLO_SAY)
			hello_state = HELLO_DONE;
	}

	return ret;
}

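/*
 * Receive path: track the sequence number of the incoming message,
 * acknowledge in batches of one window, and send a nack for the gap
 * when messages have been lost.
 */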
static int ftfw_recv(const struct nethdr *net)
{
	int ret = MSG_DATA;

	if (digest_hello(net)) {
		/* we have received a hello while we had data to acknowledge.
		 * reset the window, the other node doesn't know anything
		 * about it. */
		if (ack_from_set && before(net->seq, ack_from)) {
			window = CONFIG(window_size) - 1;
			ack_from = net->seq;
		}

		/* XXX: flush the resend queue since the other node does not
		 * know anything about that data, we are unreliable until
		 * the helloing finishes */
		queue_iterate(rs_queue, NULL, rs_queue_empty);

		goto bypass;
	}

	switch (nethdr_track_seq(net->seq, &exp_seq)) {
	case SEQ_AFTER:
		ret = digest_msg(net);
		if (ret == MSG_BAD)
			goto out;

		if (ack_from_set) {
			tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq - 1);
			ack_from_set = 0;
		}

		tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq - 1);

		/* count this message as part of the new window */
		window = CONFIG(window_size) - 1;
		ack_from = net->seq;
		ack_from_set = 1;
		break;

	case SEQ_BEFORE:
		/* we don't accept delayed packets */
		ret = MSG_DROP;
		break;

	case SEQ_UNSET:
	case SEQ_IN_SYNC:
bypass:
		ret = digest_msg(net);
		if (ret == MSG_BAD)
			goto out;

		if (!ack_from_set) {
			ack_from_set = 1;
			ack_from = net->seq;
		}

		if (--window <= 0) {
			/* we have received a full window, send an acknowledgement */
			tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq);
			window = CONFIG(window_size);
			ack_from_set = 0;
		}
	}

out:
	if (ret == MSG_DATA || ret == MSG_CTL)
		nethdr_track_update_seq(net->seq);

	return ret;
}

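/*
 * The resend queue is full: drop its oldest entry to make room. This is
 * only called when queue_add() has just failed with ENOSPC.
 */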
static void rs_queue_purge_full(void)
{
	struct queue_node *n;

	n = queue_del_head(rs_queue);
	switch(n->type) {
	case Q_ELEM_CTL: {
		struct queue_object *qobj = (struct queue_object *)n;
		queue_object_free(qobj);
		break;
	}
	case Q_ELEM_OBJ: {
		struct cache_ftfw *cn;

		cn = (struct cache_ftfw *)n;
		cache_object_put(cn->obj);
		break;
	}
	}
}

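/*
 * Transmit one element of the tx queue. Control messages that need
 * reliable delivery and data objects are moved to the resend queue
 * until the other node acknowledges them.
 */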
static int tx_queue_xmit(struct queue_node *n, const void *data)
{
	queue_del(n);

	switch(n->type) {
	case Q_ELEM_CTL: {
		struct nethdr *net = queue_node_data(n);

		nethdr_set_hello(net);

		if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
			nethdr_set_ack(net);
		} else {
			nethdr_set_ctl(net);
		}
		HDR_HOST2NETWORK(net);

		dp("tx_queue sq: %u fl:%u len:%u\n",
		   ntohl(net->seq), net->flags, ntohs(net->len));

		multichannel_send(STATE_SYNC(channel), net);
		HDR_NETWORK2HOST(net);

		if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
			if (queue_add(rs_queue, n) < 0) {
				if (errno == ENOSPC) {
					rs_queue_purge_full();
					queue_add(rs_queue, n);
				}
			}
		} else
			queue_object_free((struct queue_object *)n);
		break;
	}
	case Q_ELEM_OBJ: {
		struct cache_ftfw *cn;
		int type;
		struct nethdr *net;

		cn = (struct cache_ftfw *)n;
		type = object_status_to_network_type(cn->obj);
		net = cn->obj->cache->ops->build_msg(cn->obj, type);
		nethdr_set_hello(net);

		dp("tx_list sq: %u fl:%u len:%u\n",
		   ntohl(net->seq), net->flags, ntohs(net->len));

		multichannel_send(STATE_SYNC(channel), net);
		cn->seq = ntohl(net->seq);
		if (queue_add(rs_queue, &cn->qnode) < 0) {
			if (errno == ENOSPC) {
				rs_queue_purge_full();
				queue_add(rs_queue, &cn->qnode);
			}
		}
		/* we release the object once we get the acknowledgment */
		break;
	}
	}

	return 0;
}

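/* flush the tx queue and re-arm the keepalive alarm */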
static void ftfw_xmit(void)
{
	queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit);
	add_alarm(&alive_alarm, ALIVE_INT, 0);
	dp("tx_queue_len:%u rs_queue_len:%u\n",
	   queue_len(STATE_SYNC(tx_queue)), queue_len(rs_queue));
}

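/*
 * Schedule a cache object for transmission; if it is already waiting in
 * the resend queue, move it to the tx queue instead of taking a new
 * reference.
 */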
static void ftfw_enqueue(struct cache_object *obj, int type)
{
	struct cache_ftfw *cn = cache_get_extra(obj);

	if (queue_in(rs_queue, &cn->qnode)) {
		queue_del(&cn->qnode);
		queue_add(STATE_SYNC(tx_queue), &cn->qnode);
	} else {
		if (queue_add(STATE_SYNC(tx_queue), &cn->qnode) > 0)
			cache_object_get(obj);
	}
}

struct sync_mode sync_ftfw = {
	.internal_cache_flags	= NO_FEATURES,
	.external_cache_flags	= NO_FEATURES,
	.internal_cache_extra	= &cache_ftfw_extra,
	.init			= ftfw_init,
	.kill			= ftfw_kill,
	.local			= ftfw_local,
	.recv			= ftfw_recv,
	.enqueue		= ftfw_enqueue,
	.xmit			= ftfw_xmit,
};