Lines Matching refs:wq

278 finalize_phase_one(workqueue_t *wq)
294 for (startslot = -1, i = 0; i < wq->wq_nwipslots; i++) {
295 if (wq->wq_wip[i].wip_batchid == wq->wq_lastdonebatch + 1) {
303 for (i = startslot; i < startslot + wq->wq_nwipslots; i++) {
304 int slotnum = i % wq->wq_nwipslots;
305 wip_t *wipslot = &wq->wq_wip[slotnum];
314 fifo_add(wq->wq_donequeue, wipslot->wip_td);
315 wq->wq_wip[slotnum].wip_td = NULL;
319 wq->wq_lastdonebatch = wq->wq_next_batchid++;
322 fifo_len(wq->wq_donequeue));
326 init_phase_two(workqueue_t *wq)
338 wq->wq_ninqueue = num = fifo_len(wq->wq_donequeue);
340 wq->wq_ninqueue += num / 2;
348 assert(fifo_len(wq->wq_queue) == 0);
349 fifo_free(wq->wq_queue, NULL);
350 wq->wq_queue = wq->wq_donequeue;
354 wip_save_work(workqueue_t *wq, wip_t *slot, int slotnum)
356 pthread_mutex_lock(&wq->wq_donequeue_lock);
358 while (wq->wq_lastdonebatch + 1 < slot->wip_batchid)
359 pthread_cond_wait(&slot->wip_cv, &wq->wq_donequeue_lock);
360 assert(wq->wq_lastdonebatch + 1 == slot->wip_batchid);
362 fifo_add(wq->wq_donequeue, slot->wip_td);
363 wq->wq_lastdonebatch++;
364 pthread_cond_signal(&wq->wq_wip[(slotnum + 1) %
365 wq->wq_nwipslots].wip_cv);
369 slot->wip_batchid = wq->wq_next_batchid++;
371 pthread_mutex_unlock(&wq->wq_donequeue_lock);
392 worker_runphase1(workqueue_t *wq)
399 pthread_mutex_lock(&wq->wq_queue_lock);
401 while (fifo_empty(wq->wq_queue)) {
402 if (wq->wq_nomorefiles == 1) {
403 pthread_cond_broadcast(&wq->wq_work_avail);
404 pthread_mutex_unlock(&wq->wq_queue_lock);
410 pthread_cond_wait(&wq->wq_work_avail,
411 &wq->wq_queue_lock);
415 pow = fifo_remove(wq->wq_queue);
416 pownum = wq->wq_nextpownum++;
417 pthread_cond_broadcast(&wq->wq_work_removed);
422 wipslotnum = pownum % wq->wq_nwipslots;
423 wipslot = &wq->wq_wip[wipslotnum];
427 pthread_mutex_unlock(&wq->wq_queue_lock);
431 if (wipslot->wip_nmerged == wq->wq_maxbatchsz)
432 wip_save_work(wq, wipslot, wipslotnum);
439 worker_runphase2(workqueue_t *wq)
445 pthread_mutex_lock(&wq->wq_queue_lock);
447 if (wq->wq_ninqueue == 1) {
448 pthread_cond_broadcast(&wq->wq_work_avail);
449 pthread_mutex_unlock(&wq->wq_queue_lock);
453 if (barrier_wait(&wq->wq_bar1)) {
454 pthread_mutex_lock(&wq->wq_queue_lock);
455 wq->wq_alldone = 1;
456 pthread_cond_signal(&wq->wq_alldone_cv);
457 pthread_mutex_unlock(&wq->wq_queue_lock);
463 if (fifo_len(wq->wq_queue) < 2) {
464 pthread_cond_wait(&wq->wq_work_avail,
465 &wq->wq_queue_lock);
466 pthread_mutex_unlock(&wq->wq_queue_lock);
471 pow1 = fifo_remove(wq->wq_queue);
472 pow2 = fifo_remove(wq->wq_queue);
473 wq->wq_ninqueue -= 2;
475 batchid = wq->wq_next_batchid++;
477 pthread_mutex_unlock(&wq->wq_queue_lock);
488 pthread_mutex_lock(&wq->wq_queue_lock);
489 while (wq->wq_lastdonebatch + 1 != batchid) {
490 pthread_cond_wait(&wq->wq_done_cv,
491 &wq->wq_queue_lock);
494 wq->wq_lastdonebatch = batchid;
496 fifo_add(wq->wq_queue, pow2);
498 pthread_self(), (void *)pow2, fifo_len(wq->wq_queue),
499 wq->wq_ninqueue);
500 pthread_cond_broadcast(&wq->wq_done_cv);
501 pthread_cond_signal(&wq->wq_work_avail);
502 pthread_mutex_unlock(&wq->wq_queue_lock);
510 worker_thread(workqueue_t *wq)
512 worker_runphase1(wq);
516 if (barrier_wait(&wq->wq_bar1)) {
520 finalize_phase_one(wq);
522 init_phase_two(wq);
525 wq->wq_ninqueue, fifo_len(wq->wq_queue));
530 (void) barrier_wait(&wq->wq_bar2);
534 worker_runphase2(wq);
544 workqueue_t *wq = arg;
548 pthread_mutex_lock(&wq->wq_queue_lock);
549 while (fifo_len(wq->wq_queue) > wq->wq_ithrottle) {
551 fifo_len(wq->wq_queue), wq->wq_ithrottle);
552 pthread_cond_wait(&wq->wq_work_removed, &wq->wq_queue_lock);
555 fifo_add(wq->wq_queue, td);
557 pthread_cond_broadcast(&wq->wq_work_avail);
558 pthread_mutex_unlock(&wq->wq_queue_lock);
623 wq_init(workqueue_t *wq, int nfiles)
633 wq->wq_maxbatchsz = atoi(getenv("CTFMERGE_PHASE1_BATCH_SIZE"));
635 wq->wq_maxbatchsz = MERGE_PHASE1_BATCH_SIZE;
637 nslots = MIN(nslots, (nfiles + wq->wq_maxbatchsz - 1) /
638 wq->wq_maxbatchsz);
640 wq->wq_wip = xcalloc(sizeof (wip_t) * nslots);
641 wq->wq_nwipslots = nslots;
642 wq->wq_nthreads = MIN(sysconf(_SC_NPROCESSORS_ONLN) * 3 / 2, nslots);
643 wq->wq_thread = xmalloc(sizeof (pthread_t) * wq->wq_nthreads);
649 wq->wq_ithrottle = throttle * wq->wq_nthreads;
651 debug(1, "Using %d slots, %d threads\n", wq->wq_nwipslots,
652 wq->wq_nthreads);
654 wq->wq_next_batchid = 0;
657 pthread_mutex_init(&wq->wq_wip[i].wip_lock, NULL);
658 wq->wq_wip[i].wip_batchid = wq->wq_next_batchid++;
661 pthread_mutex_init(&wq->wq_queue_lock, NULL);
662 wq->wq_queue = fifo_new();
663 pthread_cond_init(&wq->wq_work_avail, NULL);
664 pthread_cond_init(&wq->wq_work_removed, NULL);
665 wq->wq_ninqueue = nfiles;
666 wq->wq_nextpownum = 0;
668 pthread_mutex_init(&wq->wq_donequeue_lock, NULL);
669 wq->wq_donequeue = fifo_new();
670 wq->wq_lastdonebatch = -1;
672 pthread_cond_init(&wq->wq_done_cv, NULL);
674 pthread_cond_init(&wq->wq_alldone_cv, NULL);
675 wq->wq_alldone = 0;
677 barrier_init(&wq->wq_bar1, wq->wq_nthreads);
678 barrier_init(&wq->wq_bar2, wq->wq_nthreads);
680 wq->wq_nomorefiles = 0;
684 start_threads(workqueue_t *wq)
695 for (i = 0; i < wq->wq_nthreads; i++) {
696 pthread_create(&wq->wq_thread[i], NULL,
697 (void *(*)(void *))worker_thread, wq);
707 join_threads(workqueue_t *wq)
711 for (i = 0; i < wq->wq_nthreads; i++) {
712 pthread_join(wq->wq_thread[i], NULL);
732 static workqueue_t wq;
886 wq_init(&wq, nielems);
888 start_threads(&wq);
897 &wq, require_ctf) == 0) {
908 pthread_mutex_lock(&wq.wq_queue_lock);
909 wq.wq_nomorefiles = 1;
910 pthread_cond_broadcast(&wq.wq_work_avail);
911 pthread_mutex_unlock(&wq.wq_queue_lock);
913 pthread_mutex_lock(&wq.wq_queue_lock);
914 while (wq.wq_alldone == 0)
915 pthread_cond_wait(&wq.wq_alldone_cv, &wq.wq_queue_lock);
916 pthread_mutex_unlock(&wq.wq_queue_lock);
918 join_threads(&wq);
931 assert(fifo_len(wq.wq_queue) == 1);
932 mstrtd = fifo_remove(wq.wq_queue);