/* txg.c, revision 207480 */
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 */

static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

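/*
 * Overview: at any instant at most one txg is in each of three active
 * states -- open (accepting new holds via txg_hold_open()), quiescing
 * (waiting for outstanding holds to drain), and syncing (being written
 * out by spa_sync()).  Per-txg state lives in small arrays indexed by
 * (txg & TXG_MASK); TXG_SIZE (see sys/txg.h) is sized to cover all
 * concurrently live groups.
 */
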
int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */
extern int zfs_txg_synctime;

SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0,
    "ZFS transaction groups (TXG)");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RDTUN, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");
TUNABLE_INT("vfs.zfs.txg.synctime", &zfs_txg_synctime);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, synctime, CTLFLAG_RDTUN, &zfs_txg_synctime,
    0, "Target seconds to sync a txg");

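/*
 * Both knobs are boot-time tunables (CTLFLAG_RDTUN): they can be set
 * from loader(8), e.g. by adding
 *
 *	vfs.zfs.txg.timeout=5
 *
 * to /boot/loader.conf, but are read-only via sysctl(8) once the
 * system is up.
 */
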
/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
		}
	}

	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

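/*
 * tx_cpu is a per-CPU array: txg_hold_open() picks the entry for the
 * current CPU (CPU_SEQID), so concurrent transaction starts contend on
 * per-CPU locks and counters rather than on a single global lock.  Only
 * txg_quiesce(), which is off the hot path, must visit every tc_lock.
 */
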
/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	cv_destroy(&tx->tx_exit_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_sync_more_cv);
	rw_destroy(&tx->tx_suspend);
	mutex_destroy(&tx->tx_sync_lock);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++)
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
	}

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

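/*
 * The callb_cpr_t plumbing above implements the Solaris CPR
 * (checkpoint/resume, i.e. system suspend) protocol: txg_thread_wait()
 * brackets its sleep with CALLB_CPR_SAFE_BEGIN/END so the thread is
 * known to be safe to suspend while it is blocked.
 */
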
/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);
	txg_wait_synced(dp, 0);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

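/*
 * Illustrative hold/release sequence (a sketch of the expected caller
 * pattern, not code from this file; in practice the DMU transaction
 * code drives these calls):
 *
 *	txg_handle_t th;
 *	uint64_t txg = txg_hold_open(dp, &th);	// returns with tc_lock held
 *
 *	// ... quickly record what will be dirtied in this txg ...
 *
 *	txg_rele_to_quiesce(&th);	// drop tc_lock; the hold remains
 *
 *	// ... do the actual modifications for this txg ...
 *
 *	txg_rele_to_sync(&th);		// drop the hold; quiesce may proceed
 */
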
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to txg_exit().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

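/*
 * Correctness of txg_quiesce() hinges on txg_hold_open() reading
 * tx_open_txg while holding its tc_lock: once every tc_lock has been
 * taken and tx_open_txg bumped, no new hold can land in the old txg,
 * so each per-CPU tc_count[g] can only fall toward zero.
 */
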
static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when there's someone waiting on us, or the
		 * quiesce thread has handed off a txg to us, or we have
		 * reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = LBOLT - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		start = LBOLT;
		spa_sync(dp->dp_spa, txg);
		delta = LBOLT - start;

		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}

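/*
 * Each pass of the sync loop above: (1) sleep until there is demand --
 * a waiter, a handed-off txg, or the zfs_txg_timeout expiring; (2) make
 * sure the quiesce thread hands off a quiesced txg; (3) drop
 * tx_sync_lock and run spa_sync(); and (4) record the elapsed LBOLT
 * ticks in 'delta' so the next timeout shrinks accordingly.
 */
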
static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.  Abort
 * the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	int timeout = LBOLT + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (LBOLT < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout - LBOLT);

	mutex_exit(&tx->tx_sync_lock);
}

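/*
 * txg_delay() is the pool's write-throttle primitive: a caller that is
 * dirtying data faster than it can be synced (e.g. the DSL pool's
 * space-reservation path) can use it to stall briefly without blocking
 * all the way to txg_wait_open().
 */
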
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

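/*
 * Note the asymmetry between the two waiters above: txg_wait_synced(dp, 0)
 * waits for the currently open txg to be fully written out, while
 * txg_wait_open(dp, 0) waits for the next txg to open -- i.e. it forces
 * the current one to start quiescing.  Both require the txg threads to
 * be running (tx_threads == 2).
 */
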
boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

void
txg_suspend(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	/* XXX some code paths suspend when they are already suspended! */
	rw_enter(&tx->tx_suspend, RW_READER);
}

void
txg_resume(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	rw_exit(&tx->tx_suspend);
}

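/*
 * tx_suspend is taken as reader by txg_suspend() callers and as writer
 * by the sync thread around its handoff and completion updates, so a
 * suspended pool can neither start nor finish syncing a txg until every
 * reader has called txg_resume().
 */
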
/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

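/*
 * Illustrative use of a txg list (hypothetical structure; real callers
 * embed a txg_node_t the same way and register its offset):
 *
 *	typedef struct dirty_obj {
 *		...
 *		txg_node_t do_txg_node;
 *	} dirty_obj_t;
 *
 *	txg_list_create(&tl, offsetof(dirty_obj_t, do_txg_node));
 *	(void) txg_list_add(&tl, obj, txg);
 *	...
 *	while ((obj = txg_list_remove(&tl, txg)) != NULL)
 *		sync_one(obj);		// hypothetical per-object work
 */
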
/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}