1168404Spjd/* 2168404Spjd * CDDL HEADER START 3168404Spjd * 4168404Spjd * The contents of this file are subject to the terms of the 5168404Spjd * Common Development and Distribution License (the "License"). 6168404Spjd * You may not use this file except in compliance with the License. 7168404Spjd * 8168404Spjd * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9168404Spjd * or http://www.opensolaris.org/os/licensing. 10168404Spjd * See the License for the specific language governing permissions 11168404Spjd * and limitations under the License. 12168404Spjd * 13168404Spjd * When distributing Covered Code, include this CDDL HEADER in each 14168404Spjd * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15168404Spjd * If applicable, add the following below this CDDL HEADER, with the 16168404Spjd * fields enclosed by brackets "[]" replaced with your own identifying 17168404Spjd * information: Portions Copyright [yyyy] [name of copyright owner] 18168404Spjd * 19168404Spjd * CDDL HEADER END 20168404Spjd */ 21168404Spjd/* 22219089Spjd * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. 23226724Smm * Portions Copyright 2011 Martin Matuska <mm@FreeBSD.org> 24269418Sdelphij * Copyright (c) 2012, 2014 by Delphix. All rights reserved. 25168404Spjd */ 26168404Spjd 27168404Spjd#include <sys/zfs_context.h> 28168404Spjd#include <sys/txg_impl.h> 29168404Spjd#include <sys/dmu_impl.h> 30219089Spjd#include <sys/dmu_tx.h> 31168404Spjd#include <sys/dsl_pool.h> 32219089Spjd#include <sys/dsl_scan.h> 33168404Spjd#include <sys/callb.h> 34168404Spjd 35168404Spjd/* 36245511Sdelphij * ZFS Transaction Groups 37245511Sdelphij * ---------------------- 38245511Sdelphij * 39245511Sdelphij * ZFS transaction groups are, as the name implies, groups of transactions 40245511Sdelphij * that act on persistent state. ZFS asserts consistency at the granularity of 41245511Sdelphij * these transaction groups. 
Each successive transaction group (txg) is
 * assigned a 64-bit consecutive identifier.  There are three active
 * transaction group states: open, quiescing, or syncing.  At any given time,
 * there may be an active txg associated with each state; each active txg may
 * either be processing, or blocked waiting to enter the next state.  There
 * may be up to three active txgs, and there is always a txg in the open
 * state (though it may be blocked waiting to enter the quiescing state).
 * In broad strokes, transactions -- operations that change in-memory
 * structures -- are accepted into the txg in the open state, and are
 * completed while the txg is in the open or quiescing states.  The
 * accumulated changes are written to disk in the syncing state.
 *
 * Open
 *
 * When a new txg becomes active, it first enters the open state.  New
 * transactions -- updates to in-memory structures -- are assigned to the
 * currently open txg.  There is always a txg in the open state so that ZFS
 * can accept new changes (though the txg may refuse new changes if it has
 * hit some limit).  ZFS advances the open txg to the next state for a
 * variety of reasons such as it hitting a time or size threshold, or the
 * execution of an administrative action that must be completed in the
 * syncing state.
 *
 * Quiescing
 *
 * After a txg exits the open state, it enters the quiescing state.  The
 * quiescing state is intended to provide a buffer between accepting new
 * transactions in the open state and writing them out to stable storage in
 * the syncing state.  While quiescing, transactions can continue their
 * operation without delaying either of the other states.  Typically, a txg
 * is in the quiescing state very briefly since the operations are bounded
 * by software latencies rather than, say, slower I/O latencies.  After all
 * transactions complete, the txg is ready to enter the next state.
 *
 * Syncing
 *
 * In the syncing state, the in-memory state built up during the open and
 * (to a lesser degree) the quiescing states is written to stable storage.
 * The process of writing out modified data can, in turn, modify more data.
 * For example, when we write new blocks, we need to allocate space for
 * them; those allocations modify metadata (space maps)... which themselves
 * must be written to stable storage.  During the sync state, ZFS iterates,
 * writing out data until it converges and all in-memory changes have been
 * written out.  The first such pass is the largest as it encompasses all
 * the modified user data (as opposed to filesystem metadata).  Subsequent
 * passes typically have far less data to write as they consist exclusively
 * of filesystem metadata.
 *
 * To ensure convergence, after a certain number of passes ZFS begins
 * overwriting locations on stable storage that had been allocated earlier
 * in the syncing state (and subsequently freed).  ZFS usually allocates new
 * blocks to optimize for large, continuous writes.  For the syncing state
 * to converge, however, it must complete a pass where no new blocks are
 * allocated since each allocation requires a modification of persistent
 * metadata.
