1This patch changes the receiving side to have the receiving code use a thread 2instead of a forked process. This extra thread does read from the socket, but 3it sends any stdout/stderr messages to the generator (main thread) to output. 4 5** This is very new code. ** Yes, it passes the "make test" testsuite, but 6there may still be some problems, especially in some of the untested features. 7(For one thing, I haven't yet added code to properly handle any keep-alive 8messages that arrive on the receiving side during the --delete-after phase!) 9 10This code just uses pthread.h directly, so configure changes will probably be 11needed to make this compatible with more systems. I have also tested that 12this code works fine using the GNU pth library without any code changes if 13you configured it with --enable-syscall-soft --enable-pthread (you may need 14to twiddle the Makefile options if you didn't install the library, though). 15 16NOTE: we still need to duplicate the partial_fname static in util.c! 17 18If you try this out, please send some email to wayned@samba.org or the rsync 19mailing list with your results, build changes, bug reports, etc. Thanks! 20 21To use this patch, run these commands for a successful build: 22 23 patch -p1 <patches/threaded-receiver.diff 24 ./configure 25 make 26 27--- old/Makefile.in 28+++ new/Makefile.in 29@@ -7,7 +7,7 @@ exec_prefix=@exec_prefix@ 30 bindir=@bindir@ 31 mandir=@mandir@ 32 33-LIBS=@LIBS@ 34+LIBS=@LIBS@ -lpthread 35 CC=@CC@ 36 CFLAGS=@CFLAGS@ 37 CPPFLAGS=@CPPFLAGS@ 38--- old/cleanup.c 39+++ new/cleanup.c 40@@ -31,10 +31,6 @@ extern int log_got_error; 41 extern char *partial_dir; 42 extern char *logfile_name; 43 44-#ifdef HAVE_SIGACTION 45-static struct sigaction sigact; 46-#endif 47- 48 /** 49 * Close all open sockets and files, allowing a (somewhat) graceful 50 * shutdown() of socket connections. This eliminates the abortive 51@@ -98,9 +94,6 @@ NORETURN void _exit_cleanup(int code, co 52 static int exit_code = 0; 53 static int unmodified_code = 0; 54 55- SIGACTION(SIGUSR1, SIG_IGN); 56- SIGACTION(SIGUSR2, SIG_IGN); 57- 58 if (exit_code) /* Preserve first error code when recursing. */ 59 code = exit_code; 60 61@@ -157,8 +150,6 @@ NORETURN void _exit_cleanup(int code, co 62 63 if (cleanup_fname) 64 do_unlink(cleanup_fname); 65- if (code) 66- kill_all(SIGUSR1); 67 if (cleanup_pid && cleanup_pid == getpid()) { 68 char *pidf = lp_pid_file(); 69 if (pidf && *pidf) 70--- old/errcode.h 71+++ new/errcode.h 72@@ -37,7 +37,6 @@ 73 #define RERR_CRASHED 15 /* sibling crashed */ 74 #define RERR_TERMINATED 16 /* sibling terminated abnormally */ 75 76-#define RERR_SIGNAL1 19 /* status returned when sent SIGUSR1 */ 77 #define RERR_SIGNAL 20 /* status returned when sent SIGINT, SIGTERM, SIGHUP */ 78 #define RERR_WAITCHILD 21 /* some error returned by waitpid() */ 79 #define RERR_MALLOC 22 /* error allocating core memory buffers */ 80--- old/generator.c 81+++ new/generator.c 82@@ -66,7 +66,6 @@ extern OFF_T min_size; 83 extern int io_error; 84 extern int allowed_lull; 85 extern int sock_f_out; 86-extern int ignore_timeout; 87 extern int protocol_version; 88 extern int fuzzy_basis; 89 extern int always_checksum; 90@@ -95,6 +94,11 @@ extern struct filter_list_struct server_ 91 92 static int deletion_count = 0; /* used to implement --max-delete */ 93 94+/* These vars are local copies so that the receiver can use the originals. */ 95+static int GEN_append_mode; 96+static int GEN_make_backups; 97+static int GEN_csum_length; 98+ 99 /* For calling delete_file() */ 100 #define DEL_FORCE_RECURSE (1<<1) /* recurse even w/o --force */ 101 #define DEL_TERSE (1<<3) 102@@ -445,8 +449,8 @@ static void sum_sizes_sqroot(struct sum_ 103 } 104 105 if (protocol_version < 27) { 106- s2length = csum_length; 107- } else if (csum_length == SUM_LENGTH) { 108+ s2length = GEN_csum_length; 109+ } else if (GEN_csum_length == SUM_LENGTH) { 110 s2length = SUM_LENGTH; 111 } else { 112 int32 c; 113@@ -456,7 +460,7 @@ static void sum_sizes_sqroot(struct sum_ 114 for (c = blength; (c >>= 1) && b; b--) {} 115 /* add a bit, subtract rollsum, round up. */ 116 s2length = (b + 1 - 32 + 7) / 8; /* --optimize in compiler-- */ 117- s2length = MAX(s2length, csum_length); 118+ s2length = MAX(s2length, GEN_csum_length); 119 s2length = MIN(s2length, SUM_LENGTH); 120 } 121 122@@ -490,7 +494,7 @@ static void generate_and_send_sums(int f 123 sum_sizes_sqroot(&sum, len); 124 write_sum_head(f_out, &sum); 125 126- if (append_mode > 0 && f_copy < 0) 127+ if (GEN_append_mode > 0 && f_copy < 0) 128 return; 129 130 if (len > 0) 131@@ -509,7 +513,7 @@ static void generate_and_send_sums(int f 132 133 if (f_copy >= 0) { 134 full_write(f_copy, map, n1); 135- if (append_mode > 0) 136+ if (GEN_append_mode > 0) 137 continue; 138 } 139 140@@ -1208,7 +1212,7 @@ static void recv_generator(char *fname, 141 return; 142 } 143 144- if (append_mode && st.st_size > file->length) 145+ if (GEN_append_mode && st.st_size > file->length) 146 return; 147 148 if (fnamecmp_type <= FNAMECMP_BASIS_DIR_HIGH) 149@@ -1271,7 +1275,7 @@ static void recv_generator(char *fname, 150 goto notify_others; 151 } 152 153- if (inplace && make_backups && fnamecmp_type == FNAMECMP_FNAME) { 154+ if (inplace && GEN_make_backups && fnamecmp_type == FNAMECMP_FNAME) { 155 if (!(backupptr = get_backup_name(fname))) { 156 close(fd); 157 return; 158@@ -1362,9 +1366,12 @@ void generate_files(int f_out, struct fi 159 int save_ignore_existing = ignore_existing; 160 int save_ignore_non_existing = ignore_non_existing; 161 int save_do_progress = do_progress; 162- int save_make_backups = make_backups; 163+ int save_make_backups = GEN_make_backups = make_backups; 164 int dir_tweaking = !(list_only || local_name || dry_run); 165 166+ GEN_append_mode = append_mode; 167+ GEN_csum_length = csum_length; 168+ 169 if (protocol_version >= 29) { 170 itemizing = 1; 171 maybe_ATTRS_REPORT = stdout_format_has_i ? 0 : ATTRS_REPORT; 172@@ -1392,7 +1399,7 @@ void generate_files(int f_out, struct fi 173 do_delete_pass(flist); 174 do_progress = 0; 175 176- if (append_mode || whole_file < 0) 177+ if (GEN_append_mode || whole_file < 0) 178 whole_file = 0; 179 if (verbose >= 2) { 180 rprintf(FINFO, "delta-transmission %s\n", 181@@ -1401,12 +1408,6 @@ void generate_files(int f_out, struct fi 182 : "enabled"); 183 } 184 185- /* Since we often fill up the outgoing socket and then just sit around 186- * waiting for the other 2 processes to do their thing, we don't want 187- * to exit on a timeout. If the data stops flowing, the receiver will 188- * notice that and let us know via the redo pipe (or its closing). */ 189- ignore_timeout = 1; 190- 191 for (i = 0; i < flist->count; i++) { 192 struct file_struct *file = flist->files[i]; 193 194@@ -1450,23 +1451,34 @@ void generate_files(int f_out, struct fi 195 delete_in_dir(NULL, NULL, NULL, NULL); 196 197 phase++; 198- csum_length = SUM_LENGTH; 199+ GEN_csum_length = SUM_LENGTH; /* csum_length is set by the receiver */ 200 max_size = min_size = ignore_existing = ignore_non_existing = 0; 201 update_only = always_checksum = size_only = 0; 202 ignore_times = 1; 203- if (append_mode) /* resend w/o append mode */ 204- append_mode = -1; /* ... but only longer files */ 205- make_backups = 0; /* avoid a duplicate backup for inplace processing */ 206+ if (GEN_append_mode) /* resend w/o append mode */ 207+ GEN_append_mode = -1; /* ... but only longer files */ 208+ GEN_make_backups = 0; /* avoid a duplicate backup for inplace processing */ 209 210 if (verbose > 2) 211 rprintf(FINFO,"generate_files phase=%d\n",phase); 212 213 write_int(f_out, -1); 214+ io_flush(NORMAL_FLUSH); 215 216 /* files can cycle through the system more than once 217 * to catch initial checksum errors */ 218- while ((i = get_redo_num(itemizing, code)) != -1) { 219- struct file_struct *file = flist->files[i]; 220+ while (1) { 221+ struct file_struct *file; 222+ if (preserve_hard_links) 223+ check_for_finished_hlinks(itemizing, code); 224+ if ((i = get_redo_num()) < 0) { 225+ if (i == -2) 226+ break; 227+ io_flush(NORMAL_FLUSH); 228+ msleep(20); 229+ continue; 230+ } 231+ file = flist->files[i]; 232 if (local_name) 233 strlcpy(fbuf, local_name, sizeof fbuf); 234 else 235@@ -1478,27 +1490,43 @@ void generate_files(int f_out, struct fi 236 phase++; 237 ignore_non_existing = save_ignore_non_existing; 238 ignore_existing = save_ignore_existing; 239- make_backups = save_make_backups; 240+ GEN_make_backups = save_make_backups; 241 242 if (verbose > 2) 243 rprintf(FINFO,"generate_files phase=%d\n",phase); 244 245 write_int(f_out, -1); 246+ io_flush(NORMAL_FLUSH); 247+ 248 /* Reduce round-trip lag-time for a useless delay-updates phase. */ 249- if (protocol_version >= 29 && !delay_updates) 250+ if (protocol_version >= 29 && !delay_updates) { 251 write_int(f_out, -1); 252+ io_flush(NORMAL_FLUSH); 253+ } 254 255- /* Read MSG_DONE for the redo phase (and any prior messages). */ 256- get_redo_num(itemizing, code); 257+ /* Read end marker for the redo phase (and any prior messages). */ 258+ while (1) { 259+ if (preserve_hard_links) 260+ check_for_finished_hlinks(itemizing, code); 261+ if (get_redo_num() == -2) 262+ break; 263+ io_flush(NORMAL_FLUSH); 264+ msleep(20); 265+ } 266 267 if (protocol_version >= 29) { 268 phase++; 269 if (verbose > 2) 270 rprintf(FINFO, "generate_files phase=%d\n", phase); 271- if (delay_updates) 272+ if (delay_updates) { 273 write_int(f_out, -1); 274- /* Read MSG_DONE for delay-updates phase & prior messages. */ 275- get_redo_num(itemizing, code); 276+ io_flush(NORMAL_FLUSH); 277+ } 278+ /* Read end marker for delay-updates phase & prior messages. */ 279+ while (get_redo_num() != -2) { 280+ io_flush(NORMAL_FLUSH); 281+ msleep(20); 282+ } 283 } 284 285 do_progress = save_do_progress; 286--- old/io.c 287+++ new/io.c 288@@ -40,20 +40,17 @@ extern int allowed_lull; 289 extern int am_server; 290 extern int am_daemon; 291 extern int am_sender; 292-extern int am_generator; 293 extern int eol_nulls; 294 extern int read_batch; 295 extern int csum_length; 296 extern int checksum_seed; 297 extern int protocol_version; 298-extern int remove_source_files; 299 extern int preserve_hard_links; 300 extern char *filesfrom_host; 301 extern struct stats stats; 302 extern struct file_list *the_file_list; 303 304 const char phase_unknown[] = "unknown"; 305-int ignore_timeout = 0; 306 int batch_fd = -1; 307 int batch_gen_fd = -1; 308 309@@ -61,7 +58,6 @@ int batch_gen_fd = -1; 310 int kluge_around_eof = 0; 311 312 int msg_fd_in = -1; 313-int msg_fd_out = -1; 314 int sock_f_in = -1; 315 int sock_f_out = -1; 316 317@@ -88,27 +84,31 @@ static OFF_T active_bytecnt = 0; 318 static void read_loop(int fd, char *buf, size_t len); 319 320 struct flist_ndx_item { 321- struct flist_ndx_item *next; 322+ volatile struct flist_ndx_item *next; 323 int ndx; 324 }; 325 326 struct flist_ndx_list { 327- struct flist_ndx_item *head, *tail; 328+ volatile struct flist_ndx_item *head, *tail; 329+ pthread_mutex_t mutex; 330 }; 331 332-static struct flist_ndx_list redo_list, hlink_list; 333+static struct flist_ndx_list redo_list = { NULL, NULL, PTHREAD_MUTEX_INITIALIZER }; 334+static struct flist_ndx_list hlink_list = { NULL, NULL, PTHREAD_MUTEX_INITIALIZER }; 335 336 struct msg_list_item { 337- struct msg_list_item *next; 338+ volatile struct msg_list_item *next; 339 int len; 340+ enum msgcode code; 341 char buf[1]; 342 }; 343 344 struct msg_list { 345- struct msg_list_item *head, *tail; 346+ volatile struct msg_list_item *head, *tail; 347+ pthread_mutex_t mutex; 348 }; 349 350-static struct msg_list msg2genr, msg2sndr; 351+static struct msg_list msg_list = { NULL, NULL, PTHREAD_MUTEX_INITIALIZER }; 352 353 static void flist_ndx_push(struct flist_ndx_list *lp, int ndx) 354 { 355@@ -118,27 +118,31 @@ static void flist_ndx_push(struct flist_ 356 out_of_memory("flist_ndx_push"); 357 item->next = NULL; 358 item->ndx = ndx; 359+ pthread_mutex_lock(&redo_list.mutex); 360 if (lp->tail) 361 lp->tail->next = item; 362 else 363 lp->head = item; 364 lp->tail = item; 365+ pthread_mutex_unlock(&redo_list.mutex); 366 } 367 368 static int flist_ndx_pop(struct flist_ndx_list *lp) 369 { 370- struct flist_ndx_item *next; 371+ struct flist_ndx_item *head, *next; 372 int ndx; 373 374 if (!lp->head) 375 return -1; 376 377- ndx = lp->head->ndx; 378- next = lp->head->next; 379- free(lp->head); 380- lp->head = next; 381- if (!next) 382+ pthread_mutex_lock(&hlink_list.mutex); 383+ head = (struct flist_ndx_item *)lp->head; 384+ next = (struct flist_ndx_item *)head->next; 385+ ndx = head->ndx; 386+ if (!(lp->head = next)) 387 lp->tail = NULL; 388+ pthread_mutex_unlock(&hlink_list.mutex); 389+ free(head); 390 391 return ndx; 392 } 393@@ -147,7 +151,7 @@ static void check_timeout(void) 394 { 395 time_t t; 396 397- if (!io_timeout || ignore_timeout) 398+ if (!io_timeout) 399 return; 400 401 if (!last_io_in) { 402@@ -188,44 +192,38 @@ void set_io_timeout(int secs) 403 404 /* Setup the fd used to receive MSG_* messages. Only needed during the 405 * early stages of being a local sender (up through the sending of the 406- * file list) or when we're the generator (to fetch the messages from 407- * the receiver). */ 408+ * file list). */ 409 void set_msg_fd_in(int fd) 410 { 411 msg_fd_in = fd; 412 } 413 414-/* Setup the fd used to send our MSG_* messages. Only needed when 415- * we're the receiver (to send our messages to the generator). */ 416-void set_msg_fd_out(int fd) 417-{ 418- msg_fd_out = fd; 419- set_nonblocking(msg_fd_out); 420-} 421- 422 /* Add a message to the pending MSG_* list. */ 423-static void msg_list_add(struct msg_list *lst, int code, char *buf, int len) 424+static void msg_list_add(int code, char *buf, int len) 425 { 426 struct msg_list_item *m; 427- int sz = len + 4 + sizeof m[0] - 1; 428+ int sz = len + sizeof m[0] - 1; 429 430+ assert(am_receiver()); 431 if (!(m = (struct msg_list_item *)new_array(char, sz))) 432 out_of_memory("msg_list_add"); 433 m->next = NULL; 434- m->len = len + 4; 435- SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len); 436- memcpy(m->buf + 4, buf, len); 437- if (lst->tail) 438- lst->tail->next = m; 439+ m->len = len; 440+ m->code = code; 441+ memcpy(m->buf, buf, len); 442+ 443+ pthread_mutex_lock(&msg_list.mutex); 444+ if (msg_list.tail) 445+ msg_list.tail->next = m; 446 else 447- lst->head = m; 448- lst->tail = m; 449+ msg_list.head = m; 450+ msg_list.tail = m; 451+ pthread_mutex_unlock(&msg_list.mutex); 452 } 453 454-/* Read a message from the MSG_* fd and handle it. This is called either 455+/* Read a message from the MSG_* fd and handle it. This is only called 456 * during the early stages of being a local sender (up through the sending 457- * of the file list) or when we're the generator (to fetch the messages 458- * from the receiver). */ 459+ * of the file list). */ 460 static void read_msg_fd(void) 461 { 462 char buf[2048]; 463@@ -244,51 +242,6 @@ static void read_msg_fd(void) 464 tag = (tag >> 24) - MPLEX_BASE; 465 466 switch (tag) { 467- case MSG_DONE: 468- if (len != 0 || !am_generator) { 469- rprintf(FERROR, "invalid message %d:%d\n", tag, len); 470- exit_cleanup(RERR_STREAMIO); 471- } 472- flist_ndx_push(&redo_list, -1); 473- break; 474- case MSG_REDO: 475- if (len != 4 || !am_generator) { 476- rprintf(FERROR, "invalid message %d:%d\n", tag, len); 477- exit_cleanup(RERR_STREAMIO); 478- } 479- read_loop(fd, buf, 4); 480- if (remove_source_files) 481- decrement_active_files(IVAL(buf,0)); 482- flist_ndx_push(&redo_list, IVAL(buf,0)); 483- break; 484- case MSG_DELETED: 485- if (len >= (int)sizeof buf || !am_generator) { 486- rprintf(FERROR, "invalid message %d:%d\n", tag, len); 487- exit_cleanup(RERR_STREAMIO); 488- } 489- read_loop(fd, buf, len); 490- send_msg(MSG_DELETED, buf, len); 491- break; 492- case MSG_SUCCESS: 493- if (len != 4 || !am_generator) { 494- rprintf(FERROR, "invalid message %d:%d\n", tag, len); 495- exit_cleanup(RERR_STREAMIO); 496- } 497- read_loop(fd, buf, len); 498- if (remove_source_files) { 499- decrement_active_files(IVAL(buf,0)); 500- send_msg(MSG_SUCCESS, buf, len); 501- } 502- if (preserve_hard_links) 503- flist_ndx_push(&hlink_list, IVAL(buf,0)); 504- break; 505- case MSG_SOCKERR: 506- if (!am_generator) { 507- rprintf(FERROR, "invalid message %d:%d\n", tag, len); 508- exit_cleanup(RERR_STREAMIO); 509- } 510- close_multiplexing_out(); 511- /* FALL THROUGH */ 512 case MSG_INFO: 513 case MSG_ERROR: 514 case MSG_LOG: 515@@ -332,75 +285,80 @@ void decrement_active_files(int ndx) 516 active_bytecnt -= the_file_list->files[ndx]->length; 517 } 518 519-/* Try to push messages off the list onto the wire. If we leave with more 520+/* Try to pop messages off the list onto the wire. If we leave with more 521 * to do, return 0. On error, return -1. If everything flushed, return 1. 522- * This is only active in the receiver. */ 523-static int msg2genr_flush(int flush_it_all) 524+ * This is only called by the generator. */ 525+static void msg_list_flush(void) 526 { 527- static int written = 0; 528- struct timeval tv; 529- fd_set fds; 530+ assert(am_generator()); 531 532- if (msg_fd_out < 0) 533- return -1; 534+ if (defer_forwarding_messages) 535+ return; 536 537- while (msg2genr.head) { 538- struct msg_list_item *m = msg2genr.head; 539- int n = write(msg_fd_out, m->buf + written, m->len - written); 540- if (n < 0) { 541- if (errno == EINTR) 542- continue; 543- if (errno != EWOULDBLOCK && errno != EAGAIN) 544- return -1; 545- if (!flush_it_all) 546- return 0; 547- FD_ZERO(&fds); 548- FD_SET(msg_fd_out, &fds); 549- tv.tv_sec = select_timeout; 550- tv.tv_usec = 0; 551- if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv)) 552- check_timeout(); 553- } else if ((written += n) == m->len) { 554- msg2genr.head = m->next; 555- if (!msg2genr.head) 556- msg2genr.tail = NULL; 557- free(m); 558- written = 0; 559+ no_flush++; 560+ defer_forwarding_messages = 1; 561+ while (msg_list.head) { 562+ struct msg_list_item *m = (struct msg_list_item *)msg_list.head; 563+ pthread_mutex_lock(&msg_list.mutex); 564+ if (!(msg_list.head = m->next)) 565+ msg_list.tail = NULL; 566+ pthread_mutex_unlock(&msg_list.mutex); 567+ switch (m->code) { 568+ case MSG_SOCKERR: 569+ close_multiplexing_out(); 570+ /* FALL THROUGH */ 571+ case MSG_INFO: 572+ case MSG_ERROR: 573+ case MSG_LOG: 574+ rwrite(m->code, m->buf, m->len); 575+ break; 576+ default: 577+ io_multiplex_write(m->code, m->buf, m->len); 578+ break; 579 } 580+ free(m); 581 } 582- return 1; 583+ defer_forwarding_messages = 0; 584+ no_flush--; 585 } 586 587 int send_msg(enum msgcode code, char *buf, int len) 588 { 589- if (msg_fd_out < 0) { 590+ if (!am_receiver()) { 591 if (!defer_forwarding_messages) 592 return io_multiplex_write(code, buf, len); 593 if (!io_multiplexing_out) 594 return 0; 595- msg_list_add(&msg2sndr, code, buf, len); 596- return 1; 597 } 598- msg_list_add(&msg2genr, code, buf, len); 599- msg2genr_flush(NORMAL_FLUSH); 600+ msg_list_add(code, buf, len); 601 return 1; 602 } 603 604-int get_redo_num(int itemizing, enum logcode code) 605+/* This is only used by the receiver. */ 606+void push_redo_num(int ndx) 607 { 608- while (1) { 609- if (hlink_list.head) 610- check_for_finished_hlinks(itemizing, code); 611- if (redo_list.head) 612- break; 613- read_msg_fd(); 614- } 615+ assert(am_receiver()); 616+ flist_ndx_push(&redo_list, ndx); 617+} 618 619+/* This is only used by the generator. */ 620+int get_redo_num(void) 621+{ 622+ assert(am_generator()); 623 return flist_ndx_pop(&redo_list); 624 } 625 626+/* This is only used by the receiver. */ 627+void push_hlink_num(int ndx) 628+{ 629+ assert(am_receiver()); 630+ flist_ndx_push(&hlink_list, ndx); 631+} 632+ 633+/* This is only used by the generator. */ 634 int get_hlink_num(void) 635 { 636+ assert(am_generator()); 637 return flist_ndx_pop(&hlink_list); 638 } 639 640@@ -480,11 +438,6 @@ static int read_timeout(int fd, char *bu 641 FD_ZERO(&r_fds); 642 FD_ZERO(&w_fds); 643 FD_SET(fd, &r_fds); 644- if (msg2genr.head) { 645- FD_SET(msg_fd_out, &w_fds); 646- if (msg_fd_out > maxfd) 647- maxfd = msg_fd_out; 648- } 649 if (io_filesfrom_f_out >= 0) { 650 int new_fd; 651 if (io_filesfrom_buflen == 0) { 652@@ -517,9 +470,6 @@ static int read_timeout(int fd, char *bu 653 continue; 654 } 655 656- if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds)) 657- msg2genr_flush(NORMAL_FLUSH); 658- 659 if (io_filesfrom_f_out >= 0) { 660 if (io_filesfrom_buflen) { 661 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) { 662@@ -847,6 +797,8 @@ static void readfd(int fd, char *buffer, 663 } 664 665 if (fd == write_batch_monitor_in) { 666+ if (am_generator()) 667+ rprintf(FINFO, "writing %d bytes to batch file from generator\n", total); 668 if ((size_t)write(batch_fd, buffer, total) != total) 669 exit_cleanup(RERR_FILEIO); 670 } 671@@ -1115,7 +1067,6 @@ static void writefd_unbuffered(int fd,ch 672 * to grab any messages they sent before they died. */ 673 while (fd == sock_f_out && io_multiplexing_in) { 674 set_io_timeout(30); 675- ignore_timeout = 0; 676 readfd_unbuffered(sock_f_in, io_filesfrom_buf, 677 sizeof io_filesfrom_buf); 678 } 679@@ -1126,7 +1077,7 @@ static void writefd_unbuffered(int fd,ch 680 defer_forwarding_messages = 1; 681 682 if (fd == sock_f_out) { 683- if (io_timeout || am_generator) 684+ if (io_timeout || am_generator()) 685 last_io_out = time(NULL); 686 sleep_for_bwlimit(cnt); 687 } 688@@ -1136,23 +1087,6 @@ static void writefd_unbuffered(int fd,ch 689 no_flush--; 690 } 691 692-static void msg2sndr_flush(void) 693-{ 694- if (defer_forwarding_messages) 695- return; 696- 697- while (msg2sndr.head && io_multiplexing_out) { 698- struct msg_list_item *m = msg2sndr.head; 699- if (!(msg2sndr.head = m->next)) 700- msg2sndr.tail = NULL; 701- stats.total_written += m->len; 702- defer_forwarding_messages = 1; 703- writefd_unbuffered(sock_f_out, m->buf, m->len); 704- defer_forwarding_messages = 0; 705- free(m); 706- } 707-} 708- 709 /** 710 * Write an message to a multiplexed stream. If this fails then rsync 711 * exits. 712@@ -1178,14 +1112,15 @@ static void mplex_write(enum msgcode cod 713 defer_forwarding_messages = 1; 714 writefd_unbuffered(sock_f_out, buf, len); 715 defer_forwarding_messages = 0; 716- msg2sndr_flush(); 717+ if (am_generator()) 718+ msg_list_flush(); 719 } 720 } 721 722-void io_flush(int flush_it_all) 723+void io_flush(UNUSED(int flush_it_all)) 724 { 725- msg2genr_flush(flush_it_all); 726- msg2sndr_flush(); 727+ if (am_generator()) 728+ msg_list_flush(); 729 730 if (!iobuf_out_cnt || no_flush) 731 return; 732@@ -1199,11 +1134,6 @@ void io_flush(int flush_it_all) 733 734 static void writefd(int fd,char *buf,size_t len) 735 { 736- if (fd == msg_fd_out) { 737- rprintf(FERROR, "Internal error: wrong write used in receiver.\n"); 738- exit_cleanup(RERR_PROTOCOL); 739- } 740- 741 if (fd == sock_f_out) 742 stats.total_written += len; 743 744@@ -1409,9 +1339,3 @@ void start_write_batch(int fd) 745 else 746 write_batch_monitor_in = fd; 747 } 748- 749-void stop_write_batch(void) 750-{ 751- write_batch_monitor_out = -1; 752- write_batch_monitor_in = -1; 753-} 754--- old/log.c 755+++ new/log.c 756@@ -33,7 +33,6 @@ extern int am_sender; 757 extern int local_server; 758 extern int quiet; 759 extern int module_id; 760-extern int msg_fd_out; 761 extern int allow_8bit_chars; 762 extern int protocol_version; 763 extern int preserve_times; 764@@ -75,7 +74,6 @@ struct { 765 { RERR_IPC , "error in IPC code" }, 766 { RERR_CRASHED , "sibling process crashed" }, 767 { RERR_TERMINATED , "sibling process terminated abnormally" }, 768- { RERR_SIGNAL1 , "received SIGUSR1" }, 769 { RERR_SIGNAL , "received SIGINT, SIGTERM, or SIGHUP" }, 770 { RERR_WAITCHILD , "waitpid() failed" }, 771 { RERR_MALLOC , "error allocating core memory buffers" }, 772@@ -241,8 +239,8 @@ void rwrite(enum logcode code, char *buf 773 if (len < 0) 774 exit_cleanup(RERR_MESSAGEIO); 775 776- if (am_server && msg_fd_out >= 0) { 777- /* Pass the message to our sibling. */ 778+ if (am_receiver()) { 779+ /* Pass the message to the generator thread. */ 780 send_msg((enum msgcode)code, buf, len); 781 return; 782 } 783--- old/main.c 784+++ new/main.c 785@@ -32,7 +32,6 @@ extern int list_only; 786 extern int am_root; 787 extern int am_server; 788 extern int am_sender; 789-extern int am_generator; 790 extern int am_daemon; 791 extern int blocking_io; 792 extern int remove_source_files; 793@@ -96,9 +95,20 @@ struct pid_status { 794 795 static time_t starttime, endtime; 796 static int64 total_read, total_written; 797+static pthread_t receiver_tid; 798 799 static void show_malloc_stats(void); 800 801+int am_generator() 802+{ 803+ return receiver_tid != 0 && pthread_self() != receiver_tid; 804+} 805+ 806+int am_receiver() 807+{ 808+ return receiver_tid != 0 && pthread_self() == receiver_tid; 809+} 810+ 811 /* Works like waitpid(), but if we already harvested the child pid in our 812 * remember_children(), we succeed instead of returning an error. */ 813 pid_t wait_process(pid_t pid, int *status_ptr, int flags) 814@@ -175,7 +185,7 @@ static void handle_stats(int f) 815 show_flist_stats(); 816 } 817 818- if (am_generator) 819+ if (am_generator()) 820 return; 821 822 if (am_daemon) { 823@@ -683,12 +693,30 @@ static void do_server_sender(int f_in, i 824 exit_cleanup(0); 825 } 826 827+struct thread_args { 828+ struct file_list *flist; 829+ char *local_name; 830+ int f_in; 831+}; 832+ 833+static void *start_receiver_thread(void *arg) 834+{ 835+ static int exit_code; 836+ struct thread_args *ta = (struct thread_args *)arg; 837+ 838+ recv_files(ta->f_in, ta->flist, ta->local_name); 839+ handle_stats(ta->f_in); 840+ 841+ push_redo_num(-2); 842+ 843+ exit_code = log_got_error ? RERR_PARTIAL : 0; 844+ return &exit_code; 845+} 846 847 static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name) 848 { 849- int pid; 850- int exit_code = 0; 851- int error_pipe[2]; 852+ void *value_ptr; 853+ struct thread_args args; 854 855 /* The receiving side mustn't obey this, or an existing symlink that 856 * points to an identical file won't be replaced by the referent. */ 857@@ -697,70 +725,16 @@ static int do_recv(int f_in,int f_out,st 858 if (preserve_hard_links) 859 init_hard_links(); 860 861- if (fd_pair(error_pipe) < 0) { 862- rsyserr(FERROR, errno, "pipe failed in do_recv"); 863- exit_cleanup(RERR_IPC); 864- } 865- 866- io_flush(NORMAL_FLUSH); 867- 868- if ((pid = do_fork()) == -1) { 869- rsyserr(FERROR, errno, "fork failed in do_recv"); 870+ args.f_in = f_in; 871+ args.flist = flist; 872+ args.local_name = local_name; 873+ if (pthread_create(&receiver_tid, NULL, start_receiver_thread, &args) < 0) { 874+ rsyserr(FERROR, errno, "pthread_create failed in do_recv"); 875 exit_cleanup(RERR_IPC); 876 } 877 878- if (pid == 0) { 879- close(error_pipe[0]); 880- if (f_in != f_out) 881- close(f_out); 882- 883- /* we can't let two processes write to the socket at one time */ 884- close_multiplexing_out(); 885- 886- /* set place to send errors */ 887- set_msg_fd_out(error_pipe[1]); 888- 889- recv_files(f_in, flist, local_name); 890- io_flush(FULL_FLUSH); 891- handle_stats(f_in); 892- 893- send_msg(MSG_DONE, "", 0); 894- io_flush(FULL_FLUSH); 895- 896- /* Handle any keep-alive packets from the post-processing work 897- * that the generator does. */ 898- if (protocol_version >= 29) { 899- kluge_around_eof = -1; 900- 901- /* This should only get stopped via a USR2 signal. */ 902- while (read_int(f_in) == flist->count 903- && read_shortint(f_in) == ITEM_IS_NEW) {} 904- 905- rprintf(FERROR, "Invalid packet at end of run [%s]\n", 906- who_am_i()); 907- exit_cleanup(RERR_PROTOCOL); 908- } 909- 910- /* Finally, we go to sleep until our parent kills us with a 911- * USR2 signal. We sleep for a short time, as on some OSes 912- * a signal won't interrupt a sleep! */ 913- while (1) 914- msleep(20); 915- } 916- 917- am_generator = 1; 918- close_multiplexing_in(); 919- if (write_batch && !am_server) 920- stop_write_batch(); 921- 922- close(error_pipe[1]); 923- if (f_in != f_out) 924- close(f_in); 925- 926 io_start_buffering_out(); 927 928- set_msg_fd_in(error_pipe[0]); 929- 930 generate_files(f_out, flist, local_name); 931 932 handle_stats(-1); 933@@ -771,10 +745,13 @@ static int do_recv(int f_in,int f_out,st 934 } 935 io_flush(FULL_FLUSH); 936 937- set_msg_fd_in(-1); 938- kill(pid, SIGUSR2); 939- wait_process_with_flush(pid, &exit_code); 940- return exit_code; 941+ pthread_join(receiver_tid, &value_ptr); 942+ if (!am_server) 943+ output_summary(); 944+ 945+ close_all(); 946+ 947+ return *(int*)value_ptr; 948 } 949 950 951@@ -1176,22 +1153,6 @@ static int start_client(int argc, char * 952 return ret; 953 } 954 955- 956-static RETSIGTYPE sigusr1_handler(UNUSED(int val)) 957-{ 958- exit_cleanup(RERR_SIGNAL1); 959-} 960- 961-static RETSIGTYPE sigusr2_handler(UNUSED(int val)) 962-{ 963- if (!am_server) 964- output_summary(); 965- close_all(); 966- if (log_got_error) 967- _exit(RERR_PARTIAL); 968- _exit(0); 969-} 970- 971 RETSIGTYPE remember_children(UNUSED(int val)) 972 { 973 #ifdef WNOHANG 974@@ -1283,8 +1244,6 @@ int main(int argc,char *argv[]) 975 # endif 976 sigact.sa_flags = SA_NOCLDSTOP; 977 #endif 978- SIGACTMASK(SIGUSR1, sigusr1_handler); 979- SIGACTMASK(SIGUSR2, sigusr2_handler); 980 SIGACTMASK(SIGCHLD, remember_children); 981 #ifdef MAINTAINER_MODE 982 SIGACTMASK(SIGSEGV, rsync_panic_handler); 983--- old/match.c 984+++ new/match.c 985@@ -23,7 +23,7 @@ 986 #include "rsync.h" 987 988 extern int verbose; 989-extern int do_progress; 990+extern int recv_progress; 991 extern int checksum_seed; 992 extern int append_mode; 993 994@@ -113,7 +113,7 @@ static void matched(int f, struct sum_st 995 else 996 last_match = offset; 997 998- if (buf && do_progress) 999+ if (buf && recv_progress) 1000 show_progress(last_match, buf->file_size); 1001 } 1002 1003@@ -317,7 +317,7 @@ void match_sums(int f, struct sum_struct 1004 if (append_mode) { 1005 OFF_T j = 0; 1006 for (j = CHUNK_SIZE; j < s->flength; j += CHUNK_SIZE) { 1007- if (buf && do_progress) 1008+ if (buf && recv_progress) 1009 show_progress(last_match, buf->file_size); 1010 sum_update(map_ptr(buf, last_match, CHUNK_SIZE), 1011 CHUNK_SIZE); 1012@@ -325,7 +325,7 @@ void match_sums(int f, struct sum_struct 1013 } 1014 if (last_match < s->flength) { 1015 int32 len = s->flength - last_match; 1016- if (buf && do_progress) 1017+ if (buf && recv_progress) 1018 show_progress(last_match, buf->file_size); 1019 sum_update(map_ptr(buf, last_match, len), len); 1020 last_match = s->flength; 1021--- old/options.c 1022+++ new/options.c 1023@@ -74,7 +74,6 @@ int def_compress_level = Z_DEFAULT_COMPR 1024 int am_root = 0; 1025 int am_server = 0; 1026 int am_sender = 0; 1027-int am_generator = 0; 1028 int am_starting_up = 1; 1029 int relative_paths = -1; 1030 int implied_dirs = 1; 1031@@ -95,6 +94,7 @@ int am_daemon = 0; 1032 int daemon_over_rsh = 0; 1033 int do_stats = 0; 1034 int do_progress = 0; 1035+int recv_progress = 0; 1036 int keep_partial = 0; 1037 int safe_symlinks = 0; 1038 int copy_unsafe_links = 0; 1039@@ -1306,6 +1306,7 @@ int parse_arguments(int *argc, const cha 1040 1041 if (do_progress && !verbose && !log_before_transfer && !am_server) 1042 verbose = 1; 1043+ recv_progress = do_progress; 1044 1045 if (dry_run) 1046 do_xfers = 0; 1047--- old/pipe.c 1048+++ new/pipe.c 1049@@ -59,7 +59,7 @@ pid_t piped_child(char **command, int *f 1050 exit_cleanup(RERR_IPC); 1051 } 1052 1053- pid = do_fork(); 1054+ pid = fork(); 1055 if (pid == -1) { 1056 rsyserr(FERROR, errno, "fork"); 1057 exit_cleanup(RERR_IPC); 1058@@ -123,7 +123,7 @@ pid_t local_child(int argc, char **argv, 1059 exit_cleanup(RERR_IPC); 1060 } 1061 1062- pid = do_fork(); 1063+ pid = fork(); 1064 if (pid == -1) { 1065 rsyserr(FERROR, errno, "fork"); 1066 exit_cleanup(RERR_IPC); 1067--- old/receiver.c 1068+++ new/receiver.c 1069@@ -25,7 +25,7 @@ 1070 extern int verbose; 1071 extern int do_xfers; 1072 extern int am_server; 1073-extern int do_progress; 1074+extern int recv_progress; 1075 extern int log_before_transfer; 1076 extern int stdout_format_has_i; 1077 extern int logfile_format_has_i; 1078@@ -157,7 +157,7 @@ static int receive_data(int f_in, char * 1079 if (sum.remainder) 1080 sum.flength -= sum.blength - sum.remainder; 1081 for (j = CHUNK_SIZE; j < sum.flength; j += CHUNK_SIZE) { 1082- if (do_progress) 1083+ if (recv_progress) 1084 show_progress(offset, total_size); 1085 sum_update(map_ptr(mapbuf, offset, CHUNK_SIZE), 1086 CHUNK_SIZE); 1087@@ -165,7 +165,7 @@ static int receive_data(int f_in, char * 1088 } 1089 if (offset < sum.flength) { 1090 int32 len = sum.flength - offset; 1091- if (do_progress) 1092+ if (recv_progress) 1093 show_progress(offset, total_size); 1094 sum_update(map_ptr(mapbuf, offset, len), len); 1095 offset = sum.flength; 1096@@ -178,7 +178,7 @@ static int receive_data(int f_in, char * 1097 } 1098 1099 while ((i = recv_token(f_in, &data)) != 0) { 1100- if (do_progress) 1101+ if (recv_progress) 1102 show_progress(offset, total_size); 1103 1104 if (i > 0) { 1105@@ -248,7 +248,7 @@ static int receive_data(int f_in, char * 1106 ftruncate(fd, offset); 1107 #endif 1108 1109- if (do_progress) 1110+ if (recv_progress) 1111 end_progress(total_size); 1112 1113 if (fd != -1 && offset > 0 && sparse_end(fd) != 0) { 1114@@ -299,12 +299,12 @@ static void handle_delayed_updates(struc 1115 "rename failed for %s (from %s)", 1116 full_fname(fname), partialptr); 1117 } else { 1118- if (remove_source_files 1119- || (preserve_hard_links 1120- && file->link_u.links)) { 1121+ if (remove_source_files) { 1122 SIVAL(numbuf, 0, i); 1123 send_msg(MSG_SUCCESS,numbuf,4); 1124 } 1125+ if (preserve_hard_links && file->link_u.links) 1126+ push_hlink_num(i); 1127 handle_partial_dir(partialptr, PDIR_DELETE); 1128 } 1129 } 1130@@ -355,11 +355,6 @@ int recv_files(int f_in, struct file_lis 1131 if (verbose > 2) 1132 rprintf(FINFO,"recv_files(%d) starting\n",flist->count); 1133 1134- if (flist->hlink_pool) { 1135- pool_destroy(flist->hlink_pool); 1136- flist->hlink_pool = NULL; 1137- } 1138- 1139 if (delay_updates) 1140 delayed_bits = bitbag_create(flist->count); 1141 1142@@ -382,7 +377,7 @@ int recv_files(int f_in, struct file_lis 1143 rprintf(FINFO, "recv_files phase=%d\n", phase); 1144 if (phase == 2 && delay_updates) 1145 handle_delayed_updates(flist, local_name); 1146- send_msg(MSG_DONE, "", 0); 1147+ push_redo_num(-2); 1148 if (keep_partial && !partial_dir) 1149 make_backups = 0; /* prevents double backup */ 1150 if (append_mode) { 1151@@ -607,7 +602,7 @@ int recv_files(int f_in, struct file_lis 1152 /* log the transfer */ 1153 if (log_before_transfer) 1154 log_item(FCLIENT, file, &initial_stats, iflags, NULL); 1155- else if (!am_server && verbose && do_progress) 1156+ else if (!am_server && verbose && recv_progress) 1157 rprintf(FINFO, "%s\n", fname); 1158 1159 /* recv file data */ 1160@@ -654,11 +649,13 @@ int recv_files(int f_in, struct file_lis 1161 cleanup_disable(); 1162 1163 if (recv_ok > 0) { 1164- if (remove_source_files 1165- || (preserve_hard_links && file->link_u.links)) { 1166+ if (remove_source_files) { 1167+ decrement_active_files(i); 1168 SIVAL(numbuf, 0, i); 1169 send_msg(MSG_SUCCESS, numbuf, 4); 1170 } 1171+ if (preserve_hard_links && file->link_u.links) 1172+ push_hlink_num(i); 1173 } else if (!recv_ok) { 1174 int msgtype = phase || read_batch ? FERROR : FINFO; 1175 if (msgtype == FERROR || verbose) { 1176@@ -681,8 +678,8 @@ int recv_files(int f_in, struct file_lis 1177 errstr, fname, keptstr, redostr); 1178 } 1179 if (!phase) { 1180- SIVAL(numbuf, 0, i); 1181- send_msg(MSG_REDO, numbuf, 4); 1182+ decrement_active_files(i); 1183+ push_redo_num(i); 1184 } 1185 } 1186 } 1187--- old/rsync.c 1188+++ new/rsync.c 1189@@ -39,7 +39,6 @@ extern int omit_dir_times; 1190 extern int am_root; 1191 extern int am_server; 1192 extern int am_sender; 1193-extern int am_generator; 1194 extern int am_starting_up; 1195 extern int allow_8bit_chars; 1196 extern int preserve_uid; 1197@@ -305,5 +304,5 @@ const char *who_am_i(void) 1198 { 1199 if (am_starting_up) 1200 return am_server ? "server" : "client"; 1201- return am_sender ? "sender" : am_generator ? "generator" : "receiver"; 1202+ return am_sender ? "sender" : am_generator() ? "generator" : "receiver"; 1203 } 1204--- old/rsync.h 1205+++ new/rsync.h 1206@@ -169,10 +169,8 @@ enum msgcode { 1207 MSG_DATA=0, /* raw data on the multiplexed stream */ 1208 MSG_ERROR=FERROR, MSG_INFO=FINFO, /* remote logging */ 1209 MSG_LOG=FLOG, MSG_SOCKERR=FSOCKERR, /* sibling logging */ 1210- MSG_REDO=9, /* reprocess indicated flist index */ 1211 MSG_SUCCESS=100,/* successfully updated indicated flist index */ 1212 MSG_DELETED=101,/* successfully deleted a file on receiving side */ 1213- MSG_DONE=86 /* current phase is done */ 1214 }; 1215 1216 #include "errcode.h" 1217@@ -329,6 +327,7 @@ enum msgcode { 1218 #endif 1219 1220 #include <assert.h> 1221+#include <pthread.h> 1222 1223 #include "lib/pool_alloc.h" 1224 1225--- old/util.c 1226+++ new/util.c 1227@@ -415,49 +415,6 @@ int robust_rename(char *from, char *to, 1228 return -1; 1229 } 1230 1231-static pid_t all_pids[10]; 1232-static int num_pids; 1233- 1234-/** Fork and record the pid of the child. **/ 1235-pid_t do_fork(void) 1236-{ 1237- pid_t newpid = fork(); 1238- 1239- if (newpid != 0 && newpid != -1) { 1240- all_pids[num_pids++] = newpid; 1241- } 1242- return newpid; 1243-} 1244- 1245-/** 1246- * Kill all children. 1247- * 1248- * @todo It would be kind of nice to make sure that they are actually 1249- * all our children before we kill them, because their pids may have 1250- * been recycled by some other process. Perhaps when we wait for a 1251- * child, we should remove it from this array. Alternatively we could 1252- * perhaps use process groups, but I think that would not work on 1253- * ancient Unix versions that don't support them. 1254- **/ 1255-void kill_all(int sig) 1256-{ 1257- int i; 1258- 1259- for (i = 0; i < num_pids; i++) { 1260- /* Let's just be a little careful where we 1261- * point that gun, hey? See kill(2) for the 1262- * magic caused by negative values. */ 1263- pid_t p = all_pids[i]; 1264- 1265- if (p == getpid()) 1266- continue; 1267- if (p <= 0) 1268- continue; 1269- 1270- kill(p, sig); 1271- } 1272-} 1273- 1274 /** Turn a user name into a uid */ 1275 int name_to_uid(char *name, uid_t *uid) 1276 { 1277--- old/proto.h 1278+++ new/proto.h 1279@@ -101,11 +101,12 @@ void hard_link_cluster(struct file_struc 1280 void io_set_sock_fds(int f_in, int f_out); 1281 void set_io_timeout(int secs); 1282 void set_msg_fd_in(int fd); 1283-void set_msg_fd_out(int fd); 1284 void increment_active_files(int ndx, int itemizing, enum logcode code); 1285 void decrement_active_files(int ndx); 1286 int send_msg(enum msgcode code, char *buf, int len); 1287-int get_redo_num(int itemizing, enum logcode code); 1288+void push_redo_num(int ndx); 1289+int get_redo_num(void); 1290+void push_hlink_num(int ndx); 1291 int get_hlink_num(void); 1292 void io_set_filesfrom_fds(int f_in, int f_out); 1293 int read_filesfrom_line(int fd, char *fname); 1294@@ -123,7 +124,7 @@ uchar read_byte(int f); 1295 int read_vstring(int f, char *buf, int bufsize); 1296 void read_sum_head(int f, struct sum_struct *sum); 1297 void write_sum_head(int f, struct sum_struct *sum); 1298-void io_flush(int flush_it_all); 1299+void io_flush(UNUSED(int flush_it_all)); 1300 void write_shortint(int f, int x); 1301 void write_int(int f,int32 x); 1302 void write_longint(int f, int64 x); 1303@@ -139,7 +140,6 @@ int io_multiplex_write(enum msgcode code 1304 void close_multiplexing_in(void); 1305 void close_multiplexing_out(void); 1306 void start_write_batch(int fd); 1307-void stop_write_batch(void); 1308 char *lp_bind_address(void); 1309 char *lp_motd_file(void); 1310 char *lp_pid_file(void); 1311@@ -198,6 +198,8 @@ void maybe_log_item(struct file_struct * 1312 char *buf); 1313 void log_delete(char *fname, int mode); 1314 void log_exit(int code, const char *file, int line); 1315+int am_generator(); 1316+int am_receiver(); 1317 pid_t wait_process(pid_t pid, int *status_ptr, int flags); 1318 int child_main(int argc, char *argv[]); 1319 void start_server(int f_in, int f_out, int argc, char *argv[]); 1320@@ -286,8 +288,6 @@ int copy_file(const char *source, const 1321 int robust_unlink(const char *fname); 1322 int robust_rename(char *from, char *to, char *partialptr, 1323 int mode); 1324-pid_t do_fork(void); 1325-void kill_all(int sig); 1326 int name_to_uid(char *name, uid_t *uid); 1327 int name_to_gid(char *name, gid_t *gid); 1328 int lock_range(int fd, int offset, int len); 1329