1/* 2 * (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org> 3 * (C) 2011 by Vyatta Inc. <http://www.vyatta.com> 4 * 5 * This program is free software; you can redistribute it and/or modify 6 * it under the terms of the GNU General Public License as published by 7 * the Free Software Foundation; either version 2 of the License, or 8 * (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU General Public License for more details. 14 * 15 * You should have received a copy of the GNU General Public License 16 * along with this program; if not, write to the Free Software 17 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 18 */ 19 20#include "conntrackd.h" 21#include "sync.h" 22#include "queue.h" 23#include "network.h" 24#include "alarm.h" 25#include "log.h" 26#include "cache.h" 27#include "fds.h" 28 29#include <string.h> 30#include <errno.h> 31 32#if 0 33#define dp printf 34#else 35#define dp(...) 36#endif 37 38struct queue *rs_queue; 39static uint32_t exp_seq; 40static uint32_t window; 41static uint32_t ack_from; 42static int ack_from_set = 0; 43static struct alarm_block alive_alarm; 44 45enum { 46 HELLO_INIT, 47 HELLO_SAY, 48 HELLO_DONE, 49}; 50static int hello_state = HELLO_INIT; 51static int say_hello_back; 52 53/* XXX: alive message expiration configurable */ 54#define ALIVE_INT 1 55 56struct cache_ftfw { 57 struct queue_node qnode; 58 struct cache_object *obj; 59 uint32_t seq; 60}; 61 62static void cache_ftfw_add(struct cache_object *obj, void *data) 63{ 64 struct cache_ftfw *cn = data; 65 cn->obj = obj; 66 /* These nodes are not inserted in the list */ 67 queue_node_init(&cn->qnode, Q_ELEM_OBJ); 68} 69 70static void cache_ftfw_del(struct cache_object *obj, void *data) 71{ 72 struct cache_ftfw *cn = data; 73 queue_del(&cn->qnode); 74} 75 76static struct cache_extra cache_ftfw_extra = { 77 .size = sizeof(struct cache_ftfw), 78 .add = cache_ftfw_add, 79 .destroy = cache_ftfw_del 80}; 81 82static void nethdr_set_hello(struct nethdr *net) 83{ 84 switch(hello_state) { 85 case HELLO_INIT: 86 hello_state = HELLO_SAY; 87 /* fall through */ 88 case HELLO_SAY: 89 net->flags |= NET_F_HELLO; 90 break; 91 } 92 if (say_hello_back) { 93 net->flags |= NET_F_HELLO_BACK; 94 say_hello_back = 0; 95 } 96} 97 98static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) 99{ 100 struct queue_object *qobj; 101 struct nethdr_ack *ack; 102 103 qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); 104 if (qobj == NULL) 105 return; 106 107 ack = (struct nethdr_ack *)qobj->data; 108 ack->type = NET_T_CTL; 109 ack->flags = flags; 110 ack->from = from; 111 ack->to = to; 112 113 if (queue_add(STATE_SYNC(tx_queue), &qobj->qnode) < 0) 114 queue_object_free(qobj); 115} 116 117static void tx_queue_add_ctlmsg2(uint32_t flags) 118{ 119 struct queue_object *qobj; 120 struct nethdr *ctl; 121 122 qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); 123 if (qobj == NULL) 124 return; 125 126 ctl = (struct nethdr *)qobj->data; 127 ctl->type = NET_T_CTL; 128 ctl->flags = flags; 129 130 if (queue_add(STATE_SYNC(tx_queue), &qobj->qnode) < 0) 131 queue_object_free(qobj); 132} 133 134/* this function is called from the alarm framework */ 135static void do_alive_alarm(struct alarm_block *a, void *data) 136{ 137 if (ack_from_set && nethdr_track_is_seq_set()) { 138 /* exp_seq contains the last update received */ 139 tx_queue_add_ctlmsg(NET_F_ACK, 140 ack_from, 141 STATE_SYNC(last_seq_recv)); 142 ack_from_set = 0; 143 } else 144 tx_queue_add_ctlmsg2(NET_F_ALIVE); 145 146 add_alarm(&alive_alarm, ALIVE_INT, 0); 147} 148 149static int ftfw_init(void) 150{ 151 rs_queue = queue_create("rsqueue", CONFIG(resend_queue_size), 0); 152 if (rs_queue == NULL) { 153 dlog(LOG_ERR, "cannot create rs queue"); 154 return -1; 155 } 156 157 init_alarm(&alive_alarm, NULL, do_alive_alarm); 158 add_alarm(&alive_alarm, ALIVE_INT, 0); 159 160 /* set ack window size */ 161 window = CONFIG(window_size); 162 163 return 0; 164} 165 166static void ftfw_kill(void) 167{ 168 queue_destroy(rs_queue); 169} 170 171static int do_cache_to_tx(void *data1, void *data2) 172{ 173 struct cache_object *obj = data2; 174 struct cache_ftfw *cn = cache_get_extra(obj); 175 176 if (queue_in(rs_queue, &cn->qnode)) { 177 queue_del(&cn->qnode); 178 queue_add(STATE_SYNC(tx_queue), &cn->qnode); 179 } else { 180 if (queue_add(STATE_SYNC(tx_queue), &cn->qnode) > 0) 181 cache_object_get(obj); 182 } 183 return 0; 184} 185 186static int rs_queue_dump(struct queue_node *n, const void *data2) 187{ 188 const int *fd = data2; 189 char buf[512]; 190 int size; 191 192 switch(n->type) { 193 case Q_ELEM_CTL: { 194 struct nethdr *net = queue_node_data(n); 195 size = sprintf(buf, "control -> seq:%u flags:%u\n", 196 net->seq, net->flags); 197 break; 198 } 199 case Q_ELEM_OBJ: { 200 struct cache_ftfw *cn = (struct cache_ftfw *) n; 201 size = sprintf(buf, "object -> seq:%u\n", cn->seq); 202 break; 203 } 204 default: 205 return 0; 206 } 207 send(*fd, buf, size, 0); 208 return 0; 209} 210 211static void ftfw_local_queue(int fd) 212{ 213 char buf[512]; 214 int size; 215 216 size = sprintf(buf, "resent queue (len=%u)\n", queue_len(rs_queue)); 217 send(fd, buf, size, 0); 218 queue_iterate(rs_queue, &fd, rs_queue_dump); 219} 220 221static int ftfw_local(int fd, int type, void *data) 222{ 223 int ret = LOCAL_RET_OK; 224 225 switch(type) { 226 case REQUEST_DUMP: 227 dlog(LOG_NOTICE, "request resync"); 228 tx_queue_add_ctlmsg(NET_F_RESYNC, 0, 0); 229 break; 230 case SEND_BULK: 231 dlog(LOG_NOTICE, "sending bulk update"); 232 cache_iterate(STATE(mode)->internal->ct.data, 233 NULL, do_cache_to_tx); 234 cache_iterate(STATE(mode)->internal->exp.data, 235 NULL, do_cache_to_tx); 236 break; 237 case STATS_RSQUEUE: 238 ftfw_local_queue(fd); 239 break; 240 } 241 242 return ret; 243} 244 245static int rs_queue_to_tx(struct queue_node *n, const void *data) 246{ 247 const struct nethdr_ack *nack = data; 248 249 switch(n->type) { 250 case Q_ELEM_CTL: { 251 struct nethdr_ack *net = queue_node_data(n); 252 253 if (before(net->seq, nack->from)) 254 return 0; /* continue */ 255 else if (after(net->seq, nack->to)) 256 return 1; /* break */ 257 258 dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", 259 net->seq, net->flags, net->len); 260 261 queue_del(n); 262 queue_add(STATE_SYNC(tx_queue), n); 263 break; 264 } 265 case Q_ELEM_OBJ: { 266 struct cache_ftfw *cn; 267 268 cn = (struct cache_ftfw *) n; 269 if (before(cn->seq, nack->from)) 270 return 0; 271 else if (after(cn->seq, nack->to)) 272 return 1; 273 274 dp("resending nack'ed (oldseq=%u)\n", cn->seq); 275 276 queue_del(n); 277 queue_add(STATE_SYNC(tx_queue), n); 278 break; 279 } 280 } 281 return 0; 282} 283 284static int rs_queue_empty(struct queue_node *n, const void *data) 285{ 286 const struct nethdr_ack *h = data; 287 288 switch(n->type) { 289 case Q_ELEM_CTL: { 290 struct nethdr_ack *net = queue_node_data(n); 291 292 if (h == NULL) { 293 queue_del(n); 294 queue_object_free((struct queue_object *)n); 295 return 0; 296 } 297 if (before(net->seq, h->from)) 298 return 0; /* continue */ 299 else if (after(net->seq, h->to)) 300 return 1; /* break */ 301 302 dp("remove from queue (seq=%u)\n", net->seq); 303 queue_del(n); 304 queue_object_free((struct queue_object *)n); 305 break; 306 } 307 case Q_ELEM_OBJ: { 308 struct cache_ftfw *cn; 309 310 cn = (struct cache_ftfw *) n; 311 if (h == NULL) { 312 queue_del(n); 313 cache_object_put(cn->obj); 314 return 0; 315 } 316 if (before(cn->seq, h->from)) 317 return 0; 318 else if (after(cn->seq, h->to)) 319 return 1; 320 321 dp("queue: deleting from queue (seq=%u)\n", cn->seq); 322 queue_del(n); 323 cache_object_put(cn->obj); 324 break; 325 } 326 } 327 return 0; 328} 329 330static int digest_msg(const struct nethdr *net) 331{ 332 if (IS_DATA(net)) 333 return MSG_DATA; 334 335 else if (IS_ACK(net)) { 336 const struct nethdr_ack *h = (const struct nethdr_ack *) net; 337 338 if (before(h->to, h->from)) 339 return MSG_BAD; 340 341 queue_iterate(rs_queue, h, rs_queue_empty); 342 return MSG_CTL; 343 344 } else if (IS_NACK(net)) { 345 const struct nethdr_ack *nack = (const struct nethdr_ack *) net; 346 347 if (before(nack->to, nack->from)) 348 return MSG_BAD; 349 350 queue_iterate(rs_queue, nack, rs_queue_to_tx); 351 return MSG_CTL; 352 353 } else if (IS_RESYNC(net)) { 354 dp("RESYNC ALL\n"); 355 cache_iterate(STATE(mode)->internal->ct.data, NULL, 356 do_cache_to_tx); 357 cache_iterate(STATE(mode)->internal->exp.data, NULL, 358 do_cache_to_tx); 359 return MSG_CTL; 360 361 } else if (IS_ALIVE(net)) 362 return MSG_CTL; 363 364 return MSG_BAD; 365} 366 367static int digest_hello(const struct nethdr *net) 368{ 369 int ret = 0; 370 371 if (IS_HELLO(net)) { 372 say_hello_back = 1; 373 ret = 1; 374 } 375 if (IS_HELLO_BACK(net)) { 376 /* this is a hello back for a requested hello */ 377 if (hello_state == HELLO_SAY) 378 hello_state = HELLO_DONE; 379 } 380 381 return ret; 382} 383 384static int ftfw_recv(const struct nethdr *net) 385{ 386 int ret = MSG_DATA; 387 388 if (digest_hello(net)) { 389 /* we have received a hello while we had data to acknowledge. 390 * reset the window, the other doesn't know anthing about it. */ 391 if (ack_from_set && before(net->seq, ack_from)) { 392 window = CONFIG(window_size) - 1; 393 ack_from = net->seq; 394 } 395 396 /* XXX: flush the resend queues since the other does not 397 * know anything about that data, we are unreliable until 398 * the helloing finishes */ 399 queue_iterate(rs_queue, NULL, rs_queue_empty); 400 401 goto bypass; 402 } 403 404 switch (nethdr_track_seq(net->seq, &exp_seq)) { 405 case SEQ_AFTER: 406 ret = digest_msg(net); 407 if (ret == MSG_BAD) { 408 ret = MSG_BAD; 409 goto out; 410 } 411 412 if (ack_from_set) { 413 tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq-1); 414 ack_from_set = 0; 415 } 416 417 tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1); 418 419 /* count this message as part of the new window */ 420 window = CONFIG(window_size) - 1; 421 ack_from = net->seq; 422 ack_from_set = 1; 423 break; 424 425 case SEQ_BEFORE: 426 /* we don't accept delayed packets */ 427 ret = MSG_DROP; 428 break; 429 430 case SEQ_UNSET: 431 case SEQ_IN_SYNC: 432bypass: 433 ret = digest_msg(net); 434 if (ret == MSG_BAD) { 435 ret = MSG_BAD; 436 goto out; 437 } 438 439 if (!ack_from_set) { 440 ack_from_set = 1; 441 ack_from = net->seq; 442 } 443 444 if (--window <= 0) { 445 /* received a window, send an acknowledgement */ 446 tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq); 447 window = CONFIG(window_size); 448 ack_from_set = 0; 449 } 450 } 451 452out: 453 if ((ret == MSG_DATA || ret == MSG_CTL)) 454 nethdr_track_update_seq(net->seq); 455 456 return ret; 457} 458 459static void rs_queue_purge_full(void) 460{ 461 struct queue_node *n; 462 463 n = queue_del_head(rs_queue); 464 switch(n->type) { 465 case Q_ELEM_CTL: { 466 struct queue_object *qobj = (struct queue_object *)n; 467 queue_object_free(qobj); 468 break; 469 } 470 case Q_ELEM_OBJ: { 471 struct cache_ftfw *cn; 472 473 cn = (struct cache_ftfw *)n; 474 cache_object_put(cn->obj); 475 break; 476 } 477 } 478} 479 480static int tx_queue_xmit(struct queue_node *n, const void *data) 481{ 482 queue_del(n); 483 484 switch(n->type) { 485 case Q_ELEM_CTL: { 486 struct nethdr *net = queue_node_data(n); 487 488 nethdr_set_hello(net); 489 490 if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { 491 nethdr_set_ack(net); 492 } else { 493 nethdr_set_ctl(net); 494 } 495 HDR_HOST2NETWORK(net); 496 497 dp("tx_queue sq: %u fl:%u len:%u\n", 498 ntohl(net->seq), net->flags, ntohs(net->len)); 499 500 multichannel_send(STATE_SYNC(channel), net); 501 HDR_NETWORK2HOST(net); 502 503 if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { 504 if (queue_add(rs_queue, n) < 0) { 505 if (errno == ENOSPC) { 506 rs_queue_purge_full(); 507 queue_add(rs_queue, n); 508 } 509 } 510 } else 511 queue_object_free((struct queue_object *)n); 512 break; 513 } 514 case Q_ELEM_OBJ: { 515 struct cache_ftfw *cn; 516 int type; 517 struct nethdr *net; 518 519 cn = (struct cache_ftfw *)n; 520 type = object_status_to_network_type(cn->obj); 521 net = cn->obj->cache->ops->build_msg(cn->obj, type); 522 nethdr_set_hello(net); 523 524 dp("tx_list sq: %u fl:%u len:%u\n", 525 ntohl(net->seq), net->flags, ntohs(net->len)); 526 527 multichannel_send(STATE_SYNC(channel), net); 528 cn->seq = ntohl(net->seq); 529 if (queue_add(rs_queue, &cn->qnode) < 0) { 530 if (errno == ENOSPC) { 531 rs_queue_purge_full(); 532 queue_add(rs_queue, &cn->qnode); 533 } 534 } 535 /* we release the object once we get the acknowlegment */ 536 break; 537 } 538 } 539 540 return 0; 541} 542 543static void ftfw_xmit(void) 544{ 545 queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit); 546 add_alarm(&alive_alarm, ALIVE_INT, 0); 547 dp("tx_queue_len:%u rs_queue_len:%u\n", 548 queue_len(tx_queue), queue_len(rs_queue)); 549} 550 551static void ftfw_enqueue(struct cache_object *obj, int type) 552{ 553 struct cache_ftfw *cn = cache_get_extra(obj); 554 if (queue_in(rs_queue, &cn->qnode)) { 555 queue_del(&cn->qnode); 556 queue_add(STATE_SYNC(tx_queue), &cn->qnode); 557 } else { 558 if (queue_add(STATE_SYNC(tx_queue), &cn->qnode) > 0) 559 cache_object_get(obj); 560 } 561} 562 563struct sync_mode sync_ftfw = { 564 .internal_cache_flags = NO_FEATURES, 565 .external_cache_flags = NO_FEATURES, 566 .internal_cache_extra = &cache_ftfw_extra, 567 .init = ftfw_init, 568 .kill = ftfw_kill, 569 .local = ftfw_local, 570 .recv = ftfw_recv, 571 .enqueue = ftfw_enqueue, 572 .xmit = ftfw_xmit, 573}; 574