dmu_send.c revision 297111
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
 * Copyright (c) 2012, Martin Matuska <mm@FreeBSD.org>. All rights reserved.
 * Copyright 2014 HybridCluster. All rights reserved.
 * Copyright 2016 RackTop Systems.
 */

#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dbuf.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/avl.h>
#include <sys/ddt.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
#include <sys/bqueue.h>

#ifdef __FreeBSD__
#undef dump_write
#define	dump_write dmu_dump_write
#endif

/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
int zfs_send_corrupt_data = B_FALSE;
int zfs_send_queue_length = 16 * 1024 * 1024;
int zfs_recv_queue_length = 16 * 1024 * 1024;
/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
int zfs_send_set_freerecords_bit = B_TRUE;

#ifdef _KERNEL
TUNABLE_INT("vfs.zfs.send_set_freerecords_bit", &zfs_send_set_freerecords_bit);
#endif

static char *dmu_recv_tag = "dmu_recv_tag";
const char *recv_clone_name = "%recv";

#define	BP_SPAN(datablkszsec, indblkshift, level) \
	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
	(level) * (indblkshift - SPA_BLKPTRSHIFT)))
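
/*
 * Worked example (illustrative, not used by the code below): for a dataset
 * with 128K data blocks, datablkszsec = 128K / 512 = 256, and a typical
 * indblkshift of 17 (128K indirect blocks holding 1024 block pointers each,
 * since SPA_BLKPTRSHIFT is 7).  BP_SPAN then gives the bytes of file data
 * covered by one block pointer at each level:
 *
 *	BP_SPAN(256, 17, 0) = 256 << (9 + 0 * 10) = 128K
 *	BP_SPAN(256, 17, 1) = 256 << (9 + 1 * 10) = 128M
 *	BP_SPAN(256, 17, 2) = 256 << (9 + 2 * 10) = 128G
 *
 * A minimal userland sketch exercising the same arithmetic:
 */
#if 0
#include <stdio.h>
#include <stdint.h>

#define	SPA_MINBLOCKSHIFT	9
#define	SPA_BLKPTRSHIFT		7
#define	BP_SPAN(datablkszsec, indblkshift, level) \
	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
	(level) * (indblkshift - SPA_BLKPTRSHIFT)))

int
main(void)
{
	for (int level = 0; level < 3; level++)
		printf("level %d spans %ju bytes\n", level,
		    (uintmax_t)BP_SPAN(256, 17, level));
	return (0);
}
#endif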

static void byteswap_record(dmu_replay_record_t *drr);

struct send_thread_arg {
	bqueue_t q;
	dsl_dataset_t *ds;	/* Dataset to traverse */
	uint64_t fromtxg;	/* Traverse from this txg */
	int flags;		/* flags to pass to traverse_dataset */
	int error_code;
	boolean_t cancel;
	zbookmark_phys_t resume;
};

struct send_block_record {
	boolean_t eos_marker;	/* Marks the end of the stream */
	blkptr_t bp;
	zbookmark_phys_t zb;
	uint8_t indblkshift;
	uint16_t datablkszsec;
	bqueue_node_t ln;
};

static int
dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
	struct uio auio;
	struct iovec aiov;

	ASSERT0(len % 8);

	aiov.iov_base = buf;
	aiov.iov_len = len;
	auio.uio_iov = &aiov;
	auio.uio_iovcnt = 1;
	auio.uio_resid = len;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_WRITE;
	auio.uio_offset = (off_t)-1;
	auio.uio_td = dsp->dsa_td;
#ifdef _KERNEL
	if (dsp->dsa_fp->f_type == DTYPE_VNODE)
		bwillwrite();
	dsp->dsa_err = fo_write(dsp->dsa_fp, &auio, dsp->dsa_td->td_ucred, 0,
	    dsp->dsa_td);
#else
	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
	dsp->dsa_err = EOPNOTSUPP;
#endif
	mutex_enter(&ds->ds_sendstream_lock);
	*dsp->dsa_off += len;
	mutex_exit(&ds->ds_sendstream_lock);

	return (dsp->dsa_err);
}

/*
 * For all record types except BEGIN, fill in the checksum (overlaid in
 * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
 * up to the start of the checksum itself.
 */
static int
dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
{
	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	fletcher_4_incremental_native(dsp->dsa_drr,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &dsp->dsa_zc);
	if (dsp->dsa_drr->drr_type != DRR_BEGIN) {
		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
		    drr_checksum.drr_checksum));
		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
	}
	fletcher_4_incremental_native(&dsp->dsa_drr->
	    drr_u.drr_checksum.drr_checksum,
	    sizeof (zio_cksum_t), &dsp->dsa_zc);
	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
		return (SET_ERROR(EINTR));
	if (payload_len != 0) {
		fletcher_4_incremental_native(payload, payload_len,
		    &dsp->dsa_zc);
		if (dump_bytes(dsp, payload, payload_len) != 0)
			return (SET_ERROR(EINTR));
	}
	return (0);
}
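
/*
 * Illustrative sketch of the receiver's side of this scheme (the real
 * logic lives in the receive path, not here): a single Fletcher-4
 * checksum runs across the whole stream, and each non-BEGIN record
 * carries the running value computed over everything that preceded its
 * own trailing checksum field.
 */
#if 0
static int
verify_record_checksum(zio_cksum_t *running, dmu_replay_record_t *drr)
{
	/* Fold in the record up to, but not including, its checksum. */
	fletcher_4_incremental_native(drr,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    running);
	/* A zero checksum means the sender did not fill one in. */
	if (!ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.drr_checksum.drr_checksum) &&
	    !ZIO_CHECKSUM_EQUAL(drr->drr_u.drr_checksum.drr_checksum,
	    *running))
		return (SET_ERROR(ECKSUM));
	/* The checksum field itself is covered for subsequent records. */
	fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
	    sizeof (zio_cksum_t), running);
	return (0);
}
#endif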

/*
 * Fill in the drr_free struct, or perform aggregation if the previous record
 * is also a free record, and the two are adjacent.
 *
 * Note that we send free records even for a full send, because we want to be
 * able to receive a full send as a clone, which requires a list of all the
 * free and freeobject records that were generated on the source.
 */
static int
dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);

	/*
	 * When we receive a free record, dbuf_free_range() assumes
	 * that the receiving system doesn't have any dbufs in the range
	 * being freed.  This is always true because there is a one-record
	 * constraint: we only send one WRITE record for any given
	 * object,offset.  We know that the one-record constraint is
	 * true because we always send data in increasing order by
	 * object,offset.
	 *
	 * If the increasing-order constraint ever changes, we should find
	 * another way to assert that the one-record constraint is still
	 * satisfied.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));

	if (length != -1ULL && offset + length < offset)
		length = -1ULL;

	/*
	 * If there is a pending op, but it's not PENDING_FREE, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_FREE records can only be aggregated with
	 * other DRR_FREE records.  DRR_FREEOBJECTS records can only be
	 * aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	if (dsp->dsa_pending_op == PENDING_FREE) {
		/*
		 * There should never be a PENDING_FREE if length is -1
		 * (because dump_dnode is the only place where this
		 * function is called with a -1, and only after flushing
		 * any pending record).
		 */
		ASSERT(length != -1ULL);
		/*
		 * Check to see whether this free block can be aggregated
		 * with the pending one.
		 */
		if (drrf->drr_object == object && drrf->drr_offset +
		    drrf->drr_length == offset) {
			drrf->drr_length += length;
			return (0);
		} else {
			/* not a continuation.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}
	/* create a FREE record and make it pending */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREE;
	drrf->drr_object = object;
	drrf->drr_offset = offset;
	drrf->drr_length = length;
	drrf->drr_toguid = dsp->dsa_toguid;
	if (length == -1ULL) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
	} else {
		dsp->dsa_pending_op = PENDING_FREE;
	}

	return (0);
}
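
/*
 * The aggregation above is just run-length merging of adjacent
 * (object, offset, length) extents.  A minimal stand-alone sketch of the
 * same idea (hypothetical helper, not part of the send code):
 */
#if 0
struct pending_free {
	uint64_t object;
	uint64_t offset;
	uint64_t length;
	boolean_t valid;
};

/* Returns B_TRUE if the new extent was merged into the pending one. */
static boolean_t
try_merge_free(struct pending_free *pf, uint64_t object, uint64_t offset,
    uint64_t length)
{
	if (pf->valid && pf->object == object &&
	    pf->offset + pf->length == offset) {
		pf->length += length;
		return (B_TRUE);
	}
	return (B_FALSE);
}
#endif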

static int
dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
    uint64_t object, uint64_t offset, int blksz, const blkptr_t *bp, void *data)
{
	struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);

	/*
	 * We send data in increasing object, offset order.
	 * See comment in dump_free() for details.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));
	dsp->dsa_last_data_object = object;
	dsp->dsa_last_data_offset = offset + blksz - 1;

	/*
	 * If there is any kind of pending aggregation (currently either
	 * a grouping of free objects or free blocks), push it out to
	 * the stream, since aggregation can't be done across operations
	 * of different types.
	 */
	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	/* write a WRITE record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE;
	drrw->drr_object = object;
	drrw->drr_type = type;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	if (bp == NULL || BP_IS_EMBEDDED(bp)) {
		/*
		 * There's no pre-computed checksum for partial-block
		 * writes or embedded BP's, so (like
		 * fletcher4-checksummed blocks) userland will have to
		 * compute a dedup-capable checksum itself.
		 */
		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
	} else {
		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
		if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
		    ZCHECKSUM_FLAG_DEDUP)
			drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
		drrw->drr_key.ddk_cksum = bp->blk_cksum;
	}

	if (dump_record(dsp, data, blksz) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static int
dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    int blksz, const blkptr_t *bp)
{
	char buf[BPE_PAYLOAD_SIZE];
	struct drr_write_embedded *drrw =
	    &(dsp->dsa_drr->drr_u.drr_write_embedded);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (EINTR);
		dsp->dsa_pending_op = PENDING_NONE;
	}

	ASSERT(BP_IS_EMBEDDED(bp));

	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
	drrw->drr_object = object;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	drrw->drr_compression = BP_GET_COMPRESS(bp);
	drrw->drr_etype = BPE_GET_ETYPE(bp);
	drrw->drr_lsize = BPE_GET_LSIZE(bp);
	drrw->drr_psize = BPE_GET_PSIZE(bp);

	decode_embedded_bp_compressed(bp, buf);

	if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
		return (EINTR);
	return (0);
}

static int
dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
{
	struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write a SPILL record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_SPILL;
	drrs->drr_object = object;
	drrs->drr_length = blksz;
	drrs->drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, data, blksz) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static int
dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
{
	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);

	/*
	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
	 * push it out, since free block aggregation can only be done for
	 * blocks of the same type (i.e., DRR_FREE records can only be
	 * aggregated with other DRR_FREE records.  DRR_FREEOBJECTS records
	 * can only be aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
		/*
		 * See whether this free object array can be aggregated
		 * with the pending one.
		 */
		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
			drrfo->drr_numobjs += numobjs;
			return (0);
		} else {
			/* can't be aggregated.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}

	/* write a FREEOBJECTS record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
	drrfo->drr_firstobj = firstobj;
	drrfo->drr_numobjs = numobjs;
	drrfo->drr_toguid = dsp->dsa_toguid;

	dsp->dsa_pending_op = PENDING_FREEOBJECTS;

	return (0);
}

static int
dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
{
	struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);

	if (object < dsp->dsa_resume_object) {
		/*
		 * Note: when resuming, we will visit all the dnodes in
		 * the block of dnodes that we are resuming from.  In
		 * this case it's unnecessary to send the dnodes prior to
		 * the one we are resuming from.  We should be at most one
		 * block's worth of dnodes behind the resume point.
		 */
		ASSERT3U(dsp->dsa_resume_object - object, <,
		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
		return (0);
	}

	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
		return (dump_freeobjects(dsp, object, 1));

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write an OBJECT record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_OBJECT;
	drro->drr_object = object;
	drro->drr_type = dnp->dn_type;
	drro->drr_bonustype = dnp->dn_bonustype;
	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	drro->drr_bonuslen = dnp->dn_bonuslen;
	drro->drr_checksumtype = dnp->dn_checksum;
	drro->drr_compress = dnp->dn_compress;
	drro->drr_toguid = dsp->dsa_toguid;

	if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;

	if (dump_record(dsp, DN_BONUS(dnp),
	    P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
		return (SET_ERROR(EINTR));
	}

	/* Free anything past the end of the file. */
	if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
		return (SET_ERROR(EINTR));
	if (dsp->dsa_err != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static boolean_t
backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
{
	if (!BP_IS_EMBEDDED(bp))
		return (B_FALSE);

	/*
	 * Compression function must be legacy, or explicitly enabled.
	 */
	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4)))
		return (B_FALSE);

	/*
	 * Embed type must be explicitly enabled.
	 */
	switch (BPE_GET_ETYPE(bp)) {
	case BP_EMBEDDED_TYPE_DATA:
		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
			return (B_TRUE);
		break;
	default:
		return (B_FALSE);
	}
	return (B_FALSE);
}

/*
 * This is the callback function to traverse_dataset that acts as the worker
 * thread for dmu_send_impl.
 */
/*ARGSUSED*/
static int
send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
{
	struct send_thread_arg *sta = arg;
	struct send_block_record *record;
	uint64_t record_size;
	int err = 0;

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= sta->resume.zb_object);

	if (sta->cancel)
		return (SET_ERROR(EINTR));

	if (bp == NULL) {
		ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
		return (0);
	} else if (zb->zb_level < 0) {
		return (0);
	}

	record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
	record->eos_marker = B_FALSE;
	record->bp = *bp;
	record->zb = *zb;
	record->indblkshift = dnp->dn_indblkshift;
	record->datablkszsec = dnp->dn_datablkszsec;
	record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	bqueue_enqueue(&sta->q, record, record_size);

	return (err);
}

/*
 * This function kicks off the traverse_dataset.  It also handles setting the
 * error code of the thread in case something goes wrong, and pushes the End
 * of Stream record when the traverse_dataset call has finished.  If there is
 * no dataset to traverse, the thread immediately pushes an End of Stream
 * marker.
 */
static void
send_traverse_thread(void *arg)
{
	struct send_thread_arg *st_arg = arg;
	int err;
	struct send_block_record *data;

	if (st_arg->ds != NULL) {
		err = traverse_dataset_resume(st_arg->ds,
		    st_arg->fromtxg, &st_arg->resume,
		    st_arg->flags, send_cb, st_arg);

		if (err != EINTR)
			st_arg->error_code = err;
	}
	data = kmem_zalloc(sizeof (*data), KM_SLEEP);
	data->eos_marker = B_TRUE;
	bqueue_enqueue(&st_arg->q, data, 1);
	thread_exit();
}
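
/*
 * The send path is a single producer/consumer pair built on bqueue_t: the
 * traversal thread above enqueues one send_block_record per block pointer,
 * and dmu_send_impl() dequeues until it sees the eos_marker record.  A
 * minimal sketch of the consumer side of that pattern (hypothetical helper;
 * do_dump() and get_next_record() are defined further down in this file):
 */
#if 0
static int
consume_records(bqueue_t *q, dmu_sendarg_t *dsp)
{
	struct send_block_record *rec = bqueue_dequeue(q);
	int err = 0;

	while (!rec->eos_marker && err == 0) {
		err = do_dump(dsp, rec);
		rec = get_next_record(q, rec);	/* pops next, frees old */
	}
	kmem_free(rec, sizeof (*rec));		/* free the eos marker */
	return (err);
}
#endif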

/*
 * This function actually handles figuring out what kind of record needs to be
 * dumped, reading the data (which has hopefully been prefetched), and calling
 * the appropriate helper function.
 */
static int
do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
	const blkptr_t *bp = &data->bp;
	const zbookmark_phys_t *zb = &data->zb;
	uint8_t indblkshift = data->indblkshift;
	uint16_t dblkszsec = data->datablkszsec;
	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
	int err = 0;

	ASSERT3U(zb->zb_level, >=, 0);

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= dsa->dsa_resume_object);

	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
	    DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
		return (0);
	} else if (BP_IS_HOLE(bp) &&
	    zb->zb_object == DMU_META_DNODE_OBJECT) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
		err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
	} else if (BP_IS_HOLE(bp)) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t offset = zb->zb_blkid * span;
		err = dump_free(dsa, zb->zb_object, offset, span);
	} else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
		return (0);
	} else if (type == DMU_OT_DNODE) {
		int blksz = BP_GET_LSIZE(bp);
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;

		ASSERT0(zb->zb_level);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		dnode_phys_t *blk = abuf->b_data;
		uint64_t dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
		for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
			err = dump_dnode(dsa, dnobj + i, blk + i);
			if (err != 0)
				break;
		}
		(void) arc_buf_remove_ref(abuf, &abuf);
	} else if (type == DMU_OT_SA) {
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = BP_GET_LSIZE(bp);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
		(void) arc_buf_remove_ref(abuf, &abuf);
	} else if (backup_do_embed(dsa, bp)) {
		/* it's an embedded level-0 block of a regular object */
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		ASSERT0(zb->zb_level);
		err = dump_write_embedded(dsa, zb->zb_object,
		    zb->zb_blkid * blksz, blksz, bp);
	} else {
		/* it's a level-0 block of a regular object */
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		uint64_t offset;

		ASSERT0(zb->zb_level);
		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
		    (zb->zb_object == dsa->dsa_resume_object &&
		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0) {
			if (zfs_send_corrupt_data) {
				/* Send a block filled with 0x"zfs badd bloc" */
				abuf = arc_buf_alloc(spa, blksz, &abuf,
				    ARC_BUFC_DATA);
				uint64_t *ptr;
				for (ptr = abuf->b_data;
				    (char *)ptr < (char *)abuf->b_data + blksz;
				    ptr++)
					*ptr = 0x2f5baddb10cULL;
			} else {
				return (SET_ERROR(EIO));
			}
		}

		offset = zb->zb_blkid * blksz;

		if (!(dsa->dsa_featureflags &
		    DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
		    blksz > SPA_OLD_MAXBLOCKSIZE) {
			char *buf = abuf->b_data;
			while (blksz > 0 && err == 0) {
				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
				err = dump_write(dsa, type, zb->zb_object,
				    offset, n, NULL, buf);
				offset += n;
				buf += n;
				blksz -= n;
			}
		} else {
			err = dump_write(dsa, type, zb->zb_object,
			    offset, blksz, bp, abuf->b_data);
		}
		(void) arc_buf_remove_ref(abuf, &abuf);
	}

	ASSERT(err == 0 || err == EINTR);
	return (err);
}
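
/*
 * The final branch above shows how a block larger than SPA_OLD_MAXBLOCKSIZE
 * is split into multiple WRITE records when the receiver has not opted in
 * to large blocks.  The chunking itself is just this loop (stand-alone
 * sketch; emit_chunk is a hypothetical stand-in for dump_write):
 */
#if 0
static int
split_into_chunks(char *buf, uint64_t offset, int len, int chunksz,
    int (*emit_chunk)(uint64_t, int, char *))
{
	int err = 0;

	while (len > 0 && err == 0) {
		int n = MIN(len, chunksz);
		err = emit_chunk(offset, n, buf);
		offset += n;
		buf += n;
		len -= n;
	}
	return (err);
}
#endif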

/*
 * Pop the new data off the queue, and free the old data.
 */
static struct send_block_record *
get_next_record(bqueue_t *bq, struct send_block_record *data)
{
	struct send_block_record *tmp = bqueue_dequeue(bq);
	kmem_free(data, sizeof (*data));
	return (tmp);
}

/*
 * Actually do the bulk of the work in a zfs send.
 *
 * Note: Releases dp using the specified tag.
 */
static int
dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
    zfs_bookmark_phys_t *ancestor_zb,
    boolean_t is_clone, boolean_t embedok, boolean_t large_block_ok, int outfd,
    uint64_t resumeobj, uint64_t resumeoff,
#ifdef illumos
    vnode_t *vp, offset_t *off)
#else
    struct file *fp, offset_t *off)
#endif
{
	objset_t *os;
	dmu_replay_record_t *drr;
	dmu_sendarg_t *dsp;
	int err;
	uint64_t fromtxg = 0;
	uint64_t featureflags = 0;
	struct send_thread_arg to_arg = { 0 };

	err = dmu_objset_from_ds(to_ds, &os);
	if (err != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
	drr->drr_type = DRR_BEGIN;
	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
	    DMU_SUBSTREAM);

#ifdef _KERNEL
	if (dmu_objset_type(os) == DMU_OST_ZFS) {
		uint64_t version;
		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
			kmem_free(drr, sizeof (dmu_replay_record_t));
			dsl_pool_rele(dp, tag);
			return (SET_ERROR(EINVAL));
		}
		if (version >= ZPL_VERSION_SA) {
			featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
		}
	}
#endif

	if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
		featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
	if (embedok &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
		featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
		if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
			featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA_LZ4;
	}

	if (resumeobj != 0 || resumeoff != 0) {
		featureflags |= DMU_BACKUP_FEATURE_RESUMING;
	}

	DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
	    featureflags);

	drr->drr_u.drr_begin.drr_creation_time =
	    dsl_dataset_phys(to_ds)->ds_creation_time;
	drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
	if (is_clone)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
	drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
	if (zfs_send_set_freerecords_bit)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;

	if (ancestor_zb != NULL) {
		drr->drr_u.drr_begin.drr_fromguid =
		    ancestor_zb->zbm_guid;
		fromtxg = ancestor_zb->zbm_creation_txg;
	}
	dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
	if (!to_ds->ds_is_snapshot) {
		(void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
		    sizeof (drr->drr_u.drr_begin.drr_toname));
	}

	dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);

	dsp->dsa_drr = drr;
	dsp->dsa_outfd = outfd;
	dsp->dsa_proc = curproc;
	dsp->dsa_td = curthread;
	dsp->dsa_fp = fp;
	dsp->dsa_os = os;
	dsp->dsa_off = off;
	dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	dsp->dsa_pending_op = PENDING_NONE;
	dsp->dsa_featureflags = featureflags;
	dsp->dsa_resume_object = resumeobj;
	dsp->dsa_resume_offset = resumeoff;

	mutex_enter(&to_ds->ds_sendstream_lock);
	list_insert_head(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	dsl_dataset_long_hold(to_ds, FTAG);
	dsl_pool_rele(dp, tag);

	void *payload = NULL;
	size_t payload_len = 0;
	if (resumeobj != 0 || resumeoff != 0) {
		dmu_object_info_t to_doi;
		err = dmu_object_info(os, resumeobj, &to_doi);
		if (err != 0)
			goto out;
		SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
		    resumeoff / to_doi.doi_data_block_size);

		nvlist_t *nvl = fnvlist_alloc();
		fnvlist_add_uint64(nvl, "resume_object", resumeobj);
		fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
		payload = fnvlist_pack(nvl, &payload_len);
		drr->drr_payloadlen = payload_len;
		fnvlist_free(nvl);
	}

	err = dump_record(dsp, payload, payload_len);
	fnvlist_pack_free(payload, payload_len);
	if (err != 0) {
		err = dsp->dsa_err;
		goto out;
	}

	err = bqueue_init(&to_arg.q, zfs_send_queue_length,
	    offsetof(struct send_block_record, ln));
	to_arg.error_code = 0;
	to_arg.cancel = B_FALSE;
	to_arg.ds = to_ds;
	to_arg.fromtxg = fromtxg;
	to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
	(void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, &p0,
	    TS_RUN, minclsyspri);

	struct send_block_record *to_data;
	to_data = bqueue_dequeue(&to_arg.q);

	while (!to_data->eos_marker && err == 0) {
		err = do_dump(dsp, to_data);
		to_data = get_next_record(&to_arg.q, to_data);
		if (issig(JUSTLOOKING) && issig(FORREAL))
			err = EINTR;
	}

	if (err != 0) {
		to_arg.cancel = B_TRUE;
		while (!to_data->eos_marker) {
			to_data = get_next_record(&to_arg.q, to_data);
		}
	}
	kmem_free(to_data, sizeof (*to_data));

	bqueue_destroy(&to_arg.q);

	if (err == 0 && to_arg.error_code != 0)
		err = to_arg.error_code;

	if (err != 0)
		goto out;

	if (dsp->dsa_pending_op != PENDING_NONE)
		if (dump_record(dsp, NULL, 0) != 0)
			err = SET_ERROR(EINTR);

	if (err != 0) {
		if (err == EINTR && dsp->dsa_err != 0)
			err = dsp->dsa_err;
		goto out;
	}

	bzero(drr, sizeof (dmu_replay_record_t));
	drr->drr_type = DRR_END;
	drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
	drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, NULL, 0) != 0)
		err = dsp->dsa_err;

out:
	mutex_enter(&to_ds->ds_sendstream_lock);
	list_remove(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	kmem_free(drr, sizeof (dmu_replay_record_t));
	kmem_free(dsp, sizeof (dmu_sendarg_t));

	dsl_dataset_long_rele(to_ds, FTAG);

	return (err);
}
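
/*
 * When a send is resumed, the BEGIN record above carries a small packed
 * nvlist payload naming the object/offset to restart from.  A stand-alone
 * sketch of building and unpacking that payload with the same libnvpair
 * interfaces (illustrative only):
 */
#if 0
static void
resume_payload_example(uint64_t resumeobj, uint64_t resumeoff)
{
	nvlist_t *nvl = fnvlist_alloc();
	size_t len;
	char *packed;

	fnvlist_add_uint64(nvl, "resume_object", resumeobj);
	fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
	packed = fnvlist_pack(nvl, &len);
	fnvlist_free(nvl);

	/* The receiver unpacks the payload and reads the same keys back. */
	nvlist_t *unpacked = fnvlist_unpack(packed, len);
	ASSERT3U(fnvlist_lookup_uint64(unpacked, "resume_object"), ==,
	    resumeobj);
	fnvlist_free(unpacked);
	fnvlist_pack_free(packed, len);
}
#endif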

int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
    boolean_t embedok, boolean_t large_block_ok,
#ifdef illumos
    int outfd, vnode_t *vp, offset_t *off)
#else
    int outfd, struct file *fp, offset_t *off)
#endif
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	dsl_dataset_t *fromds = NULL;
	int err;

	err = dsl_pool_hold(pool, FTAG, &dp);
	if (err != 0)
		return (err);

	err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != 0) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone;

		err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		if (!dsl_dataset_is_before(ds, fromds, 0))
			err = SET_ERROR(EXDEV);
		zb.zbm_creation_time =
		    dsl_dataset_phys(fromds)->ds_creation_time;
		zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
		zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
		is_clone = (fromds->ds_dir != ds->ds_dir);
		dsl_dataset_rele(fromds, FTAG);
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok, outfd, 0, 0, fp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok, outfd, 0, 0, fp, off);
	}
	dsl_dataset_rele(ds, FTAG);
	return (err);
}

int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
    boolean_t large_block_ok, int outfd, uint64_t resumeobj, uint64_t resumeoff,
#ifdef illumos
    vnode_t *vp, offset_t *off)
#else
    struct file *fp, offset_t *off)
#endif
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	int err;
	boolean_t owned = B_FALSE;

	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
		return (SET_ERROR(EINVAL));

	err = dsl_pool_hold(tosnap, FTAG, &dp);
	if (err != 0)
		return (err);

	if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
		/*
		 * We are sending a filesystem or volume.  Ensure
		 * that it doesn't change by owning the dataset.
		 */
		err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
		owned = B_TRUE;
	} else {
		err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
	}
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != NULL) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone = B_FALSE;
		int fsnamelen = strchr(tosnap, '@') - tosnap;

		/*
		 * If the fromsnap is in a different filesystem, then
		 * mark the send stream as a clone.
		 */
		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
		    (fromsnap[fsnamelen] != '@' &&
		    fromsnap[fsnamelen] != '#')) {
			is_clone = B_TRUE;
		}

		if (strchr(fromsnap, '@')) {
			dsl_dataset_t *fromds;
			err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
			if (err == 0) {
				if (!dsl_dataset_is_before(ds, fromds, 0))
					err = SET_ERROR(EXDEV);
				zb.zbm_creation_time =
				    dsl_dataset_phys(fromds)->ds_creation_time;
				zb.zbm_creation_txg =
				    dsl_dataset_phys(fromds)->ds_creation_txg;
				zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
				is_clone = (ds->ds_dir != fromds->ds_dir);
				dsl_dataset_rele(fromds, FTAG);
			}
		} else {
			err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
		}
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok,
		    outfd, resumeobj, resumeoff, fp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok,
		    outfd, resumeobj, resumeoff, fp, off);
	}
	if (owned)
		dsl_dataset_disown(ds, FTAG);
	else
		dsl_dataset_rele(ds, FTAG);
	return (err);
}

static int
dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t size,
    uint64_t *sizep)
{
	int err;
	/*
	 * Assume that space (both on-disk and in-stream) is dominated by
	 * data.  We will adjust for indirect blocks and the copies property,
	 * but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
	 */

	/*
	 * Subtract out approximate space used by indirect blocks.
	 * Assume most space is used by data blocks (non-indirect, non-dnode).
	 * Assume all blocks are recordsize.  Assume ditto blocks and
	 * internal fragmentation cancel out compression.
	 *
	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
	 * block, which we observe in practice.
	 */
	uint64_t recordsize;
	err = dsl_prop_get_int_ds(ds, "recordsize", &recordsize);
	if (err != 0)
		return (err);
	size -= size / recordsize * sizeof (blkptr_t);

	/* Add in the space for the record associated with each block. */
	size += size / recordsize * sizeof (dmu_replay_record_t);

	*sizep = size;

	return (0);
}
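
/*
 * Worked example of the adjustment above (illustrative): for 1 GiB of
 * changed data at the default 128K recordsize, there are 8192 blocks, so
 * the estimate drops by 8192 * sizeof (blkptr_t) (128 bytes each, 1 MiB
 * total) for indirects and grows by 8192 * sizeof (dmu_replay_record_t)
 * for the per-block WRITE record headers:
 */
#if 0
static uint64_t
estimate_stream_size(uint64_t size, uint64_t recordsize)
{
	uint64_t nblocks = size / recordsize;

	size -= nblocks * sizeof (blkptr_t);		/* indirect blocks */
	size += nblocks * sizeof (dmu_replay_record_t);	/* record headers */
	return (size);
}
#endif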

int
dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds, uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	uint64_t size;

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!ds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/* fromsnap, if provided, must be a snapshot */
	if (fromds != NULL && !fromds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/*
	 * fromsnap must be an earlier snapshot from the same fs as tosnap,
	 * or the origin's fs.
	 */
	if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
		return (SET_ERROR(EXDEV));

	/* Get uncompressed size estimate of changed data. */
	if (fromds == NULL) {
		size = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
	} else {
		uint64_t used, comp;
		err = dsl_dataset_space_written(fromds, ds,
		    &used, &comp, &size);
		if (err != 0)
			return (err);
	}

	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
	return (err);
}

/*
 * Simple callback used to traverse the blocks of a snapshot and sum their
 * uncompressed size.
 */
/* ARGSUSED */
static int
dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
{
	uint64_t *spaceptr = arg;
	if (bp != NULL && !BP_IS_HOLE(bp)) {
		*spaceptr += BP_GET_UCSIZE(bp);
	}
	return (0);
}

/*
 * Given a destination snapshot and a TXG, calculate the approximate size of a
 * send stream sent from that TXG.  from_txg may be zero, indicating that the
 * whole snapshot will be sent.
 */
int
dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
    uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	uint64_t size = 0;

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!dsl_dataset_is_snapshot(ds))
		return (SET_ERROR(EINVAL));

	/* verify that from_txg is before the provided snapshot was taken */
	if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
		return (SET_ERROR(EXDEV));
	}

	/*
	 * traverse the blocks of the snapshot with birth times after
	 * from_txg, summing their uncompressed size
	 */
	err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
	    dmu_calculate_send_traversal, &size);
	if (err)
		return (err);

	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
	return (err);
}

typedef struct dmu_recv_begin_arg {
	const char *drba_origin;
	dmu_recv_cookie_t *drba_cookie;
	cred_t *drba_cred;
	uint64_t drba_snapobj;
} dmu_recv_begin_arg_t;

static int
recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
    uint64_t fromguid)
{
	uint64_t val;
	int error;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/* temporary clone name must not exist */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
	    8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? EBUSY : error);

	/* new snapshot name must not exist */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dataset_phys(ds)->ds_snapnames_zapobj,
	    drba->drba_cookie->drc_tosnap, 8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? EEXIST : error);

	/*
	 * Check snapshot limit before receiving.  We'll recheck again at the
	 * end, but might as well abort before receiving if we're already over
	 * the limit.
	 *
	 * Note that we do not check the file system limit with
	 * dsl_dir_fscount_check because the temporary %clones don't count
	 * against that limit.
	 */
	error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
	    NULL, drba->drba_cred);
	if (error != 0)
		return (error);

	if (fromguid != 0) {
		dsl_dataset_t *snap;
		uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;

		/* Find snapshot in this dir that matches fromguid. */
		while (obj != 0) {
			error = dsl_dataset_hold_obj(dp, obj, FTAG,
			    &snap);
			if (error != 0)
				return (SET_ERROR(ENODEV));
			if (snap->ds_dir != ds->ds_dir) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ENODEV));
			}
			if (dsl_dataset_phys(snap)->ds_guid == fromguid)
				break;
			obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
			dsl_dataset_rele(snap, FTAG);
		}
		if (obj == 0)
			return (SET_ERROR(ENODEV));

		if (drba->drba_cookie->drc_force) {
			drba->drba_snapobj = obj;
		} else {
			/*
			 * If we are not forcing, there must be no
			 * changes since fromsnap.
			 */
			if (dsl_dataset_modified_since_snap(ds, snap)) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ETXTBSY));
			}
			drba->drba_snapobj = ds->ds_prev->ds_object;
		}

		dsl_dataset_rele(snap, FTAG);
	} else {
		/* if full, then must be forced */
		if (!drba->drba_cookie->drc_force)
			return (SET_ERROR(EEXIST));
		/* start from $ORIGIN@$ORIGIN, if supported */
		drba->drba_snapobj = dp->dp_origin_snap != NULL ?
		    dp->dp_origin_snap->ds_object : 0;
	}

	return (0);
}

static int
dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	uint64_t fromguid = drrb->drr_fromguid;
	int flags = drrb->drr_flags;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES ||
	    ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	if (drba->drba_cookie->drc_resumable &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate large blocks
	 * to smaller ones, so the pool must have the LARGE_BLOCKS
	 * feature enabled if the stream has LARGE_BLOCKS.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
		return (SET_ERROR(ENOTSUP));

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* target fs already exists; recv into temp clone */

		/* Can't recv a clone into an existing fs */
		if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
			dsl_dataset_rele(ds, FTAG);
			return (SET_ERROR(EINVAL));
		}

		error = recv_begin_check_existing_impl(drba, ds, fromguid);
		dsl_dataset_rele(ds, FTAG);
	} else if (error == ENOENT) {
		/* target fs does not exist; must be a full backup or clone */
		char buf[MAXNAMELEN];

		/*
		 * If it's a non-clone incremental, we are missing the
		 * target fs, so fail the recv.
		 */
		if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
		    drba->drba_origin))
			return (SET_ERROR(ENOENT));

		/*
		 * If we're receiving a full send as a clone, and it doesn't
		 * contain all the necessary free records and freeobject
		 * records, reject it.
		 */
		if (fromguid == 0 && drba->drba_origin &&
		    !(flags & DRR_FLAG_FREERECORDS))
			return (SET_ERROR(EINVAL));

		/* Open the parent of tofs */
		ASSERT3U(strlen(tofs), <, MAXNAMELEN);
		(void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
		error = dsl_dataset_hold(dp, buf, FTAG, &ds);
		if (error != 0)
			return (error);

		/*
		 * Check filesystem and snapshot limits before receiving.  We'll
		 * recheck snapshot limits again at the end (we create the
		 * filesystems and increment those counts during begin_sync).
		 */
		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		if (drba->drba_origin != NULL) {
			dsl_dataset_t *origin;
			error = dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin);
			if (error != 0) {
				dsl_dataset_rele(ds, FTAG);
				return (error);
			}
			if (!origin->ds_is_snapshot) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(EINVAL));
			}
			if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
			    fromguid != 0) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(ENODEV));
			}
			dsl_dataset_rele(origin, FTAG);
		}
		dsl_dataset_rele(ds, FTAG);
		error = 0;
	}
	return (error);
}

static void
dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	objset_t *mos = dp->dp_meta_objset;
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds, *newds;
	uint64_t dsobj;
	int error;
	uint64_t crflags = 0;

	if (drrb->drr_flags & DRR_FLAG_CI_DATA)
		crflags |= DS_FLAG_CI_DATASET;

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* create temporary clone */
		dsl_dataset_t *snap = NULL;
		if (drba->drba_snapobj != 0) {
			VERIFY0(dsl_dataset_hold_obj(dp,
			    drba->drba_snapobj, FTAG, &snap));
		}
		dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
		    snap, crflags, drba->drba_cred, tx);
		if (drba->drba_snapobj != 0)
			dsl_dataset_rele(snap, FTAG);
		dsl_dataset_rele(ds, FTAG);
	} else {
		dsl_dir_t *dd;
		const char *tail;
		dsl_dataset_t *origin = NULL;

		VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));

		if (drba->drba_origin != NULL) {
			VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin));
		}

		/* Create new dataset. */
		dsobj = dsl_dataset_create_sync(dd,
		    strrchr(tofs, '/') + 1,
		    origin, crflags, drba->drba_cred, tx);
		if (origin != NULL)
			dsl_dataset_rele(origin, FTAG);
		dsl_dir_rele(dd, FTAG);
		drba->drba_cookie->drc_newfs = B_TRUE;
	}
	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));

	if (drba->drba_cookie->drc_resumable) {
		dsl_dataset_zapify(newds, tx);
		if (drrb->drr_fromguid != 0) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
			    8, 1, &drrb->drr_fromguid, tx));
		}
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
		    8, 1, &drrb->drr_toguid, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
		    1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
		uint64_t one = 1;
		uint64_t zero = 0;
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
		    8, 1, &one, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
		    8, 1, &zero, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
		    8, 1, &zero, tx));
		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
		    DMU_BACKUP_FEATURE_EMBED_DATA) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
			    8, 1, &one, tx));
		}
	}

	dmu_buf_will_dirty(newds->ds_dbuf, tx);
	dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;

	/*
	 * If we actually created a non-clone, we need to create the
	 * objset in our new dataset.
	 */
	if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
		(void) dmu_objset_create_impl(dp->dp_spa,
		    newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
	}

	drba->drba_cookie->drc_ds = newds;

	spa_history_log_internal_ds(newds, "receive", tx, "");
}
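
/*
 * The DS_FIELD_RESUME_* ZAP entries created above are what resumable
 * receives are later validated against.  A sketch of reading one of them
 * back out of the MOS with the same zap_lookup() interface used elsewhere
 * in this file (illustrative only):
 */
#if 0
static int
get_resume_object(dsl_pool_t *dp, uint64_t dsobj, uint64_t *objp)
{
	/* 8-byte integer entry, 1 element, matching the zap_add() above. */
	return (zap_lookup(dp->dp_meta_objset, dsobj,
	    DS_FIELD_RESUME_OBJECT, sizeof (uint64_t), 1, objp));
}
#endif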

static int
dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES)
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	char recvname[ZFS_MAXNAMELEN];

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
		if (error != 0)
			return (error);
	}

	/* check that ds is marked inconsistent */
	if (!DS_IS_INCONSISTENT(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/* check that there is resuming data, and that the toguid matches */
	if (!dsl_dataset_is_zapified(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}
	uint64_t val;
	error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
	if (error != 0 || drrb->drr_toguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Check if the receive is still running.  If so, it will be owned.
	 * Note that nothing else can own the dataset (e.g. after the receive
	 * fails) because it will be marked inconsistent.
	 */
	if (dsl_dataset_has_owner(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EBUSY));
	}

	/* There should not be any snapshots of this fs yet. */
	if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Note: resume point will be checked when we process the first WRITE
	 * record.
	 */

	/* check that the origin matches */
	val = 0;
	(void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
	if (drrb->drr_fromguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	dsl_dataset_rele(ds, FTAG);
	return (0);
}

static void
dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds;
	uint64_t dsobj;
	char recvname[ZFS_MAXNAMELEN];

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		VERIFY0(dsl_dataset_hold(dp, tofs, FTAG, &ds));
		drba->drba_cookie->drc_newfs = B_TRUE;
	}

	/* clear the inconsistent flag so that we can own it */
	ASSERT(DS_IS_INCONSISTENT(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
	dsobj = ds->ds_object;
	dsl_dataset_rele(ds, FTAG);

	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));

	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;

	ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));

	drba->drba_cookie->drc_ds = ds;

	spa_history_log_internal_ds(ds, "resume receive", tx, "");
}

/*
 * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
 * succeeds; otherwise we will leak the holds on the datasets.
 */
int
dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
    boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
{
	dmu_recv_begin_arg_t drba = { 0 };

	bzero(drc, sizeof (dmu_recv_cookie_t));
	drc->drc_drr_begin = drr_begin;
	drc->drc_drrb = &drr_begin->drr_u.drr_begin;
	drc->drc_tosnap = tosnap;
	drc->drc_tofs = tofs;
	drc->drc_force = force;
	drc->drc_resumable = resumable;
	drc->drc_cred = CRED();

	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
		drc->drc_byteswap = B_TRUE;
		fletcher_4_incremental_byteswap(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
		byteswap_record(drr_begin);
	} else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
		fletcher_4_incremental_native(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
	} else {
		return (SET_ERROR(EINVAL));
	}

	drba.drba_origin = origin;
	drba.drba_cookie = drc;
	drba.drba_cred = CRED();

	if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
	    DMU_BACKUP_FEATURE_RESUMING) {
		return (dsl_sync_task(tofs,
		    dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	} else {
		return (dsl_sync_task(tofs,
		    dmu_recv_begin_check, dmu_recv_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	}
}
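
/*
 * Endianness of a stream is detected exactly as above: the BEGIN record's
 * magic is compared against DMU_BACKUP_MAGIC both as-is and byte-swapped.
 * A stand-alone sketch of that dispatch (illustrative only):
 */
#if 0
static int
detect_stream_byteswap(const struct drr_begin *drrb, boolean_t *swapp)
{
	if (drrb->drr_magic == DMU_BACKUP_MAGIC)
		*swapp = B_FALSE;		/* sender had our byte order */
	else if (drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC))
		*swapp = B_TRUE;		/* every field needs BSWAP */
	else
		return (SET_ERROR(EINVAL));	/* not a send stream */
	return (0);
}
#endif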

struct receive_record_arg {
	dmu_replay_record_t header;
	void *payload;	/* Pointer to a buffer containing the payload */
	/*
	 * If the record is a write, pointer to the arc_buf_t containing the
	 * payload.
	 */
	arc_buf_t *write_buf;
	int payload_size;
	uint64_t bytes_read; /* bytes read from stream when record created */
	boolean_t eos_marker; /* Marks the end of the stream */
	bqueue_node_t node;
};

struct receive_writer_arg {
	objset_t *os;
	boolean_t byteswap;
	bqueue_t q;

	/*
	 * These three args are used to signal to the main thread that we're
	 * done.
	 */
	kmutex_t mutex;
	kcondvar_t cv;
	boolean_t done;

	int err;
	/* A map from guid to dataset to help handle dedup'd streams. */
	avl_tree_t *guid_to_ds_map;
	boolean_t resumable;
	uint64_t last_object, last_offset;
	uint64_t bytes_read; /* bytes read when current record created */
};

struct objlist {
	list_t list; /* List of struct receive_objnode. */
	/*
	 * Last object looked up.  Used to assert that objects are being
	 * looked up in ascending order.
	 */
	uint64_t last_lookup;
};

struct receive_objnode {
	list_node_t node;
	uint64_t object;
};

struct receive_arg {
	objset_t *os;
	kthread_t *td;
	struct file *fp;
	uint64_t voff; /* The current offset in the stream */
	uint64_t bytes_read;
	/*
	 * A record that has had its payload read in, but hasn't yet been
	 * handed off to the worker thread.
	 */
	struct receive_record_arg *rrd;
	/* A record that has had its header read in, but not its payload. */
	struct receive_record_arg *next_rrd;
	zio_cksum_t cksum;
	zio_cksum_t prev_cksum;
	int err;
	boolean_t byteswap;
	/* Sorted list of objects not to issue prefetches for. */
	struct objlist ignore_objlist;
};

typedef struct guid_map_entry {
	uint64_t guid;
	dsl_dataset_t *gme_ds;
	avl_node_t avlnode;
} guid_map_entry_t;

static int
guid_compare(const void *arg1, const void *arg2)
{
	const guid_map_entry_t *gmep1 = arg1;
	const guid_map_entry_t *gmep2 = arg2;

	if (gmep1->guid < gmep2->guid)
		return (-1);
	else if (gmep1->guid > gmep2->guid)
		return (1);
	return (0);
}

static void
free_guid_map_onexit(void *arg)
{
	avl_tree_t *ca = arg;
	void *cookie = NULL;
	guid_map_entry_t *gmep;

	while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
		dsl_dataset_long_rele(gmep->gme_ds, gmep);
		dsl_dataset_rele(gmep->gme_ds, gmep);
		kmem_free(gmep, sizeof (guid_map_entry_t));
	}
	avl_destroy(ca);
	kmem_free(ca, sizeof (avl_tree_t));
}

static int
restore_bytes(struct receive_arg *ra, void *buf, int len, off_t off,
    ssize_t *resid)
{
	struct uio auio;
	struct iovec aiov;
	int error;

	aiov.iov_base = buf;
	aiov.iov_len = len;
	auio.uio_iov = &aiov;
	auio.uio_iovcnt = 1;
	auio.uio_resid = len;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_READ;
	auio.uio_offset = off;
	auio.uio_td = ra->td;
#ifdef _KERNEL
	error = fo_read(ra->fp, &auio, ra->td->td_ucred, FOF_OFFSET, ra->td);
#else
	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
	error = EOPNOTSUPP;
#endif
	*resid = auio.uio_resid;
	return (error);
}

static int
receive_read(struct receive_arg *ra, int len, void *buf)
{
	int done = 0;

	/* some things will require 8-byte alignment, so everything must */
	ASSERT0(len % 8);

	while (done < len) {
		ssize_t resid;

		ra->err = restore_bytes(ra, buf + done,
		    len - done, ra->voff, &resid);

		if (resid == len - done) {
			/*
			 * Note: ECKSUM indicates that the receive
			 * was interrupted and can potentially be resumed.
			 */
			ra->err = SET_ERROR(ECKSUM);
		}
		ra->voff += len - done - resid;
		done = len - resid;
		if (ra->err != 0)
			return (ra->err);
	}

	ra->bytes_read += len;

	ASSERT3U(done, ==, len);
	return (0);
}
resid; 1831 1832 ra->err = restore_bytes(ra, buf + done, 1833 len - done, ra->voff, &resid); 1834 1835 if (resid == len - done) { 1836 /* 1837 * Note: ECKSUM indicates that the receive 1838 * was interrupted and can potentially be resumed. 1839 */ 1840 ra->err = SET_ERROR(ECKSUM); 1841 } 1842 ra->voff += len - done - resid; 1843 done = len - resid; 1844 if (ra->err != 0) 1845 return (ra->err); 1846 } 1847 1848 ra->bytes_read += len; 1849 1850 ASSERT3U(done, ==, len); 1851 return (0); 1852} 1853 1854static void 1855byteswap_record(dmu_replay_record_t *drr) 1856{ 1857#define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X)) 1858#define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X)) 1859 drr->drr_type = BSWAP_32(drr->drr_type); 1860 drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen); 1861 1862 switch (drr->drr_type) { 1863 case DRR_BEGIN: 1864 DO64(drr_begin.drr_magic); 1865 DO64(drr_begin.drr_versioninfo); 1866 DO64(drr_begin.drr_creation_time); 1867 DO32(drr_begin.drr_type); 1868 DO32(drr_begin.drr_flags); 1869 DO64(drr_begin.drr_toguid); 1870 DO64(drr_begin.drr_fromguid); 1871 break; 1872 case DRR_OBJECT: 1873 DO64(drr_object.drr_object); 1874 DO32(drr_object.drr_type); 1875 DO32(drr_object.drr_bonustype); 1876 DO32(drr_object.drr_blksz); 1877 DO32(drr_object.drr_bonuslen); 1878 DO64(drr_object.drr_toguid); 1879 break; 1880 case DRR_FREEOBJECTS: 1881 DO64(drr_freeobjects.drr_firstobj); 1882 DO64(drr_freeobjects.drr_numobjs); 1883 DO64(drr_freeobjects.drr_toguid); 1884 break; 1885 case DRR_WRITE: 1886 DO64(drr_write.drr_object); 1887 DO32(drr_write.drr_type); 1888 DO64(drr_write.drr_offset); 1889 DO64(drr_write.drr_length); 1890 DO64(drr_write.drr_toguid); 1891 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum); 1892 DO64(drr_write.drr_key.ddk_prop); 1893 break; 1894 case DRR_WRITE_BYREF: 1895 DO64(drr_write_byref.drr_object); 1896 DO64(drr_write_byref.drr_offset); 1897 DO64(drr_write_byref.drr_length); 1898 DO64(drr_write_byref.drr_toguid); 1899 DO64(drr_write_byref.drr_refguid); 1900 DO64(drr_write_byref.drr_refobject); 1901 DO64(drr_write_byref.drr_refoffset); 1902 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref. 
1903 drr_key.ddk_cksum); 1904 DO64(drr_write_byref.drr_key.ddk_prop); 1905 break; 1906 case DRR_WRITE_EMBEDDED: 1907 DO64(drr_write_embedded.drr_object); 1908 DO64(drr_write_embedded.drr_offset); 1909 DO64(drr_write_embedded.drr_length); 1910 DO64(drr_write_embedded.drr_toguid); 1911 DO32(drr_write_embedded.drr_lsize); 1912 DO32(drr_write_embedded.drr_psize); 1913 break; 1914 case DRR_FREE: 1915 DO64(drr_free.drr_object); 1916 DO64(drr_free.drr_offset); 1917 DO64(drr_free.drr_length); 1918 DO64(drr_free.drr_toguid); 1919 break; 1920 case DRR_SPILL: 1921 DO64(drr_spill.drr_object); 1922 DO64(drr_spill.drr_length); 1923 DO64(drr_spill.drr_toguid); 1924 break; 1925 case DRR_END: 1926 DO64(drr_end.drr_toguid); 1927 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum); 1928 break; 1929 } 1930 1931 if (drr->drr_type != DRR_BEGIN) { 1932 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum); 1933 } 1934 1935#undef DO64 1936#undef DO32 1937} 1938 1939static inline uint8_t 1940deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size) 1941{ 1942 if (bonus_type == DMU_OT_SA) { 1943 return (1); 1944 } else { 1945 return (1 + 1946 ((DN_MAX_BONUSLEN - bonus_size) >> SPA_BLKPTRSHIFT)); 1947 } 1948} 1949 1950static void 1951save_resume_state(struct receive_writer_arg *rwa, 1952 uint64_t object, uint64_t offset, dmu_tx_t *tx) 1953{ 1954 int txgoff = dmu_tx_get_txg(tx) & TXG_MASK; 1955 1956 if (!rwa->resumable) 1957 return; 1958 1959 /* 1960 * We use ds_resume_bytes[] != 0 to indicate that we need to 1961 * update this on disk, so it must not be 0. 1962 */ 1963 ASSERT(rwa->bytes_read != 0); 1964 1965 /* 1966 * We only resume from write records, which have a valid 1967 * (non-meta-dnode) object number. 1968 */ 1969 ASSERT(object != 0); 1970 1971 /* 1972 * For resuming to work correctly, we must receive records in order, 1973 * sorted by object,offset. This is checked by the callers, but 1974 * assert it here for good measure. 1975 */ 1976 ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]); 1977 ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] || 1978 offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]); 1979 ASSERT3U(rwa->bytes_read, >=, 1980 rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]); 1981 1982 rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object; 1983 rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset; 1984 rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read; 1985} 1986 1987static int 1988receive_object(struct receive_writer_arg *rwa, struct drr_object *drro, 1989 void *data) 1990{ 1991 dmu_object_info_t doi; 1992 dmu_tx_t *tx; 1993 uint64_t object; 1994 int err; 1995 1996 if (drro->drr_type == DMU_OT_NONE || 1997 !DMU_OT_IS_VALID(drro->drr_type) || 1998 !DMU_OT_IS_VALID(drro->drr_bonustype) || 1999 drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS || 2000 drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS || 2001 P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) || 2002 drro->drr_blksz < SPA_MINBLOCKSIZE || 2003 drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) || 2004 drro->drr_bonuslen > DN_MAX_BONUSLEN) { 2005 return (SET_ERROR(EINVAL)); 2006 } 2007 2008 err = dmu_object_info(rwa->os, drro->drr_object, &doi); 2009 2010 if (err != 0 && err != ENOENT) 2011 return (SET_ERROR(EINVAL)); 2012 object = err == 0 ? drro->drr_object : DMU_NEW_OBJECT; 2013 2014 /* 2015 * If we are losing blkptrs or changing the block size this must 2016 * be a new file instance. 
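 * (For example, a larger bonus buffer leaves room for fewer block
 * pointers in the dnode; see deduce_nblkptr() above.)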
We must clear out the previous file 2017 * contents before we can change this type of metadata in the dnode. 2018 */ 2019 if (err == 0) { 2020 int nblkptr; 2021 2022 nblkptr = deduce_nblkptr(drro->drr_bonustype, 2023 drro->drr_bonuslen); 2024 2025 if (drro->drr_blksz != doi.doi_data_block_size || 2026 nblkptr < doi.doi_nblkptr) { 2027 err = dmu_free_long_range(rwa->os, drro->drr_object, 2028 0, DMU_OBJECT_END); 2029 if (err != 0) 2030 return (SET_ERROR(EINVAL)); 2031 } 2032 } 2033 2034 tx = dmu_tx_create(rwa->os); 2035 dmu_tx_hold_bonus(tx, object); 2036 err = dmu_tx_assign(tx, TXG_WAIT); 2037 if (err != 0) { 2038 dmu_tx_abort(tx); 2039 return (err); 2040 } 2041 2042 if (object == DMU_NEW_OBJECT) { 2043 /* currently free, want to be allocated */ 2044 err = dmu_object_claim(rwa->os, drro->drr_object, 2045 drro->drr_type, drro->drr_blksz, 2046 drro->drr_bonustype, drro->drr_bonuslen, tx); 2047 } else if (drro->drr_type != doi.doi_type || 2048 drro->drr_blksz != doi.doi_data_block_size || 2049 drro->drr_bonustype != doi.doi_bonus_type || 2050 drro->drr_bonuslen != doi.doi_bonus_size) { 2051 /* currently allocated, but with different properties */ 2052 err = dmu_object_reclaim(rwa->os, drro->drr_object, 2053 drro->drr_type, drro->drr_blksz, 2054 drro->drr_bonustype, drro->drr_bonuslen, tx); 2055 } 2056 if (err != 0) { 2057 dmu_tx_commit(tx); 2058 return (SET_ERROR(EINVAL)); 2059 } 2060 2061 dmu_object_set_checksum(rwa->os, drro->drr_object, 2062 drro->drr_checksumtype, tx); 2063 dmu_object_set_compress(rwa->os, drro->drr_object, 2064 drro->drr_compress, tx); 2065 2066 if (data != NULL) { 2067 dmu_buf_t *db; 2068 2069 VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db)); 2070 dmu_buf_will_dirty(db, tx); 2071 2072 ASSERT3U(db->db_size, >=, drro->drr_bonuslen); 2073 bcopy(data, db->db_data, drro->drr_bonuslen); 2074 if (rwa->byteswap) { 2075 dmu_object_byteswap_t byteswap = 2076 DMU_OT_BYTESWAP(drro->drr_bonustype); 2077 dmu_ot_byteswap[byteswap].ob_func(db->db_data, 2078 drro->drr_bonuslen); 2079 } 2080 dmu_buf_rele(db, FTAG); 2081 } 2082 dmu_tx_commit(tx); 2083 2084 return (0); 2085} 2086 2087/* ARGSUSED */ 2088static int 2089receive_freeobjects(struct receive_writer_arg *rwa, 2090 struct drr_freeobjects *drrfo) 2091{ 2092 uint64_t obj; 2093 int next_err = 0; 2094 2095 if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj) 2096 return (SET_ERROR(EINVAL)); 2097 2098 for (obj = drrfo->drr_firstobj; 2099 obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0; 2100 next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) { 2101 int err; 2102 2103 if (dmu_object_info(rwa->os, obj, NULL) != 0) 2104 continue; 2105 2106 err = dmu_free_long_object(rwa->os, obj); 2107 if (err != 0) 2108 return (err); 2109 } 2110 if (next_err != ESRCH) 2111 return (next_err); 2112 return (0); 2113} 2114 2115static int 2116receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw, 2117 arc_buf_t *abuf) 2118{ 2119 dmu_tx_t *tx; 2120 int err; 2121 2122 if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset || 2123 !DMU_OT_IS_VALID(drrw->drr_type)) 2124 return (SET_ERROR(EINVAL)); 2125 2126 /* 2127 * For resuming to work, records must be in increasing order 2128 * by (object, offset). 
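 * For example, a write for (object 6, offset 0) must not follow one
 * for (object 7, ...); such a record would make the resume state
 * saved below ambiguous, so it is rejected with EINVAL.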
2129 */ 2130 if (drrw->drr_object < rwa->last_object || 2131 (drrw->drr_object == rwa->last_object && 2132 drrw->drr_offset < rwa->last_offset)) { 2133 return (SET_ERROR(EINVAL)); 2134 } 2135 rwa->last_object = drrw->drr_object; 2136 rwa->last_offset = drrw->drr_offset; 2137 2138 if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0) 2139 return (SET_ERROR(EINVAL)); 2140 2141 tx = dmu_tx_create(rwa->os); 2142 2143 dmu_tx_hold_write(tx, drrw->drr_object, 2144 drrw->drr_offset, drrw->drr_length); 2145 err = dmu_tx_assign(tx, TXG_WAIT); 2146 if (err != 0) { 2147 dmu_tx_abort(tx); 2148 return (err); 2149 } 2150 if (rwa->byteswap) { 2151 dmu_object_byteswap_t byteswap = 2152 DMU_OT_BYTESWAP(drrw->drr_type); 2153 dmu_ot_byteswap[byteswap].ob_func(abuf->b_data, 2154 drrw->drr_length); 2155 } 2156 2157 dmu_buf_t *bonus; 2158 if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0) { 2159 dmu_tx_commit(tx); return (SET_ERROR(EINVAL)); } 2160 dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx); 2161 2162 /* 2163 * Note: If the receive fails, we want the resume stream to start 2164 * with the same record that we last successfully received (as opposed 2165 * to the next record), so that we can verify that we are 2166 * resuming from the correct location. 2167 */ 2168 save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx); 2169 dmu_tx_commit(tx); 2170 dmu_buf_rele(bonus, FTAG); 2171 2172 return (0); 2173} 2174 2175/* 2176 * Handle a DRR_WRITE_BYREF record. This record is used in dedup'ed 2177 * streams to refer to a copy of the data that is already on the 2178 * system because it came in earlier in the stream. This function 2179 * finds the earlier copy of the data, and uses that copy instead of 2180 * data from the stream to fulfill this write. 2181 */ 2182static int 2183receive_write_byref(struct receive_writer_arg *rwa, 2184 struct drr_write_byref *drrwbr) 2185{ 2186 dmu_tx_t *tx; 2187 int err; 2188 guid_map_entry_t gmesrch; 2189 guid_map_entry_t *gmep; 2190 avl_index_t where; 2191 objset_t *ref_os = NULL; 2192 dmu_buf_t *dbp; 2193 2194 if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset) 2195 return (SET_ERROR(EINVAL)); 2196 2197 /* 2198 * If the GUID of the referenced dataset is different from the 2199 * GUID of the target dataset, find the referenced dataset. 2200 */ 2201 if (drrwbr->drr_toguid != drrwbr->drr_refguid) { 2202 gmesrch.guid = drrwbr->drr_refguid; 2203 if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch, 2204 &where)) == NULL) { 2205 return (SET_ERROR(EINVAL)); 2206 } 2207 if (dmu_objset_from_ds(gmep->gme_ds, &ref_os)) 2208 return (SET_ERROR(EINVAL)); 2209 } else { 2210 ref_os = rwa->os; 2211 } 2212 2213 err = dmu_buf_hold(ref_os, drrwbr->drr_refobject, 2214 drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH); 2215 if (err != 0) 2216 return (err); 2217 2218 tx = dmu_tx_create(rwa->os); 2219 2220 dmu_tx_hold_write(tx, drrwbr->drr_object, 2221 drrwbr->drr_offset, drrwbr->drr_length); 2222 err = dmu_tx_assign(tx, TXG_WAIT); 2223 if (err != 0) { 2224 dmu_tx_abort(tx); 2225 return (err); 2226 } 2227 dmu_write(rwa->os, drrwbr->drr_object, 2228 drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx); 2229 dmu_buf_rele(dbp, FTAG); 2230 2231 /* See comment in receive_write.
*/ 2232 save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx); 2233 dmu_tx_commit(tx); 2234 return (0); 2235} 2236 2237static int 2238receive_write_embedded(struct receive_writer_arg *rwa, 2239 struct drr_write_embedded *drrwe, void *data) 2240{ 2241 dmu_tx_t *tx; 2242 int err; 2243 2244 if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset) 2245 return (SET_ERROR(EINVAL)); 2246 2247 if (drrwe->drr_psize > BPE_PAYLOAD_SIZE) 2248 return (SET_ERROR(EINVAL)); 2249 2250 if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES) 2251 return (SET_ERROR(EINVAL)); 2252 if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS) 2253 return (SET_ERROR(EINVAL)); 2254 2255 tx = dmu_tx_create(rwa->os); 2256 2257 dmu_tx_hold_write(tx, drrwe->drr_object, 2258 drrwe->drr_offset, drrwe->drr_length); 2259 err = dmu_tx_assign(tx, TXG_WAIT); 2260 if (err != 0) { 2261 dmu_tx_abort(tx); 2262 return (err); 2263 } 2264 2265 dmu_write_embedded(rwa->os, drrwe->drr_object, 2266 drrwe->drr_offset, data, drrwe->drr_etype, 2267 drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize, 2268 rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx); 2269 2270 /* See comment in receive_write. */ 2271 save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx); 2272 dmu_tx_commit(tx); 2273 return (0); 2274} 2275 2276static int 2277receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs, 2278 void *data) 2279{ 2280 dmu_tx_t *tx; 2281 dmu_buf_t *db, *db_spill; 2282 int err; 2283 2284 if (drrs->drr_length < SPA_MINBLOCKSIZE || 2285 drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os))) 2286 return (SET_ERROR(EINVAL)); 2287 2288 if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0) 2289 return (SET_ERROR(EINVAL)); 2290 2291 VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db)); 2292 if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) { 2293 dmu_buf_rele(db, FTAG); 2294 return (err); 2295 } 2296 2297 tx = dmu_tx_create(rwa->os); 2298 2299 dmu_tx_hold_spill(tx, db->db_object); 2300 2301 err = dmu_tx_assign(tx, TXG_WAIT); 2302 if (err != 0) { 2303 dmu_buf_rele(db, FTAG); 2304 dmu_buf_rele(db_spill, FTAG); 2305 dmu_tx_abort(tx); 2306 return (err); 2307 } 2308 dmu_buf_will_dirty(db_spill, tx); 2309 2310 if (db_spill->db_size < drrs->drr_length) 2311 VERIFY(0 == dbuf_spill_set_blksz(db_spill, 2312 drrs->drr_length, tx)); 2313 bcopy(data, db_spill->db_data, drrs->drr_length); 2314 2315 dmu_buf_rele(db, FTAG); 2316 dmu_buf_rele(db_spill, FTAG); 2317 2318 dmu_tx_commit(tx); 2319 return (0); 2320} 2321 2322/* ARGSUSED */ 2323static int 2324receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf) 2325{ 2326 int err; 2327 2328 if (drrf->drr_length != -1ULL && 2329 drrf->drr_offset + drrf->drr_length < drrf->drr_offset) 2330 return (SET_ERROR(EINVAL)); 2331 2332 if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0) 2333 return (SET_ERROR(EINVAL)); 2334 2335 err = dmu_free_long_range(rwa->os, drrf->drr_object, 2336 drrf->drr_offset, drrf->drr_length); 2337 2338 return (err); 2339} 2340 2341/* used to destroy the drc_ds on error */ 2342static void 2343dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc) 2344{ 2345 if (drc->drc_resumable) { 2346 /* wait for our resume state to be written to disk */ 2347 txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0); 2348 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag); 2349 } else { 2350 char name[MAXNAMELEN]; 2351 dsl_dataset_name(drc->drc_ds, name); 2352 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag); 2353 (void) dsl_destroy_head(name); 2354 } 2355} 2356 2357static void 2358receive_cksum(struct receive_arg
*ra, int len, void *buf) 2359{ 2360 if (ra->byteswap) { 2361 fletcher_4_incremental_byteswap(buf, len, &ra->cksum); 2362 } else { 2363 fletcher_4_incremental_native(buf, len, &ra->cksum); 2364 } 2365} 2366 2367/* 2368 * Read the payload into a buffer of size len, and update the current record's 2369 * payload field. 2370 * Allocate ra->next_rrd and read the next record's header into 2371 * ra->next_rrd->header. 2372 * Verify checksum of payload and next record. 2373 */ 2374static int 2375receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf) 2376{ 2377 int err; 2378 2379 if (len != 0) { 2380 ASSERT3U(len, <=, SPA_MAXBLOCKSIZE); 2381 err = receive_read(ra, len, buf); 2382 if (err != 0) 2383 return (err); 2384 receive_cksum(ra, len, buf); 2385 2386 /* note: rrd is NULL when reading the begin record's payload */ 2387 if (ra->rrd != NULL) { 2388 ra->rrd->payload = buf; 2389 ra->rrd->payload_size = len; 2390 ra->rrd->bytes_read = ra->bytes_read; 2391 } 2392 } 2393 2394 ra->prev_cksum = ra->cksum; 2395 2396 ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP); 2397 err = receive_read(ra, sizeof (ra->next_rrd->header), 2398 &ra->next_rrd->header); 2399 ra->next_rrd->bytes_read = ra->bytes_read; 2400 if (err != 0) { 2401 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); 2402 ra->next_rrd = NULL; 2403 return (err); 2404 } 2405 if (ra->next_rrd->header.drr_type == DRR_BEGIN) { 2406 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); 2407 ra->next_rrd = NULL; 2408 return (SET_ERROR(EINVAL)); 2409 } 2410 2411 /* 2412 * Note: checksum is of everything up to but not including the 2413 * checksum itself. 2414 */ 2415 ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), 2416 ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t)); 2417 receive_cksum(ra, 2418 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), 2419 &ra->next_rrd->header); 2420 2421 zio_cksum_t cksum_orig = 2422 ra->next_rrd->header.drr_u.drr_checksum.drr_checksum; 2423 zio_cksum_t *cksump = 2424 &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum; 2425 2426 if (ra->byteswap) 2427 byteswap_record(&ra->next_rrd->header); 2428 2429 if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) && 2430 !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) { 2431 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); 2432 ra->next_rrd = NULL; 2433 return (SET_ERROR(ECKSUM)); 2434 } 2435 2436 receive_cksum(ra, sizeof (cksum_orig), &cksum_orig); 2437 2438 return (0); 2439} 2440 2441static void 2442objlist_create(struct objlist *list) 2443{ 2444 list_create(&list->list, sizeof (struct receive_objnode), 2445 offsetof(struct receive_objnode, node)); 2446 list->last_lookup = 0; 2447} 2448 2449static void 2450objlist_destroy(struct objlist *list) 2451{ 2452 for (struct receive_objnode *n = list_remove_head(&list->list); 2453 n != NULL; n = list_remove_head(&list->list)) { 2454 kmem_free(n, sizeof (*n)); 2455 } 2456 list_destroy(&list->list); 2457} 2458 2459/* 2460 * This function looks through the objlist to see if the specified object number 2461 * is contained in the objlist. In the process, it will remove all object 2462 * numbers in the list that are smaller than the specified object number. Thus, 2463 * any lookup of an object number smaller than a previously looked up object 2464 * number will always return false; therefore, all lookups should be done in 2465 * ascending order. 
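 * For example, once objlist_exists(list, 10) has been called, any node
 * for object 9 has already been discarded, so objlist_exists(list, 9)
 * would return B_FALSE (and trips an ASSERT in debug builds).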
2466 */ 2467static boolean_t 2468objlist_exists(struct objlist *list, uint64_t object) 2469{ 2470 struct receive_objnode *node = list_head(&list->list); 2471 ASSERT3U(object, >=, list->last_lookup); 2472 list->last_lookup = object; 2473 while (node != NULL && node->object < object) { 2474 VERIFY3P(node, ==, list_remove_head(&list->list)); 2475 kmem_free(node, sizeof (*node)); 2476 node = list_head(&list->list); 2477 } 2478 return (node != NULL && node->object == object); 2479} 2480 2481/* 2482 * The objlist is a list of object numbers stored in ascending order. However, 2483 * the insertion of new object numbers does not seek out the correct location to 2484 * store a new object number; instead, it appends it to the list for simplicity. 2485 * Thus, any users must take care to only insert new object numbers in ascending 2486 * order. 2487 */ 2488static void 2489objlist_insert(struct objlist *list, uint64_t object) 2490{ 2491 struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP); 2492 node->object = object; 2493#ifdef ZFS_DEBUG 2494 struct receive_objnode *last_object = list_tail(&list->list); 2495 uint64_t last_objnum = (last_object != NULL ? last_object->object : 0); 2496 ASSERT3U(node->object, >, last_objnum); 2497#endif 2498 list_insert_tail(&list->list, node); 2499} 2500 2501/* 2502 * Issue the prefetch reads for any necessary indirect blocks. 2503 * 2504 * We use the object ignore list to tell us whether or not to issue prefetches 2505 * for a given object. We do this for both correctness (in case the blocksize 2506 * of an object has changed) and performance (if the object doesn't exist, don't 2507 * needlessly try to issue prefetches). We also trim the list as we go through 2508 * the stream to prevent it from growing to an unbounded size. 2509 * 2510 * The object numbers within will always be in sorted order, and any write 2511 * records we see will also be in sorted order, but they're not sorted with 2512 * respect to each other (i.e. we can get several object records before 2513 * receiving each object's write records). As a result, once we've reached a 2514 * given object number, we can safely remove any reference to lower object 2515 * numbers in the ignore list. In practice, we receive up to 32 object records 2516 * before receiving write records, so the list can have up to 32 nodes in it. 2517 */ 2518/* ARGSUSED */ 2519static void 2520receive_read_prefetch(struct receive_arg *ra, 2521 uint64_t object, uint64_t offset, uint64_t length) 2522{ 2523 if (!objlist_exists(&ra->ignore_objlist, object)) { 2524 dmu_prefetch(ra->os, object, 1, offset, length, 2525 ZIO_PRIORITY_SYNC_READ); 2526 } 2527} 2528 2529/* 2530 * Read records off the stream, issuing any necessary prefetches. 2531 */ 2532static int 2533receive_read_record(struct receive_arg *ra) 2534{ 2535 int err; 2536 2537 switch (ra->rrd->header.drr_type) { 2538 case DRR_OBJECT: 2539 { 2540 struct drr_object *drro = &ra->rrd->header.drr_u.drr_object; 2541 uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8); 2542 void *buf = kmem_zalloc(size, KM_SLEEP); 2543 dmu_object_info_t doi; 2544 err = receive_read_payload_and_next_header(ra, size, buf); 2545 if (err != 0) { 2546 kmem_free(buf, size); 2547 return (err); 2548 } 2549 err = dmu_object_info(ra->os, drro->drr_object, &doi); 2550 /* 2551 * See receive_read_prefetch for an explanation why we're 2552 * storing this object in the ignore_objlist.
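 * (A prefetch issued with a stale block size could read the wrong
 * ranges, and a prefetch of a nonexistent object is wasted I/O.)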
2553 */ 2554 if (err == ENOENT || 2555 (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) { 2556 objlist_insert(&ra->ignore_objlist, drro->drr_object); 2557 err = 0; 2558 } 2559 return (err); 2560 } 2561 case DRR_FREEOBJECTS: 2562 { 2563 err = receive_read_payload_and_next_header(ra, 0, NULL); 2564 return (err); 2565 } 2566 case DRR_WRITE: 2567 { 2568 struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write; 2569 arc_buf_t *abuf = arc_loan_buf(dmu_objset_spa(ra->os), 2570 drrw->drr_length); 2571 2572 err = receive_read_payload_and_next_header(ra, 2573 drrw->drr_length, abuf->b_data); 2574 if (err != 0) { 2575 dmu_return_arcbuf(abuf); 2576 return (err); 2577 } 2578 ra->rrd->write_buf = abuf; 2579 receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset, 2580 drrw->drr_length); 2581 return (err); 2582 } 2583 case DRR_WRITE_BYREF: 2584 { 2585 struct drr_write_byref *drrwb = 2586 &ra->rrd->header.drr_u.drr_write_byref; 2587 err = receive_read_payload_and_next_header(ra, 0, NULL); 2588 receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset, 2589 drrwb->drr_length); 2590 return (err); 2591 } 2592 case DRR_WRITE_EMBEDDED: 2593 { 2594 struct drr_write_embedded *drrwe = 2595 &ra->rrd->header.drr_u.drr_write_embedded; 2596 uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8); 2597 void *buf = kmem_zalloc(size, KM_SLEEP); 2598 2599 err = receive_read_payload_and_next_header(ra, size, buf); 2600 if (err != 0) { 2601 kmem_free(buf, size); 2602 return (err); 2603 } 2604 2605 receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset, 2606 drrwe->drr_length); 2607 return (err); 2608 } 2609 case DRR_FREE: 2610 { 2611 /* 2612 * It might be beneficial to prefetch indirect blocks here, but 2613 * we don't really have the data to decide for sure. 2614 */ 2615 err = receive_read_payload_and_next_header(ra, 0, NULL); 2616 return (err); 2617 } 2618 case DRR_END: 2619 { 2620 struct drr_end *drre = &ra->rrd->header.drr_u.drr_end; 2621 if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum)) 2622 return (SET_ERROR(ECKSUM)); 2623 return (0); 2624 } 2625 case DRR_SPILL: 2626 { 2627 struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill; 2628 void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP); 2629 err = receive_read_payload_and_next_header(ra, drrs->drr_length, 2630 buf); 2631 if (err != 0) 2632 kmem_free(buf, drrs->drr_length); 2633 return (err); 2634 } 2635 default: 2636 return (SET_ERROR(EINVAL)); 2637 } 2638} 2639 2640/* 2641 * Commit the records to the pool. 2642 */ 2643static int 2644receive_process_record(struct receive_writer_arg *rwa, 2645 struct receive_record_arg *rrd) 2646{ 2647 int err; 2648 2649 /* Processing in order, therefore bytes_read should be increasing. 
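 * (Each record's bytes_read was captured by the reader thread when the
 * record was created, and the bqueue preserves FIFO order.)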
*/ 2650 ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read); 2651 rwa->bytes_read = rrd->bytes_read; 2652 2653 switch (rrd->header.drr_type) { 2654 case DRR_OBJECT: 2655 { 2656 struct drr_object *drro = &rrd->header.drr_u.drr_object; 2657 err = receive_object(rwa, drro, rrd->payload); 2658 kmem_free(rrd->payload, rrd->payload_size); 2659 rrd->payload = NULL; 2660 return (err); 2661 } 2662 case DRR_FREEOBJECTS: 2663 { 2664 struct drr_freeobjects *drrfo = 2665 &rrd->header.drr_u.drr_freeobjects; 2666 return (receive_freeobjects(rwa, drrfo)); 2667 } 2668 case DRR_WRITE: 2669 { 2670 struct drr_write *drrw = &rrd->header.drr_u.drr_write; 2671 err = receive_write(rwa, drrw, rrd->write_buf); 2672 /* if receive_write() is successful, it consumes the arc_buf */ 2673 if (err != 0) 2674 dmu_return_arcbuf(rrd->write_buf); 2675 rrd->write_buf = NULL; 2676 rrd->payload = NULL; 2677 return (err); 2678 } 2679 case DRR_WRITE_BYREF: 2680 { 2681 struct drr_write_byref *drrwbr = 2682 &rrd->header.drr_u.drr_write_byref; 2683 return (receive_write_byref(rwa, drrwbr)); 2684 } 2685 case DRR_WRITE_EMBEDDED: 2686 { 2687 struct drr_write_embedded *drrwe = 2688 &rrd->header.drr_u.drr_write_embedded; 2689 err = receive_write_embedded(rwa, drrwe, rrd->payload); 2690 kmem_free(rrd->payload, rrd->payload_size); 2691 rrd->payload = NULL; 2692 return (err); 2693 } 2694 case DRR_FREE: 2695 { 2696 struct drr_free *drrf = &rrd->header.drr_u.drr_free; 2697 return (receive_free(rwa, drrf)); 2698 } 2699 case DRR_SPILL: 2700 { 2701 struct drr_spill *drrs = &rrd->header.drr_u.drr_spill; 2702 err = receive_spill(rwa, drrs, rrd->payload); 2703 kmem_free(rrd->payload, rrd->payload_size); 2704 rrd->payload = NULL; 2705 return (err); 2706 } 2707 default: 2708 return (SET_ERROR(EINVAL)); 2709 } 2710} 2711 2712/* 2713 * dmu_recv_stream's worker thread; pull records off the queue, and then call 2714 * receive_process_record. When we're done, signal the main thread and exit. 2715 */ 2716static void 2717receive_writer_thread(void *arg) 2718{ 2719 struct receive_writer_arg *rwa = arg; 2720 struct receive_record_arg *rrd; 2721 for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker; 2722 rrd = bqueue_dequeue(&rwa->q)) { 2723 /* 2724 * If there's an error, the main thread will stop putting things 2725 * on the queue, but we need to clear everything in it before we 2726 * can exit.
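 * Draining and freeing the remaining records, rather than processing
 * them, also keeps the reader thread from blocking forever on a full
 * queue.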
2727 */ 2728 if (rwa->err == 0) { 2729 rwa->err = receive_process_record(rwa, rrd); 2730 } else if (rrd->write_buf != NULL) { 2731 dmu_return_arcbuf(rrd->write_buf); 2732 rrd->write_buf = NULL; 2733 rrd->payload = NULL; 2734 } else if (rrd->payload != NULL) { 2735 kmem_free(rrd->payload, rrd->payload_size); 2736 rrd->payload = NULL; 2737 } 2738 kmem_free(rrd, sizeof (*rrd)); 2739 } 2740 kmem_free(rrd, sizeof (*rrd)); 2741 mutex_enter(&rwa->mutex); 2742 rwa->done = B_TRUE; 2743 cv_signal(&rwa->cv); 2744 mutex_exit(&rwa->mutex); 2745 thread_exit(); 2746} 2747 2748static int 2749resume_check(struct receive_arg *ra, nvlist_t *begin_nvl) 2750{ 2751 uint64_t val; 2752 objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset; 2753 uint64_t dsobj = dmu_objset_id(ra->os); 2754 uint64_t resume_obj, resume_off; 2755 2756 if (nvlist_lookup_uint64(begin_nvl, 2757 "resume_object", &resume_obj) != 0 || 2758 nvlist_lookup_uint64(begin_nvl, 2759 "resume_offset", &resume_off) != 0) { 2760 return (SET_ERROR(EINVAL)); 2761 } 2762 VERIFY0(zap_lookup(mos, dsobj, 2763 DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val)); 2764 if (resume_obj != val) 2765 return (SET_ERROR(EINVAL)); 2766 VERIFY0(zap_lookup(mos, dsobj, 2767 DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val)); 2768 if (resume_off != val) 2769 return (SET_ERROR(EINVAL)); 2770 2771 return (0); 2772} 2773 2774/* 2775 * Read in the stream's records, one by one, and apply them to the pool. There 2776 * are two threads involved; the thread that calls this function will spin up a 2777 * worker thread, read the records off the stream one by one, and issue 2778 * prefetches for any necessary indirect blocks. It will then push the records 2779 * onto an internal blocking queue. The worker thread will pull the records off 2780 * the queue, and actually write the data into the DMU. This way, the worker 2781 * thread doesn't have to wait for reads to complete, since everything it needs 2782 * (the indirect blocks) will be prefetched. 2783 * 2784 * NB: callers *must* call dmu_recv_end() if this succeeds. 2785 */ 2786int 2787dmu_recv_stream(dmu_recv_cookie_t *drc, struct file *fp, offset_t *voffp, 2788 int cleanup_fd, uint64_t *action_handlep) 2789{ 2790 int err = 0; 2791 struct receive_arg ra = { 0 }; 2792 struct receive_writer_arg rwa = { 0 }; 2793 int featureflags; 2794 nvlist_t *begin_nvl = NULL; 2795 2796 ra.byteswap = drc->drc_byteswap; 2797 ra.cksum = drc->drc_cksum; 2798 ra.td = curthread; 2799 ra.fp = fp; 2800 ra.voff = *voffp; 2801 2802 if (dsl_dataset_is_zapified(drc->drc_ds)) { 2803 (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset, 2804 drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES, 2805 sizeof (ra.bytes_read), 1, &ra.bytes_read); 2806 } 2807 2808 objlist_create(&ra.ignore_objlist); 2809 2810 /* these were verified in dmu_recv_begin */ 2811 ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==, 2812 DMU_SUBSTREAM); 2813 ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES); 2814 2815 /* 2816 * Open the objset we are modifying. 
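 * dmu_recv_begin() owned the dataset with dmu_recv_tag and left it
 * marked inconsistent; the flag is asserted below.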
2817 */ 2818 VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os)); 2819 2820 ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT); 2821 2822 featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo); 2823 2824 /* if this stream is dedup'ed, set up the avl tree for guid mapping */ 2825 if (featureflags & DMU_BACKUP_FEATURE_DEDUP) { 2826 minor_t minor; 2827 2828 if (cleanup_fd == -1) { 2829 err = SET_ERROR(EBADF); 2830 goto out; 2831 } 2832 err = zfs_onexit_fd_hold(cleanup_fd, &minor); 2833 if (err != 0) { 2834 cleanup_fd = -1; 2835 goto out; 2836 } 2837 2838 if (*action_handlep == 0) { 2839 rwa.guid_to_ds_map = 2840 kmem_alloc(sizeof (avl_tree_t), KM_SLEEP); 2841 avl_create(rwa.guid_to_ds_map, guid_compare, 2842 sizeof (guid_map_entry_t), 2843 offsetof(guid_map_entry_t, avlnode)); 2844 err = zfs_onexit_add_cb(minor, 2845 free_guid_map_onexit, rwa.guid_to_ds_map, 2846 action_handlep); 2847 if (err != 0) 2848 goto out; 2849 } else { 2850 err = zfs_onexit_cb_data(minor, *action_handlep, 2851 (void **)&rwa.guid_to_ds_map); 2852 if (err != 0) 2853 goto out; 2854 } 2855 2856 drc->drc_guid_to_ds_map = rwa.guid_to_ds_map; 2857 } 2858 2859 uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen; 2860 void *payload = NULL; 2861 if (payloadlen != 0) 2862 payload = kmem_alloc(payloadlen, KM_SLEEP); 2863 2864 err = receive_read_payload_and_next_header(&ra, payloadlen, payload); 2865 if (err != 0) { 2866 if (payloadlen != 0) 2867 kmem_free(payload, payloadlen); 2868 goto out; 2869 } 2870 if (payloadlen != 0) { 2871 err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP); 2872 kmem_free(payload, payloadlen); 2873 if (err != 0) 2874 goto out; 2875 } 2876 2877 if (featureflags & DMU_BACKUP_FEATURE_RESUMING) { 2878 err = resume_check(&ra, begin_nvl); 2879 if (err != 0) 2880 goto out; 2881 } 2882 2883 (void) bqueue_init(&rwa.q, zfs_recv_queue_length, 2884 offsetof(struct receive_record_arg, node)); 2885 cv_init(&rwa.cv, NULL, CV_DEFAULT, NULL); 2886 mutex_init(&rwa.mutex, NULL, MUTEX_DEFAULT, NULL); 2887 rwa.os = ra.os; 2888 rwa.byteswap = drc->drc_byteswap; 2889 rwa.resumable = drc->drc_resumable; 2890 2891 (void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, &p0, 2892 TS_RUN, minclsyspri); 2893 /* 2894 * We're reading rwa.err without locks, which is safe since we are the 2895 * only reader, and the worker thread is the only writer. It's ok if we 2896 * miss a write for an iteration or two of the loop, since the writer 2897 * thread will keep freeing records we send it until we send it an eos 2898 * marker. 2899 * 2900 * We can leave this loop in 3 ways: First, if rwa.err is 2901 * non-zero. In that case, the writer thread will free the rrd we just 2902 * pushed. Second, if we're interrupted; in that case, either it's the 2903 * first loop and ra.rrd was never allocated, or it's later, and ra.rrd 2904 * has been handed off to the writer thread who will free it. Finally, 2905 * if receive_read_record fails or we're at the end of the stream, then 2906 * we free ra.rrd and exit.
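 * In all three cases the eos marker is still enqueued below (with a
 * token weight of 1, since it carries no payload), so the writer
 * thread always sees end-of-stream and terminates.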
2907 */ 2908 while (rwa.err == 0) { 2909 if (issig(JUSTLOOKING) && issig(FORREAL)) { 2910 err = SET_ERROR(EINTR); 2911 break; 2912 } 2913 2914 ASSERT3P(ra.rrd, ==, NULL); 2915 ra.rrd = ra.next_rrd; 2916 ra.next_rrd = NULL; 2917 /* Allocates and loads header into ra.next_rrd */ 2918 err = receive_read_record(&ra); 2919 2920 if (ra.rrd->header.drr_type == DRR_END || err != 0) { 2921 kmem_free(ra.rrd, sizeof (*ra.rrd)); 2922 ra.rrd = NULL; 2923 break; 2924 } 2925 2926 bqueue_enqueue(&rwa.q, ra.rrd, 2927 sizeof (struct receive_record_arg) + ra.rrd->payload_size); 2928 ra.rrd = NULL; 2929 } 2930 if (ra.next_rrd == NULL) 2931 ra.next_rrd = kmem_zalloc(sizeof (*ra.next_rrd), KM_SLEEP); 2932 ra.next_rrd->eos_marker = B_TRUE; 2933 bqueue_enqueue(&rwa.q, ra.next_rrd, 1); 2934 2935 mutex_enter(&rwa.mutex); 2936 while (!rwa.done) { 2937 cv_wait(&rwa.cv, &rwa.mutex); 2938 } 2939 mutex_exit(&rwa.mutex); 2940 2941 cv_destroy(&rwa.cv); 2942 mutex_destroy(&rwa.mutex); 2943 bqueue_destroy(&rwa.q); 2944 if (err == 0) 2945 err = rwa.err; 2946 2947out: 2948 nvlist_free(begin_nvl); 2949 if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1)) 2950 zfs_onexit_fd_rele(cleanup_fd); 2951 2952 if (err != 0) { 2953 /* 2954 * Clean up references. If receive is not resumable, 2955 * destroy what we created, so we don't leave it in 2956 * the inconsistent state. 2957 */ 2958 dmu_recv_cleanup_ds(drc); 2959 } 2960 2961 *voffp = ra.voff; 2962 objlist_destroy(&ra.ignore_objlist); 2963 return (err); 2964} 2965 2966static int 2967dmu_recv_end_check(void *arg, dmu_tx_t *tx) 2968{ 2969 dmu_recv_cookie_t *drc = arg; 2970 dsl_pool_t *dp = dmu_tx_pool(tx); 2971 int error; 2972 2973 ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag); 2974 2975 if (!drc->drc_newfs) { 2976 dsl_dataset_t *origin_head; 2977 2978 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head); 2979 if (error != 0) 2980 return (error); 2981 if (drc->drc_force) { 2982 /* 2983 * We will destroy any snapshots in tofs (i.e. before 2984 * origin_head) that are after the origin (which is 2985 * the snap before drc_ds, because drc_ds cannot 2986 * have any snaps of its own).
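 * For example, if tofs has snapshots @a, @b and @c, and drc_ds was
 * cloned from @a, a forced receive checks here that @b and @c can
 * be destroyed before the clone swap.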
2987 */ 2988 uint64_t obj; 2989 2990 obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj; 2991 while (obj != 2992 dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) { 2993 dsl_dataset_t *snap; 2994 error = dsl_dataset_hold_obj(dp, obj, FTAG, 2995 &snap); 2996 if (error != 0) 2997 break; 2998 if (snap->ds_dir != origin_head->ds_dir) 2999 error = SET_ERROR(EINVAL); 3000 if (error == 0) { 3001 error = dsl_destroy_snapshot_check_impl( 3002 snap, B_FALSE); 3003 } 3004 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj; 3005 dsl_dataset_rele(snap, FTAG); 3006 if (error != 0) 3007 break; 3008 } 3009 if (error != 0) { 3010 dsl_dataset_rele(origin_head, FTAG); 3011 return (error); 3012 } 3013 } 3014 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds, 3015 origin_head, drc->drc_force, drc->drc_owner, tx); 3016 if (error != 0) { 3017 dsl_dataset_rele(origin_head, FTAG); 3018 return (error); 3019 } 3020 error = dsl_dataset_snapshot_check_impl(origin_head, 3021 drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred); 3022 dsl_dataset_rele(origin_head, FTAG); 3023 if (error != 0) 3024 return (error); 3025 3026 error = dsl_destroy_head_check_impl(drc->drc_ds, 1); 3027 } else { 3028 error = dsl_dataset_snapshot_check_impl(drc->drc_ds, 3029 drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred); 3030 } 3031 return (error); 3032} 3033 3034static void 3035dmu_recv_end_sync(void *arg, dmu_tx_t *tx) 3036{ 3037 dmu_recv_cookie_t *drc = arg; 3038 dsl_pool_t *dp = dmu_tx_pool(tx); 3039 3040 spa_history_log_internal_ds(drc->drc_ds, "finish receiving", 3041 tx, "snap=%s", drc->drc_tosnap); 3042 3043 if (!drc->drc_newfs) { 3044 dsl_dataset_t *origin_head; 3045 3046 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG, 3047 &origin_head)); 3048 3049 if (drc->drc_force) { 3050 /* 3051 * Destroy any snapshots of drc_tofs (origin_head) 3052 * after the origin (the snap before drc_ds). 
3053 */ 3054 uint64_t obj; 3055 3056 obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj; 3057 while (obj != 3058 dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) { 3059 dsl_dataset_t *snap; 3060 VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG, 3061 &snap)); 3062 ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir); 3063 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj; 3064 dsl_destroy_snapshot_sync_impl(snap, 3065 B_FALSE, tx); 3066 dsl_dataset_rele(snap, FTAG); 3067 } 3068 } 3069 VERIFY3P(drc->drc_ds->ds_prev, ==, 3070 origin_head->ds_prev); 3071 3072 dsl_dataset_clone_swap_sync_impl(drc->drc_ds, 3073 origin_head, tx); 3074 dsl_dataset_snapshot_sync_impl(origin_head, 3075 drc->drc_tosnap, tx); 3076 3077 /* set snapshot's creation time and guid */ 3078 dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx); 3079 dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time = 3080 drc->drc_drrb->drr_creation_time; 3081 dsl_dataset_phys(origin_head->ds_prev)->ds_guid = 3082 drc->drc_drrb->drr_toguid; 3083 dsl_dataset_phys(origin_head->ds_prev)->ds_flags &= 3084 ~DS_FLAG_INCONSISTENT; 3085 3086 dmu_buf_will_dirty(origin_head->ds_dbuf, tx); 3087 dsl_dataset_phys(origin_head)->ds_flags &= 3088 ~DS_FLAG_INCONSISTENT; 3089 3090 dsl_dataset_rele(origin_head, FTAG); 3091 dsl_destroy_head_sync_impl(drc->drc_ds, tx); 3092 3093 if (drc->drc_owner != NULL) 3094 VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner); 3095 } else { 3096 dsl_dataset_t *ds = drc->drc_ds; 3097 3098 dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx); 3099 3100 /* set snapshot's creation time and guid */ 3101 dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx); 3102 dsl_dataset_phys(ds->ds_prev)->ds_creation_time = 3103 drc->drc_drrb->drr_creation_time; 3104 dsl_dataset_phys(ds->ds_prev)->ds_guid = 3105 drc->drc_drrb->drr_toguid; 3106 dsl_dataset_phys(ds->ds_prev)->ds_flags &= 3107 ~DS_FLAG_INCONSISTENT; 3108 3109 dmu_buf_will_dirty(ds->ds_dbuf, tx); 3110 dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT; 3111 if (dsl_dataset_has_resume_receive_state(ds)) { 3112 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3113 DS_FIELD_RESUME_FROMGUID, tx); 3114 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3115 DS_FIELD_RESUME_OBJECT, tx); 3116 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3117 DS_FIELD_RESUME_OFFSET, tx); 3118 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3119 DS_FIELD_RESUME_BYTES, tx); 3120 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3121 DS_FIELD_RESUME_TOGUID, tx); 3122 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3123 DS_FIELD_RESUME_TONAME, tx); 3124 } 3125 } 3126 drc->drc_newsnapobj = dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj; 3127 /* 3128 * Release the hold from dmu_recv_begin. This must be done before 3129 * we return to open context, so that when we free the dataset's dnode, 3130 * we can evict its bonus buffer. 
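 * (The ownership was taken with dmu_recv_tag by dmu_recv_begin_sync()
 * or dmu_recv_resume_begin_sync().)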
3131 */ 3132 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag); 3133 drc->drc_ds = NULL; 3134} 3135 3136static int 3137add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj) 3138{ 3139 dsl_pool_t *dp; 3140 dsl_dataset_t *snapds; 3141 guid_map_entry_t *gmep; 3142 int err; 3143 3144 ASSERT(guid_map != NULL); 3145 3146 err = dsl_pool_hold(name, FTAG, &dp); 3147 if (err != 0) 3148 return (err); 3149 gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP); 3150 err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds); 3151 if (err == 0) { 3152 gmep->guid = dsl_dataset_phys(snapds)->ds_guid; 3153 gmep->gme_ds = snapds; 3154 avl_add(guid_map, gmep); 3155 dsl_dataset_long_hold(snapds, gmep); 3156 } else 3157 kmem_free(gmep, sizeof (*gmep)); 3158 3159 dsl_pool_rele(dp, FTAG); 3160 return (err); 3161} 3162 3163static int dmu_recv_end_modified_blocks = 3; 3164 3165static int 3166dmu_recv_existing_end(dmu_recv_cookie_t *drc) 3167{ 3168 int error; 3169 char name[MAXNAMELEN]; 3170 3171#ifdef _KERNEL 3172 /* 3173 * We will be destroying the ds; make sure its origin is unmounted if 3174 * necessary. 3175 */ 3176 dsl_dataset_name(drc->drc_ds, name); 3177 zfs_destroy_unmount_origin(name); 3178#endif 3179 3180 error = dsl_sync_task(drc->drc_tofs, 3181 dmu_recv_end_check, dmu_recv_end_sync, drc, 3182 dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL); 3183 3184 if (error != 0) 3185 dmu_recv_cleanup_ds(drc); 3186 return (error); 3187} 3188 3189static int 3190dmu_recv_new_end(dmu_recv_cookie_t *drc) 3191{ 3192 int error; 3193 3194 error = dsl_sync_task(drc->drc_tofs, 3195 dmu_recv_end_check, dmu_recv_end_sync, drc, 3196 dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL); 3197 3198 if (error != 0) { 3199 dmu_recv_cleanup_ds(drc); 3200 } else if (drc->drc_guid_to_ds_map != NULL) { 3201 (void) add_ds_to_guidmap(drc->drc_tofs, 3202 drc->drc_guid_to_ds_map, 3203 drc->drc_newsnapobj); 3204 } 3205 return (error); 3206} 3207 3208int 3209dmu_recv_end(dmu_recv_cookie_t *drc, void *owner) 3210{ 3211 drc->drc_owner = owner; 3212 3213 if (drc->drc_newfs) 3214 return (dmu_recv_new_end(drc)); 3215 else 3216 return (dmu_recv_existing_end(drc)); 3217} 3218 3219/* 3220 * Return TRUE if this objset is currently being received into. 3221 */ 3222boolean_t 3223dmu_objset_is_receiving(objset_t *os) 3224{ 3225 return (os->os_dsl_dataset != NULL && 3226 os->os_dsl_dataset->ds_owner == dmu_recv_tag); 3227} 3228