txg.c revision 208047
1/* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21/* 22 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26#include <sys/zfs_context.h> 27#include <sys/txg_impl.h> 28#include <sys/dmu_impl.h> 29#include <sys/dsl_pool.h> 30#include <sys/callb.h> 31 32/* 33 * Pool-wide transaction groups. 
 */

static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */
extern int zfs_txg_synctime;
extern uint64_t zfs_write_limit_override;

SYSCTL_DECL(_vfs_zfs);
SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0,
    "ZFS transaction groups (TXG)");
TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RDTUN, &zfs_txg_timeout, 0,
    "Maximum seconds worth of delta per txg");
TUNABLE_INT("vfs.zfs.txg.synctime", &zfs_txg_synctime);
SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, synctime, CTLFLAG_RDTUN, &zfs_txg_synctime,
    0, "Target seconds to sync a txg");
TUNABLE_QUAD("vfs.zfs.txg.write_limit_override", &zfs_write_limit_override);
SYSCTL_QUAD(_vfs_zfs_txg, OID_AUTO, write_limit_override, CTLFLAG_RW,
    &zfs_write_limit_override, 0,
    "Override maximum size of a txg to this size in bytes, "
    "value of 0 means don't override");

/*
 * Prepare the txg subsystem.
 *
 * Zeroes the pool's tx_state_t, allocates one tx_cpu_t per CPU (each with
 * its own lock and a condvar per TXG_SIZE slot), initializes the pool-wide
 * sync lock, suspend rwlock, and handoff condvars, and records 'txg' as the
 * initially open transaction group.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
		}
	}

	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 *
 * Tears down everything txg_init() created.  Asserts that both service
 * threads have already exited (tx_threads == 0; see txg_sync_stop()).
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	cv_destroy(&tx->tx_exit_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_sync_more_cv);
	rw_destroy(&tx->tx_suspend);
	mutex_destroy(&tx->tx_sync_lock);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++)
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
	}

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 *
 * Creates the quiesce thread and the sync thread and marks two service
 * threads as running (tx_threads = 2).
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Service-thread prologue: register with the CPR (checkpoint/resume)
 * framework and take the pool-wide sync lock, which the thread's main
 * loop runs under.
 */
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

/*
 * Service-thread epilogue: clear the caller's thread pointer, decrement
 * the running-thread count, and wake anyone in txg_sync_stop() waiting
 * on tx_exit_cv.  Does not return.
 */
static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

/*
 * CPR-safe wait on 'cv' under the sync lock; 'time' is an absolute
 * deadline for cv_timedwait() when nonzero, otherwise wait untimed.
 */
static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);
	txg_wait_synced(dp, 0);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	/* Broadcast every condvar a service thread might be blocked on. */
	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Join the currently open txg: bump this CPU's hold count for the open
 * txg's slot and return that txg number.  The per-CPU tc_lock is
 * acquired here and remains held until txg_rele_to_quiesce(), which
 * prevents txg_quiesce() from advancing tx_open_txg in the meantime.
 */
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

/*
 * Drop the tc_lock taken in txg_hold_open(), allowing the txg to begin
 * quiescing; the hold count taken there remains until txg_rele_to_sync().
 */
void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

/*
 * Release the hold taken in txg_hold_open().  Waking the slot's condvar
 * when the count hits zero lets txg_quiesce() finish waiting out this txg.
 */
void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

/*
 * Advance tx_open_txg past 'txg' and wait until every hold on 'txg'
 * (per-CPU tc_count for its slot) has been released.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to txg_exit().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

/*
 * Sync-thread main loop: accept quiesced txgs handed off by the quiesce
 * thread and push each through spa_sync(), tracking how long the sync
 * took ('delta') to decide how long to idle before the next one.
 */
static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scrubbing, there's someone waiting
		 * on us, or the quiesce thread has handed off a txg to
		 * us, or we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while ((dp->dp_scrub_func == SCRUB_FUNC_NONE ||
		    spa_shutting_down(dp->dp_spa)) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = LBOLT - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/* tx_suspend as writer excludes txg_suspend() readers. */
		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		/* Drop the sync lock across the (long) spa_sync() call. */
		mutex_exit(&tx->tx_sync_lock);

		start = LBOLT;
		spa_sync(dp->dp_spa, txg);
		delta = LBOLT - start;

		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}

/*
 * Quiesce-thread main loop: when someone is waiting for a newer open
 * txg, close the currently open txg via txg_quiesce() and hand it to
 * the sync thread through tx_quiesced_txg.
 */
static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.  Abort
 * the delay if this txg stalls or enters the quiescing state.
 *
 * NOTE(review): 'timeout' holds LBOLT + ticks in an int — confirm the
 * tick counter cannot meaningfully wrap here on long uptimes.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	int timeout = LBOLT + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	/* recheck under the lock; the unlocked test above was only a hint */
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (LBOLT < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout - LBOLT);

	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Block until 'txg' (or the currently open txg, if txg == 0) has been
 * synced, nudging the sync thread via tx_sync_more_cv as needed.
 */
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Block until 'txg' (or the open txg + 1, if txg == 0) is the open txg,
 * nudging the quiesce thread via tx_quiesce_more_cv as needed.
 */
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

/*
 * True when someone is waiting for a txg newer than the open one, i.e.
 * the open txg is holding up the pipeline.  Read without tx_sync_lock.
 */
boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

/*
 * True when there is sync work outstanding: a sync is wanted past the
 * one in progress, or a quiesced txg is waiting to be consumed.
 */
boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

/*
 * Take tx_suspend as reader, blocking the sync thread's writer-mode
 * handoff/completion sections until txg_resume().
 */
void
txg_suspend(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	/* XXX some code paths suspend when they are already suspended! */
	rw_enter(&tx->tx_suspend, RW_READER);
}

void
txg_resume(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	rw_exit(&tx->tx_suspend);
}

/*
 * Per-txg object lists.
 *
 * A txg_list_t keeps TXG_SIZE singly-linked heads; each object embeds a
 * txg_node_t at 'offset' bytes from its start, with per-slot next/member
 * fields so one object can sit on up to TXG_SIZE lists at once.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

/* Destroy a per-txg list; every slot must already be empty. */
void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;	/* back from node to object */
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 * Returns NULL if 'p' is not on the list for this txg.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

/* Unlocked membership test via the object's embedded tn_member flag. */
int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}