txg.c revision 248571
1168404Spjd/*
2168404Spjd * CDDL HEADER START
3168404Spjd *
4168404Spjd * The contents of this file are subject to the terms of the
5168404Spjd * Common Development and Distribution License (the "License").
6168404Spjd * You may not use this file except in compliance with the License.
7168404Spjd *
8168404Spjd * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9168404Spjd * or http://www.opensolaris.org/os/licensing.
10168404Spjd * See the License for the specific language governing permissions
11168404Spjd * and limitations under the License.
12168404Spjd *
13168404Spjd * When distributing Covered Code, include this CDDL HEADER in each
14168404Spjd * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15168404Spjd * If applicable, add the following below this CDDL HEADER, with the
16168404Spjd * fields enclosed by brackets "[]" replaced with your own identifying
17168404Spjd * information: Portions Copyright [yyyy] [name of copyright owner]
18168404Spjd *
19168404Spjd * CDDL HEADER END
20168404Spjd */
21168404Spjd/*
22219089Spjd * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23226724Smm * Portions Copyright 2011 Martin Matuska <mm@FreeBSD.org>
24245511Sdelphij * Copyright (c) 2013 by Delphix. All rights reserved.
25168404Spjd */
26168404Spjd
27168404Spjd#include <sys/zfs_context.h>
28168404Spjd#include <sys/txg_impl.h>
29168404Spjd#include <sys/dmu_impl.h>
30219089Spjd#include <sys/dmu_tx.h>
31168404Spjd#include <sys/dsl_pool.h>
32219089Spjd#include <sys/dsl_scan.h>
33168404Spjd#include <sys/callb.h>
34168404Spjd
35168404Spjd/*
36245511Sdelphij * ZFS Transaction Groups
37245511Sdelphij * ----------------------
38245511Sdelphij *
39245511Sdelphij * ZFS transaction groups are, as the name implies, groups of transactions
40245511Sdelphij * that act on persistent state. ZFS asserts consistency at the granularity of
41245511Sdelphij * these transaction groups. Each successive transaction group (txg) is
42245511Sdelphij * assigned a 64-bit consecutive identifier. There are three active
43245511Sdelphij * transaction group states: open, quiescing, or syncing. At any given time,
44245511Sdelphij * there may be an active txg associated with each state; each active txg may
45245511Sdelphij * either be processing, or blocked waiting to enter the next state. There may
46245511Sdelphij * be up to three active txgs, and there is always a txg in the open state
47245511Sdelphij * (though it may be blocked waiting to enter the quiescing state). In broad
 * strokes, transactions -- operations that change in-memory structures -- are
49245511Sdelphij * accepted into the txg in the open state, and are completed while the txg is
50245511Sdelphij * in the open or quiescing states. The accumulated changes are written to
51245511Sdelphij * disk in the syncing state.
52245511Sdelphij *
53245511Sdelphij * Open
54245511Sdelphij *
55245511Sdelphij * When a new txg becomes active, it first enters the open state. New
 * transactions -- updates to in-memory structures -- are assigned to the
57245511Sdelphij * currently open txg. There is always a txg in the open state so that ZFS can
58245511Sdelphij * accept new changes (though the txg may refuse new changes if it has hit
59245511Sdelphij * some limit). ZFS advances the open txg to the next state for a variety of
60245511Sdelphij * reasons such as it hitting a time or size threshold, or the execution of an
61245511Sdelphij * administrative action that must be completed in the syncing state.
62245511Sdelphij *
63245511Sdelphij * Quiescing
64245511Sdelphij *
65245511Sdelphij * After a txg exits the open state, it enters the quiescing state. The
66245511Sdelphij * quiescing state is intended to provide a buffer between accepting new
67245511Sdelphij * transactions in the open state and writing them out to stable storage in
68245511Sdelphij * the syncing state. While quiescing, transactions can continue their
69245511Sdelphij * operation without delaying either of the other states. Typically, a txg is
70245511Sdelphij * in the quiescing state very briefly since the operations are bounded by
71245511Sdelphij * software latencies rather than, say, slower I/O latencies. After all
72245511Sdelphij * transactions complete, the txg is ready to enter the next state.
73245511Sdelphij *
74245511Sdelphij * Syncing
75245511Sdelphij *
76245511Sdelphij * In the syncing state, the in-memory state built up during the open and (to
77245511Sdelphij * a lesser degree) the quiescing states is written to stable storage. The
78245511Sdelphij * process of writing out modified data can, in turn modify more data. For
79245511Sdelphij * example when we write new blocks, we need to allocate space for them; those
80245511Sdelphij * allocations modify metadata (space maps)... which themselves must be
81245511Sdelphij * written to stable storage. During the sync state, ZFS iterates, writing out
82245511Sdelphij * data until it converges and all in-memory changes have been written out.
83245511Sdelphij * The first such pass is the largest as it encompasses all the modified user
84245511Sdelphij * data (as opposed to filesystem metadata). Subsequent passes typically have
85245511Sdelphij * far less data to write as they consist exclusively of filesystem metadata.
86245511Sdelphij *
87245511Sdelphij * To ensure convergence, after a certain number of passes ZFS begins
88245511Sdelphij * overwriting locations on stable storage that had been allocated earlier in
89245511Sdelphij * the syncing state (and subsequently freed). ZFS usually allocates new
90245511Sdelphij * blocks to optimize for large, continuous, writes. For the syncing state to
91245511Sdelphij * converge however it must complete a pass where no new blocks are allocated
92245511Sdelphij * since each allocation requires a modification of persistent metadata.
93245511Sdelphij * Further, to hasten convergence, after a prescribed number of passes, ZFS
94245511Sdelphij * also defers frees, and stops compressing.
95245511Sdelphij *
96245511Sdelphij * In addition to writing out user data, we must also execute synctasks during
97245511Sdelphij * the syncing context. A synctask is the mechanism by which some
98245511Sdelphij * administrative activities work such as creating and destroying snapshots or
99245511Sdelphij * datasets. Note that when a synctask is initiated it enters the open txg,
100245511Sdelphij * and ZFS then pushes that txg as quickly as possible to completion of the
101245511Sdelphij * syncing state in order to reduce the latency of the administrative
102245511Sdelphij * activity. To complete the syncing state, ZFS writes out a new uberblock,
103245511Sdelphij * the root of the tree of blocks that comprise all state stored on the ZFS
104245511Sdelphij * pool. Finally, if there is a quiesced txg waiting, we signal that it can
105245511Sdelphij * now transition to the syncing state.
106168404Spjd */
107168404Spjd
/* Worker threads (defined below) that drive txg state transitions. */
static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */

