txg.c revision 226724
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Portions Copyright 2011 Martin Matuska <mm@FreeBSD.org>
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 */

static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */

SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RDTUN, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");
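
/*
 * A txg passes through three stages: open, quiescing, and syncing.  The
 * TXG_SIZE-deep per-txg arrays used below are indexed by (txg & TXG_MASK).
 * txg_quiesce_thread() closes the currently open txg and waits for all of
 * its holds to drain; txg_sync_thread() then writes each quiesced txg to
 * disk via spa_sync().  Only one txg can be in each stage at a time, so at
 * most three txgs are in flight at once.
 */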

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
			list_create(&tx->tx_cpu[c].tc_callbacks[i],
			    sizeof (dmu_tx_callback_t),
			    offsetof(dmu_tx_callback_t, dcb_node));
		}
	}

	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
		}
	}

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_destroy(tx->tx_commit_cb_taskq);

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}
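
/*
 * The four functions above form the open-txg hold protocol.  A rough,
 * illustrative sketch of how a DMU consumer is expected to use them (the
 * real call sites live in dmu_tx.c; the variable names here are made up):
 *
 *	txg_handle_t th;
 *	uint64_t txg = txg_hold_open(dp, &th);	 (join the open txg)
 *	txg_rele_to_quiesce(&th);		 (drop tc_lock; hold remains)
 *	... dirty in-core state for this txg ...
 *	txg_register_callbacks(&th, &cb_list);	 (optional commit callbacks)
 *	txg_rele_to_sync(&th);			 (release the hold)
 *
 * txg_quiesce() below cannot finish a txg until every hold taken by
 * txg_hold_open() has been released by txg_rele_to_sync().
 */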

static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to txg_exit().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

static void
txg_do_callbacks(void *arg)
{
	list_t *cb_list = arg;

	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
	int c;
	tx_state_t *tx = &dp->dp_tx;
	list_t *cb_list;

	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		/* No need to lock tx_cpu_t at this point */

		int g = txg & TXG_MASK;

		if (list_is_empty(&tc->tc_callbacks[g]))
			continue;

		if (tx->tx_commit_cb_taskq == NULL) {
			/*
			 * Commit callback taskq hasn't been created yet.
			 */
			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
			    TASKQ_PREPOPULATE);
		}

		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
		list_create(cb_list, sizeof (dmu_tx_callback_t),
		    offsetof(dmu_tx_callback_t, dcb_node));

		list_move_tail(&tc->tc_callbacks[g], cb_list);

		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
		    txg_do_callbacks, cb_list, TQ_SLEEP);
	}
}

static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scanning, there's someone waiting
		 * on us, or the quiesce thread has handed off a txg to
		 * us, or we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!dsl_scan_active(dp->dp_scan) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}

static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.  Abort
 * the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	clock_t timeout = ddi_get_lbolt() + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (ddi_get_lbolt() < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout - ddi_get_lbolt());

	mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}
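
/*
 * The add/remove routines below keep their linkage in a txg_node_t that is
 * embedded in the client object, tl_offset bytes from its start, with one
 * next pointer and membership flag per TXG_SIZE slot; the same object can
 * therefore sit on a list for several txgs at once.  A rough, illustrative
 * sketch of a client (the structure and field names are hypothetical, not
 * taken from the ZFS headers):
 *
 *	typedef struct my_obj {
 *		...
 *		txg_node_t	mo_dirty_link;
 *	} my_obj_t;
 *
 *	txg_list_create(&list, offsetof(my_obj_t, mo_dirty_link));
 *	(void) txg_list_add(&list, obj, txg);
 */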

/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Add an entry to the end of the list (walks list to find end).
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		txg_node_t **tp;

		for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
			continue;

		tn->tn_member[t] = 1;
		tn->tn_next[t] = NULL;
		*tp = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}