/* txg.c — FreeBSD sys/cddl/contrib/opensolaris/uts/common/fs/zfs/txg.c, revision 248571 */
1168404Spjd/* 2168404Spjd * CDDL HEADER START 3168404Spjd * 4168404Spjd * The contents of this file are subject to the terms of the 5168404Spjd * Common Development and Distribution License (the "License"). 6168404Spjd * You may not use this file except in compliance with the License. 7168404Spjd * 8168404Spjd * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9168404Spjd * or http://www.opensolaris.org/os/licensing. 10168404Spjd * See the License for the specific language governing permissions 11168404Spjd * and limitations under the License. 12168404Spjd * 13168404Spjd * When distributing Covered Code, include this CDDL HEADER in each 14168404Spjd * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15168404Spjd * If applicable, add the following below this CDDL HEADER, with the 16168404Spjd * fields enclosed by brackets "[]" replaced with your own identifying 17168404Spjd * information: Portions Copyright [yyyy] [name of copyright owner] 18168404Spjd * 19168404Spjd * CDDL HEADER END 20168404Spjd */ 21168404Spjd/* 22219089Spjd * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. 23226724Smm * Portions Copyright 2011 Martin Matuska <mm@FreeBSD.org> 24245511Sdelphij * Copyright (c) 2013 by Delphix. All rights reserved. 25168404Spjd */ 26168404Spjd 27168404Spjd#include <sys/zfs_context.h> 28168404Spjd#include <sys/txg_impl.h> 29168404Spjd#include <sys/dmu_impl.h> 30219089Spjd#include <sys/dmu_tx.h> 31168404Spjd#include <sys/dsl_pool.h> 32219089Spjd#include <sys/dsl_scan.h> 33168404Spjd#include <sys/callb.h> 34168404Spjd 35168404Spjd/* 36245511Sdelphij * ZFS Transaction Groups 37245511Sdelphij * ---------------------- 38245511Sdelphij * 39245511Sdelphij * ZFS transaction groups are, as the name implies, groups of transactions 40245511Sdelphij * that act on persistent state. ZFS asserts consistency at the granularity of 41245511Sdelphij * these transaction groups. 
Each successive transaction group (txg) is
 * assigned a 64-bit consecutive identifier. There are three active
 * transaction group states: open, quiescing, or syncing. At any given time,
 * there may be an active txg associated with each state; each active txg may
 * either be processing, or blocked waiting to enter the next state. There may
 * be up to three active txgs, and there is always a txg in the open state
 * (though it may be blocked waiting to enter the quiescing state). In broad
 * strokes, transactions -- operations that change in-memory structures -- are
 * accepted into the txg in the open state, and are completed while the txg is
 * in the open or quiescing states. The accumulated changes are written to
 * disk in the syncing state.
 *
 * Open
 *
 * When a new txg becomes active, it first enters the open state. New
 * transactions -- updates to in-memory structures -- are assigned to the
 * currently open txg. There is always a txg in the open state so that ZFS can
 * accept new changes (though the txg may refuse new changes if it has hit
 * some limit). ZFS advances the open txg to the next state for a variety of
 * reasons such as it hitting a time or size threshold, or the execution of an
 * administrative action that must be completed in the syncing state.
 *
 * Quiescing
 *
 * After a txg exits the open state, it enters the quiescing state. The
 * quiescing state is intended to provide a buffer between accepting new
 * transactions in the open state and writing them out to stable storage in
 * the syncing state.
