txg.c revision 210192
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 */
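
/*
 * Each transaction group passes through three stages: open, during
 * which new transactions may still be assigned to it; quiescing,
 * during which we wait for every holder of the txg to release it via
 * txg_rele_to_sync(); and syncing, during which spa_sync() writes the
 * txg out to disk.  The quiesce and sync threads below move txgs
 * through these stages, and at most one txg can be in each stage at
 * any given time.
 */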

static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */
extern int zfs_txg_synctime;
extern uint64_t zfs_write_limit_override;

SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0,
    "ZFS transaction groups (TXG)");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RDTUN, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");
TUNABLE_INT("vfs.zfs.txg.synctime", &zfs_txg_synctime);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, synctime, CTLFLAG_RDTUN, &zfs_txg_synctime,
    0, "Target seconds to sync a txg");
TUNABLE_QUAD("vfs.zfs.txg.write_limit_override", &zfs_write_limit_override);
SYSCTL_QUAD(_vfs_zfs_txg, OID_AUTO, write_limit_override, CTLFLAG_RW,
    &zfs_write_limit_override, 0,
    "Override maximum size of a txg to this size in bytes, "
    "value of 0 means don't override");

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
		}
	}

	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	rw_destroy(&tx->tx_suspend);
	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++)
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
	}

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);
	txg_wait_synced(dp, 0);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}
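
/*
 * Illustrative sketch (not part of the original source): the typical
 * hold/release cycle as performed by a caller such as the DMU
 * transaction code.  The caller joins the open txg, releases tc_lock
 * so that txg can begin quiescing, records its dirty state, and then
 * drops its hold so the quiesce of that txg can complete.
 */
#if 0
static void
example_txg_hold_cycle(dsl_pool_t *dp)
{
	txg_handle_t th;
	uint64_t txg;

	txg = txg_hold_open(dp, &th);	/* join the currently open txg */
	txg_rele_to_quiesce(&th);	/* drop tc_lock; 'txg' may now start quiescing */

	/* ... record dirty state for 'txg' here ... */

	txg_rele_to_sync(&th);		/* drop the hold; quiescing of 'txg' can complete */
}
#endif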

static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to
	 * call txg_rele_to_sync().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scrubbing, when there's someone
		 * waiting on us, when the quiesce thread has handed off
		 * a txg to us, or when we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while ((dp->dp_scrub_func == SCRUB_FUNC_NONE ||
		    spa_shutting_down(dp->dp_spa)) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = LBOLT - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		start = LBOLT;
		spa_sync(dp->dp_spa, txg);
		delta = LBOLT - start;

		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}

static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.  Abort
 * the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	int timeout = LBOLT + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (LBOLT < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout - LBOLT);

	mutex_exit(&tx->tx_sync_lock);
}
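
/*
 * Illustrative sketch (not part of the original source): a write
 * throttle can use txg_delay() to briefly stall a writer that is
 * dirtying the open txg faster than it can be synced.  The threshold
 * check below is hypothetical; only txg_delay() is defined here.
 */
#if 0
static void
example_throttle_writer(dsl_pool_t *dp, uint64_t txg, uint64_t dirty_bytes,
    uint64_t dirty_limit)
{
	/*
	 * Stall for one tick when over the (hypothetical) limit;
	 * txg_delay() returns early if 'txg' stalls or starts quiescing.
	 */
	if (dirty_bytes > dirty_limit)
		txg_delay(dp, txg, 1);
}
#endif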

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}
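
/*
 * Illustrative sketch (not part of the original source): callers that
 * need their changes on stable storage block in txg_wait_synced().
 * Passing a specific txg waits for that txg to sync; passing 0 waits
 * for the txg that is open at the time of the call.
 */
#if 0
static void
example_wait_for_durability(dsl_pool_t *dp, uint64_t my_txg)
{
	txg_wait_synced(dp, my_txg);	/* wait until 'my_txg' has synced */
	txg_wait_synced(dp, 0);		/* or: wait for the currently open txg */
}
#endif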

void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

void
txg_suspend(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	/* XXX some code paths suspend when they are already suspended! */
	rw_enter(&tx->tx_suspend, RW_READER);
}

void
txg_resume(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	rw_exit(&tx->tx_suspend);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}
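
/*
 * Illustrative sketch (not part of the original source): typical use
 * of a per-txg list.  A structure embeds a txg_node_t, the list is
 * created with that node's offset, dirty objects are added under the
 * txg that dirtied them, and the sync path later drains the list for
 * the txg being written out.  example_obj_t is hypothetical.
 */
#if 0
typedef struct example_obj {
	txg_node_t	eo_node;	/* linkage for per-txg lists */
	/* ... caller-specific fields ... */
} example_obj_t;

static void
example_txg_list_usage(example_obj_t *eo, uint64_t txg)
{
	txg_list_t tl;
	example_obj_t *p;

	txg_list_create(&tl, offsetof(example_obj_t, eo_node));

	/* Dirty the object in 'txg'; returns 1 if it was already listed. */
	(void) txg_list_add(&tl, eo, txg);

	/* Later, while syncing 'txg', drain everything dirtied in it. */
	while ((p = txg_list_remove(&tl, txg)) != NULL) {
		/* ... write out 'p' for this txg ... */
	}

	txg_list_destroy(&tl);
}
#endif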

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}
659