1/*
2 * Copyright (c) 2010 Mark Williams
3 *
4 * File change event API for netatalk
5 *
6 * for every detected filesystem change a UDP packet is sent to an arbitrary list
7 * of listeners. Each packet contains unix path of modified filesystem element,
8 * event reason, and a consecutive event id (32 bit). Technically we are UDP client and are sending
 * out packets synchronously as they are created by the afp functions. This should not affect
10 * performance measurably. The only delaying calls occur during initialization, if we have to
11 * resolve non-IP hostnames to IP. All numeric data inside the packet is network byte order, so use
12 * ntohs / ntohl to resolve length and event id. Ideally a listener receives every packet with
13 * no gaps in event ids, starting with event id 1 and mode FCE_CONN_START followed by
14 * data events from id 2 up to 0xFFFFFFFF, followed by 0 to 0xFFFFFFFF and so on.
15 *
16 * A gap or not starting with 1 mode FCE_CONN_START or receiving mode FCE_CONN_BROKEN means that
17 * the listener has lost at least one filesystem event
18 *
19 * All Rights Reserved.  See COPYRIGHT.
20 */
21
22#ifdef HAVE_CONFIG_H
23#include "config.h"
24#endif /* HAVE_CONFIG_H */
25
26#include <stdio.h>
27
28#include <string.h>
29#include <stdlib.h>
30#include <errno.h>
31#include <time.h>
32
33
34#include <sys/param.h>
35#include <sys/socket.h>
36#include <netinet/in.h>
37#include <arpa/inet.h>
38#include <netdb.h>
39
40#include <netatalk/at.h>
41
42#include <atalk/adouble.h>
43#include <atalk/vfs.h>
44#include <atalk/logger.h>
45#include <atalk/afp.h>
46#include <atalk/util.h>
47#include <atalk/cnid.h>
48#include <atalk/unix.h>
49#include <atalk/fce_api.h>
50#include <atalk/globals.h>
51
52#include "fork.h"
53#include "file.h"
54#include "directory.h"
55#include "desktop.h"
56#include "volume.h"
57
58// ONLY USED IN THIS FILE
59#include "fce_api_internal.h"
60
61#define FCE_TRUE 1
62#define FCE_FALSE 0
63
64/* We store our connection data here */
65static struct udp_entry udp_socket_list[FCE_MAX_UDP_SOCKS];
66static int udp_sockets = 0;
67static int udp_initialized = FCE_FALSE;
68static unsigned long fce_ev_enabled =
69    (1 << FCE_FILE_MODIFY) |
70    (1 << FCE_FILE_DELETE) |
71    (1 << FCE_DIR_DELETE) |
72    (1 << FCE_FILE_CREATE) |
73    (1 << FCE_DIR_CREATE);
74
75static uint64_t tm_used;          /* used for passing to event handler */
76#define MAXIOBUF 1024
77static char iobuf[MAXIOBUF];
78static const char *skip_files[] =
79{
80	".DS_Store",
81	NULL
82};
83
/*
 * Initialize network structs for any listeners.
 *
 * For every entry registered in udp_socket_list the address/port pair is
 * resolved with getaddrinfo() and a datagram socket is created for the first
 * result that yields one. The address used is copied into the entry so that
 * it stays valid after the getaddrinfo() result list has been freed.
 *
 * No return code is given: every failure is logged and the affected entry is
 * left with sock == -1. The function is a no-op once udp_initialized is set.
 */
