 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 */

static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */

SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RW, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
			list_create(&tx->tx_cpu[c].tc_callbacks[i],
			    sizeof (dmu_tx_callback_t),
			    offsetof(dmu_tx_callback_t, dcb_node));
		}
	}

	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
		}
	}

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_destroy(tx->tx_commit_cb_taskq);

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

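/*
 * CPR (checkpoint/resume, i.e. system suspend) aware helpers shared by
 * the sync and quiesce threads: register with the CPR framework on
 * entry, mark the thread safe to suspend while it blocks on a
 * condition variable, and drop tx_sync_lock on exit.
 */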
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

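/*
 * Quiesce the transaction group 'txg': advance tx_open_txg so that no
 * new threads can hold the old txg open, then wait for every hold on
 * it to be dropped via txg_rele_to_sync().
 */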
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to
	 * call txg_rele_to_sync().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

static void
txg_do_callbacks(void *arg)
{
	list_t *cb_list = arg;

	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
	int c;
	tx_state_t *tx = &dp->dp_tx;
	list_t *cb_list;

	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		/* No need to lock tx_cpu_t at this point */

		int g = txg & TXG_MASK;

		if (list_is_empty(&tc->tc_callbacks[g]))
			continue;

		if (tx->tx_commit_cb_taskq == NULL) {
			/*
			 * Commit callback taskq hasn't been created yet.
			 */
			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
			    TASKQ_PREPOPULATE);
		}

		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
		list_create(cb_list, sizeof (dmu_tx_callback_t),
		    offsetof(dmu_tx_callback_t, dcb_node));

		/* Move this txg's callbacks onto the private list. */
		list_move_tail(cb_list, &tc->tc_callbacks[g]);

		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
		    txg_do_callbacks, cb_list, TQ_SLEEP);
	}
}

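/*
 * Worker thread that writes out dirty data.  It sleeps until a scan is
 * active, a waiter appears, the quiesce thread hands off a txg, or the
 * txg timeout expires; it then syncs the quiesced txg with spa_sync()
 * (outside tx_sync_lock) and dispatches that txg's commit callbacks.
 */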
static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scanning, there's someone waiting
		 * on us, or the quiesce thread has handed off a txg to
		 * us, or we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!dsl_scan_active(dp->dp_scan) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}

static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.
 * Abort the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	clock_t timeout = ddi_get_lbolt() + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (ddi_get_lbolt() < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout - ddi_get_lbolt());

	mutex_exit(&tx->tx_sync_lock);
}

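/*
 * Wait until the given txg has been synced to disk; txg == 0 means the
 * currently open txg plus TXG_DEFER_SIZE (enough to cover the deferred
 * frees).  A caller that needs its changes on stable storage looks
 * roughly like the sketch below; the sketch is illustrative, not taken
 * from this file:
 *
 *	tx = dmu_tx_create(os);
 *	dmu_tx_hold_bonus(tx, object);
 *	error = dmu_tx_assign(tx, TXG_WAIT);
 *	... modify the object ...
 *	txg = dmu_tx_get_txg(tx);
 *	dmu_tx_commit(tx);
 *	txg_wait_synced(dp, txg);
 *
 * On return from txg_wait_synced(), the modification is durable.
 */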
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}