/*
 * Copyright (c) 2010 Mark Williams
 *
 * File change event API for netatalk
 *
 * For every detected filesystem change a UDP packet is sent to an arbitrary list
 * of listeners. Each packet contains the unix path of the modified filesystem element,
 * the event reason, and a consecutive event id (32 bit). Technically we are a UDP client
 * and send out packets synchronously as they are created by the afp functions. This should
 * not affect performance measurably. The only delaying calls occur during initialization,
 * if we have to resolve non-IP hostnames to IP. All numeric data inside the packet is in
 * network byte order, so use ntohs / ntohl to resolve length and event id. Ideally a
 * listener receives every packet with no gaps in event ids, starting with event id 1 and
 * mode FCE_CONN_START followed by data events from id 2 up to 0xFFFFFFFF, followed by
 * 0 to 0xFFFFFFFF and so on.
 *
 * A gap in the event ids, a stream that does not start with event id 1 and mode
 * FCE_CONN_START, or a packet with mode FCE_CONN_BROKEN means that the listener has
 * lost at least one filesystem event.
 *
 * All Rights Reserved. See COPYRIGHT.
20 */ 21 22#ifdef HAVE_CONFIG_H 23#include "config.h" 24#endif /* HAVE_CONFIG_H */ 25 26#include <stdio.h> 27 28#include <string.h> 29#include <stdlib.h> 30#include <errno.h> 31#include <time.h> 32 33 34#include <sys/param.h> 35#include <sys/socket.h> 36#include <netinet/in.h> 37#include <arpa/inet.h> 38#include <netdb.h> 39 40#include <netatalk/at.h> 41 42#include <atalk/adouble.h> 43#include <atalk/vfs.h> 44#include <atalk/logger.h> 45#include <atalk/afp.h> 46#include <atalk/util.h> 47#include <atalk/cnid.h> 48#include <atalk/unix.h> 49#include <atalk/fce_api.h> 50#include <atalk/globals.h> 51 52#include "fork.h" 53#include "file.h" 54#include "directory.h" 55#include "desktop.h" 56#include "volume.h" 57 58// ONLY USED IN THIS FILE 59#include "fce_api_internal.h" 60 61#define FCE_TRUE 1 62#define FCE_FALSE 0 63 64/* We store our connection data here */ 65static struct udp_entry udp_socket_list[FCE_MAX_UDP_SOCKS]; 66static int udp_sockets = 0; 67static int udp_initialized = FCE_FALSE; 68static unsigned long fce_ev_enabled = 69 (1 << FCE_FILE_MODIFY) | 70 (1 << FCE_FILE_DELETE) | 71 (1 << FCE_DIR_DELETE) | 72 (1 << FCE_FILE_CREATE) | 73 (1 << FCE_DIR_CREATE); 74 75static uint64_t tm_used; /* used for passing to event handler */ 76#define MAXIOBUF 1024 77static char iobuf[MAXIOBUF]; 78static const char *skip_files[] = 79{ 80 ".DS_Store", 81 NULL 82}; 83 84/* 85 * 86 * Initialize network structs for any listeners 87 * We dont give return code because all errors are handled internally (I hope..) 
88 * 89 * */ 90void fce_init_udp() 91{ 92 int rv, i; 93 struct addrinfo hints, *servinfo, *p; 94 95 if (udp_initialized == FCE_TRUE) 96 return; 97 98 memset(&hints, 0, sizeof hints); 99 hints.ai_family = AF_UNSPEC; 100 hints.ai_socktype = SOCK_DGRAM; 101 102 for (i = 0; i < udp_sockets; i++) { 103 struct udp_entry *udp_entry = udp_socket_list + i; 104 105 /* Close any pending sockets */ 106 if (udp_entry->sock != -1) 107 close(udp_entry->sock); 108 109 if ((rv = getaddrinfo(udp_entry->addr, udp_entry->port, &hints, &servinfo)) != 0) { 110 LOG(log_error, logtype_afpd, "fce_init_udp: getaddrinfo(%s:%s): %s", 111 udp_entry->addr, udp_entry->port, gai_strerror(rv)); 112 continue; 113 } 114 115 /* loop through all the results and make a socket */ 116 for (p = servinfo; p != NULL; p = p->ai_next) { 117 if ((udp_entry->sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { 118 LOG(log_error, logtype_afpd, "fce_init_udp: socket(%s:%s): %s", 119 udp_entry->addr, udp_entry->port, strerror(errno)); 120 continue; 121 } 122 break; 123 } 124 125 if (p == NULL) { 126 LOG(log_error, logtype_afpd, "fce_init_udp: no socket for %s:%s", 127 udp_entry->addr, udp_entry->port); 128 } 129 udp_entry->addrinfo = *p; 130 memcpy(&udp_entry->addrinfo, p, sizeof(struct addrinfo)); 131 memcpy(&udp_entry->sockaddr, p->ai_addr, sizeof(struct sockaddr_storage)); 132 freeaddrinfo(servinfo); 133 } 134 135 udp_initialized = FCE_TRUE; 136} 137 138void fce_cleanup() 139{ 140 int i; 141 if (udp_initialized == FCE_FALSE ) 142 return; 143 144 for (i = 0; i < udp_sockets; i++) 145 { 146 struct udp_entry *udp_entry = udp_socket_list + i; 147 148 /* Close any pending sockets */ 149 if (udp_entry->sock != -1) 150 { 151 close( udp_entry->sock ); 152 udp_entry->sock = -1; 153 } 154 } 155 udp_initialized = FCE_FALSE; 156} 157 158 159/* 160 * Construct a UDP packet for our listeners and return packet size 161 * */ 162static ssize_t build_fce_packet( struct fce_packet *packet, char *path, int mode, 
uint32_t event_id ) 163{ 164 size_t pathlen; 165 ssize_t data_len = 0; 166 167 strncpy(packet->magic, FCE_PACKET_MAGIC, sizeof(packet->magic) ); 168 packet->version = FCE_PACKET_VERSION; 169 packet->mode = mode; 170 packet->event_id = event_id; 171 172 pathlen = strlen(path) + 1; /* include string terminator */ 173 174 /* This should never happen, but before we bust this server, we send nonsense, fce listener has to cope */ 175 if (pathlen >= MAXPATHLEN) 176 pathlen = MAXPATHLEN - 1; 177 178 /* This is the payload len. Means: the stream has len bytes more until packet is finished */ 179 /* A server should read the first 16 byte, decode them and then fetch the rest */ 180 data_len = FCE_PACKET_HEADER_SIZE + pathlen; 181 packet->datalen = pathlen; 182 183 switch (mode) { 184 case FCE_TM_SIZE: 185 tm_used = hton64(tm_used); 186 memcpy(packet->data, &tm_used, sizeof(tm_used)); 187 strncpy(packet->data + sizeof(tm_used), path, pathlen); 188 189 packet->datalen += sizeof(tm_used); 190 data_len += sizeof(tm_used); 191 break; 192 default: 193 strncpy(packet->data, path, pathlen); 194 break; 195 } 196 197 /* return the packet len */ 198 return data_len; 199} 200 201static int pack_fce_packet(struct fce_packet *packet, unsigned char *buf) 202{ 203 unsigned char *p = buf; 204 205 memcpy(p, &packet->magic[0], sizeof(packet->magic)); 206 p += sizeof(packet->magic); 207 208 *p = packet->version; 209 p++; 210 211 *p = packet->mode; 212 p++; 213 214 uint32_t id = htonl(packet->event_id); 215 memcpy(p, &id, sizeof(id)); 216 p += sizeof(packet->event_id); 217 218 uint16_t l = htons(packet->datalen); 219 memcpy(p, &l, sizeof(l)); 220 p += sizeof(l); 221 222 memcpy(p, &packet->data[0], packet->datalen); 223 p += packet->datalen; 224 225 return 0; 226} 227 228/* 229 * Send the fce information to all (connected) listeners 230 * We dont give return code because all errors are handled internally (I hope..) 
231 * */ 232static void send_fce_event( char *path, int mode ) 233{ 234 struct fce_packet packet; 235 void *data = &packet; 236 static uint32_t event_id = 0; /* the unique packet couter to detect packet/data loss. Going from 0xFFFFFFFF to 0x0 is a valid increment */ 237 int i; 238 239 LOG(log_debug, logtype_afpd, "send_fce_event: start"); 240 241 time_t now = time(NULL); 242 243 /* build our data packet */ 244 ssize_t data_len = build_fce_packet( &packet, path, mode, ++event_id ); 245 pack_fce_packet(&packet, iobuf); 246 247 for (i = 0; i < udp_sockets; i++) 248 { 249 int sent_data = 0; 250 struct udp_entry *udp_entry = udp_socket_list + i; 251 252 /* we had a problem earlier ? */ 253 if (udp_entry->sock == -1) 254 { 255 /* We still have to wait ?*/ 256 if (now < udp_entry->next_try_on_error) 257 continue; 258 259 /* Reopen socket */ 260 udp_entry->sock = socket(udp_entry->addrinfo.ai_family, 261 udp_entry->addrinfo.ai_socktype, 262 udp_entry->addrinfo.ai_protocol); 263 264 if (udp_entry->sock == -1) { 265 /* failed again, so go to rest again */ 266 LOG(log_error, logtype_afpd, "Cannot recreate socket for fce UDP connection: errno %d", errno ); 267 268 udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S; 269 continue; 270 } 271 272 udp_entry->next_try_on_error = 0; 273 274 /* Okay, we have a running socket again, send server that we had a problem on our side*/ 275 data_len = build_fce_packet( &packet, "", FCE_CONN_BROKEN, 0 ); 276 pack_fce_packet(&packet, iobuf); 277 278 sendto(udp_entry->sock, 279 iobuf, 280 data_len, 281 0, 282 (struct sockaddr *)&udp_entry->sockaddr, 283 udp_entry->addrinfo.ai_addrlen); 284 285 /* Rebuild our original data packet */ 286 data_len = build_fce_packet( &packet, path, mode, event_id ); 287 pack_fce_packet(&packet, iobuf); 288 } 289 290 sent_data = sendto(udp_entry->sock, 291 iobuf, 292 data_len, 293 0, 294 (struct sockaddr *)&udp_entry->sockaddr, 295 udp_entry->addrinfo.ai_addrlen); 296 297 /* Problems ? 
*/ 298 if (sent_data != data_len) { 299 /* Argh, socket broke, we close and retry later */ 300 LOG(log_error, logtype_afpd, "send_fce_event: error sending packet to %s:%s, transfered %d of %d: %s", 301 udp_entry->addr, udp_entry->port, sent_data, data_len, strerror(errno)); 302 303 close( udp_entry->sock ); 304 udp_entry->sock = -1; 305 udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S; 306 } 307 } 308} 309 310static int add_udp_socket(const char *target_ip, const char *target_port ) 311{ 312 if (target_port == NULL) 313 target_port = FCE_DEFAULT_PORT_STRING; 314 315 if (udp_sockets >= FCE_MAX_UDP_SOCKS) { 316 LOG(log_error, logtype_afpd, "Too many file change api UDP connections (max %d allowed)", FCE_MAX_UDP_SOCKS ); 317 return AFPERR_PARAM; 318 } 319 320 udp_socket_list[udp_sockets].addr = strdup(target_ip); 321 udp_socket_list[udp_sockets].port = strdup(target_port); 322 udp_socket_list[udp_sockets].sock = -1; 323 memset(&udp_socket_list[udp_sockets].addrinfo, 0, sizeof(struct addrinfo)); 324 memset(&udp_socket_list[udp_sockets].sockaddr, 0, sizeof(struct sockaddr_storage)); 325 udp_socket_list[udp_sockets].next_try_on_error = 0; 326 327 udp_sockets++; 328 329 return AFP_OK; 330} 331 332/* 333 * 334 * Dispatcher for all incoming file change events 335 * 336 * */ 337static int register_fce(const char *u_name, int is_dir, int mode) 338{ 339 int i; 340 if (udp_sockets == 0) 341 /* No listeners configured */ 342 return AFP_OK; 343 344 if (u_name == NULL) 345 return AFPERR_PARAM; 346 347 static int first_event = FCE_TRUE; 348 349 /* do some initialization on the fly the first time */ 350 if (first_event) { 351 fce_initialize_history(); 352 } 353 354 /* handle files which should not cause events (.DS_Store atc. 
) */ 355 for (i = 0; skip_files[i] != NULL; i++) 356 { 357 if (!strcmp( u_name, skip_files[i])) 358 return AFP_OK; 359 } 360 361 362 char full_path_buffer[MAXPATHLEN + 1] = {""}; 363 const char *cwd = getcwdpath(); 364 365 if (mode == FCE_TM_SIZE) { 366 strlcpy(full_path_buffer, u_name, MAXPATHLEN); 367 } else if (!is_dir || mode == FCE_DIR_DELETE) { 368 if (strlen( cwd ) + strlen( u_name) + 1 >= MAXPATHLEN) { 369 LOG(log_error, logtype_afpd, "FCE file name too long: %s/%s", cwd, u_name ); 370 return AFPERR_PARAM; 371 } 372 sprintf( full_path_buffer, "%s/%s", cwd, u_name ); 373 } else { 374 if (strlen( cwd ) >= MAXPATHLEN) { 375 LOG(log_error, logtype_afpd, "FCE directory name too long: %s", cwd); 376 return AFPERR_PARAM; 377 } 378 strcpy( full_path_buffer, cwd); 379 } 380 381 /* Can we ignore this event based on type or history? */ 382 if (!(mode & FCE_TM_SIZE) && fce_handle_coalescation( full_path_buffer, is_dir, mode )) 383 { 384 LOG(log_debug9, logtype_afpd, "Coalesced fc event <%d> for <%s>", mode, full_path_buffer ); 385 return AFP_OK; 386 } 387 388 LOG(log_debug9, logtype_afpd, "Detected fc event <%d> for <%s>", mode, full_path_buffer ); 389 390 391 /* we do initilization on the fly, no blocking calls in here 392 * (except when using FQDN in broken DNS environment) 393 */ 394 if (first_event == FCE_TRUE) 395 { 396 fce_init_udp(); 397 398 /* Notify listeners the we start from the beginning */ 399 send_fce_event( "", FCE_CONN_START ); 400 401 first_event = FCE_FALSE; 402 } 403 404 /* Handle UDP transport */ 405 send_fce_event( full_path_buffer, mode ); 406 407 return AFP_OK; 408} 409 410 411/******************** External calls start here **************************/ 412 413/* 414 * API-Calls for file change api, called form outside (file.c directory.c ofork.c filedir.c) 415 * */ 416#ifndef FCE_TEST_MAIN 417 418int fce_register_delete_file( struct path *path ) 419{ 420 int ret = AFP_OK; 421 422 if (path == NULL) 423 return AFPERR_PARAM; 424 425 if 
(!(fce_ev_enabled & (1 << FCE_FILE_DELETE))) 426 return ret; 427 428 ret = register_fce( path->u_name, FALSE, FCE_FILE_DELETE ); 429 430 return ret; 431} 432int fce_register_delete_dir( char *name ) 433{ 434 int ret = AFP_OK; 435 436 if (name == NULL) 437 return AFPERR_PARAM; 438 439 if (!(fce_ev_enabled & (1 << FCE_DIR_DELETE))) 440 return ret; 441 442 ret = register_fce( name, TRUE, FCE_DIR_DELETE); 443 444 return ret; 445} 446 447int fce_register_new_dir( struct path *path ) 448{ 449 int ret = AFP_OK; 450 451 if (path == NULL) 452 return AFPERR_PARAM; 453 454 if (!(fce_ev_enabled & (1 << FCE_DIR_CREATE))) 455 return ret; 456 457 ret = register_fce( path->u_name, TRUE, FCE_DIR_CREATE ); 458 459 return ret; 460} 461 462 463int fce_register_new_file( struct path *path ) 464{ 465 int ret = AFP_OK; 466 467 if (path == NULL) 468 return AFPERR_PARAM; 469 470 if (!(fce_ev_enabled & (1 << FCE_FILE_CREATE))) 471 return ret; 472 473 ret = register_fce( path->u_name, FALSE, FCE_FILE_CREATE ); 474 475 return ret; 476} 477 478int fce_register_file_modification( struct ofork *ofork ) 479{ 480 char *u_name = NULL; 481 struct vol *vol; 482 int ret = AFP_OK; 483 484 if (ofork == NULL || ofork->of_vol == NULL) 485 return AFPERR_PARAM; 486 487 if (!(fce_ev_enabled & (1 << FCE_FILE_MODIFY))) 488 return ret; 489 490 vol = ofork->of_vol; 491 492 if (NULL == (u_name = mtoupath(vol, of_name(ofork), ofork->of_did, utf8_encoding()))) 493 { 494 return AFPERR_MISC; 495 } 496 497 ret = register_fce( u_name, FALSE, FCE_FILE_MODIFY ); 498 499 return ret; 500} 501 502int fce_register_tm_size(const char *vol, size_t used) 503{ 504 int ret = AFP_OK; 505 506 if (vol == NULL) 507 return AFPERR_PARAM; 508 509 if (!(fce_ev_enabled & (1 << FCE_TM_SIZE))) 510 return ret; 511 512 tm_used = used; /* oh what a hack */ 513 ret = register_fce(vol, FALSE, FCE_TM_SIZE); 514 515 return ret; 516} 517#endif 518 519/* 520 * 521 * Extern connect to afpd parameter, can be called multiple times for multiple 
listeners (up to MAX_UDP_SOCKS times) 522 * 523 * */ 524int fce_add_udp_socket(const char *target) 525{ 526 const char *port = FCE_DEFAULT_PORT_STRING; 527 char target_ip[256] = {""}; 528 529 strncpy(target_ip, target, sizeof(target_ip) -1); 530 531 char *port_delim = strchr( target_ip, ':' ); 532 if (port_delim) { 533 *port_delim = 0; 534 port = port_delim + 1; 535 } 536 return add_udp_socket(target_ip, port); 537} 538 539int fce_set_events(const char *events) 540{ 541 char *e; 542 char *p; 543 544 if (events == NULL) 545 return AFPERR_PARAM; 546 547 e = strdup(events); 548 549 fce_ev_enabled = 0; 550 551 for (p = strtok(e, ","); p; p = strtok(NULL, ",")) { 552 if (strcmp(p, "fmod") == 0) { 553 fce_ev_enabled |= (1 << FCE_FILE_MODIFY); 554 } else if (strcmp(p, "fdel") == 0) { 555 fce_ev_enabled |= (1 << FCE_FILE_DELETE); 556 } else if (strcmp(p, "ddel") == 0) { 557 fce_ev_enabled |= (1 << FCE_DIR_DELETE); 558 } else if (strcmp(p, "fcre") == 0) { 559 fce_ev_enabled |= (1 << FCE_FILE_CREATE); 560 } else if (strcmp(p, "dcre") == 0) { 561 fce_ev_enabled |= (1 << FCE_DIR_CREATE); 562 } else if (strcmp(p, "tmsz") == 0) { 563 fce_ev_enabled |= (1 << FCE_TM_SIZE); 564 } 565 } 566 567 free(e); 568} 569 570#ifdef FCE_TEST_MAIN 571 572 573void shortsleep( unsigned int us ) 574{ 575 usleep( us ); 576} 577int main( int argc, char*argv[] ) 578{ 579 int c,ret; 580 581 char *port = FCE_DEFAULT_PORT_STRING; 582 char *host = "localhost"; 583 int delay_between_events = 1000; 584 int event_code = FCE_FILE_MODIFY; 585 char pathbuff[1024]; 586 int duration_in_seconds = 0; // TILL ETERNITY 587 char target[256]; 588 char *path = getcwd( pathbuff, sizeof(pathbuff) ); 589 590 // FULLSPEED TEST IS "-s 1001" -> delay is 0 -> send packets without pause 591 592 while ((c = getopt(argc, argv, "d:e:h:p:P:s:")) != -1) { 593 switch(c) { 594 case '?': 595 fprintf(stdout, "%s: [ -p Port -h Listener1 [ -h Listener2 ...] 
-P path -s Delay_between_events_in_us -e event_code -d Duration ]\n", argv[0]); 596 exit(1); 597 break; 598 case 'd': 599 duration_in_seconds = atoi(optarg); 600 break; 601 case 'e': 602 event_code = atoi(optarg); 603 break; 604 case 'h': 605 host = strdup(optarg); 606 break; 607 case 'p': 608 port = strdup(optarg); 609 break; 610 case 'P': 611 path = strdup(optarg); 612 break; 613 case 's': 614 delay_between_events = atoi(optarg); 615 break; 616 } 617 } 618 619 sprintf(target, "%s:%s", host, port); 620 if (fce_add_udp_socket(target) != 0) 621 return 1; 622 623 int ev_cnt = 0; 624 time_t start_time = time(NULL); 625 time_t end_time = 0; 626 627 if (duration_in_seconds) 628 end_time = start_time + duration_in_seconds; 629 630 while (1) 631 { 632 time_t now = time(NULL); 633 if (now > start_time) 634 { 635 start_time = now; 636 fprintf( stdout, "%d events/s\n", ev_cnt ); 637 ev_cnt = 0; 638 } 639 if (end_time && now >= end_time) 640 break; 641 642 register_fce( path, 0, event_code ); 643 ev_cnt++; 644 645 646 shortsleep( delay_between_events ); 647 } 648} 649#endif /* TESTMAIN*/ 650