txg.c revision 248571
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Portions Copyright 2011 Martin Matuska <mm@FreeBSD.org>
 * Copyright (c) 2013 by Delphix. All rights reserved.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/callb.h>

/*
 * ZFS Transaction Groups
 * ----------------------
 *
 * ZFS transaction groups are, as the name implies, groups of transactions
 * that act on persistent state. ZFS asserts consistency at the granularity of
 * these transaction groups. Each successive transaction group (txg) is
 * assigned a 64-bit consecutive identifier. There are three active
 * transaction group states: open, quiescing, or syncing. At any given time,
 * there may be an active txg associated with each state; each active txg may
 * either be processing, or blocked waiting to enter the next state. There may
 * be up to three active txgs, and there is always a txg in the open state
 * (though it may be blocked waiting to enter the quiescing state). In broad
 * strokes, transactions -- operations that change in-memory structures -- are
 * accepted into the txg in the open state, and are completed while the txg is
 * in the open or quiescing states. The accumulated changes are written to
 * disk in the syncing state.
 *
 * Open
 *
 * When a new txg becomes active, it first enters the open state. New
 * transactions -- updates to in-memory structures -- are assigned to the
 * currently open txg. There is always a txg in the open state so that ZFS can
 * accept new changes (though the txg may refuse new changes if it has hit
 * some limit). ZFS advances the open txg to the next state for a variety of
 * reasons, such as it hitting a time or size threshold, or the execution of an
 * administrative action that must be completed in the syncing state.
 *
 * Quiescing
 *
 * After a txg exits the open state, it enters the quiescing state. The
 * quiescing state is intended to provide a buffer between accepting new
 * transactions in the open state and writing them out to stable storage in
 * the syncing state. While quiescing, transactions can continue their
 * operation without delaying either of the other states. Typically, a txg is
 * in the quiescing state very briefly since the operations are bounded by
 * software latencies rather than, say, slower I/O latencies. After all
 * transactions complete, the txg is ready to enter the next state.
 *
 * Syncing
 *
 * In the syncing state, the in-memory state built up during the open and (to
 * a lesser degree) the quiescing states is written to stable storage. The
 * process of writing out modified data can, in turn, modify more data. For
 * example, when we write new blocks, we need to allocate space for them; those
 * allocations modify metadata (space maps)... which themselves must be
 * written to stable storage. During the sync state, ZFS iterates, writing out
 * data until it converges and all in-memory changes have been written out.
 * The first such pass is the largest as it encompasses all the modified user
 * data (as opposed to filesystem metadata). Subsequent passes typically have
 * far less data to write as they consist exclusively of filesystem metadata.
 *
 * To ensure convergence, after a certain number of passes ZFS begins
 * overwriting locations on stable storage that had been allocated earlier in
 * the syncing state (and subsequently freed). ZFS usually allocates new
 * blocks to optimize for large, continuous writes. For the syncing state to
 * converge, however, it must complete a pass where no new blocks are allocated
 * since each allocation requires a modification of persistent metadata.
 * Further, to hasten convergence, after a prescribed number of passes, ZFS
 * also defers frees, and stops compressing.
 *
 * In addition to writing out user data, we must also execute synctasks during
 * the syncing context. A synctask is the mechanism by which some
 * administrative activities work such as creating and destroying snapshots or
 * datasets. Note that when a synctask is initiated, it enters the open txg,
 * and ZFS then pushes that txg as quickly as possible to completion of the
 * syncing state in order to reduce the latency of the administrative
 * activity. To complete the syncing state, ZFS writes out a new uberblock,
 * the root of the tree of blocks that comprise all state stored on the ZFS
 * pool. Finally, if there is a quiesced txg waiting, we signal that it can
 * now transition to the syncing state.
 */
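
/*
 * Illustration: consumers normally drive this machinery through the dmu_tx_*
 * interfaces rather than by calling txg_hold_open()/txg_rele_to_sync()
 * directly.  A rough sketch of the usual pattern (error handling
 * abbreviated):
 *
 *	dmu_tx_t *tx = dmu_tx_create(os);
 *	dmu_tx_hold_write(tx, object, offset, size);
 *	error = dmu_tx_assign(tx, TXG_WAIT);
 *	if (error != 0) {
 *		dmu_tx_abort(tx);
 *		return (error);
 *	}
 *	dmu_write(os, object, offset, size, buf, tx);
 *	dmu_tx_commit(tx);
 *
 * dmu_tx_assign() joins the currently open txg (via txg_hold_open() below)
 * and dmu_tx_commit() drops that hold (via txg_rele_to_sync()), which is
 * what allows the txg to quiesce once all of its transactions have finished.
 */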

static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */

SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RW, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");
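
/*
 * zfs_txg_timeout is exposed both as a loader tunable and as a read/write
 * sysctl, so it can be set persistently (e.g. vfs.zfs.txg.timeout in
 * /boot/loader.conf) or adjusted at runtime, for example:
 *
 *	sysctl vfs.zfs.txg.timeout=10
 */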

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
			list_create(&tx->tx_cpu[c].tc_callbacks[i],
			    sizeof (dmu_tx_callback_t),
			    offsetof(dmu_tx_callback_t, dcb_node));
		}
	}

	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
		}
	}

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_destroy(tx->tx_commit_cb_taskq);

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

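/*
 * Sleep on the given condition variable (with CPR protection), either until
 * signalled or, if 'time' is nonzero, until the timeout expires.
 */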
static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

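/*
 * Join the currently open txg: bump this CPU's hold count for it and record
 * the txg in the caller's handle.  The per-CPU tc_lock is left held on
 * return; the caller must drop it with txg_rele_to_quiesce() once its
 * transaction has been assigned, and must eventually drop the hold itself
 * with txg_rele_to_sync() so the txg can finish quiescing.
 */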
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

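/*
 * Release the per-CPU lock taken by txg_hold_open(), allowing the open txg
 * to advance to the quiescing state.  The hold itself remains until
 * txg_rele_to_sync() is called.
 */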
void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

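/*
 * Move the caller's commit callbacks onto this CPU's callback list for the
 * handle's txg.  They will be dispatched once that txg has synced.
 */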
void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}

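/*
 * Drop the hold taken by txg_hold_open().  When the last hold on a txg is
 * dropped, the quiesce thread waiting in txg_quiesce() is woken so that the
 * txg can be handed off to the sync thread.
 */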
void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

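/*
 * Quiesce the given txg: advance tx_open_txg so that new transactions enter
 * the next txg, then wait for every hold on this txg to be released.
 */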
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to
	 * call txg_rele_to_sync().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

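/*
 * Taskq callback: invoke a list of commit callbacks (with error code 0) and
 * free the list.
 */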
static void
txg_do_callbacks(void *arg)
{
	list_t *cb_list = arg;

	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
	int c;
	tx_state_t *tx = &dp->dp_tx;
	list_t *cb_list;

	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		/* No need to lock tx_cpu_t at this point */

		int g = txg & TXG_MASK;

		if (list_is_empty(&tc->tc_callbacks[g]))
			continue;

		if (tx->tx_commit_cb_taskq == NULL) {
			/*
			 * Commit callback taskq hasn't been created yet.
			 */
			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
			    TASKQ_PREPOPULATE);
		}

		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
		list_create(cb_list, sizeof (dmu_tx_callback_t),
		    offsetof(dmu_tx_callback_t, dcb_node));

		list_move_tail(cb_list, &tc->tc_callbacks[g]);

		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
		    txg_do_callbacks, cb_list, TQ_SLEEP);
	}
}

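/*
 * Per-pool sync thread: wait for the quiesce thread to hand off a txg (or
 * for other work to arrive), write that txg to stable storage via
 * spa_sync(), and then dispatch any commit callbacks registered on it.
 */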
static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scanning, when there's someone
		 * waiting on us, when the quiesce thread has handed off
		 * a txg to us, or when we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!dsl_scan_active(dp->dp_scan) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}

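/*
 * Per-pool quiesce thread: when asked, quiesce the currently open txg and
 * hand it off to the sync thread.
 */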
static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.  Abort
 * the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	clock_t timeout = ddi_get_lbolt() + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (ddi_get_lbolt() < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout - ddi_get_lbolt());

	mutex_exit(&tx->tx_sync_lock);
}

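/*
 * Block until the given txg has been synced to stable storage.  A txg of 0
 * is shorthand for the currently open txg plus TXG_DEFER_SIZE.  Prods the
 * sync thread as needed.
 */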
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

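/*
 * Block until the given txg is open for new transactions.  A txg of 0 is
 * shorthand for the txg following the one currently open.  Prods the
 * quiesce thread as needed.
 */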
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

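/*
 * Report whether the pipeline is stalled: somebody is waiting for a txg to
 * open, but the quiesce thread has not yet been able to advance past the
 * currently open txg.
 */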
boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

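/*
 * Report whether there is outstanding demand on the sync thread: either a
 * caller is waiting for a txg at or beyond the one currently syncing, or a
 * quiesced txg is waiting to be picked up.
 */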
boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

/*
 * Per-txg object lists.
 */
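/*
 * A txg_list_t keeps one singly-linked list per in-flight txg (TXG_SIZE
 * slots, indexed by txg & TXG_MASK) of objects that embed a txg_node_t at
 * 'offset'.  A rough sketch of the usual pattern, assuming a hypothetical
 * foo_t with an embedded txg_node_t named foo_node:
 *
 *	txg_list_t dirty;
 *	txg_list_create(&dirty, offsetof(foo_t, foo_node));
 *
 *	(void) txg_list_add(&dirty, foo, tx->tx_txg);
 *
 *	foo_t *f;
 *	while ((f = txg_list_remove(&dirty, txg)) != NULL)
 *		sync_one(f, txg);
 *
 * This is how the DSL tracks, for example, the dirty datasets that must be
 * written out in a given txg.
 */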
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

boolean_t
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list (unless it's already on the list).
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	boolean_t add;

	mutex_enter(&tl->tl_lock);
	add = (tn->tn_member[t] == 0);
	if (add) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (add);
}

/*
 * Add an entry to the end of the list, unless it's already on the list.
 * (walks list to find end)
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	boolean_t add;

	mutex_enter(&tl->tl_lock);
	add = (tn->tn_member[t] == 0);
	if (add) {
		txg_node_t **tp;

		for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
			continue;

		tn->tn_member[t] = 1;
		tn->tn_next[t] = NULL;
		*tp = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (add);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

boolean_t
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t] != 0);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}