/* FreeBSD sysctl/tunable glue exposing vfs.zfs.txg.timeout. */
SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RW, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");
118185029Spjd
/*
 * Prepare the txg subsystem for pool 'dp', with 'txg' as the initially
 * open transaction group.  Allocates the per-CPU hold-count structures
 * and initializes all locks, condition variables, and callback lists.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	/* One tx_cpu_t per CPU spreads txg_hold_open() contention. */
	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		/* One cv and commit-callback list per in-flight txg slot. */
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
			list_create(&tx->tx_cpu[c].tc_callbacks[i],
			    sizeof (dmu_tx_callback_t),
			    offsetof(dmu_tx_callback_t, dcb_node));
		}
	}

	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}
154168404Spjd
/*
 * Close down the txg subsystem for pool 'dp'.  The sync and quiesce
 * threads must already have exited (tx_threads == 0); see txg_sync_stop().
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
		}
	}

	/* The commit-callback taskq is created lazily and may not exist. */
	if (tx->tx_commit_cb_taskq != NULL)
		taskq_destroy(tx->tx_commit_cb_taskq);

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}
191168404Spjd
/*
 * Start syncing transaction groups: spawn the quiesce and sync worker
 * threads for pool 'dp'.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	/* Exactly two worker threads: quiesce and sync. */
	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}
221168404Spjd
/*
 * Common entry path for the txg worker threads: register with the CPR
 * (checkpoint/resume) framework and acquire tx_sync_lock, which the
 * thread loops then hold except while waiting or syncing.
 */
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}
228168404Spjd
/*
 * Common exit path for the txg worker threads: clear the thread pointer,
 * decrement the thread count, wake any waiter in txg_sync_stop(), and
 * terminate.  Called with tx_sync_lock held; CALLB_CPR_EXIT() drops it.
 */
