1/***
2  This file is part of avahi.
3
4  avahi is free software; you can redistribute it and/or modify it
5  under the terms of the GNU Lesser General Public License as
6  published by the Free Software Foundation; either version 2.1 of the
7  License, or (at your option) any later version.
8
9  avahi is distributed in the hope that it will be useful, but WITHOUT
10  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
12  Public License for more details.
13
14  You should have received a copy of the GNU Lesser General Public
15  License along with avahi; if not, write to the Free Software
16  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17  USA.
18***/
19
20#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
24#include <stdlib.h>
25
26#include <avahi-common/timeval.h>
27#include <avahi-common/malloc.h>
28
29#include "response-sched.h"
30#include "log.h"
31#include "rr-util.h"
32
/* Local packets are suppressed this long after sending them. Used both
 * as the timer for history entries and as their maximum accepted age. */
#define AVAHI_RESPONSE_HISTORY_MSEC 500

/* Local packets are deferred this long before sending them (not applied
 * when a response is posted with the "immediately" flag set) */
#define AVAHI_RESPONSE_DEFER_MSEC 20

/* Additional jitter for deferred packets */
#define AVAHI_RESPONSE_JITTER_MSEC 100

/* Remote packets can suppress local traffic as long as this value. Used
 * both as the timer for suppression entries and as their maximum
 * accepted age. */
#define AVAHI_RESPONSE_SUPPRESS_MSEC 700
44
typedef struct AvahiResponseJob AvahiResponseJob;

/* State of a response job. The state decides which of the scheduler's
 * three lists the job is linked into (see job_new()/job_free()). */
typedef enum {
    AVAHI_SCHEDULED,    /* Linked into the scheduler's "jobs" list, waiting to be sent */
    AVAHI_DONE,         /* Linked into the "history" list */
    AVAHI_SUPPRESSED    /* Linked into the "suppressed" list */
} AvahiResponseJobState;
52
/* A single response record tracked by a response scheduler */
struct AvahiResponseJob {
    AvahiResponseScheduler *scheduler;  /* Scheduler this job belongs to (set in job_new()) */
    AvahiTimeEvent *time_event;         /* Timer that runs elapse_callback(); NULL until one is created */

    AvahiResponseJobState state;        /* Selects the list the job is linked into */
    struct timeval delivery;            /* Scheduled jobs: planned send time. Other jobs: time stamp
                                         * taken when the entry was created or last refreshed */

    AvahiRecord *record;                /* Record to send/remember; a reference is held until job_free() */
    int flush_cache;                    /* Flush cache flag, passed on when the record is appended to a packet */
    AvahiAddress querier;               /* Address of the querier; only meaningful if querier_valid is set */
    int querier_valid;                  /* Non-zero if "querier" holds a valid address */

    AVAHI_LLIST_FIELDS(AvahiResponseJob, jobs);  /* List linkage, shared by all three lists */
};
67
/* Per-interface response scheduler */
struct AvahiResponseScheduler {
    AvahiInterface *interface;              /* Interface the responses are sent on */
    AvahiTimeEventQueue *time_event_queue;  /* Queue used for all job timers (taken from the interface's server) */

    AVAHI_LLIST_HEAD(AvahiResponseJob, jobs);        /* Jobs in state AVAHI_SCHEDULED */
    AVAHI_LLIST_HEAD(AvahiResponseJob, history);     /* Jobs in state AVAHI_DONE */
    AVAHI_LLIST_HEAD(AvahiResponseJob, suppressed);  /* Jobs in state AVAHI_SUPPRESSED */
};
76
77static AvahiResponseJob* job_new(AvahiResponseScheduler *s, AvahiRecord *record, AvahiResponseJobState state) {
78    AvahiResponseJob *rj;
79
80    assert(s);
81    assert(record);
82
83    if (!(rj = avahi_new(AvahiResponseJob, 1))) {
84        avahi_log_error(__FILE__": Out of memory");
85        return NULL;
86    }
87
88    rj->scheduler = s;
89    rj->record = avahi_record_ref(record);
90    rj->time_event = NULL;
91    rj->flush_cache = 0;
92    rj->querier_valid = 0;
93
94    if ((rj->state = state) == AVAHI_SCHEDULED)
95        AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->jobs, rj);
96    else if (rj->state == AVAHI_DONE)
97        AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->history, rj);
98    else  /* rj->state == AVAHI_SUPPRESSED */
99        AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->suppressed, rj);
100
101    return rj;
102}
103
104static void job_free(AvahiResponseScheduler *s, AvahiResponseJob *rj) {
105    assert(s);
106    assert(rj);
107
108    if (rj->time_event)
109        avahi_time_event_free(rj->time_event);
110
111    if (rj->state == AVAHI_SCHEDULED)
112        AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->jobs, rj);
113    else if (rj->state == AVAHI_DONE)
114        AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->history, rj);
115    else /* rj->state == AVAHI_SUPPRESSED */
116        AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->suppressed, rj);
117
118    avahi_record_unref(rj->record);
119    avahi_free(rj);
120}
121
/* Forward declaration: job_set_elapse_time() registers this timer
 * callback before its definition appears further down in this file. */
