/* txg.c revision 217319 */
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 *
 * Each pool runs two kernel threads created by txg_sync_start(): the
 * quiesce thread, which closes the open txg and waits for all holders
 * to release it, and the sync thread, which writes the quiesced txg to
 * stable storage via spa_sync().  The two threads hand txgs to each
 * other through the tx_state_t condition variables below.
 */

static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */
extern int zfs_txg_synctime;
extern uint64_t zfs_write_limit_override;

/* FreeBSD loader tunables and sysctls mirroring the variables above. */
SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0,
    "ZFS transaction groups (TXG)");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RDTUN, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");
TUNABLE_INT("vfs.zfs.txg.synctime", &zfs_txg_synctime);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, synctime, CTLFLAG_RDTUN, &zfs_txg_synctime,
    0, "Target seconds to sync a txg");
TUNABLE_QUAD("vfs.zfs.txg.write_limit_override", &zfs_write_limit_override);
SYSCTL_UQUAD(_vfs_zfs_txg, OID_AUTO, write_limit_override, CTLFLAG_RW,
    &zfs_write_limit_override, 0,
    "Override maximum size of a txg to this size in bytes, "
    "value of 0 means don't override");

/*
 * Prepare the txg subsystem.
 *
 * Zeroes the pool's tx_state_t and initializes the per-CPU tc_lock and
 * per-txg-slot tc_cv condition variables, the pool-wide tx_sync_lock,
 * the tx_suspend rwlock, and the sync/quiesce/exit condition variables.
 * 'txg' becomes the initially open transaction group.  No threads are
 * started here; see txg_sync_start().
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		/* One cv per in-flight txg slot (TXG_SIZE of them). */
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
		}
	}

	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 *
 * Destroys everything txg_init() created.  The sync and quiesce threads
 * must already have been stopped (tx_threads == 0) via txg_sync_stop().
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	rw_destroy(&tx->tx_suspend);
	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++)
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
	}

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 *
 * Spawns the quiesce and sync threads (tx_threads == 2) under
 * tx_sync_lock.  Must not be called while the threads are already
 * running.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Thread-startup helper: register with the CPR (suspend/resume)
 * framework and take tx_sync_lock, which the thread's main loop holds
 * except while waiting or syncing.
 */
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

/*
 * Thread-shutdown helper: clear the caller's thread pointer, decrement
 * tx_threads, wake anyone in txg_sync_stop(), and exit.  Does not
 * return.
 */
static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

/*
 * CPR-safe wait on 'cv' while holding tx_sync_lock.  A nonzero 'time'
 * bounds the wait (in ticks, per cv_timedwait); zero waits
 * indefinitely.
 */
static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 *
 * Flushes outstanding work with txg_wait_synced(), then sets
 * tx_exiting, wakes both threads, and waits until each has run
 * txg_thread_exit() (tx_threads reaches 0).
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);
	txg_wait_synced(dp, 0);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Take a hold on the currently open txg and record it in 'th'.
 *
 * Returns the open txg number.  NOTE: returns with this CPU's tc_lock
 * held; the caller must drop it with txg_rele_to_quiesce() (and later
 * release the hold itself with txg_rele_to_sync()).
 */
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

/*
 * Drop the tc_lock acquired by txg_hold_open(), allowing the quiesce
 * thread to advance tx_open_txg.  The hold count itself remains.
 */
void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

/*
 * Release the hold taken in txg_hold_open().  When the last hold on a
 * txg slot drops, wake txg_quiesce() waiters on that slot's cv.
 */
void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

/*
 * Close txg 'txg' and wait for all of its holders to release it.
 * On return no thread can be adding work to 'txg'; tx_open_txg has
 * advanced to txg+1.  Called only from the quiesce thread.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to txg_exit().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

/*
 * Main loop of the sync thread: wait for a quiesced txg (or a reason to
 * force one), consume it, and write it out with spa_sync().  Holds
 * tx_sync_lock except while waiting and while in spa_sync(); takes
 * tx_suspend as writer around tx_syncing_txg transitions so
 * txg_suspend() readers see a consistent state.
 */
static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		/* 'delta' is how long the previous spa_sync() took (ticks). */
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scrubbing, there's someone waiting
		 * on us, or the quiesce thread has handed off a txg to
		 * us, or we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while ((dp->dp_scrub_func == SCRUB_FUNC_NONE ||
		    spa_shutting_down(dp->dp_spa)) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = LBOLT - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		/* Drop tx_sync_lock across the (potentially long) sync. */
		mutex_exit(&tx->tx_sync_lock);

		start = LBOLT;
		spa_sync(dp->dp_spa, txg);
		delta = LBOLT - start;

		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}

/*
 * Main loop of the quiesce thread: when someone is waiting on a newer
 * txg and the previous quiesced txg has been consumed, close the open
 * txg via txg_quiesce() and hand it to the sync thread.
 */
static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.  Abort
 * the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	int timeout = LBOLT + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	/* re-check under tx_sync_lock before actually sleeping */
	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (LBOLT < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout - LBOLT);

	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Block until 'txg' has been synced to disk.  txg == 0 means the
 * currently open txg.  Requires both service threads to be running.
 */
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Block until 'txg' is the open transaction group (i.e. everything
 * before it has been quiesced).  txg == 0 means the txg after the one
 * currently open.
 */
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Report whether a waiter has outrun the quiesce thread (someone wants
 * a txg newer than the one still open).
 */
boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

/*
 * Report whether the sync thread has (or will shortly have) work:
 * a waiter ahead of the txg now syncing, or a quiesced txg pending.
 */
boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

/*
 * Block the sync thread's tx_syncing_txg transitions by taking
 * tx_suspend as reader; paired with txg_resume().
 */
void
txg_suspend(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	/* XXX some code paths suspend when they are already suspended! */
	rw_enter(&tx->tx_suspend, RW_READER);
}

/*
 * Release the hold taken by txg_suspend().
 */
void
txg_resume(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	rw_exit(&tx->tx_suspend);
}

/*
 * Per-txg object lists.
 *
 * A txg_list_t keeps one singly-linked list head per txg slot; list
 * linkage lives inside the member objects at 'tl_offset' (a txg_node_t
 * embedded in the object).
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

/*
 * Destroy a txg list; all per-txg sublists must already be empty.
 */
void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

/*
 * Return nonzero if the list for 'txg' has no entries.
 */
int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		/* push onto the head of this txg slot's list */
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		/* convert embedded node back to the containing object */
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 * Returns NULL if 'p' is not on the list for 'txg'.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

/*
 * Return nonzero if 'p' is on the list for 'txg' (unlocked read of the
 * membership flag embedded in the object).
 */
int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

/*
 * Return the entry following 'p' on the list for 'txg', or NULL at the
 * end.  Same caveat as txg_list_head(): only safe on a stable list.
 */
void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}