void fce_init_udp()
{
    int rv, i;
    struct addrinfo hints, *servinfo, *p;

    if (udp_initialized == FCE_TRUE)
        return;

    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_DGRAM;

    for (i = 0; i < udp_sockets; i++) {
        struct udp_entry *udp_entry = udp_socket_list + i;
        size_t addrlen;

        /* Close any pending sockets and forget the stale descriptor */
        if (udp_entry->sock != -1) {
            close(udp_entry->sock);
            udp_entry->sock = -1;
        }

        if ((rv = getaddrinfo(udp_entry->addr, udp_entry->port, &hints, &servinfo)) != 0) {
            LOG(log_error, logtype_afpd, "fce_init_udp: getaddrinfo(%s:%s): %s",
                udp_entry->addr, udp_entry->port, gai_strerror(rv));
            continue;
        }

        /* loop through all the results and make a socket */
        for (p = servinfo; p != NULL; p = p->ai_next) {
            if ((udp_entry->sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
                LOG(log_error, logtype_afpd, "fce_init_udp: socket(%s:%s): %s",
                    udp_entry->addr, udp_entry->port, strerror(errno));
                continue;
            }
            break;
        }

        if (p == NULL) {
            /* No usable result: there is nothing to copy, so skip this listener */
            LOG(log_error, logtype_afpd, "fce_init_udp: no socket for %s:%s",
                udp_entry->addr, udp_entry->port);
            freeaddrinfo(servinfo);
            continue;
        }

        /* Only ai_addrlen bytes of ai_addr are valid, never copy more than that */
        addrlen = p->ai_addrlen;
        if (addrlen > sizeof(udp_entry->sockaddr))
            addrlen = sizeof(udp_entry->sockaddr);
        memset(&udp_entry->sockaddr, 0, sizeof(udp_entry->sockaddr));
        memcpy(&udp_entry->sockaddr, p->ai_addr, addrlen);

        /* Keep family/socktype/protocol/addrlen, but drop every pointer into
         * the list that is freed below */
        udp_entry->addrinfo = *p;
        udp_entry->addrinfo.ai_addr = (struct sockaddr *)&udp_entry->sockaddr;
        udp_entry->addrinfo.ai_canonname = NULL;
        udp_entry->addrinfo.ai_next = NULL;

        freeaddrinfo(servinfo);
    }

    udp_initialized = FCE_TRUE;
}
137
138void fce_cleanup()
139{
140	int i;
141    if (udp_initialized == FCE_FALSE )
142        return;
143
144    for (i = 0; i < udp_sockets; i++)
145    {
146        struct udp_entry *udp_entry = udp_socket_list + i;
147
148        /* Close any pending sockets */
149        if (udp_entry->sock != -1)
150        {
151            close( udp_entry->sock );
152            udp_entry->sock = -1;
153        }
154    }
155    udp_initialized = FCE_FALSE;
156}
157
158
159/*
160 * Construct a UDP packet for our listeners and return packet size
161 * */
162static ssize_t build_fce_packet( struct fce_packet *packet, char *path, int mode, uint32_t event_id )
163{
164    size_t pathlen;
165    ssize_t data_len = 0;
166
167    strncpy(packet->magic, FCE_PACKET_MAGIC, sizeof(packet->magic) );
168    packet->version = FCE_PACKET_VERSION;
169    packet->mode = mode;
170    packet->event_id = event_id;
171
172    pathlen = strlen(path) + 1; /* include string terminator */
173
174    /* This should never happen, but before we bust this server, we send nonsense, fce listener has to cope */
175    if (pathlen >= MAXPATHLEN)
176        pathlen = MAXPATHLEN - 1;
177
178    /* This is the payload len. Means: the stream has len bytes more until packet is finished */
179    /* A server should read the first 16 byte, decode them and then fetch the rest */
180    data_len = FCE_PACKET_HEADER_SIZE + pathlen;
181    packet->datalen = pathlen;
182
183    switch (mode) {
184    case FCE_TM_SIZE:
185        tm_used = hton64(tm_used);
186        memcpy(packet->data, &tm_used, sizeof(tm_used));
187        strncpy(packet->data + sizeof(tm_used), path, pathlen);
188
189        packet->datalen += sizeof(tm_used);
190        data_len += sizeof(tm_used);
191        break;
192    default:
193        strncpy(packet->data, path, pathlen);
194        break;
195    }
196
197    /* return the packet len */
198    return data_len;
199}
200
201static int pack_fce_packet(struct fce_packet *packet, unsigned char *buf)
202{
203    unsigned char *p = buf;
204
205    memcpy(p, &packet->magic[0], sizeof(packet->magic));
206    p += sizeof(packet->magic);
207
208    *p = packet->version;
209    p++;
210
211    *p = packet->mode;
212    p++;
213
214    uint32_t id = htonl(packet->event_id);
215    memcpy(p, &id, sizeof(id));
216    p += sizeof(packet->event_id);
217
218    uint16_t l = htons(packet->datalen);
219    memcpy(p, &l, sizeof(l));
220    p += sizeof(l);
221
222    memcpy(p, &packet->data[0], packet->datalen);
223    p += packet->datalen;
224
225    return 0;
226}
227
228/*
229 * Send the fce information to all (connected) listeners
230 * We dont give return code because all errors are handled internally (I hope..)
231 * */
232static void send_fce_event( char *path, int mode )
233{
234    struct fce_packet packet;
235    void *data = &packet;
236    static uint32_t event_id = 0; /* the unique packet couter to detect packet/data loss. Going from 0xFFFFFFFF to 0x0 is a valid increment */
237	int i;
238
239    LOG(log_debug, logtype_afpd, "send_fce_event: start");
240
241    time_t now = time(NULL);
242
243    /* build our data packet */
244    ssize_t data_len = build_fce_packet( &packet, path, mode, ++event_id );
245    pack_fce_packet(&packet, iobuf);
246
247    for (i = 0; i < udp_sockets; i++)
248    {
249        int sent_data = 0;
250        struct udp_entry *udp_entry = udp_socket_list + i;
251
252        /* we had a problem earlier ? */
253        if (udp_entry->sock == -1)
254        {
255            /* We still have to wait ?*/
256            if (now < udp_entry->next_try_on_error)
257                continue;
258
259            /* Reopen socket */
260            udp_entry->sock = socket(udp_entry->addrinfo.ai_family,
261                                     udp_entry->addrinfo.ai_socktype,
262                                     udp_entry->addrinfo.ai_protocol);
263
264            if (udp_entry->sock == -1) {
265                /* failed again, so go to rest again */
266                LOG(log_error, logtype_afpd, "Cannot recreate socket for fce UDP connection: errno %d", errno  );
267
268                udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S;
269                continue;
270            }
271
272            udp_entry->next_try_on_error = 0;
273
274            /* Okay, we have a running socket again, send server that we had a problem on our side*/
275            data_len = build_fce_packet( &packet, "", FCE_CONN_BROKEN, 0 );
276            pack_fce_packet(&packet, iobuf);
277
278            sendto(udp_entry->sock,
279                   iobuf,
280                   data_len,
281                   0,
282                   (struct sockaddr *)&udp_entry->sockaddr,
283                   udp_entry->addrinfo.ai_addrlen);
284
285            /* Rebuild our original data packet */
286            data_len = build_fce_packet( &packet, path, mode, event_id );
287            pack_fce_packet(&packet, iobuf);
288        }
289
290        sent_data = sendto(udp_entry->sock,
291                           iobuf,
292                           data_len,
293                           0,
294                           (struct sockaddr *)&udp_entry->sockaddr,
295                           udp_entry->addrinfo.ai_addrlen);
296
297        /* Problems ? */
298        if (sent_data != data_len) {
299            /* Argh, socket broke, we close and retry later */
300            LOG(log_error, logtype_afpd, "send_fce_event: error sending packet to %s:%s, transfered %d of %d: %s",
301                udp_entry->addr, udp_entry->port, sent_data, data_len, strerror(errno));
302
303            close( udp_entry->sock );
304            udp_entry->sock = -1;
305            udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S;
306        }
307    }
308}
309
310static int add_udp_socket(const char *target_ip, const char *target_port )
311{
312    if (target_port == NULL)
313        target_port = FCE_DEFAULT_PORT_STRING;
314
315    if (udp_sockets >= FCE_MAX_UDP_SOCKS) {
316        LOG(log_error, logtype_afpd, "Too many file change api UDP connections (max %d allowed)", FCE_MAX_UDP_SOCKS );
317        return AFPERR_PARAM;
318    }
319
320    udp_socket_list[udp_sockets].addr = strdup(target_ip);
321    udp_socket_list[udp_sockets].port = strdup(target_port);
322    udp_socket_list[udp_sockets].sock = -1;
323    memset(&udp_socket_list[udp_sockets].addrinfo, 0, sizeof(struct addrinfo));
324    memset(&udp_socket_list[udp_sockets].sockaddr, 0, sizeof(struct sockaddr_storage));
325    udp_socket_list[udp_sockets].next_try_on_error = 0;
326
327    udp_sockets++;
328
329    return AFP_OK;
330}
331
332/*
333 *
334 * Dispatcher for all incoming file change events
335 *
336 * */
337static int register_fce(const char *u_name, int is_dir, int mode)
338{
339	int i;
340    if (udp_sockets == 0)
341        /* No listeners configured */
342        return AFP_OK;
343
344    if (u_name == NULL)
345        return AFPERR_PARAM;
346
347    static int first_event = FCE_TRUE;
348
349	/* do some initialization on the fly the first time */
350	if (first_event) {
351		fce_initialize_history();
352	}
353
354	/* handle files which should not cause events (.DS_Store atc. ) */
355	for (i = 0; skip_files[i] != NULL; i++)
356	{
357		if (!strcmp( u_name, skip_files[i]))
358			return AFP_OK;
359	}
360
361
362	char full_path_buffer[MAXPATHLEN + 1] = {""};
363	const char *cwd = getcwdpath();
364
365    if (mode == FCE_TM_SIZE) {
366        strlcpy(full_path_buffer, u_name, MAXPATHLEN);
367    } else if (!is_dir || mode == FCE_DIR_DELETE) {
368		if (strlen( cwd ) + strlen( u_name) + 1 >= MAXPATHLEN) {
369			LOG(log_error, logtype_afpd, "FCE file name too long: %s/%s", cwd, u_name );
370			return AFPERR_PARAM;
371		}
372		sprintf( full_path_buffer, "%s/%s", cwd, u_name );
373	} else {
374		if (strlen( cwd ) >= MAXPATHLEN) {
375			LOG(log_error, logtype_afpd, "FCE directory name too long: %s", cwd);
376			return AFPERR_PARAM;
377		}
378		strcpy( full_path_buffer, cwd);
379	}
380
381	/* Can we ignore this event based on type or history? */
382	if (!(mode & FCE_TM_SIZE) && fce_handle_coalescation( full_path_buffer, is_dir, mode ))
383	{
384		LOG(log_debug9, logtype_afpd, "Coalesced fc event <%d> for <%s>", mode, full_path_buffer );
385		return AFP_OK;
386	}
387
388	LOG(log_debug9, logtype_afpd, "Detected fc event <%d> for <%s>", mode, full_path_buffer );
389
390
391    /* we do initilization on the fly, no blocking calls in here
392     * (except when using FQDN in broken DNS environment)
393     */
394    if (first_event == FCE_TRUE)
395    {
396        fce_init_udp();
397
398        /* Notify listeners the we start from the beginning */
399        send_fce_event( "", FCE_CONN_START );
400
401        first_event = FCE_FALSE;
402    }
403
404	/* Handle UDP transport */
405    send_fce_event( full_path_buffer, mode );
406
407    return AFP_OK;
408}
409
410
411/******************** External calls start here **************************/
412
413/*
 * API-Calls for file change api, called from outside (file.c directory.c ofork.c filedir.c)
415 * */
416#ifndef FCE_TEST_MAIN
417
418int fce_register_delete_file( struct path *path )
419{
420    int ret = AFP_OK;
421
422    if (path == NULL)
423        return AFPERR_PARAM;
424
425    if (!(fce_ev_enabled & (1 << FCE_FILE_DELETE)))
426        return ret;
427
428    ret = register_fce( path->u_name, FALSE, FCE_FILE_DELETE );
429
430    return ret;
431}
432int fce_register_delete_dir( char *name )
433{
434    int ret = AFP_OK;
435
436    if (name == NULL)
437        return AFPERR_PARAM;
438
439    if (!(fce_ev_enabled & (1 << FCE_DIR_DELETE)))
440        return ret;
441
442    ret = register_fce( name, TRUE, FCE_DIR_DELETE);
443
444    return ret;
445}
446
447int fce_register_new_dir( struct path *path )
448{
449    int ret = AFP_OK;
450
451    if (path == NULL)
452        return AFPERR_PARAM;
453
454    if (!(fce_ev_enabled & (1 << FCE_DIR_CREATE)))
455        return ret;
456
457    ret = register_fce( path->u_name, TRUE, FCE_DIR_CREATE );
458
459    return ret;
460}
461
462
463int fce_register_new_file( struct path *path )
464{
465    int ret = AFP_OK;
466
467    if (path == NULL)
468        return AFPERR_PARAM;
469
470    if (!(fce_ev_enabled & (1 << FCE_FILE_CREATE)))
471        return ret;
472
473    ret = register_fce( path->u_name, FALSE, FCE_FILE_CREATE );
474
475    return ret;
476}
477
478int fce_register_file_modification( struct ofork *ofork )
479{
480    char *u_name = NULL;
481    struct vol *vol;
482    int ret = AFP_OK;
483
484    if (ofork == NULL || ofork->of_vol == NULL)
485        return AFPERR_PARAM;
486
487    if (!(fce_ev_enabled & (1 << FCE_FILE_MODIFY)))
488        return ret;
489
490    vol = ofork->of_vol;
491
492    if (NULL == (u_name = mtoupath(vol, of_name(ofork), ofork->of_did, utf8_encoding())))
493    {
494        return AFPERR_MISC;
495    }
496
497    ret = register_fce( u_name, FALSE, FCE_FILE_MODIFY );
498
499    return ret;
500}
501
502int fce_register_tm_size(const char *vol, size_t used)
503{
504    int ret = AFP_OK;
505
506    if (vol == NULL)
507        return AFPERR_PARAM;
508
509    if (!(fce_ev_enabled & (1 << FCE_TM_SIZE)))
510        return ret;
511
512    tm_used = used;             /* oh what a hack */
513    ret = register_fce(vol, FALSE, FCE_TM_SIZE);
514
515    return ret;
516}
517#endif
518
519/*
520 *
521 * Extern connect to afpd parameter, can be called multiple times for multiple listeners (up to MAX_UDP_SOCKS times)
522 *
523 * */
524int fce_add_udp_socket(const char *target)
525{
526	const char *port = FCE_DEFAULT_PORT_STRING;
527	char target_ip[256] = {""};
528
529	strncpy(target_ip, target, sizeof(target_ip) -1);
530
531	char *port_delim = strchr( target_ip, ':' );
532	if (port_delim) {
533		*port_delim = 0;
534		port = port_delim + 1;
535	}
536	return add_udp_socket(target_ip, port);
537}
538
539int fce_set_events(const char *events)
540{
541    char *e;
542    char *p;
543
544    if (events == NULL)
545        return AFPERR_PARAM;
546
547    e = strdup(events);
548
549    fce_ev_enabled = 0;
550
551    for (p = strtok(e, ","); p; p = strtok(NULL, ",")) {
552        if (strcmp(p, "fmod") == 0) {
553            fce_ev_enabled |= (1 << FCE_FILE_MODIFY);
554        } else if (strcmp(p, "fdel") == 0) {
555            fce_ev_enabled |= (1 << FCE_FILE_DELETE);
556        } else if (strcmp(p, "ddel") == 0) {
557            fce_ev_enabled |= (1 << FCE_DIR_DELETE);
558        } else if (strcmp(p, "fcre") == 0) {
559            fce_ev_enabled |= (1 << FCE_FILE_CREATE);
560        } else if (strcmp(p, "dcre") == 0) {
561            fce_ev_enabled |= (1 << FCE_DIR_CREATE);
562        } else if (strcmp(p, "tmsz") == 0) {
563            fce_ev_enabled |= (1 << FCE_TM_SIZE);
564        }
565    }
566
567    free(e);
568}
569
570#ifdef FCE_TEST_MAIN
571
572
/*
 * Pause the test driver for us microseconds; thin wrapper around usleep().
 */
void shortsleep( unsigned int us )
{
    usleep( us );
}
577int main( int argc, char*argv[] )
578{
579    int c,ret;
580
581    char *port = FCE_DEFAULT_PORT_STRING;
582    char *host = "localhost";
583    int delay_between_events = 1000;
584    int event_code = FCE_FILE_MODIFY;
585    char pathbuff[1024];
586    int duration_in_seconds = 0; // TILL ETERNITY
587    char target[256];
588    char *path = getcwd( pathbuff, sizeof(pathbuff) );
589
590    // FULLSPEED TEST IS "-s 1001" -> delay is 0 -> send packets without pause
591
592    while ((c = getopt(argc, argv, "d:e:h:p:P:s:")) != -1) {
593        switch(c) {
594        case '?':
595            fprintf(stdout, "%s: [ -p Port -h Listener1 [ -h Listener2 ...] -P path -s Delay_between_events_in_us -e event_code -d Duration ]\n", argv[0]);
596            exit(1);
597            break;
598        case 'd':
599            duration_in_seconds = atoi(optarg);
600            break;
601        case 'e':
602            event_code = atoi(optarg);
603            break;
604        case 'h':
605            host = strdup(optarg);
606            break;
607        case 'p':
608            port = strdup(optarg);
609            break;
610        case 'P':
611            path = strdup(optarg);
612            break;
613        case 's':
614            delay_between_events = atoi(optarg);
615            break;
616        }
617    }
618
619    sprintf(target, "%s:%s", host, port);
620    if (fce_add_udp_socket(target) != 0)
621        return 1;
622
623    int ev_cnt = 0;
624    time_t start_time = time(NULL);
625    time_t end_time = 0;
626
627    if (duration_in_seconds)
628        end_time = start_time + duration_in_seconds;
629
630    while (1)
631    {
632        time_t now = time(NULL);
633        if (now > start_time)
634        {
635            start_time = now;
636            fprintf( stdout, "%d events/s\n", ev_cnt );
637            ev_cnt = 0;
638        }
639        if (end_time && now >= end_time)
640            break;
641
642        register_fce( path, 0, event_code );
643        ev_cnt++;
644
645
646        shortsleep( delay_between_events );
647    }
648}
649#endif /* TESTMAIN*/
650