static void elapse_callback(AvahiTimeEvent *e, void* data);
123
124static void job_set_elapse_time(AvahiResponseScheduler *s, AvahiResponseJob *rj, unsigned msec, unsigned jitter) {
125    struct timeval tv;
126
127    assert(s);
128    assert(rj);
129
130    avahi_elapse_time(&tv, msec, jitter);
131
132    if (rj->time_event)
133        avahi_time_event_update(rj->time_event, &tv);
134    else
135        rj->time_event = avahi_time_event_new(s->time_event_queue, &tv, elapse_callback, rj);
136}
137
138static void job_mark_done(AvahiResponseScheduler *s, AvahiResponseJob *rj) {
139    assert(s);
140    assert(rj);
141
142    assert(rj->state == AVAHI_SCHEDULED);
143
144    AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->jobs, rj);
145    AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->history, rj);
146
147    rj->state = AVAHI_DONE;
148
149    job_set_elapse_time(s, rj, AVAHI_RESPONSE_HISTORY_MSEC, 0);
150
151    gettimeofday(&rj->delivery, NULL);
152}
153
154AvahiResponseScheduler *avahi_response_scheduler_new(AvahiInterface *i) {
155    AvahiResponseScheduler *s;
156    assert(i);
157
158    if (!(s = avahi_new(AvahiResponseScheduler, 1))) {
159        avahi_log_error(__FILE__": Out of memory");
160        return NULL;
161    }
162
163    s->interface = i;
164    s->time_event_queue = i->monitor->server->time_event_queue;
165
166    AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->jobs);
167    AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->history);
168    AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->suppressed);
169
170    return s;
171}
172
/* Destroy a response scheduler: all jobs of all three lists are freed
 * first, then the scheduler object itself. */
