/* txg.c revision 248571 */
1/* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21/* 22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. 23 * Portions Copyright 2011 Martin Matuska <mm@FreeBSD.org> 24 * Copyright (c) 2013 by Delphix. All rights reserved. 25 */ 26 27#include <sys/zfs_context.h> 28#include <sys/txg_impl.h> 29#include <sys/dmu_impl.h> 30#include <sys/dmu_tx.h> 31#include <sys/dsl_pool.h> 32#include <sys/dsl_scan.h> 33#include <sys/callb.h> 34 35/* 36 * ZFS Transaction Groups 37 * ---------------------- 38 * 39 * ZFS transaction groups are, as the name implies, groups of transactions 40 * that act on persistent state. ZFS asserts consistency at the granularity of 41 * these transaction groups. Each successive transaction group (txg) is 42 * assigned a 64-bit consecutive identifier. There are three active 43 * transaction group states: open, quiescing, or syncing. At any given time, 44 * there may be an active txg associated with each state; each active txg may 45 * either be processing, or blocked waiting to enter the next state. 
There may 46 * be up to three active txgs, and there is always a txg in the open state 47 * (though it may be blocked waiting to enter the quiescing state). In broad 48 * strokes, transactions ��� operations that change in-memory structures ��� are 49 * accepted into the txg in the open state, and are completed while the txg is 50 * in the open or quiescing states. The accumulated changes are written to 51 * disk in the syncing state. 52 * 53 * Open 54 * 55 * When a new txg becomes active, it first enters the open state. New 56 * transactions ��� updates to in-memory structures ��� are assigned to the 57 * currently open txg. There is always a txg in the open state so that ZFS can 58 * accept new changes (though the txg may refuse new changes if it has hit 59 * some limit). ZFS advances the open txg to the next state for a variety of 60 * reasons such as it hitting a time or size threshold, or the execution of an 61 * administrative action that must be completed in the syncing state. 62 * 63 * Quiescing 64 * 65 * After a txg exits the open state, it enters the quiescing state. The 66 * quiescing state is intended to provide a buffer between accepting new 67 * transactions in the open state and writing them out to stable storage in 68 * the syncing state. While quiescing, transactions can continue their 69 * operation without delaying either of the other states. Typically, a txg is 70 * in the quiescing state very briefly since the operations are bounded by 71 * software latencies rather than, say, slower I/O latencies. After all 72 * transactions complete, the txg is ready to enter the next state. 73 * 74 * Syncing 75 * 76 * In the syncing state, the in-memory state built up during the open and (to 77 * a lesser degree) the quiescing states is written to stable storage. The 78 * process of writing out modified data can, in turn modify more data. 
For 79 * example when we write new blocks, we need to allocate space for them; those 80 * allocations modify metadata (space maps)... which themselves must be 81 * written to stable storage. During the sync state, ZFS iterates, writing out 82 * data until it converges and all in-memory changes have been written out. 83 * The first such pass is the largest as it encompasses all the modified user 84 * data (as opposed to filesystem metadata). Subsequent passes typically have 85 * far less data to write as they consist exclusively of filesystem metadata. 86 * 87 * To ensure convergence, after a certain number of passes ZFS begins 88 * overwriting locations on stable storage that had been allocated earlier in 89 * the syncing state (and subsequently freed). ZFS usually allocates new 90 * blocks to optimize for large, continuous, writes. For the syncing state to 91 * converge however it must complete a pass where no new blocks are allocated 92 * since each allocation requires a modification of persistent metadata. 93 * Further, to hasten convergence, after a prescribed number of passes, ZFS 94 * also defers frees, and stops compressing. 95 * 96 * In addition to writing out user data, we must also execute synctasks during 97 * the syncing context. A synctask is the mechanism by which some 98 * administrative activities work such as creating and destroying snapshots or 99 * datasets. Note that when a synctask is initiated it enters the open txg, 100 * and ZFS then pushes that txg as quickly as possible to completion of the 101 * syncing state in order to reduce the latency of the administrative 102 * activity. To complete the syncing state, ZFS writes out a new uberblock, 103 * the root of the tree of blocks that comprise all state stored on the ZFS 104 * pool. Finally, if there is a quiesced txg waiting, we signal that it can 105 * now transition to the syncing state. 
106 */ 107 108static void txg_sync_thread(void *arg); 109static void txg_quiesce_thread(void *arg); 110 111int zfs_txg_timeout = 5; /* max seconds worth of delta per txg */ 112 113SYSCTL_DECL(_vfs_zfs); 114SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG"); 115TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout); 116SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RW, &zfs_txg_timeout, 0, 117 "Maximum seconds worth of delta per txg"); 118 119/* 120 * Prepare the txg subsystem. 121 */ 122void 123txg_init(dsl_pool_t *dp, uint64_t txg) 124{ 125 tx_state_t *tx = &dp->dp_tx; 126 int c; 127 bzero(tx, sizeof (tx_state_t)); 128 129 tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP); 130 131 for (c = 0; c < max_ncpus; c++) { 132 int i; 133 134 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL); 135 for (i = 0; i < TXG_SIZE; i++) { 136 cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, 137 NULL); 138 list_create(&tx->tx_cpu[c].tc_callbacks[i], 139 sizeof (dmu_tx_callback_t), 140 offsetof(dmu_tx_callback_t, dcb_node)); 141 } 142 } 143 144 mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL); 145 146 cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL); 147 cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL); 148 cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL); 149 cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL); 150 cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL); 151 152 tx->tx_open_txg = txg; 153} 154 155/* 156 * Close down the txg subsystem. 
157 */ 158void 159txg_fini(dsl_pool_t *dp) 160{ 161 tx_state_t *tx = &dp->dp_tx; 162 int c; 163 164 ASSERT(tx->tx_threads == 0); 165 166 mutex_destroy(&tx->tx_sync_lock); 167 168 cv_destroy(&tx->tx_sync_more_cv); 169 cv_destroy(&tx->tx_sync_done_cv); 170 cv_destroy(&tx->tx_quiesce_more_cv); 171 cv_destroy(&tx->tx_quiesce_done_cv); 172 cv_destroy(&tx->tx_exit_cv); 173 174 for (c = 0; c < max_ncpus; c++) { 175 int i; 176 177 mutex_destroy(&tx->tx_cpu[c].tc_lock); 178 for (i = 0; i < TXG_SIZE; i++) { 179 cv_destroy(&tx->tx_cpu[c].tc_cv[i]); 180 list_destroy(&tx->tx_cpu[c].tc_callbacks[i]); 181 } 182 } 183 184 if (tx->tx_commit_cb_taskq != NULL) 185 taskq_destroy(tx->tx_commit_cb_taskq); 186 187 kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); 188 189 bzero(tx, sizeof (tx_state_t)); 190} 191 192/* 193 * Start syncing transaction groups. 194 */ 195void 196txg_sync_start(dsl_pool_t *dp) 197{ 198 tx_state_t *tx = &dp->dp_tx; 199 200 mutex_enter(&tx->tx_sync_lock); 201 202 dprintf("pool %p\n", dp); 203 204 ASSERT(tx->tx_threads == 0); 205 206 tx->tx_threads = 2; 207 208 tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread, 209 dp, 0, &p0, TS_RUN, minclsyspri); 210 211 /* 212 * The sync thread can need a larger-than-default stack size on 213 * 32-bit x86. This is due in part to nested pools and 214 * scrub_visitbp() recursion. 
215 */ 216 tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread, 217 dp, 0, &p0, TS_RUN, minclsyspri); 218 219 mutex_exit(&tx->tx_sync_lock); 220} 221 222static void 223txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr) 224{ 225 CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG); 226 mutex_enter(&tx->tx_sync_lock); 227} 228 229static void 230txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp) 231{ 232 ASSERT(*tpp != NULL); 233 *tpp = NULL; 234 tx->tx_threads--; 235 cv_broadcast(&tx->tx_exit_cv); 236 CALLB_CPR_EXIT(cpr); /* drops &tx->tx_sync_lock */ 237 thread_exit(); 238} 239 240static void 241txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time) 242{ 243 CALLB_CPR_SAFE_BEGIN(cpr); 244 245 if (time) 246 (void) cv_timedwait(cv, &tx->tx_sync_lock, time); 247 else 248 cv_wait(cv, &tx->tx_sync_lock); 249 250 CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock); 251} 252 253/* 254 * Stop syncing transaction groups. 255 */ 256void 257txg_sync_stop(dsl_pool_t *dp) 258{ 259 tx_state_t *tx = &dp->dp_tx; 260 261 dprintf("pool %p\n", dp); 262 /* 263 * Finish off any work in progress. 264 */ 265 ASSERT(tx->tx_threads == 2); 266 267 /* 268 * We need to ensure that we've vacated the deferred space_maps. 269 */ 270 txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE); 271 272 /* 273 * Wake all sync threads and wait for them to die. 
274 */ 275 mutex_enter(&tx->tx_sync_lock); 276 277 ASSERT(tx->tx_threads == 2); 278 279 tx->tx_exiting = 1; 280 281 cv_broadcast(&tx->tx_quiesce_more_cv); 282 cv_broadcast(&tx->tx_quiesce_done_cv); 283 cv_broadcast(&tx->tx_sync_more_cv); 284 285 while (tx->tx_threads != 0) 286 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock); 287 288 tx->tx_exiting = 0; 289 290 mutex_exit(&tx->tx_sync_lock); 291} 292 293uint64_t 294txg_hold_open(dsl_pool_t *dp, txg_handle_t *th) 295{ 296 tx_state_t *tx = &dp->dp_tx; 297 tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID]; 298 uint64_t txg; 299 300 mutex_enter(&tc->tc_lock); 301 302 txg = tx->tx_open_txg; 303 tc->tc_count[txg & TXG_MASK]++; 304 305 th->th_cpu = tc; 306 th->th_txg = txg; 307 308 return (txg); 309} 310 311void 312txg_rele_to_quiesce(txg_handle_t *th) 313{ 314 tx_cpu_t *tc = th->th_cpu; 315 316 mutex_exit(&tc->tc_lock); 317} 318 319void 320txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks) 321{ 322 tx_cpu_t *tc = th->th_cpu; 323 int g = th->th_txg & TXG_MASK; 324 325 mutex_enter(&tc->tc_lock); 326 list_move_tail(&tc->tc_callbacks[g], tx_callbacks); 327 mutex_exit(&tc->tc_lock); 328} 329 330void 331txg_rele_to_sync(txg_handle_t *th) 332{ 333 tx_cpu_t *tc = th->th_cpu; 334 int g = th->th_txg & TXG_MASK; 335 336 mutex_enter(&tc->tc_lock); 337 ASSERT(tc->tc_count[g] != 0); 338 if (--tc->tc_count[g] == 0) 339 cv_broadcast(&tc->tc_cv[g]); 340 mutex_exit(&tc->tc_lock); 341 342 th->th_cpu = NULL; /* defensive */ 343} 344 345static void 346txg_quiesce(dsl_pool_t *dp, uint64_t txg) 347{ 348 tx_state_t *tx = &dp->dp_tx; 349 int g = txg & TXG_MASK; 350 int c; 351 352 /* 353 * Grab all tx_cpu locks so nobody else can get into this txg. 354 */ 355 for (c = 0; c < max_ncpus; c++) 356 mutex_enter(&tx->tx_cpu[c].tc_lock); 357 358 ASSERT(txg == tx->tx_open_txg); 359 tx->tx_open_txg++; 360 361 /* 362 * Now that we've incremented tx_open_txg, we can let threads 363 * enter the next transaction group. 
364 */ 365 for (c = 0; c < max_ncpus; c++) 366 mutex_exit(&tx->tx_cpu[c].tc_lock); 367 368 /* 369 * Quiesce the transaction group by waiting for everyone to txg_exit(). 370 */ 371 for (c = 0; c < max_ncpus; c++) { 372 tx_cpu_t *tc = &tx->tx_cpu[c]; 373 mutex_enter(&tc->tc_lock); 374 while (tc->tc_count[g] != 0) 375 cv_wait(&tc->tc_cv[g], &tc->tc_lock); 376 mutex_exit(&tc->tc_lock); 377 } 378} 379 380static void 381txg_do_callbacks(void *arg) 382{ 383 list_t *cb_list = arg; 384 385 dmu_tx_do_callbacks(cb_list, 0); 386 387 list_destroy(cb_list); 388 389 kmem_free(cb_list, sizeof (list_t)); 390} 391 392/* 393 * Dispatch the commit callbacks registered on this txg to worker threads. 394 */ 395static void 396txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg) 397{ 398 int c; 399 tx_state_t *tx = &dp->dp_tx; 400 list_t *cb_list; 401 402 for (c = 0; c < max_ncpus; c++) { 403 tx_cpu_t *tc = &tx->tx_cpu[c]; 404 /* No need to lock tx_cpu_t at this point */ 405 406 int g = txg & TXG_MASK; 407 408 if (list_is_empty(&tc->tc_callbacks[g])) 409 continue; 410 411 if (tx->tx_commit_cb_taskq == NULL) { 412 /* 413 * Commit callback taskq hasn't been created yet. 
414 */ 415 tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb", 416 max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2, 417 TASKQ_PREPOPULATE); 418 } 419 420 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP); 421 list_create(cb_list, sizeof (dmu_tx_callback_t), 422 offsetof(dmu_tx_callback_t, dcb_node)); 423 424 list_move_tail(&tc->tc_callbacks[g], cb_list); 425 426 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *) 427 txg_do_callbacks, cb_list, TQ_SLEEP); 428 } 429} 430 431static void 432txg_sync_thread(void *arg) 433{ 434 dsl_pool_t *dp = arg; 435 spa_t *spa = dp->dp_spa; 436 tx_state_t *tx = &dp->dp_tx; 437 callb_cpr_t cpr; 438 uint64_t start, delta; 439 440 txg_thread_enter(tx, &cpr); 441 442 start = delta = 0; 443 for (;;) { 444 uint64_t timer, timeout = zfs_txg_timeout * hz; 445 uint64_t txg; 446 447 /* 448 * We sync when we're scanning, there's someone waiting 449 * on us, or the quiesce thread has handed off a txg to 450 * us, or we have reached our timeout. 451 */ 452 timer = (delta >= timeout ? 0 : timeout - delta); 453 while (!dsl_scan_active(dp->dp_scan) && 454 !tx->tx_exiting && timer > 0 && 455 tx->tx_synced_txg >= tx->tx_sync_txg_waiting && 456 tx->tx_quiesced_txg == 0) { 457 dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n", 458 tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 459 txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer); 460 delta = ddi_get_lbolt() - start; 461 timer = (delta > timeout ? 0 : timeout - delta); 462 } 463 464 /* 465 * Wait until the quiesce thread hands off a txg to us, 466 * prompting it to do so if necessary. 
467 */ 468 while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) { 469 if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1) 470 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1; 471 cv_broadcast(&tx->tx_quiesce_more_cv); 472 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0); 473 } 474 475 if (tx->tx_exiting) 476 txg_thread_exit(tx, &cpr, &tx->tx_sync_thread); 477 478 /* 479 * Consume the quiesced txg which has been handed off to 480 * us. This may cause the quiescing thread to now be 481 * able to quiesce another txg, so we must signal it. 482 */ 483 txg = tx->tx_quiesced_txg; 484 tx->tx_quiesced_txg = 0; 485 tx->tx_syncing_txg = txg; 486 cv_broadcast(&tx->tx_quiesce_more_cv); 487 488 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 489 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 490 mutex_exit(&tx->tx_sync_lock); 491 492 start = ddi_get_lbolt(); 493 spa_sync(spa, txg); 494 delta = ddi_get_lbolt() - start; 495 496 mutex_enter(&tx->tx_sync_lock); 497 tx->tx_synced_txg = txg; 498 tx->tx_syncing_txg = 0; 499 cv_broadcast(&tx->tx_sync_done_cv); 500 501 /* 502 * Dispatch commit callbacks to worker threads. 503 */ 504 txg_dispatch_callbacks(dp, txg); 505 } 506} 507 508static void 509txg_quiesce_thread(void *arg) 510{ 511 dsl_pool_t *dp = arg; 512 tx_state_t *tx = &dp->dp_tx; 513 callb_cpr_t cpr; 514 515 txg_thread_enter(tx, &cpr); 516 517 for (;;) { 518 uint64_t txg; 519 520 /* 521 * We quiesce when there's someone waiting on us. 522 * However, we can only have one txg in "quiescing" or 523 * "quiesced, waiting to sync" state. So we wait until 524 * the "quiesced, waiting to sync" txg has been consumed 525 * by the sync thread. 
526 */ 527 while (!tx->tx_exiting && 528 (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting || 529 tx->tx_quiesced_txg != 0)) 530 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0); 531 532 if (tx->tx_exiting) 533 txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread); 534 535 txg = tx->tx_open_txg; 536 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 537 txg, tx->tx_quiesce_txg_waiting, 538 tx->tx_sync_txg_waiting); 539 mutex_exit(&tx->tx_sync_lock); 540 txg_quiesce(dp, txg); 541 mutex_enter(&tx->tx_sync_lock); 542 543 /* 544 * Hand this txg off to the sync thread. 545 */ 546 dprintf("quiesce done, handing off txg %llu\n", txg); 547 tx->tx_quiesced_txg = txg; 548 cv_broadcast(&tx->tx_sync_more_cv); 549 cv_broadcast(&tx->tx_quiesce_done_cv); 550 } 551} 552 553/* 554 * Delay this thread by 'ticks' if we are still in the open transaction 555 * group and there is already a waiting txg quiesing or quiesced. Abort 556 * the delay if this txg stalls or enters the quiesing state. 557 */ 558void 559txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks) 560{ 561 tx_state_t *tx = &dp->dp_tx; 562 clock_t timeout = ddi_get_lbolt() + ticks; 563 564 /* don't delay if this txg could transition to quiesing immediately */ 565 if (tx->tx_open_txg > txg || 566 tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1) 567 return; 568 569 mutex_enter(&tx->tx_sync_lock); 570 if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) { 571 mutex_exit(&tx->tx_sync_lock); 572 return; 573 } 574 575 while (ddi_get_lbolt() < timeout && 576 tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) 577 (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock, 578 timeout - ddi_get_lbolt()); 579 580 mutex_exit(&tx->tx_sync_lock); 581} 582 583void 584txg_wait_synced(dsl_pool_t *dp, uint64_t txg) 585{ 586 tx_state_t *tx = &dp->dp_tx; 587 588 ASSERT(!dsl_pool_config_held(dp)); 589 590 mutex_enter(&tx->tx_sync_lock); 591 ASSERT(tx->tx_threads == 2); 592 if (txg == 0) 593 txg = tx->tx_open_txg + TXG_DEFER_SIZE; 
594 if (tx->tx_sync_txg_waiting < txg) 595 tx->tx_sync_txg_waiting = txg; 596 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 597 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 598 while (tx->tx_synced_txg < txg) { 599 dprintf("broadcasting sync more " 600 "tx_synced=%llu waiting=%llu dp=%p\n", 601 tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 602 cv_broadcast(&tx->tx_sync_more_cv); 603 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock); 604 } 605 mutex_exit(&tx->tx_sync_lock); 606} 607 608void 609txg_wait_open(dsl_pool_t *dp, uint64_t txg) 610{ 611 tx_state_t *tx = &dp->dp_tx; 612 613 ASSERT(!dsl_pool_config_held(dp)); 614 615 mutex_enter(&tx->tx_sync_lock); 616 ASSERT(tx->tx_threads == 2); 617 if (txg == 0) 618 txg = tx->tx_open_txg + 1; 619 if (tx->tx_quiesce_txg_waiting < txg) 620 tx->tx_quiesce_txg_waiting = txg; 621 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 622 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 623 while (tx->tx_open_txg < txg) { 624 cv_broadcast(&tx->tx_quiesce_more_cv); 625 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock); 626 } 627 mutex_exit(&tx->tx_sync_lock); 628} 629 630boolean_t 631txg_stalled(dsl_pool_t *dp) 632{ 633 tx_state_t *tx = &dp->dp_tx; 634 return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg); 635} 636 637boolean_t 638txg_sync_waiting(dsl_pool_t *dp) 639{ 640 tx_state_t *tx = &dp->dp_tx; 641 642 return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting || 643 tx->tx_quiesced_txg != 0); 644} 645 646/* 647 * Per-txg object lists. 
648 */ 649void 650txg_list_create(txg_list_t *tl, size_t offset) 651{ 652 int t; 653 654 mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL); 655 656 tl->tl_offset = offset; 657 658 for (t = 0; t < TXG_SIZE; t++) 659 tl->tl_head[t] = NULL; 660} 661 662void 663txg_list_destroy(txg_list_t *tl) 664{ 665 int t; 666 667 for (t = 0; t < TXG_SIZE; t++) 668 ASSERT(txg_list_empty(tl, t)); 669 670 mutex_destroy(&tl->tl_lock); 671} 672 673boolean_t 674txg_list_empty(txg_list_t *tl, uint64_t txg) 675{ 676 return (tl->tl_head[txg & TXG_MASK] == NULL); 677} 678 679/* 680 * Add an entry to the list (unless it's already on the list). 681 * Returns B_TRUE if it was actually added. 682 */ 683boolean_t 684txg_list_add(txg_list_t *tl, void *p, uint64_t txg) 685{ 686 int t = txg & TXG_MASK; 687 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 688 boolean_t add; 689 690 mutex_enter(&tl->tl_lock); 691 add = (tn->tn_member[t] == 0); 692 if (add) { 693 tn->tn_member[t] = 1; 694 tn->tn_next[t] = tl->tl_head[t]; 695 tl->tl_head[t] = tn; 696 } 697 mutex_exit(&tl->tl_lock); 698 699 return (add); 700} 701 702/* 703 * Add an entry to the end of the list, unless it's already on the list. 704 * (walks list to find end) 705 * Returns B_TRUE if it was actually added. 706 */ 707boolean_t 708txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg) 709{ 710 int t = txg & TXG_MASK; 711 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 712 boolean_t add; 713 714 mutex_enter(&tl->tl_lock); 715 add = (tn->tn_member[t] == 0); 716 if (add) { 717 txg_node_t **tp; 718 719 for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t]) 720 continue; 721 722 tn->tn_member[t] = 1; 723 tn->tn_next[t] = NULL; 724 *tp = tn; 725 } 726 mutex_exit(&tl->tl_lock); 727 728 return (add); 729} 730 731/* 732 * Remove the head of the list and return it. 
733 */ 734void * 735txg_list_remove(txg_list_t *tl, uint64_t txg) 736{ 737 int t = txg & TXG_MASK; 738 txg_node_t *tn; 739 void *p = NULL; 740 741 mutex_enter(&tl->tl_lock); 742 if ((tn = tl->tl_head[t]) != NULL) { 743 p = (char *)tn - tl->tl_offset; 744 tl->tl_head[t] = tn->tn_next[t]; 745 tn->tn_next[t] = NULL; 746 tn->tn_member[t] = 0; 747 } 748 mutex_exit(&tl->tl_lock); 749 750 return (p); 751} 752 753/* 754 * Remove a specific item from the list and return it. 755 */ 756void * 757txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg) 758{ 759 int t = txg & TXG_MASK; 760 txg_node_t *tn, **tp; 761 762 mutex_enter(&tl->tl_lock); 763 764 for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) { 765 if ((char *)tn - tl->tl_offset == p) { 766 *tp = tn->tn_next[t]; 767 tn->tn_next[t] = NULL; 768 tn->tn_member[t] = 0; 769 mutex_exit(&tl->tl_lock); 770 return (p); 771 } 772 } 773 774 mutex_exit(&tl->tl_lock); 775 776 return (NULL); 777} 778 779boolean_t 780txg_list_member(txg_list_t *tl, void *p, uint64_t txg) 781{ 782 int t = txg & TXG_MASK; 783 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 784 785 return (tn->tn_member[t] != 0); 786} 787 788/* 789 * Walk a txg list -- only safe if you know it's not changing. 790 */ 791void * 792txg_list_head(txg_list_t *tl, uint64_t txg) 793{ 794 int t = txg & TXG_MASK; 795 txg_node_t *tn = tl->tl_head[t]; 796 797 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 798} 799 800void * 801txg_list_next(txg_list_t *tl, void *p, uint64_t txg) 802{ 803 int t = txg & TXG_MASK; 804 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 805 806 tn = tn->tn_next[t]; 807 808 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 809} 810