dsl_dataset.c revision 239389
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright (c) 2012 by Delphix. All rights reserved.
 * Copyright (c) 2012, Joyent, Inc. All rights reserved.
 * Copyright (c) 2011 Pawel Jakub Dawidek <pawel@dawidek.net>.
 * All rights reserved.
 * Portions Copyright (c) 2011 Martin Matuska <mm@FreeBSD.org>
 */

#include <sys/dmu_objset.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_synctask.h>
#include <sys/dmu_traverse.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/arc.h>
#include <sys/zio.h>
#include <sys/zap.h>
#include <sys/zfeature.h>
#include <sys/unique.h>
#include <sys/zfs_context.h>
#include <sys/zfs_ioctl.h>
#include <sys/spa.h>
#include <sys/zfs_znode.h>
#include <sys/zfs_onexit.h>
#include <sys/zvol.h>
#include <sys/dsl_scan.h>
#include <sys/dsl_deadlist.h>

static char *dsl_reaper = "the grim reaper";

static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
static dsl_syncfunc_t dsl_dataset_set_reservation_sync;

#define	SWITCH64(x, y) \
	{ \
		uint64_t __tmp = (x); \
		(x) = (y); \
		(y) = __tmp; \
	}
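
/*
 * Example (illustrative): SWITCH64(a, b) exchanges two uint64_t
 * lvalues in place.  It is used by process_old_deadlist() below to
 * swap deadlist object numbers between a snapshot being destroyed
 * and the next snapshot, without copying the deadlists themselves.
 */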

#define	DS_REF_MAX	(1ULL << 62)

#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE

#define	DSL_DATASET_IS_DESTROYED(ds)	((ds)->ds_owner == dsl_reaper)

/*
 * Figure out how much of this delta should be propagated to the dsl_dir
 * layer.  If there's a refreservation, that space has already been
 * partially accounted for in our ancestors.
 */
static int64_t
parent_delta(dsl_dataset_t *ds, int64_t delta)
{
	uint64_t old_bytes, new_bytes;

	if (ds->ds_reserved == 0)
		return (delta);

	old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
	new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);

	ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
	return (new_bytes - old_bytes);
}
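
/*
 * Worked example (illustrative): with ds_reserved = 10M and
 * ds_unique_bytes = 8M, a delta of +1M gives old_bytes = MAX(8M, 10M)
 * = 10M and new_bytes = MAX(9M, 10M) = 10M, so no delta is propagated;
 * the refreservation already charged that space to our ancestors.
 * Once unique_bytes exceeds ds_reserved, deltas pass through 1:1.
 */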

void
dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
{
	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
	int compressed = BP_GET_PSIZE(bp);
	int uncompressed = BP_GET_UCSIZE(bp);
	int64_t delta;

	dprintf_bp(bp, "ds=%p", ds);

	ASSERT(dmu_tx_is_syncing(tx));
	/* It could have been compressed away to nothing */
	if (BP_IS_HOLE(bp))
		return;
	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
	ASSERT(DMU_OT_IS_VALID(BP_GET_TYPE(bp)));
	if (ds == NULL) {
		/*
		 * Account for the meta-objset space in its placeholder
		 * dsl_dir.
		 */
		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
		    used, compressed, uncompressed, tx);
		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
		return;
	}
	dmu_buf_will_dirty(ds->ds_dbuf, tx);

	mutex_enter(&ds->ds_dir->dd_lock);
	mutex_enter(&ds->ds_lock);
	delta = parent_delta(ds, used);
	ds->ds_phys->ds_referenced_bytes += used;
	ds->ds_phys->ds_compressed_bytes += compressed;
	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
	ds->ds_phys->ds_unique_bytes += used;
	mutex_exit(&ds->ds_lock);
	dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
	    compressed, uncompressed, tx);
	dsl_dir_transfer_space(ds->ds_dir, used - delta,
	    DD_USED_REFRSRV, DD_USED_HEAD, tx);
	mutex_exit(&ds->ds_dir->dd_lock);
}
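
/*
 * dsl_dataset_block_born() above and dsl_dataset_block_kill() below
 * are the two halves of per-dataset space accounting: born charges a
 * newly written block to the dataset and its dsl_dir, while kill
 * reverses that and either frees the block outright (if it was born
 * after the most recent snapshot) or moves it onto the deadlist.
 */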

int
dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
    boolean_t async)
{
	if (BP_IS_HOLE(bp))
		return (0);

	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(bp->blk_birth <= tx->tx_txg);

	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
	int compressed = BP_GET_PSIZE(bp);
	int uncompressed = BP_GET_UCSIZE(bp);

	ASSERT(used > 0);
	if (ds == NULL) {
		/*
		 * Account for the meta-objset space in its placeholder
		 * dataset.
		 */
		dsl_free(tx->tx_pool, tx->tx_txg, bp);

		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
		    -used, -compressed, -uncompressed, tx);
		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
		return (used);
	}
	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);

	ASSERT(!dsl_dataset_is_snapshot(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);

	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
		int64_t delta;

		dprintf_bp(bp, "freeing ds=%llu", ds->ds_object);
		dsl_free(tx->tx_pool, tx->tx_txg, bp);

		mutex_enter(&ds->ds_dir->dd_lock);
		mutex_enter(&ds->ds_lock);
		ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
		    !DS_UNIQUE_IS_ACCURATE(ds));
		delta = parent_delta(ds, -used);
		ds->ds_phys->ds_unique_bytes -= used;
		mutex_exit(&ds->ds_lock);
		dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
		    delta, -compressed, -uncompressed, tx);
		dsl_dir_transfer_space(ds->ds_dir, -used - delta,
		    DD_USED_REFRSRV, DD_USED_HEAD, tx);
		mutex_exit(&ds->ds_dir->dd_lock);
	} else {
		dprintf_bp(bp, "putting on dead list: %s", "");
		if (async) {
			/*
			 * We are here as part of zio's write done callback,
			 * which means we're a zio interrupt thread.  We can't
			 * call dsl_deadlist_insert() now because it may block
			 * waiting for I/O.  Instead, put bp on the deferred
			 * queue and let dsl_pool_sync() finish the job.
			 */
			bplist_append(&ds->ds_pending_deadlist, bp);
		} else {
			dsl_deadlist_insert(&ds->ds_deadlist, bp, tx);
		}
		ASSERT3U(ds->ds_prev->ds_object, ==,
		    ds->ds_phys->ds_prev_snap_obj);
		ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
		if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
		    ds->ds_object && bp->blk_birth >
		    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
			mutex_enter(&ds->ds_prev->ds_lock);
			ds->ds_prev->ds_phys->ds_unique_bytes += used;
			mutex_exit(&ds->ds_prev->ds_lock);
		}
		if (bp->blk_birth > ds->ds_dir->dd_origin_txg) {
			dsl_dir_transfer_space(ds->ds_dir, used,
			    DD_USED_HEAD, DD_USED_SNAP, tx);
		}
	}
	mutex_enter(&ds->ds_lock);
	ASSERT3U(ds->ds_phys->ds_referenced_bytes, >=, used);
	ds->ds_phys->ds_referenced_bytes -= used;
	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
	ds->ds_phys->ds_compressed_bytes -= compressed;
	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
	mutex_exit(&ds->ds_lock);

	return (used);
}

uint64_t
dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
{
	uint64_t trysnap = 0;

	if (ds == NULL)
		return (0);
	/*
	 * The snapshot creation could fail, but that would cause an
	 * incorrect FALSE return, which would only result in an
	 * overestimation of the amount of space that an operation would
	 * consume, which is OK.
	 *
	 * There's also a small window where we could miss a pending
	 * snapshot, because we could set the sync task in the quiescing
	 * phase.  So this should only be used as a guess.
	 */
	if (ds->ds_trysnap_txg >
	    spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
		trysnap = ds->ds_trysnap_txg;
	return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
}

boolean_t
dsl_dataset_block_freeable(dsl_dataset_t *ds, const blkptr_t *bp,
    uint64_t blk_birth)
{
	if (blk_birth <= dsl_dataset_prev_snap_txg(ds))
		return (B_FALSE);

	ddt_prefetch(dsl_dataset_get_spa(ds), bp);

	return (B_TRUE);
}
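
/*
 * Example (illustrative): a block with blk_birth <= the most recent
 * snapshot's txg is still referenced by that snapshot, so freeing it
 * from the head would reclaim no space and we return B_FALSE above.
 * For freeable blocks, prefetching the DDT entry here hides some of
 * the latency of the dedup-table lookup the eventual free may need.
 */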

/* ARGSUSED */
static void
dsl_dataset_evict(dmu_buf_t *db, void *dsv)
{
	dsl_dataset_t *ds = dsv;

	ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));

	unique_remove(ds->ds_fsid_guid);

	if (ds->ds_objset != NULL)
		dmu_objset_evict(ds->ds_objset);

	if (ds->ds_prev) {
		dsl_dataset_drop_ref(ds->ds_prev, ds);
		ds->ds_prev = NULL;
	}

	bplist_destroy(&ds->ds_pending_deadlist);
	if (db != NULL) {
		dsl_deadlist_close(&ds->ds_deadlist);
	} else {
		ASSERT(ds->ds_deadlist.dl_dbuf == NULL);
		ASSERT(!ds->ds_deadlist.dl_oldfmt);
	}
	if (ds->ds_dir)
		dsl_dir_close(ds->ds_dir, ds);

	ASSERT(!list_link_active(&ds->ds_synced_link));

	if (mutex_owned(&ds->ds_lock))
		mutex_exit(&ds->ds_lock);
	mutex_destroy(&ds->ds_lock);
	mutex_destroy(&ds->ds_recvlock);
	if (mutex_owned(&ds->ds_opening_lock))
		mutex_exit(&ds->ds_opening_lock);
	mutex_destroy(&ds->ds_opening_lock);
	rw_destroy(&ds->ds_rwlock);
	cv_destroy(&ds->ds_exclusive_cv);

	kmem_free(ds, sizeof (dsl_dataset_t));
}

static int
dsl_dataset_get_snapname(dsl_dataset_t *ds)
{
	dsl_dataset_phys_t *headphys;
	int err;
	dmu_buf_t *headdbuf;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;

	if (ds->ds_snapname[0])
		return (0);
	if (ds->ds_phys->ds_next_snap_obj == 0)
		return (0);

	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
	    FTAG, &headdbuf);
	if (err)
		return (err);
	headphys = headdbuf->db_data;
	err = zap_value_search(dp->dp_meta_objset,
	    headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
	dmu_buf_rele(headdbuf, FTAG);
	return (err);
}

static int
dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
	matchtype_t mt;
	int err;

	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
		mt = MT_FIRST;
	else
		mt = MT_EXACT;

	err = zap_lookup_norm(mos, snapobj, name, 8, 1,
	    value, mt, NULL, 0, NULL);
	if (err == ENOTSUP && mt == MT_FIRST)
		err = zap_lookup(mos, snapobj, name, 8, 1, value);
	return (err);
}
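
/*
 * Note: for case-insensitive (DS_FLAG_CI_DATASET) datasets the lookup
 * above uses MT_FIRST normalization; if the snapnames ZAP predates
 * normalization support, zap_lookup_norm() fails with ENOTSUP and we
 * retry with a plain exact-match zap_lookup().
 */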

static int
dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
	matchtype_t mt;
	int err;

	dsl_dir_snap_cmtime_update(ds->ds_dir);

	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
		mt = MT_FIRST;
	else
		mt = MT_EXACT;

	err = zap_remove_norm(mos, snapobj, name, mt, tx);
	if (err == ENOTSUP && mt == MT_FIRST)
		err = zap_remove(mos, snapobj, name, tx);
	return (err);
}

static int
dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    dsl_dataset_t **dsp)
{
	objset_t *mos = dp->dp_meta_objset;
	dmu_buf_t *dbuf;
	dsl_dataset_t *ds;
	int err;
	dmu_object_info_t doi;

	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
	    dsl_pool_sync_context(dp));

	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
	if (err)
		return (err);

	/* Make sure dsobj has the correct object type. */
	dmu_object_info_from_db(dbuf, &doi);
	if (doi.doi_type != DMU_OT_DSL_DATASET) {
		/* drop the bonus hold before bailing, or it would leak */
		dmu_buf_rele(dbuf, tag);
		return (EINVAL);
	}

	ds = dmu_buf_get_user(dbuf);
	if (ds == NULL) {
		dsl_dataset_t *winner;

		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
		ds->ds_dbuf = dbuf;
		ds->ds_object = dsobj;
		ds->ds_phys = dbuf->db_data;

		mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_sendstream_lock, NULL, MUTEX_DEFAULT, NULL);

		rw_init(&ds->ds_rwlock, 0, 0, 0);
		cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);

		bplist_create(&ds->ds_pending_deadlist);
		dsl_deadlist_open(&ds->ds_deadlist,
		    mos, ds->ds_phys->ds_deadlist_obj);

		list_create(&ds->ds_sendstreams, sizeof (dmu_sendarg_t),
		    offsetof(dmu_sendarg_t, dsa_link));

		if (err == 0) {
			err = dsl_dir_open_obj(dp,
			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
		}
		if (err) {
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_recvlock);
			mutex_destroy(&ds->ds_opening_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			bplist_destroy(&ds->ds_pending_deadlist);
			dsl_deadlist_close(&ds->ds_deadlist);
			kmem_free(ds, sizeof (dsl_dataset_t));
			dmu_buf_rele(dbuf, tag);
			return (err);
		}

		if (!dsl_dataset_is_snapshot(ds)) {
			ds->ds_snapname[0] = '\0';
			if (ds->ds_phys->ds_prev_snap_obj) {
				err = dsl_dataset_get_ref(dp,
				    ds->ds_phys->ds_prev_snap_obj,
				    ds, &ds->ds_prev);
			}
		} else {
			if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
				err = dsl_dataset_get_snapname(ds);
			if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
				err = zap_count(
				    ds->ds_dir->dd_pool->dp_meta_objset,
				    ds->ds_phys->ds_userrefs_obj,
				    &ds->ds_userrefs);
			}
		}

		if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
			/*
			 * In sync context, we're called with either no lock
			 * or with the write lock.  If we're not syncing,
			 * we're always called with the read lock held.
			 */
			boolean_t need_lock =
			    !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
			    dsl_pool_sync_context(dp);

			if (need_lock)
				rw_enter(&dp->dp_config_rwlock, RW_READER);

			err = dsl_prop_get_ds(ds,
			    "refreservation", sizeof (uint64_t), 1,
			    &ds->ds_reserved, NULL);
			if (err == 0) {
				err = dsl_prop_get_ds(ds,
				    "refquota", sizeof (uint64_t), 1,
				    &ds->ds_quota, NULL);
			}

			if (need_lock)
				rw_exit(&dp->dp_config_rwlock);
		} else {
			ds->ds_reserved = ds->ds_quota = 0;
		}

		if (err == 0) {
			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
			    dsl_dataset_evict);
		}
		if (err || winner) {
			bplist_destroy(&ds->ds_pending_deadlist);
			dsl_deadlist_close(&ds->ds_deadlist);
			if (ds->ds_prev)
				dsl_dataset_drop_ref(ds->ds_prev, ds);
			dsl_dir_close(ds->ds_dir, ds);
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_recvlock);
			mutex_destroy(&ds->ds_opening_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			kmem_free(ds, sizeof (dsl_dataset_t));
			if (err) {
				dmu_buf_rele(dbuf, tag);
				return (err);
			}
			ds = winner;
		} else {
			ds->ds_fsid_guid =
			    unique_insert(ds->ds_phys->ds_fsid_guid);
		}
	}
	ASSERT3P(ds->ds_dbuf, ==, dbuf);
	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
	ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
	    spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
	    dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
	mutex_enter(&ds->ds_lock);
	if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
		mutex_exit(&ds->ds_lock);
		dmu_buf_rele(ds->ds_dbuf, tag);
		return (ENOENT);
	}
	mutex_exit(&ds->ds_lock);
	*dsp = ds;
	return (0);
}
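
/*
 * Note on the "winner" handling above: two threads can race to
 * instantiate the in-core dsl_dataset_t for the same dbuf.
 * dmu_buf_set_user_ie() returns any previously attached user; the
 * loser tears down the structure it just built and adopts the
 * winner's instead.
 */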

static int
dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/*
	 * In syncing context we don't want to take the rwlock: there
	 * may be an existing writer waiting for sync phase to
	 * finish.  We don't need to worry about such writers, since
	 * sync phase is single-threaded, so the writer can't be
	 * doing anything while we are active.
	 */
	if (dsl_pool_sync_context(dp)) {
		ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
		return (0);
	}

	/*
	 * Normal users will hold the ds_rwlock as a READER until they
	 * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
	 * drop their READER lock after they set the ds_owner field.
	 *
	 * If the dataset is being destroyed, the destroy thread will
	 * obtain a WRITER lock for exclusive access after it's done its
	 * open-context work and then change the ds_owner to
	 * dsl_reaper once destruction is assured.  So threads
	 * may block here temporarily, until the "destructibility" of
	 * the dataset is determined.
	 */
	ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
	mutex_enter(&ds->ds_lock);
	while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
		rw_exit(&dp->dp_config_rwlock);
		cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
		if (DSL_DATASET_IS_DESTROYED(ds)) {
			mutex_exit(&ds->ds_lock);
			dsl_dataset_drop_ref(ds, tag);
			rw_enter(&dp->dp_config_rwlock, RW_READER);
			return (ENOENT);
		}
		/*
		 * The dp_config_rwlock lives above the ds_lock. And
		 * we need to check DSL_DATASET_IS_DESTROYED() while
		 * holding the ds_lock, so we have to drop and reacquire
		 * the ds_lock here.
		 */
		mutex_exit(&ds->ds_lock);
		rw_enter(&dp->dp_config_rwlock, RW_READER);
		mutex_enter(&ds->ds_lock);
	}
	mutex_exit(&ds->ds_lock);
	return (0);
}
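
/*
 * Note: the retry loop above drops dp_config_rwlock before sleeping on
 * ds_exclusive_cv so that a waiter here does not hold up other users
 * of the pool config lock while the destroy thread holds ds_rwlock as
 * WRITER.
 */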

int
dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    dsl_dataset_t **dsp)
{
	int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);

	if (err)
		return (err);
	return (dsl_dataset_hold_ref(*dsp, tag));
}

int
dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
    void *tag, dsl_dataset_t **dsp)
{
	int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
	if (err)
		return (err);
	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
		dsl_dataset_rele(*dsp, tag);
		*dsp = NULL;
		return (EBUSY);
	}
	return (0);
}

int
dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
{
	dsl_dir_t *dd;
	dsl_pool_t *dp;
	const char *snapname;
	uint64_t obj;
	int err = 0;

	err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
	if (err)
		return (err);

	dp = dd->dd_pool;
	obj = dd->dd_phys->dd_head_dataset_obj;
	rw_enter(&dp->dp_config_rwlock, RW_READER);
	if (obj)
		err = dsl_dataset_get_ref(dp, obj, tag, dsp);
	else
		err = ENOENT;
	if (err)
		goto out;

	err = dsl_dataset_hold_ref(*dsp, tag);

	/* we may be looking for a snapshot */
	if (err == 0 && snapname != NULL) {
		dsl_dataset_t *ds = NULL;

		if (*snapname++ != '@') {
			dsl_dataset_rele(*dsp, tag);
			err = ENOENT;
			goto out;
		}

		dprintf("looking for snapshot '%s'\n", snapname);
		err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
		if (err == 0)
			err = dsl_dataset_get_ref(dp, obj, tag, &ds);
		dsl_dataset_rele(*dsp, tag);

		ASSERT3U((err == 0), ==, (ds != NULL));

		if (ds) {
			mutex_enter(&ds->ds_lock);
			if (ds->ds_snapname[0] == 0)
				(void) strlcpy(ds->ds_snapname, snapname,
				    sizeof (ds->ds_snapname));
			mutex_exit(&ds->ds_lock);
			err = dsl_dataset_hold_ref(ds, tag);
			*dsp = err ? NULL : ds;
		}
	}
out:
	rw_exit(&dp->dp_config_rwlock);
	dsl_dir_close(dd, FTAG);
	return (err);
}

int
dsl_dataset_own(const char *name, boolean_t inconsistentok,
    void *tag, dsl_dataset_t **dsp)
{
	int err = dsl_dataset_hold(name, tag, dsp);
	if (err)
		return (err);
	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
		dsl_dataset_rele(*dsp, tag);
		return (EBUSY);
	}
	return (0);
}
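
/*
 * Illustrative usage (not part of the original file): holds and owns
 * are paired with their release functions, e.g.:
 *
 *	dsl_dataset_t *ds;
 *	if (dsl_dataset_own(name, B_FALSE, FTAG, &ds) == 0) {
 *		... exclusive use of the dataset ...
 *		dsl_dataset_disown(ds, FTAG);
 *	}
 *
 * dsl_dataset_hold()/dsl_dataset_rele() is the non-exclusive
 * equivalent; owning additionally fails with EBUSY if another owner
 * is registered (see dsl_dataset_tryown() below).
 */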

void
dsl_dataset_name(dsl_dataset_t *ds, char *name)
{
	if (ds == NULL) {
		(void) strcpy(name, "mos");
	} else {
		dsl_dir_name(ds->ds_dir, name);
		VERIFY(0 == dsl_dataset_get_snapname(ds));
		if (ds->ds_snapname[0]) {
			(void) strcat(name, "@");
			/*
			 * We use a "recursive" mutex so that we
			 * can call dprintf_ds() with ds_lock held.
			 */
			if (!MUTEX_HELD(&ds->ds_lock)) {
				mutex_enter(&ds->ds_lock);
				(void) strcat(name, ds->ds_snapname);
				mutex_exit(&ds->ds_lock);
			} else {
				(void) strcat(name, ds->ds_snapname);
			}
		}
	}
}

static int
dsl_dataset_namelen(dsl_dataset_t *ds)
{
	int result;

	if (ds == NULL) {
		result = 3;	/* "mos" */
	} else {
		result = dsl_dir_namelen(ds->ds_dir);
		VERIFY(0 == dsl_dataset_get_snapname(ds));
		if (ds->ds_snapname[0]) {
			++result;	/* adding one for the @-sign */
			if (!MUTEX_HELD(&ds->ds_lock)) {
				mutex_enter(&ds->ds_lock);
				result += strlen(ds->ds_snapname);
				mutex_exit(&ds->ds_lock);
			} else {
				result += strlen(ds->ds_snapname);
			}
		}
	}

	return (result);
}

void
dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
{
	dmu_buf_rele(ds->ds_dbuf, tag);
}

void
dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
{
	if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
		rw_exit(&ds->ds_rwlock);
	}
	dsl_dataset_drop_ref(ds, tag);
}

void
dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
{
	ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
	    (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));

	mutex_enter(&ds->ds_lock);
	ds->ds_owner = NULL;
	if (RW_WRITE_HELD(&ds->ds_rwlock)) {
		rw_exit(&ds->ds_rwlock);
		cv_broadcast(&ds->ds_exclusive_cv);
	}
	mutex_exit(&ds->ds_lock);
	if (ds->ds_dbuf)
		dsl_dataset_drop_ref(ds, tag);
	else
		dsl_dataset_evict(NULL, ds);
}

boolean_t
dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
{
	boolean_t gotit = FALSE;

	mutex_enter(&ds->ds_lock);
	if (ds->ds_owner == NULL &&
	    (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
		ds->ds_owner = tag;
		if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
			rw_exit(&ds->ds_rwlock);
		gotit = TRUE;
	}
	mutex_exit(&ds->ds_lock);
	return (gotit);
}

void
dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
{
	ASSERT3P(owner, ==, ds->ds_owner);
	if (!RW_WRITE_HELD(&ds->ds_rwlock))
		rw_enter(&ds->ds_rwlock, RW_WRITER);
}

uint64_t
dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
    uint64_t flags, dmu_tx_t *tx)
{
	dsl_pool_t *dp = dd->dd_pool;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	uint64_t dsobj;
	objset_t *mos = dp->dp_meta_objset;

	if (origin == NULL)
		origin = dp->dp_origin_snap;

	ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
	ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);

	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	bzero(dsphys, sizeof (dsl_dataset_phys_t));
	dsphys->ds_dir_obj = dd->dd_object;
	dsphys->ds_flags = flags;
	dsphys->ds_fsid_guid = unique_create();
	do {
		(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
		    sizeof (dsphys->ds_guid));
	} while (dsphys->ds_guid == 0);
	dsphys->ds_snapnames_zapobj =
	    zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
	    DMU_OT_NONE, 0, tx);
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;

	if (origin == NULL) {
		dsphys->ds_deadlist_obj = dsl_deadlist_alloc(mos, tx);
	} else {
		dsl_dataset_t *ohds;

		dsphys->ds_prev_snap_obj = origin->ds_object;
		dsphys->ds_prev_snap_txg =
		    origin->ds_phys->ds_creation_txg;
		dsphys->ds_referenced_bytes =
		    origin->ds_phys->ds_referenced_bytes;
		dsphys->ds_compressed_bytes =
		    origin->ds_phys->ds_compressed_bytes;
		dsphys->ds_uncompressed_bytes =
		    origin->ds_phys->ds_uncompressed_bytes;
		dsphys->ds_bp = origin->ds_phys->ds_bp;
		dsphys->ds_flags |= origin->ds_phys->ds_flags;

		dmu_buf_will_dirty(origin->ds_dbuf, tx);
		origin->ds_phys->ds_num_children++;

		VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
		    origin->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ohds));
		dsphys->ds_deadlist_obj = dsl_deadlist_clone(&ohds->ds_deadlist,
		    dsphys->ds_prev_snap_txg, dsphys->ds_prev_snap_obj, tx);
		dsl_dataset_rele(ohds, FTAG);

		if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
			if (origin->ds_phys->ds_next_clones_obj == 0) {
				origin->ds_phys->ds_next_clones_obj =
				    zap_create(mos,
				    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
			}
			VERIFY(0 == zap_add_int(mos,
			    origin->ds_phys->ds_next_clones_obj,
			    dsobj, tx));
		}

		dmu_buf_will_dirty(dd->dd_dbuf, tx);
		dd->dd_phys->dd_origin_obj = origin->ds_object;
		if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
			if (origin->ds_dir->dd_phys->dd_clones == 0) {
				dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
				origin->ds_dir->dd_phys->dd_clones =
				    zap_create(mos,
				    DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
			}
			VERIFY3U(0, ==, zap_add_int(mos,
			    origin->ds_dir->dd_phys->dd_clones, dsobj, tx));
		}
	}

	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;

	dmu_buf_rele(dbuf, FTAG);

	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	dd->dd_phys->dd_head_dataset_obj = dsobj;

	return (dsobj);
}

uint64_t
dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
    dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
{
	dsl_pool_t *dp = pdd->dd_pool;
	uint64_t dsobj, ddobj;
	dsl_dir_t *dd;

	ASSERT(lastname[0] != '@');

	ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));

	dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);

	dsl_deleg_set_create_perms(dd, tx, cr);

	dsl_dir_close(dd, FTAG);

	/*
	 * If we are creating a clone, make sure we zero out any stale
	 * data from the origin snapshot's ZIL header.
	 */
	if (origin != NULL) {
		dsl_dataset_t *ds;
		objset_t *os;

		VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
		VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
		bzero(&os->os_zil_header, sizeof (os->os_zil_header));
		dsl_dataset_dirty(ds, tx);
		dsl_dataset_rele(ds, FTAG);
	}

	return (dsobj);
}
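
/*
 * Note: both create functions above run in syncing context (note the
 * ASSERT(dmu_tx_is_syncing(tx)) in dsl_dataset_create_sync_dd()), so
 * callers reach them through a dsl_sync_task rather than calling them
 * directly from open context.
 */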

#ifdef __FreeBSD__
/* FreeBSD ioctl compat begin */
struct destroyarg {
	nvlist_t *nvl;
	const char *snapname;
};

static int
dsl_check_snap_cb(const char *name, void *arg)
{
	struct destroyarg *da = arg;
	dsl_dataset_t *ds;
	char *dsname;

	dsname = kmem_asprintf("%s@%s", name, da->snapname);
	VERIFY(nvlist_add_boolean(da->nvl, dsname) == 0);

	return (0);
}

int
dmu_get_recursive_snaps_nvl(const char *fsname, const char *snapname,
    nvlist_t *snaps)
{
	struct destroyarg *da;
	int err;

	da = kmem_zalloc(sizeof (struct destroyarg), KM_SLEEP);
	da->nvl = snaps;
	da->snapname = snapname;
	err = dmu_objset_find(fsname, dsl_check_snap_cb, da,
	    DS_FIND_CHILDREN);
	kmem_free(da, sizeof (struct destroyarg));

	return (err);
}
/* FreeBSD ioctl compat end */
#endif /* __FreeBSD__ */

/*
 * The snapshots must all be in the same pool.
 */
int
dmu_snapshots_destroy_nvl(nvlist_t *snaps, boolean_t defer, char *failed)
{
	int err;
	dsl_sync_task_t *dst;
	spa_t *spa;
	nvpair_t *pair;
	dsl_sync_task_group_t *dstg;

	pair = nvlist_next_nvpair(snaps, NULL);
	if (pair == NULL)
		return (0);

	err = spa_open(nvpair_name(pair), &spa, FTAG);
	if (err)
		return (err);
	dstg = dsl_sync_task_group_create(spa_get_dsl(spa));

	for (pair = nvlist_next_nvpair(snaps, NULL); pair != NULL;
	    pair = nvlist_next_nvpair(snaps, pair)) {
		dsl_dataset_t *ds;

		err = dsl_dataset_own(nvpair_name(pair), B_TRUE, dstg, &ds);
		if (err == 0) {
			struct dsl_ds_destroyarg *dsda;

			dsl_dataset_make_exclusive(ds, dstg);
			dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg),
			    KM_SLEEP);
			dsda->ds = ds;
			dsda->defer = defer;
			dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
			    dsl_dataset_destroy_sync, dsda, dstg, 0);
		} else if (err == ENOENT) {
			err = 0;
		} else {
			(void) strcpy(failed, nvpair_name(pair));
			break;
		}
	}

	if (err == 0)
		err = dsl_sync_task_group_wait(dstg);

	for (dst = list_head(&dstg->dstg_tasks); dst;
	    dst = list_next(&dstg->dstg_tasks, dst)) {
		struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
		dsl_dataset_t *ds = dsda->ds;

		/*
		 * Return the file system name that triggered the error
		 */
		if (dst->dst_err) {
			dsl_dataset_name(ds, failed);
		}
		ASSERT3P(dsda->rm_origin, ==, NULL);
		dsl_dataset_disown(ds, dstg);
		kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
	}

	dsl_sync_task_group_destroy(dstg);
	spa_close(spa, FTAG);
	return (err);
}

static boolean_t
dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
{
	boolean_t might_destroy = B_FALSE;

	mutex_enter(&ds->ds_lock);
	if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
	    DS_IS_DEFER_DESTROY(ds))
		might_destroy = B_TRUE;
	mutex_exit(&ds->ds_lock);

	return (might_destroy);
}

/*
 * If we're removing a clone, and these three conditions are true:
 *	1) the clone's origin has no other children
 *	2) the clone's origin has no user references
 *	3) the clone's origin has been marked for deferred destruction
 * Then, prepare to remove the origin as part of this sync task group.
 */
static int
dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
{
	dsl_dataset_t *ds = dsda->ds;
	dsl_dataset_t *origin = ds->ds_prev;

	if (dsl_dataset_might_destroy_origin(origin)) {
		char *name;
		int namelen;
		int error;

		namelen = dsl_dataset_namelen(origin) + 1;
		name = kmem_alloc(namelen, KM_SLEEP);
		dsl_dataset_name(origin, name);
#ifdef _KERNEL
		error = zfs_unmount_snap(name, NULL);
		if (error) {
			kmem_free(name, namelen);
			return (error);
		}
#endif
		error = dsl_dataset_own(name, B_TRUE, tag, &origin);
		kmem_free(name, namelen);
		if (error)
			return (error);
		dsda->rm_origin = origin;
		dsl_dataset_make_exclusive(origin, tag);
	}

	return (0);
}

/*
 * ds must be opened as OWNER.  On return (whether successful or not),
 * ds will be closed and caller can no longer dereference it.
 */
int
dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
{
	int err;
	dsl_sync_task_group_t *dstg;
	objset_t *os;
	dsl_dir_t *dd;
	uint64_t obj;
	struct dsl_ds_destroyarg dsda = { 0 };
	dsl_dataset_t dummy_ds = { 0 };

	dsda.ds = ds;

	if (dsl_dataset_is_snapshot(ds)) {
		/* Destroying a snapshot is simpler */
		dsl_dataset_make_exclusive(ds, tag);

		dsda.defer = defer;
		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
		    &dsda, tag, 0);
		ASSERT3P(dsda.rm_origin, ==, NULL);
		goto out;
	} else if (defer) {
		err = EINVAL;
		goto out;
	}

	dd = ds->ds_dir;
	dummy_ds.ds_dir = dd;
	dummy_ds.ds_object = ds->ds_object;

	/*
	 * Check for errors and mark this ds as inconsistent, in
	 * case we crash while freeing the objects.
	 */
	err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
	    dsl_dataset_destroy_begin_sync, ds, NULL, 0);
	if (err)
		goto out;

	err = dmu_objset_from_ds(ds, &os);
	if (err)
		goto out;

	/*
	 * If async destruction is not enabled try to remove all objects
	 * while in the open context so that there is less work to do in
	 * the syncing context.
	 */
	if (!spa_feature_is_enabled(dsl_dataset_get_spa(ds),
	    &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY])) {
		for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
		    ds->ds_phys->ds_prev_snap_txg)) {
			/*
			 * Ignore errors, if there is not enough disk space
			 * we will deal with it in dsl_dataset_destroy_sync().
			 */
			(void) dmu_free_object(os, obj);
		}
		if (err != ESRCH)
			goto out;
	}

	/*
	 * Only the ZIL knows how to free log blocks.
	 */
	zil_destroy(dmu_objset_zil(os), B_FALSE);

	/*
	 * Sync out all in-flight IO.
	 */
	txg_wait_synced(dd->dd_pool, 0);

	/*
	 * If we managed to free all the objects in open
	 * context, the user space accounting should be zero.
	 */
	if (ds->ds_phys->ds_bp.blk_fill == 0 &&
	    dmu_objset_userused_enabled(os)) {
		uint64_t count;

		ASSERT(zap_count(os, DMU_USERUSED_OBJECT, &count) != 0 ||
		    count == 0);
		ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT, &count) != 0 ||
		    count == 0);
	}

	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
	err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
	rw_exit(&dd->dd_pool->dp_config_rwlock);

	if (err)
		goto out;

	/*
	 * Blow away the dsl_dir + head dataset.
	 */
	dsl_dataset_make_exclusive(ds, tag);
	/*
	 * If we're removing a clone, we might also need to remove its
	 * origin.
	 */
	do {
		dsda.need_prep = B_FALSE;
		if (dsl_dir_is_clone(dd)) {
			err = dsl_dataset_origin_rm_prep(&dsda, tag);
			if (err) {
				dsl_dir_close(dd, FTAG);
				goto out;
			}
		}

		dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
		dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
		    dsl_dataset_destroy_sync, &dsda, tag, 0);
		dsl_sync_task_create(dstg, dsl_dir_destroy_check,
		    dsl_dir_destroy_sync, &dummy_ds, FTAG, 0);
		err = dsl_sync_task_group_wait(dstg);
		dsl_sync_task_group_destroy(dstg);

		/*
		 * We could be racing against 'zfs release' or 'zfs destroy -d'
		 * on the origin snap, in which case we can get EBUSY if we
		 * needed to destroy the origin snap but were not ready to
		 * do so.
		 */
		if (dsda.need_prep) {
			ASSERT(err == EBUSY);
			ASSERT(dsl_dir_is_clone(dd));
			ASSERT(dsda.rm_origin == NULL);
		}
	} while (dsda.need_prep);

	if (dsda.rm_origin != NULL)
		dsl_dataset_disown(dsda.rm_origin, tag);

	/* if it is successful, dsl_dir_destroy_sync will close the dd */
	if (err)
		dsl_dir_close(dd, FTAG);
out:
	dsl_dataset_disown(ds, tag);
	return (err);
}

blkptr_t *
dsl_dataset_get_blkptr(dsl_dataset_t *ds)
{
	return (&ds->ds_phys->ds_bp);
}

void
dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
{
	ASSERT(dmu_tx_is_syncing(tx));
	/* If it's the meta-objset, set dp_meta_rootbp */
	if (ds == NULL) {
		tx->tx_pool->dp_meta_rootbp = *bp;
	} else {
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		ds->ds_phys->ds_bp = *bp;
	}
}

spa_t *
dsl_dataset_get_spa(dsl_dataset_t *ds)
{
	return (ds->ds_dir->dd_pool->dp_spa);
}

void
dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
{
	dsl_pool_t *dp;

	if (ds == NULL) /* this is the meta-objset */
		return;

	ASSERT(ds->ds_objset != NULL);

	if (ds->ds_phys->ds_next_snap_obj != 0)
		panic("dirtying snapshot!");

	dp = ds->ds_dir->dd_pool;

	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
		/* up the hold count until we can be written out */
		dmu_buf_add_ref(ds->ds_dbuf, ds);
	}
}

/*
 * The unique space in the head dataset can be calculated by subtracting
 * the space used in the most recent snapshot, that is still being used
 * in this file system, from the space currently in use.  To figure out
 * the space in the most recent snapshot still in use, we need to take
 * the total space used in the snapshot and subtract out the space that
 * has been freed up since the snapshot was taken.
 */
static void
dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
{
	uint64_t mrs_used;
	uint64_t dlused, dlcomp, dluncomp;

	ASSERT(!dsl_dataset_is_snapshot(ds));

	if (ds->ds_phys->ds_prev_snap_obj != 0)
		mrs_used = ds->ds_prev->ds_phys->ds_referenced_bytes;
	else
		mrs_used = 0;

	dsl_deadlist_space(&ds->ds_deadlist, &dlused, &dlcomp, &dluncomp);

	ASSERT3U(dlused, <=, mrs_used);
	ds->ds_phys->ds_unique_bytes =
	    ds->ds_phys->ds_referenced_bytes - (mrs_used - dlused);

	if (spa_version(ds->ds_dir->dd_pool->dp_spa) >=
	    SPA_VERSION_UNIQUE_ACCURATE)
		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
}
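
/*
 * Worked example (illustrative): if the head references 100M, the most
 * recent snapshot referenced 60M, and 20M of the snapshot's blocks
 * have since been freed (they sit on the head's deadlist), then
 * 60M - 20M = 40M is still shared with the snapshot and the head's
 * unique space is 100M - 40M = 60M.
 */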

struct killarg {
	dsl_dataset_t *ds;
	dmu_tx_t *tx;
};

/* ARGSUSED */
static int
kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp, arc_buf_t *pbuf,
    const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
{
	struct killarg *ka = arg;
	dmu_tx_t *tx = ka->tx;

	if (bp == NULL)
		return (0);

	if (zb->zb_level == ZB_ZIL_LEVEL) {
		ASSERT(zilog != NULL);
		/*
		 * It's a block in the intent log.  It has no
		 * accounting, so just free it.
		 */
		dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
	} else {
		ASSERT(zilog == NULL);
		ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
		(void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
	}

	return (0);
}

/* ARGSUSED */
static int
dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t count;
	int err;

	/*
	 * Can't delete a head dataset if there are snapshots of it.
	 * (Except if the only snapshots are from the branch we cloned
	 * from.)
	 */
	if (ds->ds_prev != NULL &&
	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
		return (EBUSY);

	/*
	 * This is really a dsl_dir thing, but check it here so that
	 * we'll be less likely to leave this dataset inconsistent &
	 * nearly destroyed.
	 */
	err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
	if (err)
		return (err);
	if (count != 0)
		return (EEXIST);

	return (0);
}

/* ARGSUSED */
static void
dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/* Mark it as inconsistent on-disk, in case we crash */
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;

	spa_history_log_internal(LOG_DS_DESTROY_BEGIN, dp->dp_spa, tx,
	    "dataset = %llu", ds->ds_object);
}

static int
dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
    dmu_tx_t *tx)
{
	dsl_dataset_t *ds = dsda->ds;
	dsl_dataset_t *ds_prev = ds->ds_prev;

	if (dsl_dataset_might_destroy_origin(ds_prev)) {
		struct dsl_ds_destroyarg ndsda = {0};

		/*
		 * If we're not prepared to remove the origin, don't remove
		 * the clone either.
		 */
		if (dsda->rm_origin == NULL) {
			dsda->need_prep = B_TRUE;
			return (EBUSY);
		}

		ndsda.ds = ds_prev;
		ndsda.is_origin_rm = B_TRUE;
		return (dsl_dataset_destroy_check(&ndsda, tag, tx));
	}

	/*
	 * If we're not going to remove the origin after all,
	 * undo the open context setup.
	 */
	if (dsda->rm_origin != NULL) {
		dsl_dataset_disown(dsda->rm_origin, tag);
		dsda->rm_origin = NULL;
	}

	return (0);
}

/*
 * If you add new checks here, you may need to add
 * additional checks to the "temporary" case in
 * snapshot_check() in dmu_objset.c.
 */
/* ARGSUSED */
int
dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	struct dsl_ds_destroyarg *dsda = arg1;
	dsl_dataset_t *ds = dsda->ds;

	/* we have an owner hold, so no one else can destroy us */
	ASSERT(!DSL_DATASET_IS_DESTROYED(ds));

	/*
	 * Only allow deferred destroy on pools that support it.
	 * NOTE: deferred destroy is only supported on snapshots.
	 */
	if (dsda->defer) {
		if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
		    SPA_VERSION_USERREFS)
			return (ENOTSUP);
		ASSERT(dsl_dataset_is_snapshot(ds));
		return (0);
	}

	/*
	 * Can't delete a head dataset if there are snapshots of it.
	 * (Except if the only snapshots are from the branch we cloned
	 * from.)
	 */
	if (ds->ds_prev != NULL &&
	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
		return (EBUSY);

	/*
	 * If we made changes this txg, traverse_dsl_dataset won't find
	 * them.  Try again.
	 */
	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
		return (EAGAIN);

	if (dsl_dataset_is_snapshot(ds)) {
		/*
		 * If this snapshot has an elevated user reference count,
		 * we can't destroy it yet.
		 */
		if (ds->ds_userrefs > 0 && !dsda->releasing)
			return (EBUSY);

		mutex_enter(&ds->ds_lock);
		/*
		 * Can't delete a branch point. However, if we're destroying
		 * a clone and removing its origin due to it having a user
		 * hold count of 0 and having been marked for deferred destroy,
		 * it's OK for the origin to have a single clone.
		 */
		if (ds->ds_phys->ds_num_children >
		    (dsda->is_origin_rm ? 2 : 1)) {
			mutex_exit(&ds->ds_lock);
			return (EEXIST);
		}
		mutex_exit(&ds->ds_lock);
	} else if (dsl_dir_is_clone(ds->ds_dir)) {
		return (dsl_dataset_origin_check(dsda, arg2, tx));
	}

	/* XXX we should do some i/o error checking... */
	return (0);
}

struct refsarg {
	kmutex_t lock;
	boolean_t gone;
	kcondvar_t cv;
};

/* ARGSUSED */
static void
dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
{
	struct refsarg *arg = argv;

	mutex_enter(&arg->lock);
	arg->gone = TRUE;
	cv_signal(&arg->cv);
	mutex_exit(&arg->lock);
}

static void
dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
{
	struct refsarg arg;

	bzero(&arg, sizeof (arg));
	mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
	arg.gone = FALSE;
	(void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
	    dsl_dataset_refs_gone);
	dmu_buf_rele(ds->ds_dbuf, tag);
	mutex_enter(&arg.lock);
	while (!arg.gone)
		cv_wait(&arg.cv, &arg.lock);
	ASSERT(arg.gone);
	mutex_exit(&arg.lock);
	ds->ds_dbuf = NULL;
	ds->ds_phys = NULL;
	mutex_destroy(&arg.lock);
	cv_destroy(&arg.cv);
}
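
/*
 * Note: dsl_dataset_drain_refs() above swaps in dsl_dataset_refs_gone()
 * as the dbuf's eviction callback and then waits for it to fire --
 * i.e. it blocks until every remaining hold on the dataset's dbuf has
 * been released.
 */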

static void
remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t count;
	int err;

	ASSERT(ds->ds_phys->ds_num_children >= 2);
	err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
	/*
	 * The err should not be ENOENT, but a bug in a previous version
	 * of the code could cause upgrade_clones_cb() to not set
	 * ds_next_snap_obj when it should, leading to a missing entry.
	 * If we knew that the pool was created after
	 * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
	 * ENOENT.  However, at least we can check that we don't have
	 * too many entries in the next_clones_obj even after failing to
	 * remove this one.
	 */
	if (err != ENOENT) {
		VERIFY3U(err, ==, 0);
	}
	ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
	    &count));
	ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
}

static void
dsl_dataset_remove_clones_key(dsl_dataset_t *ds, uint64_t mintxg, dmu_tx_t *tx)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	zap_cursor_t zc;
	zap_attribute_t za;

	/*
	 * If it is the old version, dd_clones doesn't exist so we can't
	 * find the clones, but deadlist_remove_key() is a no-op so it
	 * doesn't matter.
	 */
	if (ds->ds_dir->dd_phys->dd_clones == 0)
		return;

	for (zap_cursor_init(&zc, mos, ds->ds_dir->dd_phys->dd_clones);
	    zap_cursor_retrieve(&zc, &za) == 0;
	    zap_cursor_advance(&zc)) {
		dsl_dataset_t *clone;

		VERIFY3U(0, ==, dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
		    za.za_first_integer, FTAG, &clone));
		if (clone->ds_dir->dd_origin_txg > mintxg) {
			dsl_deadlist_remove_key(&clone->ds_deadlist,
			    mintxg, tx);
			dsl_dataset_remove_clones_key(clone, mintxg, tx);
		}
		dsl_dataset_rele(clone, FTAG);
	}
	zap_cursor_fini(&zc);
}

struct process_old_arg {
	dsl_dataset_t *ds;
	dsl_dataset_t *ds_prev;
	boolean_t after_branch_point;
	zio_t *pio;
	uint64_t used, comp, uncomp;
};

static int
process_old_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
{
	struct process_old_arg *poa = arg;
	dsl_pool_t *dp = poa->ds->ds_dir->dd_pool;

	if (bp->blk_birth <= poa->ds->ds_phys->ds_prev_snap_txg) {
		dsl_deadlist_insert(&poa->ds->ds_deadlist, bp, tx);
		if (poa->ds_prev && !poa->after_branch_point &&
		    bp->blk_birth >
		    poa->ds_prev->ds_phys->ds_prev_snap_txg) {
			poa->ds_prev->ds_phys->ds_unique_bytes +=
			    bp_get_dsize_sync(dp->dp_spa, bp);
		}
	} else {
		poa->used += bp_get_dsize_sync(dp->dp_spa, bp);
		poa->comp += BP_GET_PSIZE(bp);
		poa->uncomp += BP_GET_UCSIZE(bp);
		dsl_free_sync(poa->pio, dp, tx->tx_txg, bp);
	}
	return (0);
}
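
/*
 * process_old_cb() splits an old-format deadlist entry two ways:
 * blocks born at or before the doomed snapshot's previous snapshot are
 * still referenced there, so they stay on a deadlist (and may become
 * unique to ds_prev); younger blocks were unique to the doomed
 * snapshot, so they are freed and their space tallied in poa for the
 * snapused adjustment in process_old_deadlist() below.
 */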

static void
process_old_deadlist(dsl_dataset_t *ds, dsl_dataset_t *ds_prev,
    dsl_dataset_t *ds_next, boolean_t after_branch_point, dmu_tx_t *tx)
{
	struct process_old_arg poa = { 0 };
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;

	ASSERT(ds->ds_deadlist.dl_oldfmt);
	ASSERT(ds_next->ds_deadlist.dl_oldfmt);

	poa.ds = ds;
	poa.ds_prev = ds_prev;
	poa.after_branch_point = after_branch_point;
	poa.pio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
	VERIFY3U(0, ==, bpobj_iterate(&ds_next->ds_deadlist.dl_bpobj,
	    process_old_cb, &poa, tx));
	VERIFY3U(zio_wait(poa.pio), ==, 0);
	ASSERT3U(poa.used, ==, ds->ds_phys->ds_unique_bytes);

	/* change snapused */
	dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
	    -poa.used, -poa.comp, -poa.uncomp, tx);

	/* swap next's deadlist to our deadlist */
	dsl_deadlist_close(&ds->ds_deadlist);
	dsl_deadlist_close(&ds_next->ds_deadlist);
	SWITCH64(ds_next->ds_phys->ds_deadlist_obj,
	    ds->ds_phys->ds_deadlist_obj);
	dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
	dsl_deadlist_open(&ds_next->ds_deadlist, mos,
	    ds_next->ds_phys->ds_deadlist_obj);
}

static int
old_synchronous_dataset_destroy(dsl_dataset_t *ds, dmu_tx_t *tx)
{
	int err;
	struct killarg ka;

	/*
	 * Free everything that we point to (that's born after
	 * the previous snapshot, if we are a clone)
	 *
	 * NB: this should be very quick, because we already
	 * freed all the objects in open context.
	 */
	ka.ds = ds;
	ka.tx = tx;
	err = traverse_dataset(ds,
	    ds->ds_phys->ds_prev_snap_txg, TRAVERSE_POST,
	    kill_blkptr, &ka);
	ASSERT3U(err, ==, 0);
	ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) || ds->ds_phys->ds_unique_bytes == 0);

	return (err);
}

void
dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
{
	struct dsl_ds_destroyarg *dsda = arg1;
	dsl_dataset_t *ds = dsda->ds;
	int err;
	int after_branch_point = FALSE;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;
	dsl_dataset_t *ds_prev = NULL;
	boolean_t wont_destroy;
	uint64_t obj;

	wont_destroy = (dsda->defer &&
	    (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1));

	ASSERT(ds->ds_owner || wont_destroy);
	ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
	ASSERT(ds->ds_prev == NULL ||
	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);

	if (wont_destroy) {
		ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
		return;
	}

	/* signal any waiters that this dataset is going away */
	mutex_enter(&ds->ds_lock);
	ds->ds_owner = dsl_reaper;
	cv_broadcast(&ds->ds_exclusive_cv);
	mutex_exit(&ds->ds_lock);

	/* Remove our reservation */
	if (ds->ds_reserved != 0) {
		dsl_prop_setarg_t psa;
		uint64_t value = 0;

		dsl_prop_setarg_init_uint64(&psa, "refreservation",
		    (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
		    &value);
		psa.psa_effective_value = 0;	/* predict default value */

		dsl_dataset_set_reservation_sync(ds, &psa, tx);
		ASSERT3U(ds->ds_reserved, ==, 0);
	}

	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));

	dsl_scan_ds_destroyed(ds, tx);

	obj = ds->ds_object;

	if (ds->ds_phys->ds_prev_snap_obj != 0) {
		if (ds->ds_prev) {
			ds_prev = ds->ds_prev;
		} else {
			VERIFY(0 == dsl_dataset_hold_obj(dp,
			    ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
		}
		after_branch_point =
		    (ds_prev->ds_phys->ds_next_snap_obj != obj);

		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
		if (after_branch_point &&
		    ds_prev->ds_phys->ds_next_clones_obj != 0) {
			remove_from_next_clones(ds_prev, obj, tx);
			if (ds->ds_phys->ds_next_snap_obj != 0) {
				VERIFY(0 == zap_add_int(mos,
				    ds_prev->ds_phys->ds_next_clones_obj,
				    ds->ds_phys->ds_next_snap_obj, tx));
			}
		}
		if (after_branch_point &&
		    ds->ds_phys->ds_next_snap_obj == 0) {
			/* This clone is toast. */
			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
			ds_prev->ds_phys->ds_num_children--;

			/*
			 * If the clone's origin has no other clones, no
			 * user holds, and has been marked for deferred
			 * deletion, then we should have done the necessary
			 * destroy setup for it.
			 */
			if (ds_prev->ds_phys->ds_num_children == 1 &&
			    ds_prev->ds_userrefs == 0 &&
			    DS_IS_DEFER_DESTROY(ds_prev)) {
				ASSERT3P(dsda->rm_origin, !=, NULL);
			} else {
				ASSERT3P(dsda->rm_origin, ==, NULL);
			}
		} else if (!after_branch_point) {
			ds_prev->ds_phys->ds_next_snap_obj =
			    ds->ds_phys->ds_next_snap_obj;
		}
	}

	if (dsl_dataset_is_snapshot(ds)) {
		dsl_dataset_t *ds_next;
		uint64_t old_unique;
		uint64_t used = 0, comp = 0, uncomp = 0;

		VERIFY(0 == dsl_dataset_hold_obj(dp,
		    ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);

		old_unique = ds_next->ds_phys->ds_unique_bytes;

		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
		ds_next->ds_phys->ds_prev_snap_obj =
		    ds->ds_phys->ds_prev_snap_obj;
		ds_next->ds_phys->ds_prev_snap_txg =
		    ds->ds_phys->ds_prev_snap_txg;
		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);

		if (ds_next->ds_deadlist.dl_oldfmt) {
			process_old_deadlist(ds, ds_prev, ds_next,
			    after_branch_point, tx);
		} else {
			/* Adjust prev's unique space. */
			if (ds_prev && !after_branch_point) {
				dsl_deadlist_space_range(&ds_next->ds_deadlist,
				    ds_prev->ds_phys->ds_prev_snap_txg,
				    ds->ds_phys->ds_prev_snap_txg,
				    &used, &comp, &uncomp);
				ds_prev->ds_phys->ds_unique_bytes += used;
			}

			/* Adjust snapused. */
			dsl_deadlist_space_range(&ds_next->ds_deadlist,
			    ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
			    &used, &comp, &uncomp);
			dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
			    -used, -comp, -uncomp, tx);

			/* Move blocks to be freed to pool's free list. */
			dsl_deadlist_move_bpobj(&ds_next->ds_deadlist,
			    &dp->dp_free_bpobj, ds->ds_phys->ds_prev_snap_txg,
			    tx);
			dsl_dir_diduse_space(tx->tx_pool->dp_free_dir,
			    DD_USED_HEAD, used, comp, uncomp, tx);

			/* Merge our deadlist into next's and free it. */
			dsl_deadlist_merge(&ds_next->ds_deadlist,
			    ds->ds_phys->ds_deadlist_obj, tx);
		}
		dsl_deadlist_close(&ds->ds_deadlist);
		dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);

		/* Collapse range in clone heads */
		dsl_dataset_remove_clones_key(ds,
		    ds->ds_phys->ds_creation_txg, tx);

		if (dsl_dataset_is_snapshot(ds_next)) {
			dsl_dataset_t *ds_nextnext;
			/*
			 * Update next's unique to include blocks which
			 * were previously shared by only this snapshot
			 * and it.  Those blocks will be born after the
			 * prev snap and before this snap, and will have
			 * died after the next snap and before the one
			 * after that (i.e., be on the snap after next's
			 * deadlist).
			 */
			VERIFY(0 == dsl_dataset_hold_obj(dp,
			    ds_next->ds_phys->ds_next_snap_obj,
			    FTAG, &ds_nextnext));
			dsl_deadlist_space_range(&ds_nextnext->ds_deadlist,
			    ds->ds_phys->ds_prev_snap_txg,
			    ds->ds_phys->ds_creation_txg,
			    &used, &comp, &uncomp);
			ds_next->ds_phys->ds_unique_bytes += used;
			dsl_dataset_rele(ds_nextnext, FTAG);
			ASSERT3P(ds_next->ds_prev, ==, NULL);

			/* Collapse range in this head. */
			dsl_dataset_t *hds;
			VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
			    ds->ds_dir->dd_phys->dd_head_dataset_obj,
			    FTAG, &hds));
			dsl_deadlist_remove_key(&hds->ds_deadlist,
			    ds->ds_phys->ds_creation_txg, tx);
			dsl_dataset_rele(hds, FTAG);
		} else {
			ASSERT3P(ds_next->ds_prev, ==, ds);
			dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
			ds_next->ds_prev = NULL;
			if (ds_prev) {
				VERIFY(0 == dsl_dataset_get_ref(dp,
				    ds->ds_phys->ds_prev_snap_obj,
				    ds_next, &ds_next->ds_prev));
			}

			dsl_dataset_recalc_head_uniq(ds_next);

1889			 * Reduce the amount of our unconsumed refreservation
1890			 * being charged to our parent by the amount of
1891			 * new unique data we have gained.
1892			 */
1893			if (old_unique < ds_next->ds_reserved) {
1894				int64_t mrsdelta;
1895				uint64_t new_unique =
1896				    ds_next->ds_phys->ds_unique_bytes;
1897
1898				ASSERT(old_unique <= new_unique);
1899				mrsdelta = MIN(new_unique - old_unique,
1900				    ds_next->ds_reserved - old_unique);
1901				dsl_dir_diduse_space(ds->ds_dir,
1902				    DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
1903			}
1904		}
1905		dsl_dataset_rele(ds_next, FTAG);
1906	} else {
1907		zfeature_info_t *async_destroy =
1908		    &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY];
1909
1910		/*
1911		 * There's no next snapshot, so this is a head dataset.
1912		 * Destroy the deadlist.  Unless it's a clone, the
1913		 * deadlist should be empty.  (If it's a clone, it's
1914		 * safe to ignore the deadlist contents.)
1915		 */
1916		dsl_deadlist_close(&ds->ds_deadlist);
1917		dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1918		ds->ds_phys->ds_deadlist_obj = 0;
1919
1920		if (!spa_feature_is_enabled(dp->dp_spa, async_destroy)) {
1921			err = old_synchronous_dataset_destroy(ds, tx);
1922		} else {
1923			/*
1924			 * Move the bptree into the pool's list of trees to
1925			 * clean up and update space accounting information.
1926			 */
1927			uint64_t used, comp, uncomp;
1928
1929			ASSERT(err == 0 || err == EBUSY);
1930			if (!spa_feature_is_active(dp->dp_spa, async_destroy)) {
1931				spa_feature_incr(dp->dp_spa, async_destroy, tx);
1932				dp->dp_bptree_obj = bptree_alloc(
1933				    dp->dp_meta_objset, tx);
1934				VERIFY(zap_add(dp->dp_meta_objset,
1935				    DMU_POOL_DIRECTORY_OBJECT,
1936				    DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
1937				    &dp->dp_bptree_obj, tx) == 0);
1938			}
1939
1940			used = ds->ds_dir->dd_phys->dd_used_bytes;
1941			comp = ds->ds_dir->dd_phys->dd_compressed_bytes;
1942			uncomp = ds->ds_dir->dd_phys->dd_uncompressed_bytes;
1943
1944			ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
1945			    ds->ds_phys->ds_unique_bytes == used);
1946
1947			bptree_add(dp->dp_meta_objset, dp->dp_bptree_obj,
1948			    &ds->ds_phys->ds_bp, ds->ds_phys->ds_prev_snap_txg,
1949			    used, comp, uncomp, tx);
1950			dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
1951			    -used, -comp, -uncomp, tx);
1952			dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
1953			    used, comp, uncomp, tx);
1954		}
1955
1956		if (ds->ds_prev != NULL) {
1957			if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
1958				VERIFY3U(0, ==, zap_remove_int(mos,
1959				    ds->ds_prev->ds_dir->dd_phys->dd_clones,
1960				    ds->ds_object, tx));
1961			}
1962			dsl_dataset_rele(ds->ds_prev, ds);
1963			ds->ds_prev = ds_prev = NULL;
1964		}
1965	}
1966
1967	/*
1968	 * This must be done after the dsl_traverse(), because it will
1969	 * re-open the objset.
1970	 */
1971	if (ds->ds_objset) {
1972		dmu_objset_evict(ds->ds_objset);
1973		ds->ds_objset = NULL;
1974	}
1975
1976	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1977		/* Erase the link in the dir */
1978		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
1979		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
1980		ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
1981		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1982		ASSERT(err == 0);
1983	} else {
1984		/* remove from snapshot namespace */
1985		dsl_dataset_t *ds_head;
1986		ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
1987		VERIFY(0 == dsl_dataset_hold_obj(dp,
1988		    ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
1989		VERIFY(0 == dsl_dataset_get_snapname(ds));
1990#ifdef ZFS_DEBUG
1991		{
1992			uint64_t val;
1993
1994			err = dsl_dataset_snap_lookup(ds_head,
1995			    ds->ds_snapname, &val);
1996			ASSERT3U(err, ==, 0);
1997			ASSERT3U(val, ==, obj);
1998		}
1999#endif
2000		err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
2001		ASSERT(err == 0);
2002		dsl_dataset_rele(ds_head, FTAG);
2003	}
2004
2005	if (ds_prev && ds->ds_prev != ds_prev)
2006		dsl_dataset_rele(ds_prev, FTAG);
2007
2008	spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
2009	spa_history_log_internal(LOG_DS_DESTROY, dp->dp_spa, tx,
2010	    "dataset = %llu", ds->ds_object);
2011
2012	if (ds->ds_phys->ds_next_clones_obj != 0) {
2013		uint64_t count;
2014		ASSERT(0 == zap_count(mos,
2015		    ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
2016		VERIFY(0 == dmu_object_free(mos,
2017		    ds->ds_phys->ds_next_clones_obj, tx));
2018	}
2019	if (ds->ds_phys->ds_props_obj != 0)
2020		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
2021	if (ds->ds_phys->ds_userrefs_obj != 0)
2022		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
2023	dsl_dir_close(ds->ds_dir, ds);
2024	ds->ds_dir = NULL;
2025	dsl_dataset_drain_refs(ds, tag);
2026	VERIFY(0 == dmu_object_free(mos, obj, tx));
2027
2028	if (dsda->rm_origin) {
2029		/*
2030		 * Remove the origin of the clone we just destroyed.
2031		 */
2032		struct dsl_ds_destroyarg ndsda = {0};
2033
2034		ndsda.ds = dsda->rm_origin;
2035		dsl_dataset_destroy_sync(&ndsda, tag, tx);
2036	}
2037}
2038
2039static int
2040dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
2041{
2042	uint64_t asize;
2043
2044	if (!dmu_tx_is_syncing(tx))
2045		return (0);
2046
2047	/*
2048	 * If there's an fs-only reservation, any blocks that might become
2049	 * owned by the snapshot dataset must be accommodated by space
2050	 * outside of the reservation.
2051	 */
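	/*
	 * For example (hypothetical numbers): with refreservation = 10G
	 * and 3G of unique data, asize = MIN(3G, 10G) = 3G must be
	 * available outside the reservation before we may snapshot.
	 */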
2052	ASSERT(ds->ds_reserved == 0 || DS_UNIQUE_IS_ACCURATE(ds));
2053	asize = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2054	if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
2055		return (ENOSPC);
2056
2057	/*
2058	 * Propagate any reserved space for this snapshot to other
2059	 * snapshot checks in this sync group.
2060	 */
2061	if (asize > 0)
2062		dsl_dir_willuse_space(ds->ds_dir, asize, tx);
2063
2064	return (0);
2065}
2066
2067int
2068dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
2069{
2070	dsl_dataset_t *ds = arg1;
2071	const char *snapname = arg2;
2072	int err;
2073	uint64_t value;
2074
2075	/*
2076	 * We don't allow multiple snapshots of the same txg.  If there
2077	 * is already one, try again.
2078	 */
2079	if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
2080		return (EAGAIN);
2081
2082	/*
2083	 * Check for a conflicting snapshot name.
2084	 */
2085	err = dsl_dataset_snap_lookup(ds, snapname, &value);
2086	if (err == 0)
2087		return (EEXIST);
2088	if (err != ENOENT)
2089		return (err);
2090
2091	/*
2092	 * Check that the snapshot's full name is not too long: the dataset
2093	 * name's length + 1 for the @-sign + the snapshot name's length.
2094	 */
2095	if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
2096		return (ENAMETOOLONG);
2097
2098	err = dsl_dataset_snapshot_reserve_space(ds, tx);
2099	if (err)
2100		return (err);
2101
2102	ds->ds_trysnap_txg = tx->tx_txg;
2103	return (0);
2104}
2105
2106void
2107dsl_dataset_snapshot_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2108{
2109	dsl_dataset_t *ds = arg1;
2110	const char *snapname = arg2;
2111	dsl_pool_t *dp = ds->ds_dir->dd_pool;
2112	dmu_buf_t *dbuf;
2113	dsl_dataset_phys_t *dsphys;
2114	uint64_t dsobj, crtxg;
2115	objset_t *mos = dp->dp_meta_objset;
2116	int err;
2117
2118	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
2119
2120	/*
2121	 * The origin's ds_creation_txg has to be < TXG_INITIAL
2122	 */
2123	if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
2124		crtxg = 1;
2125	else
2126		crtxg = tx->tx_txg;
2127
2128	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
2129	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
2130	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
2131	dmu_buf_will_dirty(dbuf, tx);
2132	dsphys = dbuf->db_data;
2133	bzero(dsphys, sizeof (dsl_dataset_phys_t));
2134	dsphys->ds_dir_obj = ds->ds_dir->dd_object;
2135	dsphys->ds_fsid_guid = unique_create();
2136	do {
2137		(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
2138		    sizeof (dsphys->ds_guid));
2139	} while (dsphys->ds_guid == 0);
2140	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
2141	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
2142	dsphys->ds_next_snap_obj = ds->ds_object;
2143	dsphys->ds_num_children = 1;
2144	dsphys->ds_creation_time = gethrestime_sec();
2145	dsphys->ds_creation_txg = crtxg;
2146	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
2147	dsphys->ds_referenced_bytes = ds->ds_phys->ds_referenced_bytes;
2148	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
2149	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
2150	dsphys->ds_flags = ds->ds_phys->ds_flags;
2151	dsphys->ds_bp = ds->ds_phys->ds_bp;
2152	dmu_buf_rele(dbuf, FTAG);
2153
2154	ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
2155	if (ds->ds_prev) {
2156		uint64_t next_clones_obj =
2157		    ds->ds_prev->ds_phys->ds_next_clones_obj;
2158		ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
2159		    ds->ds_object ||
2160		    ds->ds_prev->ds_phys->ds_num_children > 1);
2161		if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
2162			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2163			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
2164			    ds->ds_prev->ds_phys->ds_creation_txg);
2165			ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
2166		} else if (next_clones_obj != 0) {
2167			remove_from_next_clones(ds->ds_prev,
2168			    dsphys->ds_next_snap_obj, tx);
2169			VERIFY3U(0, ==, zap_add_int(mos,
2170			    next_clones_obj, dsobj, tx));
2171		}
2172	}
2173
2174	/*
2175	 * If we have a reference-reservation on this dataset, we will
2176	 * need to increase the amount of refreservation being charged
2177	 * since our unique space is going to zero.
2178	 */
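	/*
	 * E.g. (hypothetical numbers): with refreservation = 10G and 3G
	 * of unique data, the snapshot pins those 3G, so an additional
	 * MIN(3G, 10G) = 3G is charged to DD_USED_REFRSRV below.
	 */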
2179	if (ds->ds_reserved) {
2180		int64_t delta;
2181		ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
2182		delta = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2183		dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
2184		    delta, 0, 0, tx);
2185	}
2186
2187	dmu_buf_will_dirty(ds->ds_dbuf, tx);
2188	zfs_dbgmsg("taking snapshot %s@%s/%llu; newkey=%llu",
2189	    ds->ds_dir->dd_myname, snapname, dsobj,
2190	    ds->ds_phys->ds_prev_snap_txg);
2191	ds->ds_phys->ds_deadlist_obj = dsl_deadlist_clone(&ds->ds_deadlist,
2192	    UINT64_MAX, ds->ds_phys->ds_prev_snap_obj, tx);
2193	dsl_deadlist_close(&ds->ds_deadlist);
2194	dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
2195	dsl_deadlist_add_key(&ds->ds_deadlist,
2196	    ds->ds_phys->ds_prev_snap_txg, tx);
2197
2198	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
2199	ds->ds_phys->ds_prev_snap_obj = dsobj;
2200	ds->ds_phys->ds_prev_snap_txg = crtxg;
2201	ds->ds_phys->ds_unique_bytes = 0;
2202	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
2203		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
2204
2205	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
2206	    snapname, 8, 1, &dsobj, tx);
2207	ASSERT(err == 0);
2208
2209	if (ds->ds_prev)
2210		dsl_dataset_drop_ref(ds->ds_prev, ds);
2211	VERIFY(0 == dsl_dataset_get_ref(dp,
2212	    ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
2213
2214	dsl_scan_ds_snapshotted(ds, tx);
2215
2216	dsl_dir_snap_cmtime_update(ds->ds_dir);
2217
2218	spa_history_log_internal(LOG_DS_SNAPSHOT, dp->dp_spa, tx,
2219	    "dataset = %llu", dsobj);
2220}
2221
2222void
2223dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
2224{
2225	ASSERT(dmu_tx_is_syncing(tx));
2226	ASSERT(ds->ds_objset != NULL);
2227	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
2228
2229	/*
2230	 * in case we had to change ds_fsid_guid when we opened it,
2231	 * sync it out now.
2232	 */
2233	dmu_buf_will_dirty(ds->ds_dbuf, tx);
2234	ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
2235
2236	dsl_dir_dirty(ds->ds_dir, tx);
2237	dmu_objset_sync(ds->ds_objset, zio, tx);
2238}
2239
2240static void
2241get_clones_stat(dsl_dataset_t *ds, nvlist_t *nv)
2242{
2243	uint64_t count = 0;
2244	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
2245	zap_cursor_t zc;
2246	zap_attribute_t za;
2247	nvlist_t *propval;
2248	nvlist_t *val;
2249
2250	rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2251	VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2252	VERIFY(nvlist_alloc(&val, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2253
2254	/*
2255	 * There may be missing entries in ds_next_clones_obj
2256	 * due to a bug in a previous version of the code.
2257	 * Only trust it if it has the right number of entries.
2258	 */
2259	if (ds->ds_phys->ds_next_clones_obj != 0) {
2260		ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
2261		    &count));
2262	}
2263	if (count != ds->ds_phys->ds_num_children - 1) {
2264		goto fail;
2265	}
2266	for (zap_cursor_init(&zc, mos, ds->ds_phys->ds_next_clones_obj);
2267	    zap_cursor_retrieve(&zc, &za) == 0;
2268	    zap_cursor_advance(&zc)) {
2269		dsl_dataset_t *clone;
2270		char buf[ZFS_MAXNAMELEN];
2271		/*
2272		 * Even though we hold the dp_config_rwlock, the dataset
2273		 * may fail to open, returning ENOENT.  If there is a
2274		 * thread concurrently attempting to destroy this
2275		 * dataset, it will have the ds_rwlock held for
2276		 * RW_WRITER.  Our call to dsl_dataset_hold_obj() ->
2277		 * dsl_dataset_hold_ref() will fail its
2278		 * rw_tryenter(&ds->ds_rwlock, RW_READER), drop the
2279		 * dp_config_rwlock, and wait for the destroy to progress
2280		 * and signal ds_exclusive_cv.  If the destroy was
2281		 * successful, we will see that
2282		 * DSL_DATASET_IS_DESTROYED(), and return ENOENT.
2283		 */
2284		if (dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
2285		    za.za_first_integer, FTAG, &clone) != 0)
2286			continue;
2287		dsl_dir_name(clone->ds_dir, buf);
2288		VERIFY(nvlist_add_boolean(val, buf) == 0);
2289		dsl_dataset_rele(clone, FTAG);
2290	}
2291	zap_cursor_fini(&zc);
2292	VERIFY(nvlist_add_nvlist(propval, ZPROP_VALUE, val) == 0);
2293	VERIFY(nvlist_add_nvlist(nv, zfs_prop_to_name(ZFS_PROP_CLONES),
2294	    propval) == 0);
2295fail:
2296	nvlist_free(val);
2297	nvlist_free(propval);
2298	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2299}
2300
2301void
2302dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
2303{
2304	uint64_t refd, avail, uobjs, aobjs, ratio;
2305
2306	dsl_dir_stats(ds->ds_dir, nv);
2307
2308	dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
2309	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
2310	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
2311
2312	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
2313	    ds->ds_phys->ds_creation_time);
2314	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
2315	    ds->ds_phys->ds_creation_txg);
2316	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
2317	    ds->ds_quota);
2318	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
2319	    ds->ds_reserved);
2320	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
2321	    ds->ds_phys->ds_guid);
2322	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
2323	    ds->ds_phys->ds_unique_bytes);
2324	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
2325	    ds->ds_object);
2326	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
2327	    ds->ds_userrefs);
2328	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
2329	    DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
2330
2331	if (ds->ds_phys->ds_prev_snap_obj != 0) {
2332		uint64_t written, comp, uncomp;
2333		dsl_pool_t *dp = ds->ds_dir->dd_pool;
2334		dsl_dataset_t *prev;
2335
2336		rw_enter(&dp->dp_config_rwlock, RW_READER);
2337		int err = dsl_dataset_hold_obj(dp,
2338		    ds->ds_phys->ds_prev_snap_obj, FTAG, &prev);
2339		rw_exit(&dp->dp_config_rwlock);
2340		if (err == 0) {
2341			err = dsl_dataset_space_written(prev, ds, &written,
2342			    &comp, &uncomp);
2343			dsl_dataset_rele(prev, FTAG);
2344			if (err == 0) {
2345				dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_WRITTEN,
2346				    written);
2347			}
2348		}
2349	}
2350
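	/*
	 * Worked example of the encoding below (hypothetical sizes):
	 * 300M uncompressed stored as 100M compressed yields ratio =
	 * 300M * 100 / 100M = 300, which userland renders as "3.00x".
	 */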
2351	ratio = ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
2352	    (ds->ds_phys->ds_uncompressed_bytes * 100 /
2353	    ds->ds_phys->ds_compressed_bytes);
2354	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRATIO, ratio);
2355
2356	if (ds->ds_phys->ds_next_snap_obj) {
2357		/*
2358		 * This is a snapshot; override the dd's space used with
2359		 * our unique space and compression ratio.
2360		 */
2361		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
2362		    ds->ds_phys->ds_unique_bytes);
2363		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO, ratio);
2364
2365		get_clones_stat(ds, nv);
2366	}
2367}
2368
2369void
2370dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
2371{
2372	stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
2373	stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
2374	stat->dds_guid = ds->ds_phys->ds_guid;
2375	if (ds->ds_phys->ds_next_snap_obj) {
2376		stat->dds_is_snapshot = B_TRUE;
2377		stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
2378	} else {
2379		stat->dds_is_snapshot = B_FALSE;
2380		stat->dds_num_clones = 0;
2381	}
2382
2383	/* clone origin is really a dsl_dir thing... */
2384	rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2385	if (dsl_dir_is_clone(ds->ds_dir)) {
2386		dsl_dataset_t *ods;
2387
2388		VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
2389		    ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
2390		dsl_dataset_name(ods, stat->dds_origin);
2391		dsl_dataset_drop_ref(ods, FTAG);
2392	} else {
2393		stat->dds_origin[0] = '\0';
2394	}
2395	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2396}
2397
2398uint64_t
2399dsl_dataset_fsid_guid(dsl_dataset_t *ds)
2400{
2401	return (ds->ds_fsid_guid);
2402}
2403
2404void
2405dsl_dataset_space(dsl_dataset_t *ds,
2406    uint64_t *refdbytesp, uint64_t *availbytesp,
2407    uint64_t *usedobjsp, uint64_t *availobjsp)
2408{
2409	*refdbytesp = ds->ds_phys->ds_referenced_bytes;
2410	*availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2411	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2412		*availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2413	if (ds->ds_quota != 0) {
2414		/*
2415		 * Adjust available bytes according to refquota
2416		 */
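		/*
		 * E.g. (hypothetical): refquota = 10G with 7G already
		 * referenced caps avail at 3G, even if the pool has
		 * more free space.
		 */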
2417		if (*refdbytesp < ds->ds_quota)
2418			*availbytesp = MIN(*availbytesp,
2419			    ds->ds_quota - *refdbytesp);
2420		else
2421			*availbytesp = 0;
2422	}
2423	*usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2424	*availobjsp = DN_MAX_OBJECT - *usedobjsp;
2425}
2426
2427boolean_t
2428dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
2429{
2430	dsl_pool_t *dp = ds->ds_dir->dd_pool;
2431
2432	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
2433	    dsl_pool_sync_context(dp));
2434	if (ds->ds_prev == NULL)
2435		return (B_FALSE);
2436	if (ds->ds_phys->ds_bp.blk_birth >
2437	    ds->ds_prev->ds_phys->ds_creation_txg) {
2438		objset_t *os, *os_prev;
2439		/*
2440		 * It may be that only the ZIL differs, because it was
2441		 * reset in the head.  Don't count that as being
2442		 * modified.
2443		 */
2444		if (dmu_objset_from_ds(ds, &os) != 0)
2445			return (B_TRUE);
2446		if (dmu_objset_from_ds(ds->ds_prev, &os_prev) != 0)
2447			return (B_TRUE);
2448		return (bcmp(&os->os_phys->os_meta_dnode,
2449		    &os_prev->os_phys->os_meta_dnode,
2450		    sizeof (os->os_phys->os_meta_dnode)) != 0);
2451	}
2452	return (B_FALSE);
2453}
2454
2455/* ARGSUSED */
2456static int
2457dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2458{
2459	dsl_dataset_t *ds = arg1;
2460	char *newsnapname = arg2;
2461	dsl_dir_t *dd = ds->ds_dir;
2462	dsl_dataset_t *hds;
2463	uint64_t val;
2464	int err;
2465
2466	err = dsl_dataset_hold_obj(dd->dd_pool,
2467	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2468	if (err)
2469		return (err);
2470
2471	/* new name better not be in use */
2472	err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2473	dsl_dataset_rele(hds, FTAG);
2474
2475	if (err == 0)
2476		err = EEXIST;
2477	else if (err == ENOENT)
2478		err = 0;
2479
2480	/* dataset name + 1 for the "@" + the new snapshot name must fit */
2481	if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2482		err = ENAMETOOLONG;
2483
2484	return (err);
2485}
2486
2487static void
2488dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2489{
2490	char oldname[MAXPATHLEN], newname[MAXPATHLEN];
2491	dsl_dataset_t *ds = arg1;
2492	const char *newsnapname = arg2;
2493	dsl_dir_t *dd = ds->ds_dir;
2494	objset_t *mos = dd->dd_pool->dp_meta_objset;
2495	dsl_dataset_t *hds;
2496	int err;
2497
2498	ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
2499
2500	VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2501	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
2502
2503	VERIFY(0 == dsl_dataset_get_snapname(ds));
2504	err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
2505	ASSERT3U(err, ==, 0);
2506	dsl_dataset_name(ds, oldname);
2507	mutex_enter(&ds->ds_lock);
2508	(void) strcpy(ds->ds_snapname, newsnapname);
2509	mutex_exit(&ds->ds_lock);
2510	err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
2511	    ds->ds_snapname, 8, 1, &ds->ds_object, tx);
2512	ASSERT3U(err, ==, 0);
2513	dsl_dataset_name(ds, newname);
2514#ifdef _KERNEL
2515	zvol_rename_minors(oldname, newname);
2516#endif
2517
2518	spa_history_log_internal(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
2519	    "dataset = %llu", ds->ds_object);
2520	dsl_dataset_rele(hds, FTAG);
2521}
2522
2523struct renamesnaparg {
2524	dsl_sync_task_group_t *dstg;
2525	char failed[MAXPATHLEN];
2526	char *oldsnap;
2527	char *newsnap;
2528};
2529
2530static int
2531dsl_snapshot_rename_one(const char *name, void *arg)
2532{
2533	struct renamesnaparg *ra = arg;
2534	dsl_dataset_t *ds = NULL;
2535	char *snapname;
2536	int err;
2537
2538	snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
2539	(void) strlcpy(ra->failed, snapname, sizeof (ra->failed));
2540
2541	/*
2542	 * For recursive snapshot renames the parent won't be changing
2543	 * so we just pass name for both the to/from arguments.
2544	 */
2545	err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
2546	if (err != 0) {
2547		strfree(snapname);
2548		return (err == ENOENT ? 0 : err);
2549	}
2550
2551#ifdef _KERNEL
2552	/*
2553	 * Each snapshot undergoing rename must first be unmounted.
2554	 */
2555	(void) zfs_unmount_snap(snapname, NULL);
2556#endif
2557	err = dsl_dataset_hold(snapname, ra->dstg, &ds);
2558	strfree(snapname);
2559	if (err != 0)
2560		return (err == ENOENT ? 0 : err);
2561
2562	dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
2563	    dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
2564
2565	return (0);
2566}
2567
2568static int
2569dsl_recursive_rename(char *oldname, const char *newname)
2570{
2571	int err;
2572	struct renamesnaparg *ra;
2573	dsl_sync_task_t *dst;
2574	spa_t *spa;
2575	char *cp, *fsname = spa_strdup(oldname);
2576	int len = strlen(oldname) + 1;
2577
2578	/* truncate the snapshot name to get the fsname */
2579	cp = strchr(fsname, '@');
2580	*cp = '\0';
2581
2582	err = spa_open(fsname, &spa, FTAG);
2583	if (err) {
2584		kmem_free(fsname, len);
2585		return (err);
2586	}
2587	ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
2588	ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
2589
2590	ra->oldsnap = strchr(oldname, '@') + 1;
2591	ra->newsnap = strchr(newname, '@') + 1;
2592	*ra->failed = '\0';
2593
2594	err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
2595	    DS_FIND_CHILDREN);
2596	kmem_free(fsname, len);
2597
2598	if (err == 0) {
2599		err = dsl_sync_task_group_wait(ra->dstg);
2600	}
2601
2602	for (dst = list_head(&ra->dstg->dstg_tasks); dst;
2603	    dst = list_next(&ra->dstg->dstg_tasks, dst)) {
2604		dsl_dataset_t *ds = dst->dst_arg1;
2605		if (dst->dst_err) {
2606			dsl_dir_name(ds->ds_dir, ra->failed);
2607			(void) strlcat(ra->failed, "@", sizeof (ra->failed));
2608			(void) strlcat(ra->failed, ra->newsnap,
2609			    sizeof (ra->failed));
2610		}
2611		dsl_dataset_rele(ds, ra->dstg);
2612	}
2613
2614	if (err)
2615		(void) strlcpy(oldname, ra->failed, sizeof (ra->failed));
2616
2617	dsl_sync_task_group_destroy(ra->dstg);
2618	kmem_free(ra, sizeof (struct renamesnaparg));
2619	spa_close(spa, FTAG);
2620	return (err);
2621}
2622
2623static int
2624dsl_valid_rename(const char *oldname, void *arg)
2625{
2626	int delta = *(int *)arg;
2627
2628	if (strlen(oldname) + delta >= MAXNAMELEN)
2629		return (ENAMETOOLONG);
2630
2631	return (0);
2632}
2633
2634#pragma weak dmu_objset_rename = dsl_dataset_rename
2635int
2636dsl_dataset_rename(char *oldname, const char *newname, int flags)
2637{
2638	dsl_dir_t *dd;
2639	dsl_dataset_t *ds;
2640	const char *tail;
2641	int err;
2642
2643	err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2644	if (err)
2645		return (err);
2646
2647	if (tail == NULL) {
2648		int delta = strlen(newname) - strlen(oldname);
2649
2650		/* if we're growing, validate child name lengths */
2651		if (delta > 0)
2652			err = dmu_objset_find(oldname, dsl_valid_rename,
2653			    &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2654
2655		if (err == 0)
2656			err = dsl_dir_rename(dd, newname, flags);
2657		dsl_dir_close(dd, FTAG);
2658		return (err);
2659	}
2660
2661	if (tail[0] != '@') {
2662		/* the name ended in a nonexistent component */
2663		dsl_dir_close(dd, FTAG);
2664		return (ENOENT);
2665	}
2666
2667	dsl_dir_close(dd, FTAG);
2668
2669	/* new name must be snapshot in same filesystem */
2670	tail = strchr(newname, '@');
2671	if (tail == NULL)
2672		return (EINVAL);
2673	tail++;
2674	if (strncmp(oldname, newname, tail - newname) != 0)
2675		return (EXDEV);
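	/*
	 * E.g., renaming "tank/fs@a" to "tank/fs@b" passes the check
	 * above; renaming it to "tank/other@b" fails the prefix
	 * comparison and returns EXDEV.
	 */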
2676
2677	if (flags & ZFS_RENAME_RECURSIVE) {
2678		err = dsl_recursive_rename(oldname, newname);
2679	} else {
2680		err = dsl_dataset_hold(oldname, FTAG, &ds);
2681		if (err)
2682			return (err);
2683
2684		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2685		    dsl_dataset_snapshot_rename_check,
2686		    dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2687
2688		dsl_dataset_rele(ds, FTAG);
2689	}
2690
2691	return (err);
2692}
2693
2694struct promotenode {
2695	list_node_t link;
2696	dsl_dataset_t *ds;
2697};
2698
2699struct promotearg {
2700	list_t shared_snaps, origin_snaps, clone_snaps;
2701	dsl_dataset_t *origin_origin;
2702	uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
2703	char *err_ds;
2704};
2705
2706static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
2707static boolean_t snaplist_unstable(list_t *l);
2708
2709static int
2710dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
2711{
2712	dsl_dataset_t *hds = arg1;
2713	struct promotearg *pa = arg2;
2714	struct promotenode *snap = list_head(&pa->shared_snaps);
2715	dsl_dataset_t *origin_ds = snap->ds;
2716	int err;
2717	uint64_t unused;
2718
2719	/* Check that it is a real clone */
2720	if (!dsl_dir_is_clone(hds->ds_dir))
2721		return (EINVAL);
2722
2723	/* Since this is so expensive, don't do the preliminary check */
2724	if (!dmu_tx_is_syncing(tx))
2725		return (0);
2726
2727	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
2728		return (EXDEV);
2729
2730	/* compute origin's new unique space */
2731	snap = list_tail(&pa->clone_snaps);
2732	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2733	dsl_deadlist_space_range(&snap->ds->ds_deadlist,
2734	    origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
2735	    &pa->unique, &unused, &unused);
2736
2737	/*
2738	 * Walk the snapshots that we are moving
2739	 *
2740	 * Compute space to transfer.  Consider the incremental changes
2741	 * to used for each snapshot:
2742	 * (my used) = (prev's used) + (blocks born) - (blocks killed)
2743	 * So each snapshot gave birth to:
2744	 * (blocks born) = (my used) - (prev's used) + (blocks killed)
2745	 * So a sequence would look like:
2746	 * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
2747	 * Which simplifies to:
2748	 * uN + kN + kN-1 + ... + k1 + k0
2749	 * Note however, if we stop before we reach the ORIGIN we get:
2750	 * uN + kN + kN-1 + ... + kM - uM-1
2751	 */
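	/*
	 * Instantiating the formula above (hypothetical sizes): moving
	 * two snapshots with u1 = 10G and deadlist totals k0 = 1G,
	 * k1 = 2G transfers u1 + k1 + k0 = 10G + 2G + 1G = 13G.
	 */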
2752	pa->used = origin_ds->ds_phys->ds_referenced_bytes;
2753	pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
2754	pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
2755	for (snap = list_head(&pa->shared_snaps); snap;
2756	    snap = list_next(&pa->shared_snaps, snap)) {
2757		uint64_t val, dlused, dlcomp, dluncomp;
2758		dsl_dataset_t *ds = snap->ds;
2759
2760		/* Check that the snapshot name does not conflict */
2761		VERIFY(0 == dsl_dataset_get_snapname(ds));
2762		err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
2763		if (err == 0) {
2764			err = EEXIST;
2765			goto out;
2766		}
2767		if (err != ENOENT)
2768			goto out;
2769
2770		/* The very first snapshot does not have a deadlist */
2771		if (ds->ds_phys->ds_prev_snap_obj == 0)
2772			continue;
2773
2774		dsl_deadlist_space(&ds->ds_deadlist,
2775		    &dlused, &dlcomp, &dluncomp);
2776		pa->used += dlused;
2777		pa->comp += dlcomp;
2778		pa->uncomp += dluncomp;
2779	}
2780
2781	/*
2782	 * If we are a clone of a clone then we never reached ORIGIN,
2783	 * so we need to subtract out the clone origin's used space.
2784	 */
2785	if (pa->origin_origin) {
2786		pa->used -= pa->origin_origin->ds_phys->ds_referenced_bytes;
2787		pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
2788		pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
2789	}
2790
2791	/* Check that there is enough space here */
2792	err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
2793	    pa->used);
2794	if (err)
2795		return (err);
2796
2797	/*
2798	 * Compute the amounts of space that will be used by snapshots
2799	 * after the promotion (for both origin and clone).  For each,
2800	 * it is the amount of space that will be on all of their
2801	 * deadlists (that was not born before their new origin).
2802	 */
2803	if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2804		uint64_t space;
2805
2806		/*
2807		 * Note, typically this will not be a clone of a clone,
2808		 * so dd_origin_txg will be < TXG_INITIAL, so
2809		 * these snaplist_space() -> dsl_deadlist_space_range()
2810		 * calls will be fast because they do not have to
2811		 * iterate over all bps.
2812		 */
2813		snap = list_head(&pa->origin_snaps);
2814		err = snaplist_space(&pa->shared_snaps,
2815		    snap->ds->ds_dir->dd_origin_txg, &pa->cloneusedsnap);
2816		if (err)
2817			return (err);
2818
2819		err = snaplist_space(&pa->clone_snaps,
2820		    snap->ds->ds_dir->dd_origin_txg, &space);
2821		if (err)
2822			return (err);
2823		pa->cloneusedsnap += space;
2824	}
2825	if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2826		err = snaplist_space(&pa->origin_snaps,
2827		    origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
2828		if (err)
2829			return (err);
2830	}
2831
2832	return (0);
2833out:
2834	pa->err_ds = snap->ds->ds_snapname;
2835	return (err);
2836}
2837
2838static void
2839dsl_dataset_promote_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2840{
2841	dsl_dataset_t *hds = arg1;
2842	struct promotearg *pa = arg2;
2843	struct promotenode *snap = list_head(&pa->shared_snaps);
2844	dsl_dataset_t *origin_ds = snap->ds;
2845	dsl_dataset_t *origin_head;
2846	dsl_dir_t *dd = hds->ds_dir;
2847	dsl_pool_t *dp = hds->ds_dir->dd_pool;
2848	dsl_dir_t *odd = NULL;
2849	uint64_t oldnext_obj;
2850	int64_t delta;
2851
2852	ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
2853
2854	snap = list_head(&pa->origin_snaps);
2855	origin_head = snap->ds;
2856
2857	/*
2858	 * We need to explicitly open odd, since origin_ds's dd will be
2859	 * changing.
2860	 */
2861	VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
2862	    NULL, FTAG, &odd));
2863
2864	/* change origin's next snap */
2865	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
2866	oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
2867	snap = list_tail(&pa->clone_snaps);
2868	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2869	origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;
2870
2871	/* change the origin's next clone */
2872	if (origin_ds->ds_phys->ds_next_clones_obj) {
2873		remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
2874		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2875		    origin_ds->ds_phys->ds_next_clones_obj,
2876		    oldnext_obj, tx));
2877	}
2878
2879	/* change origin */
2880	dmu_buf_will_dirty(dd->dd_dbuf, tx);
2881	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
2882	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
2883	dd->dd_origin_txg = origin_head->ds_dir->dd_origin_txg;
2884	dmu_buf_will_dirty(odd->dd_dbuf, tx);
2885	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
2886	origin_head->ds_dir->dd_origin_txg =
2887	    origin_ds->ds_phys->ds_creation_txg;
2888
2889	/* change dd_clone entries */
2890	if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
2891		VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2892		    odd->dd_phys->dd_clones, hds->ds_object, tx));
2893		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2894		    pa->origin_origin->ds_dir->dd_phys->dd_clones,
2895		    hds->ds_object, tx));
2896
2897		VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2898		    pa->origin_origin->ds_dir->dd_phys->dd_clones,
2899		    origin_head->ds_object, tx));
2900		if (dd->dd_phys->dd_clones == 0) {
2901			dd->dd_phys->dd_clones = zap_create(dp->dp_meta_objset,
2902			    DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
2903		}
2904		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2905		    dd->dd_phys->dd_clones, origin_head->ds_object, tx));
2906
2907	}
2908
2909	/* move snapshots to this dir */
2910	for (snap = list_head(&pa->shared_snaps); snap;
2911	    snap = list_next(&pa->shared_snaps, snap)) {
2912		dsl_dataset_t *ds = snap->ds;
2913
2914		/* unregister props as dsl_dir is changing */
2915		if (ds->ds_objset) {
2916			dmu_objset_evict(ds->ds_objset);
2917			ds->ds_objset = NULL;
2918		}
2919		/* move snap name entry */
2920		VERIFY(0 == dsl_dataset_get_snapname(ds));
2921		VERIFY(0 == dsl_dataset_snap_remove(origin_head,
2922		    ds->ds_snapname, tx));
2923		VERIFY(0 == zap_add(dp->dp_meta_objset,
2924		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
2925		    8, 1, &ds->ds_object, tx));
2926
2927		/* change containing dsl_dir */
2928		dmu_buf_will_dirty(ds->ds_dbuf, tx);
2929		ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
2930		ds->ds_phys->ds_dir_obj = dd->dd_object;
2931		ASSERT3P(ds->ds_dir, ==, odd);
2932		dsl_dir_close(ds->ds_dir, ds);
2933		VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
2934		    NULL, ds, &ds->ds_dir));
2935
2936		/* move any clone references */
2937		if (ds->ds_phys->ds_next_clones_obj &&
2938		    spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
2939			zap_cursor_t zc;
2940			zap_attribute_t za;
2941
2942			for (zap_cursor_init(&zc, dp->dp_meta_objset,
2943			    ds->ds_phys->ds_next_clones_obj);
2944			    zap_cursor_retrieve(&zc, &za) == 0;
2945			    zap_cursor_advance(&zc)) {
2946				dsl_dataset_t *cnds;
2947				uint64_t o;
2948
2949				if (za.za_first_integer == oldnext_obj) {
2950					/*
2951					 * We've already moved the
2952					 * origin's reference.
2953					 */
2954					continue;
2955				}
2956
2957				VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
2958				    za.za_first_integer, FTAG, &cnds));
2959				o = cnds->ds_dir->dd_phys->dd_head_dataset_obj;
2960
2961				VERIFY3U(zap_remove_int(dp->dp_meta_objset,
2962				    odd->dd_phys->dd_clones, o, tx), ==, 0);
2963				VERIFY3U(zap_add_int(dp->dp_meta_objset,
2964				    dd->dd_phys->dd_clones, o, tx), ==, 0);
2965				dsl_dataset_rele(cnds, FTAG);
2966			}
2967			zap_cursor_fini(&zc);
2968		}
2969
2970		ASSERT3U(dsl_prop_numcb(ds), ==, 0);
2971	}
2972
2973	/*
2974	 * Change space accounting.
2975	 * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
2976	 * both be valid, or both be 0 (resulting in delta == 0).  This
2977	 * is true for each of {clone,origin} independently.
2978	 */
2979
2980	delta = pa->cloneusedsnap -
2981	    dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2982	ASSERT3S(delta, >=, 0);
2983	ASSERT3U(pa->used, >=, delta);
2984	dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
2985	dsl_dir_diduse_space(dd, DD_USED_HEAD,
2986	    pa->used - delta, pa->comp, pa->uncomp, tx);
2987
2988	delta = pa->originusedsnap -
2989	    odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2990	ASSERT3S(delta, <=, 0);
2991	ASSERT3U(pa->used, >=, -delta);
2992	dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
2993	dsl_dir_diduse_space(odd, DD_USED_HEAD,
2994	    -pa->used - delta, -pa->comp, -pa->uncomp, tx);
2995
2996	origin_ds->ds_phys->ds_unique_bytes = pa->unique;
2997
2998	/* log history record */
2999	spa_history_log_internal(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
3000	    "dataset = %llu", hds->ds_object);
3001
3002	dsl_dir_close(odd, FTAG);
3003}
3004
3005static char *snaplist_tag = "snaplist";
3006/*
3007 * Make a list of dsl_dataset_t's for the snapshots between first_obj
3008 * (exclusive) and last_obj (inclusive).  The list will be in reverse
3009 * order (last_obj will be the list_head()).  If first_obj == 0, do all
3010 * snapshots back to this dataset's origin.
3011 */
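/*
 * E.g., for a snapshot chain s1 -> s2 -> s3 (s3 newest), calling
 * snaplist_make(dp, own, s1, s3, l) builds the two-element list
 * (s3, s2): s3 is at list_head() and s1 is excluded.
 */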
3012static int
3013snaplist_make(dsl_pool_t *dp, boolean_t own,
3014    uint64_t first_obj, uint64_t last_obj, list_t *l)
3015{
3016	uint64_t obj = last_obj;
3017
3018	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
3019
3020	list_create(l, sizeof (struct promotenode),
3021	    offsetof(struct promotenode, link));
3022
3023	while (obj != first_obj) {
3024		dsl_dataset_t *ds;
3025		struct promotenode *snap;
3026		int err;
3027
3028		if (own) {
3029			err = dsl_dataset_own_obj(dp, obj,
3030			    0, snaplist_tag, &ds);
3031			if (err == 0)
3032				dsl_dataset_make_exclusive(ds, snaplist_tag);
3033		} else {
3034			err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
3035		}
3036		if (err == ENOENT) {
3037			/* lost race with snapshot destroy */
3038			struct promotenode *last = list_tail(l);
3039			ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
3040			obj = last->ds->ds_phys->ds_prev_snap_obj;
3041			continue;
3042		} else if (err) {
3043			return (err);
3044		}
3045
3046		if (first_obj == 0)
3047			first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
3048
3049		snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
3050		snap->ds = ds;
3051		list_insert_tail(l, snap);
3052		obj = ds->ds_phys->ds_prev_snap_obj;
3053	}
3054
3055	return (0);
3056}
3057
3058static int
3059snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
3060{
3061	struct promotenode *snap;
3062
3063	*spacep = 0;
3064	for (snap = list_head(l); snap; snap = list_next(l, snap)) {
3065		uint64_t used, comp, uncomp;
3066		dsl_deadlist_space_range(&snap->ds->ds_deadlist,
3067		    mintxg, UINT64_MAX, &used, &comp, &uncomp);
3068		*spacep += used;
3069	}
3070	return (0);
3071}
3072
3073static void
3074snaplist_destroy(list_t *l, boolean_t own)
3075{
3076	struct promotenode *snap;
3077
3078	if (!l || !list_link_active(&l->list_head))
3079		return;
3080
3081	while ((snap = list_tail(l)) != NULL) {
3082		list_remove(l, snap);
3083		if (own)
3084			dsl_dataset_disown(snap->ds, snaplist_tag);
3085		else
3086			dsl_dataset_rele(snap->ds, snaplist_tag);
3087		kmem_free(snap, sizeof (struct promotenode));
3088	}
3089	list_destroy(l);
3090}
3091
3092/*
3093 * Promote a clone.  Nomenclature note:
3094 * "clone" or "cds": the original clone which is being promoted
3095 * "origin" or "ods": the snapshot which is originally the clone's origin
3096 * "origin head" or "ohds": the dataset which is the head
3097 * (filesystem/volume) for the origin
3098 * "origin origin": the origin of the origin's filesystem (typically
3099 * NULL, indicating that the clone is not a clone of a clone).
3100 */
3101int
3102dsl_dataset_promote(const char *name, char *conflsnap)
3103{
3104	dsl_dataset_t *ds;
3105	dsl_dir_t *dd;
3106	dsl_pool_t *dp;
3107	dmu_object_info_t doi;
3108	struct promotearg pa = { 0 };
3109	struct promotenode *snap;
3110	int err;
3111
3112	err = dsl_dataset_hold(name, FTAG, &ds);
3113	if (err)
3114		return (err);
3115	dd = ds->ds_dir;
3116	dp = dd->dd_pool;
3117
3118	err = dmu_object_info(dp->dp_meta_objset,
3119	    ds->ds_phys->ds_snapnames_zapobj, &doi);
3120	if (err) {
3121		dsl_dataset_rele(ds, FTAG);
3122		return (err);
3123	}
3124
3125	if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
3126		dsl_dataset_rele(ds, FTAG);
3127		return (EINVAL);
3128	}
3129
3130	/*
3131	 * We are going to inherit all the snapshots taken before our
3132	 * origin (i.e., our new origin will be our parent's origin).
3133	 * Take ownership of them so that we can rename them into our
3134	 * namespace.
3135	 */
3136	rw_enter(&dp->dp_config_rwlock, RW_READER);
3137
3138	err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
3139	    &pa.shared_snaps);
3140	if (err != 0)
3141		goto out;
3142
3143	err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
3144	if (err != 0)
3145		goto out;
3146
3147	snap = list_head(&pa.shared_snaps);
3148	ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
3149	err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
3150	    snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
3151	if (err != 0)
3152		goto out;
3153
3154	if (snap->ds->ds_dir->dd_phys->dd_origin_obj != 0) {
3155		err = dsl_dataset_hold_obj(dp,
3156		    snap->ds->ds_dir->dd_phys->dd_origin_obj,
3157		    FTAG, &pa.origin_origin);
3158		if (err != 0)
3159			goto out;
3160	}
3161
3162out:
3163	rw_exit(&dp->dp_config_rwlock);
3164
3165	/*
3166	 * Add in 128x the snapnames zapobj size, since we will be moving
3167	 * a bunch of snapnames to the promoted ds, and dirtying their
3168	 * bonus buffers.
3169	 */
3170	if (err == 0) {
3171		err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
3172		    dsl_dataset_promote_sync, ds, &pa,
3173		    2 + 2 * doi.doi_physical_blocks_512);
3174		if (err && pa.err_ds && conflsnap)
3175			(void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
3176	}
3177
3178	snaplist_destroy(&pa.shared_snaps, B_TRUE);
3179	snaplist_destroy(&pa.clone_snaps, B_FALSE);
3180	snaplist_destroy(&pa.origin_snaps, B_FALSE);
3181	if (pa.origin_origin)
3182		dsl_dataset_rele(pa.origin_origin, FTAG);
3183	dsl_dataset_rele(ds, FTAG);
3184	return (err);
3185}
3186
3187struct cloneswaparg {
3188	dsl_dataset_t *cds; /* clone dataset */
3189	dsl_dataset_t *ohds; /* origin's head dataset */
3190	boolean_t force;
3191	int64_t unused_refres_delta; /* change in unconsumed refreservation */
3192};
3193
3194/* ARGSUSED */
3195static int
3196dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
3197{
3198	struct cloneswaparg *csa = arg1;
3199
3200	/* they should both be heads */
3201	if (dsl_dataset_is_snapshot(csa->cds) ||
3202	    dsl_dataset_is_snapshot(csa->ohds))
3203		return (EINVAL);
3204
3205	/* the branch point should be just before them */
3206	if (csa->cds->ds_prev != csa->ohds->ds_prev)
3207		return (EINVAL);
3208
3209	/* cds should be the clone (unless they are unrelated) */
3210	if (csa->cds->ds_prev != NULL &&
3211	    csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
3212	    csa->ohds->ds_object !=
3213	    csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
3214		return (EINVAL);
3215
3216	/* the clone should be a child of the origin */
3217	if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
3218		return (EINVAL);
3219
3220	/* ohds shouldn't be modified unless 'force' */
3221	if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
3222		return (ETXTBSY);
3223
3224	/* adjust amount of any unconsumed refreservation */
3225	csa->unused_refres_delta =
3226	    (int64_t)MIN(csa->ohds->ds_reserved,
3227	    csa->ohds->ds_phys->ds_unique_bytes) -
3228	    (int64_t)MIN(csa->ohds->ds_reserved,
3229	    csa->cds->ds_phys->ds_unique_bytes);
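	/*
	 * E.g. (hypothetical): with ohds->ds_reserved = 10G, ohds
	 * unique = 2G and cds unique = 6G, the delta is
	 * MIN(10G, 2G) - MIN(10G, 6G) = -4G: the unconsumed portion of
	 * the refreservation shrinks, so the check below needs no
	 * extra space.
	 */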
3230
3231	if (csa->unused_refres_delta > 0 &&
3232	    csa->unused_refres_delta >
3233	    dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
3234		return (ENOSPC);
3235
3236	if (csa->ohds->ds_quota != 0 &&
3237	    csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
3238		return (EDQUOT);
3239
3240	return (0);
3241}
3242
3243/* ARGSUSED */
3244static void
3245dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3246{
3247	struct cloneswaparg *csa = arg1;
3248	dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;
3249
3250	ASSERT(csa->cds->ds_reserved == 0);
3251	ASSERT(csa->ohds->ds_quota == 0 ||
3252	    csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);
3253
3254	dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
3255	dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);
3256
3257	if (csa->cds->ds_objset != NULL) {
3258		dmu_objset_evict(csa->cds->ds_objset);
3259		csa->cds->ds_objset = NULL;
3260	}
3261
3262	if (csa->ohds->ds_objset != NULL) {
3263		dmu_objset_evict(csa->ohds->ds_objset);
3264		csa->ohds->ds_objset = NULL;
3265	}
3266
3267	/*
3268	 * Reset origin's unique bytes, if it exists.
3269	 */
3270	if (csa->cds->ds_prev) {
3271		dsl_dataset_t *origin = csa->cds->ds_prev;
3272		uint64_t comp, uncomp;
3273
3274		dmu_buf_will_dirty(origin->ds_dbuf, tx);
3275		dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3276		    origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
3277		    &origin->ds_phys->ds_unique_bytes, &comp, &uncomp);
3278	}
3279
3280	/* swap blkptrs */
3281	{
3282		blkptr_t tmp;
3283		tmp = csa->ohds->ds_phys->ds_bp;
3284		csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
3285		csa->cds->ds_phys->ds_bp = tmp;
3286	}
3287
3288	/* set dd_*_bytes */
3289	{
3290		int64_t dused, dcomp, duncomp;
3291		uint64_t cdl_used, cdl_comp, cdl_uncomp;
3292		uint64_t odl_used, odl_comp, odl_uncomp;
3293
3294		ASSERT3U(csa->cds->ds_dir->dd_phys->
3295		    dd_used_breakdown[DD_USED_SNAP], ==, 0);
3296
3297		dsl_deadlist_space(&csa->cds->ds_deadlist,
3298		    &cdl_used, &cdl_comp, &cdl_uncomp);
3299		dsl_deadlist_space(&csa->ohds->ds_deadlist,
3300		    &odl_used, &odl_comp, &odl_uncomp);
3301
3302		dused = csa->cds->ds_phys->ds_referenced_bytes + cdl_used -
3303		    (csa->ohds->ds_phys->ds_referenced_bytes + odl_used);
3304		dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
3305		    (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
3306		duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
3307		    cdl_uncomp -
3308		    (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);
3309
3310		dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
3311		    dused, dcomp, duncomp, tx);
3312		dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
3313		    -dused, -dcomp, -duncomp, tx);
3314
3315		/*
3316		 * The difference in the space used by snapshots is the
3317		 * difference in snapshot space due to the head's
3318		 * deadlist (since that's the only thing that's
3319		 * changing that affects the snapused).
3320		 */
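		/*
		 * E.g. (hypothetical): if, past the origin txg, the
		 * clone's deadlist holds 5G and the head's holds 3G,
		 * then 5G - 3G = 2G shifts from DD_USED_HEAD to
		 * DD_USED_SNAP below.
		 */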
3321		dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3322		    csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3323		    &cdl_used, &cdl_comp, &cdl_uncomp);
3324		dsl_deadlist_space_range(&csa->ohds->ds_deadlist,
3325		    csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3326		    &odl_used, &odl_comp, &odl_uncomp);
3327		dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
3328		    DD_USED_HEAD, DD_USED_SNAP, tx);
3329	}
3330
3331	/* swap ds_*_bytes */
3332	SWITCH64(csa->ohds->ds_phys->ds_referenced_bytes,
3333	    csa->cds->ds_phys->ds_referenced_bytes);
3334	SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
3335	    csa->cds->ds_phys->ds_compressed_bytes);
3336	SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
3337	    csa->cds->ds_phys->ds_uncompressed_bytes);
3338	SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
3339	    csa->cds->ds_phys->ds_unique_bytes);
3340
3341	/* apply any parent delta for change in unconsumed refreservation */
3342	dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
3343	    csa->unused_refres_delta, 0, 0, tx);
3344
3345	/*
3346	 * Swap deadlists.
3347	 */
3348	dsl_deadlist_close(&csa->cds->ds_deadlist);
3349	dsl_deadlist_close(&csa->ohds->ds_deadlist);
3350	SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
3351	    csa->cds->ds_phys->ds_deadlist_obj);
3352	dsl_deadlist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
3353	    csa->cds->ds_phys->ds_deadlist_obj);
3354	dsl_deadlist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
3355	    csa->ohds->ds_phys->ds_deadlist_obj);
3356
3357	dsl_scan_ds_clone_swapped(csa->ohds, csa->cds, tx);
3358}
3359
3360/*
3361 * Swap 'clone' with its origin head dataset.  Used at the end of "zfs
3362 * recv" into an existing fs to swizzle the file system to the new
3363 * version, and by "zfs rollback".  Can also be used to swap two
3364 * independent head datasets if neither has any snapshots.
3365 */
3366int
3367dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
3368    boolean_t force)
3369{
3370	struct cloneswaparg csa;
3371	int error;
3372
3373	ASSERT(clone->ds_owner);
3374	ASSERT(origin_head->ds_owner);
3375retry:
3376	/*
3377	 * Need exclusive access for the swap. If we're swapping these
3378	 * datasets back after an error, we already hold the locks.
3379	 */
3380	if (!RW_WRITE_HELD(&clone->ds_rwlock))
3381		rw_enter(&clone->ds_rwlock, RW_WRITER);
3382	if (!RW_WRITE_HELD(&origin_head->ds_rwlock) &&
3383	    !rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
3384		rw_exit(&clone->ds_rwlock);
3385		rw_enter(&origin_head->ds_rwlock, RW_WRITER);
3386		if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
3387			rw_exit(&origin_head->ds_rwlock);
3388			goto retry;
3389		}
3390	}
3391	csa.cds = clone;
3392	csa.ohds = origin_head;
3393	csa.force = force;
3394	error = dsl_sync_task_do(clone->ds_dir->dd_pool,
3395	    dsl_dataset_clone_swap_check,
3396	    dsl_dataset_clone_swap_sync, &csa, NULL, 9);
3397	return (error);
3398}
3399
3400/*
3401 * Given a pool name and a dataset object number in that pool,
3402 * return the name of that dataset.
3403 */
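/*
 * Usage sketch (hypothetical caller; error handling elided):
 *
 *	char name[MAXPATHLEN];
 *	if (dsl_dsobj_to_dsname("tank", 42, name) == 0)
 *		cmn_err(CE_NOTE, "obj 42 is %s", name);
 */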
3404int
3405dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3406{
3407	spa_t *spa;
3408	dsl_pool_t *dp;
3409	dsl_dataset_t *ds;
3410	int error;
3411
3412	if ((error = spa_open(pname, &spa, FTAG)) != 0)
3413		return (error);
3414	dp = spa_get_dsl(spa);
3415	rw_enter(&dp->dp_config_rwlock, RW_READER);
3416	if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3417		dsl_dataset_name(ds, buf);
3418		dsl_dataset_rele(ds, FTAG);
3419	}
3420	rw_exit(&dp->dp_config_rwlock);
3421	spa_close(spa, FTAG);
3422
3423	return (error);
3424}
3425
3426int
3427dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
3428    uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
3429{
3430	int error = 0;
3431
3432	ASSERT3S(asize, >, 0);
3433
3434	/*
3435	 * *ref_rsrv is the portion of asize that will come from any
3436	 * unconsumed refreservation space.
3437	 */
3438	*ref_rsrv = 0;
3439
3440	mutex_enter(&ds->ds_lock);
3441	/*
3442	 * Make a space adjustment for reserved bytes.
3443	 */
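	/*
	 * E.g. (hypothetical): refreservation = 10G with 6G unique
	 * means 4G of the parent's used space is really unconsumed
	 * reservation; it is deducted from *used before the quota
	 * comparison.
	 */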
3444	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
3445		ASSERT3U(*used, >=,
3446		    ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3447		*used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3448		*ref_rsrv =
3449		    asize - MIN(asize, parent_delta(ds, asize + inflight));
3450	}
3451
3452	if (!check_quota || ds->ds_quota == 0) {
3453		mutex_exit(&ds->ds_lock);
3454		return (0);
3455	}
3456	/*
3457	 * If they are requesting more space, and our current estimate
3458	 * is over quota, they get to try again unless the actual
3459	 * on-disk is over quota and there are no pending changes (which
3460	 * may free up space for us).
3461	 */
3462	if (ds->ds_phys->ds_referenced_bytes + inflight >= ds->ds_quota) {
3463		if (inflight > 0 ||
3464		    ds->ds_phys->ds_referenced_bytes < ds->ds_quota)
3465			error = ERESTART;
3466		else
3467			error = EDQUOT;
3468	}
3469	mutex_exit(&ds->ds_lock);
3470
3471	return (error);
3472}
3473
3474/* ARGSUSED */
3475static int
3476dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3477{
3478	dsl_dataset_t *ds = arg1;
3479	dsl_prop_setarg_t *psa = arg2;
3480	int err;
3481
3482	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3483		return (ENOTSUP);
3484
3485	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3486		return (err);
3487
3488	if (psa->psa_effective_value == 0)
3489		return (0);
3490
3491	if (psa->psa_effective_value < ds->ds_phys->ds_referenced_bytes ||
3492	    psa->psa_effective_value < ds->ds_reserved)
3493		return (ENOSPC);
3494
3495	return (0);
3496}
3497
3498extern void dsl_prop_set_sync(void *, void *, dmu_tx_t *);
3499
3500void
3501dsl_dataset_set_quota_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3502{
3503	dsl_dataset_t *ds = arg1;
3504	dsl_prop_setarg_t *psa = arg2;
3505	uint64_t effective_value = psa->psa_effective_value;
3506
3507	dsl_prop_set_sync(ds, psa, tx);
3508	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3509
3510	if (ds->ds_quota != effective_value) {
3511		dmu_buf_will_dirty(ds->ds_dbuf, tx);
3512		ds->ds_quota = effective_value;
3513
3514		spa_history_log_internal(LOG_DS_REFQUOTA,
3515		    ds->ds_dir->dd_pool->dp_spa, tx, "%lld dataset = %llu ",
3516		    (longlong_t)ds->ds_quota, ds->ds_object);
3517	}
3518}
3519
3520int
3521dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
3522{
3523	dsl_dataset_t *ds;
3524	dsl_prop_setarg_t psa;
3525	int err;
3526
3527	dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
3528
3529	err = dsl_dataset_hold(dsname, FTAG, &ds);
3530	if (err)
3531		return (err);
3532
3533	/*
3534	 * If someone removes a file, then tries to set the quota, we
3535	 * want to make sure the file freeing takes effect.
3536	 */
3537	txg_wait_open(ds->ds_dir->dd_pool, 0);
3538
3539	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3540	    dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3541	    ds, &psa, 0);
3542
3543	dsl_dataset_rele(ds, FTAG);
3544	return (err);
3545}
3546
3547static int
3548dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
3549{
3550	dsl_dataset_t *ds = arg1;
3551	dsl_prop_setarg_t *psa = arg2;
3552	uint64_t effective_value;
3553	uint64_t unique;
3554	int err;
3555
3556	if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
3557	    SPA_VERSION_REFRESERVATION)
3558		return (ENOTSUP);
3559
3560	if (dsl_dataset_is_snapshot(ds))
3561		return (EINVAL);
3562
3563	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3564		return (err);
3565
3566	effective_value = psa->psa_effective_value;
3567
3568	/*
3569	 * If we are doing the preliminary check in open context, the
3570	 * space estimates may be inaccurate.
3571	 */
3572	if (!dmu_tx_is_syncing(tx))
3573		return (0);
3574
3575	mutex_enter(&ds->ds_lock);
3576	if (!DS_UNIQUE_IS_ACCURATE(ds))
3577		dsl_dataset_recalc_head_uniq(ds);
3578	unique = ds->ds_phys->ds_unique_bytes;
3579	mutex_exit(&ds->ds_lock);
3580
3581	if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
3582		uint64_t delta = MAX(unique, effective_value) -
3583		    MAX(unique, ds->ds_reserved);
3584
3585		if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
3586			return (ENOSPC);
3587		if (ds->ds_quota > 0 &&
3588		    effective_value > ds->ds_quota)
3589			return (ENOSPC);
3590	}
3591
3592	return (0);
3593}
3594
3595static void
3596dsl_dataset_set_reservation_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3597{
3598	dsl_dataset_t *ds = arg1;
3599	dsl_prop_setarg_t *psa = arg2;
3600	uint64_t effective_value = psa->psa_effective_value;
3601	uint64_t unique;
3602	int64_t delta;
3603
3604	dsl_prop_set_sync(ds, psa, tx);
3605	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3606
3607	dmu_buf_will_dirty(ds->ds_dbuf, tx);
3608
3609	mutex_enter(&ds->ds_dir->dd_lock);
3610	mutex_enter(&ds->ds_lock);
3611	ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
3612	unique = ds->ds_phys->ds_unique_bytes;
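	/*
	 * Worked example (hypothetical): raising refreservation from
	 * 4G to 10G with 6G unique gives delta = MAX(0, 10G - 6G) -
	 * MAX(0, 4G - 6G) = 4G - 0 = 4G more charged to
	 * DD_USED_REFRSRV below.
	 */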
3613	delta = MAX(0, (int64_t)(effective_value - unique)) -
3614	    MAX(0, (int64_t)(ds->ds_reserved - unique));
3615	ds->ds_reserved = effective_value;
3616	mutex_exit(&ds->ds_lock);
3617
3618	dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
3619	mutex_exit(&ds->ds_dir->dd_lock);
3620
3621	spa_history_log_internal(LOG_DS_REFRESERV,
3622	    ds->ds_dir->dd_pool->dp_spa, tx, "%lld dataset = %llu",
3623	    (longlong_t)effective_value, ds->ds_object);
3624}
3625
3626int
3627dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
3628    uint64_t reservation)
3629{
3630	dsl_dataset_t *ds;
3631	dsl_prop_setarg_t psa;
3632	int err;
3633
3634	dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
3635	    &reservation);
3636
3637	err = dsl_dataset_hold(dsname, FTAG, &ds);
3638	if (err)
3639		return (err);
3640
3641	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3642	    dsl_dataset_set_reservation_check,
3643	    dsl_dataset_set_reservation_sync, ds, &psa, 0);
3644
3645	dsl_dataset_rele(ds, FTAG);
3646	return (err);
3647}
3648
3649typedef struct zfs_hold_cleanup_arg {
3650	dsl_pool_t *dp;
3651	uint64_t dsobj;
3652	char htag[MAXNAMELEN];
3653} zfs_hold_cleanup_arg_t;
3654
3655static void
3656dsl_dataset_user_release_onexit(void *arg)
3657{
3658	zfs_hold_cleanup_arg_t *ca = arg;
3659
3660	(void) dsl_dataset_user_release_tmp(ca->dp, ca->dsobj, ca->htag,
3661	    B_TRUE);
3662	kmem_free(ca, sizeof (zfs_hold_cleanup_arg_t));
3663}
3664
3665void
3666dsl_register_onexit_hold_cleanup(dsl_dataset_t *ds, const char *htag,
3667    minor_t minor)
3668{
3669	zfs_hold_cleanup_arg_t *ca;
3670
3671	ca = kmem_alloc(sizeof (zfs_hold_cleanup_arg_t), KM_SLEEP);
3672	ca->dp = ds->ds_dir->dd_pool;
3673	ca->dsobj = ds->ds_object;
3674	(void) strlcpy(ca->htag, htag, sizeof (ca->htag));
3675	VERIFY3U(0, ==, zfs_onexit_add_cb(minor,
3676	    dsl_dataset_user_release_onexit, ca, NULL));
3677}
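
/*
 * Illustrative sketch of a hypothetical caller that wants a temporary
 * hold released automatically when a cleanup file descriptor is closed:
 *
 *	minor_t minor;
 *	error = zfs_onexit_fd_hold(cleanup_fd, &minor);
 *	... take the hold ...
 *	dsl_register_onexit_hold_cleanup(ds, "my-tag", minor);
 *	zfs_onexit_fd_rele(cleanup_fd);
 *
 * When cleanup_fd is closed, dsl_dataset_user_release_onexit() runs and
 * releases the temporary hold.  dsl_dataset_user_hold() below does this
 * when given a cleanup_fd.
 */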
3678
3679/*
3680 * If you add new checks here, you may need to add
3681 * additional checks to the "temporary" case in
3682 * snapshot_check() in dmu_objset.c.
3683 */
3684static int
3685dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
3686{
3687	dsl_dataset_t *ds = arg1;
3688	struct dsl_ds_holdarg *ha = arg2;
3689	char *htag = ha->htag;
3690	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t tmp;
3691	int error = 0;
3692
3693	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3694		return (ENOTSUP);
3695
3696	if (!dsl_dataset_is_snapshot(ds))
3697		return (EINVAL);
3698
3699	/* tags must be unique */
3700	mutex_enter(&ds->ds_lock);
3701	if (ds->ds_phys->ds_userrefs_obj) {
3702		error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
3703		    8, 1, &tmp);
3704		if (error == 0)
3705			error = EEXIST;
3706		else if (error == ENOENT)
3707			error = 0;
3708	}
3709	mutex_exit(&ds->ds_lock);
3710
3711	if (error == 0 && ha->temphold &&
3712	    strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
3713		error = E2BIG;
3714
3715	return (error);
3716}
3717
3718void
3719dsl_dataset_user_hold_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3720{
3721	dsl_dataset_t *ds = arg1;
3722	struct dsl_ds_holdarg *ha = arg2;
3723	char *htag = ha->htag;
3724	dsl_pool_t *dp = ds->ds_dir->dd_pool;
3725	objset_t *mos = dp->dp_meta_objset;
3726	uint64_t now = gethrestime_sec();
3727	uint64_t zapobj;
3728
3729	mutex_enter(&ds->ds_lock);
3730	if (ds->ds_phys->ds_userrefs_obj == 0) {
3731		/*
3732		 * This is the first user hold for this dataset.  Create
3733		 * the userrefs zap object.
3734		 */
3735		dmu_buf_will_dirty(ds->ds_dbuf, tx);
3736		zapobj = ds->ds_phys->ds_userrefs_obj =
3737		    zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
3738	} else {
3739		zapobj = ds->ds_phys->ds_userrefs_obj;
3740	}
3741	ds->ds_userrefs++;
3742	mutex_exit(&ds->ds_lock);
3743
3744	VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
3745
3746	if (ha->temphold) {
3747		VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
3748		    htag, &now, tx));
3749	}
3750
3751	spa_history_log_internal(LOG_DS_USER_HOLD,
3752	    dp->dp_spa, tx, "<%s> temp = %d dataset = %llu", htag,
3753	    (int)ha->temphold, ds->ds_object);
3754}
3755
3756static int
3757dsl_dataset_user_hold_one(const char *dsname, void *arg)
3758{
3759	struct dsl_ds_holdarg *ha = arg;
3760	dsl_dataset_t *ds;
3761	int error;
3762	char *name;
3763
3764	/* alloc a buffer to hold dsname@snapname plus terminating NUL */
3765	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3766	error = dsl_dataset_hold(name, ha->dstg, &ds);
3767	strfree(name);
3768	if (error == 0) {
3769		ha->gotone = B_TRUE;
3770		dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
3771		    dsl_dataset_user_hold_sync, ds, ha, 0);
3772	} else if (error == ENOENT && ha->recursive) {
3773		error = 0;
3774	} else {
3775		(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3776	}
3777	return (error);
3778}
3779
3780int
3781dsl_dataset_user_hold_for_send(dsl_dataset_t *ds, char *htag,
3782    boolean_t temphold)
3783{
3784	struct dsl_ds_holdarg *ha;
3785	int error;
3786
3787	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3788	ha->htag = htag;
3789	ha->temphold = temphold;
3790	error = dsl_sync_task_do(ds->ds_dir->dd_pool,
3791	    dsl_dataset_user_hold_check, dsl_dataset_user_hold_sync,
3792	    ds, ha, 0);
3793	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3794
3795	return (error);
3796}
3797
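/*
 * Illustrative mapping (names outside this file): "zfs hold -r mytag
 * tank/fs@snap" arrives via the ZFS_IOC_HOLD ioctl, which is expected to
 * call roughly:
 *
 *	error = dsl_dataset_user_hold("tank/fs", "snap", "mytag",
 *	    B_TRUE, B_FALSE, -1);
 *
 * i.e. recursive, not a temporary hold, and no cleanup fd.
 */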
3798int
3799dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
3800    boolean_t recursive, boolean_t temphold, int cleanup_fd)
3801{
3802	struct dsl_ds_holdarg *ha;
3803	dsl_sync_task_t *dst;
3804	spa_t *spa;
3805	int error;
3806	minor_t minor = 0;
3807
3808	if (cleanup_fd != -1) {
3809		/* Currently we only support cleanup-on-exit of tempholds. */
3810		if (!temphold)
3811			return (EINVAL);
3812		error = zfs_onexit_fd_hold(cleanup_fd, &minor);
3813		if (error)
3814			return (error);
3815	}
3816
3817	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3818
3819	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3820
3821	error = spa_open(dsname, &spa, FTAG);
3822	if (error) {
3823		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3824		if (cleanup_fd != -1)
3825			zfs_onexit_fd_rele(cleanup_fd);
3826		return (error);
3827	}
3828
3829	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3830	ha->htag = htag;
3831	ha->snapname = snapname;
3832	ha->recursive = recursive;
3833	ha->temphold = temphold;
3834
3835	if (recursive) {
3836		error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
3837		    ha, DS_FIND_CHILDREN);
3838	} else {
3839		error = dsl_dataset_user_hold_one(dsname, ha);
3840	}
3841	if (error == 0)
3842		error = dsl_sync_task_group_wait(ha->dstg);
3843
3844	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3845	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3846		dsl_dataset_t *ds = dst->dst_arg1;
3847
3848		if (dst->dst_err) {
3849			dsl_dataset_name(ds, ha->failed);
3850			*strchr(ha->failed, '@') = '\0';
3851		} else if (error == 0 && minor != 0 && temphold) {
3852			/*
3853			 * If this hold is to be released upon process exit,
3854			 * register that action now.
3855			 */
3856			dsl_register_onexit_hold_cleanup(ds, htag, minor);
3857		}
3858		dsl_dataset_rele(ds, ha->dstg);
3859	}
3860
3861	if (error == 0 && recursive && !ha->gotone)
3862		error = ENOENT;
3863
3864	if (error)
3865		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
3866
3867	dsl_sync_task_group_destroy(ha->dstg);
3868
3869	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3870	spa_close(spa, FTAG);
3871	if (cleanup_fd != -1)
3872		zfs_onexit_fd_rele(cleanup_fd);
3873	return (error);
3874}
3875
3876struct dsl_ds_releasearg {
3877	dsl_dataset_t *ds;
3878	const char *htag;
3879	boolean_t own;		/* do we own or just hold ds? */
3880};
3881
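/*
 * Deferred-destroy example (hypothetical): after "zfs destroy -d
 * tank/fs@snap", a snapshot that still has user holds is only marked for
 * deferred destruction.  Releasing the last hold on such a snapshot
 * (ds_userrefs == 1, ds_num_children == 1 so no clones, and
 * DS_IS_DEFER_DESTROY set) must also destroy it; *might_destroy below
 * reports exactly this condition.
 */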
3882static int
3883dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
3884    boolean_t *might_destroy)
3885{
3886	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3887	uint64_t zapobj;
3888	uint64_t tmp;
3889	int error;
3890
3891	*might_destroy = B_FALSE;
3892
3893	mutex_enter(&ds->ds_lock);
3894	zapobj = ds->ds_phys->ds_userrefs_obj;
3895	if (zapobj == 0) {
3896		/* The tag can't possibly exist */
3897		mutex_exit(&ds->ds_lock);
3898		return (ESRCH);
3899	}
3900
3901	/* Make sure the tag exists */
3902	error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
3903	if (error) {
3904		mutex_exit(&ds->ds_lock);
3905		if (error == ENOENT)
3906			error = ESRCH;
3907		return (error);
3908	}
3909
3910	if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
3911	    DS_IS_DEFER_DESTROY(ds))
3912		*might_destroy = B_TRUE;
3913
3914	mutex_exit(&ds->ds_lock);
3915	return (0);
3916}
3917
3918static int
3919dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
3920{
3921	struct dsl_ds_releasearg *ra = arg1;
3922	dsl_dataset_t *ds = ra->ds;
3923	boolean_t might_destroy;
3924	int error;
3925
3926	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3927		return (ENOTSUP);
3928
3929	error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
3930	if (error)
3931		return (error);
3932
3933	if (might_destroy) {
3934		struct dsl_ds_destroyarg dsda = {0};
3935
3936		if (dmu_tx_is_syncing(tx)) {
3937			/*
3938			 * If we're not prepared to remove the snapshot,
3939			 * we can't allow the release to happen right now.
3940			 */
3941			if (!ra->own)
3942				return (EBUSY);
3943		}
3944		dsda.ds = ds;
3945		dsda.releasing = B_TRUE;
3946		return (dsl_dataset_destroy_check(&dsda, tag, tx));
3947	}
3948
3949	return (0);
3950}
3951
3952static void
3953dsl_dataset_user_release_sync(void *arg1, void *tag, dmu_tx_t *tx)
3954{
3955	struct dsl_ds_releasearg *ra = arg1;
3956	dsl_dataset_t *ds = ra->ds;
3957	dsl_pool_t *dp = ds->ds_dir->dd_pool;
3958	objset_t *mos = dp->dp_meta_objset;
3959	uint64_t zapobj;
3960	uint64_t dsobj = ds->ds_object;
3961	uint64_t refs;
3962	int error;
3963
3964	mutex_enter(&ds->ds_lock);
3965	ds->ds_userrefs--;
3966	refs = ds->ds_userrefs;
3967	mutex_exit(&ds->ds_lock);
3968	error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
3969	VERIFY(error == 0 || error == ENOENT);
3970	zapobj = ds->ds_phys->ds_userrefs_obj;
3971	VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
3972
3973	spa_history_log_internal(LOG_DS_USER_RELEASE,
3974	    dp->dp_spa, tx, "<%s> %lld dataset = %llu",
3975	    ra->htag, (longlong_t)refs, dsobj);
3976
3977	if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
3978	    DS_IS_DEFER_DESTROY(ds)) {
3979		struct dsl_ds_destroyarg dsda = {0};
3980
3981		ASSERT(ra->own);
3982		dsda.ds = ds;
3983		dsda.releasing = B_TRUE;
3984		/* We already did the destroy_check */
3985		dsl_dataset_destroy_sync(&dsda, tag, tx);
3986	}
3987}
3988
3989static int
3990dsl_dataset_user_release_one(const char *dsname, void *arg)
3991{
3992	struct dsl_ds_holdarg *ha = arg;
3993	struct dsl_ds_releasearg *ra;
3994	dsl_dataset_t *ds;
3995	int error;
3996	void *dtag = ha->dstg;
3997	char *name;
3998	boolean_t own = B_FALSE;
3999	boolean_t might_destroy;
4000
4001	/* alloc a buffer to hold dsname@snapname, plus the terminating NUL */
4002	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
4003	error = dsl_dataset_hold(name, dtag, &ds);
4004	strfree(name);
4005	if (error == ENOENT && ha->recursive)
4006		return (0);
4007	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
4008	if (error)
4009		return (error);
4010
4011	ha->gotone = B_TRUE;
4012
4013	ASSERT(dsl_dataset_is_snapshot(ds));
4014
4015	error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
4016	if (error) {
4017		dsl_dataset_rele(ds, dtag);
4018		return (error);
4019	}
4020
4021	if (might_destroy) {
4022#ifdef _KERNEL
4023		name = kmem_asprintf("%s@%s", dsname, ha->snapname);
4024		error = zfs_unmount_snap(name, NULL);
4025		strfree(name);
4026		if (error) {
4027			dsl_dataset_rele(ds, dtag);
4028			return (error);
4029		}
4030#endif
4031		if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
4032			dsl_dataset_rele(ds, dtag);
4033			return (EBUSY);
4034		} else {
4035			own = B_TRUE;
4036			dsl_dataset_make_exclusive(ds, dtag);
4037		}
4038	}
4039
4040	ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
4041	ra->ds = ds;
4042	ra->htag = ha->htag;
4043	ra->own = own;
4044	dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
4045	    dsl_dataset_user_release_sync, ra, dtag, 0);
4046
4047	return (0);
4048}
4049
4050int
4051dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
4052    boolean_t recursive)
4053{
4054	struct dsl_ds_holdarg *ha;
4055	dsl_sync_task_t *dst;
4056	spa_t *spa;
4057	int error;
4058
4059top:
4060	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
4061
4062	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
4063
4064	error = spa_open(dsname, &spa, FTAG);
4065	if (error) {
4066		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4067		return (error);
4068	}
4069
4070	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
4071	ha->htag = htag;
4072	ha->snapname = snapname;
4073	ha->recursive = recursive;
4074	if (recursive) {
4075		error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
4076		    ha, DS_FIND_CHILDREN);
4077	} else {
4078		error = dsl_dataset_user_release_one(dsname, ha);
4079	}
4080	if (error == 0)
4081		error = dsl_sync_task_group_wait(ha->dstg);
4082
4083	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
4084	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
4085		struct dsl_ds_releasearg *ra = dst->dst_arg1;
4086		dsl_dataset_t *ds = ra->ds;
4087
4088		if (dst->dst_err)
4089			dsl_dataset_name(ds, ha->failed);
4090
4091		if (ra->own)
4092			dsl_dataset_disown(ds, ha->dstg);
4093		else
4094			dsl_dataset_rele(ds, ha->dstg);
4095
4096		kmem_free(ra, sizeof (struct dsl_ds_releasearg));
4097	}
4098
4099	if (error == 0 && recursive && !ha->gotone)
4100		error = ENOENT;
4101
4102	if (error && error != EBUSY)
4103		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
4104
4105	dsl_sync_task_group_destroy(ha->dstg);
4106	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4107	spa_close(spa, FTAG);
4108
4109	/*
4110	 * We can get EBUSY if we were racing with deferred destroy and
4111	 * dsl_dataset_user_release_check() hadn't done the necessary
4112	 * open context setup.  We can also get EBUSY if we're racing
4113	 * with destroy and that thread is the ds_owner.  Either way
4114	 * the busy condition should be transient, and we should retry
4115	 * the release operation.
4116	 */
4117	if (error == EBUSY)
4118		goto top;
4119
4120	return (error);
4121}
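
/*
 * Illustrative mapping (names outside this file): "zfs release -r mytag
 * tank/fs@snap" arrives via the ZFS_IOC_RELEASE ioctl, which is expected
 * to call roughly:
 *
 *	error = dsl_dataset_user_release("tank/fs", "snap", "mytag", B_TRUE);
 */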
4122
4123/*
4124 * Called at spa_load time (with retry == B_FALSE) to release a stale
4125 * temporary user hold. Also called by the onexit code (with retry == B_TRUE).
4126 */
4127int
4128dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag,
4129    boolean_t retry)
4130{
4131	dsl_dataset_t *ds;
4132	char *snap;
4133	char *name;
4134	int namelen;
4135	int error;
4136
4137	do {
4138		rw_enter(&dp->dp_config_rwlock, RW_READER);
4139		error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
4140		rw_exit(&dp->dp_config_rwlock);
4141		if (error)
4142			return (error);
4143		namelen = dsl_dataset_namelen(ds) + 1;
4144		name = kmem_alloc(namelen, KM_SLEEP);
4145		dsl_dataset_name(ds, name);
4146		dsl_dataset_rele(ds, FTAG);
4147
4148		snap = strchr(name, '@');
4149		*snap = '\0';
4150		++snap;
4151		error = dsl_dataset_user_release(name, snap, htag, B_FALSE);
4152		kmem_free(name, namelen);
4153
4154		/*
4155		 * The object can't have been destroyed because we have a hold,
4156		 * but it might have been renamed, resulting in ENOENT.  Retry
4157		 * if we've been requested to do so.
4158		 *
4159		 * It would be nice if we could use the dsobj all the way
4160		 * through and avoid ENOENT entirely.  But we might need to
4161		 * unmount the snapshot, and there's currently no way to look up
4162		 * a vfsp using a ZFS object id.
4163		 */
4164	} while ((error == ENOENT) && retry);
4165
4166	return (error);
4167}
4168
4169int
4170dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
4171{
4172	dsl_dataset_t *ds;
4173	int err;
4174
4175	err = dsl_dataset_hold(dsname, FTAG, &ds);
4176	if (err)
4177		return (err);
4178
4179	VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
4180	if (ds->ds_phys->ds_userrefs_obj != 0) {
4181		zap_attribute_t *za;
4182		zap_cursor_t zc;
4183
4184		za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
4185		for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
4186		    ds->ds_phys->ds_userrefs_obj);
4187		    zap_cursor_retrieve(&zc, za) == 0;
4188		    zap_cursor_advance(&zc)) {
4189			VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
4190			    za->za_first_integer));
4191		}
4192		zap_cursor_fini(&zc);
4193		kmem_free(za, sizeof (zap_attribute_t));
4194	}
4195	dsl_dataset_rele(ds, FTAG);
4196	return (0);
4197}
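
/*
 * Illustrative shape of the nvlist built above (values hypothetical):
 * each hold tag maps to the time the hold was taken, as recorded by
 * dsl_dataset_user_hold_sync():
 *
 *	{ "mytag" = 1347400000 }	(uint64, gethrestime_sec() at hold)
 *
 * This is what "zfs holds" consumes to print tags and creation times.
 */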
4198
4199/*
4200 * Note, this function is used as the callback for dmu_objset_find().  We
4201 * always return 0 so that we will continue to find and process
4202 * inconsistent datasets, even if we encounter an error trying to
4203 * process one of them.
4204 */
4205/* ARGSUSED */
4206int
4207dsl_destroy_inconsistent(const char *dsname, void *arg)
4208{
4209	dsl_dataset_t *ds;
4210
4211	if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
4212		if (DS_IS_INCONSISTENT(ds))
4213			(void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
4214		else
4215			dsl_dataset_disown(ds, FTAG);
4216	}
4217	return (0);
4218}
4219
4220/*
4221 * Return (in *usedp) the amount of space written in new that is not
4222 * present in oldsnap.  New may be a snapshot or the head.  Old must be
4223 * a snapshot before new, in new's filesystem (or its origin).  If not,
4224 * fail and return EINVAL.
4225 *
4226 * The written space is calculated from two components:  First, we ignore
4227 * any freed space and compute the written space as new's used space minus
4228 * old's used space.  Next, we add in the space that was freed between the
4229 * two snapshots, which reduced new's used space relative to old's.
4230 * Specifically, this is the space that was born before old->ds_creation_txg
4231 * and freed before new (i.e., it is on new's deadlist or a previous one).
4232 *
4233 * space freed                         [---------------------]
4234 * snapshots                       ---O-------O--------O-------O------
4235 *                                         oldsnap            new
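 *
 * For example (hypothetical numbers): if new references 12G while oldsnap
 * referenced 10G, the first component is 2G.  If an additional 1G that was
 * born before oldsnap was freed between the two snapshots (so it is on
 * new's deadlist or a previous one), then written = 2G + 1G = 3G.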
4236 */
4237int
4238dsl_dataset_space_written(dsl_dataset_t *oldsnap, dsl_dataset_t *new,
4239    uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4240{
4241	int err = 0;
4242	uint64_t snapobj;
4243	dsl_pool_t *dp = new->ds_dir->dd_pool;
4244
4245	*usedp = 0;
4246	*usedp += new->ds_phys->ds_referenced_bytes;
4247	*usedp -= oldsnap->ds_phys->ds_referenced_bytes;
4248
4249	*compp = 0;
4250	*compp += new->ds_phys->ds_compressed_bytes;
4251	*compp -= oldsnap->ds_phys->ds_compressed_bytes;
4252
4253	*uncompp = 0;
4254	*uncompp += new->ds_phys->ds_uncompressed_bytes;
4255	*uncompp -= oldsnap->ds_phys->ds_uncompressed_bytes;
4256
4257	rw_enter(&dp->dp_config_rwlock, RW_READER);
4258	snapobj = new->ds_object;
4259	while (snapobj != oldsnap->ds_object) {
4260		dsl_dataset_t *snap;
4261		uint64_t used, comp, uncomp;
4262
4263		if (snapobj == new->ds_object) {
4264			snap = new;
4265		} else {
4266			err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &snap);
4267			if (err != 0)
4268				break;
4269		}
4270
4271		if (snap->ds_phys->ds_prev_snap_txg ==
4272		    oldsnap->ds_phys->ds_creation_txg) {
4273			/*
4274			 * The blocks in the deadlist cannot be born after
4275			 * ds_prev_snap_txg, so get the whole deadlist space,
4276			 * which is more efficient (especially for old-format
4277			 * deadlists).  Unfortunately the deadlist code
4278			 * doesn't have enough information to make this
4279			 * optimization itself.
4280			 */
4281			dsl_deadlist_space(&snap->ds_deadlist,
4282			    &used, &comp, &uncomp);
4283		} else {
4284			dsl_deadlist_space_range(&snap->ds_deadlist,
4285			    0, oldsnap->ds_phys->ds_creation_txg,
4286			    &used, &comp, &uncomp);
4287		}
4288		*usedp += used;
4289		*compp += comp;
4290		*uncompp += uncomp;
4291
4292		/*
4293		 * If we get to the beginning of the chain of snapshots
4294		 * (ds_prev_snap_obj == 0) before oldsnap, then oldsnap
4295		 * was not a snapshot of/before new.
4296		 */
4297		snapobj = snap->ds_phys->ds_prev_snap_obj;
4298		if (snap != new)
4299			dsl_dataset_rele(snap, FTAG);
4300		if (snapobj == 0) {
4301			err = EINVAL;
4302			break;
4303		}
4305	}
4306	rw_exit(&dp->dp_config_rwlock);
4307	return (err);
4308}
4309
4310/*
4311 * Return (in *usedp) the amount of space that will be reclaimed if firstsnap,
4312 * lastsnap, and all snapshots in between are deleted.
4313 *
4314 * blocks that would be freed            [---------------------------]
4315 * snapshots                       ---O-------O--------O-------O--------O
4316 *                                        firstsnap        lastsnap
4317 *
4318 * This is the set of blocks that were born after the snap before firstsnap
4319 * (birth > firstsnap->prev_snap_txg) and died before the snap after the last
4320 * snap (i.e., on lastsnap->ds_next->ds_deadlist or an earlier deadlist).
4321 * We calculate this by iterating over the relevant deadlists (from the snap
4322 * after lastsnap, backward to the snap after firstsnap), summing up the
4323 * space on the deadlist that was born after the snap before firstsnap.
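 *
 * For example (hypothetical numbers): if those deadlists (from the snap
 * after lastsnap back to the snap after firstsnap) together hold 4G of
 * blocks born after the snap before firstsnap, then destroying firstsnap
 * through lastsnap would reclaim 4G.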
4324 */
4325int
4326dsl_dataset_space_wouldfree(dsl_dataset_t *firstsnap,
4327    dsl_dataset_t *lastsnap,
4328    uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4329{
4330	int err = 0;
4331	uint64_t snapobj;
4332	dsl_pool_t *dp = firstsnap->ds_dir->dd_pool;
4333
4334	ASSERT(dsl_dataset_is_snapshot(firstsnap));
4335	ASSERT(dsl_dataset_is_snapshot(lastsnap));
4336
4337	/*
4338	 * Check that the snapshots are in the same dsl_dir, and firstsnap
4339	 * is before lastsnap.
4340	 */
4341	if (firstsnap->ds_dir != lastsnap->ds_dir ||
4342	    firstsnap->ds_phys->ds_creation_txg >
4343	    lastsnap->ds_phys->ds_creation_txg)
4344		return (EINVAL);
4345
4346	*usedp = *compp = *uncompp = 0;
4347
4348	rw_enter(&dp->dp_config_rwlock, RW_READER);
4349	snapobj = lastsnap->ds_phys->ds_next_snap_obj;
4350	while (snapobj != firstsnap->ds_object) {
4351		dsl_dataset_t *ds;
4352		uint64_t used, comp, uncomp;
4353
4354		err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &ds);
4355		if (err != 0)
4356			break;
4357
4358		dsl_deadlist_space_range(&ds->ds_deadlist,
4359		    firstsnap->ds_phys->ds_prev_snap_txg, UINT64_MAX,
4360		    &used, &comp, &uncomp);
4361		*usedp += used;
4362		*compp += comp;
4363		*uncompp += uncomp;
4364
4365		snapobj = ds->ds_phys->ds_prev_snap_obj;
4366		ASSERT3U(snapobj, !=, 0);
4367		dsl_dataset_rele(ds, FTAG);
4368	}
4369	rw_exit(&dp->dp_config_rwlock);
4370	return (err);
4371}
4372