txg.c revision 224579
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 */

static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */

SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RDTUN, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
			list_create(&tx->tx_cpu[c].tc_callbacks[i],
			    sizeof (dmu_tx_callback_t),
			    offsetof(dmu_tx_callback_t, dcb_node));
		}
	}

	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
		}
	}

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_destroy(tx->tx_commit_cb_taskq);

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

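/*
 * Helpers shared by the sync and quiesce threads: register with the CPR
 * (suspend/resume) callback framework, tear a thread down, and park a
 * thread on a condition variable while it has nothing to do.  All three
 * operate under tx_sync_lock.
 */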
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

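/*
 * Take a hold on the currently open txg: bump the per-CPU count for that
 * txg so it cannot finish quiescing until the hold is released, and return
 * with tc_lock still held (it is dropped by txg_rele_to_quiesce()).
 */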
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

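/*
 * Drop the tc_lock acquired in txg_hold_open(), allowing the held txg to
 * begin quiescing; the hold itself is released later by txg_rele_to_sync().
 */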
void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

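/*
 * Move the commit callbacks registered on this handle's txg onto the
 * per-CPU callback list; they are dispatched once that txg has synced.
 */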
void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}

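/*
 * Release the hold taken in txg_hold_open().  If this was the last hold
 * on the txg, wake the quiesce thread waiting in txg_quiesce().
 */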
void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

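/*
 * Quiesce a txg: advance tx_open_txg so new transactions enter the next
 * txg, then wait for every hold on this txg to be released.
 */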
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to txg_exit().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

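/*
 * Taskq entry point: run the commit callbacks on a dispatched list
 * (with error 0, i.e. the txg committed), then free the list itself.
 */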
static void
txg_do_callbacks(void *arg)
{
	list_t *cb_list = arg;

	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
	int c;
	tx_state_t *tx = &dp->dp_tx;
	list_t *cb_list;

	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		/* No need to lock tx_cpu_t at this point */

		int g = txg & TXG_MASK;

		if (list_is_empty(&tc->tc_callbacks[g]))
			continue;

		if (tx->tx_commit_cb_taskq == NULL) {
			/*
			 * Commit callback taskq hasn't been created yet.
			 */
			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
			    TASKQ_PREPOPULATE);
		}

		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
		list_create(cb_list, sizeof (dmu_tx_callback_t),
		    offsetof(dmu_tx_callback_t, dcb_node));

		list_move_tail(&tc->tc_callbacks[g], cb_list);

		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
		    txg_do_callbacks, cb_list, TQ_SLEEP);
	}
}

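/*
 * Sync thread main loop: wait until a sync is requested, a quiesced txg
 * has been handed off, or the txg timeout expires; then sync that txg
 * via spa_sync() and dispatch its commit callbacks.
 */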
static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scanning, when there's someone waiting
		 * on us, when the quiesce thread has handed off a txg to us,
		 * or when we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!dsl_scan_active(dp->dp_scan) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}

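/*
 * Quiesce thread main loop: wait until someone needs a newer txg open,
 * quiesce the currently open txg, and hand it off to the sync thread.
 */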
static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.  Abort
 * the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	clock_t timeout = ddi_get_lbolt() + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (ddi_get_lbolt() < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout - ddi_get_lbolt());

	mutex_exit(&tx->tx_sync_lock);
}

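/*
 * Block until the given txg has been synced to disk.  A txg of 0 means
 * "the currently open txg plus TXG_DEFER_SIZE".
 */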
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

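/*
 * Block until the given txg is open for new transactions.  A txg of 0
 * means "the txg after the currently open one".
 */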
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

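/*
 * Report whether the pool is stalled: a waiter needs a newer txg open
 * but the currently open txg has not yet advanced.
 */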
boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

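/*
 * Report whether the sync thread has work pending: a sync has been
 * requested for a txg at or beyond the one currently syncing, or a
 * quiesced txg is waiting to be synced.
 */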
boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Add an entry to the end of the list (walks list to find end).
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		txg_node_t **tp;

		for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
			continue;

		tn->tn_member[t] = 1;
		tn->tn_next[t] = NULL;
		*tp = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

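/*
 * Report whether the given object is currently on the list for this txg.
 */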
int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}