While quiescing, transactions can continue their 69245511Sdelphij * operation without delaying either of the other states. Typically, a txg is 70245511Sdelphij * in the quiescing state very briefly since the operations are bounded by 71245511Sdelphij * software latencies rather than, say, slower I/O latencies. After all 72245511Sdelphij * transactions complete, the txg is ready to enter the next state. 73245511Sdelphij * 74245511Sdelphij * Syncing 75245511Sdelphij * 76245511Sdelphij * In the syncing state, the in-memory state built up during the open and (to 77245511Sdelphij * a lesser degree) the quiescing states is written to stable storage. The 78245511Sdelphij * process of writing out modified data can, in turn modify more data. For 79245511Sdelphij * example when we write new blocks, we need to allocate space for them; those 80245511Sdelphij * allocations modify metadata (space maps)... which themselves must be 81245511Sdelphij * written to stable storage. During the sync state, ZFS iterates, writing out 82245511Sdelphij * data until it converges and all in-memory changes have been written out. 83245511Sdelphij * The first such pass is the largest as it encompasses all the modified user 84245511Sdelphij * data (as opposed to filesystem metadata). Subsequent passes typically have 85245511Sdelphij * far less data to write as they consist exclusively of filesystem metadata. 86245511Sdelphij * 87245511Sdelphij * To ensure convergence, after a certain number of passes ZFS begins 88245511Sdelphij * overwriting locations on stable storage that had been allocated earlier in 89245511Sdelphij * the syncing state (and subsequently freed). ZFS usually allocates new 90245511Sdelphij * blocks to optimize for large, continuous, writes. For the syncing state to 91245511Sdelphij * converge however it must complete a pass where no new blocks are allocated 92245511Sdelphij * since each allocation requires a modification of persistent metadata. 
93245511Sdelphij * Further, to hasten convergence, after a prescribed number of passes, ZFS 94245511Sdelphij * also defers frees, and stops compressing. 95245511Sdelphij * 96245511Sdelphij * In addition to writing out user data, we must also execute synctasks during 97245511Sdelphij * the syncing context. A synctask is the mechanism by which some 98245511Sdelphij * administrative activities work such as creating and destroying snapshots or 99245511Sdelphij * datasets. Note that when a synctask is initiated it enters the open txg, 100245511Sdelphij * and ZFS then pushes that txg as quickly as possible to completion of the 101245511Sdelphij * syncing state in order to reduce the latency of the administrative 102245511Sdelphij * activity. To complete the syncing state, ZFS writes out a new uberblock, 103245511Sdelphij * the root of the tree of blocks that comprise all state stored on the ZFS 104245511Sdelphij * pool. Finally, if there is a quiesced txg waiting, we signal that it can 105245511Sdelphij * now transition to the syncing state. 106168404Spjd */ 107168404Spjd 108168404Spjdstatic void txg_sync_thread(void *arg); 109168404Spjdstatic void txg_quiesce_thread(void *arg); 110168404Spjd 111219089Spjdint zfs_txg_timeout = 5; /* max seconds worth of delta per txg */ 112168404Spjd 113185029SpjdSYSCTL_DECL(_vfs_zfs); 114219089SpjdSYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG"); 115185029SpjdTUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout); 116228363SpjdSYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RW, &zfs_txg_timeout, 0, 117185029Spjd "Maximum seconds worth of delta per txg"); 118185029Spjd 119168404Spjd/* 120168404Spjd * Prepare the txg subsystem. 
121168404Spjd */ 122168404Spjdvoid 123168404Spjdtxg_init(dsl_pool_t *dp, uint64_t txg) 124168404Spjd{ 125168404Spjd tx_state_t *tx = &dp->dp_tx; 126185029Spjd int c; 127168404Spjd bzero(tx, sizeof (tx_state_t)); 128168404Spjd 129168404Spjd tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP); 130185029Spjd 131168404Spjd for (c = 0; c < max_ncpus; c++) { 132185029Spjd int i; 133185029Spjd 134168404Spjd mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL); 135185029Spjd for (i = 0; i < TXG_SIZE; i++) { 136185029Spjd cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, 137185029Spjd NULL); 138219089Spjd list_create(&tx->tx_cpu[c].tc_callbacks[i], 139219089Spjd sizeof (dmu_tx_callback_t), 140219089Spjd offsetof(dmu_tx_callback_t, dcb_node)); 141185029Spjd } 142168404Spjd } 143168404Spjd 144168404Spjd mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL); 145208372Smm 146168404Spjd cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL); 147168404Spjd cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL); 148168404Spjd cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL); 149168404Spjd cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL); 150168404Spjd cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL); 151168404Spjd 152168404Spjd tx->tx_open_txg = txg; 153168404Spjd} 154168404Spjd 155168404Spjd/* 156168404Spjd * Close down the txg subsystem. 
157168404Spjd */ 158168404Spjdvoid 159168404Spjdtxg_fini(dsl_pool_t *dp) 160168404Spjd{ 161168404Spjd tx_state_t *tx = &dp->dp_tx; 162185029Spjd int c; 163168404Spjd 164168404Spjd ASSERT(tx->tx_threads == 0); 165168404Spjd 166168404Spjd mutex_destroy(&tx->tx_sync_lock); 167168404Spjd 168208372Smm cv_destroy(&tx->tx_sync_more_cv); 169208372Smm cv_destroy(&tx->tx_sync_done_cv); 170208372Smm cv_destroy(&tx->tx_quiesce_more_cv); 171208372Smm cv_destroy(&tx->tx_quiesce_done_cv); 172208372Smm cv_destroy(&tx->tx_exit_cv); 173208372Smm 174168404Spjd for (c = 0; c < max_ncpus; c++) { 175185029Spjd int i; 176185029Spjd 177185029Spjd mutex_destroy(&tx->tx_cpu[c].tc_lock); 178219089Spjd for (i = 0; i < TXG_SIZE; i++) { 179168404Spjd cv_destroy(&tx->tx_cpu[c].tc_cv[i]); 180219089Spjd list_destroy(&tx->tx_cpu[c].tc_callbacks[i]); 181219089Spjd } 182168404Spjd } 183168404Spjd 184219089Spjd if (tx->tx_commit_cb_taskq != NULL) 185219089Spjd taskq_destroy(tx->tx_commit_cb_taskq); 186219089Spjd 187168404Spjd kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); 188168404Spjd 189168404Spjd bzero(tx, sizeof (tx_state_t)); 190168404Spjd} 191168404Spjd 192168404Spjd/* 193168404Spjd * Start syncing transaction groups. 194168404Spjd */ 195168404Spjdvoid 196168404Spjdtxg_sync_start(dsl_pool_t *dp) 197168404Spjd{ 198168404Spjd tx_state_t *tx = &dp->dp_tx; 199168404Spjd 200168404Spjd mutex_enter(&tx->tx_sync_lock); 201168404Spjd 202168404Spjd dprintf("pool %p\n", dp); 203168404Spjd 204168404Spjd ASSERT(tx->tx_threads == 0); 205168404Spjd 206185029Spjd tx->tx_threads = 2; 207168404Spjd 208168404Spjd tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread, 209168404Spjd dp, 0, &p0, TS_RUN, minclsyspri); 210168404Spjd 211185029Spjd /* 212185029Spjd * The sync thread can need a larger-than-default stack size on 213185029Spjd * 32-bit x86. This is due in part to nested pools and 214185029Spjd * scrub_visitbp() recursion. 
215185029Spjd */ 216210192Snwhitehorn tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread, 217168404Spjd dp, 0, &p0, TS_RUN, minclsyspri); 218168404Spjd 219168404Spjd mutex_exit(&tx->tx_sync_lock); 220168404Spjd} 221168404Spjd 222168404Spjdstatic void 223168404Spjdtxg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr) 224168404Spjd{ 225168404Spjd CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG); 226168404Spjd mutex_enter(&tx->tx_sync_lock); 227168404Spjd} 228168404Spjd 229168404Spjdstatic void 230168404Spjdtxg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp) 231168404Spjd{ 232168404Spjd ASSERT(*tpp != NULL); 233168404Spjd *tpp = NULL; 234168404Spjd tx->tx_threads--; 235168404Spjd cv_broadcast(&tx->tx_exit_cv); 236168404Spjd CALLB_CPR_EXIT(cpr); /* drops &tx->tx_sync_lock */ 237168404Spjd thread_exit(); 238168404Spjd} 239168404Spjd 240168404Spjdstatic void 241185029Spjdtxg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time) 242168404Spjd{ 243168404Spjd CALLB_CPR_SAFE_BEGIN(cpr); 244168404Spjd 245185029Spjd if (time) 246185029Spjd (void) cv_timedwait(cv, &tx->tx_sync_lock, time); 247168404Spjd else 248168404Spjd cv_wait(cv, &tx->tx_sync_lock); 249168404Spjd 250168404Spjd CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock); 251168404Spjd} 252168404Spjd 253168404Spjd/* 254168404Spjd * Stop syncing transaction groups. 255168404Spjd */ 256168404Spjdvoid 257168404Spjdtxg_sync_stop(dsl_pool_t *dp) 258168404Spjd{ 259168404Spjd tx_state_t *tx = &dp->dp_tx; 260168404Spjd 261168404Spjd dprintf("pool %p\n", dp); 262168404Spjd /* 263168404Spjd * Finish off any work in progress. 264168404Spjd */ 265185029Spjd ASSERT(tx->tx_threads == 2); 266168404Spjd 267168404Spjd /* 268219089Spjd * We need to ensure that we've vacated the deferred space_maps. 269219089Spjd */ 270219089Spjd txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE); 271219089Spjd 272219089Spjd /* 273185029Spjd * Wake all sync threads and wait for them to die. 
274168404Spjd */ 275168404Spjd mutex_enter(&tx->tx_sync_lock); 276168404Spjd 277185029Spjd ASSERT(tx->tx_threads == 2); 278168404Spjd 279168404Spjd tx->tx_exiting = 1; 280168404Spjd 281168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 282168404Spjd cv_broadcast(&tx->tx_quiesce_done_cv); 283168404Spjd cv_broadcast(&tx->tx_sync_more_cv); 284168404Spjd 285168404Spjd while (tx->tx_threads != 0) 286168404Spjd cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock); 287168404Spjd 288168404Spjd tx->tx_exiting = 0; 289168404Spjd 290168404Spjd mutex_exit(&tx->tx_sync_lock); 291168404Spjd} 292168404Spjd 293168404Spjduint64_t 294168404Spjdtxg_hold_open(dsl_pool_t *dp, txg_handle_t *th) 295168404Spjd{ 296168404Spjd tx_state_t *tx = &dp->dp_tx; 297168404Spjd tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID]; 298168404Spjd uint64_t txg; 299168404Spjd 300168404Spjd mutex_enter(&tc->tc_lock); 301168404Spjd 302168404Spjd txg = tx->tx_open_txg; 303168404Spjd tc->tc_count[txg & TXG_MASK]++; 304168404Spjd 305168404Spjd th->th_cpu = tc; 306168404Spjd th->th_txg = txg; 307168404Spjd 308168404Spjd return (txg); 309168404Spjd} 310168404Spjd 311168404Spjdvoid 312168404Spjdtxg_rele_to_quiesce(txg_handle_t *th) 313168404Spjd{ 314168404Spjd tx_cpu_t *tc = th->th_cpu; 315168404Spjd 316168404Spjd mutex_exit(&tc->tc_lock); 317168404Spjd} 318168404Spjd 319168404Spjdvoid 320219089Spjdtxg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks) 321219089Spjd{ 322219089Spjd tx_cpu_t *tc = th->th_cpu; 323219089Spjd int g = th->th_txg & TXG_MASK; 324219089Spjd 325219089Spjd mutex_enter(&tc->tc_lock); 326219089Spjd list_move_tail(&tc->tc_callbacks[g], tx_callbacks); 327219089Spjd mutex_exit(&tc->tc_lock); 328219089Spjd} 329219089Spjd 330219089Spjdvoid 331168404Spjdtxg_rele_to_sync(txg_handle_t *th) 332168404Spjd{ 333168404Spjd tx_cpu_t *tc = th->th_cpu; 334168404Spjd int g = th->th_txg & TXG_MASK; 335168404Spjd 336168404Spjd mutex_enter(&tc->tc_lock); 337168404Spjd ASSERT(tc->tc_count[g] != 0); 338168404Spjd if 
(--tc->tc_count[g] == 0) 339168404Spjd cv_broadcast(&tc->tc_cv[g]); 340168404Spjd mutex_exit(&tc->tc_lock); 341168404Spjd 342168404Spjd th->th_cpu = NULL; /* defensive */ 343168404Spjd} 344168404Spjd 345168404Spjdstatic void 346168404Spjdtxg_quiesce(dsl_pool_t *dp, uint64_t txg) 347168404Spjd{ 348168404Spjd tx_state_t *tx = &dp->dp_tx; 349168404Spjd int g = txg & TXG_MASK; 350168404Spjd int c; 351168404Spjd 352168404Spjd /* 353168404Spjd * Grab all tx_cpu locks so nobody else can get into this txg. 354168404Spjd */ 355168404Spjd for (c = 0; c < max_ncpus; c++) 356168404Spjd mutex_enter(&tx->tx_cpu[c].tc_lock); 357168404Spjd 358168404Spjd ASSERT(txg == tx->tx_open_txg); 359168404Spjd tx->tx_open_txg++; 360168404Spjd 361168404Spjd /* 362168404Spjd * Now that we've incremented tx_open_txg, we can let threads 363168404Spjd * enter the next transaction group. 364168404Spjd */ 365168404Spjd for (c = 0; c < max_ncpus; c++) 366168404Spjd mutex_exit(&tx->tx_cpu[c].tc_lock); 367168404Spjd 368168404Spjd /* 369168404Spjd * Quiesce the transaction group by waiting for everyone to txg_exit(). 370168404Spjd */ 371168404Spjd for (c = 0; c < max_ncpus; c++) { 372168404Spjd tx_cpu_t *tc = &tx->tx_cpu[c]; 373168404Spjd mutex_enter(&tc->tc_lock); 374168404Spjd while (tc->tc_count[g] != 0) 375168404Spjd cv_wait(&tc->tc_cv[g], &tc->tc_lock); 376168404Spjd mutex_exit(&tc->tc_lock); 377168404Spjd } 378168404Spjd} 379168404Spjd 380168404Spjdstatic void 381219089Spjdtxg_do_callbacks(void *arg) 382219089Spjd{ 383219089Spjd list_t *cb_list = arg; 384219089Spjd 385219089Spjd dmu_tx_do_callbacks(cb_list, 0); 386219089Spjd 387219089Spjd list_destroy(cb_list); 388219089Spjd 389219089Spjd kmem_free(cb_list, sizeof (list_t)); 390219089Spjd} 391219089Spjd 392219089Spjd/* 393219089Spjd * Dispatch the commit callbacks registered on this txg to worker threads. 
394219089Spjd */ 395219089Spjdstatic void 396219089Spjdtxg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg) 397219089Spjd{ 398219089Spjd int c; 399219089Spjd tx_state_t *tx = &dp->dp_tx; 400219089Spjd list_t *cb_list; 401219089Spjd 402219089Spjd for (c = 0; c < max_ncpus; c++) { 403219089Spjd tx_cpu_t *tc = &tx->tx_cpu[c]; 404219089Spjd /* No need to lock tx_cpu_t at this point */ 405219089Spjd 406219089Spjd int g = txg & TXG_MASK; 407219089Spjd 408219089Spjd if (list_is_empty(&tc->tc_callbacks[g])) 409219089Spjd continue; 410219089Spjd 411219089Spjd if (tx->tx_commit_cb_taskq == NULL) { 412219089Spjd /* 413219089Spjd * Commit callback taskq hasn't been created yet. 414219089Spjd */ 415219089Spjd tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb", 416219089Spjd max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2, 417219089Spjd TASKQ_PREPOPULATE); 418219089Spjd } 419219089Spjd 420219089Spjd cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP); 421219089Spjd list_create(cb_list, sizeof (dmu_tx_callback_t), 422219089Spjd offsetof(dmu_tx_callback_t, dcb_node)); 423219089Spjd 424219089Spjd list_move_tail(&tc->tc_callbacks[g], cb_list); 425219089Spjd 426219089Spjd (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *) 427219089Spjd txg_do_callbacks, cb_list, TQ_SLEEP); 428219089Spjd } 429219089Spjd} 430219089Spjd 431219089Spjdstatic void 432168404Spjdtxg_sync_thread(void *arg) 433168404Spjd{ 434168404Spjd dsl_pool_t *dp = arg; 435219089Spjd spa_t *spa = dp->dp_spa; 436168404Spjd tx_state_t *tx = &dp->dp_tx; 437168404Spjd callb_cpr_t cpr; 438185029Spjd uint64_t start, delta; 439168404Spjd 440168404Spjd txg_thread_enter(tx, &cpr); 441168404Spjd 442185029Spjd start = delta = 0; 443168404Spjd for (;;) { 444185029Spjd uint64_t timer, timeout = zfs_txg_timeout * hz; 445168404Spjd uint64_t txg; 446168404Spjd 447168404Spjd /* 448219089Spjd * We sync when we're scanning, there's someone waiting 449208047Smm * on us, or the quiesce thread has handed off a txg to 450208047Smm * 
us, or we have reached our timeout. 451168404Spjd */ 452185029Spjd timer = (delta >= timeout ? 0 : timeout - delta); 453219089Spjd while (!dsl_scan_active(dp->dp_scan) && 454208047Smm !tx->tx_exiting && timer > 0 && 455168404Spjd tx->tx_synced_txg >= tx->tx_sync_txg_waiting && 456168404Spjd tx->tx_quiesced_txg == 0) { 457168404Spjd dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n", 458168404Spjd tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 459185029Spjd txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer); 460219089Spjd delta = ddi_get_lbolt() - start; 461185029Spjd timer = (delta > timeout ? 0 : timeout - delta); 462168404Spjd } 463168404Spjd 464168404Spjd /* 465168404Spjd * Wait until the quiesce thread hands off a txg to us, 466168404Spjd * prompting it to do so if necessary. 467168404Spjd */ 468168404Spjd while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) { 469168404Spjd if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1) 470168404Spjd tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1; 471168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 472168404Spjd txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0); 473168404Spjd } 474168404Spjd 475168404Spjd if (tx->tx_exiting) 476168404Spjd txg_thread_exit(tx, &cpr, &tx->tx_sync_thread); 477168404Spjd 478168404Spjd /* 479168404Spjd * Consume the quiesced txg which has been handed off to 480168404Spjd * us. This may cause the quiescing thread to now be 481168404Spjd * able to quiesce another txg, so we must signal it. 
482168404Spjd */ 483168404Spjd txg = tx->tx_quiesced_txg; 484168404Spjd tx->tx_quiesced_txg = 0; 485168404Spjd tx->tx_syncing_txg = txg; 486168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 487168404Spjd 488168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 489185029Spjd txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 490168404Spjd mutex_exit(&tx->tx_sync_lock); 491185029Spjd 492219089Spjd start = ddi_get_lbolt(); 493219089Spjd spa_sync(spa, txg); 494219089Spjd delta = ddi_get_lbolt() - start; 495185029Spjd 496168404Spjd mutex_enter(&tx->tx_sync_lock); 497168404Spjd tx->tx_synced_txg = txg; 498168404Spjd tx->tx_syncing_txg = 0; 499168404Spjd cv_broadcast(&tx->tx_sync_done_cv); 500219089Spjd 501219089Spjd /* 502219089Spjd * Dispatch commit callbacks to worker threads. 503219089Spjd */ 504219089Spjd txg_dispatch_callbacks(dp, txg); 505168404Spjd } 506168404Spjd} 507168404Spjd 508168404Spjdstatic void 509168404Spjdtxg_quiesce_thread(void *arg) 510168404Spjd{ 511168404Spjd dsl_pool_t *dp = arg; 512168404Spjd tx_state_t *tx = &dp->dp_tx; 513168404Spjd callb_cpr_t cpr; 514168404Spjd 515168404Spjd txg_thread_enter(tx, &cpr); 516168404Spjd 517168404Spjd for (;;) { 518168404Spjd uint64_t txg; 519168404Spjd 520168404Spjd /* 521168404Spjd * We quiesce when there's someone waiting on us. 522168404Spjd * However, we can only have one txg in "quiescing" or 523168404Spjd * "quiesced, waiting to sync" state. So we wait until 524168404Spjd * the "quiesced, waiting to sync" txg has been consumed 525168404Spjd * by the sync thread. 
526168404Spjd */ 527168404Spjd while (!tx->tx_exiting && 528168404Spjd (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting || 529168404Spjd tx->tx_quiesced_txg != 0)) 530168404Spjd txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0); 531168404Spjd 532168404Spjd if (tx->tx_exiting) 533168404Spjd txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread); 534168404Spjd 535168404Spjd txg = tx->tx_open_txg; 536168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 537168404Spjd txg, tx->tx_quiesce_txg_waiting, 538168404Spjd tx->tx_sync_txg_waiting); 539168404Spjd mutex_exit(&tx->tx_sync_lock); 540168404Spjd txg_quiesce(dp, txg); 541168404Spjd mutex_enter(&tx->tx_sync_lock); 542168404Spjd 543168404Spjd /* 544168404Spjd * Hand this txg off to the sync thread. 545168404Spjd */ 546168404Spjd dprintf("quiesce done, handing off txg %llu\n", txg); 547168404Spjd tx->tx_quiesced_txg = txg; 548168404Spjd cv_broadcast(&tx->tx_sync_more_cv); 549168404Spjd cv_broadcast(&tx->tx_quiesce_done_cv); 550168404Spjd } 551168404Spjd} 552168404Spjd 553185029Spjd/* 554185029Spjd * Delay this thread by 'ticks' if we are still in the open transaction 555185029Spjd * group and there is already a waiting txg quiesing or quiesced. Abort 556185029Spjd * the delay if this txg stalls or enters the quiesing state. 
557185029Spjd */ 558168404Spjdvoid 559185029Spjdtxg_delay(dsl_pool_t *dp, uint64_t txg, int ticks) 560185029Spjd{ 561185029Spjd tx_state_t *tx = &dp->dp_tx; 562224579Smm clock_t timeout = ddi_get_lbolt() + ticks; 563185029Spjd 564185029Spjd /* don't delay if this txg could transition to quiesing immediately */ 565185029Spjd if (tx->tx_open_txg > txg || 566185029Spjd tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1) 567185029Spjd return; 568185029Spjd 569185029Spjd mutex_enter(&tx->tx_sync_lock); 570185029Spjd if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) { 571185029Spjd mutex_exit(&tx->tx_sync_lock); 572185029Spjd return; 573185029Spjd } 574185029Spjd 575219089Spjd while (ddi_get_lbolt() < timeout && 576185029Spjd tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) 577185029Spjd (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock, 578219089Spjd timeout - ddi_get_lbolt()); 579185029Spjd 580185029Spjd mutex_exit(&tx->tx_sync_lock); 581185029Spjd} 582185029Spjd 583185029Spjdvoid 584168404Spjdtxg_wait_synced(dsl_pool_t *dp, uint64_t txg) 585168404Spjd{ 586168404Spjd tx_state_t *tx = &dp->dp_tx; 587168404Spjd 588248571Smm ASSERT(!dsl_pool_config_held(dp)); 589248571Smm 590168404Spjd mutex_enter(&tx->tx_sync_lock); 591185029Spjd ASSERT(tx->tx_threads == 2); 592168404Spjd if (txg == 0) 593219089Spjd txg = tx->tx_open_txg + TXG_DEFER_SIZE; 594168404Spjd if (tx->tx_sync_txg_waiting < txg) 595168404Spjd tx->tx_sync_txg_waiting = txg; 596168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 597168404Spjd txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 598168404Spjd while (tx->tx_synced_txg < txg) { 599168404Spjd dprintf("broadcasting sync more " 600168404Spjd "tx_synced=%llu waiting=%llu dp=%p\n", 601168404Spjd tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 602168404Spjd cv_broadcast(&tx->tx_sync_more_cv); 603168404Spjd cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock); 604168404Spjd } 605168404Spjd 
mutex_exit(&tx->tx_sync_lock); 606168404Spjd} 607168404Spjd 608168404Spjdvoid 609168404Spjdtxg_wait_open(dsl_pool_t *dp, uint64_t txg) 610168404Spjd{ 611168404Spjd tx_state_t *tx = &dp->dp_tx; 612168404Spjd 613248571Smm ASSERT(!dsl_pool_config_held(dp)); 614248571Smm 615168404Spjd mutex_enter(&tx->tx_sync_lock); 616185029Spjd ASSERT(tx->tx_threads == 2); 617168404Spjd if (txg == 0) 618168404Spjd txg = tx->tx_open_txg + 1; 619168404Spjd if (tx->tx_quiesce_txg_waiting < txg) 620168404Spjd tx->tx_quiesce_txg_waiting = txg; 621168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 622168404Spjd txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 623168404Spjd while (tx->tx_open_txg < txg) { 624168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 625168404Spjd cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock); 626168404Spjd } 627168404Spjd mutex_exit(&tx->tx_sync_lock); 628168404Spjd} 629168404Spjd 630185029Spjdboolean_t 631185029Spjdtxg_stalled(dsl_pool_t *dp) 632168404Spjd{ 633168404Spjd tx_state_t *tx = &dp->dp_tx; 634185029Spjd return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg); 635168404Spjd} 636168404Spjd 637185029Spjdboolean_t 638185029Spjdtxg_sync_waiting(dsl_pool_t *dp) 639168404Spjd{ 640168404Spjd tx_state_t *tx = &dp->dp_tx; 641185029Spjd 642185029Spjd return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting || 643185029Spjd tx->tx_quiesced_txg != 0); 644168404Spjd} 645168404Spjd 646168404Spjd/* 647168404Spjd * Per-txg object lists. 
648168404Spjd */ 649168404Spjdvoid 650168404Spjdtxg_list_create(txg_list_t *tl, size_t offset) 651168404Spjd{ 652168404Spjd int t; 653168404Spjd 654168404Spjd mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL); 655168404Spjd 656168404Spjd tl->tl_offset = offset; 657168404Spjd 658168404Spjd for (t = 0; t < TXG_SIZE; t++) 659168404Spjd tl->tl_head[t] = NULL; 660168404Spjd} 661168404Spjd 662168404Spjdvoid 663168404Spjdtxg_list_destroy(txg_list_t *tl) 664168404Spjd{ 665168404Spjd int t; 666168404Spjd 667168404Spjd for (t = 0; t < TXG_SIZE; t++) 668168404Spjd ASSERT(txg_list_empty(tl, t)); 669168404Spjd 670168404Spjd mutex_destroy(&tl->tl_lock); 671168404Spjd} 672168404Spjd 673239620Smmboolean_t 674168404Spjdtxg_list_empty(txg_list_t *tl, uint64_t txg) 675168404Spjd{ 676168404Spjd return (tl->tl_head[txg & TXG_MASK] == NULL); 677168404Spjd} 678168404Spjd 679168404Spjd/* 680248571Smm * Add an entry to the list (unless it's already on the list). 681248571Smm * Returns B_TRUE if it was actually added. 682168404Spjd */ 683248571Smmboolean_t 684168404Spjdtxg_list_add(txg_list_t *tl, void *p, uint64_t txg) 685168404Spjd{ 686168404Spjd int t = txg & TXG_MASK; 687168404Spjd txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 688248571Smm boolean_t add; 689168404Spjd 690168404Spjd mutex_enter(&tl->tl_lock); 691248571Smm add = (tn->tn_member[t] == 0); 692248571Smm if (add) { 693168404Spjd tn->tn_member[t] = 1; 694168404Spjd tn->tn_next[t] = tl->tl_head[t]; 695168404Spjd tl->tl_head[t] = tn; 696168404Spjd } 697168404Spjd mutex_exit(&tl->tl_lock); 698168404Spjd 699248571Smm return (add); 700168404Spjd} 701168404Spjd 702168404Spjd/* 703248571Smm * Add an entry to the end of the list, unless it's already on the list. 704248571Smm * (walks list to find end) 705248571Smm * Returns B_TRUE if it was actually added. 
706219089Spjd */ 707248571Smmboolean_t 708219089Spjdtxg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg) 709219089Spjd{ 710219089Spjd int t = txg & TXG_MASK; 711219089Spjd txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 712248571Smm boolean_t add; 713219089Spjd 714219089Spjd mutex_enter(&tl->tl_lock); 715248571Smm add = (tn->tn_member[t] == 0); 716248571Smm if (add) { 717219089Spjd txg_node_t **tp; 718219089Spjd 719219089Spjd for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t]) 720219089Spjd continue; 721219089Spjd 722219089Spjd tn->tn_member[t] = 1; 723219089Spjd tn->tn_next[t] = NULL; 724219089Spjd *tp = tn; 725219089Spjd } 726219089Spjd mutex_exit(&tl->tl_lock); 727219089Spjd 728248571Smm return (add); 729219089Spjd} 730219089Spjd 731219089Spjd/* 732168404Spjd * Remove the head of the list and return it. 733168404Spjd */ 734168404Spjdvoid * 735168404Spjdtxg_list_remove(txg_list_t *tl, uint64_t txg) 736168404Spjd{ 737168404Spjd int t = txg & TXG_MASK; 738168404Spjd txg_node_t *tn; 739168404Spjd void *p = NULL; 740168404Spjd 741168404Spjd mutex_enter(&tl->tl_lock); 742168404Spjd if ((tn = tl->tl_head[t]) != NULL) { 743168404Spjd p = (char *)tn - tl->tl_offset; 744168404Spjd tl->tl_head[t] = tn->tn_next[t]; 745168404Spjd tn->tn_next[t] = NULL; 746168404Spjd tn->tn_member[t] = 0; 747168404Spjd } 748168404Spjd mutex_exit(&tl->tl_lock); 749168404Spjd 750168404Spjd return (p); 751168404Spjd} 752168404Spjd 753168404Spjd/* 754168404Spjd * Remove a specific item from the list and return it. 
755168404Spjd */ 756168404Spjdvoid * 757168404Spjdtxg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg) 758168404Spjd{ 759168404Spjd int t = txg & TXG_MASK; 760168404Spjd txg_node_t *tn, **tp; 761168404Spjd 762168404Spjd mutex_enter(&tl->tl_lock); 763168404Spjd 764168404Spjd for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) { 765168404Spjd if ((char *)tn - tl->tl_offset == p) { 766168404Spjd *tp = tn->tn_next[t]; 767168404Spjd tn->tn_next[t] = NULL; 768168404Spjd tn->tn_member[t] = 0; 769168404Spjd mutex_exit(&tl->tl_lock); 770168404Spjd return (p); 771168404Spjd } 772168404Spjd } 773168404Spjd 774168404Spjd mutex_exit(&tl->tl_lock); 775168404Spjd 776168404Spjd return (NULL); 777168404Spjd} 778168404Spjd 779248571Smmboolean_t 780168404Spjdtxg_list_member(txg_list_t *tl, void *p, uint64_t txg) 781168404Spjd{ 782168404Spjd int t = txg & TXG_MASK; 783168404Spjd txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 784168404Spjd 785248571Smm return (tn->tn_member[t] != 0); 786168404Spjd} 787168404Spjd 788168404Spjd/* 789168404Spjd * Walk a txg list -- only safe if you know it's not changing. 790168404Spjd */ 791168404Spjdvoid * 792168404Spjdtxg_list_head(txg_list_t *tl, uint64_t txg) 793168404Spjd{ 794168404Spjd int t = txg & TXG_MASK; 795168404Spjd txg_node_t *tn = tl->tl_head[t]; 796168404Spjd 797168404Spjd return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 798168404Spjd} 799168404Spjd 800168404Spjdvoid * 801168404Spjdtxg_list_next(txg_list_t *tl, void *p, uint64_t txg) 802168404Spjd{ 803168404Spjd int t = txg & TXG_MASK; 804168404Spjd txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 805168404Spjd 806168404Spjd tn = tn->tn_next[t]; 807168404Spjd 808168404Spjd return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 809168404Spjd} 810