static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}
239168404Spjd
/*
 * Wait on 'cv', marking the thread CPR-safe for the duration.  If 'time'
 * is nonzero the wait is bounded (passed straight to cv_timedwait();
 * NOTE(review): appears to be a tick count under the FreeBSD compat
 * layer -- confirm against cv_timedwait's timeout semantics here).
 * Called and returns with tx_sync_lock held.
 */
static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}
252168404Spjd
/*
 * Stop syncing transaction groups: flush pending work to disk, then ask
 * both worker threads to exit and wait until they have done so.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	/* Wake both threads out of every wait they might be in. */
	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}
292168404Spjd
/*
 * Take a hold on the currently open txg so it cannot quiesce while the
 * caller assigns a transaction to it.  The per-CPU counter bucket keeps
 * this hot path mostly uncontended.  Returns the open txg; the hold is
 * recorded in 'th' and later dropped via txg_rele_to_sync().
 *
 * Note: tc_lock is intentionally left held on return; the caller must
 * release it via txg_rele_to_quiesce().
 */
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}
310168404Spjd
311168404Spjdvoid
312168404Spjdtxg_rele_to_quiesce(txg_handle_t *th)
313168404Spjd{
314168404Spjd	tx_cpu_t *tc = th->th_cpu;
315168404Spjd
316168404Spjd	mutex_exit(&tc->tc_lock);
317168404Spjd}
318168404Spjd
/*
 * Splice the caller's commit callbacks onto the per-CPU callback list
 * for the held txg; they will be dispatched after that txg syncs (see
 * txg_dispatch_callbacks()).
 */
void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}
329219089Spjd
/*
 * Drop the hold taken in txg_hold_open().  When the last hold on a txg
 * slot is released, wake the quiesce thread blocked in txg_quiesce().
 */
void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}
344168404Spjd
/*
 * Quiesce 'txg': advance tx_open_txg (while holding every tc_lock so no
 * new holds can land on the old txg), then wait for all existing holders
 * to call txg_rele_to_sync().  On return, no transactions remain active
 * in 'txg'.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to txg_exit().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}
379168404Spjd
/*
 * Taskq worker: run the commit callbacks on 'arg' (a heap-allocated
 * list_t built by txg_dispatch_callbacks()), then destroy and free the
 * list itself.  The error code 0 indicates the txg committed.
 */
static void
txg_do_callbacks(void *arg)
{
	list_t *cb_list = arg;

	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}
391219089Spjd
392219089Spjd/*
393219089Spjd * Dispatch the commit callbacks registered on this txg to worker threads.
394219089Spjd */
395219089Spjdstatic void
396219089Spjdtxg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
397219089Spjd{
398219089Spjd	int c;
399219089Spjd	tx_state_t *tx = &dp->dp_tx;
400219089Spjd	list_t *cb_list;
401219089Spjd
402219089Spjd	for (c = 0; c < max_ncpus; c++) {
403219089Spjd		tx_cpu_t *tc = &tx->tx_cpu[c];
404219089Spjd		/* No need to lock tx_cpu_t at this point */
405219089Spjd
406219089Spjd		int g = txg & TXG_MASK;
407219089Spjd
408219089Spjd		if (list_is_empty(&tc->tc_callbacks[g]))
409219089Spjd			continue;
410219089Spjd
411219089Spjd		if (tx->tx_commit_cb_taskq == NULL) {
412219089Spjd			/*
413219089Spjd			 * Commit callback taskq hasn't been created yet.
414219089Spjd			 */
415219089Spjd			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
416219089Spjd			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
417219089Spjd			    TASKQ_PREPOPULATE);
418219089Spjd		}
419219089Spjd
420219089Spjd		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
421219089Spjd		list_create(cb_list, sizeof (dmu_tx_callback_t),
422219089Spjd		    offsetof(dmu_tx_callback_t, dcb_node));
423219089Spjd
424219089Spjd		list_move_tail(&tc->tc_callbacks[g], cb_list);
425219089Spjd
426219089Spjd		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
427219089Spjd		    txg_do_callbacks, cb_list, TQ_SLEEP);
428219089Spjd	}
429219089Spjd}
430219089Spjd
/*
 * Main loop of the sync thread.  Waits until a scan is active, someone
 * is waiting on a sync, the quiesce thread has handed off a txg, or the
 * zfs_txg_timeout deadline expires; then consumes the quiesced txg,
 * writes it out via spa_sync(), and dispatches its commit callbacks.
 * Runs with tx_sync_lock held except while waiting or inside spa_sync().
 */
