1/* $Id$ */ 2 3/*** 4 This file is part of avahi. 5 6 avahi is free software; you can redistribute it and/or modify it 7 under the terms of the GNU Lesser General Public License as 8 published by the Free Software Foundation; either version 2.1 of the 9 License, or (at your option) any later version. 10 11 avahi is distributed in the hope that it will be useful, but WITHOUT 12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 13 or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General 14 Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public 17 License along with avahi; if not, write to the Free Software 18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 19 USA. 20***/ 21 22#ifdef HAVE_CONFIG_H 23#include <config.h> 24#endif 25 26#include <stdlib.h> 27 28#include <avahi-common/timeval.h> 29#include <avahi-common/malloc.h> 30 31#include "response-sched.h" 32#include "log.h" 33#include "rr-util.h" 34 35/* Local packets are supressed this long after sending them */ 36#define AVAHI_RESPONSE_HISTORY_MSEC 500 37 38/* Local packets are deferred this long before sending them */ 39#define AVAHI_RESPONSE_DEFER_MSEC 20 40 41/* Additional jitter for deferred packets */ 42#define AVAHI_RESPONSE_JITTER_MSEC 100 43 44/* Remote packets can suppress local traffic as long as this value */ 45#define AVAHI_RESPONSE_SUPPRESS_MSEC 700 46 47typedef struct AvahiResponseJob AvahiResponseJob; 48 49typedef enum { 50 AVAHI_SCHEDULED, 51 AVAHI_DONE, 52 AVAHI_SUPPRESSED 53} AvahiResponseJobState; 54 55struct AvahiResponseJob { 56 AvahiResponseScheduler *scheduler; 57 AvahiTimeEvent *time_event; 58 59 AvahiResponseJobState state; 60 struct timeval delivery; 61 62 AvahiRecord *record; 63 int flush_cache; 64 AvahiAddress querier; 65 int querier_valid; 66 67 AVAHI_LLIST_FIELDS(AvahiResponseJob, jobs); 68}; 69 70struct AvahiResponseScheduler { 71 AvahiInterface *interface; 72 AvahiTimeEventQueue *time_event_queue; 73 74 AVAHI_LLIST_HEAD(AvahiResponseJob, jobs); 75 AVAHI_LLIST_HEAD(AvahiResponseJob, history); 76 AVAHI_LLIST_HEAD(AvahiResponseJob, suppressed); 77}; 78 79static AvahiResponseJob* job_new(AvahiResponseScheduler *s, AvahiRecord *record, AvahiResponseJobState state) { 80 AvahiResponseJob *rj; 81 82 assert(s); 83 assert(record); 84 85 if (!(rj = avahi_new(AvahiResponseJob, 1))) { 86 avahi_log_error(__FILE__": Out of memory"); 87 return NULL; 88 } 89 90 rj->scheduler = s; 91 rj->record = avahi_record_ref(record); 92 rj->time_event = NULL; 93 rj->flush_cache = 0; 94 rj->querier_valid = 0; 95 96 if ((rj->state = state) == AVAHI_SCHEDULED) 97 AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->jobs, rj); 98 else if (rj->state == AVAHI_DONE) 99 AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->history, rj); 100 else /* rj->state == AVAHI_SUPPRESSED */ 101 AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->suppressed, rj); 102 103 return rj; 104} 105 106static void job_free(AvahiResponseScheduler *s, AvahiResponseJob *rj) { 107 assert(s); 108 assert(rj); 109 110 if (rj->time_event) 111 avahi_time_event_free(rj->time_event); 112 113 if (rj->state == AVAHI_SCHEDULED) 114 AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->jobs, rj); 115 else if (rj->state == AVAHI_DONE) 116 AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->history, rj); 117 else /* rj->state == AVAHI_SUPPRESSED */ 118 AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->suppressed, rj); 119 120 avahi_record_unref(rj->record); 121 avahi_free(rj); 122} 123 124static void elapse_callback(AvahiTimeEvent *e, void* data); 125 126static void job_set_elapse_time(AvahiResponseScheduler *s, AvahiResponseJob *rj, unsigned msec, unsigned jitter) { 127 struct timeval tv; 128 129 assert(s); 130 assert(rj); 131 132 avahi_elapse_time(&tv, msec, jitter); 133 134 if (rj->time_event) 135 avahi_time_event_update(rj->time_event, &tv); 136 else 137 rj->time_event = avahi_time_event_new(s->time_event_queue, &tv, elapse_callback, rj); 138} 139 140static void job_mark_done(AvahiResponseScheduler *s, AvahiResponseJob *rj) { 141 assert(s); 142 assert(rj); 143 144 assert(rj->state == AVAHI_SCHEDULED); 145 146 AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->jobs, rj); 147 AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->history, rj); 148 149 rj->state = AVAHI_DONE; 150 151 job_set_elapse_time(s, rj, AVAHI_RESPONSE_HISTORY_MSEC, 0); 152 153 gettimeofday(&rj->delivery, NULL); 154} 155 156AvahiResponseScheduler *avahi_response_scheduler_new(AvahiInterface *i) { 157 AvahiResponseScheduler *s; 158 assert(i); 159 160 if (!(s = avahi_new(AvahiResponseScheduler, 1))) { 161 avahi_log_error(__FILE__": Out of memory"); 162 return NULL; 163 } 164 165 s->interface = i; 166 s->time_event_queue = i->monitor->server->time_event_queue; 167 168 AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->jobs); 169 AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->history); 170 AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->suppressed); 171 172 return s; 173} 174 175void avahi_response_scheduler_free(AvahiResponseScheduler *s) { 176 assert(s); 177 178 avahi_response_scheduler_clear(s); 179 avahi_free(s); 180} 181 182void avahi_response_scheduler_clear(AvahiResponseScheduler *s) { 183 assert(s); 184 185 while (s->jobs) 186 job_free(s, s->jobs); 187 while (s->history) 188 job_free(s, s->history); 189 while (s->suppressed) 190 job_free(s, s->suppressed); 191} 192 193static void enumerate_aux_records_callback(AVAHI_GCC_UNUSED AvahiServer *s, AvahiRecord *r, int flush_cache, void* userdata) { 194 AvahiResponseJob *rj = userdata; 195 196 assert(r); 197 assert(rj); 198 199 avahi_response_scheduler_post(rj->scheduler, r, flush_cache, rj->querier_valid ? &rj->querier : NULL, 0); 200} 201 202static int packet_add_response_job(AvahiResponseScheduler *s, AvahiDnsPacket *p, AvahiResponseJob *rj) { 203 assert(s); 204 assert(p); 205 assert(rj); 206 207 /* Try to add this record to the packet */ 208 if (!avahi_dns_packet_append_record(p, rj->record, rj->flush_cache, 0)) 209 return 0; 210 211 /* Ok, this record will definitely be sent, so schedule the 212 * auxilliary packets, too */ 213 avahi_server_enumerate_aux_records(s->interface->monitor->server, s->interface, rj->record, enumerate_aux_records_callback, rj); 214 job_mark_done(s, rj); 215 216 return 1; 217} 218 219static void send_response_packet(AvahiResponseScheduler *s, AvahiResponseJob *rj) { 220 AvahiDnsPacket *p; 221 unsigned n; 222 223 assert(s); 224 assert(rj); 225 226 if (!(p = avahi_dns_packet_new_response(s->interface->hardware->mtu, 1))) 227 return; /* OOM */ 228 n = 1; 229 230 /* Put it in the packet. */ 231 if (packet_add_response_job(s, p, rj)) { 232 233 /* Try to fill up packet with more responses, if available */ 234 while (s->jobs) { 235 236 if (!packet_add_response_job(s, p, s->jobs)) 237 break; 238 239 n++; 240 } 241 242 } else { 243 size_t size; 244 245 avahi_dns_packet_free(p); 246 247 /* OK, the packet was too small, so create one that fits */ 248 size = avahi_record_get_estimate_size(rj->record) + AVAHI_DNS_PACKET_HEADER_SIZE; 249 250 if (!(p = avahi_dns_packet_new_response(size + AVAHI_DNS_PACKET_EXTRA_SIZE, 1))) 251 return; /* OOM */ 252 253 if (!packet_add_response_job(s, p, rj)) { 254 avahi_dns_packet_free(p); 255 256 avahi_log_warn("Record too large, cannot send"); 257 job_mark_done(s, rj); 258 return; 259 } 260 } 261 262 avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_ANCOUNT, n); 263 avahi_interface_send_packet(s->interface, p); 264 avahi_dns_packet_free(p); 265} 266 267static void elapse_callback(AVAHI_GCC_UNUSED AvahiTimeEvent *e, void* data) { 268 AvahiResponseJob *rj = data; 269 270 assert(rj); 271 272 if (rj->state == AVAHI_DONE || rj->state == AVAHI_SUPPRESSED) 273 job_free(rj->scheduler, rj); /* Lets drop this entry */ 274 else 275 send_response_packet(rj->scheduler, rj); 276} 277 278static AvahiResponseJob* find_scheduled_job(AvahiResponseScheduler *s, AvahiRecord *record) { 279 AvahiResponseJob *rj; 280 281 assert(s); 282 assert(record); 283 284 for (rj = s->jobs; rj; rj = rj->jobs_next) { 285 assert(rj->state == AVAHI_SCHEDULED); 286 287 if (avahi_record_equal_no_ttl(rj->record, record)) 288 return rj; 289 } 290 291 return NULL; 292} 293 294static AvahiResponseJob* find_history_job(AvahiResponseScheduler *s, AvahiRecord *record) { 295 AvahiResponseJob *rj; 296 297 assert(s); 298 assert(record); 299 300 for (rj = s->history; rj; rj = rj->jobs_next) { 301 assert(rj->state == AVAHI_DONE); 302 303 if (avahi_record_equal_no_ttl(rj->record, record)) { 304 /* Check whether this entry is outdated */ 305 306/* avahi_log_debug("history age: %u", (unsigned) (avahi_age(&rj->delivery)/1000)); */ 307 308 if (avahi_age(&rj->delivery)/1000 > AVAHI_RESPONSE_HISTORY_MSEC) { 309 /* it is outdated, so let's remove it */ 310 job_free(s, rj); 311 return NULL; 312 } 313 314 return rj; 315 } 316 } 317 318 return NULL; 319} 320 321static AvahiResponseJob* find_suppressed_job(AvahiResponseScheduler *s, AvahiRecord *record, const AvahiAddress *querier) { 322 AvahiResponseJob *rj; 323 324 assert(s); 325 assert(record); 326 assert(querier); 327 328 for (rj = s->suppressed; rj; rj = rj->jobs_next) { 329 assert(rj->state == AVAHI_SUPPRESSED); 330 assert(rj->querier_valid); 331 332 if (avahi_record_equal_no_ttl(rj->record, record) && 333 avahi_address_cmp(&rj->querier, querier) == 0) { 334 /* Check whether this entry is outdated */ 335 336 if (avahi_age(&rj->delivery) > AVAHI_RESPONSE_SUPPRESS_MSEC*1000) { 337 /* it is outdated, so let's remove it */ 338 job_free(s, rj); 339 return NULL; 340 } 341 342 return rj; 343 } 344 } 345 346 return NULL; 347} 348 349int avahi_response_scheduler_post(AvahiResponseScheduler *s, AvahiRecord *record, int flush_cache, const AvahiAddress *querier, int immediately) { 350 AvahiResponseJob *rj; 351 struct timeval tv; 352/* char *t; */ 353 354 assert(s); 355 assert(record); 356 357 assert(!avahi_key_is_pattern(record->key)); 358 359/* t = avahi_record_to_string(record); */ 360/* avahi_log_debug("post %i %s", immediately, t); */ 361/* avahi_free(t); */ 362 363 /* Check whether this response is suppressed */ 364 if (querier && 365 (rj = find_suppressed_job(s, record, querier)) && 366 avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) && 367 rj->record->ttl >= record->ttl/2) { 368 369/* avahi_log_debug("Response suppressed by known answer suppression."); */ 370 return 0; 371 } 372 373 /* Check if we already sent this response recently */ 374 if ((rj = find_history_job(s, record))) { 375 376 if (avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) && 377 rj->record->ttl >= record->ttl/2 && 378 (rj->flush_cache || !flush_cache)) { 379/* avahi_log_debug("Response suppressed by local duplicate suppression (history)"); */ 380 return 0; 381 } 382 383 /* Outdated ... */ 384 job_free(s, rj); 385 } 386 387 avahi_elapse_time(&tv, immediately ? 0 : AVAHI_RESPONSE_DEFER_MSEC, immediately ? 0 : AVAHI_RESPONSE_JITTER_MSEC); 388 389 if ((rj = find_scheduled_job(s, record))) { 390/* avahi_log_debug("Response suppressed by local duplicate suppression (scheduled)"); */ 391 392 /* Update a little ... */ 393 394 /* Update the time if the new is prior to the old */ 395 if (avahi_timeval_compare(&tv, &rj->delivery) < 0) { 396 rj->delivery = tv; 397 avahi_time_event_update(rj->time_event, &rj->delivery); 398 } 399 400 /* Update the flush cache bit */ 401 if (flush_cache) 402 rj->flush_cache = 1; 403 404 /* Update the querier field */ 405 if (!querier || (rj->querier_valid && avahi_address_cmp(querier, &rj->querier) != 0)) 406 rj->querier_valid = 0; 407 408 /* Update record data (just for the TTL) */ 409 avahi_record_unref(rj->record); 410 rj->record = avahi_record_ref(record); 411 412 return 1; 413 } else { 414/* avahi_log_debug("Accepted new response job."); */ 415 416 /* Create a new job and schedule it */ 417 if (!(rj = job_new(s, record, AVAHI_SCHEDULED))) 418 return 0; /* OOM */ 419 420 rj->delivery = tv; 421 rj->time_event = avahi_time_event_new(s->time_event_queue, &rj->delivery, elapse_callback, rj); 422 rj->flush_cache = flush_cache; 423 424 if ((rj->querier_valid = !!querier)) 425 rj->querier = *querier; 426 427 return 1; 428 } 429} 430 431void avahi_response_scheduler_incoming(AvahiResponseScheduler *s, AvahiRecord *record, int flush_cache) { 432 AvahiResponseJob *rj; 433 assert(s); 434 435 /* This function is called whenever an incoming response was 436 * receieved. We drop scheduled responses which match here. The 437 * keyword is "DUPLICATE ANSWER SUPPRESION". */ 438 439 if ((rj = find_scheduled_job(s, record))) { 440 441 if ((!rj->flush_cache || flush_cache) && /* flush cache bit was set correctly */ 442 avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) && /* both goodbye packets, or both not */ 443 record->ttl >= rj->record->ttl/2) { /* sensible TTL */ 444 445 /* A matching entry was found, so let's mark it done */ 446/* avahi_log_debug("Response suppressed by distributed duplicate suppression"); */ 447 job_mark_done(s, rj); 448 } 449 450 return; 451 } 452 453 if ((rj = find_history_job(s, record))) { 454 /* Found a history job, let's update it */ 455 avahi_record_unref(rj->record); 456 rj->record = avahi_record_ref(record); 457 } else 458 /* Found no existing history job, so let's create a new one */ 459 if (!(rj = job_new(s, record, AVAHI_DONE))) 460 return; /* OOM */ 461 462 rj->flush_cache = flush_cache; 463 rj->querier_valid = 0; 464 465 gettimeofday(&rj->delivery, NULL); 466 job_set_elapse_time(s, rj, AVAHI_RESPONSE_HISTORY_MSEC, 0); 467} 468 469void avahi_response_scheduler_suppress(AvahiResponseScheduler *s, AvahiRecord *record, const AvahiAddress *querier) { 470 AvahiResponseJob *rj; 471 472 assert(s); 473 assert(record); 474 assert(querier); 475 476 if ((rj = find_scheduled_job(s, record))) { 477 478 if (rj->querier_valid && avahi_address_cmp(querier, &rj->querier) == 0 && /* same originator */ 479 avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) && /* both goodbye packets, or both not */ 480 record->ttl >= rj->record->ttl/2) { /* sensible TTL */ 481 482 /* A matching entry was found, so let's drop it */ 483/* avahi_log_debug("Known answer suppression active!"); */ 484 job_free(s, rj); 485 } 486 } 487 488 if ((rj = find_suppressed_job(s, record, querier))) { 489 490 /* Let's update the old entry */ 491 avahi_record_unref(rj->record); 492 rj->record = avahi_record_ref(record); 493 494 } else { 495 496 /* Create a new entry */ 497 if (!(rj = job_new(s, record, AVAHI_SUPPRESSED))) 498 return; /* OOM */ 499 rj->querier_valid = 1; 500 rj->querier = *querier; 501 } 502 503 gettimeofday(&rj->delivery, NULL); 504 job_set_elapse_time(s, rj, AVAHI_RESPONSE_SUPPRESS_MSEC, 0); 505} 506 507void avahi_response_scheduler_force(AvahiResponseScheduler *s) { 508 assert(s); 509 510 /* Send all scheduled responses immediately */ 511 while (s->jobs) 512 send_response_packet(s, s->jobs); 513} 514