void avahi_response_scheduler_free(AvahiResponseScheduler *s) {
    assert(s);

    /* Frees every job together with its timer and record reference */
    avahi_response_scheduler_clear(s);
    avahi_free(s);
}
179
180void avahi_response_scheduler_clear(AvahiResponseScheduler *s) {
181    assert(s);
182
183    while (s->jobs)
184        job_free(s, s->jobs);
185    while (s->history)
186        job_free(s, s->history);
187    while (s->suppressed)
188        job_free(s, s->suppressed);
189}
190
191static void enumerate_aux_records_callback(AVAHI_GCC_UNUSED AvahiServer *s, AvahiRecord *r, int flush_cache, void* userdata) {
192    AvahiResponseJob *rj = userdata;
193
194    assert(r);
195    assert(rj);
196
197    avahi_response_scheduler_post(rj->scheduler, r, flush_cache, rj->querier_valid ? &rj->querier : NULL, 0);
198}
199
200static int packet_add_response_job(AvahiResponseScheduler *s, AvahiDnsPacket *p, AvahiResponseJob *rj) {
201    assert(s);
202    assert(p);
203    assert(rj);
204
205    /* Try to add this record to the packet */
206    if (!avahi_dns_packet_append_record(p, rj->record, rj->flush_cache, 0))
207        return 0;
208
209    /* Ok, this record will definitely be sent, so schedule the
210     * auxilliary packets, too */
211    avahi_server_enumerate_aux_records(s->interface->monitor->server, s->interface, rj->record, enumerate_aux_records_callback, rj);
212    job_mark_done(s, rj);
213
214    return 1;
215}
216
217static void send_response_packet(AvahiResponseScheduler *s, AvahiResponseJob *rj) {
218    AvahiDnsPacket *p;
219    unsigned n;
220
221    assert(s);
222    assert(rj);
223
224    if (!(p = avahi_dns_packet_new_response(s->interface->hardware->mtu, 1)))
225        return; /* OOM */
226    n = 1;
227
228    /* Put it in the packet. */
229    if (packet_add_response_job(s, p, rj)) {
230
231        /* Try to fill up packet with more responses, if available */
232        while (s->jobs) {
233
234            if (!packet_add_response_job(s, p, s->jobs))
235                break;
236
237            n++;
238        }
239
240    } else {
241        size_t size;
242
243        avahi_dns_packet_free(p);
244
245        /* OK, the packet was too small, so create one that fits */
246        size = avahi_record_get_estimate_size(rj->record) + AVAHI_DNS_PACKET_HEADER_SIZE;
247
248        if (!(p = avahi_dns_packet_new_response(size + AVAHI_DNS_PACKET_EXTRA_SIZE, 1)))
249            return; /* OOM */
250
251        if (!packet_add_response_job(s, p, rj)) {
252            avahi_dns_packet_free(p);
253
254            avahi_log_warn("Record too large, cannot send");
255            job_mark_done(s, rj);
256            return;
257        }
258    }
259
260    avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_ANCOUNT, n);
261    avahi_interface_send_packet(s->interface, p, AVAHI_MDNS);
262    avahi_dns_packet_free(p);
263}
264
265static void elapse_callback(AVAHI_GCC_UNUSED AvahiTimeEvent *e, void* data) {
266    AvahiResponseJob *rj = data;
267
268    assert(rj);
269
270    if (rj->state == AVAHI_DONE || rj->state == AVAHI_SUPPRESSED)
271        job_free(rj->scheduler, rj);         /* Lets drop this entry */
272    else
273        send_response_packet(rj->scheduler, rj);
274}
275
276static AvahiResponseJob* find_scheduled_job(AvahiResponseScheduler *s, AvahiRecord *record) {
277    AvahiResponseJob *rj;
278
279    assert(s);
280    assert(record);
281
282    for (rj = s->jobs; rj; rj = rj->jobs_next) {
283        assert(rj->state == AVAHI_SCHEDULED);
284
285        if (avahi_record_equal_no_ttl(rj->record, record))
286            return rj;
287    }
288
289    return NULL;
290}
291
292static AvahiResponseJob* find_history_job(AvahiResponseScheduler *s, AvahiRecord *record) {
293    AvahiResponseJob *rj;
294
295    assert(s);
296    assert(record);
297
298    for (rj = s->history; rj; rj = rj->jobs_next) {
299        assert(rj->state == AVAHI_DONE);
300
301        if (avahi_record_equal_no_ttl(rj->record, record)) {
302            /* Check whether this entry is outdated */
303
304/*             avahi_log_debug("history age: %u", (unsigned) (avahi_age(&rj->delivery)/1000)); */
305
306            if (avahi_age(&rj->delivery)/1000 > AVAHI_RESPONSE_HISTORY_MSEC) {
307                /* it is outdated, so let's remove it */
308                job_free(s, rj);
309                return NULL;
310            }
311
312            return rj;
313        }
314    }
315
316    return NULL;
317}
318
319static AvahiResponseJob* find_suppressed_job(AvahiResponseScheduler *s, AvahiRecord *record, const AvahiAddress *querier) {
320    AvahiResponseJob *rj;
321
322    assert(s);
323    assert(record);
324    assert(querier);
325
326    for (rj = s->suppressed; rj; rj = rj->jobs_next) {
327        assert(rj->state == AVAHI_SUPPRESSED);
328        assert(rj->querier_valid);
329
330        if (avahi_record_equal_no_ttl(rj->record, record) &&
331            avahi_address_cmp(&rj->querier, querier) == 0) {
332            /* Check whether this entry is outdated */
333
334            if (avahi_age(&rj->delivery) > AVAHI_RESPONSE_SUPPRESS_MSEC*1000) {
335                /* it is outdated, so let's remove it */
336                job_free(s, rj);
337                return NULL;
338            }
339
340            return rj;
341        }
342    }
343
344    return NULL;
345}
346
347int avahi_response_scheduler_post(AvahiResponseScheduler *s, AvahiRecord *record, int flush_cache, const AvahiAddress *querier, int immediately) {
348    AvahiResponseJob *rj;
349    struct timeval tv;
350/*     char *t; */
351
352    assert(s);
353    assert(record);
354
355    assert(!avahi_key_is_pattern(record->key));
356
357/*     t = avahi_record_to_string(record); */
358/*     avahi_log_debug("post %i %s", immediately, t); */
359/*     avahi_free(t); */
360
361    /* Check whether this response is suppressed */
362    if (querier &&
363        (rj = find_suppressed_job(s, record, querier)) &&
364        avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) &&
365        rj->record->ttl >= record->ttl/2) {
366
367/*         avahi_log_debug("Response suppressed by known answer suppression.");  */
368        return 0;
369    }
370
371    /* Check if we already sent this response recently */
372    if ((rj = find_history_job(s, record))) {
373
374        if (avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) &&
375            rj->record->ttl >= record->ttl/2 &&
376            (rj->flush_cache || !flush_cache)) {
377/*             avahi_log_debug("Response suppressed by local duplicate suppression (history)");  */
378            return 0;
379        }
380
381        /* Outdated ... */
382        job_free(s, rj);
383    }
384
385    avahi_elapse_time(&tv, immediately ? 0 : AVAHI_RESPONSE_DEFER_MSEC, immediately ? 0 : AVAHI_RESPONSE_JITTER_MSEC);
386
387    if ((rj = find_scheduled_job(s, record))) {
388/*          avahi_log_debug("Response suppressed by local duplicate suppression (scheduled)"); */
389
390        /* Update a little ... */
391
392        /* Update the time if the new is prior to the old */
393        if (avahi_timeval_compare(&tv, &rj->delivery) < 0) {
394            rj->delivery = tv;
395            avahi_time_event_update(rj->time_event, &rj->delivery);
396        }
397
398        /* Update the flush cache bit */
399        if (flush_cache)
400            rj->flush_cache = 1;
401
402        /* Update the querier field */
403        if (!querier || (rj->querier_valid && avahi_address_cmp(querier, &rj->querier) != 0))
404            rj->querier_valid = 0;
405
406        /* Update record data (just for the TTL) */
407        avahi_record_unref(rj->record);
408        rj->record = avahi_record_ref(record);
409
410        return 1;
411    } else {
412/*         avahi_log_debug("Accepted new response job.");  */
413
414        /* Create a new job and schedule it */
415        if (!(rj = job_new(s, record, AVAHI_SCHEDULED)))
416            return 0; /* OOM */
417
418        rj->delivery = tv;
419        rj->time_event = avahi_time_event_new(s->time_event_queue, &rj->delivery, elapse_callback, rj);
420        rj->flush_cache = flush_cache;
421
422        if ((rj->querier_valid = !!querier))
423            rj->querier = *querier;
424
425        return 1;
426    }
427}
428
429void avahi_response_scheduler_incoming(AvahiResponseScheduler *s, AvahiRecord *record, int flush_cache) {
430    AvahiResponseJob *rj;
431    assert(s);
432
433    /* This function is called whenever an incoming response was
434     * receieved. We drop scheduled responses which match here. The
435     * keyword is "DUPLICATE ANSWER SUPPRESION". */
436
437    if ((rj = find_scheduled_job(s, record))) {
438
439        if ((!rj->flush_cache || flush_cache) &&    /* flush cache bit was set correctly */
440            avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) &&   /* both goodbye packets, or both not */
441            record->ttl >= rj->record->ttl/2) {     /* sensible TTL */
442
443            /* A matching entry was found, so let's mark it done */
444/*             avahi_log_debug("Response suppressed by distributed duplicate suppression"); */
445            job_mark_done(s, rj);
446        }
447
448        return;
449    }
450
451    if ((rj = find_history_job(s, record))) {
452        /* Found a history job, let's update it */
453        avahi_record_unref(rj->record);
454        rj->record = avahi_record_ref(record);
455    } else
456        /* Found no existing history job, so let's create a new one */
457        if (!(rj = job_new(s, record, AVAHI_DONE)))
458            return; /* OOM */
459
460    rj->flush_cache = flush_cache;
461    rj->querier_valid = 0;
462
463    gettimeofday(&rj->delivery, NULL);
464    job_set_elapse_time(s, rj, AVAHI_RESPONSE_HISTORY_MSEC, 0);
465}
466
467void avahi_response_scheduler_suppress(AvahiResponseScheduler *s, AvahiRecord *record, const AvahiAddress *querier) {
468    AvahiResponseJob *rj;
469
470    assert(s);
471    assert(record);
472    assert(querier);
473
474    if ((rj = find_scheduled_job(s, record))) {
475
476        if (rj->querier_valid && avahi_address_cmp(querier, &rj->querier) == 0 && /* same originator */
477            avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) && /* both goodbye packets, or both not */
478            record->ttl >= rj->record->ttl/2) {                                  /* sensible TTL */
479
480            /* A matching entry was found, so let's drop it */
481/*             avahi_log_debug("Known answer suppression active!"); */
482            job_free(s, rj);
483        }
484    }
485
486    if ((rj = find_suppressed_job(s, record, querier))) {
487
488        /* Let's update the old entry */
489        avahi_record_unref(rj->record);
490        rj->record = avahi_record_ref(record);
491
492    } else {
493
494        /* Create a new entry */
495        if (!(rj = job_new(s, record, AVAHI_SUPPRESSED)))
496            return; /* OOM */
497        rj->querier_valid = 1;
498        rj->querier = *querier;
499    }
500
501    gettimeofday(&rj->delivery, NULL);
502    job_set_elapse_time(s, rj, AVAHI_RESPONSE_SUPPRESS_MSEC, 0);
503}
504
505void avahi_response_scheduler_force(AvahiResponseScheduler *s) {
506    assert(s);
507
508    /* Send all scheduled responses immediately */
509    while (s->jobs)
510        send_response_packet(s, s->jobs);
511}
512