static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scanning, there's someone waiting
		 * on us, or the quiesce thread has handed off a txg to
		 * us, or we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!dsl_scan_active(dp->dp_scan) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		/* Drop the lock across the (potentially long) spa_sync(). */
		mutex_exit(&tx->tx_sync_lock);

		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}
507168404Spjd
/*
 * Main loop of the quiesce thread.  Waits until a quiesce is requested
 * and the previous quiesced txg has been consumed, quiesces the open
 * txg (dropping tx_sync_lock while doing so), and hands it off to the
 * sync thread.
 */
static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}
552168404Spjd
/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiesing or quiesced.  Abort
 * the delay if this txg stalls or enters the quiesing state.  Used to
 * throttle writers when the pool cannot keep up.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	clock_t timeout = ddi_get_lbolt() + ticks;

	/* don't delay if this txg could transition to quiesing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	/* Re-check under the lock in case the state advanced meanwhile. */
	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	/* Sleep until the deadline, a stall, or txg-1 starts syncing. */
	while (ddi_get_lbolt() < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout - ddi_get_lbolt());

	mutex_exit(&tx->tx_sync_lock);
}
582185029Spjd
/*
 * Block until 'txg' has been fully synced to disk.  If txg == 0, wait
 * for the currently open txg plus the deferred-free window
 * (TXG_DEFER_SIZE) to sync.  Prods the sync thread as needed.
 */
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}
607168404Spjd
/*
 * Block until 'txg' (or, if txg == 0, the next txg) has become the open
 * txg, prodding the quiesce thread to push earlier txgs through.
 */
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}
629168404Spjd
630185029Spjdboolean_t
631185029Spjdtxg_stalled(dsl_pool_t *dp)
632168404Spjd{
633168404Spjd	tx_state_t *tx = &dp->dp_tx;
634185029Spjd	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
635168404Spjd}
636168404Spjd
637185029Spjdboolean_t
638185029Spjdtxg_sync_waiting(dsl_pool_t *dp)
639168404Spjd{
640168404Spjd	tx_state_t *tx = &dp->dp_tx;
641185029Spjd
642185029Spjd	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
643185029Spjd	    tx->tx_quiesced_txg != 0);
644168404Spjd}
645168404Spjd
646168404Spjd/*
647168404Spjd * Per-txg object lists.
648168404Spjd */
649168404Spjdvoid
650168404Spjdtxg_list_create(txg_list_t *tl, size_t offset)
651168404Spjd{
652168404Spjd	int t;
653168404Spjd
654168404Spjd	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
655168404Spjd
656168404Spjd	tl->tl_offset = offset;
657168404Spjd
658168404Spjd	for (t = 0; t < TXG_SIZE; t++)
659168404Spjd		tl->tl_head[t] = NULL;
660168404Spjd}
661168404Spjd
662168404Spjdvoid
663168404Spjdtxg_list_destroy(txg_list_t *tl)
664168404Spjd{
665168404Spjd	int t;
666168404Spjd
667168404Spjd	for (t = 0; t < TXG_SIZE; t++)
668168404Spjd		ASSERT(txg_list_empty(tl, t));
669168404Spjd
670168404Spjd	mutex_destroy(&tl->tl_lock);
671168404Spjd}
672168404Spjd
673239620Smmboolean_t
674168404Spjdtxg_list_empty(txg_list_t *tl, uint64_t txg)
675168404Spjd{
676168404Spjd	return (tl->tl_head[txg & TXG_MASK] == NULL);
677168404Spjd}
678168404Spjd
679168404Spjd/*
680248571Smm * Add an entry to the list (unless it's already on the list).
681248571Smm * Returns B_TRUE if it was actually added.
682168404Spjd */
683248571Smmboolean_t
684168404Spjdtxg_list_add(txg_list_t *tl, void *p, uint64_t txg)
685168404Spjd{
686168404Spjd	int t = txg & TXG_MASK;
687168404Spjd	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
688248571Smm	boolean_t add;
689168404Spjd
690168404Spjd	mutex_enter(&tl->tl_lock);
691248571Smm	add = (tn->tn_member[t] == 0);
692248571Smm	if (add) {
693168404Spjd		tn->tn_member[t] = 1;
694168404Spjd		tn->tn_next[t] = tl->tl_head[t];
695168404Spjd		tl->tl_head[t] = tn;
696168404Spjd	}
697168404Spjd	mutex_exit(&tl->tl_lock);
698168404Spjd
699248571Smm	return (add);
700168404Spjd}
701168404Spjd
702168404Spjd/*
703248571Smm * Add an entry to the end of the list, unless it's already on the list.
704248571Smm * (walks list to find end)
705248571Smm * Returns B_TRUE if it was actually added.
706219089Spjd */
707248571Smmboolean_t
708219089Spjdtxg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
709219089Spjd{
710219089Spjd	int t = txg & TXG_MASK;
711219089Spjd	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
712248571Smm	boolean_t add;
713219089Spjd
714219089Spjd	mutex_enter(&tl->tl_lock);
715248571Smm	add = (tn->tn_member[t] == 0);
716248571Smm	if (add) {
717219089Spjd		txg_node_t **tp;
718219089Spjd
719219089Spjd		for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
720219089Spjd			continue;
721219089Spjd
722219089Spjd		tn->tn_member[t] = 1;
723219089Spjd		tn->tn_next[t] = NULL;
724219089Spjd		*tp = tn;
725219089Spjd	}
726219089Spjd	mutex_exit(&tl->tl_lock);
727219089Spjd
728248571Smm	return (add);
729219089Spjd}
730219089Spjd
731219089Spjd/*
732168404Spjd * Remove the head of the list and return it.
733168404Spjd */
734168404Spjdvoid *
735168404Spjdtxg_list_remove(txg_list_t *tl, uint64_t txg)
736168404Spjd{
737168404Spjd	int t = txg & TXG_MASK;
738168404Spjd	txg_node_t *tn;
739168404Spjd	void *p = NULL;
740168404Spjd
741168404Spjd	mutex_enter(&tl->tl_lock);
742168404Spjd	if ((tn = tl->tl_head[t]) != NULL) {
743168404Spjd		p = (char *)tn - tl->tl_offset;
744168404Spjd		tl->tl_head[t] = tn->tn_next[t];
745168404Spjd		tn->tn_next[t] = NULL;
746168404Spjd		tn->tn_member[t] = 0;
747168404Spjd	}
748168404Spjd	mutex_exit(&tl->tl_lock);
749168404Spjd
750168404Spjd	return (p);
751168404Spjd}
752168404Spjd
/*
 * Remove a specific item from the list and return it, or NULL if the
 * item was not a member.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	/* Walk the link pointers so 'p' can be unhooked from anywhere. */
	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}
778168404Spjd
779248571Smmboolean_t
780168404Spjdtxg_list_member(txg_list_t *tl, void *p, uint64_t txg)
781168404Spjd{
782168404Spjd	int t = txg & TXG_MASK;
783168404Spjd	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
784168404Spjd
785248571Smm	return (tn->tn_member[t] != 0);
786168404Spjd}
787168404Spjd
788168404Spjd/*
789168404Spjd * Walk a txg list -- only safe if you know it's not changing.
790168404Spjd */
791168404Spjdvoid *
792168404Spjdtxg_list_head(txg_list_t *tl, uint64_t txg)
793168404Spjd{
794168404Spjd	int t = txg & TXG_MASK;
795168404Spjd	txg_node_t *tn = tl->tl_head[t];
796168404Spjd
797168404Spjd	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
798168404Spjd}
799168404Spjd
800168404Spjdvoid *
801168404Spjdtxg_list_next(txg_list_t *tl, void *p, uint64_t txg)
802168404Spjd{
803168404Spjd	int t = txg & TXG_MASK;
804168404Spjd	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
805168404Spjd
806168404Spjd	tn = tn->tn_next[t];
807168404Spjd
808168404Spjd	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
809168404Spjd}
810