/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
 * Copyright (c) 2012, Martin Matuska <mm@FreeBSD.org>. All rights reserved.
 * Copyright 2014 HybridCluster. All rights reserved.
 * Copyright 2016 RackTop Systems.
 * Copyright (c) 2014 Integros [integros.com]
 * Copyright (c) 2018, loli10K <ezomori.nozomu@gmail.com>. All rights reserved.
 */

#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dbuf.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/avl.h>
#include <sys/ddt.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
#include <sys/bqueue.h>

#ifdef __FreeBSD__
#undef dump_write
#define	dump_write dmu_dump_write
#endif

/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
int zfs_send_corrupt_data = B_FALSE;
int zfs_send_queue_length = 16 * 1024 * 1024;
int zfs_recv_queue_length = 16 * 1024 * 1024;
/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
int zfs_send_set_freerecords_bit = B_TRUE;

#ifdef _KERNEL
TUNABLE_INT("vfs.zfs.send_set_freerecords_bit", &zfs_send_set_freerecords_bit);
#endif

static char *dmu_recv_tag = "dmu_recv_tag";
const char *recv_clone_name = "%recv";

/*
 * Use this to override the recordsize calculation for fast zfs send estimates.
 */
uint64_t zfs_override_estimate_recordsize = 0;
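
/*
 * BP_SPAN computes the number of bytes of object data covered by one block
 * pointer at the given indirection level: a level-0 pointer spans one data
 * block, and each additional level multiplies the span by the number of
 * block pointers per indirect block.
 */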
#define	BP_SPAN(datablkszsec, indblkshift, level) \
	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
	(level) * (indblkshift - SPA_BLKPTRSHIFT)))

static void byteswap_record(dmu_replay_record_t *drr);

struct send_thread_arg {
	bqueue_t q;
	dsl_dataset_t *ds;	/* Dataset to traverse */
	uint64_t fromtxg;	/* Traverse from this txg */
	int flags;		/* flags to pass to traverse_dataset */
	int error_code;
	boolean_t cancel;
	zbookmark_phys_t resume;
};

struct send_block_record {
	boolean_t eos_marker;	/* Marks the end of the stream */
	blkptr_t bp;
	zbookmark_phys_t zb;
	uint8_t indblkshift;
	uint16_t datablkszsec;
	bqueue_node_t ln;
};
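
/*
 * Write "len" bytes from "buf" to the stream's file descriptor and account
 * for them in the stream offset (*dsa_off).  Any write error is latched in
 * dsa_err and also returned to the caller.
 */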
static int
dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
	struct uio auio;
	struct iovec aiov;

	/*
	 * The code does not rely on this (len being a multiple of 8). We keep
	 * this assertion because of the corresponding assertion in
	 * receive_read(). Keeping this assertion ensures that we do not
	 * inadvertently break backwards compatibility (causing the assertion
	 * in receive_read() to trigger on old software).
	 *
	 * Removing the assertions could be rolled into a new feature that uses
	 * data that isn't 8-byte aligned; if the assertions were removed, a
	 * feature flag would have to be added.
	 */

	ASSERT0(len % 8);

	aiov.iov_base = buf;
	aiov.iov_len = len;
	auio.uio_iov = &aiov;
	auio.uio_iovcnt = 1;
	auio.uio_resid = len;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_WRITE;
	auio.uio_offset = (off_t)-1;
	auio.uio_td = dsp->dsa_td;
#ifdef _KERNEL
	if (dsp->dsa_fp->f_type == DTYPE_VNODE)
		bwillwrite();
	dsp->dsa_err = fo_write(dsp->dsa_fp, &auio, dsp->dsa_td->td_ucred, 0,
	    dsp->dsa_td);
#else
	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
	dsp->dsa_err = EOPNOTSUPP;
#endif
	mutex_enter(&ds->ds_sendstream_lock);
	*dsp->dsa_off += len;
	mutex_exit(&ds->ds_sendstream_lock);

	return (dsp->dsa_err);
}

/*
 * For all record types except BEGIN, fill in the checksum (overlaid in
 * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
 * up to the start of the checksum itself.
 */
static int
dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
{
	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	(void) fletcher_4_incremental_native(dsp->dsa_drr,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &dsp->dsa_zc);
	if (dsp->dsa_drr->drr_type == DRR_BEGIN) {
		dsp->dsa_sent_begin = B_TRUE;
	} else {
		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
		    drr_checksum.drr_checksum));
		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
	}
	if (dsp->dsa_drr->drr_type == DRR_END) {
		dsp->dsa_sent_end = B_TRUE;
	}
	(void) fletcher_4_incremental_native(&dsp->dsa_drr->
	    drr_u.drr_checksum.drr_checksum,
	    sizeof (zio_cksum_t), &dsp->dsa_zc);
	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
		return (SET_ERROR(EINTR));
	if (payload_len != 0) {
		(void) fletcher_4_incremental_native(payload, payload_len,
		    &dsp->dsa_zc);
		if (dump_bytes(dsp, payload, payload_len) != 0)
			return (SET_ERROR(EINTR));
	}
	return (0);
}

/*
 * Fill in the drr_free struct, or perform aggregation if the previous record
 * is also a free record, and the two are adjacent.
 *
 * Note that we send free records even for a full send, because we want to be
 * able to receive a full send as a clone, which requires a list of all the
 * free and freeobject records that were generated on the source.
 */
static int
dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);

	/*
	 * When we receive a free record, dbuf_free_range() assumes
	 * that the receiving system doesn't have any dbufs in the range
	 * being freed. This is always true because there is a one-record
	 * constraint: we only send one WRITE record for any given
	 * object,offset. We know that the one-record constraint is
	 * true because we always send data in increasing order by
	 * object,offset.
	 *
	 * If the increasing-order constraint ever changes, we should find
	 * another way to assert that the one-record constraint is still
	 * satisfied.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));

	if (length != -1ULL && offset + length < offset)
		length = -1ULL;

	/*
	 * If there is a pending op, but it's not PENDING_FREE, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_FREE records can only be aggregated with
	 * other DRR_FREE records; DRR_FREEOBJECTS records can only be
	 * aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	if (dsp->dsa_pending_op == PENDING_FREE) {
		/*
		 * There should never be a PENDING_FREE if length is -1
		 * (because dump_dnode is the only place where this
		 * function is called with a -1, and only after flushing
		 * any pending record).
		 */
		ASSERT(length != -1ULL);
		/*
		 * Check to see whether this free block can be aggregated
		 * with the pending one.
		 */
		if (drrf->drr_object == object && drrf->drr_offset +
		    drrf->drr_length == offset) {
			drrf->drr_length += length;
			return (0);
		} else {
			/* not a continuation.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}
	/* create a FREE record and make it pending */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREE;
	drrf->drr_object = object;
	drrf->drr_offset = offset;
	drrf->drr_length = length;
	drrf->drr_toguid = dsp->dsa_toguid;
	if (length == -1ULL) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
	} else {
		dsp->dsa_pending_op = PENDING_FREE;
	}

	return (0);
}
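
/*
 * Fill in the drr_write struct and emit a WRITE record for one logical
 * block.  "lsize" is the logical size; "psize" differs from it only when
 * the block is being sent in its on-disk compressed form.
 */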
static int
dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
    uint64_t object, uint64_t offset, int lsize, int psize, const blkptr_t *bp,
    void *data)
{
	uint64_t payload_size;
	struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);

	/*
	 * We send data in increasing object, offset order.
	 * See comment in dump_free() for details.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));
	dsp->dsa_last_data_object = object;
	dsp->dsa_last_data_offset = offset + lsize - 1;

	/*
	 * If there is any kind of pending aggregation (currently either
	 * a grouping of free objects or free blocks), push it out to
	 * the stream, since aggregation can't be done across operations
	 * of different types.
	 */
	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	/* write a WRITE record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE;
	drrw->drr_object = object;
	drrw->drr_type = type;
	drrw->drr_offset = offset;
	drrw->drr_toguid = dsp->dsa_toguid;
	drrw->drr_logical_size = lsize;

	/* only set the compression fields if the buf is compressed */
	if (lsize != psize) {
		ASSERT(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED);
		ASSERT(!BP_IS_EMBEDDED(bp));
		ASSERT(!BP_SHOULD_BYTESWAP(bp));
		ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));
		ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF);
		ASSERT3S(psize, >, 0);
		ASSERT3S(lsize, >=, psize);

		drrw->drr_compressiontype = BP_GET_COMPRESS(bp);
		drrw->drr_compressed_size = psize;
		payload_size = drrw->drr_compressed_size;
	} else {
		payload_size = drrw->drr_logical_size;
	}

	if (bp == NULL || BP_IS_EMBEDDED(bp)) {
		/*
		 * There's no pre-computed checksum for partial-block
		 * writes or embedded BP's, so (like
		 * fletcher4-checksummed blocks) userland will have to
		 * compute a dedup-capable checksum itself.
		 */
		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
	} else {
		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
		if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
		    ZCHECKSUM_FLAG_DEDUP)
			drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
		drrw->drr_key.ddk_cksum = bp->blk_cksum;
	}

	if (dump_record(dsp, data, payload_size) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}
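
/*
 * Emit a WRITE_EMBEDDED record for a block whose (compressed) payload is
 * embedded in the block pointer itself.
 */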
static int
dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    int blksz, const blkptr_t *bp)
{
	char buf[BPE_PAYLOAD_SIZE];
	struct drr_write_embedded *drrw =
	    &(dsp->dsa_drr->drr_u.drr_write_embedded);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (EINTR);
		dsp->dsa_pending_op = PENDING_NONE;
	}

	ASSERT(BP_IS_EMBEDDED(bp));

	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
	drrw->drr_object = object;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	drrw->drr_compression = BP_GET_COMPRESS(bp);
	drrw->drr_etype = BPE_GET_ETYPE(bp);
	drrw->drr_lsize = BPE_GET_LSIZE(bp);
	drrw->drr_psize = BPE_GET_PSIZE(bp);

	decode_embedded_bp_compressed(bp, buf);

	if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
		return (EINTR);
	return (0);
}

static int
dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
{
	struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write a SPILL record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_SPILL;
	drrs->drr_object = object;
	drrs->drr_length = blksz;
	drrs->drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, data, blksz) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}
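
/*
 * Emit (or extend a pending) FREEOBJECTS record covering "numobjs" object
 * numbers starting at "firstobj".  Like dump_free(), the record is left
 * pending so that adjacent runs can be aggregated.
 */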
static int
dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
{
	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);

	/*
	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
	 * push it out, since free block aggregation can only be done for
	 * blocks of the same type (i.e., DRR_FREE records can only be
	 * aggregated with other DRR_FREE records; DRR_FREEOBJECTS records
	 * can only be aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
		/*
		 * See whether this free object array can be aggregated
		 * with the pending one.
		 */
		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
			drrfo->drr_numobjs += numobjs;
			return (0);
		} else {
			/* can't be aggregated.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}

	/* write a FREEOBJECTS record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
	drrfo->drr_firstobj = firstobj;
	drrfo->drr_numobjs = numobjs;
	drrfo->drr_toguid = dsp->dsa_toguid;

	dsp->dsa_pending_op = PENDING_FREEOBJECTS;

	return (0);
}

static int
dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
{
	struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);

	if (object < dsp->dsa_resume_object) {
		/*
		 * Note: when resuming, we will visit all the dnodes in
		 * the block of dnodes that we are resuming from. In
		 * this case it's unnecessary to send the dnodes prior to
		 * the one we are resuming from. We should be at most one
		 * block's worth of dnodes behind the resume point.
		 */
		ASSERT3U(dsp->dsa_resume_object - object, <,
		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
		return (0);
	}

	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
		return (dump_freeobjects(dsp, object, 1));

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write an OBJECT record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_OBJECT;
	drro->drr_object = object;
	drro->drr_type = dnp->dn_type;
	drro->drr_bonustype = dnp->dn_bonustype;
	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	drro->drr_bonuslen = dnp->dn_bonuslen;
	drro->drr_checksumtype = dnp->dn_checksum;
	drro->drr_compress = dnp->dn_compress;
	drro->drr_toguid = dsp->dsa_toguid;

	if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;

	if (dump_record(dsp, DN_BONUS(dnp),
	    P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
		return (SET_ERROR(EINTR));
	}

	/* Free anything past the end of the file. */
	if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
		return (SET_ERROR(EINTR));
	if (dsp->dsa_err != 0)
		return (SET_ERROR(EINTR));
	return (0);
}
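
/*
 * Decide whether a block pointer is eligible to be sent as a WRITE_EMBEDDED
 * record, based on the feature flags requested for this stream.
 */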
static boolean_t
backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
{
	if (!BP_IS_EMBEDDED(bp))
		return (B_FALSE);

	/*
	 * Compression function must be legacy, or explicitly enabled.
	 */
	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LZ4)))
		return (B_FALSE);

	/*
	 * Embed type must be explicitly enabled.
	 */
	switch (BPE_GET_ETYPE(bp)) {
	case BP_EMBEDDED_TYPE_DATA:
		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
			return (B_TRUE);
		break;
	default:
		return (B_FALSE);
	}
	return (B_FALSE);
}

/*
 * This is the callback function to traverse_dataset that acts as the worker
 * thread for dmu_send_impl.
 */
/*ARGSUSED*/
static int
send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
{
	struct send_thread_arg *sta = arg;
	struct send_block_record *record;
	uint64_t record_size;
	int err = 0;

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= sta->resume.zb_object);

	if (sta->cancel)
		return (SET_ERROR(EINTR));

	if (bp == NULL) {
		ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
		return (0);
	} else if (zb->zb_level < 0) {
		return (0);
	}

	record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
	record->eos_marker = B_FALSE;
	record->bp = *bp;
	record->zb = *zb;
	record->indblkshift = dnp->dn_indblkshift;
	record->datablkszsec = dnp->dn_datablkszsec;
	record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	bqueue_enqueue(&sta->q, record, record_size);

	return (err);
}

/*
 * This function kicks off the traverse_dataset. It also handles setting the
 * error code of the thread in case something goes wrong, and pushes the End
 * of Stream record when the traverse_dataset call has finished. If there is
 * no dataset to traverse, the thread immediately pushes the End of Stream
 * marker.
 */
static void
send_traverse_thread(void *arg)
{
	struct send_thread_arg *st_arg = arg;
	int err;
	struct send_block_record *data;

	if (st_arg->ds != NULL) {
		err = traverse_dataset_resume(st_arg->ds,
		    st_arg->fromtxg, &st_arg->resume,
		    st_arg->flags, send_cb, st_arg);

		if (err != EINTR)
			st_arg->error_code = err;
	}
	data = kmem_zalloc(sizeof (*data), KM_SLEEP);
	data->eos_marker = B_TRUE;
	bqueue_enqueue(&st_arg->q, data, 1);
	thread_exit();
}

/*
 * This function actually handles figuring out what kind of record needs to be
 * dumped, reading the data (which has hopefully been prefetched), and calling
 * the appropriate helper function.
 */
static int
do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
	const blkptr_t *bp = &data->bp;
	const zbookmark_phys_t *zb = &data->zb;
	uint8_t indblkshift = data->indblkshift;
	uint16_t dblkszsec = data->datablkszsec;
	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
	int err = 0;

	ASSERT3U(zb->zb_level, >=, 0);

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= dsa->dsa_resume_object);

	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
	    DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
		return (0);
	} else if (BP_IS_HOLE(bp) &&
	    zb->zb_object == DMU_META_DNODE_OBJECT) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
		err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
	} else if (BP_IS_HOLE(bp)) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t offset = zb->zb_blkid * span;
		err = dump_free(dsa, zb->zb_object, offset, span);
	} else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
		return (0);
	} else if (type == DMU_OT_DNODE) {
		int blksz = BP_GET_LSIZE(bp);
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;

		ASSERT0(zb->zb_level);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		dnode_phys_t *blk = abuf->b_data;
		uint64_t dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
		for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
			err = dump_dnode(dsa, dnobj + i, blk + i);
			if (err != 0)
				break;
		}
		arc_buf_destroy(abuf, &abuf);
	} else if (type == DMU_OT_SA) {
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = BP_GET_LSIZE(bp);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
		arc_buf_destroy(abuf, &abuf);
	} else if (backup_do_embed(dsa, bp)) {
		/* it's an embedded level-0 block of a regular object */
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		ASSERT0(zb->zb_level);
		err = dump_write_embedded(dsa, zb->zb_object,
		    zb->zb_blkid * blksz, blksz, bp);
	} else {
		/* it's a level-0 block of a regular object */
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		uint64_t offset;

		/*
		 * If we have large blocks stored on disk but the send flags
		 * don't allow us to send large blocks, we split the data from
		 * the arc buf into chunks.
		 */
		boolean_t split_large_blocks = blksz > SPA_OLD_MAXBLOCKSIZE &&
		    !(dsa->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
		/*
		 * We should only request compressed data from the ARC if all
		 * the following are true:
		 * - stream compression was requested
		 * - we aren't splitting large blocks into smaller chunks
		 * - the data won't need to be byteswapped before sending
		 * - this isn't an embedded block
		 * - this isn't metadata (if receiving on a different endian
		 *   system it can be byteswapped more easily)
		 */
		boolean_t request_compressed =
		    (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
		    !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
		    !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));

		ASSERT0(zb->zb_level);
		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
		    (zb->zb_object == dsa->dsa_resume_object &&
		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));

		ASSERT3U(blksz, ==, BP_GET_LSIZE(bp));

		enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
		if (request_compressed)
			zioflags |= ZIO_FLAG_RAW;
		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0) {
			if (zfs_send_corrupt_data) {
				/* Send a block filled with 0x"zfs badd bloc" */
				abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
				    blksz);
				uint64_t *ptr;
				for (ptr = abuf->b_data;
				    (char *)ptr < (char *)abuf->b_data + blksz;
				    ptr++)
					*ptr = 0x2f5baddb10cULL;
			} else {
				return (SET_ERROR(EIO));
			}
		}

		offset = zb->zb_blkid * blksz;

		if (split_large_blocks) {
			ASSERT3U(arc_get_compression(abuf), ==,
			    ZIO_COMPRESS_OFF);
			char *buf = abuf->b_data;
			while (blksz > 0 && err == 0) {
				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
				err = dump_write(dsa, type, zb->zb_object,
				    offset, n, n, NULL, buf);
				offset += n;
				buf += n;
				blksz -= n;
			}
		} else {
			err = dump_write(dsa, type, zb->zb_object, offset,
			    blksz, arc_buf_size(abuf), bp, abuf->b_data);
		}
		arc_buf_destroy(abuf, &abuf);
	}

	ASSERT(err == 0 || err == EINTR);
	return (err);
}

/*
 * Pop the new data off the queue, and free the old data.
 */
static struct send_block_record *
get_next_record(bqueue_t *bq, struct send_block_record *data)
{
	struct send_block_record *tmp = bqueue_dequeue(bq);
	kmem_free(data, sizeof (*data));
	return (tmp);
}

/*
 * Actually do the bulk of the work in a zfs send.
 *
 * Note: Releases dp using the specified tag.
 */
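/*
 * A traversal thread (send_traverse_thread) acts as the producer, enqueueing
 * send_block_record entries onto a bqueue; this function consumes them in
 * order and emits the corresponding stream records via do_dump().
 */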
static int
dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
    zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone,
    boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
    int outfd, uint64_t resumeobj, uint64_t resumeoff,
#ifdef illumos
    vnode_t *vp, offset_t *off)
#else
    struct file *fp, offset_t *off)
#endif
{
	objset_t *os;
	dmu_replay_record_t *drr;
	dmu_sendarg_t *dsp;
	int err;
	uint64_t fromtxg = 0;
	uint64_t featureflags = 0;
	struct send_thread_arg to_arg = { 0 };

	err = dmu_objset_from_ds(to_ds, &os);
	if (err != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
	drr->drr_type = DRR_BEGIN;
	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
	    DMU_SUBSTREAM);

#ifdef _KERNEL
	if (dmu_objset_type(os) == DMU_OST_ZFS) {
		uint64_t version;
		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
			kmem_free(drr, sizeof (dmu_replay_record_t));
			dsl_pool_rele(dp, tag);
			return (SET_ERROR(EINVAL));
		}
		if (version >= ZPL_VERSION_SA) {
			featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
		}
	}
#endif

	if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
		featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
	if (embedok &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
		featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
		if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
			featureflags |= DMU_BACKUP_FEATURE_LZ4;
	}
	if (compressok) {
		featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
	}
	if ((featureflags &
	    (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED)) !=
	    0 && spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
		featureflags |= DMU_BACKUP_FEATURE_LZ4;
	}

	if (resumeobj != 0 || resumeoff != 0) {
		featureflags |= DMU_BACKUP_FEATURE_RESUMING;
	}

	DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
	    featureflags);

	drr->drr_u.drr_begin.drr_creation_time =
	    dsl_dataset_phys(to_ds)->ds_creation_time;
	drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
	if (is_clone)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
	drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
	if (zfs_send_set_freerecords_bit)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;

	if (ancestor_zb != NULL) {
		drr->drr_u.drr_begin.drr_fromguid =
		    ancestor_zb->zbm_guid;
		fromtxg = ancestor_zb->zbm_creation_txg;
	}
	dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
	if (!to_ds->ds_is_snapshot) {
		(void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
		    sizeof (drr->drr_u.drr_begin.drr_toname));
	}

	dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);

	dsp->dsa_drr = drr;
	dsp->dsa_outfd = outfd;
	dsp->dsa_proc = curproc;
	dsp->dsa_td = curthread;
	dsp->dsa_fp = fp;
	dsp->dsa_os = os;
	dsp->dsa_off = off;
	dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	dsp->dsa_pending_op = PENDING_NONE;
	dsp->dsa_featureflags = featureflags;
	dsp->dsa_resume_object = resumeobj;
	dsp->dsa_resume_offset = resumeoff;
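
	/*
	 * Publish this send stream on the dataset's list so that other
	 * consumers (e.g. progress reporting) can find it.
	 */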
	mutex_enter(&to_ds->ds_sendstream_lock);
	list_insert_head(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	dsl_dataset_long_hold(to_ds, FTAG);
	dsl_pool_rele(dp, tag);

	void *payload = NULL;
	size_t payload_len = 0;
	if (resumeobj != 0 || resumeoff != 0) {
		dmu_object_info_t to_doi;
		err = dmu_object_info(os, resumeobj, &to_doi);
		if (err != 0)
			goto out;
		SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
		    resumeoff / to_doi.doi_data_block_size);

		nvlist_t *nvl = fnvlist_alloc();
		fnvlist_add_uint64(nvl, "resume_object", resumeobj);
		fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
		payload = fnvlist_pack(nvl, &payload_len);
		drr->drr_payloadlen = payload_len;
		fnvlist_free(nvl);
	}

	err = dump_record(dsp, payload, payload_len);
	fnvlist_pack_free(payload, payload_len);
	if (err != 0) {
		err = dsp->dsa_err;
		goto out;
	}

	err = bqueue_init(&to_arg.q, zfs_send_queue_length,
	    offsetof(struct send_block_record, ln));
	to_arg.error_code = 0;
	to_arg.cancel = B_FALSE;
	to_arg.ds = to_ds;
	to_arg.fromtxg = fromtxg;
	to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
	(void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, &p0,
	    TS_RUN, minclsyspri);

	struct send_block_record *to_data;
	to_data = bqueue_dequeue(&to_arg.q);

	while (!to_data->eos_marker && err == 0) {
		err = do_dump(dsp, to_data);
		to_data = get_next_record(&to_arg.q, to_data);
		if (issig(JUSTLOOKING) && issig(FORREAL))
			err = EINTR;
	}

	if (err != 0) {
		to_arg.cancel = B_TRUE;
		while (!to_data->eos_marker) {
			to_data = get_next_record(&to_arg.q, to_data);
		}
	}
	kmem_free(to_data, sizeof (*to_data));

	bqueue_destroy(&to_arg.q);

	if (err == 0 && to_arg.error_code != 0)
		err = to_arg.error_code;

	if (err != 0)
		goto out;

	if (dsp->dsa_pending_op != PENDING_NONE)
		if (dump_record(dsp, NULL, 0) != 0)
			err = SET_ERROR(EINTR);

	if (err != 0) {
		if (err == EINTR && dsp->dsa_err != 0)
			err = dsp->dsa_err;
		goto out;
	}

	bzero(drr, sizeof (dmu_replay_record_t));
	drr->drr_type = DRR_END;
	drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
	drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, NULL, 0) != 0)
		err = dsp->dsa_err;

out:
	mutex_enter(&to_ds->ds_sendstream_lock);
	list_remove(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	VERIFY(err != 0 || (dsp->dsa_sent_begin && dsp->dsa_sent_end));

	kmem_free(drr, sizeof (dmu_replay_record_t));
	kmem_free(dsp, sizeof (dmu_sendarg_t));

	dsl_dataset_long_rele(to_ds, FTAG);

	return (err);
}
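
/*
 * Generate a send stream from the snapshot identified by object number
 * "tosnap" (optionally incremental from "fromsnap"), both looked up in
 * the named pool.
 */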
int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
    boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
#ifdef illumos
    int outfd, vnode_t *vp, offset_t *off)
#else
    int outfd, struct file *fp, offset_t *off)
#endif
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	dsl_dataset_t *fromds = NULL;
	int err;

	err = dsl_pool_hold(pool, FTAG, &dp);
	if (err != 0)
		return (err);

	err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != 0) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone;

		err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		if (!dsl_dataset_is_before(ds, fromds, 0))
			err = SET_ERROR(EXDEV);
		zb.zbm_creation_time =
		    dsl_dataset_phys(fromds)->ds_creation_time;
		zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
		zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
		is_clone = (fromds->ds_dir != ds->ds_dir);
		dsl_dataset_rele(fromds, FTAG);
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok, compressok, outfd, 0, 0, fp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok, compressok, outfd, 0, 0, fp, off);
	}
	dsl_dataset_rele(ds, FTAG);
	return (err);
}
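
/*
 * Generate a send stream from the dataset named by "tosnap", optionally
 * incremental from the snapshot or bookmark named by "fromsnap".  If tosnap
 * names a filesystem or volume rather than a snapshot, the dataset is owned
 * for the duration of the send so that it cannot change underneath us.
 */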
int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
    boolean_t large_block_ok, boolean_t compressok, int outfd,
    uint64_t resumeobj, uint64_t resumeoff,
#ifdef illumos
    vnode_t *vp, offset_t *off)
#else
    struct file *fp, offset_t *off)
#endif
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	int err;
	boolean_t owned = B_FALSE;

	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
		return (SET_ERROR(EINVAL));

	err = dsl_pool_hold(tosnap, FTAG, &dp);
	if (err != 0)
		return (err);

	if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
		/*
		 * We are sending a filesystem or volume.  Ensure
		 * that it doesn't change by owning the dataset.
		 */
		err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
		owned = B_TRUE;
	} else {
		err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
	}
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != NULL) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone = B_FALSE;
		int fsnamelen = strchr(tosnap, '@') - tosnap;

		/*
		 * If the fromsnap is in a different filesystem, then
		 * mark the send stream as a clone.
		 */
		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
		    (fromsnap[fsnamelen] != '@' &&
		    fromsnap[fsnamelen] != '#')) {
			is_clone = B_TRUE;
		}

		if (strchr(fromsnap, '@')) {
			dsl_dataset_t *fromds;
			err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
			if (err == 0) {
				if (!dsl_dataset_is_before(ds, fromds, 0))
					err = SET_ERROR(EXDEV);
				zb.zbm_creation_time =
				    dsl_dataset_phys(fromds)->ds_creation_time;
				zb.zbm_creation_txg =
				    dsl_dataset_phys(fromds)->ds_creation_txg;
				zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
				is_clone = (ds->ds_dir != fromds->ds_dir);
				dsl_dataset_rele(fromds, FTAG);
			}
		} else {
			err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
		}
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok, compressok,
		    outfd, resumeobj, resumeoff, fp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok, compressok,
		    outfd, resumeobj, resumeoff, fp, off);
	}
	if (owned)
		dsl_dataset_disown(ds, FTAG);
	else
		dsl_dataset_rele(ds, FTAG);
	return (err);
}
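
/*
 * Convert raw changed-data sizes into a stream-size estimate by accounting
 * for the per-block overhead of stream records and subtracting the space
 * consumed by indirect blocks.
 */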
static int
dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds,
    uint64_t uncompressed, uint64_t compressed, boolean_t stream_compressed,
    uint64_t *sizep)
{
	int err = 0;
	uint64_t size;
	/*
	 * Assume that space (both on-disk and in-stream) is dominated by
	 * data.  We will adjust for indirect blocks and the copies property,
	 * but ignore per-object space used (eg, dnodes and DRR_OBJECT
	 * records).
	 */
	uint64_t recordsize;
	uint64_t record_count;
	objset_t *os;
	VERIFY0(dmu_objset_from_ds(ds, &os));

	/* Assume all (uncompressed) blocks are recordsize. */
	if (zfs_override_estimate_recordsize != 0) {
		recordsize = zfs_override_estimate_recordsize;
	} else if (os->os_phys->os_type == DMU_OST_ZVOL) {
		err = dsl_prop_get_int_ds(ds,
		    zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);
	} else {
		err = dsl_prop_get_int_ds(ds,
		    zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);
	}
	if (err != 0)
		return (err);
	record_count = uncompressed / recordsize;

	/*
	 * If we're estimating a send size for a compressed stream, use the
	 * compressed data size to estimate the stream size.  Otherwise, use
	 * the uncompressed data size.
	 */
	size = stream_compressed ? compressed : uncompressed;

	/*
	 * Subtract out approximate space used by indirect blocks.
	 * Assume most space is used by data blocks (non-indirect, non-dnode).
	 * Assume no ditto blocks or internal fragmentation.
	 *
	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
	 * block.
	 */
	size -= record_count * sizeof (blkptr_t);

	/* Add in the space for the record associated with each block. */
	size += record_count * sizeof (dmu_replay_record_t);

	*sizep = size;

	return (0);
}
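
/*
 * Estimate the size of the stream that would be generated by sending "ds",
 * optionally incremental from "fromds".
 */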
int
dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds,
    boolean_t stream_compressed, uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	uint64_t uncomp, comp;

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!ds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/* fromsnap, if provided, must be a snapshot */
	if (fromds != NULL && !fromds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/*
	 * fromsnap must be an earlier snapshot from the same fs as tosnap,
	 * or the origin's fs.
	 */
	if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
		return (SET_ERROR(EXDEV));

	/* Get compressed and uncompressed size estimates of changed data. */
	if (fromds == NULL) {
		uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
		comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
	} else {
		uint64_t used;
		err = dsl_dataset_space_written(fromds, ds,
		    &used, &comp, &uncomp);
		if (err != 0)
			return (err);
	}

	err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
	    stream_compressed, sizep);
	/*
	 * Add the size of the BEGIN and END records to the estimate.
	 */
	*sizep += 2 * sizeof (dmu_replay_record_t);
	return (err);
}

struct calculate_send_arg {
	uint64_t uncompressed;
	uint64_t compressed;
};

/*
 * Simple callback used to traverse the blocks of a snapshot and sum their
 * uncompressed and compressed sizes.
 */
/* ARGSUSED */
static int
dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
{
	struct calculate_send_arg *space = arg;
	if (bp != NULL && !BP_IS_HOLE(bp)) {
		space->uncompressed += BP_GET_UCSIZE(bp);
		space->compressed += BP_GET_PSIZE(bp);
	}
	return (0);
}

/*
 * Given a destination snapshot and a TXG, calculate the approximate size of a
 * send stream sent from that TXG. from_txg may be zero, indicating that the
 * whole snapshot will be sent.
 */
int
dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
    boolean_t stream_compressed, uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	struct calculate_send_arg size = { 0 };

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!ds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/* verify that from_txg is before the provided snapshot was taken */
	if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
		return (SET_ERROR(EXDEV));
	}

	/*
	 * traverse the blocks of the snapshot with birth times after
	 * from_txg, summing their uncompressed size
	 */
	err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
	    dmu_calculate_send_traversal, &size);
	if (err)
		return (err);

	err = dmu_adjust_send_estimate_for_indirects(ds, size.uncompressed,
	    size.compressed, stream_compressed, sizep);
	return (err);
}

typedef struct dmu_recv_begin_arg {
	const char *drba_origin;
	dmu_recv_cookie_t *drba_cookie;
	cred_t *drba_cred;
	uint64_t drba_snapobj;
} dmu_recv_begin_arg_t;
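
/*
 * Check that an incremental (or forced full) receive into an existing
 * filesystem is valid: the temporary %recv clone and the target snapshot
 * name must not already exist, there must be no partially-received resume
 * state, and fromguid (if nonzero) must match one of the dataset's
 * snapshots.
 */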
static int
recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
    uint64_t fromguid)
{
	uint64_t val;
	uint64_t children;
	int error;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/* Temporary clone name must not exist. */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
	    8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? SET_ERROR(EBUSY) : error);

	/* Resume state must not be set. */
	if (dsl_dataset_has_resume_receive_state(ds))
		return (SET_ERROR(EBUSY));

	/* New snapshot name must not exist. */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dataset_phys(ds)->ds_snapnames_zapobj,
	    drba->drba_cookie->drc_tosnap, 8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? SET_ERROR(EEXIST) : error);

	/* must not have children if receiving a ZVOL */
	error = zap_count(dp->dp_meta_objset,
	    dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, &children);
	if (error != 0)
		return (error);
	if (drba->drba_cookie->drc_drrb->drr_type != DMU_OST_ZFS &&
	    children > 0)
		return (SET_ERROR(ZFS_ERR_WRONG_PARENT));

	/*
	 * Check snapshot limit before receiving. We'll recheck again at the
	 * end, but might as well abort before receiving if we're already over
	 * the limit.
	 *
	 * Note that we do not check the file system limit with
	 * dsl_dir_fscount_check because the temporary %clones don't count
	 * against that limit.
	 */
	error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
	    NULL, drba->drba_cred);
	if (error != 0)
		return (error);

	if (fromguid != 0) {
		dsl_dataset_t *snap;
		uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;

		/* Find snapshot in this dir that matches fromguid. */
		while (obj != 0) {
			error = dsl_dataset_hold_obj(dp, obj, FTAG,
			    &snap);
			if (error != 0)
				return (SET_ERROR(ENODEV));
			if (snap->ds_dir != ds->ds_dir) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ENODEV));
			}
			if (dsl_dataset_phys(snap)->ds_guid == fromguid)
				break;
			obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
			dsl_dataset_rele(snap, FTAG);
		}
		if (obj == 0)
			return (SET_ERROR(ENODEV));

		if (drba->drba_cookie->drc_force) {
			drba->drba_snapobj = obj;
		} else {
			/*
			 * If we are not forcing, there must be no
			 * changes since fromsnap.
			 */
			if (dsl_dataset_modified_since_snap(ds, snap)) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ETXTBSY));
			}
			drba->drba_snapobj = ds->ds_prev->ds_object;
		}

		dsl_dataset_rele(snap, FTAG);
	} else {
		/* if full, then must be forced */
		if (!drba->drba_cookie->drc_force)
			return (SET_ERROR(EEXIST));
		/* start from $ORIGIN@$ORIGIN, if supported */
		drba->drba_snapobj = dp->dp_origin_snap != NULL ?
		    dp->dp_origin_snap->ds_object : 0;
	}

	return (0);
}
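
/*
 * dsl_sync_task check function for a non-resuming receive: validate the
 * BEGIN record against the pool's enabled features and against the target
 * filesystem, which may or may not exist yet.
 */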
static int
dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	uint64_t fromguid = drrb->drr_fromguid;
	int flags = drrb->drr_flags;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES ||
	    ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	if (drba->drba_cookie->drc_resumable &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate large blocks
	 * to smaller ones, so the pool must have the LARGE_BLOCKS
	 * feature enabled if the stream has LARGE_BLOCKS.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
		return (SET_ERROR(ENOTSUP));

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* target fs already exists; recv into temp clone */

		/* Can't recv a clone into an existing fs */
		if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
			dsl_dataset_rele(ds, FTAG);
			return (SET_ERROR(EINVAL));
		}

		error = recv_begin_check_existing_impl(drba, ds, fromguid);
		dsl_dataset_rele(ds, FTAG);
	} else if (error == ENOENT) {
		/* target fs does not exist; must be a full backup or clone */
		char buf[ZFS_MAX_DATASET_NAME_LEN];
		objset_t *os;

		/*
		 * If it's a non-clone incremental, we are missing the
		 * target fs, so fail the recv.
		 */
		if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
		    drba->drba_origin))
			return (SET_ERROR(ENOENT));

		/*
		 * If we're receiving a full send as a clone, and it doesn't
		 * contain all the necessary free records and freeobject
		 * records, reject it.
		 */
		if (fromguid == 0 && drba->drba_origin &&
		    !(flags & DRR_FLAG_FREERECORDS))
			return (SET_ERROR(EINVAL));

		/* Open the parent of tofs */
		ASSERT3U(strlen(tofs), <, sizeof (buf));
		(void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
		error = dsl_dataset_hold(dp, buf, FTAG, &ds);
		if (error != 0)
			return (error);

		/*
		 * Check filesystem and snapshot limits before receiving.
		 * We'll recheck snapshot limits again at the end (we create
		 * the filesystems and increment those counts during
		 * begin_sync).
		 */
		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		/* can't recv below anything but filesystems (eg. no ZVOLs) */
		error = dmu_objset_from_ds(ds, &os);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}
		if (dmu_objset_type(os) != DMU_OST_ZFS) {
			dsl_dataset_rele(ds, FTAG);
			return (SET_ERROR(ZFS_ERR_WRONG_PARENT));
		}

		if (drba->drba_origin != NULL) {
			dsl_dataset_t *origin;
			error = dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin);
			if (error != 0) {
				dsl_dataset_rele(ds, FTAG);
				return (error);
			}
			if (!origin->ds_is_snapshot) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(EINVAL));
			}
			if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
			    fromguid != 0) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(ENODEV));
			}
			dsl_dataset_rele(origin, FTAG);
		}

		dsl_dataset_rele(ds, FTAG);
		error = 0;
	}
	return (error);
}
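
/*
 * dsl_sync_task sync function for a non-resuming receive: create the
 * temporary %recv clone (or a brand-new dataset), record any resume state
 * in its ZAP, and mark it inconsistent while the receive is in progress.
 */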
static void
dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	objset_t *mos = dp->dp_meta_objset;
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds, *newds;
	uint64_t dsobj;
	int error;
	uint64_t crflags = 0;

	if (drrb->drr_flags & DRR_FLAG_CI_DATA)
		crflags |= DS_FLAG_CI_DATASET;

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* create temporary clone */
		dsl_dataset_t *snap = NULL;
		if (drba->drba_snapobj != 0) {
			VERIFY0(dsl_dataset_hold_obj(dp,
			    drba->drba_snapobj, FTAG, &snap));
		}
		dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
		    snap, crflags, drba->drba_cred, tx);
		if (drba->drba_snapobj != 0)
			dsl_dataset_rele(snap, FTAG);
		dsl_dataset_rele(ds, FTAG);
	} else {
		dsl_dir_t *dd;
		const char *tail;
		dsl_dataset_t *origin = NULL;

		VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));

		if (drba->drba_origin != NULL) {
			VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin));
		}

		/* Create new dataset. */
		dsobj = dsl_dataset_create_sync(dd,
		    strrchr(tofs, '/') + 1,
		    origin, crflags, drba->drba_cred, tx);
		if (origin != NULL)
			dsl_dataset_rele(origin, FTAG);
		dsl_dir_rele(dd, FTAG);
		drba->drba_cookie->drc_newfs = B_TRUE;
	}
	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));

	if (drba->drba_cookie->drc_resumable) {
		dsl_dataset_zapify(newds, tx);
		if (drrb->drr_fromguid != 0) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
			    8, 1, &drrb->drr_fromguid, tx));
		}
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
		    8, 1, &drrb->drr_toguid, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
		    1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
		uint64_t one = 1;
		uint64_t zero = 0;
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
		    8, 1, &one, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
		    8, 1, &zero, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
		    8, 1, &zero, tx));
		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
		    DMU_BACKUP_FEATURE_LARGE_BLOCKS) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_LARGEBLOCK,
			    8, 1, &one, tx));
		}
		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
		    DMU_BACKUP_FEATURE_EMBED_DATA) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
			    8, 1, &one, tx));
		}
		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
		    DMU_BACKUP_FEATURE_COMPRESSED) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_COMPRESSOK,
			    8, 1, &one, tx));
		}
	}

	dmu_buf_will_dirty(newds->ds_dbuf, tx);
	dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;

	/*
	 * If we actually created a non-clone, we need to create the
	 * objset in our new dataset.
	 */
	rrw_enter(&newds->ds_bp_rwlock, RW_READER, FTAG);
	if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
		(void) dmu_objset_create_impl(dp->dp_spa,
		    newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
	}
	rrw_exit(&newds->ds_bp_rwlock, FTAG);

	drba->drba_cookie->drc_ds = newds;

	spa_history_log_internal_ds(newds, "receive", tx, "");
}
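
/*
 * dsl_sync_task check function for resuming a receive: verify that the
 * partially-received dataset still exists, is marked inconsistent, carries
 * resume state, and matches the toguid/fromguid in the resumed stream's
 * BEGIN record.
 */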
static int
dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES)
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	/* 6 extra bytes for /%recv */
	char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
		if (error != 0)
			return (error);
	}

	/* check that ds is marked inconsistent */
	if (!DS_IS_INCONSISTENT(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/* check that there is resuming data, and that the toguid matches */
	if (!dsl_dataset_is_zapified(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}
	uint64_t val;
	error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
	if (error != 0 || drrb->drr_toguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Check if the receive is still running.  If so, it will be owned.
	 * Note that nothing else can own the dataset (e.g. after the receive
	 * fails) because it will be marked inconsistent.
	 */
	if (dsl_dataset_has_owner(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EBUSY));
	}

	/* There should not be any snapshots of this fs yet. */
	if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Note: resume point will be checked when we process the first WRITE
	 * record.
	 */

	/* check that the origin matches */
	val = 0;
	(void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
	if (drrb->drr_fromguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	dsl_dataset_rele(ds, FTAG);
	return (0);
}

static void
dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds;
	uint64_t dsobj;
	/* 6 extra bytes for /%recv */
	char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		VERIFY0(dsl_dataset_hold(dp, tofs, FTAG, &ds));
		drba->drba_cookie->drc_newfs = B_TRUE;
	}

	/* clear the inconsistent flag so that we can own it */
	ASSERT(DS_IS_INCONSISTENT(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
	dsobj = ds->ds_object;
	dsl_dataset_rele(ds, FTAG);

	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));

	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;

	rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
	ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));
	rrw_exit(&ds->ds_bp_rwlock, FTAG);

	drba->drba_cookie->drc_ds = ds;

	spa_history_log_internal_ds(ds, "resume receive", tx, "");
}
 */
int
dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
    boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
{
	dmu_recv_begin_arg_t drba = { 0 };

	bzero(drc, sizeof (dmu_recv_cookie_t));
	drc->drc_drr_begin = drr_begin;
	drc->drc_drrb = &drr_begin->drr_u.drr_begin;
	drc->drc_tosnap = tosnap;
	drc->drc_tofs = tofs;
	drc->drc_force = force;
	drc->drc_resumable = resumable;
	drc->drc_cred = CRED();
	drc->drc_clone = (origin != NULL);

	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
		drc->drc_byteswap = B_TRUE;
		(void) fletcher_4_incremental_byteswap(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
		byteswap_record(drr_begin);
	} else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
		(void) fletcher_4_incremental_native(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
	} else {
		return (SET_ERROR(EINVAL));
	}

	drba.drba_origin = origin;
	drba.drba_cookie = drc;
	drba.drba_cred = CRED();

	if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
	    DMU_BACKUP_FEATURE_RESUMING) {
		return (dsl_sync_task(tofs,
		    dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	} else {
		return (dsl_sync_task(tofs,
		    dmu_recv_begin_check, dmu_recv_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	}
}

struct receive_record_arg {
	dmu_replay_record_t header;
	void *payload; /* Pointer to a buffer containing the payload */
	/*
	 * If the record is a write, pointer to the arc_buf_t containing the
	 * payload.
	 */
	arc_buf_t *write_buf;
	int payload_size;
	uint64_t bytes_read; /* bytes read from stream when record created */
	boolean_t eos_marker; /* Marks the end of the stream */
	bqueue_node_t node;
};

struct receive_writer_arg {
	objset_t *os;
	boolean_t byteswap;
	bqueue_t q;

	/*
	 * These three args are used to signal to the main thread that we're
	 * done.
	 */
	kmutex_t mutex;
	kcondvar_t cv;
	boolean_t done;

	int err;
	/* A map from guid to dataset to help handle dedup'd streams. */
	avl_tree_t *guid_to_ds_map;
	boolean_t resumable;
	uint64_t last_object;
	uint64_t last_offset;
	uint64_t max_object; /* highest object ID referenced in stream */
	uint64_t bytes_read; /* bytes read when current record created */
};

struct objlist {
	list_t list; /* List of struct receive_objnode. */
	/*
	 * Last object looked up. Used to assert that objects are being looked
	 * up in ascending order.
	 */
	uint64_t last_lookup;
};

struct receive_objnode {
	list_node_t node;
	uint64_t object;
};

struct receive_arg {
	objset_t *os;
	kthread_t *td;
	struct file *fp;
	uint64_t voff; /* The current offset in the stream */
	uint64_t bytes_read;
	/*
	 * A record that has had its payload read in, but hasn't yet been
	 * handed off to the worker thread.
	 */
	struct receive_record_arg *rrd;
	/* A record that has had its header read in, but not its payload. */
	struct receive_record_arg *next_rrd;
	zio_cksum_t cksum;
	zio_cksum_t prev_cksum;
	int err;
	boolean_t byteswap;
	/*
	 * Sorted list of objects not to issue prefetches for.
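	 * Trimmed as records are processed; see receive_read_prefetch().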
	 */
	struct objlist ignore_objlist;
};

typedef struct guid_map_entry {
	uint64_t guid;
	dsl_dataset_t *gme_ds;
	avl_node_t avlnode;
} guid_map_entry_t;

static int
guid_compare(const void *arg1, const void *arg2)
{
	const guid_map_entry_t *gmep1 = (const guid_map_entry_t *)arg1;
	const guid_map_entry_t *gmep2 = (const guid_map_entry_t *)arg2;

	return (AVL_CMP(gmep1->guid, gmep2->guid));
}

static void
free_guid_map_onexit(void *arg)
{
	avl_tree_t *ca = arg;
	void *cookie = NULL;
	guid_map_entry_t *gmep;

	while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
		dsl_dataset_long_rele(gmep->gme_ds, gmep);
		dsl_dataset_rele(gmep->gme_ds, gmep);
		kmem_free(gmep, sizeof (guid_map_entry_t));
	}
	avl_destroy(ca);
	kmem_free(ca, sizeof (avl_tree_t));
}

static int
restore_bytes(struct receive_arg *ra, void *buf, int len, off_t off,
    ssize_t *resid)
{
	struct uio auio;
	struct iovec aiov;
	int error;

	aiov.iov_base = buf;
	aiov.iov_len = len;
	auio.uio_iov = &aiov;
	auio.uio_iovcnt = 1;
	auio.uio_resid = len;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_READ;
	auio.uio_offset = off;
	auio.uio_td = ra->td;
#ifdef _KERNEL
	error = fo_read(ra->fp, &auio, ra->td->td_ucred, FOF_OFFSET, ra->td);
#else
	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
	error = EOPNOTSUPP;
#endif
	*resid = auio.uio_resid;
	return (error);
}

static int
receive_read(struct receive_arg *ra, int len, void *buf)
{
	int done = 0;

	/*
	 * The code doesn't rely on this (lengths being multiples of 8). See
	 * comment in dump_bytes.
	 */
	ASSERT0(len % 8);

	while (done < len) {
		ssize_t resid;

		ra->err = restore_bytes(ra, buf + done,
		    len - done, ra->voff, &resid);

		if (resid == len - done) {
			/*
			 * Note: ECKSUM indicates that the receive
			 * was interrupted and can potentially be resumed.
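			 * (A resid that didn't shrink means the read
			 * returned no data at all, i.e. the stream ended
			 * prematurely.)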
			 */
			ra->err = SET_ERROR(ECKSUM);
		}
		ra->voff += len - done - resid;
		done = len - resid;
		if (ra->err != 0)
			return (ra->err);
	}

	ra->bytes_read += len;

	ASSERT3U(done, ==, len);
	return (0);
}

static void
byteswap_record(dmu_replay_record_t *drr)
{
#define	DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
#define	DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
	drr->drr_type = BSWAP_32(drr->drr_type);
	drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);

	switch (drr->drr_type) {
	case DRR_BEGIN:
		DO64(drr_begin.drr_magic);
		DO64(drr_begin.drr_versioninfo);
		DO64(drr_begin.drr_creation_time);
		DO32(drr_begin.drr_type);
		DO32(drr_begin.drr_flags);
		DO64(drr_begin.drr_toguid);
		DO64(drr_begin.drr_fromguid);
		break;
	case DRR_OBJECT:
		DO64(drr_object.drr_object);
		DO32(drr_object.drr_type);
		DO32(drr_object.drr_bonustype);
		DO32(drr_object.drr_blksz);
		DO32(drr_object.drr_bonuslen);
		DO64(drr_object.drr_toguid);
		break;
	case DRR_FREEOBJECTS:
		DO64(drr_freeobjects.drr_firstobj);
		DO64(drr_freeobjects.drr_numobjs);
		DO64(drr_freeobjects.drr_toguid);
		break;
	case DRR_WRITE:
		DO64(drr_write.drr_object);
		DO32(drr_write.drr_type);
		DO64(drr_write.drr_offset);
		DO64(drr_write.drr_logical_size);
		DO64(drr_write.drr_toguid);
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
		DO64(drr_write.drr_key.ddk_prop);
		DO64(drr_write.drr_compressed_size);
		break;
	case DRR_WRITE_BYREF:
		DO64(drr_write_byref.drr_object);
		DO64(drr_write_byref.drr_offset);
		DO64(drr_write_byref.drr_length);
		DO64(drr_write_byref.drr_toguid);
		DO64(drr_write_byref.drr_refguid);
		DO64(drr_write_byref.drr_refobject);
		DO64(drr_write_byref.drr_refoffset);
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
		    drr_key.ddk_cksum);
		DO64(drr_write_byref.drr_key.ddk_prop);
		break;
	case DRR_WRITE_EMBEDDED:
		DO64(drr_write_embedded.drr_object);
		DO64(drr_write_embedded.drr_offset);
		DO64(drr_write_embedded.drr_length);
		DO64(drr_write_embedded.drr_toguid);
		DO32(drr_write_embedded.drr_lsize);
		DO32(drr_write_embedded.drr_psize);
		break;
	case DRR_FREE:
		DO64(drr_free.drr_object);
		DO64(drr_free.drr_offset);
		DO64(drr_free.drr_length);
		DO64(drr_free.drr_toguid);
		break;
	case DRR_SPILL:
		DO64(drr_spill.drr_object);
		DO64(drr_spill.drr_length);
		DO64(drr_spill.drr_toguid);
		break;
	case DRR_END:
		DO64(drr_end.drr_toguid);
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
		break;
	}

	if (drr->drr_type != DRR_BEGIN) {
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
	}

#undef DO64
#undef DO32
}

/*
 * Infer the number of block pointers from the bonus buffer size: the
 * classic 512-byte dnode has DN_MAX_BONUSLEN (320) bytes of bonus space
 * when it keeps a single 128-byte blkptr, and each additional blkptr
 * costs 1 << SPA_BLKPTRSHIFT (128) bonus bytes.  E.g. a 64-byte bonus
 * yields 1 + ((320 - 64) >> 7) = 3 blkptrs.  SA bonus buffers can grow
 * into a spill block instead, so they always leave exactly one blkptr.
 */
static inline uint8_t
deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
{
	if (bonus_type == DMU_OT_SA) {
		return (1);
	} else {
		return (1 +
		    ((DN_MAX_BONUSLEN - bonus_size) >> SPA_BLKPTRSHIFT));
	}
}

static void
save_resume_state(struct receive_writer_arg *rwa,
    uint64_t object, uint64_t offset, dmu_tx_t *tx)
{
	int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;

	if (!rwa->resumable)
		return;

	/*
	 * We use ds_resume_bytes[] != 0 to indicate that we need to
	 * update this on disk, so it must not be 0.
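	 * (bytes_read is bumped by every receive_read(), including the
	 * stream header, so it is always nonzero by the time a record
	 * reaches the writer thread.)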
	 */
	ASSERT(rwa->bytes_read != 0);

	/*
	 * We only resume from write records, which have a valid
	 * (non-meta-dnode) object number.
	 */
	ASSERT(object != 0);

	/*
	 * For resuming to work correctly, we must receive records in order,
	 * sorted by object,offset.  This is checked by the callers, but
	 * assert it here for good measure.
	 */
	ASSERT3U(object, >=,
	    rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
	ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
	    offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
	ASSERT3U(rwa->bytes_read, >=,
	    rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);

	rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
	rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
	rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
}

static int
receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
    void *data)
{
	dmu_object_info_t doi;
	dmu_tx_t *tx;
	uint64_t object;
	int err;

	if (drro->drr_type == DMU_OT_NONE ||
	    !DMU_OT_IS_VALID(drro->drr_type) ||
	    !DMU_OT_IS_VALID(drro->drr_bonustype) ||
	    drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
	    drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
	    drro->drr_bonuslen > DN_MAX_BONUSLEN) {
		return (SET_ERROR(EINVAL));
	}

	err = dmu_object_info(rwa->os, drro->drr_object, &doi);

	if (err != 0 && err != ENOENT)
		return (SET_ERROR(EINVAL));
	object = err == 0 ? drro->drr_object : DMU_NEW_OBJECT;

	if (drro->drr_object > rwa->max_object)
		rwa->max_object = drro->drr_object;

	/*
	 * If we are losing blkptrs or changing the block size this must
	 * be a new file instance.  We must clear out the previous file
	 * contents before we can change this type of metadata in the dnode.
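	 * (E.g. a larger bonus buffer steals dnode space that previously
	 * held block pointers; see deduce_nblkptr().)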
	 */
	if (err == 0) {
		int nblkptr;

		nblkptr = deduce_nblkptr(drro->drr_bonustype,
		    drro->drr_bonuslen);

		if (drro->drr_blksz != doi.doi_data_block_size ||
		    nblkptr < doi.doi_nblkptr) {
			err = dmu_free_long_range(rwa->os, drro->drr_object,
			    0, DMU_OBJECT_END);
			if (err != 0)
				return (SET_ERROR(EINVAL));
		}
	}

	tx = dmu_tx_create(rwa->os);
	dmu_tx_hold_bonus(tx, object);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}

	if (object == DMU_NEW_OBJECT) {
		/* currently free, want to be allocated */
		err = dmu_object_claim(rwa->os, drro->drr_object,
		    drro->drr_type, drro->drr_blksz,
		    drro->drr_bonustype, drro->drr_bonuslen, tx);
	} else if (drro->drr_type != doi.doi_type ||
	    drro->drr_blksz != doi.doi_data_block_size ||
	    drro->drr_bonustype != doi.doi_bonus_type ||
	    drro->drr_bonuslen != doi.doi_bonus_size) {
		/* currently allocated, but with different properties */
		err = dmu_object_reclaim(rwa->os, drro->drr_object,
		    drro->drr_type, drro->drr_blksz,
		    drro->drr_bonustype, drro->drr_bonuslen, tx);
	}
	if (err != 0) {
		dmu_tx_commit(tx);
		return (SET_ERROR(EINVAL));
	}

	dmu_object_set_checksum(rwa->os, drro->drr_object,
	    drro->drr_checksumtype, tx);
	dmu_object_set_compress(rwa->os, drro->drr_object,
	    drro->drr_compress, tx);

	if (data != NULL) {
		dmu_buf_t *db;

		VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db));
		dmu_buf_will_dirty(db, tx);

		ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
		bcopy(data, db->db_data, drro->drr_bonuslen);
		if (rwa->byteswap) {
			dmu_object_byteswap_t byteswap =
			    DMU_OT_BYTESWAP(drro->drr_bonustype);
			dmu_ot_byteswap[byteswap].ob_func(db->db_data,
			    drro->drr_bonuslen);
		}
		dmu_buf_rele(db, FTAG);
	}
	dmu_tx_commit(tx);

	return (0);
}

/* ARGSUSED */
static int
receive_freeobjects(struct receive_writer_arg *rwa,
    struct drr_freeobjects *drrfo)
{
	uint64_t obj;
	int next_err = 0;

	if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
		return (SET_ERROR(EINVAL));

	for (obj = drrfo->drr_firstobj;
	    obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0;
	    next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
		int err;

		if (dmu_object_info(rwa->os, obj, NULL) != 0)
			continue;

		err = dmu_free_long_object(rwa->os, obj);
		if (err != 0)
			return (err);

		if (obj > rwa->max_object)
			rwa->max_object = obj;
	}
	if (next_err != ESRCH)
		return (next_err);
	return (0);
}

static int
receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
    arc_buf_t *abuf)
{
	dmu_tx_t *tx;
	int err;

	if (drrw->drr_offset + drrw->drr_logical_size < drrw->drr_offset ||
	    !DMU_OT_IS_VALID(drrw->drr_type))
		return (SET_ERROR(EINVAL));

	/*
	 * For resuming to work, records must be in increasing order
	 * by (object, offset).
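	 * A record that goes backwards indicates a corrupted (or hostile)
	 * stream, so it is rejected rather than applied.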
	 */
	if (drrw->drr_object < rwa->last_object ||
	    (drrw->drr_object == rwa->last_object &&
	    drrw->drr_offset < rwa->last_offset)) {
		return (SET_ERROR(EINVAL));
	}
	rwa->last_object = drrw->drr_object;
	rwa->last_offset = drrw->drr_offset;

	if (rwa->last_object > rwa->max_object)
		rwa->max_object = rwa->last_object;

	if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
		return (SET_ERROR(EINVAL));

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_write(tx, drrw->drr_object,
	    drrw->drr_offset, drrw->drr_logical_size);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}
	if (rwa->byteswap) {
		dmu_object_byteswap_t byteswap =
		    DMU_OT_BYTESWAP(drrw->drr_type);
		dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
		    DRR_WRITE_PAYLOAD_SIZE(drrw));
	}

	/* use the bonus buf to look up the dnode in dmu_assign_arcbuf */
	dmu_buf_t *bonus;
	if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0) {
		/* don't leak the assigned tx on failure */
		dmu_tx_commit(tx);
		return (SET_ERROR(EINVAL));
	}
	dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);

	/*
	 * Note: If the receive fails, we want the resume stream to start
	 * with the same record that we last successfully received (as opposed
	 * to the next record), so that we can verify that we are
	 * resuming from the correct location.
	 */
	save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
	dmu_tx_commit(tx);
	dmu_buf_rele(bonus, FTAG);

	return (0);
}

/*
 * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
 * streams to refer to a copy of the data that is already on the
 * system because it came in earlier in the stream.  This function
 * finds the earlier copy of the data, and uses that copy instead of
 * data from the stream to fulfill this write.
 */
static int
receive_write_byref(struct receive_writer_arg *rwa,
    struct drr_write_byref *drrwbr)
{
	dmu_tx_t *tx;
	int err;
	guid_map_entry_t gmesrch;
	guid_map_entry_t *gmep;
	avl_index_t where;
	objset_t *ref_os = NULL;
	dmu_buf_t *dbp;

	if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
		return (SET_ERROR(EINVAL));

	/*
	 * If the GUID of the referenced dataset is different from the
	 * GUID of the target dataset, find the referenced dataset.
	 */
	if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
		gmesrch.guid = drrwbr->drr_refguid;
		if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
		    &where)) == NULL) {
			return (SET_ERROR(EINVAL));
		}
		if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
			return (SET_ERROR(EINVAL));
	} else {
		ref_os = rwa->os;
	}

	if (drrwbr->drr_object > rwa->max_object)
		rwa->max_object = drrwbr->drr_object;

	err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
	    drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH);
	if (err != 0)
		return (err);

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_write(tx, drrwbr->drr_object,
	    drrwbr->drr_offset, drrwbr->drr_length);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_buf_rele(dbp, FTAG);
		dmu_tx_abort(tx);
		return (err);
	}
	dmu_write(rwa->os, drrwbr->drr_object,
	    drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
	dmu_buf_rele(dbp, FTAG);

	/* See comment in receive_write. */
	save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx);
	dmu_tx_commit(tx);
	return (0);
}

static int
receive_write_embedded(struct receive_writer_arg *rwa,
    struct drr_write_embedded *drrwe, void *data)
{
	dmu_tx_t *tx;
	int err;

	if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
		return (SET_ERROR(EINVAL));

	if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
		return (SET_ERROR(EINVAL));

	if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
		return (SET_ERROR(EINVAL));
	if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
		return (SET_ERROR(EINVAL));

	if (drrwe->drr_object > rwa->max_object)
		rwa->max_object = drrwe->drr_object;

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_write(tx, drrwe->drr_object,
	    drrwe->drr_offset, drrwe->drr_length);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}

	dmu_write_embedded(rwa->os, drrwe->drr_object,
	    drrwe->drr_offset, data, drrwe->drr_etype,
	    drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
	    rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);

	/* See comment in receive_write. */
	save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
	dmu_tx_commit(tx);
	return (0);
}

static int
receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
    void *data)
{
	dmu_tx_t *tx;
	dmu_buf_t *db, *db_spill;
	int err;

	if (drrs->drr_length < SPA_MINBLOCKSIZE ||
	    drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
		return (SET_ERROR(EINVAL));

	if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
		return (SET_ERROR(EINVAL));

	if (drrs->drr_object > rwa->max_object)
		rwa->max_object = drrs->drr_object;

	VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
	if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) {
		dmu_buf_rele(db, FTAG);
		return (err);
	}

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_spill(tx, db->db_object);

	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_buf_rele(db, FTAG);
		dmu_buf_rele(db_spill, FTAG);
		dmu_tx_abort(tx);
		return (err);
	}
	dmu_buf_will_dirty(db_spill, tx);

	if (db_spill->db_size < drrs->drr_length)
		VERIFY(0 == dbuf_spill_set_blksz(db_spill,
		    drrs->drr_length, tx));
	bcopy(data, db_spill->db_data, drrs->drr_length);

	dmu_buf_rele(db, FTAG);
	dmu_buf_rele(db_spill, FTAG);

	dmu_tx_commit(tx);
	return (0);
}

/* ARGSUSED */
static int
receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
{
	int err;

	if (drrf->drr_length != -1ULL &&
	    drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
		return (SET_ERROR(EINVAL));

	if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
		return (SET_ERROR(EINVAL));

	if (drrf->drr_object > rwa->max_object)
		rwa->max_object = drrf->drr_object;

	err = dmu_free_long_range(rwa->os, drrf->drr_object,
	    drrf->drr_offset, drrf->drr_length);

	return (err);
}

/* used to destroy the drc_ds on error */
static void
dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
{
	if (drc->drc_resumable) {
		/* wait for our resume state to be written to disk */
		txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0);
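		/*
		 * Leave the inconsistent %recv dataset in place so a later
		 * resuming receive can pick up where this one left off;
		 * just drop the owner hold taken in dmu_recv_begin.
		 */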
		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
	} else {
		char name[ZFS_MAX_DATASET_NAME_LEN];
		dsl_dataset_name(drc->drc_ds, name);
		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
		(void) dsl_destroy_head(name);
	}
}

static void
receive_cksum(struct receive_arg *ra, int len, void *buf)
{
	if (ra->byteswap) {
		(void) fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
	} else {
		(void) fletcher_4_incremental_native(buf, len, &ra->cksum);
	}
}

/*
 * Read the payload into a buffer of size len, and update the current record's
 * payload field.
 * Allocate ra->next_rrd and read the next record's header into
 * ra->next_rrd->header.
 * Verify checksum of payload and next record.
 */
static int
receive_read_payload_and_next_header(struct receive_arg *ra, int len,
    void *buf)
{
	int err;

	if (len != 0) {
		ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
		err = receive_read(ra, len, buf);
		if (err != 0)
			return (err);
		receive_cksum(ra, len, buf);

		/* note: rrd is NULL when reading the begin record's payload */
		if (ra->rrd != NULL) {
			ra->rrd->payload = buf;
			ra->rrd->payload_size = len;
			ra->rrd->bytes_read = ra->bytes_read;
		}
	}

	ra->prev_cksum = ra->cksum;

	ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
	err = receive_read(ra, sizeof (ra->next_rrd->header),
	    &ra->next_rrd->header);
	ra->next_rrd->bytes_read = ra->bytes_read;
	if (err != 0) {
		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
		ra->next_rrd = NULL;
		return (err);
	}
	if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
		ra->next_rrd = NULL;
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Note: checksum is of everything up to but not including the
	 * checksum itself.
	 */
	ASSERT3U(offsetof(dmu_replay_record_t,
	    drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	receive_cksum(ra,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &ra->next_rrd->header);

	zio_cksum_t cksum_orig =
	    ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
	zio_cksum_t *cksump =
	    &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;

	if (ra->byteswap)
		byteswap_record(&ra->next_rrd->header);

	if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
	    !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
		ra->next_rrd = NULL;
		return (SET_ERROR(ECKSUM));
	}

	receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);

	return (0);
}

static void
objlist_create(struct objlist *list)
{
	list_create(&list->list, sizeof (struct receive_objnode),
	    offsetof(struct receive_objnode, node));
	list->last_lookup = 0;
}

static void
objlist_destroy(struct objlist *list)
{
	for (struct receive_objnode *n = list_remove_head(&list->list);
	    n != NULL; n = list_remove_head(&list->list)) {
		kmem_free(n, sizeof (*n));
	}
	list_destroy(&list->list);
}

/*
 * This function looks through the objlist to see if the specified object
 * number is contained in the objlist.  In the process, it will remove all
 * object numbers in the list that are smaller than the specified object
 * number.
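 *
 * For example, after inserts of 3, 5 and 9, objlist_exists(list, 5)
 * frees the node for 3 and returns B_TRUE; a later lookup of 3 would
 * trip the ascending-order assertion (and could only return B_FALSE).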
 * Thus, any lookup of an object number smaller than a previously looked-up
 * object number will always return false; therefore, all lookups should be
 * done in ascending order.
 */
static boolean_t
objlist_exists(struct objlist *list, uint64_t object)
{
	struct receive_objnode *node = list_head(&list->list);
	ASSERT3U(object, >=, list->last_lookup);
	list->last_lookup = object;
	while (node != NULL && node->object < object) {
		VERIFY3P(node, ==, list_remove_head(&list->list));
		kmem_free(node, sizeof (*node));
		node = list_head(&list->list);
	}
	return (node != NULL && node->object == object);
}

/*
 * The objlist is a list of object numbers stored in ascending order.
 * However, the insertion of new object numbers does not seek out the correct
 * location to store a new object number; instead, it appends it to the list
 * for simplicity.  Thus, any users must take care to only insert new object
 * numbers in ascending order.
 */
static void
objlist_insert(struct objlist *list, uint64_t object)
{
	struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
	node->object = object;
#ifdef ZFS_DEBUG
	struct receive_objnode *last_object = list_tail(&list->list);
	uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
	ASSERT3U(node->object, >, last_objnum);
#endif
	list_insert_tail(&list->list, node);
}

/*
 * Issue the prefetch reads for any necessary indirect blocks.
 *
 * We use the object ignore list to tell us whether or not to issue prefetches
 * for a given object.  We do this for both correctness (in case the blocksize
 * of an object has changed) and performance (if the object doesn't exist,
 * don't needlessly try to issue prefetches).  We also trim the list as we go
 * through the stream to prevent it from growing to an unbounded size.
 *
 * The object numbers within will always be in sorted order, and any write
 * records we see will also be in sorted order, but they're not sorted with
 * respect to each other (i.e. we can get several object records before
 * receiving each object's write records).  As a result, once we've reached a
 * given object number, we can safely remove any reference to lower object
 * numbers in the ignore list.  In practice, we receive up to 32 object
 * records before receiving write records, so the list can have up to 32
 * nodes in it.
 */
/* ARGSUSED */
static void
receive_read_prefetch(struct receive_arg *ra,
    uint64_t object, uint64_t offset, uint64_t length)
{
	if (!objlist_exists(&ra->ignore_objlist, object)) {
		dmu_prefetch(ra->os, object, 1, offset, length,
		    ZIO_PRIORITY_SYNC_READ);
	}
}

/*
 * Read records off the stream, issuing any necessary prefetches.
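 * As a side effect, the header of the *next* record is read into
 * ra->next_rrd (see receive_read_payload_and_next_header()).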
 */
static int
receive_read_record(struct receive_arg *ra)
{
	int err;

	switch (ra->rrd->header.drr_type) {
	case DRR_OBJECT:
	{
		struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
		uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8);
		void *buf = kmem_zalloc(size, KM_SLEEP);
		dmu_object_info_t doi;
		err = receive_read_payload_and_next_header(ra, size, buf);
		if (err != 0) {
			kmem_free(buf, size);
			return (err);
		}
		err = dmu_object_info(ra->os, drro->drr_object, &doi);
		/*
		 * See receive_read_prefetch for an explanation why we're
		 * storing this object in the ignore_objlist.
		 */
		if (err == ENOENT ||
		    (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
			objlist_insert(&ra->ignore_objlist, drro->drr_object);
			err = 0;
		}
		return (err);
	}
	case DRR_FREEOBJECTS:
	{
		err = receive_read_payload_and_next_header(ra, 0, NULL);
		return (err);
	}
	case DRR_WRITE:
	{
		struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
		arc_buf_t *abuf;
		boolean_t is_meta = DMU_OT_IS_METADATA(drrw->drr_type);
		if (DRR_WRITE_COMPRESSED(drrw)) {
			ASSERT3U(drrw->drr_compressed_size, >, 0);
			ASSERT3U(drrw->drr_logical_size, >=,
			    drrw->drr_compressed_size);
			ASSERT(!is_meta);
			abuf = arc_loan_compressed_buf(
			    dmu_objset_spa(ra->os),
			    drrw->drr_compressed_size, drrw->drr_logical_size,
			    drrw->drr_compressiontype);
		} else {
			abuf = arc_loan_buf(dmu_objset_spa(ra->os),
			    is_meta, drrw->drr_logical_size);
		}

		err = receive_read_payload_and_next_header(ra,
		    DRR_WRITE_PAYLOAD_SIZE(drrw), abuf->b_data);
		if (err != 0) {
			dmu_return_arcbuf(abuf);
			return (err);
		}
		ra->rrd->write_buf = abuf;
		receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
		    drrw->drr_logical_size);
		return (err);
	}
	case DRR_WRITE_BYREF:
	{
		struct drr_write_byref *drrwb =
		    &ra->rrd->header.drr_u.drr_write_byref;
		err = receive_read_payload_and_next_header(ra, 0, NULL);
		receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
		    drrwb->drr_length);
		return (err);
	}
	case DRR_WRITE_EMBEDDED:
	{
		struct drr_write_embedded *drrwe =
		    &ra->rrd->header.drr_u.drr_write_embedded;
		uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
		void *buf = kmem_zalloc(size, KM_SLEEP);

		err = receive_read_payload_and_next_header(ra, size, buf);
		if (err != 0) {
			kmem_free(buf, size);
			return (err);
		}

		receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
		    drrwe->drr_length);
		return (err);
	}
	case DRR_FREE:
	{
		/*
		 * It might be beneficial to prefetch indirect blocks here, but
		 * we don't really have the data to decide for sure.
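		 * (We'd need the object's block size and indirection level,
		 * which a FREE record doesn't carry.)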
		 */
		err = receive_read_payload_and_next_header(ra, 0, NULL);
		return (err);
	}
	case DRR_END:
	{
		struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
		if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
			return (SET_ERROR(ECKSUM));
		return (0);
	}
	case DRR_SPILL:
	{
		struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
		void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
		err = receive_read_payload_and_next_header(ra,
		    drrs->drr_length, buf);
		if (err != 0)
			kmem_free(buf, drrs->drr_length);
		return (err);
	}
	default:
		return (SET_ERROR(EINVAL));
	}
}

/*
 * Commit the records to the pool.
 */
static int
receive_process_record(struct receive_writer_arg *rwa,
    struct receive_record_arg *rrd)
{
	int err;

	/* Processing in order, therefore bytes_read should be increasing. */
	ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
	rwa->bytes_read = rrd->bytes_read;

	switch (rrd->header.drr_type) {
	case DRR_OBJECT:
	{
		struct drr_object *drro = &rrd->header.drr_u.drr_object;
		err = receive_object(rwa, drro, rrd->payload);
		kmem_free(rrd->payload, rrd->payload_size);
		rrd->payload = NULL;
		return (err);
	}
	case DRR_FREEOBJECTS:
	{
		struct drr_freeobjects *drrfo =
		    &rrd->header.drr_u.drr_freeobjects;
		return (receive_freeobjects(rwa, drrfo));
	}
	case DRR_WRITE:
	{
		struct drr_write *drrw = &rrd->header.drr_u.drr_write;
		err = receive_write(rwa, drrw, rrd->write_buf);
		/* if receive_write() is successful, it consumes the arc_buf */
		if (err != 0)
			dmu_return_arcbuf(rrd->write_buf);
		rrd->write_buf = NULL;
		rrd->payload = NULL;
		return (err);
	}
	case DRR_WRITE_BYREF:
	{
		struct drr_write_byref *drrwbr =
		    &rrd->header.drr_u.drr_write_byref;
		return (receive_write_byref(rwa, drrwbr));
	}
	case DRR_WRITE_EMBEDDED:
	{
		struct drr_write_embedded *drrwe =
		    &rrd->header.drr_u.drr_write_embedded;
		err = receive_write_embedded(rwa, drrwe, rrd->payload);
		kmem_free(rrd->payload, rrd->payload_size);
		rrd->payload = NULL;
		return (err);
	}
	case DRR_FREE:
	{
		struct drr_free *drrf = &rrd->header.drr_u.drr_free;
		return (receive_free(rwa, drrf));
	}
	case DRR_SPILL:
	{
		struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
		err = receive_spill(rwa, drrs, rrd->payload);
		kmem_free(rrd->payload, rrd->payload_size);
		rrd->payload = NULL;
		return (err);
	}
	default:
		return (SET_ERROR(EINVAL));
	}
}

/*
 * dmu_recv_stream's worker thread; pull records off the queue, and then call
 * receive_process_record.  When we're done, signal the main thread and exit.
 */
static void
receive_writer_thread(void *arg)
{
	struct receive_writer_arg *rwa = arg;
	struct receive_record_arg *rrd;
	for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
	    rrd = bqueue_dequeue(&rwa->q)) {
		/*
		 * If there's an error, the main thread will stop putting
		 * things on the queue, but we need to clear everything in it
		 * before we can exit.
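		 * Any payload buffer or loaned arc buf still attached to a
		 * drained record is returned or freed here to avoid leaks.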
		 */
		if (rwa->err == 0) {
			rwa->err = receive_process_record(rwa, rrd);
		} else if (rrd->write_buf != NULL) {
			dmu_return_arcbuf(rrd->write_buf);
			rrd->write_buf = NULL;
			rrd->payload = NULL;
		} else if (rrd->payload != NULL) {
			kmem_free(rrd->payload, rrd->payload_size);
			rrd->payload = NULL;
		}
		kmem_free(rrd, sizeof (*rrd));
	}
	kmem_free(rrd, sizeof (*rrd));
	mutex_enter(&rwa->mutex);
	rwa->done = B_TRUE;
	cv_signal(&rwa->cv);
	mutex_exit(&rwa->mutex);
	thread_exit();
}

static int
resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
{
	uint64_t val;
	objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset;
	uint64_t dsobj = dmu_objset_id(ra->os);
	uint64_t resume_obj, resume_off;

	if (nvlist_lookup_uint64(begin_nvl,
	    "resume_object", &resume_obj) != 0 ||
	    nvlist_lookup_uint64(begin_nvl,
	    "resume_offset", &resume_off) != 0) {
		return (SET_ERROR(EINVAL));
	}
	VERIFY0(zap_lookup(mos, dsobj,
	    DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
	if (resume_obj != val)
		return (SET_ERROR(EINVAL));
	VERIFY0(zap_lookup(mos, dsobj,
	    DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
	if (resume_off != val)
		return (SET_ERROR(EINVAL));

	return (0);
}

/*
 * Read in the stream's records, one by one, and apply them to the pool.  There
 * are two threads involved; the thread that calls this function will spin up a
 * worker thread, read the records off the stream one by one, and issue
 * prefetches for any necessary indirect blocks.  It will then push the records
 * onto an internal blocking queue.  The worker thread will pull the records
 * off the queue, and actually write the data into the DMU.  This way, the
 * worker thread doesn't have to wait for reads to complete, since everything
 * it needs (the indirect blocks) will be prefetched.
 *
 * NB: callers *must* call dmu_recv_end() if this succeeds.
 */
int
dmu_recv_stream(dmu_recv_cookie_t *drc, struct file *fp, offset_t *voffp,
    int cleanup_fd, uint64_t *action_handlep)
{
	int err = 0;
	struct receive_arg ra = { 0 };
	struct receive_writer_arg rwa = { 0 };
	int featureflags;
	nvlist_t *begin_nvl = NULL;

	ra.byteswap = drc->drc_byteswap;
	ra.cksum = drc->drc_cksum;
	ra.td = curthread;
	ra.fp = fp;
	ra.voff = *voffp;

	if (dsl_dataset_is_zapified(drc->drc_ds)) {
		(void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
		    drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
		    sizeof (ra.bytes_read), 1, &ra.bytes_read);
	}

	objlist_create(&ra.ignore_objlist);

	/* these were verified in dmu_recv_begin */
	ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
	    DMU_SUBSTREAM);
	ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);

	/*
	 * Open the objset we are modifying.
	 */
	VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os));

	ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);

	featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);

	/* if this stream is dedup'ed, set up the avl tree for guid mapping */
	if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
		minor_t minor;

		if (cleanup_fd == -1) {
			err = SET_ERROR(EBADF);
			goto out;
		}
		err = zfs_onexit_fd_hold(cleanup_fd, &minor);
		if (err != 0) {
			cleanup_fd = -1;
			goto out;
		}

		if (*action_handlep == 0) {
			rwa.guid_to_ds_map =
			    kmem_alloc(sizeof (avl_tree_t), KM_SLEEP);
			avl_create(rwa.guid_to_ds_map, guid_compare,
			    sizeof (guid_map_entry_t),
			    offsetof(guid_map_entry_t, avlnode));
			err = zfs_onexit_add_cb(minor,
			    free_guid_map_onexit, rwa.guid_to_ds_map,
			    action_handlep);
			if (err != 0)
				goto out;
		} else {
			err = zfs_onexit_cb_data(minor, *action_handlep,
			    (void **)&rwa.guid_to_ds_map);
			if (err != 0)
				goto out;
		}

		drc->drc_guid_to_ds_map = rwa.guid_to_ds_map;
	}

	uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
	void *payload = NULL;
	if (payloadlen != 0)
		payload = kmem_alloc(payloadlen, KM_SLEEP);

	err = receive_read_payload_and_next_header(&ra, payloadlen, payload);
	if (err != 0) {
		if (payloadlen != 0)
			kmem_free(payload, payloadlen);
		goto out;
	}
	if (payloadlen != 0) {
		err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP);
		kmem_free(payload, payloadlen);
		if (err != 0)
			goto out;
	}

	if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
		err = resume_check(&ra, begin_nvl);
		if (err != 0)
			goto out;
	}

	(void) bqueue_init(&rwa.q, zfs_recv_queue_length,
	    offsetof(struct receive_record_arg, node));
	cv_init(&rwa.cv, NULL, CV_DEFAULT, NULL);
	mutex_init(&rwa.mutex, NULL, MUTEX_DEFAULT, NULL);
	rwa.os = ra.os;
	rwa.byteswap = drc->drc_byteswap;
	rwa.resumable = drc->drc_resumable;

	(void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, &p0,
	    TS_RUN, minclsyspri);
	/*
	 * We're reading rwa.err without locks, which is safe since we are the
	 * only reader, and the worker thread is the only writer.  It's ok if
	 * we miss a write for an iteration or two of the loop, since the
	 * writer thread will keep freeing records we send it until we send it
	 * an eos marker.
	 *
	 * We can leave this loop in 3 ways:  First, if rwa.err is
	 * non-zero.  In that case, the writer thread will free the rrd we
	 * just pushed.  Second, if we're interrupted; in that case, either
	 * it's the first loop and ra.rrd was never allocated, or it's later,
	 * and ra.rrd has been handed off to the writer thread who will free
	 * it.  Finally, if receive_read_record fails or we're at the end of
	 * the stream, then we free ra.rrd and exit.
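	 *
	 * In all three cases we then enqueue an eos marker and block until
	 * the writer thread drains the queue and sets rwa.done.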
	 */
	while (rwa.err == 0) {
		if (issig(JUSTLOOKING) && issig(FORREAL)) {
			err = SET_ERROR(EINTR);
			break;
		}

		ASSERT3P(ra.rrd, ==, NULL);
		ra.rrd = ra.next_rrd;
		ra.next_rrd = NULL;
		/* Allocates and loads header into ra.next_rrd */
		err = receive_read_record(&ra);

		if (ra.rrd->header.drr_type == DRR_END || err != 0) {
			kmem_free(ra.rrd, sizeof (*ra.rrd));
			ra.rrd = NULL;
			break;
		}

		bqueue_enqueue(&rwa.q, ra.rrd,
		    sizeof (struct receive_record_arg) + ra.rrd->payload_size);
		ra.rrd = NULL;
	}
	if (ra.next_rrd == NULL)
		ra.next_rrd = kmem_zalloc(sizeof (*ra.next_rrd), KM_SLEEP);
	ra.next_rrd->eos_marker = B_TRUE;
	bqueue_enqueue(&rwa.q, ra.next_rrd, 1);

	mutex_enter(&rwa.mutex);
	while (!rwa.done) {
		cv_wait(&rwa.cv, &rwa.mutex);
	}
	mutex_exit(&rwa.mutex);

	/*
	 * If we are receiving a full stream as a clone, all object IDs which
	 * are greater than the maximum ID referenced in the stream are
	 * by definition unused and must be freed.  Note that it's possible
	 * that we've resumed this send and the first record we received was
	 * the END record.  In that case, max_object would be 0, but we
	 * shouldn't start freeing all objects from there; instead we should
	 * start from the resumeobj.
	 */
	if (drc->drc_clone && drc->drc_drrb->drr_fromguid == 0) {
		uint64_t obj;
		if (nvlist_lookup_uint64(begin_nvl, "resume_object",
		    &obj) != 0)
			obj = 0;
		if (rwa.max_object > obj)
			obj = rwa.max_object;
		obj++;
		int free_err = 0;
		int next_err = 0;

		while (next_err == 0) {
			free_err = dmu_free_long_object(rwa.os, obj);
			if (free_err != 0 && free_err != ENOENT)
				break;

			next_err = dmu_object_next(rwa.os, &obj, FALSE, 0);
		}

		if (err == 0) {
			if (free_err != 0 && free_err != ENOENT)
				err = free_err;
			else if (next_err != ESRCH)
				err = next_err;
		}
	}

	cv_destroy(&rwa.cv);
	mutex_destroy(&rwa.mutex);
	bqueue_destroy(&rwa.q);
	if (err == 0)
		err = rwa.err;

out:
	nvlist_free(begin_nvl);
	if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1))
		zfs_onexit_fd_rele(cleanup_fd);

	if (err != 0) {
		/*
		 * Clean up references.  If receive is not resumable,
		 * destroy what we created, so we don't leave it in
		 * the inconsistent state.
		 */
		dmu_recv_cleanup_ds(drc);
	}

	*voffp = ra.voff;
	objlist_destroy(&ra.ignore_objlist);
	return (err);
}

static int
dmu_recv_end_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_cookie_t *drc = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	int error;

	ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);

	if (!drc->drc_newfs) {
		dsl_dataset_t *origin_head;

		error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
		    &origin_head);
		if (error != 0)
			return (error);
		if (drc->drc_force) {
			/*
			 * We will destroy any snapshots in tofs (i.e. before
			 * origin_head) that are after the origin (which is
			 * the snap before drc_ds, because drc_ds cannot
			 * have any snaps of its own).
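			 * For example, if tofs has snapshots A <- B <- C and
			 * the stream's origin is A, the walk below verifies
			 * that B and C can be destroyed.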
			 */
			uint64_t obj;

			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
			while (obj !=
			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
				dsl_dataset_t *snap;
				error = dsl_dataset_hold_obj(dp, obj, FTAG,
				    &snap);
				if (error != 0)
					break;
				if (snap->ds_dir != origin_head->ds_dir)
					error = SET_ERROR(EINVAL);
				if (error == 0) {
					error = dsl_destroy_snapshot_check_impl(
					    snap, B_FALSE);
				}
				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
				dsl_dataset_rele(snap, FTAG);
				if (error != 0)
					break;
			}
			if (error != 0) {
				dsl_dataset_rele(origin_head, FTAG);
				return (error);
			}
		}
		error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
		    origin_head, drc->drc_force, drc->drc_owner, tx);
		if (error != 0) {
			dsl_dataset_rele(origin_head, FTAG);
			return (error);
		}
		error = dsl_dataset_snapshot_check_impl(origin_head,
		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
		dsl_dataset_rele(origin_head, FTAG);
		if (error != 0)
			return (error);

		error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
	} else {
		error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
	}
	return (error);
}

static void
dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_cookie_t *drc = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);

	spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
	    tx, "snap=%s", drc->drc_tosnap);

	if (!drc->drc_newfs) {
		dsl_dataset_t *origin_head;

		VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
		    &origin_head));

		if (drc->drc_force) {
			/*
			 * Destroy any snapshots of drc_tofs (origin_head)
			 * after the origin (the snap before drc_ds).
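			 * dmu_recv_end_check() already validated each of
			 * these destroys, so any failure here would be a
			 * bug; hence the VERIFYs and ASSERTs below.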
			 */
			uint64_t obj;

			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
			while (obj !=
			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
				dsl_dataset_t *snap;
				VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
				    &snap));
				ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
				dsl_destroy_snapshot_sync_impl(snap,
				    B_FALSE, tx);
				dsl_dataset_rele(snap, FTAG);
			}
		}
		VERIFY3P(drc->drc_ds->ds_prev, ==,
		    origin_head->ds_prev);

		dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
		    origin_head, tx);
		dsl_dataset_snapshot_sync_impl(origin_head,
		    drc->drc_tosnap, tx);

		/* set snapshot's creation time and guid */
		dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
		dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
		    drc->drc_drrb->drr_creation_time;
		dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
		    drc->drc_drrb->drr_toguid;
		dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
		    ~DS_FLAG_INCONSISTENT;

		dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
		dsl_dataset_phys(origin_head)->ds_flags &=
		    ~DS_FLAG_INCONSISTENT;

		drc->drc_newsnapobj =
		    dsl_dataset_phys(origin_head)->ds_prev_snap_obj;

		dsl_dataset_rele(origin_head, FTAG);
		dsl_destroy_head_sync_impl(drc->drc_ds, tx);

		if (drc->drc_owner != NULL)
			VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
	} else {
		dsl_dataset_t *ds = drc->drc_ds;

		dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);

		/* set snapshot's creation time and guid */
		dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
		dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
		    drc->drc_drrb->drr_creation_time;
		dsl_dataset_phys(ds->ds_prev)->ds_guid =
		    drc->drc_drrb->drr_toguid;
		dsl_dataset_phys(ds->ds_prev)->ds_flags &=
		    ~DS_FLAG_INCONSISTENT;

		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
		if (dsl_dataset_has_resume_receive_state(ds)) {
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_FROMGUID, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_OBJECT, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_OFFSET, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_BYTES, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_TOGUID, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_TONAME, tx);
		}
		drc->drc_newsnapobj =
		    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
	}
	/*
	 * Release the hold from dmu_recv_begin.  This must be done before
	 * we return to open context, so that when we free the dataset's
	 * dnode, we can evict its bonus buffer.
	 */
	dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
	drc->drc_ds = NULL;
}

static int
add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj)
{
	dsl_pool_t *dp;
	dsl_dataset_t *snapds;
	guid_map_entry_t *gmep;
	int err;

	ASSERT(guid_map != NULL);

	err = dsl_pool_hold(name, FTAG, &dp);
	if (err != 0)
		return (err);
	gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP);
	err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds);
	if (err == 0) {
		gmep->guid = dsl_dataset_phys(snapds)->ds_guid;
		gmep->gme_ds = snapds;
		avl_add(guid_map, gmep);
		dsl_dataset_long_hold(snapds, gmep);
	} else {
		kmem_free(gmep, sizeof (*gmep));
	}

	dsl_pool_rele(dp, FTAG);
	return (err);
}

static int dmu_recv_end_modified_blocks = 3;

static int
dmu_recv_existing_end(dmu_recv_cookie_t *drc)
{
#ifdef _KERNEL
	/*
	 * We will be destroying the ds; make sure its origin is unmounted if
	 * necessary.
	 */
	char name[ZFS_MAX_DATASET_NAME_LEN];
	dsl_dataset_name(drc->drc_ds, name);
	zfs_destroy_unmount_origin(name);
#endif

	return (dsl_sync_task(drc->drc_tofs,
	    dmu_recv_end_check, dmu_recv_end_sync, drc,
	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
}

static int
dmu_recv_new_end(dmu_recv_cookie_t *drc)
{
	return (dsl_sync_task(drc->drc_tofs,
	    dmu_recv_end_check, dmu_recv_end_sync, drc,
	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
}

int
dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
{
	int error;

	drc->drc_owner = owner;

	if (drc->drc_newfs)
		error = dmu_recv_new_end(drc);
	else
		error = dmu_recv_existing_end(drc);

	if (error != 0) {
		dmu_recv_cleanup_ds(drc);
	} else if (drc->drc_guid_to_ds_map != NULL) {
		(void) add_ds_to_guidmap(drc->drc_tofs,
		    drc->drc_guid_to_ds_map,
		    drc->drc_newsnapobj);
	}
	return (error);
}

/*
 * Return TRUE if this objset is currently being received into.
 */
boolean_t
dmu_objset_is_receiving(objset_t *os)
{
	return (os->os_dsl_dataset != NULL &&
	    os->os_dsl_dataset->ds_owner == dmu_recv_tag);
}