/* txg.c -- FreeBSD ZFS transaction group code, revision 210192 */
1168404Spjd/* 2168404Spjd * CDDL HEADER START 3168404Spjd * 4168404Spjd * The contents of this file are subject to the terms of the 5168404Spjd * Common Development and Distribution License (the "License"). 6168404Spjd * You may not use this file except in compliance with the License. 7168404Spjd * 8168404Spjd * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9168404Spjd * or http://www.opensolaris.org/os/licensing. 10168404Spjd * See the License for the specific language governing permissions 11168404Spjd * and limitations under the License. 12168404Spjd * 13168404Spjd * When distributing Covered Code, include this CDDL HEADER in each 14168404Spjd * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15168404Spjd * If applicable, add the following below this CDDL HEADER, with the 16168404Spjd * fields enclosed by brackets "[]" replaced with your own identifying 17168404Spjd * information: Portions Copyright [yyyy] [name of copyright owner] 18168404Spjd * 19168404Spjd * CDDL HEADER END 20168404Spjd */ 21168404Spjd/* 22185029Spjd * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 23168404Spjd * Use is subject to license terms. 24168404Spjd */ 25168404Spjd 26168404Spjd#include <sys/zfs_context.h> 27168404Spjd#include <sys/txg_impl.h> 28168404Spjd#include <sys/dmu_impl.h> 29168404Spjd#include <sys/dsl_pool.h> 30168404Spjd#include <sys/callb.h> 31168404Spjd 32168404Spjd/* 33168404Spjd * Pool-wide transaction groups. 
34168404Spjd */ 35168404Spjd 36168404Spjdstatic void txg_sync_thread(void *arg); 37168404Spjdstatic void txg_quiesce_thread(void *arg); 38168404Spjd 39185029Spjdint zfs_txg_timeout = 30; /* max seconds worth of delta per txg */ 40185029Spjdextern int zfs_txg_synctime; 41207481Smmextern uint64_t zfs_write_limit_override; 42168404Spjd 43185029SpjdSYSCTL_DECL(_vfs_zfs); 44207480SmmSYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, 45207480Smm "ZFS transaction groups (TXG)"); 46185029SpjdTUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout); 47185029SpjdSYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RDTUN, &zfs_txg_timeout, 0, 48185029Spjd "Maximum seconds worth of delta per txg"); 49185029SpjdTUNABLE_INT("vfs.zfs.txg.synctime", &zfs_txg_synctime); 50185029SpjdSYSCTL_INT(_vfs_zfs_txg, OID_AUTO, synctime, CTLFLAG_RDTUN, &zfs_txg_synctime, 51185029Spjd 0, "Target seconds to sync a txg"); 52207481SmmTUNABLE_QUAD("vfs.zfs.txg.write_limit_override", &zfs_write_limit_override); 53207481SmmSYSCTL_QUAD(_vfs_zfs_txg, OID_AUTO, write_limit_override, CTLFLAG_RW, 54207481Smm &zfs_write_limit_override, 0, 55207481Smm "Override maximum size of a txg to this size in bytes, " 56207481Smm "value of 0 means don't override"); 57185029Spjd 58168404Spjd/* 59168404Spjd * Prepare the txg subsystem. 
60168404Spjd */ 61168404Spjdvoid 62168404Spjdtxg_init(dsl_pool_t *dp, uint64_t txg) 63168404Spjd{ 64168404Spjd tx_state_t *tx = &dp->dp_tx; 65185029Spjd int c; 66168404Spjd bzero(tx, sizeof (tx_state_t)); 67168404Spjd 68168404Spjd tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP); 69185029Spjd 70168404Spjd for (c = 0; c < max_ncpus; c++) { 71185029Spjd int i; 72185029Spjd 73168404Spjd mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL); 74185029Spjd for (i = 0; i < TXG_SIZE; i++) { 75185029Spjd cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, 76185029Spjd NULL); 77185029Spjd } 78168404Spjd } 79168404Spjd 80168404Spjd rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL); 81168404Spjd mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL); 82208372Smm 83168404Spjd cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL); 84168404Spjd cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL); 85168404Spjd cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL); 86168404Spjd cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL); 87168404Spjd cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL); 88168404Spjd 89168404Spjd tx->tx_open_txg = txg; 90168404Spjd} 91168404Spjd 92168404Spjd/* 93168404Spjd * Close down the txg subsystem. 
94168404Spjd */ 95168404Spjdvoid 96168404Spjdtxg_fini(dsl_pool_t *dp) 97168404Spjd{ 98168404Spjd tx_state_t *tx = &dp->dp_tx; 99185029Spjd int c; 100168404Spjd 101168404Spjd ASSERT(tx->tx_threads == 0); 102168404Spjd 103168404Spjd rw_destroy(&tx->tx_suspend); 104168404Spjd mutex_destroy(&tx->tx_sync_lock); 105168404Spjd 106208372Smm cv_destroy(&tx->tx_sync_more_cv); 107208372Smm cv_destroy(&tx->tx_sync_done_cv); 108208372Smm cv_destroy(&tx->tx_quiesce_more_cv); 109208372Smm cv_destroy(&tx->tx_quiesce_done_cv); 110208372Smm cv_destroy(&tx->tx_exit_cv); 111208372Smm 112168404Spjd for (c = 0; c < max_ncpus; c++) { 113185029Spjd int i; 114185029Spjd 115185029Spjd mutex_destroy(&tx->tx_cpu[c].tc_lock); 116168404Spjd for (i = 0; i < TXG_SIZE; i++) 117168404Spjd cv_destroy(&tx->tx_cpu[c].tc_cv[i]); 118168404Spjd } 119168404Spjd 120168404Spjd kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); 121168404Spjd 122168404Spjd bzero(tx, sizeof (tx_state_t)); 123168404Spjd} 124168404Spjd 125168404Spjd/* 126168404Spjd * Start syncing transaction groups. 127168404Spjd */ 128168404Spjdvoid 129168404Spjdtxg_sync_start(dsl_pool_t *dp) 130168404Spjd{ 131168404Spjd tx_state_t *tx = &dp->dp_tx; 132168404Spjd 133168404Spjd mutex_enter(&tx->tx_sync_lock); 134168404Spjd 135168404Spjd dprintf("pool %p\n", dp); 136168404Spjd 137168404Spjd ASSERT(tx->tx_threads == 0); 138168404Spjd 139185029Spjd tx->tx_threads = 2; 140168404Spjd 141168404Spjd tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread, 142168404Spjd dp, 0, &p0, TS_RUN, minclsyspri); 143168404Spjd 144185029Spjd /* 145185029Spjd * The sync thread can need a larger-than-default stack size on 146185029Spjd * 32-bit x86. This is due in part to nested pools and 147185029Spjd * scrub_visitbp() recursion. 
148185029Spjd */ 149210192Snwhitehorn tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread, 150168404Spjd dp, 0, &p0, TS_RUN, minclsyspri); 151168404Spjd 152168404Spjd mutex_exit(&tx->tx_sync_lock); 153168404Spjd} 154168404Spjd 155168404Spjdstatic void 156168404Spjdtxg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr) 157168404Spjd{ 158168404Spjd CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG); 159168404Spjd mutex_enter(&tx->tx_sync_lock); 160168404Spjd} 161168404Spjd 162168404Spjdstatic void 163168404Spjdtxg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp) 164168404Spjd{ 165168404Spjd ASSERT(*tpp != NULL); 166168404Spjd *tpp = NULL; 167168404Spjd tx->tx_threads--; 168168404Spjd cv_broadcast(&tx->tx_exit_cv); 169168404Spjd CALLB_CPR_EXIT(cpr); /* drops &tx->tx_sync_lock */ 170168404Spjd thread_exit(); 171168404Spjd} 172168404Spjd 173168404Spjdstatic void 174185029Spjdtxg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time) 175168404Spjd{ 176168404Spjd CALLB_CPR_SAFE_BEGIN(cpr); 177168404Spjd 178185029Spjd if (time) 179185029Spjd (void) cv_timedwait(cv, &tx->tx_sync_lock, time); 180168404Spjd else 181168404Spjd cv_wait(cv, &tx->tx_sync_lock); 182168404Spjd 183168404Spjd CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock); 184168404Spjd} 185168404Spjd 186168404Spjd/* 187168404Spjd * Stop syncing transaction groups. 188168404Spjd */ 189168404Spjdvoid 190168404Spjdtxg_sync_stop(dsl_pool_t *dp) 191168404Spjd{ 192168404Spjd tx_state_t *tx = &dp->dp_tx; 193168404Spjd 194168404Spjd dprintf("pool %p\n", dp); 195168404Spjd /* 196168404Spjd * Finish off any work in progress. 197168404Spjd */ 198185029Spjd ASSERT(tx->tx_threads == 2); 199168404Spjd txg_wait_synced(dp, 0); 200168404Spjd 201168404Spjd /* 202185029Spjd * Wake all sync threads and wait for them to die. 
203168404Spjd */ 204168404Spjd mutex_enter(&tx->tx_sync_lock); 205168404Spjd 206185029Spjd ASSERT(tx->tx_threads == 2); 207168404Spjd 208168404Spjd tx->tx_exiting = 1; 209168404Spjd 210168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 211168404Spjd cv_broadcast(&tx->tx_quiesce_done_cv); 212168404Spjd cv_broadcast(&tx->tx_sync_more_cv); 213168404Spjd 214168404Spjd while (tx->tx_threads != 0) 215168404Spjd cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock); 216168404Spjd 217168404Spjd tx->tx_exiting = 0; 218168404Spjd 219168404Spjd mutex_exit(&tx->tx_sync_lock); 220168404Spjd} 221168404Spjd 222168404Spjduint64_t 223168404Spjdtxg_hold_open(dsl_pool_t *dp, txg_handle_t *th) 224168404Spjd{ 225168404Spjd tx_state_t *tx = &dp->dp_tx; 226168404Spjd tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID]; 227168404Spjd uint64_t txg; 228168404Spjd 229168404Spjd mutex_enter(&tc->tc_lock); 230168404Spjd 231168404Spjd txg = tx->tx_open_txg; 232168404Spjd tc->tc_count[txg & TXG_MASK]++; 233168404Spjd 234168404Spjd th->th_cpu = tc; 235168404Spjd th->th_txg = txg; 236168404Spjd 237168404Spjd return (txg); 238168404Spjd} 239168404Spjd 240168404Spjdvoid 241168404Spjdtxg_rele_to_quiesce(txg_handle_t *th) 242168404Spjd{ 243168404Spjd tx_cpu_t *tc = th->th_cpu; 244168404Spjd 245168404Spjd mutex_exit(&tc->tc_lock); 246168404Spjd} 247168404Spjd 248168404Spjdvoid 249168404Spjdtxg_rele_to_sync(txg_handle_t *th) 250168404Spjd{ 251168404Spjd tx_cpu_t *tc = th->th_cpu; 252168404Spjd int g = th->th_txg & TXG_MASK; 253168404Spjd 254168404Spjd mutex_enter(&tc->tc_lock); 255168404Spjd ASSERT(tc->tc_count[g] != 0); 256168404Spjd if (--tc->tc_count[g] == 0) 257168404Spjd cv_broadcast(&tc->tc_cv[g]); 258168404Spjd mutex_exit(&tc->tc_lock); 259168404Spjd 260168404Spjd th->th_cpu = NULL; /* defensive */ 261168404Spjd} 262168404Spjd 263168404Spjdstatic void 264168404Spjdtxg_quiesce(dsl_pool_t *dp, uint64_t txg) 265168404Spjd{ 266168404Spjd tx_state_t *tx = &dp->dp_tx; 267168404Spjd int g = txg & TXG_MASK; 268168404Spjd int c; 
269168404Spjd 270168404Spjd /* 271168404Spjd * Grab all tx_cpu locks so nobody else can get into this txg. 272168404Spjd */ 273168404Spjd for (c = 0; c < max_ncpus; c++) 274168404Spjd mutex_enter(&tx->tx_cpu[c].tc_lock); 275168404Spjd 276168404Spjd ASSERT(txg == tx->tx_open_txg); 277168404Spjd tx->tx_open_txg++; 278168404Spjd 279168404Spjd /* 280168404Spjd * Now that we've incremented tx_open_txg, we can let threads 281168404Spjd * enter the next transaction group. 282168404Spjd */ 283168404Spjd for (c = 0; c < max_ncpus; c++) 284168404Spjd mutex_exit(&tx->tx_cpu[c].tc_lock); 285168404Spjd 286168404Spjd /* 287168404Spjd * Quiesce the transaction group by waiting for everyone to txg_exit(). 288168404Spjd */ 289168404Spjd for (c = 0; c < max_ncpus; c++) { 290168404Spjd tx_cpu_t *tc = &tx->tx_cpu[c]; 291168404Spjd mutex_enter(&tc->tc_lock); 292168404Spjd while (tc->tc_count[g] != 0) 293168404Spjd cv_wait(&tc->tc_cv[g], &tc->tc_lock); 294168404Spjd mutex_exit(&tc->tc_lock); 295168404Spjd } 296168404Spjd} 297168404Spjd 298168404Spjdstatic void 299168404Spjdtxg_sync_thread(void *arg) 300168404Spjd{ 301168404Spjd dsl_pool_t *dp = arg; 302168404Spjd tx_state_t *tx = &dp->dp_tx; 303168404Spjd callb_cpr_t cpr; 304185029Spjd uint64_t start, delta; 305168404Spjd 306168404Spjd txg_thread_enter(tx, &cpr); 307168404Spjd 308185029Spjd start = delta = 0; 309168404Spjd for (;;) { 310185029Spjd uint64_t timer, timeout = zfs_txg_timeout * hz; 311168404Spjd uint64_t txg; 312168404Spjd 313168404Spjd /* 314208047Smm * We sync when we're scrubbing, there's someone waiting 315208047Smm * on us, or the quiesce thread has handed off a txg to 316208047Smm * us, or we have reached our timeout. 317168404Spjd */ 318185029Spjd timer = (delta >= timeout ? 
0 : timeout - delta); 319208047Smm while ((dp->dp_scrub_func == SCRUB_FUNC_NONE || 320208047Smm spa_shutting_down(dp->dp_spa)) && 321208047Smm !tx->tx_exiting && timer > 0 && 322168404Spjd tx->tx_synced_txg >= tx->tx_sync_txg_waiting && 323168404Spjd tx->tx_quiesced_txg == 0) { 324168404Spjd dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n", 325168404Spjd tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 326185029Spjd txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer); 327185029Spjd delta = LBOLT - start; 328185029Spjd timer = (delta > timeout ? 0 : timeout - delta); 329168404Spjd } 330168404Spjd 331168404Spjd /* 332168404Spjd * Wait until the quiesce thread hands off a txg to us, 333168404Spjd * prompting it to do so if necessary. 334168404Spjd */ 335168404Spjd while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) { 336168404Spjd if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1) 337168404Spjd tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1; 338168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 339168404Spjd txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0); 340168404Spjd } 341168404Spjd 342168404Spjd if (tx->tx_exiting) 343168404Spjd txg_thread_exit(tx, &cpr, &tx->tx_sync_thread); 344168404Spjd 345168404Spjd rw_enter(&tx->tx_suspend, RW_WRITER); 346168404Spjd 347168404Spjd /* 348168404Spjd * Consume the quiesced txg which has been handed off to 349168404Spjd * us. This may cause the quiescing thread to now be 350168404Spjd * able to quiesce another txg, so we must signal it. 
351168404Spjd */ 352168404Spjd txg = tx->tx_quiesced_txg; 353168404Spjd tx->tx_quiesced_txg = 0; 354168404Spjd tx->tx_syncing_txg = txg; 355168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 356168404Spjd rw_exit(&tx->tx_suspend); 357168404Spjd 358168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 359185029Spjd txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 360168404Spjd mutex_exit(&tx->tx_sync_lock); 361185029Spjd 362185029Spjd start = LBOLT; 363168404Spjd spa_sync(dp->dp_spa, txg); 364185029Spjd delta = LBOLT - start; 365185029Spjd 366168404Spjd mutex_enter(&tx->tx_sync_lock); 367168404Spjd rw_enter(&tx->tx_suspend, RW_WRITER); 368168404Spjd tx->tx_synced_txg = txg; 369168404Spjd tx->tx_syncing_txg = 0; 370168404Spjd rw_exit(&tx->tx_suspend); 371168404Spjd cv_broadcast(&tx->tx_sync_done_cv); 372168404Spjd } 373168404Spjd} 374168404Spjd 375168404Spjdstatic void 376168404Spjdtxg_quiesce_thread(void *arg) 377168404Spjd{ 378168404Spjd dsl_pool_t *dp = arg; 379168404Spjd tx_state_t *tx = &dp->dp_tx; 380168404Spjd callb_cpr_t cpr; 381168404Spjd 382168404Spjd txg_thread_enter(tx, &cpr); 383168404Spjd 384168404Spjd for (;;) { 385168404Spjd uint64_t txg; 386168404Spjd 387168404Spjd /* 388168404Spjd * We quiesce when there's someone waiting on us. 389168404Spjd * However, we can only have one txg in "quiescing" or 390168404Spjd * "quiesced, waiting to sync" state. So we wait until 391168404Spjd * the "quiesced, waiting to sync" txg has been consumed 392168404Spjd * by the sync thread. 
393168404Spjd */ 394168404Spjd while (!tx->tx_exiting && 395168404Spjd (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting || 396168404Spjd tx->tx_quiesced_txg != 0)) 397168404Spjd txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0); 398168404Spjd 399168404Spjd if (tx->tx_exiting) 400168404Spjd txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread); 401168404Spjd 402168404Spjd txg = tx->tx_open_txg; 403168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 404168404Spjd txg, tx->tx_quiesce_txg_waiting, 405168404Spjd tx->tx_sync_txg_waiting); 406168404Spjd mutex_exit(&tx->tx_sync_lock); 407168404Spjd txg_quiesce(dp, txg); 408168404Spjd mutex_enter(&tx->tx_sync_lock); 409168404Spjd 410168404Spjd /* 411168404Spjd * Hand this txg off to the sync thread. 412168404Spjd */ 413168404Spjd dprintf("quiesce done, handing off txg %llu\n", txg); 414168404Spjd tx->tx_quiesced_txg = txg; 415168404Spjd cv_broadcast(&tx->tx_sync_more_cv); 416168404Spjd cv_broadcast(&tx->tx_quiesce_done_cv); 417168404Spjd } 418168404Spjd} 419168404Spjd 420185029Spjd/* 421185029Spjd * Delay this thread by 'ticks' if we are still in the open transaction 422185029Spjd * group and there is already a waiting txg quiesing or quiesced. Abort 423185029Spjd * the delay if this txg stalls or enters the quiesing state. 
424185029Spjd */ 425168404Spjdvoid 426185029Spjdtxg_delay(dsl_pool_t *dp, uint64_t txg, int ticks) 427185029Spjd{ 428185029Spjd tx_state_t *tx = &dp->dp_tx; 429185029Spjd int timeout = LBOLT + ticks; 430185029Spjd 431185029Spjd /* don't delay if this txg could transition to quiesing immediately */ 432185029Spjd if (tx->tx_open_txg > txg || 433185029Spjd tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1) 434185029Spjd return; 435185029Spjd 436185029Spjd mutex_enter(&tx->tx_sync_lock); 437185029Spjd if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) { 438185029Spjd mutex_exit(&tx->tx_sync_lock); 439185029Spjd return; 440185029Spjd } 441185029Spjd 442185029Spjd while (LBOLT < timeout && 443185029Spjd tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) 444185029Spjd (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock, 445185029Spjd timeout - LBOLT); 446185029Spjd 447185029Spjd mutex_exit(&tx->tx_sync_lock); 448185029Spjd} 449185029Spjd 450185029Spjdvoid 451168404Spjdtxg_wait_synced(dsl_pool_t *dp, uint64_t txg) 452168404Spjd{ 453168404Spjd tx_state_t *tx = &dp->dp_tx; 454168404Spjd 455168404Spjd mutex_enter(&tx->tx_sync_lock); 456185029Spjd ASSERT(tx->tx_threads == 2); 457168404Spjd if (txg == 0) 458168404Spjd txg = tx->tx_open_txg; 459168404Spjd if (tx->tx_sync_txg_waiting < txg) 460168404Spjd tx->tx_sync_txg_waiting = txg; 461168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 462168404Spjd txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 463168404Spjd while (tx->tx_synced_txg < txg) { 464168404Spjd dprintf("broadcasting sync more " 465168404Spjd "tx_synced=%llu waiting=%llu dp=%p\n", 466168404Spjd tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 467168404Spjd cv_broadcast(&tx->tx_sync_more_cv); 468168404Spjd cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock); 469168404Spjd } 470168404Spjd mutex_exit(&tx->tx_sync_lock); 471168404Spjd} 472168404Spjd 473168404Spjdvoid 474168404Spjdtxg_wait_open(dsl_pool_t *dp, uint64_t txg) 
475168404Spjd{ 476168404Spjd tx_state_t *tx = &dp->dp_tx; 477168404Spjd 478168404Spjd mutex_enter(&tx->tx_sync_lock); 479185029Spjd ASSERT(tx->tx_threads == 2); 480168404Spjd if (txg == 0) 481168404Spjd txg = tx->tx_open_txg + 1; 482168404Spjd if (tx->tx_quiesce_txg_waiting < txg) 483168404Spjd tx->tx_quiesce_txg_waiting = txg; 484168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 485168404Spjd txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 486168404Spjd while (tx->tx_open_txg < txg) { 487168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 488168404Spjd cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock); 489168404Spjd } 490168404Spjd mutex_exit(&tx->tx_sync_lock); 491168404Spjd} 492168404Spjd 493185029Spjdboolean_t 494185029Spjdtxg_stalled(dsl_pool_t *dp) 495168404Spjd{ 496168404Spjd tx_state_t *tx = &dp->dp_tx; 497185029Spjd return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg); 498168404Spjd} 499168404Spjd 500185029Spjdboolean_t 501185029Spjdtxg_sync_waiting(dsl_pool_t *dp) 502168404Spjd{ 503168404Spjd tx_state_t *tx = &dp->dp_tx; 504185029Spjd 505185029Spjd return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting || 506185029Spjd tx->tx_quiesced_txg != 0); 507168404Spjd} 508168404Spjd 509168404Spjdvoid 510168404Spjdtxg_suspend(dsl_pool_t *dp) 511168404Spjd{ 512168404Spjd tx_state_t *tx = &dp->dp_tx; 513168404Spjd /* XXX some code paths suspend when they are already suspended! */ 514168404Spjd rw_enter(&tx->tx_suspend, RW_READER); 515168404Spjd} 516168404Spjd 517168404Spjdvoid 518168404Spjdtxg_resume(dsl_pool_t *dp) 519168404Spjd{ 520168404Spjd tx_state_t *tx = &dp->dp_tx; 521168404Spjd rw_exit(&tx->tx_suspend); 522168404Spjd} 523168404Spjd 524168404Spjd/* 525168404Spjd * Per-txg object lists. 
526168404Spjd */ 527168404Spjdvoid 528168404Spjdtxg_list_create(txg_list_t *tl, size_t offset) 529168404Spjd{ 530168404Spjd int t; 531168404Spjd 532168404Spjd mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL); 533168404Spjd 534168404Spjd tl->tl_offset = offset; 535168404Spjd 536168404Spjd for (t = 0; t < TXG_SIZE; t++) 537168404Spjd tl->tl_head[t] = NULL; 538168404Spjd} 539168404Spjd 540168404Spjdvoid 541168404Spjdtxg_list_destroy(txg_list_t *tl) 542168404Spjd{ 543168404Spjd int t; 544168404Spjd 545168404Spjd for (t = 0; t < TXG_SIZE; t++) 546168404Spjd ASSERT(txg_list_empty(tl, t)); 547168404Spjd 548168404Spjd mutex_destroy(&tl->tl_lock); 549168404Spjd} 550168404Spjd 551168404Spjdint 552168404Spjdtxg_list_empty(txg_list_t *tl, uint64_t txg) 553168404Spjd{ 554168404Spjd return (tl->tl_head[txg & TXG_MASK] == NULL); 555168404Spjd} 556168404Spjd 557168404Spjd/* 558168404Spjd * Add an entry to the list. 559168404Spjd * Returns 0 if it's a new entry, 1 if it's already there. 560168404Spjd */ 561168404Spjdint 562168404Spjdtxg_list_add(txg_list_t *tl, void *p, uint64_t txg) 563168404Spjd{ 564168404Spjd int t = txg & TXG_MASK; 565168404Spjd txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 566168404Spjd int already_on_list; 567168404Spjd 568168404Spjd mutex_enter(&tl->tl_lock); 569168404Spjd already_on_list = tn->tn_member[t]; 570168404Spjd if (!already_on_list) { 571168404Spjd tn->tn_member[t] = 1; 572168404Spjd tn->tn_next[t] = tl->tl_head[t]; 573168404Spjd tl->tl_head[t] = tn; 574168404Spjd } 575168404Spjd mutex_exit(&tl->tl_lock); 576168404Spjd 577168404Spjd return (already_on_list); 578168404Spjd} 579168404Spjd 580168404Spjd/* 581168404Spjd * Remove the head of the list and return it. 
582168404Spjd */ 583168404Spjdvoid * 584168404Spjdtxg_list_remove(txg_list_t *tl, uint64_t txg) 585168404Spjd{ 586168404Spjd int t = txg & TXG_MASK; 587168404Spjd txg_node_t *tn; 588168404Spjd void *p = NULL; 589168404Spjd 590168404Spjd mutex_enter(&tl->tl_lock); 591168404Spjd if ((tn = tl->tl_head[t]) != NULL) { 592168404Spjd p = (char *)tn - tl->tl_offset; 593168404Spjd tl->tl_head[t] = tn->tn_next[t]; 594168404Spjd tn->tn_next[t] = NULL; 595168404Spjd tn->tn_member[t] = 0; 596168404Spjd } 597168404Spjd mutex_exit(&tl->tl_lock); 598168404Spjd 599168404Spjd return (p); 600168404Spjd} 601168404Spjd 602168404Spjd/* 603168404Spjd * Remove a specific item from the list and return it. 604168404Spjd */ 605168404Spjdvoid * 606168404Spjdtxg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg) 607168404Spjd{ 608168404Spjd int t = txg & TXG_MASK; 609168404Spjd txg_node_t *tn, **tp; 610168404Spjd 611168404Spjd mutex_enter(&tl->tl_lock); 612168404Spjd 613168404Spjd for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) { 614168404Spjd if ((char *)tn - tl->tl_offset == p) { 615168404Spjd *tp = tn->tn_next[t]; 616168404Spjd tn->tn_next[t] = NULL; 617168404Spjd tn->tn_member[t] = 0; 618168404Spjd mutex_exit(&tl->tl_lock); 619168404Spjd return (p); 620168404Spjd } 621168404Spjd } 622168404Spjd 623168404Spjd mutex_exit(&tl->tl_lock); 624168404Spjd 625168404Spjd return (NULL); 626168404Spjd} 627168404Spjd 628168404Spjdint 629168404Spjdtxg_list_member(txg_list_t *tl, void *p, uint64_t txg) 630168404Spjd{ 631168404Spjd int t = txg & TXG_MASK; 632168404Spjd txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 633168404Spjd 634168404Spjd return (tn->tn_member[t]); 635168404Spjd} 636168404Spjd 637168404Spjd/* 638168404Spjd * Walk a txg list -- only safe if you know it's not changing. 
639168404Spjd */ 640168404Spjdvoid * 641168404Spjdtxg_list_head(txg_list_t *tl, uint64_t txg) 642168404Spjd{ 643168404Spjd int t = txg & TXG_MASK; 644168404Spjd txg_node_t *tn = tl->tl_head[t]; 645168404Spjd 646168404Spjd return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 647168404Spjd} 648168404Spjd 649168404Spjdvoid * 650168404Spjdtxg_list_next(txg_list_t *tl, void *p, uint64_t txg) 651168404Spjd{ 652168404Spjd int t = txg & TXG_MASK; 653168404Spjd txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 654168404Spjd 655168404Spjd tn = tn->tn_next[t]; 656168404Spjd 657168404Spjd return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 658168404Spjd} 659