1/* 2 * Copyright (c) 2010 Mark Williams 3 * Copyright (c) 2012 Frank Lahm <franklahm@gmail.com> 4 * 5 * File change event API for netatalk 6 * 7 * for every detected filesystem change a UDP packet is sent to an arbitrary list 8 * of listeners. Each packet contains unix path of modified filesystem element, 9 * event reason, and a consecutive event id (32 bit). Technically we are UDP client and are sending 10 * out packets synchronuosly as they are created by the afp functions. This should not affect 11 * performance measurably. The only delaying calls occur during initialization, if we have to 12 * resolve non-IP hostnames to IP. All numeric data inside the packet is network byte order, so use 13 * ntohs / ntohl to resolve length and event id. Ideally a listener receives every packet with 14 * no gaps in event ids, starting with event id 1 and mode FCE_CONN_START followed by 15 * data events from id 2 up to 0xFFFFFFFF, followed by 0 to 0xFFFFFFFF and so on. 16 * 17 * A gap or not starting with 1 mode FCE_CONN_START or receiving mode FCE_CONN_BROKEN means that 18 * the listener has lost at least one filesystem event 19 * 20 * All Rights Reserved. See COPYRIGHT. 21 */ 22 23#ifdef HAVE_CONFIG_H 24#include "config.h" 25#endif /* HAVE_CONFIG_H */ 26 27#include <stdio.h> 28 29#include <string.h> 30#include <stdlib.h> 31#include <errno.h> 32#include <time.h> 33#include <sys/param.h> 34#include <sys/socket.h> 35#include <netinet/in.h> 36#include <arpa/inet.h> 37#include <netdb.h> 38#include <stdbool.h> 39 40#include <atalk/adouble.h> 41#include <atalk/vfs.h> 42#include <atalk/logger.h> 43#include <atalk/afp.h> 44#include <atalk/util.h> 45#include <atalk/cnid.h> 46#include <atalk/unix.h> 47#include <atalk/fce_api.h> 48#include <atalk/globals.h> 49 50#include "fork.h" 51#include "file.h" 52#include "directory.h" 53#include "desktop.h" 54#include "volume.h" 55 56// ONLY USED IN THIS FILE 57#include "fce_api_internal.h" 58 59/* We store our connection data here */ 60static struct udp_entry udp_socket_list[FCE_MAX_UDP_SOCKS]; 61static int udp_sockets = 0; 62static bool udp_initialized = false; 63static unsigned long fce_ev_enabled = 64 (1 << FCE_FILE_MODIFY) | 65 (1 << FCE_FILE_DELETE) | 66 (1 << FCE_DIR_DELETE) | 67 (1 << FCE_FILE_CREATE) | 68 (1 << FCE_DIR_CREATE); 69 70#define MAXIOBUF 1024 71static unsigned char iobuf[MAXIOBUF]; 72static const char *skip_files[] = 73{ 74 ".DS_Store", 75 NULL 76}; 77static struct fce_close_event last_close_event; 78 79static char *fce_event_names[] = { 80 "", 81 "FCE_FILE_MODIFY", 82 "FCE_FILE_DELETE", 83 "FCE_DIR_DELETE", 84 "FCE_FILE_CREATE", 85 "FCE_DIR_CREATE" 86}; 87 88/* 89 * 90 * Initialize network structs for any listeners 91 * We dont give return code because all errors are handled internally (I hope..) 92 * 93 * */ 94void fce_init_udp() 95{ 96 int rv; 97 struct addrinfo hints, *servinfo, *p; 98 99 if (udp_initialized == true) 100 return; 101 102 memset(&hints, 0, sizeof hints); 103 hints.ai_family = AF_UNSPEC; 104 hints.ai_socktype = SOCK_DGRAM; 105 106 for (int i = 0; i < udp_sockets; i++) { 107 struct udp_entry *udp_entry = udp_socket_list + i; 108 109 /* Close any pending sockets */ 110 if (udp_entry->sock != -1) 111 close(udp_entry->sock); 112 113 if ((rv = getaddrinfo(udp_entry->addr, udp_entry->port, &hints, &servinfo)) != 0) { 114 LOG(log_error, logtype_fce, "fce_init_udp: getaddrinfo(%s:%s): %s", 115 udp_entry->addr, udp_entry->port, gai_strerror(rv)); 116 continue; 117 } 118 119 /* loop through all the results and make a socket */ 120 for (p = servinfo; p != NULL; p = p->ai_next) { 121 if ((udp_entry->sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { 122 LOG(log_error, logtype_fce, "fce_init_udp: socket(%s:%s): %s", 123 udp_entry->addr, udp_entry->port, strerror(errno)); 124 continue; 125 } 126 break; 127 } 128 129 if (p == NULL) { 130 LOG(log_error, logtype_fce, "fce_init_udp: no socket for %s:%s", 131 udp_entry->addr, udp_entry->port); 132 } 133 udp_entry->addrinfo = *p; 134 memcpy(&udp_entry->addrinfo, p, sizeof(struct addrinfo)); 135 memcpy(&udp_entry->sockaddr, p->ai_addr, sizeof(struct sockaddr_storage)); 136 freeaddrinfo(servinfo); 137 } 138 139 udp_initialized = true; 140} 141 142void fce_cleanup() 143{ 144 if (udp_initialized == false ) 145 return; 146 147 for (int i = 0; i < udp_sockets; i++) 148 { 149 struct udp_entry *udp_entry = udp_socket_list + i; 150 151 /* Close any pending sockets */ 152 if (udp_entry->sock != -1) 153 { 154 close( udp_entry->sock ); 155 udp_entry->sock = -1; 156 } 157 } 158 udp_initialized = false; 159} 160 161/* 162 * Construct a UDP packet for our listeners and return packet size 163 * */ 164static ssize_t build_fce_packet( struct fce_packet *packet, const char *path, int event, uint32_t event_id ) 165{ 166 size_t pathlen = 0; 167 ssize_t data_len = 0; 168 169 /* Set content of packet */ 170 memcpy(packet->magic, FCE_PACKET_MAGIC, sizeof(packet->magic) ); 171 packet->version = FCE_PACKET_VERSION; 172 packet->mode = event; 173 174 packet->event_id = event_id; 175 176 pathlen = strlen(path); /* exclude string terminator */ 177 178 /* This should never happen, but before we bust this server, we send nonsense, fce listener has to cope */ 179 if (pathlen >= MAXPATHLEN) 180 pathlen = MAXPATHLEN - 1; 181 182 packet->datalen = pathlen; 183 184 /* This is the payload len. Means: the packet has len bytes more until packet is finished */ 185 data_len = FCE_PACKET_HEADER_SIZE + pathlen; 186 187 memcpy(packet->data, path, pathlen); 188 189 /* return the packet len */ 190 return data_len; 191} 192 193/* 194 * Handle Endianess and write into buffer w/o padding 195 **/ 196static void pack_fce_packet(struct fce_packet *packet, unsigned char *buf, int maxlen) 197{ 198 unsigned char *p = buf; 199 200 memcpy(p, &packet->magic[0], sizeof(packet->magic)); 201 p += sizeof(packet->magic); 202 203 *p = packet->version; 204 p++; 205 206 *p = packet->mode; 207 p++; 208 209 uint32_t *id = (uint32_t*)p; 210 *id = htonl(packet->event_id); 211 p += sizeof(packet->event_id); 212 213 uint16_t *l = ( uint16_t *)p; 214 *l = htons(packet->datalen); 215 p += sizeof(packet->datalen); 216 217 if (((p - buf) + packet->datalen) < maxlen) { 218 memcpy(p, &packet->data[0], packet->datalen); 219 } 220} 221 222/* 223 * Send the fce information to all (connected) listeners 224 * We dont give return code because all errors are handled internally (I hope..) 225 * */ 226static void send_fce_event(const char *path, int event) 227{ 228 static bool first_event = true; 229 230 struct fce_packet packet; 231 static uint32_t event_id = 0; /* the unique packet couter to detect packet/data loss. Going from 0xFFFFFFFF to 0x0 is a valid increment */ 232 time_t now = time(NULL); 233 234 LOG(log_debug, logtype_fce, "send_fce_event: start"); 235 236 /* initialized ? */ 237 if (first_event == true) { 238 first_event = false; 239 fce_init_udp(); 240 /* Notify listeners the we start from the beginning */ 241 send_fce_event( "", FCE_CONN_START ); 242 } 243 244 /* build our data packet */ 245 ssize_t data_len = build_fce_packet( &packet, path, event, ++event_id ); 246 pack_fce_packet(&packet, iobuf, MAXIOBUF); 247 248 for (int i = 0; i < udp_sockets; i++) 249 { 250 int sent_data = 0; 251 struct udp_entry *udp_entry = udp_socket_list + i; 252 253 /* we had a problem earlier ? */ 254 if (udp_entry->sock == -1) 255 { 256 /* We still have to wait ?*/ 257 if (now < udp_entry->next_try_on_error) 258 continue; 259 260 /* Reopen socket */ 261 udp_entry->sock = socket(udp_entry->addrinfo.ai_family, 262 udp_entry->addrinfo.ai_socktype, 263 udp_entry->addrinfo.ai_protocol); 264 265 if (udp_entry->sock == -1) { 266 /* failed again, so go to rest again */ 267 LOG(log_error, logtype_fce, "Cannot recreate socket for fce UDP connection: errno %d", errno ); 268 269 udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S; 270 continue; 271 } 272 273 udp_entry->next_try_on_error = 0; 274 275 /* Okay, we have a running socket again, send server that we had a problem on our side*/ 276 data_len = build_fce_packet( &packet, "", FCE_CONN_BROKEN, 0 ); 277 pack_fce_packet(&packet, iobuf, MAXIOBUF); 278 279 sendto(udp_entry->sock, 280 iobuf, 281 data_len, 282 0, 283 (struct sockaddr *)&udp_entry->sockaddr, 284 udp_entry->addrinfo.ai_addrlen); 285 286 /* Rebuild our original data packet */ 287 data_len = build_fce_packet(&packet, path, event, event_id); 288 pack_fce_packet(&packet, iobuf, MAXIOBUF); 289 } 290 291 sent_data = sendto(udp_entry->sock, 292 iobuf, 293 data_len, 294 0, 295 (struct sockaddr *)&udp_entry->sockaddr, 296 udp_entry->addrinfo.ai_addrlen); 297 298 /* Problems ? */ 299 if (sent_data != data_len) { 300 /* Argh, socket broke, we close and retry later */ 301 LOG(log_error, logtype_fce, "send_fce_event: error sending packet to %s:%s, transfered %d of %d: %s", 302 udp_entry->addr, udp_entry->port, sent_data, data_len, strerror(errno)); 303 304 close( udp_entry->sock ); 305 udp_entry->sock = -1; 306 udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S; 307 } 308 } 309} 310 311static int add_udp_socket(const char *target_ip, const char *target_port ) 312{ 313 if (target_port == NULL) 314 target_port = FCE_DEFAULT_PORT_STRING; 315 316 if (udp_sockets >= FCE_MAX_UDP_SOCKS) { 317 LOG(log_error, logtype_fce, "Too many file change api UDP connections (max %d allowed)", FCE_MAX_UDP_SOCKS ); 318 return AFPERR_PARAM; 319 } 320 321 udp_socket_list[udp_sockets].addr = strdup(target_ip); 322 udp_socket_list[udp_sockets].port = strdup(target_port); 323 udp_socket_list[udp_sockets].sock = -1; 324 memset(&udp_socket_list[udp_sockets].addrinfo, 0, sizeof(struct addrinfo)); 325 memset(&udp_socket_list[udp_sockets].sockaddr, 0, sizeof(struct sockaddr_storage)); 326 udp_socket_list[udp_sockets].next_try_on_error = 0; 327 328 udp_sockets++; 329 330 return AFP_OK; 331} 332 333static void save_close_event(const char *path) 334{ 335 time_t now = time(NULL); 336 337 /* Check if it's a close for the same event as the last one */ 338 if (last_close_event.time /* is there any saved event ? */ 339 && (strcmp(path, last_close_event.path) != 0)) { 340 /* no, so send the saved event out now */ 341 send_fce_event(last_close_event.path, FCE_FILE_MODIFY); 342 } 343 344 LOG(log_debug, logtype_fce, "save_close_event: %s", path); 345 346 last_close_event.time = now; 347 strncpy(last_close_event.path, path, MAXPATHLEN); 348} 349 350/* 351 * 352 * Dispatcher for all incoming file change events 353 * 354 * */ 355int fce_register(fce_ev_t event, const char *path, const char *oldpath, fce_obj_t type) 356{ 357 static bool first_event = true; 358 const char *bname; 359 360 if (!(fce_ev_enabled & (1 << event))) 361 return AFP_OK; 362 363 AFP_ASSERT(event >= FCE_FIRST_EVENT && event <= FCE_LAST_EVENT); 364 AFP_ASSERT(path); 365 366 LOG(log_debug, logtype_fce, "register_fce(path: %s, type: %s, event: %s", 367 path, type == fce_dir ? "dir" : "file", fce_event_names[event]); 368 369 bname = basename_safe(path); 370 371 if (udp_sockets == 0) 372 /* No listeners configured */ 373 return AFP_OK; 374 375 376 /* do some initialization on the fly the first time */ 377 if (first_event) { 378 fce_initialize_history(); 379 first_event = false; 380 } 381 382 /* handle files which should not cause events (.DS_Store atc. ) */ 383 for (int i = 0; skip_files[i] != NULL; i++) { 384 if (strcmp(bname, skip_files[i]) == 0) 385 return AFP_OK; 386 } 387 388 /* Can we ignore this event based on type or history? */ 389 if (fce_handle_coalescation(event, path, type)) { 390 LOG(log_debug9, logtype_fce, "Coalesced fc event <%d> for <%s>", event, path); 391 return AFP_OK; 392 } 393 394 switch (event) { 395 case FCE_FILE_MODIFY: 396 save_close_event(path); 397 break; 398 default: 399 send_fce_event(path, event); 400 break; 401 } 402 403 return AFP_OK; 404} 405 406static void check_saved_close_events(int fmodwait) 407{ 408 time_t now = time(NULL); 409 410 /* check if configured holdclose time has passed */ 411 if (last_close_event.time && ((last_close_event.time + fmodwait) < now)) { 412 LOG(log_debug, logtype_fce, "check_saved_close_events: sending event: %s", last_close_event.path); 413 /* yes, send event */ 414 send_fce_event(&last_close_event.path[0], FCE_FILE_MODIFY); 415 last_close_event.path[0] = 0; 416 last_close_event.time = 0; 417 } 418} 419 420/******************** External calls start here **************************/ 421 422/* 423 * API-Calls for file change api, called form outside (file.c directory.c ofork.c filedir.c) 424 * */ 425void fce_pending_events(AFPObj *obj) 426{ 427 if (!udp_sockets) 428 return; 429 check_saved_close_events(obj->options.fce_fmodwait); 430} 431 432/* 433 * 434 * Extern connect to afpd parameter, can be called multiple times for multiple listeners (up to MAX_UDP_SOCKS times) 435 * 436 * */ 437int fce_add_udp_socket(const char *target) 438{ 439 const char *port = FCE_DEFAULT_PORT_STRING; 440 char target_ip[256] = {""}; 441 442 strncpy(target_ip, target, sizeof(target_ip) -1); 443 444 char *port_delim = strchr( target_ip, ':' ); 445 if (port_delim) { 446 *port_delim = 0; 447 port = port_delim + 1; 448 } 449 return add_udp_socket(target_ip, port); 450} 451 452int fce_set_events(const char *events) 453{ 454 char *e; 455 char *p; 456 457 if (events == NULL) 458 return AFPERR_PARAM; 459 460 e = strdup(events); 461 462 fce_ev_enabled = 0; 463 464 for (p = strtok(e, ", "); p; p = strtok(NULL, ", ")) { 465 if (strcmp(p, "fmod") == 0) { 466 fce_ev_enabled |= (1 << FCE_FILE_MODIFY); 467 } else if (strcmp(p, "fdel") == 0) { 468 fce_ev_enabled |= (1 << FCE_FILE_DELETE); 469 } else if (strcmp(p, "ddel") == 0) { 470 fce_ev_enabled |= (1 << FCE_DIR_DELETE); 471 } else if (strcmp(p, "fcre") == 0) { 472 fce_ev_enabled |= (1 << FCE_FILE_CREATE); 473 } else if (strcmp(p, "dcre") == 0) { 474 fce_ev_enabled |= (1 << FCE_DIR_CREATE); 475 } 476 } 477 478 free(e); 479 480 return AFP_OK; 481} 482 483#ifdef FCE_TEST_MAIN 484 485 486void shortsleep( unsigned int us ) 487{ 488 usleep( us ); 489} 490int main( int argc, char*argv[] ) 491{ 492 int c; 493 494 char *port = FCE_DEFAULT_PORT_STRING; 495 char *host = "localhost"; 496 int delay_between_events = 1000; 497 int event_code = FCE_FILE_MODIFY; 498 char pathbuff[1024]; 499 int duration_in_seconds = 0; // TILL ETERNITY 500 char target[256]; 501 char *path = getcwd( pathbuff, sizeof(pathbuff) ); 502 503 // FULLSPEED TEST IS "-s 1001" -> delay is 0 -> send packets without pause 504 505 while ((c = getopt(argc, argv, "d:e:h:p:P:s:")) != -1) { 506 switch(c) { 507 case '?': 508 fprintf(stdout, "%s: [ -p Port -h Listener1 [ -h Listener2 ...] -P path -s Delay_between_events_in_us -e event_code -d Duration ]\n", argv[0]); 509 exit(1); 510 break; 511 case 'd': 512 duration_in_seconds = atoi(optarg); 513 break; 514 case 'e': 515 event_code = atoi(optarg); 516 break; 517 case 'h': 518 host = strdup(optarg); 519 break; 520 case 'p': 521 port = strdup(optarg); 522 break; 523 case 'P': 524 path = strdup(optarg); 525 break; 526 case 's': 527 delay_between_events = atoi(optarg); 528 break; 529 } 530 } 531 532 sprintf(target, "%s:%s", host, port); 533 if (fce_add_udp_socket(target) != 0) 534 return 1; 535 536 int ev_cnt = 0; 537 time_t start_time = time(NULL); 538 time_t end_time = 0; 539 540 if (duration_in_seconds) 541 end_time = start_time + duration_in_seconds; 542 543 while (1) 544 { 545 time_t now = time(NULL); 546 if (now > start_time) 547 { 548 start_time = now; 549 fprintf( stdout, "%d events/s\n", ev_cnt ); 550 ev_cnt = 0; 551 } 552 if (end_time && now >= end_time) 553 break; 554 555 fce_register(event_code, path, NULL, 0); 556 ev_cnt++; 557 558 559 shortsleep( delay_between_events ); 560 } 561} 562#endif /* TESTMAIN*/ 563