93245511Sdelphij * Further, to hasten convergence, after a prescribed number of passes, ZFS 94245511Sdelphij * also defers frees, and stops compressing. 95245511Sdelphij * 96245511Sdelphij * In addition to writing out user data, we must also execute synctasks during 97245511Sdelphij * the syncing context. A synctask is the mechanism by which some 98245511Sdelphij * administrative activities work such as creating and destroying snapshots or 99245511Sdelphij * datasets. Note that when a synctask is initiated it enters the open txg, 100245511Sdelphij * and ZFS then pushes that txg as quickly as possible to completion of the 101245511Sdelphij * syncing state in order to reduce the latency of the administrative 102245511Sdelphij * activity. To complete the syncing state, ZFS writes out a new uberblock, 103245511Sdelphij * the root of the tree of blocks that comprise all state stored on the ZFS 104245511Sdelphij * pool. Finally, if there is a quiesced txg waiting, we signal that it can 105245511Sdelphij * now transition to the syncing state. 106168404Spjd */ 107168404Spjd 108168404Spjdstatic void txg_sync_thread(void *arg); 109168404Spjdstatic void txg_quiesce_thread(void *arg); 110168404Spjd 111219089Spjdint zfs_txg_timeout = 5; /* max seconds worth of delta per txg */ 112168404Spjd 113185029SpjdSYSCTL_DECL(_vfs_zfs); 114219089SpjdSYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG"); 115185029SpjdTUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout); 116228363SpjdSYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RW, &zfs_txg_timeout, 0, 117185029Spjd "Maximum seconds worth of delta per txg"); 118185029Spjd 119168404Spjd/* 120168404Spjd * Prepare the txg subsystem. 
121168404Spjd */ 122168404Spjdvoid 123168404Spjdtxg_init(dsl_pool_t *dp, uint64_t txg) 124168404Spjd{ 125168404Spjd tx_state_t *tx = &dp->dp_tx; 126185029Spjd int c; 127168404Spjd bzero(tx, sizeof (tx_state_t)); 128168404Spjd 129168404Spjd tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP); 130185029Spjd 131168404Spjd for (c = 0; c < max_ncpus; c++) { 132185029Spjd int i; 133185029Spjd 134168404Spjd mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL); 135249858Smm mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_DEFAULT, 136249858Smm NULL); 137185029Spjd for (i = 0; i < TXG_SIZE; i++) { 138185029Spjd cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, 139185029Spjd NULL); 140219089Spjd list_create(&tx->tx_cpu[c].tc_callbacks[i], 141219089Spjd sizeof (dmu_tx_callback_t), 142219089Spjd offsetof(dmu_tx_callback_t, dcb_node)); 143185029Spjd } 144168404Spjd } 145168404Spjd 146168404Spjd mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL); 147208372Smm 148168404Spjd cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL); 149168404Spjd cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL); 150168404Spjd cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL); 151168404Spjd cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL); 152168404Spjd cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL); 153168404Spjd 154168404Spjd tx->tx_open_txg = txg; 155168404Spjd} 156168404Spjd 157168404Spjd/* 158168404Spjd * Close down the txg subsystem. 
159168404Spjd */ 160168404Spjdvoid 161168404Spjdtxg_fini(dsl_pool_t *dp) 162168404Spjd{ 163168404Spjd tx_state_t *tx = &dp->dp_tx; 164185029Spjd int c; 165168404Spjd 166168404Spjd ASSERT(tx->tx_threads == 0); 167168404Spjd 168168404Spjd mutex_destroy(&tx->tx_sync_lock); 169168404Spjd 170208372Smm cv_destroy(&tx->tx_sync_more_cv); 171208372Smm cv_destroy(&tx->tx_sync_done_cv); 172208372Smm cv_destroy(&tx->tx_quiesce_more_cv); 173208372Smm cv_destroy(&tx->tx_quiesce_done_cv); 174208372Smm cv_destroy(&tx->tx_exit_cv); 175208372Smm 176168404Spjd for (c = 0; c < max_ncpus; c++) { 177185029Spjd int i; 178185029Spjd 179249858Smm mutex_destroy(&tx->tx_cpu[c].tc_open_lock); 180185029Spjd mutex_destroy(&tx->tx_cpu[c].tc_lock); 181219089Spjd for (i = 0; i < TXG_SIZE; i++) { 182168404Spjd cv_destroy(&tx->tx_cpu[c].tc_cv[i]); 183219089Spjd list_destroy(&tx->tx_cpu[c].tc_callbacks[i]); 184219089Spjd } 185168404Spjd } 186168404Spjd 187219089Spjd if (tx->tx_commit_cb_taskq != NULL) 188219089Spjd taskq_destroy(tx->tx_commit_cb_taskq); 189219089Spjd 190168404Spjd kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); 191168404Spjd 192168404Spjd bzero(tx, sizeof (tx_state_t)); 193168404Spjd} 194168404Spjd 195168404Spjd/* 196168404Spjd * Start syncing transaction groups. 197168404Spjd */ 198168404Spjdvoid 199168404Spjdtxg_sync_start(dsl_pool_t *dp) 200168404Spjd{ 201168404Spjd tx_state_t *tx = &dp->dp_tx; 202168404Spjd 203168404Spjd mutex_enter(&tx->tx_sync_lock); 204168404Spjd 205168404Spjd dprintf("pool %p\n", dp); 206168404Spjd 207168404Spjd ASSERT(tx->tx_threads == 0); 208168404Spjd 209185029Spjd tx->tx_threads = 2; 210168404Spjd 211168404Spjd tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread, 212168404Spjd dp, 0, &p0, TS_RUN, minclsyspri); 213168404Spjd 214185029Spjd /* 215185029Spjd * The sync thread can need a larger-than-default stack size on 216185029Spjd * 32-bit x86. This is due in part to nested pools and 217185029Spjd * scrub_visitbp() recursion. 
218185029Spjd */ 219210192Snwhitehorn tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread, 220168404Spjd dp, 0, &p0, TS_RUN, minclsyspri); 221168404Spjd 222168404Spjd mutex_exit(&tx->tx_sync_lock); 223168404Spjd} 224168404Spjd 225168404Spjdstatic void 226168404Spjdtxg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr) 227168404Spjd{ 228168404Spjd CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG); 229168404Spjd mutex_enter(&tx->tx_sync_lock); 230168404Spjd} 231168404Spjd 232168404Spjdstatic void 233168404Spjdtxg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp) 234168404Spjd{ 235168404Spjd ASSERT(*tpp != NULL); 236168404Spjd *tpp = NULL; 237168404Spjd tx->tx_threads--; 238168404Spjd cv_broadcast(&tx->tx_exit_cv); 239168404Spjd CALLB_CPR_EXIT(cpr); /* drops &tx->tx_sync_lock */ 240168404Spjd thread_exit(); 241168404Spjd} 242168404Spjd 243168404Spjdstatic void 244255437Sdelphijtxg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time) 245168404Spjd{ 246168404Spjd CALLB_CPR_SAFE_BEGIN(cpr); 247168404Spjd 248185029Spjd if (time) 249185029Spjd (void) cv_timedwait(cv, &tx->tx_sync_lock, time); 250168404Spjd else 251168404Spjd cv_wait(cv, &tx->tx_sync_lock); 252168404Spjd 253168404Spjd CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock); 254168404Spjd} 255168404Spjd 256168404Spjd/* 257168404Spjd * Stop syncing transaction groups. 258168404Spjd */ 259168404Spjdvoid 260168404Spjdtxg_sync_stop(dsl_pool_t *dp) 261168404Spjd{ 262168404Spjd tx_state_t *tx = &dp->dp_tx; 263168404Spjd 264168404Spjd dprintf("pool %p\n", dp); 265168404Spjd /* 266168404Spjd * Finish off any work in progress. 267168404Spjd */ 268185029Spjd ASSERT(tx->tx_threads == 2); 269168404Spjd 270168404Spjd /* 271219089Spjd * We need to ensure that we've vacated the deferred space_maps. 272219089Spjd */ 273219089Spjd txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE); 274219089Spjd 275219089Spjd /* 276185029Spjd * Wake all sync threads and wait for them to die. 
277168404Spjd */ 278168404Spjd mutex_enter(&tx->tx_sync_lock); 279168404Spjd 280185029Spjd ASSERT(tx->tx_threads == 2); 281168404Spjd 282168404Spjd tx->tx_exiting = 1; 283168404Spjd 284168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 285168404Spjd cv_broadcast(&tx->tx_quiesce_done_cv); 286168404Spjd cv_broadcast(&tx->tx_sync_more_cv); 287168404Spjd 288168404Spjd while (tx->tx_threads != 0) 289168404Spjd cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock); 290168404Spjd 291168404Spjd tx->tx_exiting = 0; 292168404Spjd 293168404Spjd mutex_exit(&tx->tx_sync_lock); 294168404Spjd} 295168404Spjd 296168404Spjduint64_t 297168404Spjdtxg_hold_open(dsl_pool_t *dp, txg_handle_t *th) 298168404Spjd{ 299168404Spjd tx_state_t *tx = &dp->dp_tx; 300168404Spjd tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID]; 301168404Spjd uint64_t txg; 302168404Spjd 303249858Smm mutex_enter(&tc->tc_open_lock); 304249858Smm txg = tx->tx_open_txg; 305249858Smm 306168404Spjd mutex_enter(&tc->tc_lock); 307168404Spjd tc->tc_count[txg & TXG_MASK]++; 308249858Smm mutex_exit(&tc->tc_lock); 309168404Spjd 310168404Spjd th->th_cpu = tc; 311168404Spjd th->th_txg = txg; 312168404Spjd 313168404Spjd return (txg); 314168404Spjd} 315168404Spjd 316168404Spjdvoid 317168404Spjdtxg_rele_to_quiesce(txg_handle_t *th) 318168404Spjd{ 319168404Spjd tx_cpu_t *tc = th->th_cpu; 320168404Spjd 321249858Smm ASSERT(!MUTEX_HELD(&tc->tc_lock)); 322249858Smm mutex_exit(&tc->tc_open_lock); 323168404Spjd} 324168404Spjd 325168404Spjdvoid 326219089Spjdtxg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks) 327219089Spjd{ 328219089Spjd tx_cpu_t *tc = th->th_cpu; 329219089Spjd int g = th->th_txg & TXG_MASK; 330219089Spjd 331219089Spjd mutex_enter(&tc->tc_lock); 332219089Spjd list_move_tail(&tc->tc_callbacks[g], tx_callbacks); 333219089Spjd mutex_exit(&tc->tc_lock); 334219089Spjd} 335219089Spjd 336219089Spjdvoid 337168404Spjdtxg_rele_to_sync(txg_handle_t *th) 338168404Spjd{ 339168404Spjd tx_cpu_t *tc = th->th_cpu; 340168404Spjd int g = th->th_txg & 
TXG_MASK; 341168404Spjd 342168404Spjd mutex_enter(&tc->tc_lock); 343168404Spjd ASSERT(tc->tc_count[g] != 0); 344168404Spjd if (--tc->tc_count[g] == 0) 345168404Spjd cv_broadcast(&tc->tc_cv[g]); 346168404Spjd mutex_exit(&tc->tc_lock); 347168404Spjd 348168404Spjd th->th_cpu = NULL; /* defensive */ 349168404Spjd} 350168404Spjd 351251629Sdelphij/* 352251629Sdelphij * Blocks until all transactions in the group are committed. 353251629Sdelphij * 354251629Sdelphij * On return, the transaction group has reached a stable state in which it can 355251629Sdelphij * then be passed off to the syncing context. 356251629Sdelphij */ 357285717Sjpaetzelstatic __noinline void 358168404Spjdtxg_quiesce(dsl_pool_t *dp, uint64_t txg) 359168404Spjd{ 360168404Spjd tx_state_t *tx = &dp->dp_tx; 361168404Spjd int g = txg & TXG_MASK; 362168404Spjd int c; 363168404Spjd 364168404Spjd /* 365249858Smm * Grab all tc_open_locks so nobody else can get into this txg. 366168404Spjd */ 367168404Spjd for (c = 0; c < max_ncpus; c++) 368249858Smm mutex_enter(&tx->tx_cpu[c].tc_open_lock); 369168404Spjd 370168404Spjd ASSERT(txg == tx->tx_open_txg); 371168404Spjd tx->tx_open_txg++; 372260763Savg tx->tx_open_time = gethrtime(); 373168404Spjd 374255437Sdelphij DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg); 375255437Sdelphij DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg); 376255437Sdelphij 377168404Spjd /* 378168404Spjd * Now that we've incremented tx_open_txg, we can let threads 379168404Spjd * enter the next transaction group. 380168404Spjd */ 381168404Spjd for (c = 0; c < max_ncpus; c++) 382249858Smm mutex_exit(&tx->tx_cpu[c].tc_open_lock); 383168404Spjd 384168404Spjd /* 385168404Spjd * Quiesce the transaction group by waiting for everyone to txg_exit(). 
386168404Spjd */ 387168404Spjd for (c = 0; c < max_ncpus; c++) { 388168404Spjd tx_cpu_t *tc = &tx->tx_cpu[c]; 389168404Spjd mutex_enter(&tc->tc_lock); 390168404Spjd while (tc->tc_count[g] != 0) 391168404Spjd cv_wait(&tc->tc_cv[g], &tc->tc_lock); 392168404Spjd mutex_exit(&tc->tc_lock); 393168404Spjd } 394168404Spjd} 395168404Spjd 396168404Spjdstatic void 397219089Spjdtxg_do_callbacks(void *arg) 398219089Spjd{ 399219089Spjd list_t *cb_list = arg; 400219089Spjd 401219089Spjd dmu_tx_do_callbacks(cb_list, 0); 402219089Spjd 403219089Spjd list_destroy(cb_list); 404219089Spjd 405219089Spjd kmem_free(cb_list, sizeof (list_t)); 406219089Spjd} 407219089Spjd 408219089Spjd/* 409219089Spjd * Dispatch the commit callbacks registered on this txg to worker threads. 410251629Sdelphij * 411251629Sdelphij * If no callbacks are registered for a given TXG, nothing happens. 412251629Sdelphij * This function creates a taskq for the associated pool, if needed. 413219089Spjd */ 414219089Spjdstatic void 415219089Spjdtxg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg) 416219089Spjd{ 417219089Spjd int c; 418219089Spjd tx_state_t *tx = &dp->dp_tx; 419219089Spjd list_t *cb_list; 420219089Spjd 421219089Spjd for (c = 0; c < max_ncpus; c++) { 422219089Spjd tx_cpu_t *tc = &tx->tx_cpu[c]; 423251629Sdelphij /* 424251629Sdelphij * No need to lock tx_cpu_t at this point, since this can 425251629Sdelphij * only be called once a txg has been synced. 426251629Sdelphij */ 427219089Spjd 428219089Spjd int g = txg & TXG_MASK; 429219089Spjd 430219089Spjd if (list_is_empty(&tc->tc_callbacks[g])) 431219089Spjd continue; 432219089Spjd 433219089Spjd if (tx->tx_commit_cb_taskq == NULL) { 434219089Spjd /* 435219089Spjd * Commit callback taskq hasn't been created yet. 
436219089Spjd */ 437219089Spjd tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb", 438219089Spjd max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2, 439219089Spjd TASKQ_PREPOPULATE); 440219089Spjd } 441219089Spjd 442219089Spjd cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP); 443219089Spjd list_create(cb_list, sizeof (dmu_tx_callback_t), 444219089Spjd offsetof(dmu_tx_callback_t, dcb_node)); 445219089Spjd 446251635Sdelphij list_move_tail(cb_list, &tc->tc_callbacks[g]); 447219089Spjd 448219089Spjd (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *) 449219089Spjd txg_do_callbacks, cb_list, TQ_SLEEP); 450219089Spjd } 451219089Spjd} 452219089Spjd 453219089Spjdstatic void 454168404Spjdtxg_sync_thread(void *arg) 455168404Spjd{ 456168404Spjd dsl_pool_t *dp = arg; 457219089Spjd spa_t *spa = dp->dp_spa; 458168404Spjd tx_state_t *tx = &dp->dp_tx; 459168404Spjd callb_cpr_t cpr; 460185029Spjd uint64_t start, delta; 461168404Spjd 462168404Spjd txg_thread_enter(tx, &cpr); 463168404Spjd 464185029Spjd start = delta = 0; 465168404Spjd for (;;) { 466260763Savg uint64_t timeout = zfs_txg_timeout * hz; 467260763Savg uint64_t timer; 468168404Spjd uint64_t txg; 469168404Spjd 470168404Spjd /* 471219089Spjd * We sync when we're scanning, there's someone waiting 472208047Smm * on us, or the quiesce thread has handed off a txg to 473208047Smm * us, or we have reached our timeout. 474168404Spjd */ 475185029Spjd timer = (delta >= timeout ? 
0 : timeout - delta); 476219089Spjd while (!dsl_scan_active(dp->dp_scan) && 477208047Smm !tx->tx_exiting && timer > 0 && 478168404Spjd tx->tx_synced_txg >= tx->tx_sync_txg_waiting && 479260763Savg tx->tx_quiesced_txg == 0 && 480260763Savg dp->dp_dirty_total < zfs_dirty_data_sync) { 481168404Spjd dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n", 482168404Spjd tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 483185029Spjd txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer); 484219089Spjd delta = ddi_get_lbolt() - start; 485185029Spjd timer = (delta > timeout ? 0 : timeout - delta); 486168404Spjd } 487168404Spjd 488168404Spjd /* 489168404Spjd * Wait until the quiesce thread hands off a txg to us, 490168404Spjd * prompting it to do so if necessary. 491168404Spjd */ 492168404Spjd while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) { 493168404Spjd if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1) 494168404Spjd tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1; 495168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 496168404Spjd txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0); 497168404Spjd } 498168404Spjd 499168404Spjd if (tx->tx_exiting) 500168404Spjd txg_thread_exit(tx, &cpr, &tx->tx_sync_thread); 501168404Spjd 502168404Spjd /* 503168404Spjd * Consume the quiesced txg which has been handed off to 504168404Spjd * us. This may cause the quiescing thread to now be 505168404Spjd * able to quiesce another txg, so we must signal it. 
506168404Spjd */ 507168404Spjd txg = tx->tx_quiesced_txg; 508168404Spjd tx->tx_quiesced_txg = 0; 509168404Spjd tx->tx_syncing_txg = txg; 510255437Sdelphij DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg); 511168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 512168404Spjd 513168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 514185029Spjd txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 515168404Spjd mutex_exit(&tx->tx_sync_lock); 516185029Spjd 517219089Spjd start = ddi_get_lbolt(); 518219089Spjd spa_sync(spa, txg); 519219089Spjd delta = ddi_get_lbolt() - start; 520185029Spjd 521168404Spjd mutex_enter(&tx->tx_sync_lock); 522168404Spjd tx->tx_synced_txg = txg; 523168404Spjd tx->tx_syncing_txg = 0; 524255437Sdelphij DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg); 525168404Spjd cv_broadcast(&tx->tx_sync_done_cv); 526219089Spjd 527219089Spjd /* 528219089Spjd * Dispatch commit callbacks to worker threads. 529219089Spjd */ 530219089Spjd txg_dispatch_callbacks(dp, txg); 531168404Spjd } 532168404Spjd} 533168404Spjd 534168404Spjdstatic void 535168404Spjdtxg_quiesce_thread(void *arg) 536168404Spjd{ 537168404Spjd dsl_pool_t *dp = arg; 538168404Spjd tx_state_t *tx = &dp->dp_tx; 539168404Spjd callb_cpr_t cpr; 540168404Spjd 541168404Spjd txg_thread_enter(tx, &cpr); 542168404Spjd 543168404Spjd for (;;) { 544168404Spjd uint64_t txg; 545168404Spjd 546168404Spjd /* 547168404Spjd * We quiesce when there's someone waiting on us. 548168404Spjd * However, we can only have one txg in "quiescing" or 549168404Spjd * "quiesced, waiting to sync" state. So we wait until 550168404Spjd * the "quiesced, waiting to sync" txg has been consumed 551168404Spjd * by the sync thread. 
552168404Spjd */ 553168404Spjd while (!tx->tx_exiting && 554168404Spjd (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting || 555168404Spjd tx->tx_quiesced_txg != 0)) 556168404Spjd txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0); 557168404Spjd 558168404Spjd if (tx->tx_exiting) 559168404Spjd txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread); 560168404Spjd 561168404Spjd txg = tx->tx_open_txg; 562168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 563168404Spjd txg, tx->tx_quiesce_txg_waiting, 564168404Spjd tx->tx_sync_txg_waiting); 565168404Spjd mutex_exit(&tx->tx_sync_lock); 566168404Spjd txg_quiesce(dp, txg); 567168404Spjd mutex_enter(&tx->tx_sync_lock); 568168404Spjd 569168404Spjd /* 570168404Spjd * Hand this txg off to the sync thread. 571168404Spjd */ 572168404Spjd dprintf("quiesce done, handing off txg %llu\n", txg); 573168404Spjd tx->tx_quiesced_txg = txg; 574255437Sdelphij DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg); 575168404Spjd cv_broadcast(&tx->tx_sync_more_cv); 576168404Spjd cv_broadcast(&tx->tx_quiesce_done_cv); 577168404Spjd } 578168404Spjd} 579168404Spjd 580185029Spjd/* 581255437Sdelphij * Delay this thread by delay nanoseconds if we are still in the open 582255437Sdelphij * transaction group and there is already a waiting txg quiesing or quiesced. 583255437Sdelphij * Abort the delay if this txg stalls or enters the quiesing state. 
584185029Spjd */ 585168404Spjdvoid 586255437Sdelphijtxg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution) 587185029Spjd{ 588185029Spjd tx_state_t *tx = &dp->dp_tx; 589255437Sdelphij hrtime_t start = gethrtime(); 590185029Spjd 591251631Sdelphij /* don't delay if this txg could transition to quiescing immediately */ 592185029Spjd if (tx->tx_open_txg > txg || 593185029Spjd tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1) 594185029Spjd return; 595185029Spjd 596185029Spjd mutex_enter(&tx->tx_sync_lock); 597185029Spjd if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) { 598185029Spjd mutex_exit(&tx->tx_sync_lock); 599185029Spjd return; 600185029Spjd } 601185029Spjd 602255437Sdelphij while (gethrtime() - start < delay && 603255437Sdelphij tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) { 604255437Sdelphij (void) cv_timedwait_hires(&tx->tx_quiesce_more_cv, 605255437Sdelphij &tx->tx_sync_lock, delay, resolution, 0); 606255437Sdelphij } 607185029Spjd 608185029Spjd mutex_exit(&tx->tx_sync_lock); 609185029Spjd} 610185029Spjd 611185029Spjdvoid 612168404Spjdtxg_wait_synced(dsl_pool_t *dp, uint64_t txg) 613168404Spjd{ 614168404Spjd tx_state_t *tx = &dp->dp_tx; 615168404Spjd 616248571Smm ASSERT(!dsl_pool_config_held(dp)); 617248571Smm 618168404Spjd mutex_enter(&tx->tx_sync_lock); 619185029Spjd ASSERT(tx->tx_threads == 2); 620168404Spjd if (txg == 0) 621219089Spjd txg = tx->tx_open_txg + TXG_DEFER_SIZE; 622168404Spjd if (tx->tx_sync_txg_waiting < txg) 623168404Spjd tx->tx_sync_txg_waiting = txg; 624168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 625168404Spjd txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 626168404Spjd while (tx->tx_synced_txg < txg) { 627168404Spjd dprintf("broadcasting sync more " 628168404Spjd "tx_synced=%llu waiting=%llu dp=%p\n", 629168404Spjd tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 630168404Spjd cv_broadcast(&tx->tx_sync_more_cv); 631168404Spjd cv_wait(&tx->tx_sync_done_cv, 
&tx->tx_sync_lock); 632168404Spjd } 633168404Spjd mutex_exit(&tx->tx_sync_lock); 634168404Spjd} 635168404Spjd 636168404Spjdvoid 637168404Spjdtxg_wait_open(dsl_pool_t *dp, uint64_t txg) 638168404Spjd{ 639168404Spjd tx_state_t *tx = &dp->dp_tx; 640168404Spjd 641248571Smm ASSERT(!dsl_pool_config_held(dp)); 642248571Smm 643168404Spjd mutex_enter(&tx->tx_sync_lock); 644185029Spjd ASSERT(tx->tx_threads == 2); 645168404Spjd if (txg == 0) 646168404Spjd txg = tx->tx_open_txg + 1; 647168404Spjd if (tx->tx_quiesce_txg_waiting < txg) 648168404Spjd tx->tx_quiesce_txg_waiting = txg; 649168404Spjd dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 650168404Spjd txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 651168404Spjd while (tx->tx_open_txg < txg) { 652168404Spjd cv_broadcast(&tx->tx_quiesce_more_cv); 653168404Spjd cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock); 654168404Spjd } 655168404Spjd mutex_exit(&tx->tx_sync_lock); 656168404Spjd} 657168404Spjd 658260763Savg/* 659260763Savg * If there isn't a txg syncing or in the pipeline, push another txg through 660260763Savg * the pipeline by queiscing the open txg. 
661260763Savg */ 662260763Savgvoid 663260763Savgtxg_kick(dsl_pool_t *dp) 664260763Savg{ 665260763Savg tx_state_t *tx = &dp->dp_tx; 666260763Savg 667260763Savg ASSERT(!dsl_pool_config_held(dp)); 668260763Savg 669260763Savg mutex_enter(&tx->tx_sync_lock); 670260763Savg if (tx->tx_syncing_txg == 0 && 671260763Savg tx->tx_quiesce_txg_waiting <= tx->tx_open_txg && 672260763Savg tx->tx_sync_txg_waiting <= tx->tx_synced_txg && 673260763Savg tx->tx_quiesced_txg <= tx->tx_synced_txg) { 674260763Savg tx->tx_quiesce_txg_waiting = tx->tx_open_txg + 1; 675260763Savg cv_broadcast(&tx->tx_quiesce_more_cv); 676260763Savg } 677260763Savg mutex_exit(&tx->tx_sync_lock); 678260763Savg} 679260763Savg 680185029Spjdboolean_t 681185029Spjdtxg_stalled(dsl_pool_t *dp) 682168404Spjd{ 683168404Spjd tx_state_t *tx = &dp->dp_tx; 684185029Spjd return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg); 685168404Spjd} 686168404Spjd 687185029Spjdboolean_t 688185029Spjdtxg_sync_waiting(dsl_pool_t *dp) 689168404Spjd{ 690168404Spjd tx_state_t *tx = &dp->dp_tx; 691185029Spjd 692185029Spjd return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting || 693185029Spjd tx->tx_quiesced_txg != 0); 694168404Spjd} 695168404Spjd 696168404Spjd/* 697168404Spjd * Per-txg object lists. 
698168404Spjd */ 699168404Spjdvoid 700168404Spjdtxg_list_create(txg_list_t *tl, size_t offset) 701168404Spjd{ 702168404Spjd int t; 703168404Spjd 704168404Spjd mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL); 705168404Spjd 706168404Spjd tl->tl_offset = offset; 707168404Spjd 708168404Spjd for (t = 0; t < TXG_SIZE; t++) 709168404Spjd tl->tl_head[t] = NULL; 710168404Spjd} 711168404Spjd 712168404Spjdvoid 713168404Spjdtxg_list_destroy(txg_list_t *tl) 714168404Spjd{ 715168404Spjd int t; 716168404Spjd 717168404Spjd for (t = 0; t < TXG_SIZE; t++) 718168404Spjd ASSERT(txg_list_empty(tl, t)); 719168404Spjd 720168404Spjd mutex_destroy(&tl->tl_lock); 721168404Spjd} 722168404Spjd 723239620Smmboolean_t 724168404Spjdtxg_list_empty(txg_list_t *tl, uint64_t txg) 725168404Spjd{ 726168404Spjd return (tl->tl_head[txg & TXG_MASK] == NULL); 727168404Spjd} 728168404Spjd 729168404Spjd/* 730269418Sdelphij * Returns true if all txg lists are empty. 731269418Sdelphij * 732269418Sdelphij * Warning: this is inherently racy (an item could be added immediately after this 733269418Sdelphij * function returns). We don't bother with the lock because it wouldn't change the 734269418Sdelphij * semantics. 735269418Sdelphij */ 736269418Sdelphijboolean_t 737269418Sdelphijtxg_all_lists_empty(txg_list_t *tl) 738269418Sdelphij{ 739269418Sdelphij for (int i = 0; i < TXG_SIZE; i++) { 740269418Sdelphij if (!txg_list_empty(tl, i)) { 741269418Sdelphij return (B_FALSE); 742269418Sdelphij } 743269418Sdelphij } 744269418Sdelphij return (B_TRUE); 745269418Sdelphij} 746269418Sdelphij 747269418Sdelphij/* 748248571Smm * Add an entry to the list (unless it's already on the list). 749248571Smm * Returns B_TRUE if it was actually added. 
750168404Spjd */ 751248571Smmboolean_t 752168404Spjdtxg_list_add(txg_list_t *tl, void *p, uint64_t txg) 753168404Spjd{ 754168404Spjd int t = txg & TXG_MASK; 755168404Spjd txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 756248571Smm boolean_t add; 757168404Spjd 758168404Spjd mutex_enter(&tl->tl_lock); 759248571Smm add = (tn->tn_member[t] == 0); 760248571Smm if (add) { 761168404Spjd tn->tn_member[t] = 1; 762168404Spjd tn->tn_next[t] = tl->tl_head[t]; 763168404Spjd tl->tl_head[t] = tn; 764168404Spjd } 765168404Spjd mutex_exit(&tl->tl_lock); 766168404Spjd 767248571Smm return (add); 768168404Spjd} 769168404Spjd 770168404Spjd/* 771248571Smm * Add an entry to the end of the list, unless it's already on the list. 772248571Smm * (walks list to find end) 773248571Smm * Returns B_TRUE if it was actually added. 774219089Spjd */ 775248571Smmboolean_t 776219089Spjdtxg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg) 777219089Spjd{ 778219089Spjd int t = txg & TXG_MASK; 779219089Spjd txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 780248571Smm boolean_t add; 781219089Spjd 782219089Spjd mutex_enter(&tl->tl_lock); 783248571Smm add = (tn->tn_member[t] == 0); 784248571Smm if (add) { 785219089Spjd txg_node_t **tp; 786219089Spjd 787219089Spjd for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t]) 788219089Spjd continue; 789219089Spjd 790219089Spjd tn->tn_member[t] = 1; 791219089Spjd tn->tn_next[t] = NULL; 792219089Spjd *tp = tn; 793219089Spjd } 794219089Spjd mutex_exit(&tl->tl_lock); 795219089Spjd 796248571Smm return (add); 797219089Spjd} 798219089Spjd 799219089Spjd/* 800168404Spjd * Remove the head of the list and return it. 
801168404Spjd */ 802168404Spjdvoid * 803168404Spjdtxg_list_remove(txg_list_t *tl, uint64_t txg) 804168404Spjd{ 805168404Spjd int t = txg & TXG_MASK; 806168404Spjd txg_node_t *tn; 807168404Spjd void *p = NULL; 808168404Spjd 809168404Spjd mutex_enter(&tl->tl_lock); 810168404Spjd if ((tn = tl->tl_head[t]) != NULL) { 811168404Spjd p = (char *)tn - tl->tl_offset; 812168404Spjd tl->tl_head[t] = tn->tn_next[t]; 813168404Spjd tn->tn_next[t] = NULL; 814168404Spjd tn->tn_member[t] = 0; 815168404Spjd } 816168404Spjd mutex_exit(&tl->tl_lock); 817168404Spjd 818168404Spjd return (p); 819168404Spjd} 820168404Spjd 821168404Spjd/* 822168404Spjd * Remove a specific item from the list and return it. 823168404Spjd */ 824168404Spjdvoid * 825168404Spjdtxg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg) 826168404Spjd{ 827168404Spjd int t = txg & TXG_MASK; 828168404Spjd txg_node_t *tn, **tp; 829168404Spjd 830168404Spjd mutex_enter(&tl->tl_lock); 831168404Spjd 832168404Spjd for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) { 833168404Spjd if ((char *)tn - tl->tl_offset == p) { 834168404Spjd *tp = tn->tn_next[t]; 835168404Spjd tn->tn_next[t] = NULL; 836168404Spjd tn->tn_member[t] = 0; 837168404Spjd mutex_exit(&tl->tl_lock); 838168404Spjd return (p); 839168404Spjd } 840168404Spjd } 841168404Spjd 842168404Spjd mutex_exit(&tl->tl_lock); 843168404Spjd 844168404Spjd return (NULL); 845168404Spjd} 846168404Spjd 847248571Smmboolean_t 848168404Spjdtxg_list_member(txg_list_t *tl, void *p, uint64_t txg) 849168404Spjd{ 850168404Spjd int t = txg & TXG_MASK; 851168404Spjd txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 852168404Spjd 853248571Smm return (tn->tn_member[t] != 0); 854168404Spjd} 855168404Spjd 856168404Spjd/* 857168404Spjd * Walk a txg list -- only safe if you know it's not changing. 
858168404Spjd */ 859168404Spjdvoid * 860168404Spjdtxg_list_head(txg_list_t *tl, uint64_t txg) 861168404Spjd{ 862168404Spjd int t = txg & TXG_MASK; 863168404Spjd txg_node_t *tn = tl->tl_head[t]; 864168404Spjd 865168404Spjd return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 866168404Spjd} 867168404Spjd 868168404Spjdvoid * 869168404Spjdtxg_list_next(txg_list_t *tl, void *p, uint64_t txg) 870168404Spjd{ 871168404Spjd int t = txg & TXG_MASK; 872168404Spjd txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 873168404Spjd 874168404Spjd tn = tn->tn_next[t]; 875168404Spjd 876168404Spjd return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 877168404Spjd} 878