dsl_dataset.c revision 247187
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright (c) 2012 by Delphix. All rights reserved.
 * Copyright (c) 2012, Joyent, Inc. All rights reserved.
 * Copyright (c) 2011 Pawel Jakub Dawidek <pawel@dawidek.net>.
 * All rights reserved.
 * Portions Copyright (c) 2011 Martin Matuska <mm@FreeBSD.org>
 */

#include <sys/dmu_objset.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_synctask.h>
#include <sys/dmu_traverse.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/arc.h>
#include <sys/zio.h>
#include <sys/zap.h>
#include <sys/zfeature.h>
#include <sys/unique.h>
#include <sys/zfs_context.h>
#include <sys/zfs_ioctl.h>
#include <sys/spa.h>
#include <sys/zfs_znode.h>
#include <sys/zfs_onexit.h>
#include <sys/zvol.h>
#include <sys/dsl_scan.h>
#include <sys/dsl_deadlist.h>

static char *dsl_reaper = "the grim reaper";

static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
static dsl_syncfunc_t dsl_dataset_set_reservation_sync;

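/*
 * Swap two 64-bit values in place; used below to exchange on-disk object
 * numbers (e.g. deadlist objects) between two datasets.
 */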
#define	SWITCH64(x, y) \
	{ \
		uint64_t __tmp = (x); \
		(x) = (y); \
		(y) = __tmp; \
	}

#define	DS_REF_MAX	(1ULL << 62)

#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE

#define	DSL_DATASET_IS_DESTROYED(ds)	((ds)->ds_owner == dsl_reaper)


/*
 * Figure out how much of this delta should be propagated to the dsl_dir
 * layer.  If there's a refreservation, that space has already been
 * partially accounted for in our ancestors.
 */
static int64_t
parent_delta(dsl_dataset_t *ds, int64_t delta)
{
	uint64_t old_bytes, new_bytes;

	if (ds->ds_reserved == 0)
		return (delta);

	old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
	new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);

	ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
	return (new_bytes - old_bytes);
}

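/*
 * Called in syncing context when a block is allocated: charge its on-disk,
 * compressed, and uncompressed sizes to the dataset (or to the MOS if
 * ds is NULL) and propagate the delta to the dsl_dir accounting.
 */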
void
dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
{
	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
	int compressed = BP_GET_PSIZE(bp);
	int uncompressed = BP_GET_UCSIZE(bp);
	int64_t delta;

	dprintf_bp(bp, "ds=%p", ds);

	ASSERT(dmu_tx_is_syncing(tx));
	/* It could have been compressed away to nothing */
	if (BP_IS_HOLE(bp))
		return;
	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
	ASSERT(DMU_OT_IS_VALID(BP_GET_TYPE(bp)));
	if (ds == NULL) {
		dsl_pool_mos_diduse_space(tx->tx_pool,
		    used, compressed, uncompressed);
		return;
	}
	dmu_buf_will_dirty(ds->ds_dbuf, tx);

	mutex_enter(&ds->ds_dir->dd_lock);
	mutex_enter(&ds->ds_lock);
	delta = parent_delta(ds, used);
	ds->ds_phys->ds_referenced_bytes += used;
	ds->ds_phys->ds_compressed_bytes += compressed;
	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
	ds->ds_phys->ds_unique_bytes += used;
	mutex_exit(&ds->ds_lock);
	dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
	    compressed, uncompressed, tx);
	dsl_dir_transfer_space(ds->ds_dir, used - delta,
	    DD_USED_REFRSRV, DD_USED_HEAD, tx);
	mutex_exit(&ds->ds_dir->dd_lock);
}

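/*
 * Release one block's worth of space: free the block outright if it was
 * born after the most recent snapshot, otherwise move it to the dataset's
 * deadlist.  Returns the number of bytes of referenced space released.
 */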
int
dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
    boolean_t async)
{
	if (BP_IS_HOLE(bp))
		return (0);

	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(bp->blk_birth <= tx->tx_txg);

	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
	int compressed = BP_GET_PSIZE(bp);
	int uncompressed = BP_GET_UCSIZE(bp);

	ASSERT(used > 0);
	if (ds == NULL) {
		dsl_free(tx->tx_pool, tx->tx_txg, bp);
		dsl_pool_mos_diduse_space(tx->tx_pool,
		    -used, -compressed, -uncompressed);
		return (used);
	}
	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);

	ASSERT(!dsl_dataset_is_snapshot(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);

	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
		int64_t delta;

		dprintf_bp(bp, "freeing ds=%llu", ds->ds_object);
		dsl_free(tx->tx_pool, tx->tx_txg, bp);

		mutex_enter(&ds->ds_dir->dd_lock);
		mutex_enter(&ds->ds_lock);
		ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
		    !DS_UNIQUE_IS_ACCURATE(ds));
		delta = parent_delta(ds, -used);
		ds->ds_phys->ds_unique_bytes -= used;
		mutex_exit(&ds->ds_lock);
		dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
		    delta, -compressed, -uncompressed, tx);
		dsl_dir_transfer_space(ds->ds_dir, -used - delta,
		    DD_USED_REFRSRV, DD_USED_HEAD, tx);
		mutex_exit(&ds->ds_dir->dd_lock);
	} else {
		dprintf_bp(bp, "putting on dead list: %s", "");
		if (async) {
			/*
			 * We are here as part of zio's write done callback,
			 * which means we're a zio interrupt thread.  We can't
			 * call dsl_deadlist_insert() now because it may block
			 * waiting for I/O.  Instead, put bp on the deferred
			 * queue and let dsl_pool_sync() finish the job.
			 */
			bplist_append(&ds->ds_pending_deadlist, bp);
		} else {
			dsl_deadlist_insert(&ds->ds_deadlist, bp, tx);
		}
		ASSERT3U(ds->ds_prev->ds_object, ==,
		    ds->ds_phys->ds_prev_snap_obj);
		ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
		if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
		    ds->ds_object && bp->blk_birth >
		    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
			mutex_enter(&ds->ds_prev->ds_lock);
			ds->ds_prev->ds_phys->ds_unique_bytes += used;
			mutex_exit(&ds->ds_prev->ds_lock);
		}
		if (bp->blk_birth > ds->ds_dir->dd_origin_txg) {
			dsl_dir_transfer_space(ds->ds_dir, used,
			    DD_USED_HEAD, DD_USED_SNAP, tx);
		}
	}
	mutex_enter(&ds->ds_lock);
	ASSERT3U(ds->ds_phys->ds_referenced_bytes, >=, used);
	ds->ds_phys->ds_referenced_bytes -= used;
	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
	ds->ds_phys->ds_compressed_bytes -= compressed;
	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
	mutex_exit(&ds->ds_lock);

	return (used);
}

uint64_t
dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
{
	uint64_t trysnap = 0;

	if (ds == NULL)
		return (0);
	/*
	 * The snapshot creation could fail, but that would cause an
	 * incorrect FALSE return, which would only result in an
	 * overestimation of the amount of space that an operation would
	 * consume, which is OK.
	 *
	 * There's also a small window where we could miss a pending
	 * snapshot, because we could set the sync task in the quiescing
	 * phase.  So this should only be used as a guess.
	 */
	if (ds->ds_trysnap_txg >
	    spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
		trysnap = ds->ds_trysnap_txg;
	return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
}

boolean_t
dsl_dataset_block_freeable(dsl_dataset_t *ds, const blkptr_t *bp,
    uint64_t blk_birth)
{
	if (blk_birth <= dsl_dataset_prev_snap_txg(ds))
		return (B_FALSE);

	ddt_prefetch(dsl_dataset_get_spa(ds), bp);

	return (B_TRUE);
}

252/* ARGSUSED */
253static void
254dsl_dataset_evict(dmu_buf_t *db, void *dsv)
255{
256	dsl_dataset_t *ds = dsv;
257
258	ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));
259
260	unique_remove(ds->ds_fsid_guid);
261
262	if (ds->ds_objset != NULL)
263		dmu_objset_evict(ds->ds_objset);
264
265	if (ds->ds_prev) {
266		dsl_dataset_drop_ref(ds->ds_prev, ds);
267		ds->ds_prev = NULL;
268	}
269
270	bplist_destroy(&ds->ds_pending_deadlist);
271	if (db != NULL) {
272		dsl_deadlist_close(&ds->ds_deadlist);
273	} else {
274		ASSERT(ds->ds_deadlist.dl_dbuf == NULL);
275		ASSERT(!ds->ds_deadlist.dl_oldfmt);
276	}
277	if (ds->ds_dir)
278		dsl_dir_close(ds->ds_dir, ds);
279
280	ASSERT(!list_link_active(&ds->ds_synced_link));
281
282	if (mutex_owned(&ds->ds_lock))
283		mutex_exit(&ds->ds_lock);
284	mutex_destroy(&ds->ds_lock);
285	mutex_destroy(&ds->ds_recvlock);
286	if (mutex_owned(&ds->ds_opening_lock))
287		mutex_exit(&ds->ds_opening_lock);
288	mutex_destroy(&ds->ds_opening_lock);
289	rw_destroy(&ds->ds_rwlock);
290	cv_destroy(&ds->ds_exclusive_cv);
291
292	kmem_free(ds, sizeof (dsl_dataset_t));
293}
294
295static int
296dsl_dataset_get_snapname(dsl_dataset_t *ds)
297{
298	dsl_dataset_phys_t *headphys;
299	int err;
300	dmu_buf_t *headdbuf;
301	dsl_pool_t *dp = ds->ds_dir->dd_pool;
302	objset_t *mos = dp->dp_meta_objset;
303
304	if (ds->ds_snapname[0])
305		return (0);
306	if (ds->ds_phys->ds_next_snap_obj == 0)
307		return (0);
308
309	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
310	    FTAG, &headdbuf);
311	if (err)
312		return (err);
313	headphys = headdbuf->db_data;
314	err = zap_value_search(dp->dp_meta_objset,
315	    headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
316	dmu_buf_rele(headdbuf, FTAG);
317	return (err);
318}
319
320static int
321dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
322{
323	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
324	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
325	matchtype_t mt;
326	int err;
327
328	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
329		mt = MT_FIRST;
330	else
331		mt = MT_EXACT;
332
333	err = zap_lookup_norm(mos, snapobj, name, 8, 1,
334	    value, mt, NULL, 0, NULL);
335	if (err == ENOTSUP && mt == MT_FIRST)
336		err = zap_lookup(mos, snapobj, name, 8, 1, value);
337	return (err);
338}
339
340static int
341dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
342{
343	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
344	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
345	matchtype_t mt;
346	int err;
347
348	dsl_dir_snap_cmtime_update(ds->ds_dir);
349
350	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
351		mt = MT_FIRST;
352	else
353		mt = MT_EXACT;
354
355	err = zap_remove_norm(mos, snapobj, name, mt, tx);
356	if (err == ENOTSUP && mt == MT_FIRST)
357		err = zap_remove(mos, snapobj, name, tx);
358	return (err);
359}
360
361static int
362dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
363    dsl_dataset_t **dsp)
364{
365	objset_t *mos = dp->dp_meta_objset;
366	dmu_buf_t *dbuf;
367	dsl_dataset_t *ds;
368	int err;
369	dmu_object_info_t doi;
370
371	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
372	    dsl_pool_sync_context(dp));
373
374	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
375	if (err)
376		return (err);
377
378	/* Make sure dsobj has the correct object type. */
379	dmu_object_info_from_db(dbuf, &doi);
380	if (doi.doi_type != DMU_OT_DSL_DATASET)
381		return (EINVAL);
382
383	ds = dmu_buf_get_user(dbuf);
384	if (ds == NULL) {
385		dsl_dataset_t *winner = NULL;
386
387		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
388		ds->ds_dbuf = dbuf;
389		ds->ds_object = dsobj;
390		ds->ds_phys = dbuf->db_data;
391
392		mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
393		mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
394		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
395		mutex_init(&ds->ds_sendstream_lock, NULL, MUTEX_DEFAULT, NULL);
396
397		rw_init(&ds->ds_rwlock, 0, 0, 0);
398		cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
399
400		bplist_create(&ds->ds_pending_deadlist);
401		dsl_deadlist_open(&ds->ds_deadlist,
402		    mos, ds->ds_phys->ds_deadlist_obj);
403
404		list_create(&ds->ds_sendstreams, sizeof (dmu_sendarg_t),
405		    offsetof(dmu_sendarg_t, dsa_link));
406
407		if (err == 0) {
408			err = dsl_dir_open_obj(dp,
409			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
410		}
411		if (err) {
412			mutex_destroy(&ds->ds_lock);
413			mutex_destroy(&ds->ds_recvlock);
414			mutex_destroy(&ds->ds_opening_lock);
415			rw_destroy(&ds->ds_rwlock);
416			cv_destroy(&ds->ds_exclusive_cv);
417			bplist_destroy(&ds->ds_pending_deadlist);
418			dsl_deadlist_close(&ds->ds_deadlist);
419			kmem_free(ds, sizeof (dsl_dataset_t));
420			dmu_buf_rele(dbuf, tag);
421			return (err);
422		}
423
424		if (!dsl_dataset_is_snapshot(ds)) {
425			ds->ds_snapname[0] = '\0';
426			if (ds->ds_phys->ds_prev_snap_obj) {
427				err = dsl_dataset_get_ref(dp,
428				    ds->ds_phys->ds_prev_snap_obj,
429				    ds, &ds->ds_prev);
430			}
431		} else {
432			if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
433				err = dsl_dataset_get_snapname(ds);
434			if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
435				err = zap_count(
436				    ds->ds_dir->dd_pool->dp_meta_objset,
437				    ds->ds_phys->ds_userrefs_obj,
438				    &ds->ds_userrefs);
439			}
440		}
441
442		if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
443			/*
444			 * In sync context, we're called with either no lock
445			 * or with the write lock.  If we're not syncing,
446			 * we're always called with the read lock held.
447			 */
448			boolean_t need_lock =
449			    !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
450			    dsl_pool_sync_context(dp);
451
452			if (need_lock)
453				rw_enter(&dp->dp_config_rwlock, RW_READER);
454
455			err = dsl_prop_get_ds(ds,
456			    "refreservation", sizeof (uint64_t), 1,
457			    &ds->ds_reserved, NULL);
458			if (err == 0) {
459				err = dsl_prop_get_ds(ds,
460				    "refquota", sizeof (uint64_t), 1,
461				    &ds->ds_quota, NULL);
462			}
463
464			if (need_lock)
465				rw_exit(&dp->dp_config_rwlock);
466		} else {
467			ds->ds_reserved = ds->ds_quota = 0;
468		}
469
470		if (err != 0 || (winner = dmu_buf_set_user_ie(dbuf, ds,
471		    &ds->ds_phys, dsl_dataset_evict)) != NULL) {
472			bplist_destroy(&ds->ds_pending_deadlist);
473			dsl_deadlist_close(&ds->ds_deadlist);
474			if (ds->ds_prev)
475				dsl_dataset_drop_ref(ds->ds_prev, ds);
476			dsl_dir_close(ds->ds_dir, ds);
477			mutex_destroy(&ds->ds_lock);
478			mutex_destroy(&ds->ds_recvlock);
479			mutex_destroy(&ds->ds_opening_lock);
480			rw_destroy(&ds->ds_rwlock);
481			cv_destroy(&ds->ds_exclusive_cv);
482			kmem_free(ds, sizeof (dsl_dataset_t));
483			if (err) {
484				dmu_buf_rele(dbuf, tag);
485				return (err);
486			}
487			ds = winner;
488		} else {
489			ds->ds_fsid_guid =
490			    unique_insert(ds->ds_phys->ds_fsid_guid);
491		}
492	}
493	ASSERT3P(ds->ds_dbuf, ==, dbuf);
494	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
495	ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
496	    spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
497	    dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
498	mutex_enter(&ds->ds_lock);
499	if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
500		mutex_exit(&ds->ds_lock);
501		dmu_buf_rele(ds->ds_dbuf, tag);
502		return (ENOENT);
503	}
504	mutex_exit(&ds->ds_lock);
505	*dsp = ds;
506	return (0);
507}
508
509static int
510dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
511{
512	dsl_pool_t *dp = ds->ds_dir->dd_pool;
513
	/*
	 * In syncing context we don't want to take the rwlock: there
	 * may be an existing writer waiting for sync phase to
	 * finish.  We don't need to worry about such writers, since
	 * sync phase is single-threaded, so the writer can't be
	 * doing anything while we are active.
	 */
	if (dsl_pool_sync_context(dp)) {
		ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
		return (0);
	}

	/*
	 * Normal users will hold the ds_rwlock as a READER until they
	 * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
	 * drop their READER lock after they set the ds_owner field.
	 *
	 * If the dataset is being destroyed, the destroy thread will
	 * obtain a WRITER lock for exclusive access after it's done its
	 * open-context work and then change the ds_owner to
	 * dsl_reaper once destruction is assured.  So threads
	 * may block here temporarily, until the "destructability" of
	 * the dataset is determined.
	 */
	ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
	mutex_enter(&ds->ds_lock);
	while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
		rw_exit(&dp->dp_config_rwlock);
		cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
		if (DSL_DATASET_IS_DESTROYED(ds)) {
			mutex_exit(&ds->ds_lock);
			dsl_dataset_drop_ref(ds, tag);
			rw_enter(&dp->dp_config_rwlock, RW_READER);
			return (ENOENT);
		}
		/*
		 * The dp_config_rwlock lives above the ds_lock. And
		 * we need to check DSL_DATASET_IS_DESTROYED() while
		 * holding the ds_lock, so we have to drop and reacquire
		 * the ds_lock here.
		 */
		mutex_exit(&ds->ds_lock);
		rw_enter(&dp->dp_config_rwlock, RW_READER);
		mutex_enter(&ds->ds_lock);
	}
	mutex_exit(&ds->ds_lock);
	return (0);
}

int
dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    dsl_dataset_t **dsp)
{
	int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);

	if (err)
		return (err);
	return (dsl_dataset_hold_ref(*dsp, tag));
}

int
dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
    void *tag, dsl_dataset_t **dsp)
{
	int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
	if (err)
		return (err);
	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
		dsl_dataset_rele(*dsp, tag);
		*dsp = NULL;
		return (EBUSY);
	}
	return (0);
}

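/*
 * Hold a dataset by name.  A "pool/fs@snap" name is resolved by holding
 * the head dataset first and then looking the snapshot up in its
 * snapshot-name ZAP.
 */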
int
dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
{
	dsl_dir_t *dd;
	dsl_pool_t *dp;
	const char *snapname;
	uint64_t obj;
	int err = 0;

	err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
	if (err)
		return (err);

	dp = dd->dd_pool;
	obj = dd->dd_phys->dd_head_dataset_obj;
	rw_enter(&dp->dp_config_rwlock, RW_READER);
	if (obj)
		err = dsl_dataset_get_ref(dp, obj, tag, dsp);
	else
		err = ENOENT;
	if (err)
		goto out;

	err = dsl_dataset_hold_ref(*dsp, tag);

	/* we may be looking for a snapshot */
	if (err == 0 && snapname != NULL) {
		dsl_dataset_t *ds = NULL;

		if (*snapname++ != '@') {
			dsl_dataset_rele(*dsp, tag);
			err = ENOENT;
			goto out;
		}

		dprintf("looking for snapshot '%s'\n", snapname);
		err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
		if (err == 0)
			err = dsl_dataset_get_ref(dp, obj, tag, &ds);
		dsl_dataset_rele(*dsp, tag);

		ASSERT3U((err == 0), ==, (ds != NULL));

		if (ds) {
			mutex_enter(&ds->ds_lock);
			if (ds->ds_snapname[0] == 0)
				(void) strlcpy(ds->ds_snapname, snapname,
				    sizeof (ds->ds_snapname));
			mutex_exit(&ds->ds_lock);
			err = dsl_dataset_hold_ref(ds, tag);
			*dsp = err ? NULL : ds;
		}
	}
out:
	rw_exit(&dp->dp_config_rwlock);
	dsl_dir_close(dd, FTAG);
	return (err);
}

int
dsl_dataset_own(const char *name, boolean_t inconsistentok,
    void *tag, dsl_dataset_t **dsp)
{
	int err = dsl_dataset_hold(name, tag, dsp);
	if (err)
		return (err);
	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
		dsl_dataset_rele(*dsp, tag);
		return (EBUSY);
	}
	return (0);
}

void
dsl_dataset_name(dsl_dataset_t *ds, char *name)
{
	if (ds == NULL) {
		(void) strcpy(name, "mos");
	} else {
		dsl_dir_name(ds->ds_dir, name);
		VERIFY(0 == dsl_dataset_get_snapname(ds));
		if (ds->ds_snapname[0]) {
			(void) strcat(name, "@");
			/*
			 * We use a "recursive" mutex so that we
			 * can call dprintf_ds() with ds_lock held.
			 */
			if (!MUTEX_HELD(&ds->ds_lock)) {
				mutex_enter(&ds->ds_lock);
				(void) strcat(name, ds->ds_snapname);
				mutex_exit(&ds->ds_lock);
			} else {
				(void) strcat(name, ds->ds_snapname);
			}
		}
	}
}

static int
dsl_dataset_namelen(dsl_dataset_t *ds)
{
	int result;

	if (ds == NULL) {
		result = 3;	/* "mos" */
	} else {
		result = dsl_dir_namelen(ds->ds_dir);
		VERIFY(0 == dsl_dataset_get_snapname(ds));
		if (ds->ds_snapname[0]) {
			++result;	/* adding one for the @-sign */
			if (!MUTEX_HELD(&ds->ds_lock)) {
				mutex_enter(&ds->ds_lock);
				result += strlen(ds->ds_snapname);
				mutex_exit(&ds->ds_lock);
			} else {
				result += strlen(ds->ds_snapname);
			}
		}
	}

	return (result);
}

void
dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
{
	dmu_buf_rele(ds->ds_dbuf, tag);
}

void
dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
{
	if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
		rw_exit(&ds->ds_rwlock);
	}
	dsl_dataset_drop_ref(ds, tag);
}

void
dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
{
	ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
	    (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));

	mutex_enter(&ds->ds_lock);
	ds->ds_owner = NULL;
	if (RW_WRITE_HELD(&ds->ds_rwlock)) {
		rw_exit(&ds->ds_rwlock);
		cv_broadcast(&ds->ds_exclusive_cv);
	}
	mutex_exit(&ds->ds_lock);
	if (ds->ds_dbuf)
		dsl_dataset_drop_ref(ds, tag);
	else
		dsl_dataset_evict(NULL, ds);
}

boolean_t
dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
{
	boolean_t gotit = FALSE;

	mutex_enter(&ds->ds_lock);
	if (ds->ds_owner == NULL &&
	    (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
		ds->ds_owner = tag;
		if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
			rw_exit(&ds->ds_rwlock);
		gotit = TRUE;
	}
	mutex_exit(&ds->ds_lock);
	return (gotit);
}

void
dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
{
	ASSERT3P(owner, ==, ds->ds_owner);
	if (!RW_WRITE_HELD(&ds->ds_rwlock))
		rw_enter(&ds->ds_rwlock, RW_WRITER);
}

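/*
 * Allocate and initialize the on-disk dataset object under dsl_dir dd in
 * syncing context.  If origin is non-NULL the new dataset is a clone: it
 * inherits the origin's block pointer and space accounting and is
 * registered in the origin's next-clones and dd_clones directories.
 */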
uint64_t
dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
    uint64_t flags, dmu_tx_t *tx)
{
	dsl_pool_t *dp = dd->dd_pool;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	uint64_t dsobj;
	objset_t *mos = dp->dp_meta_objset;

	if (origin == NULL)
		origin = dp->dp_origin_snap;

	ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
	ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);

	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	bzero(dsphys, sizeof (dsl_dataset_phys_t));
	dsphys->ds_dir_obj = dd->dd_object;
	dsphys->ds_flags = flags;
	dsphys->ds_fsid_guid = unique_create();
	do {
		(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
		    sizeof (dsphys->ds_guid));
	} while (dsphys->ds_guid == 0);
	dsphys->ds_snapnames_zapobj =
	    zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
	    DMU_OT_NONE, 0, tx);
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;

	if (origin == NULL) {
		dsphys->ds_deadlist_obj = dsl_deadlist_alloc(mos, tx);
	} else {
		dsl_dataset_t *ohds;

		dsphys->ds_prev_snap_obj = origin->ds_object;
		dsphys->ds_prev_snap_txg =
		    origin->ds_phys->ds_creation_txg;
		dsphys->ds_referenced_bytes =
		    origin->ds_phys->ds_referenced_bytes;
		dsphys->ds_compressed_bytes =
		    origin->ds_phys->ds_compressed_bytes;
		dsphys->ds_uncompressed_bytes =
		    origin->ds_phys->ds_uncompressed_bytes;
		dsphys->ds_bp = origin->ds_phys->ds_bp;
		dsphys->ds_flags |= origin->ds_phys->ds_flags;

		dmu_buf_will_dirty(origin->ds_dbuf, tx);
		origin->ds_phys->ds_num_children++;

		VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
		    origin->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ohds));
		dsphys->ds_deadlist_obj = dsl_deadlist_clone(&ohds->ds_deadlist,
		    dsphys->ds_prev_snap_txg, dsphys->ds_prev_snap_obj, tx);
		dsl_dataset_rele(ohds, FTAG);

		if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
			if (origin->ds_phys->ds_next_clones_obj == 0) {
				origin->ds_phys->ds_next_clones_obj =
				    zap_create(mos,
				    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
			}
			VERIFY(0 == zap_add_int(mos,
			    origin->ds_phys->ds_next_clones_obj,
			    dsobj, tx));
		}

		dmu_buf_will_dirty(dd->dd_dbuf, tx);
		dd->dd_phys->dd_origin_obj = origin->ds_object;
		if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
			if (origin->ds_dir->dd_phys->dd_clones == 0) {
				dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
				origin->ds_dir->dd_phys->dd_clones =
				    zap_create(mos,
				    DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
			}
			VERIFY3U(0, ==, zap_add_int(mos,
			    origin->ds_dir->dd_phys->dd_clones, dsobj, tx));
		}
	}

	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;

	dmu_buf_rele(dbuf, FTAG);

	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	dd->dd_phys->dd_head_dataset_obj = dsobj;

	return (dsobj);
}

uint64_t
dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
    dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
{
	dsl_pool_t *dp = pdd->dd_pool;
	uint64_t dsobj, ddobj;
	dsl_dir_t *dd;

	ASSERT(lastname[0] != '@');

	ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));

	dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);

	dsl_deleg_set_create_perms(dd, tx, cr);

	dsl_dir_close(dd, FTAG);

	/*
	 * If we are creating a clone, make sure we zero out any stale
	 * data from the origin snapshot's ZIL header.
	 */
	if (origin != NULL) {
		dsl_dataset_t *ds;
		objset_t *os;

		VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
		VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
		bzero(&os->os_zil_header, sizeof (os->os_zil_header));
		dsl_dataset_dirty(ds, tx);
		dsl_dataset_rele(ds, FTAG);
	}

	return (dsobj);
}

#ifdef __FreeBSD__
/* FreeBSD ioctl compat begin */
struct destroyarg {
	nvlist_t *nvl;
	const char *snapname;
};

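/*
 * dmu_objset_find() callback: record "<fs>@<snapname>" in the nvlist for
 * each filesystem visited.
 */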
static int
dsl_check_snap_cb(const char *name, void *arg)
{
	struct destroyarg *da = arg;
	char *dsname;

	dsname = kmem_asprintf("%s@%s", name, da->snapname);
	VERIFY(nvlist_add_boolean(da->nvl, dsname) == 0);
	/* nvlist_add_boolean() copies the name, so free our scratch copy. */
	strfree(dsname);

	return (0);
}

int
dmu_get_recursive_snaps_nvl(const char *fsname, const char *snapname,
    nvlist_t *snaps)
{
	struct destroyarg *da;
	int err;

	da = kmem_zalloc(sizeof (struct destroyarg), KM_SLEEP);
	da->nvl = snaps;
	da->snapname = snapname;
	err = dmu_objset_find(fsname, dsl_check_snap_cb, da,
	    DS_FIND_CHILDREN);
	kmem_free(da, sizeof (struct destroyarg));

	return (err);
}
/* FreeBSD ioctl compat end */
#endif /* __FreeBSD__ */

/*
 * The snapshots must all be in the same pool.
 */
int
dmu_snapshots_destroy_nvl(nvlist_t *snaps, boolean_t defer, char *failed)
{
	int err;
	dsl_sync_task_t *dst;
	spa_t *spa;
	nvpair_t *pair;
	dsl_sync_task_group_t *dstg;

	pair = nvlist_next_nvpair(snaps, NULL);
	if (pair == NULL)
		return (0);

	err = spa_open(nvpair_name(pair), &spa, FTAG);
	if (err)
		return (err);
	dstg = dsl_sync_task_group_create(spa_get_dsl(spa));

	for (pair = nvlist_next_nvpair(snaps, NULL); pair != NULL;
	    pair = nvlist_next_nvpair(snaps, pair)) {
		dsl_dataset_t *ds;

		err = dsl_dataset_own(nvpair_name(pair), B_TRUE, dstg, &ds);
		if (err == 0) {
			struct dsl_ds_destroyarg *dsda;

			dsl_dataset_make_exclusive(ds, dstg);
			dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg),
			    KM_SLEEP);
			dsda->ds = ds;
			dsda->defer = defer;
			dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
			    dsl_dataset_destroy_sync, dsda, dstg, 0);
		} else if (err == ENOENT) {
			err = 0;
		} else {
			(void) strcpy(failed, nvpair_name(pair));
			break;
		}
	}

	if (err == 0)
		err = dsl_sync_task_group_wait(dstg);

	for (dst = list_head(&dstg->dstg_tasks); dst;
	    dst = list_next(&dstg->dstg_tasks, dst)) {
		struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
		dsl_dataset_t *ds = dsda->ds;

		/*
		 * Return the file system name that triggered the error
		 */
		if (dst->dst_err) {
			dsl_dataset_name(ds, failed);
		}
		ASSERT3P(dsda->rm_origin, ==, NULL);
		dsl_dataset_disown(ds, dstg);
		kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
	}

	dsl_sync_task_group_destroy(dstg);
	spa_close(spa, FTAG);
	return (err);
}

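/*
 * Returns B_TRUE if destroying ds's last remaining clone should also
 * destroy ds itself: exactly two children remain (its next dataset plus
 * that one clone), it has no user holds, and it is marked for deferred
 * destruction.
 */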
static boolean_t
dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
{
	boolean_t might_destroy = B_FALSE;

	mutex_enter(&ds->ds_lock);
	if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
	    DS_IS_DEFER_DESTROY(ds))
		might_destroy = B_TRUE;
	mutex_exit(&ds->ds_lock);

	return (might_destroy);
}

/*
 * If we're removing a clone, and these three conditions are true:
 *	1) the clone's origin has no other children
 *	2) the clone's origin has no user references
 *	3) the clone's origin has been marked for deferred destruction
 * Then, prepare to remove the origin as part of this sync task group.
 */
static int
dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
{
	dsl_dataset_t *ds = dsda->ds;
	dsl_dataset_t *origin = ds->ds_prev;

	if (dsl_dataset_might_destroy_origin(origin)) {
		char *name;
		int namelen;
		int error;

		namelen = dsl_dataset_namelen(origin) + 1;
		name = kmem_alloc(namelen, KM_SLEEP);
		dsl_dataset_name(origin, name);
#ifdef _KERNEL
		error = zfs_unmount_snap(name, NULL);
		if (error) {
			kmem_free(name, namelen);
			return (error);
		}
#endif
		error = dsl_dataset_own(name, B_TRUE, tag, &origin);
		kmem_free(name, namelen);
		if (error)
			return (error);
		dsda->rm_origin = origin;
		dsl_dataset_make_exclusive(origin, tag);
	}

	return (0);
}

/*
 * ds must be opened as OWNER.  On return (whether successful or not),
 * ds will be closed and caller can no longer dereference it.
 */
int
dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
{
	int err;
	dsl_sync_task_group_t *dstg;
	objset_t *os;
	dsl_dir_t *dd;
	uint64_t obj;
	struct dsl_ds_destroyarg dsda = { 0 };
	dsl_dataset_t dummy_ds = { 0 };

	dsda.ds = ds;

	if (dsl_dataset_is_snapshot(ds)) {
		/* Destroying a snapshot is simpler */
		dsl_dataset_make_exclusive(ds, tag);

		dsda.defer = defer;
		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
		    &dsda, tag, 0);
		ASSERT3P(dsda.rm_origin, ==, NULL);
		goto out;
	} else if (defer) {
		err = EINVAL;
		goto out;
	}

	dd = ds->ds_dir;
	dummy_ds.ds_dir = dd;
	dummy_ds.ds_object = ds->ds_object;

	if (!spa_feature_is_enabled(dsl_dataset_get_spa(ds),
	    &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY])) {
		/*
		 * Check for errors and mark this ds as inconsistent, in
		 * case we crash while freeing the objects.
		 */
		err = dsl_sync_task_do(dd->dd_pool,
		    dsl_dataset_destroy_begin_check,
		    dsl_dataset_destroy_begin_sync, ds, NULL, 0);
		if (err)
			goto out;

		err = dmu_objset_from_ds(ds, &os);
		if (err)
			goto out;

		/*
		 * Remove all objects while in the open context so that
		 * there is less work to do in the syncing context.
		 */
		for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
		    ds->ds_phys->ds_prev_snap_txg)) {
			/*
			 * Ignore errors, if there is not enough disk space
			 * we will deal with it in dsl_dataset_destroy_sync().
			 */
			(void) dmu_free_object(os, obj);
		}
		if (err != ESRCH)
			goto out;

		/*
		 * Sync out all in-flight IO.
		 */
		txg_wait_synced(dd->dd_pool, 0);

		/*
		 * If we managed to free all the objects in open
		 * context, the user space accounting should be zero.
		 */
		if (ds->ds_phys->ds_bp.blk_fill == 0 &&
		    dmu_objset_userused_enabled(os)) {
			uint64_t count;

			ASSERT(zap_count(os, DMU_USERUSED_OBJECT,
			    &count) != 0 || count == 0);
			ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT,
			    &count) != 0 || count == 0);
		}
	}

	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
	err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
	rw_exit(&dd->dd_pool->dp_config_rwlock);

	if (err)
		goto out;

	/*
	 * Blow away the dsl_dir + head dataset.
	 */
	dsl_dataset_make_exclusive(ds, tag);
	/*
	 * If we're removing a clone, we might also need to remove its
	 * origin.
	 */
	do {
		dsda.need_prep = B_FALSE;
		if (dsl_dir_is_clone(dd)) {
			err = dsl_dataset_origin_rm_prep(&dsda, tag);
			if (err) {
				dsl_dir_close(dd, FTAG);
				goto out;
			}
		}

		dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
		dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
		    dsl_dataset_destroy_sync, &dsda, tag, 0);
		dsl_sync_task_create(dstg, dsl_dir_destroy_check,
		    dsl_dir_destroy_sync, &dummy_ds, FTAG, 0);
		err = dsl_sync_task_group_wait(dstg);
		dsl_sync_task_group_destroy(dstg);

		/*
		 * We could be racing against 'zfs release' or 'zfs destroy -d'
		 * on the origin snap, in which case we can get EBUSY if we
		 * needed to destroy the origin snap but were not ready to
		 * do so.
		 */
		if (dsda.need_prep) {
			ASSERT(err == EBUSY);
			ASSERT(dsl_dir_is_clone(dd));
			ASSERT(dsda.rm_origin == NULL);
		}
	} while (dsda.need_prep);

	if (dsda.rm_origin != NULL)
		dsl_dataset_disown(dsda.rm_origin, tag);

	/* if it is successful, dsl_dir_destroy_sync will close the dd */
	if (err)
		dsl_dir_close(dd, FTAG);
out:
	dsl_dataset_disown(ds, tag);
	return (err);
}

blkptr_t *
dsl_dataset_get_blkptr(dsl_dataset_t *ds)
{
	return (&ds->ds_phys->ds_bp);
}

void
dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
{
	ASSERT(dmu_tx_is_syncing(tx));
	/* If it's the meta-objset, set dp_meta_rootbp */
	if (ds == NULL) {
		tx->tx_pool->dp_meta_rootbp = *bp;
	} else {
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		ds->ds_phys->ds_bp = *bp;
	}
}

spa_t *
dsl_dataset_get_spa(dsl_dataset_t *ds)
{
	return (ds->ds_dir->dd_pool->dp_spa);
}

void
dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
{
	dsl_pool_t *dp;

	if (ds == NULL) /* this is the meta-objset */
		return;

	ASSERT(ds->ds_objset != NULL);

	if (ds->ds_phys->ds_next_snap_obj != 0)
		panic("dirtying snapshot!");

	dp = ds->ds_dir->dd_pool;

	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
		/* up the hold count until we can be written out */
		dmu_buf_add_ref(ds->ds_dbuf, ds);
	}
}

boolean_t
dsl_dataset_is_dirty(dsl_dataset_t *ds)
{
	for (int t = 0; t < TXG_SIZE; t++) {
		if (txg_list_member(&ds->ds_dir->dd_pool->dp_dirty_datasets,
		    ds, t))
			return (B_TRUE);
	}
	return (B_FALSE);
}

/*
 * The unique space in the head dataset can be calculated by subtracting
 * the space used in the most recent snapshot, that is still being used
 * in this file system, from the space currently in use.  To figure out
 * the space in the most recent snapshot still in use, we need to take
 * the total space used in the snapshot and subtract out the space that
 * has been freed up since the snapshot was taken.
 */
static void
dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
{
	uint64_t mrs_used;
	uint64_t dlused, dlcomp, dluncomp;

	ASSERT(!dsl_dataset_is_snapshot(ds));

	if (ds->ds_phys->ds_prev_snap_obj != 0)
		mrs_used = ds->ds_prev->ds_phys->ds_referenced_bytes;
	else
		mrs_used = 0;

	dsl_deadlist_space(&ds->ds_deadlist, &dlused, &dlcomp, &dluncomp);

	ASSERT3U(dlused, <=, mrs_used);
	ds->ds_phys->ds_unique_bytes =
	    ds->ds_phys->ds_referenced_bytes - (mrs_used - dlused);

	if (spa_version(ds->ds_dir->dd_pool->dp_spa) >=
	    SPA_VERSION_UNIQUE_ACCURATE)
		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
}

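/*
 * killarg/kill_blkptr implement the old synchronous destroy path: a
 * traverse_dataset() callback that frees every block born after the
 * previous snapshot.
 */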
struct killarg {
	dsl_dataset_t *ds;
	dmu_tx_t *tx;
};

/* ARGSUSED */
static int
kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
{
	struct killarg *ka = arg;
	dmu_tx_t *tx = ka->tx;

	if (bp == NULL)
		return (0);

	if (zb->zb_level == ZB_ZIL_LEVEL) {
		ASSERT(zilog != NULL);
		/*
		 * It's a block in the intent log.  It has no
		 * accounting, so just free it.
		 */
		dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
	} else {
		ASSERT(zilog == NULL);
		ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
		(void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
	}

	return (0);
}

/* ARGSUSED */
static int
dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t count;
	int err;

	/*
	 * Can't delete a head dataset if there are snapshots of it.
	 * (Except if the only snapshots are from the branch we cloned
	 * from.)
	 */
	if (ds->ds_prev != NULL &&
	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
		return (EBUSY);

	/*
	 * This is really a dsl_dir thing, but check it here so that
	 * we'll be less likely to leave this dataset inconsistent &
	 * nearly destroyed.
	 */
	err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
	if (err)
		return (err);
	if (count != 0)
		return (EEXIST);

	return (0);
}

/* ARGSUSED */
static void
dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/* Mark it as inconsistent on-disk, in case we crash */
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;

	spa_history_log_internal(LOG_DS_DESTROY_BEGIN, dp->dp_spa, tx,
	    "dataset = %llu", ds->ds_object);
}

static int
dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
    dmu_tx_t *tx)
{
	dsl_dataset_t *ds = dsda->ds;
	dsl_dataset_t *ds_prev = ds->ds_prev;

	if (dsl_dataset_might_destroy_origin(ds_prev)) {
		struct dsl_ds_destroyarg ndsda = {0};

		/*
		 * If we're not prepared to remove the origin, don't remove
		 * the clone either.
		 */
		if (dsda->rm_origin == NULL) {
			dsda->need_prep = B_TRUE;
			return (EBUSY);
		}

		ndsda.ds = ds_prev;
		ndsda.is_origin_rm = B_TRUE;
		return (dsl_dataset_destroy_check(&ndsda, tag, tx));
	}

	/*
	 * If we're not going to remove the origin after all,
	 * undo the open context setup.
	 */
	if (dsda->rm_origin != NULL) {
		dsl_dataset_disown(dsda->rm_origin, tag);
		dsda->rm_origin = NULL;
	}

	return (0);
}

/*
 * If you add new checks here, you may need to add
 * additional checks to the "temporary" case in
 * snapshot_check() in dmu_objset.c.
 */
/* ARGSUSED */
int
dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	struct dsl_ds_destroyarg *dsda = arg1;
	dsl_dataset_t *ds = dsda->ds;

	/* we have an owner hold, so no one else can destroy us */
	ASSERT(!DSL_DATASET_IS_DESTROYED(ds));

	/*
	 * Only allow deferred destroy on pools that support it.
	 * NOTE: deferred destroy is only supported on snapshots.
	 */
	if (dsda->defer) {
		if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
		    SPA_VERSION_USERREFS)
			return (ENOTSUP);
		ASSERT(dsl_dataset_is_snapshot(ds));
		return (0);
	}

	/*
	 * Can't delete a head dataset if there are snapshots of it.
	 * (Except if the only snapshots are from the branch we cloned
	 * from.)
	 */
	if (ds->ds_prev != NULL &&
	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
		return (EBUSY);

	/*
	 * If we made changes this txg, traverse_dsl_dataset won't find
	 * them.  Try again.
	 */
	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
		return (EAGAIN);

	if (dsl_dataset_is_snapshot(ds)) {
		/*
		 * If this snapshot has an elevated user reference count,
		 * we can't destroy it yet.
		 */
		if (ds->ds_userrefs > 0 && !dsda->releasing)
			return (EBUSY);

		mutex_enter(&ds->ds_lock);
		/*
		 * Can't delete a branch point. However, if we're destroying
		 * a clone and removing its origin due to it having a user
		 * hold count of 0 and having been marked for deferred destroy,
		 * it's OK for the origin to have a single clone.
		 */
		if (ds->ds_phys->ds_num_children >
		    (dsda->is_origin_rm ? 2 : 1)) {
			mutex_exit(&ds->ds_lock);
			return (EEXIST);
		}
		mutex_exit(&ds->ds_lock);
	} else if (dsl_dir_is_clone(ds->ds_dir)) {
		return (dsl_dataset_origin_check(dsda, arg2, tx));
	}

	/* XXX we should do some i/o error checking... */
	return (0);
}

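/*
 * refsarg and the routines below let the destroy path wait until every
 * hold on the dataset's dbuf has been released: the user-eviction
 * callback signals the CV once the buffer user is gone.
 */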
1489	kmutex_t lock;
1490	boolean_t gone;
1491	kcondvar_t cv;
1492};
1493
1494/* ARGSUSED */
1495static void
1496dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
1497{
1498	struct refsarg *arg = argv;
1499
1500	mutex_enter(&arg->lock);
1501	arg->gone = TRUE;
1502	cv_signal(&arg->cv);
1503	mutex_exit(&arg->lock);
1504}
1505
1506static void
1507dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
1508{
1509	struct refsarg arg;
1510
1511	bzero(&arg, sizeof(arg));
1512	mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
1513	cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
1514	arg.gone = FALSE;
1515	(void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
1516	    dsl_dataset_refs_gone);
1517	dmu_buf_rele(ds->ds_dbuf, tag);
1518	mutex_enter(&arg.lock);
1519	while (!arg.gone)
1520		cv_wait(&arg.cv, &arg.lock);
1521	ASSERT(arg.gone);
1522	mutex_exit(&arg.lock);
1523	ds->ds_dbuf = NULL;
1524	ds->ds_phys = NULL;
1525	mutex_destroy(&arg.lock);
1526	cv_destroy(&arg.cv);
1527}
1528
1529static void
1530remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
1531{
1532	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1533	uint64_t count;
1534	int err;
1535
1536	ASSERT(ds->ds_phys->ds_num_children >= 2);
1537	err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
1538	/*
1539	 * The err should not be ENOENT, but a bug in a previous version
1540	 * of the code could cause upgrade_clones_cb() to not set
1541	 * ds_next_snap_obj when it should, leading to a missing entry.
1542	 * If we knew that the pool was created after
1543	 * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
1544	 * ENOENT.  However, at least we can check that we don't have
1545	 * too many entries in the next_clones_obj even after failing to
1546	 * remove this one.
1547	 */
1548	if (err != ENOENT) {
1549		VERIFY0(err);
1550	}
1551	ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
1552	    &count));
1553	ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
1554}
1555
1556static void
1557dsl_dataset_remove_clones_key(dsl_dataset_t *ds, uint64_t mintxg, dmu_tx_t *tx)
1558{
1559	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1560	zap_cursor_t zc;
1561	zap_attribute_t za;
1562
1563	/*
1564	 * If it is the old version, dd_clones doesn't exist so we can't
1565	 * find the clones, but deadlist_remove_key() is a no-op so it
1566	 * doesn't matter.
1567	 */
1568	if (ds->ds_dir->dd_phys->dd_clones == 0)
1569		return;
1570
1571	for (zap_cursor_init(&zc, mos, ds->ds_dir->dd_phys->dd_clones);
1572	    zap_cursor_retrieve(&zc, &za) == 0;
1573	    zap_cursor_advance(&zc)) {
1574		dsl_dataset_t *clone;
1575
1576		VERIFY3U(0, ==, dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
1577		    za.za_first_integer, FTAG, &clone));
1578		if (clone->ds_dir->dd_origin_txg > mintxg) {
1579			dsl_deadlist_remove_key(&clone->ds_deadlist,
1580			    mintxg, tx);
1581			dsl_dataset_remove_clones_key(clone, mintxg, tx);
1582		}
1583		dsl_dataset_rele(clone, FTAG);
1584	}
1585	zap_cursor_fini(&zc);
1586}
1587
1588struct process_old_arg {
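/*
 * Support for destroying a snapshot whose deadlist is still in the old
 * (pre-SPA_VERSION_DEADLISTS) format: walk the next snapshot's deadlist,
 * keep blocks that predate our previous snapshot, free the rest, and
 * then exchange the two deadlists.
 */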
1589	dsl_dataset_t *ds;
1590	dsl_dataset_t *ds_prev;
1591	boolean_t after_branch_point;
1592	zio_t *pio;
1593	uint64_t used, comp, uncomp;
1594};
1595
1596static int
1597process_old_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
1598{
1599	struct process_old_arg *poa = arg;
1600	dsl_pool_t *dp = poa->ds->ds_dir->dd_pool;
1601
1602	if (bp->blk_birth <= poa->ds->ds_phys->ds_prev_snap_txg) {
1603		dsl_deadlist_insert(&poa->ds->ds_deadlist, bp, tx);
1604		if (poa->ds_prev && !poa->after_branch_point &&
1605		    bp->blk_birth >
1606		    poa->ds_prev->ds_phys->ds_prev_snap_txg) {
1607			poa->ds_prev->ds_phys->ds_unique_bytes +=
1608			    bp_get_dsize_sync(dp->dp_spa, bp);
1609		}
1610	} else {
1611		poa->used += bp_get_dsize_sync(dp->dp_spa, bp);
1612		poa->comp += BP_GET_PSIZE(bp);
1613		poa->uncomp += BP_GET_UCSIZE(bp);
1614		dsl_free_sync(poa->pio, dp, tx->tx_txg, bp);
1615	}
1616	return (0);
1617}
1618
1619static void
1620process_old_deadlist(dsl_dataset_t *ds, dsl_dataset_t *ds_prev,
1621    dsl_dataset_t *ds_next, boolean_t after_branch_point, dmu_tx_t *tx)
1622{
1623	struct process_old_arg poa = { 0 };
1624	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1625	objset_t *mos = dp->dp_meta_objset;
1626
1627	ASSERT(ds->ds_deadlist.dl_oldfmt);
1628	ASSERT(ds_next->ds_deadlist.dl_oldfmt);
1629
1630	poa.ds = ds;
1631	poa.ds_prev = ds_prev;
1632	poa.after_branch_point = after_branch_point;
1633	poa.pio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1634	VERIFY3U(0, ==, bpobj_iterate(&ds_next->ds_deadlist.dl_bpobj,
1635	    process_old_cb, &poa, tx));
1636	VERIFY0(zio_wait(poa.pio));
1637	ASSERT3U(poa.used, ==, ds->ds_phys->ds_unique_bytes);
1638
1639	/* change snapused */
1640	dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1641	    -poa.used, -poa.comp, -poa.uncomp, tx);
1642
1643	/* swap next's deadlist to our deadlist */
1644	dsl_deadlist_close(&ds->ds_deadlist);
1645	dsl_deadlist_close(&ds_next->ds_deadlist);
1646	SWITCH64(ds_next->ds_phys->ds_deadlist_obj,
1647	    ds->ds_phys->ds_deadlist_obj);
1648	dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
1649	dsl_deadlist_open(&ds_next->ds_deadlist, mos,
1650	    ds_next->ds_phys->ds_deadlist_obj);
1651}
1652
1653static int
1654old_synchronous_dataset_destroy(dsl_dataset_t *ds, dmu_tx_t *tx)
1655{
1656	int err;
1657	struct killarg ka;
1658
1659	/*
1660	 * Free everything that we point to (that's born after
1661	 * the previous snapshot, if we are a clone)
1662	 *
1663	 * NB: this should be very quick, because we already
1664	 * freed all the objects in open context.
1665	 */
1666	ka.ds = ds;
1667	ka.tx = tx;
1668	err = traverse_dataset(ds,
1669	    ds->ds_phys->ds_prev_snap_txg, TRAVERSE_POST,
1670	    kill_blkptr, &ka);
1671	ASSERT0(err);
1672	ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) || ds->ds_phys->ds_unique_bytes == 0);
1673
1674	return (err);
1675}
1676
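/*
 * Perform the destroy in syncing context.  A deferred destroy that still
 * has user holds or clones only sets DS_FLAG_DEFER_DESTROY here;
 * otherwise the dataset's space accounting, deadlist, and MOS state are
 * torn down.
 */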
1677void
1678dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
1679{
1680	struct dsl_ds_destroyarg *dsda = arg1;
1681	dsl_dataset_t *ds = dsda->ds;
1682	int err;
1683	int after_branch_point = FALSE;
1684	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1685	objset_t *mos = dp->dp_meta_objset;
1686	dsl_dataset_t *ds_prev = NULL;
1687	boolean_t wont_destroy;
1688	uint64_t obj;
1689
1690	wont_destroy = (dsda->defer &&
1691	    (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1));
1692
1693	ASSERT(ds->ds_owner || wont_destroy);
1694	ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
1695	ASSERT(ds->ds_prev == NULL ||
1696	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
1697	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
1698
1699	if (wont_destroy) {
1700		ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1701		dmu_buf_will_dirty(ds->ds_dbuf, tx);
1702		ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
1703		return;
1704	}
1705
1706	/* signal any waiters that this dataset is going away */
1707	mutex_enter(&ds->ds_lock);
1708	ds->ds_owner = dsl_reaper;
1709	cv_broadcast(&ds->ds_exclusive_cv);
1710	mutex_exit(&ds->ds_lock);
1711
1712	/* Remove our reservation */
1713	if (ds->ds_reserved != 0) {
1714		dsl_prop_setarg_t psa;
1715		uint64_t value = 0;
1716
1717		dsl_prop_setarg_init_uint64(&psa, "refreservation",
1718		    (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
1719		    &value);
1720		psa.psa_effective_value = 0;	/* predict default value */
1721
1722		dsl_dataset_set_reservation_sync(ds, &psa, tx);
1723		ASSERT0(ds->ds_reserved);
1724	}
1725
1726	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1727
1728	dsl_scan_ds_destroyed(ds, tx);
1729
1730	obj = ds->ds_object;
1731
1732	if (ds->ds_phys->ds_prev_snap_obj != 0) {
1733		if (ds->ds_prev) {
1734			ds_prev = ds->ds_prev;
1735		} else {
1736			VERIFY(0 == dsl_dataset_hold_obj(dp,
1737			    ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
1738		}
1739		after_branch_point =
1740		    (ds_prev->ds_phys->ds_next_snap_obj != obj);
1741
1742		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1743		if (after_branch_point &&
1744		    ds_prev->ds_phys->ds_next_clones_obj != 0) {
1745			remove_from_next_clones(ds_prev, obj, tx);
1746			if (ds->ds_phys->ds_next_snap_obj != 0) {
1747				VERIFY(0 == zap_add_int(mos,
1748				    ds_prev->ds_phys->ds_next_clones_obj,
1749				    ds->ds_phys->ds_next_snap_obj, tx));
1750			}
1751		}
1752		if (after_branch_point &&
1753		    ds->ds_phys->ds_next_snap_obj == 0) {
1754			/* This clone is toast. */
1755			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1756			ds_prev->ds_phys->ds_num_children--;
1757
1758			/*
1759			 * If the clone's origin has no other clones, no
1760			 * user holds, and has been marked for deferred
1761			 * deletion, then we should have done the necessary
1762			 * destroy setup for it.
1763			 */
1764			if (ds_prev->ds_phys->ds_num_children == 1 &&
1765			    ds_prev->ds_userrefs == 0 &&
1766			    DS_IS_DEFER_DESTROY(ds_prev)) {
1767				ASSERT3P(dsda->rm_origin, !=, NULL);
1768			} else {
1769				ASSERT3P(dsda->rm_origin, ==, NULL);
1770			}
1771		} else if (!after_branch_point) {
1772			ds_prev->ds_phys->ds_next_snap_obj =
1773			    ds->ds_phys->ds_next_snap_obj;
1774		}
1775	}
1776
1777	if (dsl_dataset_is_snapshot(ds)) {
1778		dsl_dataset_t *ds_next;
1779		uint64_t old_unique;
1780		uint64_t used = 0, comp = 0, uncomp = 0;
1781
1782		VERIFY(0 == dsl_dataset_hold_obj(dp,
1783		    ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
1784		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1785
1786		old_unique = ds_next->ds_phys->ds_unique_bytes;
1787
1788		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1789		ds_next->ds_phys->ds_prev_snap_obj =
1790		    ds->ds_phys->ds_prev_snap_obj;
1791		ds_next->ds_phys->ds_prev_snap_txg =
1792		    ds->ds_phys->ds_prev_snap_txg;
1793		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1794		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1795
1796
1797		if (ds_next->ds_deadlist.dl_oldfmt) {
1798			process_old_deadlist(ds, ds_prev, ds_next,
1799			    after_branch_point, tx);
1800		} else {
1801			/* Adjust prev's unique space. */
1802			if (ds_prev && !after_branch_point) {
1803				dsl_deadlist_space_range(&ds_next->ds_deadlist,
1804				    ds_prev->ds_phys->ds_prev_snap_txg,
1805				    ds->ds_phys->ds_prev_snap_txg,
1806				    &used, &comp, &uncomp);
1807				ds_prev->ds_phys->ds_unique_bytes += used;
1808			}
1809
1810			/* Adjust snapused. */
1811			dsl_deadlist_space_range(&ds_next->ds_deadlist,
1812			    ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
1813			    &used, &comp, &uncomp);
1814			dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1815			    -used, -comp, -uncomp, tx);
1816
1817			/* Move blocks to be freed to pool's free list. */
1818			dsl_deadlist_move_bpobj(&ds_next->ds_deadlist,
1819			    &dp->dp_free_bpobj, ds->ds_phys->ds_prev_snap_txg,
1820			    tx);
1821			dsl_dir_diduse_space(tx->tx_pool->dp_free_dir,
1822			    DD_USED_HEAD, used, comp, uncomp, tx);
1823
1824			/* Merge our deadlist into next's and free it. */
1825			dsl_deadlist_merge(&ds_next->ds_deadlist,
1826			    ds->ds_phys->ds_deadlist_obj, tx);
1827		}
1828		dsl_deadlist_close(&ds->ds_deadlist);
1829		dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1830
1831		/* Collapse range in clone heads */
1832		dsl_dataset_remove_clones_key(ds,
1833		    ds->ds_phys->ds_creation_txg, tx);
1834
1835		if (dsl_dataset_is_snapshot(ds_next)) {
1836			dsl_dataset_t *ds_nextnext;
1837
1838			/*
1839			 * Update next's unique to include blocks which
1840			 * were previously shared by only this snapshot
1841			 * and it.  Those blocks will be born after the
1842			 * prev snap and before this snap, and will have
1843			 * died after the next snap and before the one
1844			 * after that (i.e. be on the snap-after-next's
1845			 * deadlist).
1846			 */
1847			VERIFY(0 == dsl_dataset_hold_obj(dp,
1848			    ds_next->ds_phys->ds_next_snap_obj,
1849			    FTAG, &ds_nextnext));
1850			dsl_deadlist_space_range(&ds_nextnext->ds_deadlist,
1851			    ds->ds_phys->ds_prev_snap_txg,
1852			    ds->ds_phys->ds_creation_txg,
1853			    &used, &comp, &uncomp);
1854			ds_next->ds_phys->ds_unique_bytes += used;
1855			dsl_dataset_rele(ds_nextnext, FTAG);
1856			ASSERT3P(ds_next->ds_prev, ==, NULL);
1857
1858			/* Collapse range in this head. */
1859			dsl_dataset_t *hds;
1860			VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
1861			    ds->ds_dir->dd_phys->dd_head_dataset_obj,
1862			    FTAG, &hds));
1863			dsl_deadlist_remove_key(&hds->ds_deadlist,
1864			    ds->ds_phys->ds_creation_txg, tx);
1865			dsl_dataset_rele(hds, FTAG);
1866
1867		} else {
1868			ASSERT3P(ds_next->ds_prev, ==, ds);
1869			dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
1870			ds_next->ds_prev = NULL;
1871			if (ds_prev) {
1872				VERIFY(0 == dsl_dataset_get_ref(dp,
1873				    ds->ds_phys->ds_prev_snap_obj,
1874				    ds_next, &ds_next->ds_prev));
1875			}
1876
1877			dsl_dataset_recalc_head_uniq(ds_next);
1878
1879			/*
1880			 * Reduce the amount of our unconsumed refreservation
1881			 * being charged to our parent by the amount of
1882			 * new unique data we have gained.
1883			 */
1884			if (old_unique < ds_next->ds_reserved) {
1885				int64_t mrsdelta;
1886				uint64_t new_unique =
1887				    ds_next->ds_phys->ds_unique_bytes;
1888
1889				ASSERT(old_unique <= new_unique);
1890				mrsdelta = MIN(new_unique - old_unique,
1891				    ds_next->ds_reserved - old_unique);
1892				dsl_dir_diduse_space(ds->ds_dir,
1893				    DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
1894			}
1895		}
1896		dsl_dataset_rele(ds_next, FTAG);
1897	} else {
1898		zfeature_info_t *async_destroy =
1899		    &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY];
1900		objset_t *os;
1901
1902		/*
1903		 * There's no next snapshot, so this is a head dataset.
1904		 * Destroy the deadlist.  Unless it's a clone, the
1905		 * deadlist should be empty.  (If it's a clone, it's
1906		 * safe to ignore the deadlist contents.)
1907		 */
1908		dsl_deadlist_close(&ds->ds_deadlist);
1909		dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1910		ds->ds_phys->ds_deadlist_obj = 0;
1911
1912		VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
1913
1914		if (!spa_feature_is_enabled(dp->dp_spa, async_destroy)) {
1915			err = old_synchronous_dataset_destroy(ds, tx);
1916		} else {
1917			/*
1918			 * Move the bptree into the pool's list of trees to
1919			 * clean up and update space accounting information.
1920			 */
1921			uint64_t used, comp, uncomp;
1922
1923			zil_destroy_sync(dmu_objset_zil(os), tx);
1924
1925			if (!spa_feature_is_active(dp->dp_spa, async_destroy)) {
1926				spa_feature_incr(dp->dp_spa, async_destroy, tx);
1927				dp->dp_bptree_obj = bptree_alloc(mos, tx);
1928				VERIFY(zap_add(mos,
1929				    DMU_POOL_DIRECTORY_OBJECT,
1930				    DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
1931				    &dp->dp_bptree_obj, tx) == 0);
1932			}
1933
1934			used = ds->ds_dir->dd_phys->dd_used_bytes;
1935			comp = ds->ds_dir->dd_phys->dd_compressed_bytes;
1936			uncomp = ds->ds_dir->dd_phys->dd_uncompressed_bytes;
1937
1938			ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
1939			    ds->ds_phys->ds_unique_bytes == used);
1940
1941			bptree_add(mos, dp->dp_bptree_obj,
1942			    &ds->ds_phys->ds_bp, ds->ds_phys->ds_prev_snap_txg,
1943			    used, comp, uncomp, tx);
1944			dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
1945			    -used, -comp, -uncomp, tx);
1946			dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
1947			    used, comp, uncomp, tx);
1948		}
1949
1950		if (ds->ds_prev != NULL) {
1951			if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
1952				VERIFY3U(0, ==, zap_remove_int(mos,
1953				    ds->ds_prev->ds_dir->dd_phys->dd_clones,
1954				    ds->ds_object, tx));
1955			}
1956			dsl_dataset_rele(ds->ds_prev, ds);
1957			ds->ds_prev = ds_prev = NULL;
1958		}
1959	}
1960
1961	/*
1962	 * This must be done after the dsl_traverse(), because it will
1963	 * re-open the objset.
1964	 */
1965	if (ds->ds_objset) {
1966		dmu_objset_evict(ds->ds_objset);
1967		ds->ds_objset = NULL;
1968	}
1969
1970	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1971		/* Erase the link in the dir */
1972		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
1973		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
1974		ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
1975		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1976		ASSERT(err == 0);
1977	} else {
1978		/* remove from snapshot namespace */
1979		dsl_dataset_t *ds_head;
1980		ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
1981		VERIFY(0 == dsl_dataset_hold_obj(dp,
1982		    ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
1983		VERIFY(0 == dsl_dataset_get_snapname(ds));
1984#ifdef ZFS_DEBUG
1985		{
1986			uint64_t val;
1987
1988			err = dsl_dataset_snap_lookup(ds_head,
1989			    ds->ds_snapname, &val);
1990			ASSERT0(err);
1991			ASSERT3U(val, ==, obj);
1992		}
1993#endif
1994		err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
1995		ASSERT(err == 0);
1996		dsl_dataset_rele(ds_head, FTAG);
1997	}
1998
1999	if (ds_prev && ds->ds_prev != ds_prev)
2000		dsl_dataset_rele(ds_prev, FTAG);
2001
2002	spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
2003	spa_history_log_internal(LOG_DS_DESTROY, dp->dp_spa, tx,
2004	    "dataset = %llu", ds->ds_object);
2005
2006	if (ds->ds_phys->ds_next_clones_obj != 0) {
2007		uint64_t count;
2008		ASSERT(0 == zap_count(mos,
2009		    ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
2010		VERIFY(0 == dmu_object_free(mos,
2011		    ds->ds_phys->ds_next_clones_obj, tx));
2012	}
2013	if (ds->ds_phys->ds_props_obj != 0)
2014		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
2015	if (ds->ds_phys->ds_userrefs_obj != 0)
2016		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
2017	dsl_dir_close(ds->ds_dir, ds);
2018	ds->ds_dir = NULL;
2019	dsl_dataset_drain_refs(ds, tag);
2020	VERIFY(0 == dmu_object_free(mos, obj, tx));
2021
2022	if (dsda->rm_origin) {
2023		/*
2024		 * Remove the origin of the clone we just destroyed.
2025		 */
2026		struct dsl_ds_destroyarg ndsda = {0};
2027
2028		ndsda.ds = dsda->rm_origin;
2029		dsl_dataset_destroy_sync(&ndsda, tag, tx);
2030	}
2031}
2032
2033static int
2034dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
2035{
2036	uint64_t asize;
2037
2038	if (!dmu_tx_is_syncing(tx))
2039		return (0);
2040
2041	/*
2042	 * If there's an fs-only reservation, any blocks that might become
2043	 * owned by the snapshot dataset must be accommodated by space
2044	 * outside of the reservation.
2045	 */
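	/*
	 * Illustrative example (hypothetical numbers): with 3G of
	 * unique bytes and a 5G refreservation, asize below is
	 * MIN(3G, 5G) = 3G; all of the currently-unique data would
	 * pass to the snapshot and must fit outside the reservation.
	 */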
2046	ASSERT(ds->ds_reserved == 0 || DS_UNIQUE_IS_ACCURATE(ds));
2047	asize = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2048	if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
2049		return (ENOSPC);
2050
2051	/*
2052	 * Propagate any reserved space for this snapshot to other
2053	 * snapshot checks in this sync group.
2054	 */
2055	if (asize > 0)
2056		dsl_dir_willuse_space(ds->ds_dir, asize, tx);
2057
2058	return (0);
2059}
2060
2061int
2062dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
2063{
2064	dsl_dataset_t *ds = arg1;
2065	const char *snapname = arg2;
2066	int err;
2067	uint64_t value;
2068
2069	/*
2070	 * We don't allow multiple snapshots of the same txg.  If there
2071	 * is already one, try again.
2072	 */
2073	if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
2074		return (EAGAIN);
2075
2076	/*
2077	 * Check for a conflicting snapshot name.
2078	 */
2079	err = dsl_dataset_snap_lookup(ds, snapname, &value);
2080	if (err == 0)
2081		return (EEXIST);
2082	if (err != ENOENT)
2083		return (err);
2084
2085	/*
2086	 * Check that the full snapshot name is not too long: the dataset
2087	 * name + 1 for the @-sign + the snapshot name must fit in MAXNAMELEN.
2088	 */
2089	if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
2090		return (ENAMETOOLONG);
2091
2092	err = dsl_dataset_snapshot_reserve_space(ds, tx);
2093	if (err)
2094		return (err);
2095
2096	ds->ds_trysnap_txg = tx->tx_txg;
2097	return (0);
2098}
2099
2100void
2101dsl_dataset_snapshot_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2102{
2103	dsl_dataset_t *ds = arg1;
2104	const char *snapname = arg2;
2105	dsl_pool_t *dp = ds->ds_dir->dd_pool;
2106	dmu_buf_t *dbuf;
2107	dsl_dataset_phys_t *dsphys;
2108	uint64_t dsobj, crtxg;
2109	objset_t *mos = dp->dp_meta_objset;
2110	int err;
2111
2112	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
2113
2114	/*
2115	 * The origin's ds_creation_txg has to be < TXG_INITIAL
2116	 */
2117	if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
2118		crtxg = 1;
2119	else
2120		crtxg = tx->tx_txg;
2121
2122	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
2123	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
2124	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
2125	dmu_buf_will_dirty(dbuf, tx);
2126	dsphys = dbuf->db_data;
2127	bzero(dsphys, sizeof (dsl_dataset_phys_t));
2128	dsphys->ds_dir_obj = ds->ds_dir->dd_object;
2129	dsphys->ds_fsid_guid = unique_create();
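	/* Zero is reserved to mean "no guid"; redraw until nonzero. */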
2130	do {
2131		(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
2132		    sizeof (dsphys->ds_guid));
2133	} while (dsphys->ds_guid == 0);
2134	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
2135	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
2136	dsphys->ds_next_snap_obj = ds->ds_object;
2137	dsphys->ds_num_children = 1;
2138	dsphys->ds_creation_time = gethrestime_sec();
2139	dsphys->ds_creation_txg = crtxg;
2140	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
2141	dsphys->ds_referenced_bytes = ds->ds_phys->ds_referenced_bytes;
2142	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
2143	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
2144	dsphys->ds_flags = ds->ds_phys->ds_flags;
2145	dsphys->ds_bp = ds->ds_phys->ds_bp;
2146	dmu_buf_rele(dbuf, FTAG);
2147
2148	ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
2149	if (ds->ds_prev) {
2150		uint64_t next_clones_obj =
2151		    ds->ds_prev->ds_phys->ds_next_clones_obj;
2152		ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
2153		    ds->ds_object ||
2154		    ds->ds_prev->ds_phys->ds_num_children > 1);
2155		if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
2156			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2157			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
2158			    ds->ds_prev->ds_phys->ds_creation_txg);
2159			ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
2160		} else if (next_clones_obj != 0) {
2161			remove_from_next_clones(ds->ds_prev,
2162			    dsphys->ds_next_snap_obj, tx);
2163			VERIFY3U(0, ==, zap_add_int(mos,
2164			    next_clones_obj, dsobj, tx));
2165		}
2166	}
2167
2168	/*
2169	 * If we have a reference-reservation on this dataset, we will
2170	 * need to increase the amount of refreservation being charged
2171	 * since our unique space is going to zero.
2172	 */
2173	if (ds->ds_reserved) {
2174		int64_t delta;
2175		ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
2176		delta = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2177		dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
2178		    delta, 0, 0, tx);
2179	}
2180
2181	dmu_buf_will_dirty(ds->ds_dbuf, tx);
2182	zfs_dbgmsg("taking snapshot %s@%s/%llu; newkey=%llu",
2183	    ds->ds_dir->dd_myname, snapname, dsobj,
2184	    ds->ds_phys->ds_prev_snap_txg);
2185	ds->ds_phys->ds_deadlist_obj = dsl_deadlist_clone(&ds->ds_deadlist,
2186	    UINT64_MAX, ds->ds_phys->ds_prev_snap_obj, tx);
2187	dsl_deadlist_close(&ds->ds_deadlist);
2188	dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
2189	dsl_deadlist_add_key(&ds->ds_deadlist,
2190	    ds->ds_phys->ds_prev_snap_txg, tx);
2191
2192	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
2193	ds->ds_phys->ds_prev_snap_obj = dsobj;
2194	ds->ds_phys->ds_prev_snap_txg = crtxg;
2195	ds->ds_phys->ds_unique_bytes = 0;
2196	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
2197		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
2198
2199	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
2200	    snapname, 8, 1, &dsobj, tx);
2201	ASSERT(err == 0);
2202
2203	if (ds->ds_prev)
2204		dsl_dataset_drop_ref(ds->ds_prev, ds);
2205	VERIFY(0 == dsl_dataset_get_ref(dp,
2206	    ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
2207
2208	dsl_scan_ds_snapshotted(ds, tx);
2209
2210	dsl_dir_snap_cmtime_update(ds->ds_dir);
2211
2212	spa_history_log_internal(LOG_DS_SNAPSHOT, dp->dp_spa, tx,
2213	    "dataset = %llu", dsobj);
2214}
2215
2216void
2217dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
2218{
2219	ASSERT(dmu_tx_is_syncing(tx));
2220	ASSERT(ds->ds_objset != NULL);
2221	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
2222
2223	/*
2224	 * in case we had to change ds_fsid_guid when we opened it,
2225	 * sync it out now.
2226	 */
2227	dmu_buf_will_dirty(ds->ds_dbuf, tx);
2228	ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
2229
2230	dmu_objset_sync(ds->ds_objset, zio, tx);
2231}
2232
2233static void
2234get_clones_stat(dsl_dataset_t *ds, nvlist_t *nv)
2235{
2236	uint64_t count = 0;
2237	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
2238	zap_cursor_t zc;
2239	zap_attribute_t za;
2240	nvlist_t *propval;
2241	nvlist_t *val;
2242
2243	rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2244	VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2245	VERIFY(nvlist_alloc(&val, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2246
2247	/*
2248	 * There may be missing entries in ds_next_clones_obj
2249	 * due to a bug in a previous version of the code.
2250	 * Only trust it if it has the right number of entries.
2251	 */
2252	if (ds->ds_phys->ds_next_clones_obj != 0) {
2253		ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
2254		    &count));
2255	}
2256	if (count != ds->ds_phys->ds_num_children - 1) {
2257		goto fail;
2258	}
2259	for (zap_cursor_init(&zc, mos, ds->ds_phys->ds_next_clones_obj);
2260	    zap_cursor_retrieve(&zc, &za) == 0;
2261	    zap_cursor_advance(&zc)) {
2262		dsl_dataset_t *clone;
2263		char buf[ZFS_MAXNAMELEN];
2264		/*
2265		 * Even though we hold the dp_config_rwlock, the dataset
2266		 * may fail to open, returning ENOENT.  If there is a
2267		 * thread concurrently attempting to destroy this
2268		 * dataset, it will have the ds_rwlock held for
2269		 * RW_WRITER.  Our call to dsl_dataset_hold_obj() ->
2270		 * dsl_dataset_hold_ref() will fail its
2271		 * rw_tryenter(&ds->ds_rwlock, RW_READER), drop the
2272		 * dp_config_rwlock, and wait for the destroy to progress
2273		 * and signal ds_exclusive_cv.  If the destroy was
2274		 * successful, we will see that
2275		 * DSL_DATASET_IS_DESTROYED(), and return ENOENT.
2276		 */
2277		if (dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
2278		    za.za_first_integer, FTAG, &clone) != 0)
2279			continue;
2280		dsl_dir_name(clone->ds_dir, buf);
2281		VERIFY(nvlist_add_boolean(val, buf) == 0);
2282		dsl_dataset_rele(clone, FTAG);
2283	}
2284	zap_cursor_fini(&zc);
2285	VERIFY(nvlist_add_nvlist(propval, ZPROP_VALUE, val) == 0);
2286	VERIFY(nvlist_add_nvlist(nv, zfs_prop_to_name(ZFS_PROP_CLONES),
2287	    propval) == 0);
2288fail:
2289	nvlist_free(val);
2290	nvlist_free(propval);
2291	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2292}
2293
2294void
2295dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
2296{
2297	uint64_t refd, avail, uobjs, aobjs, ratio;
2298
2299	dsl_dir_stats(ds->ds_dir, nv);
2300
2301	dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
2302	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
2303	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
2304
2305	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
2306	    ds->ds_phys->ds_creation_time);
2307	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
2308	    ds->ds_phys->ds_creation_txg);
2309	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
2310	    ds->ds_quota);
2311	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
2312	    ds->ds_reserved);
2313	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
2314	    ds->ds_phys->ds_guid);
2315	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
2316	    ds->ds_phys->ds_unique_bytes);
2317	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
2318	    ds->ds_object);
2319	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
2320	    ds->ds_userrefs);
2321	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
2322	    DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
2323
2324	if (ds->ds_phys->ds_prev_snap_obj != 0) {
2325		uint64_t written, comp, uncomp;
2326		dsl_pool_t *dp = ds->ds_dir->dd_pool;
2327		dsl_dataset_t *prev;
2328
2329		rw_enter(&dp->dp_config_rwlock, RW_READER);
2330		int err = dsl_dataset_hold_obj(dp,
2331		    ds->ds_phys->ds_prev_snap_obj, FTAG, &prev);
2332		rw_exit(&dp->dp_config_rwlock);
2333		if (err == 0) {
2334			err = dsl_dataset_space_written(prev, ds, &written,
2335			    &comp, &uncomp);
2336			dsl_dataset_rele(prev, FTAG);
2337			if (err == 0) {
2338				dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_WRITTEN,
2339				    written);
2340			}
2341		}
2342	}
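	/*
	 * Hypothetical example of the ratio computed below: 300MB of
	 * uncompressed referenced data stored as 120MB compressed
	 * yields 300 * 100 / 120 = 250, i.e. a 2.50x refratio.
	 */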
2343	ratio = ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
2344	    (ds->ds_phys->ds_uncompressed_bytes * 100 /
2345	    ds->ds_phys->ds_compressed_bytes);
2346	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRATIO, ratio);
2347
2348	if (ds->ds_phys->ds_next_snap_obj) {
2349		/*
2350		 * This is a snapshot; override the dd's space used with
2351		 * our unique space and compression ratio.
2352		 */
2353		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
2354		    ds->ds_phys->ds_unique_bytes);
2355		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO, ratio);
2356
2357		get_clones_stat(ds, nv);
2358	}
2359}
2360
2361void
2362dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
2363{
2364	stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
2365	stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
2366	stat->dds_guid = ds->ds_phys->ds_guid;
2367	if (ds->ds_phys->ds_next_snap_obj) {
2368		stat->dds_is_snapshot = B_TRUE;
2369		stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
2370	} else {
2371		stat->dds_is_snapshot = B_FALSE;
2372		stat->dds_num_clones = 0;
2373	}
2374
2375	/* clone origin is really a dsl_dir thing... */
2376	rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2377	if (dsl_dir_is_clone(ds->ds_dir)) {
2378		dsl_dataset_t *ods;
2379
2380		VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
2381		    ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
2382		dsl_dataset_name(ods, stat->dds_origin);
2383		dsl_dataset_drop_ref(ods, FTAG);
2384	} else {
2385		stat->dds_origin[0] = '\0';
2386	}
2387	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2388}
2389
2390uint64_t
2391dsl_dataset_fsid_guid(dsl_dataset_t *ds)
2392{
2393	return (ds->ds_fsid_guid);
2394}
2395
2396void
2397dsl_dataset_space(dsl_dataset_t *ds,
2398    uint64_t *refdbytesp, uint64_t *availbytesp,
2399    uint64_t *usedobjsp, uint64_t *availobjsp)
2400{
2401	*refdbytesp = ds->ds_phys->ds_referenced_bytes;
2402	*availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2403	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2404		*availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2405	if (ds->ds_quota != 0) {
2406		/*
2407		 * Adjust available bytes according to refquota
2408		 */
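		/*
		 * E.g. (hypothetical): with a 10G refquota and 7G
		 * already referenced, available space is capped at
		 * 3G even if the pool has more free space.
		 */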
2409		if (*refdbytesp < ds->ds_quota)
2410			*availbytesp = MIN(*availbytesp,
2411			    ds->ds_quota - *refdbytesp);
2412		else
2413			*availbytesp = 0;
2414	}
2415	*usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2416	*availobjsp = DN_MAX_OBJECT - *usedobjsp;
2417}
2418
2419boolean_t
2420dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
2421{
2422	dsl_pool_t *dp = ds->ds_dir->dd_pool;
2423
2424	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
2425	    dsl_pool_sync_context(dp));
2426	if (ds->ds_prev == NULL)
2427		return (B_FALSE);
2428	if (ds->ds_phys->ds_bp.blk_birth >
2429	    ds->ds_prev->ds_phys->ds_creation_txg) {
2430		objset_t *os, *os_prev;
2431		/*
2432		 * It may be that only the ZIL differs, because it was
2433		 * reset in the head.  Don't count that as being
2434		 * modified.
2435		 */
2436		if (dmu_objset_from_ds(ds, &os) != 0)
2437			return (B_TRUE);
2438		if (dmu_objset_from_ds(ds->ds_prev, &os_prev) != 0)
2439			return (B_TRUE);
2440		return (bcmp(&os->os_phys->os_meta_dnode,
2441		    &os_prev->os_phys->os_meta_dnode,
2442		    sizeof (os->os_phys->os_meta_dnode)) != 0);
2443	}
2444	return (B_FALSE);
2445}
2446
2447/* ARGSUSED */
2448static int
2449dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2450{
2451	dsl_dataset_t *ds = arg1;
2452	char *newsnapname = arg2;
2453	dsl_dir_t *dd = ds->ds_dir;
2454	dsl_dataset_t *hds;
2455	uint64_t val;
2456	int err;
2457
2458	err = dsl_dataset_hold_obj(dd->dd_pool,
2459	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2460	if (err)
2461		return (err);
2462
2463	/* new name better not be in use */
2464	err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2465	dsl_dataset_rele(hds, FTAG);
2466
2467	if (err == 0)
2468		err = EEXIST;
2469	else if (err == ENOENT)
2470		err = 0;
2471
2472	/* dataset name + 1 for the "@" + the new snapshot name must fit */
2473	if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2474		err = ENAMETOOLONG;
2475
2476	return (err);
2477}
2478
2479static void
2480dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2481{
2482	char oldname[MAXPATHLEN], newname[MAXPATHLEN];
2483	dsl_dataset_t *ds = arg1;
2484	const char *newsnapname = arg2;
2485	dsl_dir_t *dd = ds->ds_dir;
2486	objset_t *mos = dd->dd_pool->dp_meta_objset;
2487	dsl_dataset_t *hds;
2488	int err;
2489
2490	ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
2491
2492	VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2493	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
2494
2495	VERIFY(0 == dsl_dataset_get_snapname(ds));
2496	err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
2497	ASSERT0(err);
2498	dsl_dataset_name(ds, oldname);
2499	mutex_enter(&ds->ds_lock);
2500	(void) strcpy(ds->ds_snapname, newsnapname);
2501	mutex_exit(&ds->ds_lock);
2502	err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
2503	    ds->ds_snapname, 8, 1, &ds->ds_object, tx);
2504	ASSERT0(err);
2505	dsl_dataset_name(ds, newname);
2506#ifdef _KERNEL
2507	zvol_rename_minors(oldname, newname);
2508#endif
2509
2510	spa_history_log_internal(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
2511	    "dataset = %llu", ds->ds_object);
2512	dsl_dataset_rele(hds, FTAG);
2513}
2514
2515struct renamesnaparg {
2516	dsl_sync_task_group_t *dstg;
2517	char failed[MAXPATHLEN];
2518	char *oldsnap;
2519	char *newsnap;
2520	int error;
2521};
2522
2523static int
2524dsl_snapshot_rename_one(const char *name, void *arg)
2525{
2526	struct renamesnaparg *ra = arg;
2527	dsl_dataset_t *ds = NULL;
2528	char *snapname;
2529	int err;
2530
2531	snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
2532	(void) strlcpy(ra->failed, snapname, sizeof (ra->failed));
2533
2534	/*
2535	 * For recursive snapshot renames the parent won't be changing
2536	 * so we just pass name for both the to/from arguments.
2537	 */
2538	err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
2539	if (err != 0) {
2540		strfree(snapname);
2541		return (err == ENOENT ? 0 : err);
2542	}
2543
2544#ifdef _KERNEL
2545	/*
2546	 * Each snapshot undergoing the rename must be unmounted first.
2547	 */
2548	(void) zfs_unmount_snap(snapname, NULL);
2549#endif
2550	err = dsl_dataset_hold(snapname, ra->dstg, &ds);
2551	strfree(snapname);
2552	if (err != 0)
2553		return (err == ENOENT ? 0 : err);
2554
2555	dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
2556	    dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
2557
2558	/* First successful rename clears the error. */
2559	ra->error = 0;
2560
2561	return (0);
2562}
2563
2564static int
2565dsl_recursive_rename(char *oldname, const char *newname)
2566{
2567	int err;
2568	struct renamesnaparg *ra;
2569	dsl_sync_task_t *dst;
2570	spa_t *spa;
2571	char *cp, *fsname = spa_strdup(oldname);
2572	int len = strlen(oldname) + 1;
2573
2574	/* truncate the snapshot name to get the fsname */
2575	cp = strchr(fsname, '@');
2576	*cp = '\0';
2577
2578	err = spa_open(fsname, &spa, FTAG);
2579	if (err) {
2580		kmem_free(fsname, len);
2581		return (err);
2582	}
2583	ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
2584	ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
2585
2586	ra->oldsnap = strchr(oldname, '@') + 1;
2587	ra->newsnap = strchr(newname, '@') + 1;
2588	*ra->failed = '\0';
2589	ra->error = ENOENT;
2590
2591	err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
2592	    DS_FIND_CHILDREN);
2593	kmem_free(fsname, len);
2594	if (err == 0)
2595		err = ra->error;
2596
2597	if (err == 0)
2598		err = dsl_sync_task_group_wait(ra->dstg);
2599
2600	for (dst = list_head(&ra->dstg->dstg_tasks); dst;
2601	    dst = list_next(&ra->dstg->dstg_tasks, dst)) {
2602		dsl_dataset_t *ds = dst->dst_arg1;
2603		if (dst->dst_err) {
2604			dsl_dir_name(ds->ds_dir, ra->failed);
2605			(void) strlcat(ra->failed, "@", sizeof (ra->failed));
2606			(void) strlcat(ra->failed, ra->newsnap,
2607			    sizeof (ra->failed));
2608		}
2609		dsl_dataset_rele(ds, ra->dstg);
2610	}
2611
2612	if (err)
2613		(void) strlcpy(oldname, ra->failed, sizeof (ra->failed));
2614
2615	dsl_sync_task_group_destroy(ra->dstg);
2616	kmem_free(ra, sizeof (struct renamesnaparg));
2617	spa_close(spa, FTAG);
2618	return (err);
2619}
2620
2621static int
2622dsl_valid_rename(const char *oldname, void *arg)
2623{
2624	int delta = *(int *)arg;
2625
2626	if (strlen(oldname) + delta >= MAXNAMELEN)
2627		return (ENAMETOOLONG);
2628
2629	return (0);
2630}
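
/*
 * Illustrative note on dsl_valid_rename() above (hypothetical names):
 * renaming "tank/a" to "tank/abcd" gives delta = 3, so every descendant
 * name, e.g. "tank/a/child@snap", must still fit in MAXNAMELEN after
 * growing by three characters.
 */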
2631
2632#pragma weak dmu_objset_rename = dsl_dataset_rename
2633int
2634dsl_dataset_rename(char *oldname, const char *newname, int flags)
2635{
2636	dsl_dir_t *dd;
2637	dsl_dataset_t *ds;
2638	const char *tail;
2639	int err;
2640
2641	err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2642	if (err)
2643		return (err);
2644
2645	if (tail == NULL) {
2646		int delta = strlen(newname) - strlen(oldname);
2647
2648		/* if we're growing, validate child name lengths */
2649		if (delta > 0)
2650			err = dmu_objset_find(oldname, dsl_valid_rename,
2651			    &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2652
2653		if (err == 0)
2654			err = dsl_dir_rename(dd, newname, flags);
2655		dsl_dir_close(dd, FTAG);
2656		return (err);
2657	}
2658
2659	if (tail[0] != '@') {
2660		/* the name ended in a nonexistent component */
2661		dsl_dir_close(dd, FTAG);
2662		return (ENOENT);
2663	}
2664
2665	dsl_dir_close(dd, FTAG);
2666
2667	/* new name must be snapshot in same filesystem */
2668	tail = strchr(newname, '@');
2669	if (tail == NULL)
2670		return (EINVAL);
2671	tail++;
2672	if (strncmp(oldname, newname, tail - newname) != 0)
2673		return (EXDEV);
2674
2675	if (flags & ZFS_RENAME_RECURSIVE) {
2676		err = dsl_recursive_rename(oldname, newname);
2677	} else {
2678		err = dsl_dataset_hold(oldname, FTAG, &ds);
2679		if (err)
2680			return (err);
2681
2682		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2683		    dsl_dataset_snapshot_rename_check,
2684		    dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2685
2686		dsl_dataset_rele(ds, FTAG);
2687	}
2688
2689	return (err);
2690}
2691
2692struct promotenode {
2693	list_node_t link;
2694	dsl_dataset_t *ds;
2695};
2696
2697struct promotearg {
2698	list_t shared_snaps, origin_snaps, clone_snaps;
2699	dsl_dataset_t *origin_origin;
2700	uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
2701	char *err_ds;
2702};
2703
2704static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
2705static boolean_t snaplist_unstable(list_t *l);
2706
2707static int
2708dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
2709{
2710	dsl_dataset_t *hds = arg1;
2711	struct promotearg *pa = arg2;
2712	struct promotenode *snap = list_head(&pa->shared_snaps);
2713	dsl_dataset_t *origin_ds = snap->ds;
2714	int err;
2715	uint64_t unused;
2716
2717	/* Check that it is a real clone */
2718	if (!dsl_dir_is_clone(hds->ds_dir))
2719		return (EINVAL);
2720
2721	/* Since this is so expensive, don't do the preliminary check */
2722	if (!dmu_tx_is_syncing(tx))
2723		return (0);
2724
2725	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
2726		return (EXDEV);
2727
2728	/* compute origin's new unique space */
2729	snap = list_tail(&pa->clone_snaps);
2730	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2731	dsl_deadlist_space_range(&snap->ds->ds_deadlist,
2732	    origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
2733	    &pa->unique, &unused, &unused);
2734
2735	/*
2736	 * Walk the snapshots that we are moving
2737	 *
2738	 * Compute space to transfer.  Consider the incremental changes
2739	 * to used for each snapshot:
2740	 * (my used) = (prev's used) + (blocks born) - (blocks killed)
2741	 * So each snapshot gave birth to:
2742	 * (blocks born) = (my used) - (prev's used) + (blocks killed)
2743	 * So a sequence would look like:
2744	 * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
2745	 * Which simplifies to:
2746	 * uN + kN + k(N-1) + ... + k1 + k0
2747	 * Note however, if we stop before we reach the ORIGIN we get:
2748	 * uN + kN + k(N-1) + ... + kM - u(M-1)
2749	 */
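	/*
	 * Worked example with made-up numbers: snapshots S0..S2 with
	 * used u0=1G, u1=4G, u2=5G and deadlist (killed) sizes k0=0,
	 * k1=2G, k2=1G.  Blocks born sum to (1-0+0) + (4-1+2) +
	 * (5-4+1) = 8G, matching the simplified form
	 * u2 + k2 + k1 + k0 = 5 + 1 + 2 + 0 = 8G.
	 */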
2750	pa->used = origin_ds->ds_phys->ds_referenced_bytes;
2751	pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
2752	pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
2753	for (snap = list_head(&pa->shared_snaps); snap;
2754	    snap = list_next(&pa->shared_snaps, snap)) {
2755		uint64_t val, dlused, dlcomp, dluncomp;
2756		dsl_dataset_t *ds = snap->ds;
2757
2758		/* Check that the snapshot name does not conflict */
2759		VERIFY(0 == dsl_dataset_get_snapname(ds));
2760		err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
2761		if (err == 0) {
2762			err = EEXIST;
2763			goto out;
2764		}
2765		if (err != ENOENT)
2766			goto out;
2767
2768		/* The very first snapshot does not have a deadlist */
2769		if (ds->ds_phys->ds_prev_snap_obj == 0)
2770			continue;
2771
2772		dsl_deadlist_space(&ds->ds_deadlist,
2773		    &dlused, &dlcomp, &dluncomp);
2774		pa->used += dlused;
2775		pa->comp += dlcomp;
2776		pa->uncomp += dluncomp;
2777	}
2778
2779	/*
2780	 * If we are a clone of a clone then we never reached ORIGIN,
2781	 * so we need to subtract out the clone origin's used space.
2782	 */
2783	if (pa->origin_origin) {
2784		pa->used -= pa->origin_origin->ds_phys->ds_referenced_bytes;
2785		pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
2786		pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
2787	}
2788
2789	/* Check that there is enough space here */
2790	err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
2791	    pa->used);
2792	if (err)
2793		return (err);
2794
2795	/*
2796	 * Compute the amounts of space that will be used by snapshots
2797	 * after the promotion (for both origin and clone).  For each,
2798	 * it is the amount of space that will be on all of their
2799	 * deadlists (that was not born before their new origin).
2800	 */
2801	if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2802		uint64_t space;
2803
2804		/*
2805		 * Note, typically this will not be a clone of a clone,
2806		 * so dd_origin_txg will be < TXG_INITIAL, so
2807		 * these snaplist_space() -> dsl_deadlist_space_range()
2808		 * calls will be fast because they do not have to
2809		 * iterate over all bps.
2810		 */
2811		snap = list_head(&pa->origin_snaps);
2812		err = snaplist_space(&pa->shared_snaps,
2813		    snap->ds->ds_dir->dd_origin_txg, &pa->cloneusedsnap);
2814		if (err)
2815			return (err);
2816
2817		err = snaplist_space(&pa->clone_snaps,
2818		    snap->ds->ds_dir->dd_origin_txg, &space);
2819		if (err)
2820			return (err);
2821		pa->cloneusedsnap += space;
2822	}
2823	if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2824		err = snaplist_space(&pa->origin_snaps,
2825		    origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
2826		if (err)
2827			return (err);
2828	}
2829
2830	return (0);
2831out:
2832	pa->err_ds = snap->ds->ds_snapname;
2833	return (err);
2834}
2835
2836static void
2837dsl_dataset_promote_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2838{
2839	dsl_dataset_t *hds = arg1;
2840	struct promotearg *pa = arg2;
2841	struct promotenode *snap = list_head(&pa->shared_snaps);
2842	dsl_dataset_t *origin_ds = snap->ds;
2843	dsl_dataset_t *origin_head;
2844	dsl_dir_t *dd = hds->ds_dir;
2845	dsl_pool_t *dp = hds->ds_dir->dd_pool;
2846	dsl_dir_t *odd = NULL;
2847	uint64_t oldnext_obj;
2848	int64_t delta;
2849
2850	ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
2851
2852	snap = list_head(&pa->origin_snaps);
2853	origin_head = snap->ds;
2854
2855	/*
2856	 * We need to explicitly open odd, since origin_ds's dd will be
2857	 * changing.
2858	 */
2859	VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
2860	    NULL, FTAG, &odd));
2861
2862	/* change origin's next snap */
2863	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
2864	oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
2865	snap = list_tail(&pa->clone_snaps);
2866	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2867	origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;
2868
2869	/* change the origin's next clone */
2870	if (origin_ds->ds_phys->ds_next_clones_obj) {
2871		remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
2872		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2873		    origin_ds->ds_phys->ds_next_clones_obj,
2874		    oldnext_obj, tx));
2875	}
2876
2877	/* change origin */
2878	dmu_buf_will_dirty(dd->dd_dbuf, tx);
2879	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
2880	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
2881	dd->dd_origin_txg = origin_head->ds_dir->dd_origin_txg;
2882	dmu_buf_will_dirty(odd->dd_dbuf, tx);
2883	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
2884	origin_head->ds_dir->dd_origin_txg =
2885	    origin_ds->ds_phys->ds_creation_txg;
2886
2887	/* change dd_clone entries */
2888	if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
2889		VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2890		    odd->dd_phys->dd_clones, hds->ds_object, tx));
2891		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2892		    pa->origin_origin->ds_dir->dd_phys->dd_clones,
2893		    hds->ds_object, tx));
2894
2895		VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2896		    pa->origin_origin->ds_dir->dd_phys->dd_clones,
2897		    origin_head->ds_object, tx));
2898		if (dd->dd_phys->dd_clones == 0) {
2899			dd->dd_phys->dd_clones = zap_create(dp->dp_meta_objset,
2900			    DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
2901		}
2902		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2903		    dd->dd_phys->dd_clones, origin_head->ds_object, tx));
2905	}
2906
2907	/* move snapshots to this dir */
2908	for (snap = list_head(&pa->shared_snaps); snap;
2909	    snap = list_next(&pa->shared_snaps, snap)) {
2910		dsl_dataset_t *ds = snap->ds;
2911
2912		/* unregister props as dsl_dir is changing */
2913		if (ds->ds_objset) {
2914			dmu_objset_evict(ds->ds_objset);
2915			ds->ds_objset = NULL;
2916		}
2917		/* move snap name entry */
2918		VERIFY(0 == dsl_dataset_get_snapname(ds));
2919		VERIFY(0 == dsl_dataset_snap_remove(origin_head,
2920		    ds->ds_snapname, tx));
2921		VERIFY(0 == zap_add(dp->dp_meta_objset,
2922		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
2923		    8, 1, &ds->ds_object, tx));
2924
2925		/* change containing dsl_dir */
2926		dmu_buf_will_dirty(ds->ds_dbuf, tx);
2927		ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
2928		ds->ds_phys->ds_dir_obj = dd->dd_object;
2929		ASSERT3P(ds->ds_dir, ==, odd);
2930		dsl_dir_close(ds->ds_dir, ds);
2931		VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
2932		    NULL, ds, &ds->ds_dir));
2933
2934		/* move any clone references */
2935		if (ds->ds_phys->ds_next_clones_obj &&
2936		    spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
2937			zap_cursor_t zc;
2938			zap_attribute_t za;
2939
2940			for (zap_cursor_init(&zc, dp->dp_meta_objset,
2941			    ds->ds_phys->ds_next_clones_obj);
2942			    zap_cursor_retrieve(&zc, &za) == 0;
2943			    zap_cursor_advance(&zc)) {
2944				dsl_dataset_t *cnds;
2945				uint64_t o;
2946
2947				if (za.za_first_integer == oldnext_obj) {
2948					/*
2949					 * We've already moved the
2950					 * origin's reference.
2951					 */
2952					continue;
2953				}
2954
2955				VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
2956				    za.za_first_integer, FTAG, &cnds));
2957				o = cnds->ds_dir->dd_phys->dd_head_dataset_obj;
2958
2959				VERIFY3U(zap_remove_int(dp->dp_meta_objset,
2960				    odd->dd_phys->dd_clones, o, tx), ==, 0);
2961				VERIFY3U(zap_add_int(dp->dp_meta_objset,
2962				    dd->dd_phys->dd_clones, o, tx), ==, 0);
2963				dsl_dataset_rele(cnds, FTAG);
2964			}
2965			zap_cursor_fini(&zc);
2966		}
2967
2968		ASSERT0(dsl_prop_numcb(ds));
2969	}
2970
2971	/*
2972	 * Change space accounting.
2973	 * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
2974	 * both be valid, or both be 0 (resulting in delta == 0).  This
2975	 * is true for each of {clone,origin} independently.
2976	 */
2977
2978	delta = pa->cloneusedsnap -
2979	    dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2980	ASSERT3S(delta, >=, 0);
2981	ASSERT3U(pa->used, >=, delta);
2982	dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
2983	dsl_dir_diduse_space(dd, DD_USED_HEAD,
2984	    pa->used - delta, pa->comp, pa->uncomp, tx);
2985
2986	delta = pa->originusedsnap -
2987	    odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2988	ASSERT3S(delta, <=, 0);
2989	ASSERT3U(pa->used, >=, -delta);
2990	dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
2991	dsl_dir_diduse_space(odd, DD_USED_HEAD,
2992	    -pa->used - delta, -pa->comp, -pa->uncomp, tx);
2993
2994	origin_ds->ds_phys->ds_unique_bytes = pa->unique;
2995
2996	/* log history record */
2997	spa_history_log_internal(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
2998	    "dataset = %llu", hds->ds_object);
2999
3000	dsl_dir_close(odd, FTAG);
3001}
3002
3003static char *snaplist_tag = "snaplist";
3004/*
3005 * Make a list of dsl_dataset_t's for the snapshots between first_obj
3006 * (exclusive) and last_obj (inclusive).  The list will be in reverse
3007 * order (last_obj will be the list_head()).  If first_obj == 0, do all
3008 * snapshots back to this dataset's origin.
3009 */
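/*
 * E.g. (hypothetical object numbers): for the chain origin(50) <- A(51)
 * <- B(52) <- C(53), snaplist_make(dp, own, 50, 53, l) builds the list
 * C, B, A with C at list_head(), excluding object 50 itself.
 */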
3010static int
3011snaplist_make(dsl_pool_t *dp, boolean_t own,
3012    uint64_t first_obj, uint64_t last_obj, list_t *l)
3013{
3014	uint64_t obj = last_obj;
3015
3016	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
3017
3018	list_create(l, sizeof (struct promotenode),
3019	    offsetof(struct promotenode, link));
3020
3021	while (obj != first_obj) {
3022		dsl_dataset_t *ds;
3023		struct promotenode *snap;
3024		int err;
3025
3026		if (own) {
3027			err = dsl_dataset_own_obj(dp, obj,
3028			    0, snaplist_tag, &ds);
3029			if (err == 0)
3030				dsl_dataset_make_exclusive(ds, snaplist_tag);
3031		} else {
3032			err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
3033		}
3034		if (err == ENOENT) {
3035			/* lost race with snapshot destroy */
3036			struct promotenode *last = list_tail(l);
3037			ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
3038			obj = last->ds->ds_phys->ds_prev_snap_obj;
3039			continue;
3040		} else if (err) {
3041			return (err);
3042		}
3043
3044		if (first_obj == 0)
3045			first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
3046
3047		snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
3048		snap->ds = ds;
3049		list_insert_tail(l, snap);
3050		obj = ds->ds_phys->ds_prev_snap_obj;
3051	}
3052
3053	return (0);
3054}
3055
3056static int
3057snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
3058{
3059	struct promotenode *snap;
3060
3061	*spacep = 0;
3062	for (snap = list_head(l); snap; snap = list_next(l, snap)) {
3063		uint64_t used, comp, uncomp;
3064		dsl_deadlist_space_range(&snap->ds->ds_deadlist,
3065		    mintxg, UINT64_MAX, &used, &comp, &uncomp);
3066		*spacep += used;
3067	}
3068	return (0);
3069}
3070
3071static void
3072snaplist_destroy(list_t *l, boolean_t own)
3073{
3074	struct promotenode *snap;
3075
3076	if (!l || !list_link_active(&l->list_head))
3077		return;
3078
3079	while ((snap = list_tail(l)) != NULL) {
3080		list_remove(l, snap);
3081		if (own)
3082			dsl_dataset_disown(snap->ds, snaplist_tag);
3083		else
3084			dsl_dataset_rele(snap->ds, snaplist_tag);
3085		kmem_free(snap, sizeof (struct promotenode));
3086	}
3087	list_destroy(l);
3088}
3089
3090/*
3091 * Promote a clone.  Nomenclature note:
3092 * "clone" or "cds": the original clone which is being promoted
3093 * "origin" or "ods": the snapshot which was originally the clone's origin
3094 * "origin head" or "ohds": the dataset which is the head
3095 * (filesystem/volume) for the origin
3096 * "origin origin": the origin of the origin's filesystem (typically
3097 * NULL, indicating that the clone is not a clone of a clone).
3098 */
3099int
3100dsl_dataset_promote(const char *name, char *conflsnap)
3101{
3102	dsl_dataset_t *ds;
3103	dsl_dir_t *dd;
3104	dsl_pool_t *dp;
3105	dmu_object_info_t doi;
3106	struct promotearg pa = { 0 };
3107	struct promotenode *snap;
3108	int err;
3109
3110	err = dsl_dataset_hold(name, FTAG, &ds);
3111	if (err)
3112		return (err);
3113	dd = ds->ds_dir;
3114	dp = dd->dd_pool;
3115
3116	err = dmu_object_info(dp->dp_meta_objset,
3117	    ds->ds_phys->ds_snapnames_zapobj, &doi);
3118	if (err) {
3119		dsl_dataset_rele(ds, FTAG);
3120		return (err);
3121	}
3122
3123	if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
3124		dsl_dataset_rele(ds, FTAG);
3125		return (EINVAL);
3126	}
3127
3128	/*
3129	 * We are going to inherit all the snapshots taken before our
3130	 * origin (i.e., our new origin will be our parent's origin).
3131	 * Take ownership of them so that we can rename them into our
3132	 * namespace.
3133	 */
3134	rw_enter(&dp->dp_config_rwlock, RW_READER);
3135
3136	err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
3137	    &pa.shared_snaps);
3138	if (err != 0)
3139		goto out;
3140
3141	err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
3142	if (err != 0)
3143		goto out;
3144
3145	snap = list_head(&pa.shared_snaps);
3146	ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
3147	err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
3148	    snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
3149	if (err != 0)
3150		goto out;
3151
3152	if (snap->ds->ds_dir->dd_phys->dd_origin_obj != 0) {
3153		err = dsl_dataset_hold_obj(dp,
3154		    snap->ds->ds_dir->dd_phys->dd_origin_obj,
3155		    FTAG, &pa.origin_origin);
3156		if (err != 0)
3157			goto out;
3158	}
3159
3160out:
3161	rw_exit(&dp->dp_config_rwlock);
3162
3163	/*
3164	 * Add in 128x the snapnames zapobj size, since we will be moving
3165	 * a bunch of snapnames to the promoted ds, and dirtying their
3166	 * bonus buffers.
3167	 */
3168	if (err == 0) {
3169		err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
3170		    dsl_dataset_promote_sync, ds, &pa,
3171		    2 + 2 * doi.doi_physical_blocks_512);
3172		if (err && pa.err_ds && conflsnap)
3173			(void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
3174	}
3175
3176	snaplist_destroy(&pa.shared_snaps, B_TRUE);
3177	snaplist_destroy(&pa.clone_snaps, B_FALSE);
3178	snaplist_destroy(&pa.origin_snaps, B_FALSE);
3179	if (pa.origin_origin)
3180		dsl_dataset_rele(pa.origin_origin, FTAG);
3181	dsl_dataset_rele(ds, FTAG);
3182	return (err);
3183}
3184
3185struct cloneswaparg {
3186	dsl_dataset_t *cds; /* clone dataset */
3187	dsl_dataset_t *ohds; /* origin's head dataset */
3188	boolean_t force;
3189	int64_t unused_refres_delta; /* change in unconsumed refreservation */
3190};
3191
3192/* ARGSUSED */
3193static int
3194dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
3195{
3196	struct cloneswaparg *csa = arg1;
3197
3198	/* they should both be heads */
3199	if (dsl_dataset_is_snapshot(csa->cds) ||
3200	    dsl_dataset_is_snapshot(csa->ohds))
3201		return (EINVAL);
3202
3203	/* the branch point should be just before them */
3204	if (csa->cds->ds_prev != csa->ohds->ds_prev)
3205		return (EINVAL);
3206
3207	/* cds should be the clone (unless they are unrelated) */
3208	if (csa->cds->ds_prev != NULL &&
3209	    csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
3210	    csa->ohds->ds_object !=
3211	    csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
3212		return (EINVAL);
3213
3214	/* the clone should be a child of the origin */
3215	if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
3216		return (EINVAL);
3217
3218	/* ohds shouldn't be modified unless 'force' */
3219	if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
3220		return (ETXTBSY);
3221
3222	/* adjust amount of any unconsumed refreservation */
3223	csa->unused_refres_delta =
3224	    (int64_t)MIN(csa->ohds->ds_reserved,
3225	    csa->ohds->ds_phys->ds_unique_bytes) -
3226	    (int64_t)MIN(csa->ohds->ds_reserved,
3227	    csa->cds->ds_phys->ds_unique_bytes);
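
	/*
	 * Hypothetical example: ds_reserved = 5G, head unique = 4G,
	 * clone unique = 1G, so the delta above is MIN(5,4) - MIN(5,1)
	 * = 3G of additional unconsumed reservation after the swap,
	 * which must fit in the space available (checked next).
	 */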
3228
3229	if (csa->unused_refres_delta > 0 &&
3230	    csa->unused_refres_delta >
3231	    dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
3232		return (ENOSPC);
3233
3234	if (csa->ohds->ds_quota != 0 &&
3235	    csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
3236		return (EDQUOT);
3237
3238	return (0);
3239}
3240
3241/* ARGSUSED */
3242static void
3243dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3244{
3245	struct cloneswaparg *csa = arg1;
3246	dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;
3247
3248	ASSERT(csa->cds->ds_reserved == 0);
3249	ASSERT(csa->ohds->ds_quota == 0 ||
3250	    csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);
3251
3252	dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
3253	dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);
3254
3255	if (csa->cds->ds_objset != NULL) {
3256		dmu_objset_evict(csa->cds->ds_objset);
3257		csa->cds->ds_objset = NULL;
3258	}
3259
3260	if (csa->ohds->ds_objset != NULL) {
3261		dmu_objset_evict(csa->ohds->ds_objset);
3262		csa->ohds->ds_objset = NULL;
3263	}
3264
3265	/*
3266	 * Reset origin's unique bytes, if it exists.
3267	 */
3268	if (csa->cds->ds_prev) {
3269		dsl_dataset_t *origin = csa->cds->ds_prev;
3270		uint64_t comp, uncomp;
3271
3272		dmu_buf_will_dirty(origin->ds_dbuf, tx);
3273		dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3274		    origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
3275		    &origin->ds_phys->ds_unique_bytes, &comp, &uncomp);
3276	}
3277
3278	/* swap blkptrs */
3279	{
3280		blkptr_t tmp;
3281		tmp = csa->ohds->ds_phys->ds_bp;
3282		csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
3283		csa->cds->ds_phys->ds_bp = tmp;
3284	}
3285
3286	/* set dd_*_bytes */
3287	{
3288		int64_t dused, dcomp, duncomp;
3289		uint64_t cdl_used, cdl_comp, cdl_uncomp;
3290		uint64_t odl_used, odl_comp, odl_uncomp;
3291
3292		ASSERT3U(csa->cds->ds_dir->dd_phys->
3293		    dd_used_breakdown[DD_USED_SNAP], ==, 0);
3294
3295		dsl_deadlist_space(&csa->cds->ds_deadlist,
3296		    &cdl_used, &cdl_comp, &cdl_uncomp);
3297		dsl_deadlist_space(&csa->ohds->ds_deadlist,
3298		    &odl_used, &odl_comp, &odl_uncomp);
3299
3300		dused = csa->cds->ds_phys->ds_referenced_bytes + cdl_used -
3301		    (csa->ohds->ds_phys->ds_referenced_bytes + odl_used);
3302		dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
3303		    (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
3304		duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
3305		    cdl_uncomp -
3306		    (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);
3307
3308		dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
3309		    dused, dcomp, duncomp, tx);
3310		dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
3311		    -dused, -dcomp, -duncomp, tx);
3312
3313		/*
3314		 * The difference in the space used by snapshots is the
3315		 * difference in snapshot space due to the head's
3316		 * deadlist (since that's the only thing that's
3317		 * changing that affects the snapused).
3318		 */
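		/*
		 * E.g. (hypothetical): if the clone's deadlist holds
		 * 4G born after dd_origin_txg and the head's holds 1G,
		 * then 3G is transferred from the DD_USED_HEAD bucket
		 * to DD_USED_SNAP below.
		 */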
3319		dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3320		    csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3321		    &cdl_used, &cdl_comp, &cdl_uncomp);
3322		dsl_deadlist_space_range(&csa->ohds->ds_deadlist,
3323		    csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3324		    &odl_used, &odl_comp, &odl_uncomp);
3325		dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
3326		    DD_USED_HEAD, DD_USED_SNAP, tx);
3327	}
3328
3329	/* swap ds_*_bytes */
3330	SWITCH64(csa->ohds->ds_phys->ds_referenced_bytes,
3331	    csa->cds->ds_phys->ds_referenced_bytes);
3332	SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
3333	    csa->cds->ds_phys->ds_compressed_bytes);
3334	SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
3335	    csa->cds->ds_phys->ds_uncompressed_bytes);
3336	SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
3337	    csa->cds->ds_phys->ds_unique_bytes);
3338
3339	/* apply any parent delta for change in unconsumed refreservation */
3340	dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
3341	    csa->unused_refres_delta, 0, 0, tx);
3342
3343	/*
3344	 * Swap deadlists.
3345	 */
3346	dsl_deadlist_close(&csa->cds->ds_deadlist);
3347	dsl_deadlist_close(&csa->ohds->ds_deadlist);
3348	SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
3349	    csa->cds->ds_phys->ds_deadlist_obj);
3350	dsl_deadlist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
3351	    csa->cds->ds_phys->ds_deadlist_obj);
3352	dsl_deadlist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
3353	    csa->ohds->ds_phys->ds_deadlist_obj);
3354
3355	dsl_scan_ds_clone_swapped(csa->ohds, csa->cds, tx);
3356}
3357
3358/*
3359 * Swap 'clone' with its origin head dataset.  Used at the end of "zfs
3360 * recv" into an existing fs to swizzle the file system to the new
3361 * version, and by "zfs rollback".  Can also be used to swap two
3362 * independent head datasets if neither has any snapshots.
3363 */
3364int
3365dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
3366    boolean_t force)
3367{
3368	struct cloneswaparg csa;
3369	int error;
3370
3371	ASSERT(clone->ds_owner);
3372	ASSERT(origin_head->ds_owner);
3373retry:
3374	/*
3375	 * Need exclusive access for the swap. If we're swapping these
3376	 * datasets back after an error, we already hold the locks.
3377	 */
3378	if (!RW_WRITE_HELD(&clone->ds_rwlock))
3379		rw_enter(&clone->ds_rwlock, RW_WRITER);
3380	if (!RW_WRITE_HELD(&origin_head->ds_rwlock) &&
3381	    !rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
3382		rw_exit(&clone->ds_rwlock);
3383		rw_enter(&origin_head->ds_rwlock, RW_WRITER);
3384		if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
3385			rw_exit(&origin_head->ds_rwlock);
3386			goto retry;
3387		}
3388	}
3389	csa.cds = clone;
3390	csa.ohds = origin_head;
3391	csa.force = force;
3392	error = dsl_sync_task_do(clone->ds_dir->dd_pool,
3393	    dsl_dataset_clone_swap_check,
3394	    dsl_dataset_clone_swap_sync, &csa, NULL, 9);
3395	return (error);
3396}
3397
3398/*
3399 * Given a pool name and a dataset object number in that pool,
3400 * return the name of that dataset.
3401 */
3402int
3403dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3404{
3405	spa_t *spa;
3406	dsl_pool_t *dp;
3407	dsl_dataset_t *ds;
3408	int error;
3409
3410	if ((error = spa_open(pname, &spa, FTAG)) != 0)
3411		return (error);
3412	dp = spa_get_dsl(spa);
3413	rw_enter(&dp->dp_config_rwlock, RW_READER);
3414	if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3415		dsl_dataset_name(ds, buf);
3416		dsl_dataset_rele(ds, FTAG);
3417	}
3418	rw_exit(&dp->dp_config_rwlock);
3419	spa_close(spa, FTAG);
3420
3421	return (error);
3422}
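
/*
 * Usage sketch for dsl_dsobj_to_dsname() (hypothetical caller; error
 * handling elided):
 *
 *	char name[MAXNAMELEN];
 *	if (dsl_dsobj_to_dsname("tank", 42, name) == 0)
 *		zfs_dbgmsg("obj 42 = %s", name);
 */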
3423
3424int
3425dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
3426    uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
3427{
3428	int error = 0;
3429
3430	ASSERT3S(asize, >, 0);
3431
3432	/*
3433	 * *ref_rsrv is the portion of asize that will come from any
3434	 * unconsumed refreservation space.
3435	 */
3436	*ref_rsrv = 0;
3437
3438	mutex_enter(&ds->ds_lock);
3439	/*
3440	 * Make a space adjustment for reserved bytes.
3441	 */
3442	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
3443		ASSERT3U(*used, >=,
3444		    ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3445		*used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3446		*ref_rsrv =
3447		    asize - MIN(asize, parent_delta(ds, asize + inflight));
3448	}
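
	/*
	 * Illustrative example: with a 5G refreservation and 2G of
	 * unique bytes, 3G of the reservation is still unconsumed, so
	 * *used is reduced by 3G above and part of the incoming write
	 * is charged to the reservation (via *ref_rsrv) instead of
	 * the parent.
	 */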
3449
3450	if (!check_quota || ds->ds_quota == 0) {
3451		mutex_exit(&ds->ds_lock);
3452		return (0);
3453	}
3454	/*
3455	 * If they are requesting more space, and our current estimate
3456	 * is over quota, they get to try again unless the actual
3457	 * on-disk usage is over quota and there are no pending changes (which
3458	 * may free up space for us).
3459	 */
3460	if (ds->ds_phys->ds_referenced_bytes + inflight >= ds->ds_quota) {
3461		if (inflight > 0 ||
3462		    ds->ds_phys->ds_referenced_bytes < ds->ds_quota)
3463			error = ERESTART;
3464		else
3465			error = EDQUOT;
3466	}
3467	mutex_exit(&ds->ds_lock);
3468
3469	return (error);
3470}
3471
3472/* ARGSUSED */
3473static int
3474dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3475{
3476	dsl_dataset_t *ds = arg1;
3477	dsl_prop_setarg_t *psa = arg2;
3478	int err;
3479
3480	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3481		return (ENOTSUP);
3482
3483	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3484		return (err);
3485
3486	if (psa->psa_effective_value == 0)
3487		return (0);
3488
3489	if (psa->psa_effective_value < ds->ds_phys->ds_referenced_bytes ||
3490	    psa->psa_effective_value < ds->ds_reserved)
3491		return (ENOSPC);
3492
3493	return (0);
3494}
3495
3496extern void dsl_prop_set_sync(void *, void *, dmu_tx_t *);
3497
3498void
3499dsl_dataset_set_quota_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3500{
3501	dsl_dataset_t *ds = arg1;
3502	dsl_prop_setarg_t *psa = arg2;
3503	uint64_t effective_value = psa->psa_effective_value;
3504
3505	dsl_prop_set_sync(ds, psa, tx);
3506	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3507
3508	if (ds->ds_quota != effective_value) {
3509		dmu_buf_will_dirty(ds->ds_dbuf, tx);
3510		ds->ds_quota = effective_value;
3511	}
3512}
3513
3514int
3515dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
3516{
3517	dsl_dataset_t *ds;
3518	dsl_prop_setarg_t psa;
3519	int err;
3520
3521	dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
3522
3523	err = dsl_dataset_hold(dsname, FTAG, &ds);
3524	if (err)
3525		return (err);
3526
3527	/*
3528	 * If someone removes a file, then tries to set the quota, we
3529	 * want to make sure the file freeing takes effect.
3530	 */
3531	txg_wait_open(ds->ds_dir->dd_pool, 0);
3532
3533	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3534	    dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3535	    ds, &psa, 0);
3536
3537	dsl_dataset_rele(ds, FTAG);
3538	return (err);
3539}
3540
3541static int
3542dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
3543{
3544	dsl_dataset_t *ds = arg1;
3545	dsl_prop_setarg_t *psa = arg2;
3546	uint64_t effective_value;
3547	uint64_t unique;
3548	int err;
3549
3550	if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
3551	    SPA_VERSION_REFRESERVATION)
3552		return (ENOTSUP);
3553
3554	if (dsl_dataset_is_snapshot(ds))
3555		return (EINVAL);
3556
3557	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3558		return (err);
3559
3560	effective_value = psa->psa_effective_value;
3561
3562	/*
3563	 * If we are doing the preliminary check in open context, the
3564	 * space estimates may be inaccurate.
3565	 */
3566	if (!dmu_tx_is_syncing(tx))
3567		return (0);
3568
3569	mutex_enter(&ds->ds_lock);
3570	if (!DS_UNIQUE_IS_ACCURATE(ds))
3571		dsl_dataset_recalc_head_uniq(ds);
3572	unique = ds->ds_phys->ds_unique_bytes;
3573	mutex_exit(&ds->ds_lock);
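
	/*
	 * Hypothetical example: unique = 2G, current ds_reserved = 3G,
	 * requested effective_value = 7G.  The delta below is
	 * MAX(2G, 7G) - MAX(2G, 3G) = 4G of additional space that must
	 * be available.
	 */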
3574
3575	if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
3576		uint64_t delta = MAX(unique, effective_value) -
3577		    MAX(unique, ds->ds_reserved);
3578
3579		if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
3580			return (ENOSPC);
3581		if (ds->ds_quota > 0 &&
3582		    effective_value > ds->ds_quota)
3583			return (ENOSPC);
3584	}
3585
3586	return (0);
3587}
3588
3589static void
3590dsl_dataset_set_reservation_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3591{
3592	dsl_dataset_t *ds = arg1;
3593	dsl_prop_setarg_t *psa = arg2;
3594	uint64_t effective_value = psa->psa_effective_value;
3595	uint64_t unique;
3596	int64_t delta;
3597
3598	dsl_prop_set_sync(ds, psa, tx);
3599	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3600
3601	dmu_buf_will_dirty(ds->ds_dbuf, tx);
3602
3603	mutex_enter(&ds->ds_dir->dd_lock);
3604	mutex_enter(&ds->ds_lock);
3605	ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
3606	unique = ds->ds_phys->ds_unique_bytes;
3607	delta = MAX(0, (int64_t)(effective_value - unique)) -
3608	    MAX(0, (int64_t)(ds->ds_reserved - unique));
3609	ds->ds_reserved = effective_value;
3610	mutex_exit(&ds->ds_lock);
3611
3612	dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
3613	mutex_exit(&ds->ds_dir->dd_lock);
3614}
3615
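/*
 * Worked example of the refreservation delta above (numbers are
 * illustrative): with unique = 2G, an old reservation of 3G, and a
 * new effective value of 5G,
 *
 *	delta = MAX(0, 5G - 2G) - MAX(0, 3G - 2G) = 3G - 1G = 2G
 *
 * i.e. the dsl_dir's DD_USED_REFRSRV accounting grows by 2G: only
 * the part of each reservation not already covered by unique bytes
 * is charged.
 */
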
int
dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
    uint64_t reservation)
{
	dsl_dataset_t *ds;
	dsl_prop_setarg_t psa;
	int err;

	dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
	    &reservation);

	err = dsl_dataset_hold(dsname, FTAG, &ds);
	if (err)
		return (err);

	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
	    dsl_dataset_set_reservation_check,
	    dsl_dataset_set_reservation_sync, ds, &psa, 0);

	dsl_dataset_rele(ds, FTAG);
	return (err);
}

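/*
 * Illustrative sketch (not compiled): the refreservation analogue of
 * the refquota example above.  Name and size are assumptions.
 */
#if 0
	uint64_t resv = 1ULL << 30;	/* 1 GiB */
	int err;

	err = dsl_dataset_set_reservation("tank/vol", ZPROP_SRC_LOCAL, resv);
	/* EINVAL for snapshots; ENOSPC if the parent can't cover the delta */
#endif
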
typedef struct zfs_hold_cleanup_arg {
	dsl_pool_t *dp;
	uint64_t dsobj;
	char htag[MAXNAMELEN];
} zfs_hold_cleanup_arg_t;

static void
dsl_dataset_user_release_onexit(void *arg)
{
	zfs_hold_cleanup_arg_t *ca = arg;

	(void) dsl_dataset_user_release_tmp(ca->dp, ca->dsobj, ca->htag,
	    B_TRUE);
	kmem_free(ca, sizeof (zfs_hold_cleanup_arg_t));
}

void
dsl_register_onexit_hold_cleanup(dsl_dataset_t *ds, const char *htag,
    minor_t minor)
{
	zfs_hold_cleanup_arg_t *ca;

	ca = kmem_alloc(sizeof (zfs_hold_cleanup_arg_t), KM_SLEEP);
	ca->dp = ds->ds_dir->dd_pool;
	ca->dsobj = ds->ds_object;
	(void) strlcpy(ca->htag, htag, sizeof (ca->htag));
	VERIFY3U(0, ==, zfs_onexit_add_cb(minor,
	    dsl_dataset_user_release_onexit, ca, NULL));
}

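/*
 * Illustrative sketch (not compiled): how a temporary hold gets tied
 * to a process.  The minor number comes from zfs_onexit_fd_hold() on
 * the caller's cleanup file descriptor; when that descriptor is
 * closed (e.g. on process exit), dsl_dataset_user_release_onexit()
 * fires and drops the hold.  The tag name and variables here are
 * assumptions for illustration.
 */
#if 0
	minor_t minor;

	if (zfs_onexit_fd_hold(cleanup_fd, &minor) == 0) {
		dsl_register_onexit_hold_cleanup(ds, ".example-tag", minor);
		/* the callback stays registered after we drop the fd hold */
		zfs_onexit_fd_rele(cleanup_fd);
	}
#endif
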
/*
 * If you add new checks here, you may need to add
 * additional checks to the "temporary" case in
 * snapshot_check() in dmu_objset.c.
 */
static int
dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	struct dsl_ds_holdarg *ha = arg2;
	char *htag = ha->htag;
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t tmp;
	int error = 0;

	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
		return (ENOTSUP);

	if (!dsl_dataset_is_snapshot(ds))
		return (EINVAL);

	/* tags must be unique */
	mutex_enter(&ds->ds_lock);
	if (ds->ds_phys->ds_userrefs_obj) {
		/*
		 * Look the tag up into a scratch buffer; passing tx as
		 * the value buffer would let zap_lookup() scribble over
		 * the transaction when the tag already exists.
		 */
		error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
		    8, 1, &tmp);
		if (error == 0)
			error = EEXIST;
		else if (error == ENOENT)
			error = 0;
	}
	mutex_exit(&ds->ds_lock);

	if (error == 0 && ha->temphold &&
	    strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
		error = E2BIG;

	return (error);
}

void
dsl_dataset_user_hold_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	struct dsl_ds_holdarg *ha = arg2;
	char *htag = ha->htag;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;
	uint64_t now = gethrestime_sec();
	uint64_t zapobj;

	mutex_enter(&ds->ds_lock);
	if (ds->ds_phys->ds_userrefs_obj == 0) {
		/*
		 * This is the first user hold for this dataset.  Create
		 * the userrefs zap object.
		 */
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		zapobj = ds->ds_phys->ds_userrefs_obj =
		    zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
	} else {
		zapobj = ds->ds_phys->ds_userrefs_obj;
	}
	ds->ds_userrefs++;
	mutex_exit(&ds->ds_lock);

	VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));

	if (ha->temphold) {
		VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
		    htag, &now, tx));
	}

	spa_history_log_internal(LOG_DS_USER_HOLD,
	    dp->dp_spa, tx, "<%s> temp = %d dataset = %llu", htag,
	    (int)ha->temphold, ds->ds_object);
}

static int
dsl_dataset_user_hold_one(const char *dsname, void *arg)
{
	struct dsl_ds_holdarg *ha = arg;
	dsl_dataset_t *ds;
	int error;
	char *name;

	/* alloc a buffer to hold dsname@snapname, plus the terminating NUL */
	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
	error = dsl_dataset_hold(name, ha->dstg, &ds);
	strfree(name);
	if (error == 0) {
		ha->gotone = B_TRUE;
		dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
		    dsl_dataset_user_hold_sync, ds, ha, 0);
	} else if (error == ENOENT && ha->recursive) {
		error = 0;
	} else {
		(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
	}
	return (error);
}

int
dsl_dataset_user_hold_for_send(dsl_dataset_t *ds, char *htag,
    boolean_t temphold)
{
	struct dsl_ds_holdarg *ha;
	int error;

	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
	ha->htag = htag;
	ha->temphold = temphold;
	error = dsl_sync_task_do(ds->ds_dir->dd_pool,
	    dsl_dataset_user_hold_check, dsl_dataset_user_hold_sync,
	    ds, ha, 0);
	kmem_free(ha, sizeof (struct dsl_ds_holdarg));

	return (error);
}

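/*
 * Illustrative sketch (not compiled): a sender can pin the snapshot
 * it is about to stream with a temporary hold, so a concurrent
 * deferred destroy can't free it mid-send.  The tag name below is an
 * assumption for illustration.
 */
#if 0
	char tag[] = ".send-example";
	int err;

	err = dsl_dataset_user_hold_for_send(ds, tag, B_TRUE);
	/* ... stream the snapshot ... then release the hold by tag ... */
#endif
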
int
dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
    boolean_t recursive, boolean_t temphold, int cleanup_fd)
{
	struct dsl_ds_holdarg *ha;
	dsl_sync_task_t *dst;
	spa_t *spa;
	int error;
	minor_t minor = 0;

	if (cleanup_fd != -1) {
		/* Currently we only support cleanup-on-exit of tempholds. */
		if (!temphold)
			return (EINVAL);
		error = zfs_onexit_fd_hold(cleanup_fd, &minor);
		if (error)
			return (error);
	}

	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);

	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));

	error = spa_open(dsname, &spa, FTAG);
	if (error) {
		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
		if (cleanup_fd != -1)
			zfs_onexit_fd_rele(cleanup_fd);
		return (error);
	}

	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
	ha->htag = htag;
	ha->snapname = snapname;
	ha->recursive = recursive;
	ha->temphold = temphold;

	if (recursive) {
		error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
		    ha, DS_FIND_CHILDREN);
	} else {
		error = dsl_dataset_user_hold_one(dsname, ha);
	}
	if (error == 0)
		error = dsl_sync_task_group_wait(ha->dstg);

	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
		dsl_dataset_t *ds = dst->dst_arg1;

		if (dst->dst_err) {
			dsl_dataset_name(ds, ha->failed);
			*strchr(ha->failed, '@') = '\0';
		} else if (error == 0 && minor != 0 && temphold) {
			/*
			 * If this hold is to be released upon process exit,
			 * register that action now.
			 */
			dsl_register_onexit_hold_cleanup(ds, htag, minor);
		}
		dsl_dataset_rele(ds, ha->dstg);
	}

	if (error == 0 && recursive && !ha->gotone)
		error = ENOENT;

	if (error)
		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));

	dsl_sync_task_group_destroy(ha->dstg);

	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
	spa_close(spa, FTAG);
	if (cleanup_fd != -1)
		zfs_onexit_fd_rele(cleanup_fd);
	return (error);
}

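/*
 * Illustrative sketch (not compiled): the in-kernel equivalent of
 * "zfs hold -r mytag tank/fs@snap".  With cleanup_fd == -1 the hold
 * is permanent until explicitly released.  Buffer contents are
 * assumptions for illustration.
 */
#if 0
	char dsname[MAXNAMELEN] = "tank/fs";
	char snapname[] = "snap";
	char tag[] = "mytag";
	int err;

	err = dsl_dataset_user_hold(dsname, snapname, tag,
	    B_TRUE /* recursive */, B_FALSE /* temphold */, -1);
	/* on failure, dsname is overwritten with the name that failed */
#endif
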
struct dsl_ds_releasearg {
	dsl_dataset_t *ds;
	const char *htag;
	boolean_t own;		/* do we own or just hold ds? */
};

static int
dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
    boolean_t *might_destroy)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t zapobj;
	uint64_t tmp;
	int error;

	*might_destroy = B_FALSE;

	mutex_enter(&ds->ds_lock);
	zapobj = ds->ds_phys->ds_userrefs_obj;
	if (zapobj == 0) {
		/* The tag can't possibly exist */
		mutex_exit(&ds->ds_lock);
		return (ESRCH);
	}

	/* Make sure the tag exists */
	error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
	if (error) {
		mutex_exit(&ds->ds_lock);
		if (error == ENOENT)
			error = ESRCH;
		return (error);
	}

	if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
	    DS_IS_DEFER_DESTROY(ds))
		*might_destroy = B_TRUE;

	mutex_exit(&ds->ds_lock);
	return (0);
}

static int
dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
{
	struct dsl_ds_releasearg *ra = arg1;
	dsl_dataset_t *ds = ra->ds;
	boolean_t might_destroy;
	int error;

	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
		return (ENOTSUP);

	error = dsl_dataset_release_might_destroy(ds, ra->htag,
	    &might_destroy);
	if (error)
		return (error);

	if (might_destroy) {
		struct dsl_ds_destroyarg dsda = {0};

		if (dmu_tx_is_syncing(tx)) {
			/*
			 * If we're not prepared to remove the snapshot,
			 * we can't allow the release to happen right now.
			 */
			if (!ra->own)
				return (EBUSY);
		}
		dsda.ds = ds;
		dsda.releasing = B_TRUE;
		return (dsl_dataset_destroy_check(&dsda, tag, tx));
	}

	return (0);
}

static void
dsl_dataset_user_release_sync(void *arg1, void *tag, dmu_tx_t *tx)
{
	struct dsl_ds_releasearg *ra = arg1;
	dsl_dataset_t *ds = ra->ds;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;
	uint64_t zapobj;
	uint64_t dsobj = ds->ds_object;
	uint64_t refs;
	int error;

	mutex_enter(&ds->ds_lock);
	ds->ds_userrefs--;
	refs = ds->ds_userrefs;
	mutex_exit(&ds->ds_lock);
	error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
	VERIFY(error == 0 || error == ENOENT);
	zapobj = ds->ds_phys->ds_userrefs_obj;
	VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));

	spa_history_log_internal(LOG_DS_USER_RELEASE,
	    dp->dp_spa, tx, "<%s> %lld dataset = %llu",
	    ra->htag, (longlong_t)refs, dsobj);

	if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
	    DS_IS_DEFER_DESTROY(ds)) {
		struct dsl_ds_destroyarg dsda = {0};

		ASSERT(ra->own);
		dsda.ds = ds;
		dsda.releasing = B_TRUE;
		/* We already did the destroy_check */
		dsl_dataset_destroy_sync(&dsda, tag, tx);
	}
}

static int
dsl_dataset_user_release_one(const char *dsname, void *arg)
{
	struct dsl_ds_holdarg *ha = arg;
	struct dsl_ds_releasearg *ra;
	dsl_dataset_t *ds;
	int error;
	void *dtag = ha->dstg;
	char *name;
	boolean_t own = B_FALSE;
	boolean_t might_destroy;

	/* alloc a buffer to hold dsname@snapname, plus the terminating NUL */
	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
	error = dsl_dataset_hold(name, dtag, &ds);
	strfree(name);
	if (error == ENOENT && ha->recursive)
		return (0);
	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
	if (error)
		return (error);

	ha->gotone = B_TRUE;

	ASSERT(dsl_dataset_is_snapshot(ds));

	error = dsl_dataset_release_might_destroy(ds, ha->htag,
	    &might_destroy);
	if (error) {
		dsl_dataset_rele(ds, dtag);
		return (error);
	}

	if (might_destroy) {
#ifdef _KERNEL
		name = kmem_asprintf("%s@%s", dsname, ha->snapname);
		error = zfs_unmount_snap(name, NULL);
		strfree(name);
		if (error) {
			dsl_dataset_rele(ds, dtag);
			return (error);
		}
#endif
		if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
			dsl_dataset_rele(ds, dtag);
			return (EBUSY);
		} else {
			own = B_TRUE;
			dsl_dataset_make_exclusive(ds, dtag);
		}
	}

	ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
	ra->ds = ds;
	ra->htag = ha->htag;
	ra->own = own;
	dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
	    dsl_dataset_user_release_sync, ra, dtag, 0);

	return (0);
}

int
dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
    boolean_t recursive)
{
	struct dsl_ds_holdarg *ha;
	dsl_sync_task_t *dst;
	spa_t *spa;
	int error;

top:
	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);

	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));

	error = spa_open(dsname, &spa, FTAG);
	if (error) {
		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
		return (error);
	}

	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
	ha->htag = htag;
	ha->snapname = snapname;
	ha->recursive = recursive;
	if (recursive) {
		error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
		    ha, DS_FIND_CHILDREN);
	} else {
		error = dsl_dataset_user_release_one(dsname, ha);
	}
	if (error == 0)
		error = dsl_sync_task_group_wait(ha->dstg);

	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
		struct dsl_ds_releasearg *ra = dst->dst_arg1;
		dsl_dataset_t *ds = ra->ds;

		if (dst->dst_err)
			dsl_dataset_name(ds, ha->failed);

		if (ra->own)
			dsl_dataset_disown(ds, ha->dstg);
		else
			dsl_dataset_rele(ds, ha->dstg);

		kmem_free(ra, sizeof (struct dsl_ds_releasearg));
	}

	if (error == 0 && recursive && !ha->gotone)
		error = ENOENT;

	if (error && error != EBUSY)
		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));

	dsl_sync_task_group_destroy(ha->dstg);
	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
	spa_close(spa, FTAG);

	/*
	 * We can get EBUSY if we were racing with deferred destroy and
	 * dsl_dataset_user_release_check() hadn't done the necessary
	 * open context setup.  We can also get EBUSY if we're racing
	 * with destroy and that thread is the ds_owner.  Either way
	 * the busy condition should be transient, so retry the
	 * release operation.
	 */
	if (error == EBUSY)
		goto top;

	return (error);
}

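/*
 * Illustrative sketch (not compiled): releasing the hold taken in the
 * earlier example, i.e. "zfs release -r mytag tank/fs@snap".  If the
 * snapshot was marked for deferred destroy and this was its last
 * hold, the release also destroys it.  Buffer contents are
 * assumptions for illustration.
 */
#if 0
	char dsname[MAXNAMELEN] = "tank/fs";
	char snapname[] = "snap";
	char tag[] = "mytag";
	int err;

	err = dsl_dataset_user_release(dsname, snapname, tag, B_TRUE);
	/* ESRCH: no such tag; ENOENT: recursion found no snapshots */
#endif
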
/*
 * Called at spa_load time (with retry == B_FALSE) to release a stale
 * temporary user hold.  Also called by the onexit code (with retry ==
 * B_TRUE).
 */
int
dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag,
    boolean_t retry)
{
	dsl_dataset_t *ds;
	char *snap;
	char *name;
	int namelen;
	int error;

	do {
		rw_enter(&dp->dp_config_rwlock, RW_READER);
		error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
		rw_exit(&dp->dp_config_rwlock);
		if (error)
			return (error);
		namelen = dsl_dataset_namelen(ds) + 1;
		name = kmem_alloc(namelen, KM_SLEEP);
		dsl_dataset_name(ds, name);
		dsl_dataset_rele(ds, FTAG);

		snap = strchr(name, '@');
		*snap = '\0';
		++snap;
		error = dsl_dataset_user_release(name, snap, htag, B_FALSE);
		kmem_free(name, namelen);

		/*
		 * The object can't have been destroyed because we have a hold,
		 * but it might have been renamed, resulting in ENOENT.  Retry
		 * if we've been requested to do so.
		 *
		 * It would be nice if we could use the dsobj all the way
		 * through and avoid ENOENT entirely.  But we might need to
		 * unmount the snapshot, and there's currently no way to look
		 * up a vfsp using a ZFS object id.
		 */
	} while ((error == ENOENT) && retry);

	return (error);
}

int
dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
{
	dsl_dataset_t *ds;
	int err;

	err = dsl_dataset_hold(dsname, FTAG, &ds);
	if (err)
		return (err);

	VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
	if (ds->ds_phys->ds_userrefs_obj != 0) {
		zap_attribute_t *za;
		zap_cursor_t zc;

		za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
		for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
		    ds->ds_phys->ds_userrefs_obj);
		    zap_cursor_retrieve(&zc, za) == 0;
		    zap_cursor_advance(&zc)) {
			VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
			    za->za_first_integer));
		}
		zap_cursor_fini(&zc);
		kmem_free(za, sizeof (zap_attribute_t));
	}
	dsl_dataset_rele(ds, FTAG);
	return (0);
}

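/*
 * Illustrative sketch (not compiled): consuming the nvlist built by
 * dsl_dataset_get_holds().  Each pair maps a tag name to the hold's
 * creation time in seconds since the epoch (the value stored by
 * dsl_dataset_user_hold_sync() above).  Names are assumptions.
 */
#if 0
	nvlist_t *nvl;

	if (dsl_dataset_get_holds("tank/fs@snap", &nvl) == 0) {
		nvpair_t *pair;

		for (pair = nvlist_next_nvpair(nvl, NULL); pair != NULL;
		    pair = nvlist_next_nvpair(nvl, pair)) {
			uint64_t when;

			(void) nvpair_value_uint64(pair, &when);
			/* nvpair_name(pair) is the tag; when, its ctime */
		}
		nvlist_free(nvl);
	}
#endif
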
/*
 * Note, this function is used as the callback for dmu_objset_find().  We
 * always return 0 so that we will continue to find and process
 * inconsistent datasets, even if we encounter an error trying to
 * process one of them.
 */
/* ARGSUSED */
int
dsl_destroy_inconsistent(const char *dsname, void *arg)
{
	dsl_dataset_t *ds;

	if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
		if (DS_IS_INCONSISTENT(ds))
			(void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
		else
			dsl_dataset_disown(ds, FTAG);
	}
	return (0);
}

/*
 * Return (in *usedp) the amount of space written in new that is not
 * present in oldsnap.  New may be a snapshot or the head.  Old must be
 * a snapshot before new, in new's filesystem (or its origin).  If not,
 * fail and return EINVAL.
 *
 * The written space is calculated by considering two components:  First, we
 * ignore any freed space, and calculate the written space as new's used
 * space minus old's used space.  Next, we add back in the amount of space
 * that was freed between the two snapshots, since that space reduced new's
 * used space relative to old's.  Specifically, this is the space that was
 * born before old->ds_creation_txg, and freed before new (i.e. on new's
 * deadlist or a previous deadlist).
 *
 * space freed                         [---------------------]
 * snapshots                       ---O-------O--------O-------O------
 *                                         oldsnap            new
 */
int
dsl_dataset_space_written(dsl_dataset_t *oldsnap, dsl_dataset_t *new,
    uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
{
	int err = 0;
	uint64_t snapobj;
	dsl_pool_t *dp = new->ds_dir->dd_pool;

	*usedp = 0;
	*usedp += new->ds_phys->ds_referenced_bytes;
	*usedp -= oldsnap->ds_phys->ds_referenced_bytes;

	*compp = 0;
	*compp += new->ds_phys->ds_compressed_bytes;
	*compp -= oldsnap->ds_phys->ds_compressed_bytes;

	*uncompp = 0;
	*uncompp += new->ds_phys->ds_uncompressed_bytes;
	*uncompp -= oldsnap->ds_phys->ds_uncompressed_bytes;

	rw_enter(&dp->dp_config_rwlock, RW_READER);
	snapobj = new->ds_object;
	while (snapobj != oldsnap->ds_object) {
		dsl_dataset_t *snap;
		uint64_t used, comp, uncomp;

		if (snapobj == new->ds_object) {
			snap = new;
		} else {
			err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &snap);
			if (err != 0)
				break;
		}

		if (snap->ds_phys->ds_prev_snap_txg ==
		    oldsnap->ds_phys->ds_creation_txg) {
			/*
			 * The blocks in the deadlist cannot be born after
			 * ds_prev_snap_txg, so get the whole deadlist space,
			 * which is more efficient (especially for old-format
			 * deadlists).  Unfortunately the deadlist code
			 * doesn't have enough information to make this
			 * optimization itself.
			 */
			dsl_deadlist_space(&snap->ds_deadlist,
			    &used, &comp, &uncomp);
		} else {
			dsl_deadlist_space_range(&snap->ds_deadlist,
			    0, oldsnap->ds_phys->ds_creation_txg,
			    &used, &comp, &uncomp);
		}
		*usedp += used;
		*compp += comp;
		*uncompp += uncomp;

		/*
		 * If we get to the beginning of the chain of snapshots
		 * (ds_prev_snap_obj == 0) before oldsnap, then oldsnap
		 * was not a snapshot of/before new.
		 */
		snapobj = snap->ds_phys->ds_prev_snap_obj;
		if (snap != new)
			dsl_dataset_rele(snap, FTAG);
		if (snapobj == 0) {
			err = EINVAL;
			break;
		}
	}
	rw_exit(&dp->dp_config_rwlock);
	return (err);
}

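/*
 * Worked example for dsl_dataset_space_written() (illustrative
 * numbers): suppose oldsnap references 10G, new references 12G, and
 * 3G that had been born before oldsnap's creation txg was freed
 * between the two.  Then
 *
 *	written = 12G - 10G + 3G = 5G
 *
 * 5G of new data was written, but 3G of pre-oldsnap data was freed,
 * so the referenced size only grew by 2G.
 */
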
/*
 * Return (in *usedp) the amount of space that will be reclaimed if firstsnap,
 * lastsnap, and all snapshots in between are deleted.
 *
 * blocks that would be freed            [---------------------------]
 * snapshots                       ---O-------O--------O-------O--------O
 *                                        firstsnap        lastsnap
 *
 * This is the set of blocks that were born after the snap before firstsnap
 * (birth > firstsnap->prev_snap_txg) and died before the snap after the
 * last snap (i.e. on lastsnap->ds_next->ds_deadlist or an earlier deadlist).
 * We calculate this by iterating over the relevant deadlists (from the snap
 * after lastsnap, backward to the snap after firstsnap), summing up the
 * space on each deadlist that was born after the snap before firstsnap.
 */
int
dsl_dataset_space_wouldfree(dsl_dataset_t *firstsnap,
    dsl_dataset_t *lastsnap,
    uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
{
	int err = 0;
	uint64_t snapobj;
	dsl_pool_t *dp = firstsnap->ds_dir->dd_pool;

	ASSERT(dsl_dataset_is_snapshot(firstsnap));
	ASSERT(dsl_dataset_is_snapshot(lastsnap));

	/*
	 * Check that the snapshots are in the same dsl_dir, and firstsnap
	 * is before lastsnap.
	 */
	if (firstsnap->ds_dir != lastsnap->ds_dir ||
	    firstsnap->ds_phys->ds_creation_txg >
	    lastsnap->ds_phys->ds_creation_txg)
		return (EINVAL);

	*usedp = *compp = *uncompp = 0;

	rw_enter(&dp->dp_config_rwlock, RW_READER);
	snapobj = lastsnap->ds_phys->ds_next_snap_obj;
	while (snapobj != firstsnap->ds_object) {
		dsl_dataset_t *ds;
		uint64_t used, comp, uncomp;

		err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &ds);
		if (err != 0)
			break;

		dsl_deadlist_space_range(&ds->ds_deadlist,
		    firstsnap->ds_phys->ds_prev_snap_txg, UINT64_MAX,
		    &used, &comp, &uncomp);
		*usedp += used;
		*compp += comp;
		*uncompp += uncomp;

		snapobj = ds->ds_phys->ds_prev_snap_obj;
		ASSERT3U(snapobj, !=, 0);
		dsl_dataset_rele(ds, FTAG);
	}
	rw_exit(&dp->dp_config_rwlock);
	return (err);
}

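/*
 * Worked example for dsl_dataset_space_wouldfree() (illustrative
 * numbers): deleting @a..@c frees exactly the blocks born after the
 * snap before @a and dead by the snap after @c.  If @b's deadlist
 * holds 1G born after that point, @c's holds 2G, and the deadlist of
 * the snap after @c holds 4G, the loop above sums
 *
 *	1G + 2G + 4G = 7G
 *
 * into *usedp.  @a's own deadlist is never visited: blocks there died
 * before @a and so are not freed by deleting @a..@c.
 */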