txg.c revision 224579
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 */

static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */

SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RDTUN, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
			list_create(&tx->tx_cpu[c].tc_callbacks[i],
			    sizeof (dmu_tx_callback_t),
			    offsetof(dmu_tx_callback_t, dcb_node));
		}
	}

	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
		}
	}

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_destroy(tx->tx_commit_cb_taskq);

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}
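/*
 * Illustrative sketch (editor's note, not part of the original source):
 * the dsl_pool code is expected to drive this subsystem roughly in the
 * following order, which is also what the ASSERTs on tx_threads imply:
 *
 *	txg_init(dp, txg);	// allocate and zero per-CPU txg state
 *	txg_sync_start(dp);	// spawn the quiesce and sync threads
 *	...			// open txgs, assign and commit dmu_tx_t's
 *	txg_sync_stop(dp);	// sync all outstanding txgs, reap threads
 *	txg_fini(dp);		// destroy locks, cvs, and the callback taskq
 */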
/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}
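/*
 * Illustrative sketch (editor's note, not part of the original source):
 * the hold/release protocol below is driven from dmu_tx.c roughly as
 * follows; dmu_tx_try_assign() takes the hold and drops the per-CPU
 * lock, and dmu_tx_commit() releases the hold to the quiesce path:
 *
 *	txg_handle_t th;
 *	uint64_t txg = txg_hold_open(dp, &th);	// join the open txg
 *	txg_rele_to_quiesce(&th);		// drop tc_lock, keep hold
 *	...					// dirty data in 'txg'
 *	txg_rele_to_sync(&th);			// let 'txg' finish quiescing
 */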
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to txg_exit().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

static void
txg_do_callbacks(void *arg)
{
	list_t *cb_list = arg;

	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
	int c;
	tx_state_t *tx = &dp->dp_tx;
	list_t *cb_list;

	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		/* No need to lock tx_cpu_t at this point */

		int g = txg & TXG_MASK;

		if (list_is_empty(&tc->tc_callbacks[g]))
			continue;

		if (tx->tx_commit_cb_taskq == NULL) {
			/*
			 * Commit callback taskq hasn't been created yet.
			 */
			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
			    TASKQ_PREPOPULATE);
		}

		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
		list_create(cb_list, sizeof (dmu_tx_callback_t),
		    offsetof(dmu_tx_callback_t, dcb_node));

		list_move_tail(&tc->tc_callbacks[g], cb_list);

		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
		    txg_do_callbacks, cb_list, TQ_SLEEP);
	}
}
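/*
 * Illustrative sketch (editor's note, not part of the original source):
 * a commit callback registered with dmu_tx_callback_register() lands on
 * the tc_callbacks lists above and is invoked from the taskq once the
 * txg is on stable storage.  The name my_commit_cb is hypothetical; the
 * signature is dmu_tx_callback_func_t:
 *
 *	static void
 *	my_commit_cb(void *arg, int error)
 *	{
 *		// error is 0 when the txg committed; nonzero (e.g.
 *		// ECANCELED) if the callback is reaped without a commit
 *	}
 */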
static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scanning, there's someone waiting
		 * on us, or the quiesce thread has handed off a txg to
		 * us, or we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!dsl_scan_active(dp->dp_scan) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}

static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}
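/*
 * Illustrative sketch (editor's note, not part of the original source):
 * together the two threads above form a three-stage pipeline, with at
 * most one txg per stage at any instant:
 *
 *	open		tx_open_txg	accepting new dmu_tx assignments
 *	quiescing	open - 1	draining holds via txg_rele_to_sync()
 *	syncing		tx_syncing_txg	being written out by spa_sync()
 *
 * The quiesce thread advances a txg from open to quiesced; the sync
 * thread consumes the quiesced txg and records it in tx_synced_txg.
 */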
/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.
 * Abort the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	clock_t timeout = ddi_get_lbolt() + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (ddi_get_lbolt() < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout - ddi_get_lbolt());

	mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}
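/*
 * Illustrative sketch (editor's note, not part of the original source):
 * callers that must see their changes on stable storage block with
 * txg_wait_synced(), while throttled writers block with txg_wait_open():
 *
 *	txg_wait_synced(dp, txg);	// returns once 'txg' has synced
 *	txg_wait_synced(dp, 0);		// txg 0 means "far enough to cover
 *					// the deferred-free window"
 *	txg_wait_open(dp, 0);		// wait for the next txg to open
 */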
/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Add an entry to the end of the list (walks list to find end).
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		txg_node_t **tp;

		for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
			continue;

		tn->tn_member[t] = 1;
		tn->tn_next[t] = NULL;
		*tp = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}
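/*
 * Illustrative sketch (editor's note, not part of the original source):
 * a txg_list embeds a txg_node_t inside the client object and keys it
 * by txg & TXG_MASK, so one object can sit on all TXG_SIZE per-txg
 * lists at once.  The struct and names below are hypothetical:
 *
 *	struct my_obj {
 *		txg_node_t	mo_node;	// embedded link
 *		...
 *	};
 *
 *	txg_list_create(&tl, offsetof(struct my_obj, mo_node));
 *	(void) txg_list_add(&tl, obj, txg);
 *	...
 *	while ((obj = txg_list_remove(&tl, txg)) != NULL)
 *		process(obj);			// per-txg sync-time work
 */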