/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/dmu_objset.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_synctask.h>
#include <sys/dmu_traverse.h>
#include <sys/dmu_tx.h>
#include <sys/arc.h>
#include <sys/zio.h>
#include <sys/zap.h>
#include <sys/unique.h>
#include <sys/zfs_context.h>
#include <sys/zfs_ioctl.h>
#include <sys/spa.h>
#include <sys/zfs_znode.h>
#include <sys/zvol.h>

static char *dsl_reaper = "the grim reaper";

static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
static dsl_syncfunc_t dsl_dataset_set_reservation_sync;

#define	DS_REF_MAX	(1ULL << 62)

#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE

#define	DSL_DATASET_IS_DESTROYED(ds)	((ds)->ds_owner == dsl_reaper)


/*
 * Figure out how much of this delta should be propagated to the dsl_dir
 * layer.  If there's a refreservation, that space has already been
 * partially accounted for in our ancestors.
 */
static int64_t
parent_delta(dsl_dataset_t *ds, int64_t delta)
{
	uint64_t old_bytes, new_bytes;

	if (ds->ds_reserved == 0)
		return (delta);

	old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
	new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);

	ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
	return (new_bytes - old_bytes);
}

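/*
 * Called in syncing context when a new block is born on behalf of this
 * dataset.  Charges the block's size to the dataset's phys accounting
 * and, via parent_delta(), to the dsl_dir hierarchy above it.
 */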
void
dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
{
	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
	int compressed = BP_GET_PSIZE(bp);
	int uncompressed = BP_GET_UCSIZE(bp);
	int64_t delta;

	dprintf_bp(bp, "born, ds=%p\n", ds);

	ASSERT(dmu_tx_is_syncing(tx));
	/* It could have been compressed away to nothing */
	if (BP_IS_HOLE(bp))
		return;
	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
	if (ds == NULL) {
		/*
		 * Account for the meta-objset space in its placeholder
		 * dsl_dir.
		 */
		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
		    used, compressed, uncompressed, tx);
		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
		return;
	}
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	mutex_enter(&ds->ds_dir->dd_lock);
	mutex_enter(&ds->ds_lock);
	delta = parent_delta(ds, used);
	ds->ds_phys->ds_used_bytes += used;
	ds->ds_phys->ds_compressed_bytes += compressed;
	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
	ds->ds_phys->ds_unique_bytes += used;
	mutex_exit(&ds->ds_lock);
	dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
	    compressed, uncompressed, tx);
	dsl_dir_transfer_space(ds->ds_dir, used - delta,
	    DD_USED_REFRSRV, DD_USED_HEAD, tx);
	mutex_exit(&ds->ds_dir->dd_lock);
}

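/*
 * Release a block from this dataset: if it was born after the most
 * recent snapshot it can be freed immediately; otherwise it goes on
 * the dataset's deadlist.  Returns the number of bytes freed.
 */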
int
dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
    boolean_t async)
{
	if (BP_IS_HOLE(bp))
		return (0);

	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(bp->blk_birth <= tx->tx_txg);

	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
	int compressed = BP_GET_PSIZE(bp);
	int uncompressed = BP_GET_UCSIZE(bp);

	ASSERT(used > 0);
	if (ds == NULL) {
		/*
		 * Account for the meta-objset space in its placeholder
		 * dataset.
		 */
		dsl_free(tx->tx_pool, tx->tx_txg, bp);

		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
		    -used, -compressed, -uncompressed, tx);
		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
		return (used);
	}
	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);

	ASSERT(!dsl_dataset_is_snapshot(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);

	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
		int64_t delta;

		dprintf_bp(bp, "freeing: %s", "");
		dsl_free(tx->tx_pool, tx->tx_txg, bp);

		mutex_enter(&ds->ds_dir->dd_lock);
		mutex_enter(&ds->ds_lock);
		ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
		    !DS_UNIQUE_IS_ACCURATE(ds));
		delta = parent_delta(ds, -used);
		ds->ds_phys->ds_unique_bytes -= used;
		mutex_exit(&ds->ds_lock);
		dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
		    delta, -compressed, -uncompressed, tx);
		dsl_dir_transfer_space(ds->ds_dir, -used - delta,
		    DD_USED_REFRSRV, DD_USED_HEAD, tx);
		mutex_exit(&ds->ds_dir->dd_lock);
	} else {
		dprintf_bp(bp, "putting on dead list: %s", "");
		if (async) {
			/*
			 * We are here as part of zio's write done callback,
			 * which means we're a zio interrupt thread.  We can't
			 * call bplist_enqueue() now because it may block
			 * waiting for I/O.  Instead, put bp on the deferred
			 * queue and let dsl_pool_sync() finish the job.
			 */
			bplist_enqueue_deferred(&ds->ds_deadlist, bp);
		} else {
			VERIFY(0 == bplist_enqueue(&ds->ds_deadlist, bp, tx));
		}
		ASSERT3U(ds->ds_prev->ds_object, ==,
		    ds->ds_phys->ds_prev_snap_obj);
		ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
		if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
		    ds->ds_object && bp->blk_birth >
		    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
			mutex_enter(&ds->ds_prev->ds_lock);
			ds->ds_prev->ds_phys->ds_unique_bytes += used;
			mutex_exit(&ds->ds_prev->ds_lock);
		}
		if (bp->blk_birth > ds->ds_origin_txg) {
			dsl_dir_transfer_space(ds->ds_dir, used,
			    DD_USED_HEAD, DD_USED_SNAP, tx);
		}
	}
	mutex_enter(&ds->ds_lock);
	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
	ds->ds_phys->ds_used_bytes -= used;
	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
	ds->ds_phys->ds_compressed_bytes -= compressed;
	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
	mutex_exit(&ds->ds_lock);

	return (used);
}

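/*
 * Return a conservative guess of the txg of this dataset's most recent
 * snapshot, accounting for a snapshot that may currently be in flight
 * via ds_trysnap_txg.  Callers use it to decide whether a block is
 * freeable.
 */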
uint64_t
dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
{
	uint64_t trysnap = 0;

	if (ds == NULL)
		return (0);
	/*
	 * The snapshot creation could fail, but that would cause an
	 * incorrect FALSE return, which would only result in an
	 * overestimation of the amount of space that an operation would
	 * consume, which is OK.
	 *
	 * There's also a small window where we could miss a pending
	 * snapshot, because we could set the sync task in the quiescing
	 * phase.  So this should only be used as a guess.
	 */
	if (ds->ds_trysnap_txg >
	    spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
		trysnap = ds->ds_trysnap_txg;
	return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
}

boolean_t
dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
{
	return (blk_birth > dsl_dataset_prev_snap_txg(ds));
}

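/*
 * Evict callback, invoked when the last reference to the dataset's
 * bonus buffer is dropped: tear down the in-core state and free the
 * dsl_dataset_t itself.
 */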
/* ARGSUSED */
static void
dsl_dataset_evict(dmu_buf_t *db, void *dsv)
{
	dsl_dataset_t *ds = dsv;

	ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));

	unique_remove(ds->ds_fsid_guid);

	if (ds->ds_objset != NULL)
		dmu_objset_evict(ds->ds_objset);

	if (ds->ds_prev) {
		dsl_dataset_drop_ref(ds->ds_prev, ds);
		ds->ds_prev = NULL;
	}

	bplist_close(&ds->ds_deadlist);
	if (ds->ds_dir)
		dsl_dir_close(ds->ds_dir, ds);

	ASSERT(!list_link_active(&ds->ds_synced_link));

	mutex_destroy(&ds->ds_lock);
	mutex_destroy(&ds->ds_recvlock);
	mutex_destroy(&ds->ds_opening_lock);
	rw_destroy(&ds->ds_rwlock);
	cv_destroy(&ds->ds_exclusive_cv);
	bplist_fini(&ds->ds_deadlist);

	kmem_free(ds, sizeof (dsl_dataset_t));
}

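/*
 * If this dataset is a snapshot, find its name in the head dataset's
 * snapnames ZAP and cache it in ds_snapname.
 */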
static int
dsl_dataset_get_snapname(dsl_dataset_t *ds)
{
	dsl_dataset_phys_t *headphys;
	int err;
	dmu_buf_t *headdbuf;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;

	if (ds->ds_snapname[0])
		return (0);
	if (ds->ds_phys->ds_next_snap_obj == 0)
		return (0);

	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
	    FTAG, &headdbuf);
	if (err)
		return (err);
	headphys = headdbuf->db_data;
	err = zap_value_search(dp->dp_meta_objset,
	    headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
	dmu_buf_rele(headdbuf, FTAG);
	return (err);
}

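/*
 * Look up (and, in the function below, remove) a snapshot name in this
 * dataset's snapnames ZAP.  Case-insensitive datasets use normalized
 * matching, falling back to an exact lookup if the ZAP doesn't support
 * normalization.
 */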
static int
dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
	matchtype_t mt;
	int err;

	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
		mt = MT_FIRST;
	else
		mt = MT_EXACT;

	err = zap_lookup_norm(mos, snapobj, name, 8, 1,
	    value, mt, NULL, 0, NULL);
	if (err == ENOTSUP && mt == MT_FIRST)
		err = zap_lookup(mos, snapobj, name, 8, 1, value);
	return (err);
}

static int
dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
	matchtype_t mt;
	int err;

	dsl_dir_snap_cmtime_update(ds->ds_dir);

	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
		mt = MT_FIRST;
	else
		mt = MT_EXACT;

	err = zap_remove_norm(mos, snapobj, name, mt, tx);
	if (err == ENOTSUP && mt == MT_FIRST)
		err = zap_remove(mos, snapobj, name, tx);
	return (err);
}

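/*
 * Find or construct the in-core dsl_dataset_t for the given object
 * number and take a reference on it.  The first opener initializes the
 * locks, deadlist, dsl_dir linkage, and cached properties; if a
 * concurrent opener wins the dmu_buf_set_user_ie() race, our copy is
 * discarded in favor of the winner's.
 */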
static int
dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    dsl_dataset_t **dsp)
{
	objset_t *mos = dp->dp_meta_objset;
	dmu_buf_t *dbuf;
	dsl_dataset_t *ds;
	int err;

	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
	    dsl_pool_sync_context(dp));

	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
	if (err)
		return (err);
	ds = dmu_buf_get_user(dbuf);
	if (ds == NULL) {
		dsl_dataset_t *winner;

		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
		ds->ds_dbuf = dbuf;
		ds->ds_object = dsobj;
		ds->ds_phys = dbuf->db_data;

		mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
		rw_init(&ds->ds_rwlock, 0, 0, 0);
		cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
		bplist_init(&ds->ds_deadlist);

		err = bplist_open(&ds->ds_deadlist,
		    mos, ds->ds_phys->ds_deadlist_obj);
		if (err == 0) {
			err = dsl_dir_open_obj(dp,
			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
		}
		if (err) {
			/*
			 * we don't really need to close the blist if we
			 * just opened it.
			 */
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_recvlock);
			mutex_destroy(&ds->ds_opening_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			bplist_fini(&ds->ds_deadlist);
			kmem_free(ds, sizeof (dsl_dataset_t));
			dmu_buf_rele(dbuf, tag);
			return (err);
		}

		if (!dsl_dataset_is_snapshot(ds)) {
			ds->ds_snapname[0] = '\0';
			if (ds->ds_phys->ds_prev_snap_obj) {
				err = dsl_dataset_get_ref(dp,
				    ds->ds_phys->ds_prev_snap_obj,
				    ds, &ds->ds_prev);
			}

			if (err == 0 && dsl_dir_is_clone(ds->ds_dir)) {
				dsl_dataset_t *origin;

				err = dsl_dataset_hold_obj(dp,
				    ds->ds_dir->dd_phys->dd_origin_obj,
				    FTAG, &origin);
				if (err == 0) {
					ds->ds_origin_txg =
					    origin->ds_phys->ds_creation_txg;
					dsl_dataset_rele(origin, FTAG);
				}
			}
		} else {
			if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
				err = dsl_dataset_get_snapname(ds);
			if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
				err = zap_count(
				    ds->ds_dir->dd_pool->dp_meta_objset,
				    ds->ds_phys->ds_userrefs_obj,
				    &ds->ds_userrefs);
			}
		}

		if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
			/*
			 * In sync context, we're called with either no lock
			 * or with the write lock.  If we're not syncing,
			 * we're always called with the read lock held.
			 */
			boolean_t need_lock =
			    !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
			    dsl_pool_sync_context(dp);

			if (need_lock)
				rw_enter(&dp->dp_config_rwlock, RW_READER);

			err = dsl_prop_get_ds(ds,
			    "refreservation", sizeof (uint64_t), 1,
			    &ds->ds_reserved, NULL);
			if (err == 0) {
				err = dsl_prop_get_ds(ds,
				    "refquota", sizeof (uint64_t), 1,
				    &ds->ds_quota, NULL);
			}

			if (need_lock)
				rw_exit(&dp->dp_config_rwlock);
		} else {
			ds->ds_reserved = ds->ds_quota = 0;
		}

		if (err == 0) {
			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
			    dsl_dataset_evict);
		}
		if (err || winner) {
			bplist_close(&ds->ds_deadlist);
			if (ds->ds_prev)
				dsl_dataset_drop_ref(ds->ds_prev, ds);
			dsl_dir_close(ds->ds_dir, ds);
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_recvlock);
			mutex_destroy(&ds->ds_opening_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			bplist_fini(&ds->ds_deadlist);
			kmem_free(ds, sizeof (dsl_dataset_t));
			if (err) {
				dmu_buf_rele(dbuf, tag);
				return (err);
			}
			ds = winner;
		} else {
			ds->ds_fsid_guid =
			    unique_insert(ds->ds_phys->ds_fsid_guid);
		}
	}
	ASSERT3P(ds->ds_dbuf, ==, dbuf);
	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
	ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
	    spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
	    dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
	mutex_enter(&ds->ds_lock);
	if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
		mutex_exit(&ds->ds_lock);
		dmu_buf_rele(ds->ds_dbuf, tag);
		return (ENOENT);
	}
	mutex_exit(&ds->ds_lock);
	*dsp = ds;
	return (0);
}

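/*
 * Acquire the ds_rwlock as READER on behalf of an existing reference,
 * blocking while a destroyer holds it exclusively.  Returns ENOENT if
 * the dataset is destroyed while we wait.
 */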
static int
dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/*
	 * In syncing context we don't want the rwlock: there
	 * may be an existing writer waiting for sync phase to
	 * finish.  We don't need to worry about such writers, since
	 * sync phase is single-threaded, so the writer can't be
	 * doing anything while we are active.
	 */
	if (dsl_pool_sync_context(dp)) {
		ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
		return (0);
	}

	/*
	 * Normal users will hold the ds_rwlock as a READER until they
	 * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
	 * drop their READER lock after they set the ds_owner field.
	 *
	 * If the dataset is being destroyed, the destroy thread will
	 * obtain a WRITER lock for exclusive access after it's done its
	 * open-context work and then change the ds_owner to
	 * dsl_reaper once destruction is assured.  So threads
	 * may block here temporarily, until the "destructability" of
	 * the dataset is determined.
	 */
	ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
	mutex_enter(&ds->ds_lock);
	while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
		rw_exit(&dp->dp_config_rwlock);
		cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
		if (DSL_DATASET_IS_DESTROYED(ds)) {
			mutex_exit(&ds->ds_lock);
			dsl_dataset_drop_ref(ds, tag);
			rw_enter(&dp->dp_config_rwlock, RW_READER);
			return (ENOENT);
		}
		/*
		 * The dp_config_rwlock lives above the ds_lock. And
		 * we need to check DSL_DATASET_IS_DESTROYED() while
		 * holding the ds_lock, so we have to drop and reacquire
		 * the ds_lock here.
		 */
		mutex_exit(&ds->ds_lock);
		rw_enter(&dp->dp_config_rwlock, RW_READER);
		mutex_enter(&ds->ds_lock);
	}
	mutex_exit(&ds->ds_lock);
	return (0);
}

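/*
 * Hold (or, in the function below, own) a dataset by object number.
 * Owning additionally sets ds_owner, granting exclusive long-term
 * access.
 */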
int
dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    dsl_dataset_t **dsp)
{
	int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);

	if (err)
		return (err);
	return (dsl_dataset_hold_ref(*dsp, tag));
}

int
dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
    void *tag, dsl_dataset_t **dsp)
{
	int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
	if (err)
		return (err);
	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
		dsl_dataset_rele(*dsp, tag);
		*dsp = NULL;
		return (EBUSY);
	}
	return (0);
}

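/*
 * Hold a dataset by name; a "pool/fs@snap" name resolves through the
 * head dataset's snapshot namespace.  A typical (hypothetical) caller
 * pairs this with dsl_dataset_rele():
 *
 *	dsl_dataset_t *ds;
 *	int err = dsl_dataset_hold("tank/fs@monday", FTAG, &ds);
 *	if (err == 0) {
 *		... use ds ...
 *		dsl_dataset_rele(ds, FTAG);
 *	}
 */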
int
dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
{
	dsl_dir_t *dd;
	dsl_pool_t *dp;
	const char *snapname;
	uint64_t obj;
	int err = 0;

	err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
	if (err)
		return (err);

	dp = dd->dd_pool;
	obj = dd->dd_phys->dd_head_dataset_obj;
	rw_enter(&dp->dp_config_rwlock, RW_READER);
	if (obj)
		err = dsl_dataset_get_ref(dp, obj, tag, dsp);
	else
		err = ENOENT;
	if (err)
		goto out;

	err = dsl_dataset_hold_ref(*dsp, tag);

	/* we may be looking for a snapshot */
	if (err == 0 && snapname != NULL) {
		dsl_dataset_t *ds = NULL;

		if (*snapname++ != '@') {
			dsl_dataset_rele(*dsp, tag);
			err = ENOENT;
			goto out;
		}

		dprintf("looking for snapshot '%s'\n", snapname);
		err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
		if (err == 0)
			err = dsl_dataset_get_ref(dp, obj, tag, &ds);
		dsl_dataset_rele(*dsp, tag);

		ASSERT3U((err == 0), ==, (ds != NULL));

		if (ds) {
			mutex_enter(&ds->ds_lock);
			if (ds->ds_snapname[0] == 0)
				(void) strlcpy(ds->ds_snapname, snapname,
				    sizeof (ds->ds_snapname));
			mutex_exit(&ds->ds_lock);
			err = dsl_dataset_hold_ref(ds, tag);
			*dsp = err ? NULL : ds;
		}
	}
out:
	rw_exit(&dp->dp_config_rwlock);
	dsl_dir_close(dd, FTAG);
	return (err);
}

int
dsl_dataset_own(const char *name, boolean_t inconsistentok,
    void *tag, dsl_dataset_t **dsp)
{
	int err = dsl_dataset_hold(name, tag, dsp);
	if (err)
		return (err);
	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
		dsl_dataset_rele(*dsp, tag);
		return (EBUSY);
	}
	return (0);
}

void
dsl_dataset_name(dsl_dataset_t *ds, char *name)
{
	if (ds == NULL) {
		(void) strcpy(name, "mos");
	} else {
		dsl_dir_name(ds->ds_dir, name);
		VERIFY(0 == dsl_dataset_get_snapname(ds));
		if (ds->ds_snapname[0]) {
			(void) strcat(name, "@");
			/*
			 * We use a "recursive" mutex so that we
			 * can call dprintf_ds() with ds_lock held.
			 */
			if (!MUTEX_HELD(&ds->ds_lock)) {
				mutex_enter(&ds->ds_lock);
				(void) strcat(name, ds->ds_snapname);
				mutex_exit(&ds->ds_lock);
			} else {
				(void) strcat(name, ds->ds_snapname);
			}
		}
	}
}

static int
dsl_dataset_namelen(dsl_dataset_t *ds)
{
	int result;

	if (ds == NULL) {
		result = 3;	/* "mos" */
	} else {
		result = dsl_dir_namelen(ds->ds_dir);
		VERIFY(0 == dsl_dataset_get_snapname(ds));
		if (ds->ds_snapname[0]) {
			++result;	/* adding one for the @-sign */
			if (!MUTEX_HELD(&ds->ds_lock)) {
				mutex_enter(&ds->ds_lock);
				result += strlen(ds->ds_snapname);
				mutex_exit(&ds->ds_lock);
			} else {
				result += strlen(ds->ds_snapname);
			}
		}
	}

	return (result);
}

void
dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
{
	dmu_buf_rele(ds->ds_dbuf, tag);
}

void
dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
{
	if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
		rw_exit(&ds->ds_rwlock);
	}
	dsl_dataset_drop_ref(ds, tag);
}

void
dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
{
	ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
	    (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));

	mutex_enter(&ds->ds_lock);
	ds->ds_owner = NULL;
	if (RW_WRITE_HELD(&ds->ds_rwlock)) {
		rw_exit(&ds->ds_rwlock);
		cv_broadcast(&ds->ds_exclusive_cv);
	}
	mutex_exit(&ds->ds_lock);
	if (ds->ds_dbuf)
		dsl_dataset_drop_ref(ds, tag);
	else
		dsl_dataset_evict(ds->ds_dbuf, ds);
}

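/*
 * Attempt to become the owner of this dataset.  Fails if the dataset
 * already has an owner, or if it is inconsistent (e.g. a partially
 * received stream) and the caller didn't pass inconsistentok.
 */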
boolean_t
dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
{
	boolean_t gotit = FALSE;

	mutex_enter(&ds->ds_lock);
	if (ds->ds_owner == NULL &&
	    (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
		ds->ds_owner = tag;
		if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
			rw_exit(&ds->ds_rwlock);
		gotit = TRUE;
	}
	mutex_exit(&ds->ds_lock);
	return (gotit);
}

void
dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
{
	ASSERT3P(owner, ==, ds->ds_owner);
	if (!RW_WRITE_HELD(&ds->ds_rwlock))
		rw_enter(&ds->ds_rwlock, RW_WRITER);
}

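/*
 * Allocate and initialize the phys object for a new head dataset in the
 * given dsl_dir, cloning from 'origin' if supplied (or from the pool's
 * origin snapshot by default).  Returns the new dataset's object
 * number.  Must be called in syncing context.
 */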
uint64_t
dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
    uint64_t flags, dmu_tx_t *tx)
{
	dsl_pool_t *dp = dd->dd_pool;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	uint64_t dsobj;
	objset_t *mos = dp->dp_meta_objset;

	if (origin == NULL)
		origin = dp->dp_origin_snap;

	ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
	ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);

	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	bzero(dsphys, sizeof (dsl_dataset_phys_t));
	dsphys->ds_dir_obj = dd->dd_object;
	dsphys->ds_flags = flags;
	dsphys->ds_fsid_guid = unique_create();
	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
	    sizeof (dsphys->ds_guid));
	dsphys->ds_snapnames_zapobj =
	    zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
	    DMU_OT_NONE, 0, tx);
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;
	dsphys->ds_deadlist_obj =
	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);

	if (origin) {
		dsphys->ds_prev_snap_obj = origin->ds_object;
		dsphys->ds_prev_snap_txg =
		    origin->ds_phys->ds_creation_txg;
		dsphys->ds_used_bytes =
		    origin->ds_phys->ds_used_bytes;
		dsphys->ds_compressed_bytes =
		    origin->ds_phys->ds_compressed_bytes;
		dsphys->ds_uncompressed_bytes =
		    origin->ds_phys->ds_uncompressed_bytes;
		dsphys->ds_bp = origin->ds_phys->ds_bp;
		dsphys->ds_flags |= origin->ds_phys->ds_flags;

		dmu_buf_will_dirty(origin->ds_dbuf, tx);
		origin->ds_phys->ds_num_children++;

		if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
			if (origin->ds_phys->ds_next_clones_obj == 0) {
				origin->ds_phys->ds_next_clones_obj =
				    zap_create(mos,
				    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
			}
			VERIFY(0 == zap_add_int(mos,
			    origin->ds_phys->ds_next_clones_obj,
			    dsobj, tx));
		}

		dmu_buf_will_dirty(dd->dd_dbuf, tx);
		dd->dd_phys->dd_origin_obj = origin->ds_object;
	}

	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;

	dmu_buf_rele(dbuf, FTAG);

	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	dd->dd_phys->dd_head_dataset_obj = dsobj;

	return (dsobj);
}

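/*
 * Create a new dsl_dir and head dataset named 'lastname' under 'pdd',
 * then set up create-time delegated permissions for it.
 */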
uint64_t
dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
    dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
{
	dsl_pool_t *dp = pdd->dd_pool;
	uint64_t dsobj, ddobj;
	dsl_dir_t *dd;

	ASSERT(lastname[0] != '@');

	ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));

	dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);

	dsl_deleg_set_create_perms(dd, tx, cr);

	dsl_dir_close(dd, FTAG);

	return (dsobj);
}

struct destroyarg {
	dsl_sync_task_group_t *dstg;
	char *snapname;
	char *failed;
	boolean_t defer;
};

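/*
 * dmu_objset_find() callback: own the snapshot '<name>@<snapname>' and
 * queue a destroy sync task for it.  A missing snapshot (ENOENT) is not
 * an error, since not every descendant need have this snapshot.
 */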
static int
dsl_snapshot_destroy_one(const char *name, void *arg)
{
	struct destroyarg *da = arg;
	dsl_dataset_t *ds;
	int err;
	char *dsname;

	dsname = kmem_asprintf("%s@%s", name, da->snapname);
	err = dsl_dataset_own(dsname, B_TRUE, da->dstg, &ds);
	strfree(dsname);
	if (err == 0) {
		struct dsl_ds_destroyarg *dsda;

		dsl_dataset_make_exclusive(ds, da->dstg);
		if (ds->ds_objset != NULL) {
			dmu_objset_evict(ds->ds_objset);
			ds->ds_objset = NULL;
		}
		dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg), KM_SLEEP);
		dsda->ds = ds;
		dsda->defer = da->defer;
		dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
		    dsl_dataset_destroy_sync, dsda, da->dstg, 0);
	} else if (err == ENOENT) {
		err = 0;
	} else {
		(void) strcpy(da->failed, name);
	}
	return (err);
}

/*
 * Destroy 'snapname' in all descendants of 'fsname'.
 */
#pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
int
dsl_snapshots_destroy(char *fsname, char *snapname, boolean_t defer)
{
	int err;
	struct destroyarg da;
	dsl_sync_task_t *dst;
	spa_t *spa;

	err = spa_open(fsname, &spa, FTAG);
	if (err)
		return (err);
	da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
	da.snapname = snapname;
	da.failed = fsname;
	da.defer = defer;

	err = dmu_objset_find(fsname,
	    dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);

	if (err == 0)
		err = dsl_sync_task_group_wait(da.dstg);

	for (dst = list_head(&da.dstg->dstg_tasks); dst;
	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
		struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
		dsl_dataset_t *ds = dsda->ds;

		/*
		 * Return the file system name that triggered the error
		 */
		if (dst->dst_err) {
			dsl_dataset_name(ds, fsname);
			*strchr(fsname, '@') = '\0';
		}
		ASSERT3P(dsda->rm_origin, ==, NULL);
		dsl_dataset_disown(ds, da.dstg);
		kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
	}

	dsl_sync_task_group_destroy(da.dstg);
	spa_close(spa, FTAG);
	return (err);
}

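/*
 * Would destroying this clone also destroy its origin snapshot?  True
 * if the origin has ds_num_children == 2 (only this clone besides its
 * next dataset), no user references, and has been marked for deferred
 * destruction.
 */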
static boolean_t
dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
{
	boolean_t might_destroy = B_FALSE;

	mutex_enter(&ds->ds_lock);
	if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
	    DS_IS_DEFER_DESTROY(ds))
		might_destroy = B_TRUE;
	mutex_exit(&ds->ds_lock);

	return (might_destroy);
}

/*
 * If we're removing a clone, and these three conditions are true:
 *	1) the clone's origin has no other children
 *	2) the clone's origin has no user references
 *	3) the clone's origin has been marked for deferred destruction
 * Then, prepare to remove the origin as part of this sync task group.
 */
static int
dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
{
	dsl_dataset_t *ds = dsda->ds;
	dsl_dataset_t *origin = ds->ds_prev;

	if (dsl_dataset_might_destroy_origin(origin)) {
		char *name;
		int namelen;
		int error;

		namelen = dsl_dataset_namelen(origin) + 1;
		name = kmem_alloc(namelen, KM_SLEEP);
		dsl_dataset_name(origin, name);
#ifdef _KERNEL
		error = zfs_unmount_snap(name, NULL);
		if (error) {
			kmem_free(name, namelen);
			return (error);
		}
#endif
		error = dsl_dataset_own(name, B_TRUE, tag, &origin);
		kmem_free(name, namelen);
		if (error)
			return (error);
		dsda->rm_origin = origin;
		dsl_dataset_make_exclusive(origin, tag);

		if (origin->ds_objset != NULL) {
			dmu_objset_evict(origin->ds_objset);
			origin->ds_objset = NULL;
		}
	}

	return (0);
}

/*
 * ds must be opened as OWNER.  On return (whether successful or not),
 * ds will be closed and caller can no longer dereference it.
 */
int
dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
{
	int err;
	dsl_sync_task_group_t *dstg;
	objset_t *os;
	dsl_dir_t *dd;
	uint64_t obj;
	struct dsl_ds_destroyarg dsda = { 0 };
	dsl_dataset_t dummy_ds = { 0 };

	dsda.ds = ds;

	if (dsl_dataset_is_snapshot(ds)) {
		/* Destroying a snapshot is simpler */
		dsl_dataset_make_exclusive(ds, tag);

		if (ds->ds_objset != NULL) {
			dmu_objset_evict(ds->ds_objset);
			ds->ds_objset = NULL;
		}
		dsda.defer = defer;
		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
		    &dsda, tag, 0);
		ASSERT3P(dsda.rm_origin, ==, NULL);
		goto out;
	} else if (defer) {
		err = EINVAL;
		goto out;
	}

	dd = ds->ds_dir;
	dummy_ds.ds_dir = dd;
	dummy_ds.ds_object = ds->ds_object;

	/*
	 * Check for errors and mark this ds as inconsistent, in
	 * case we crash while freeing the objects.
	 */
	err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
	    dsl_dataset_destroy_begin_sync, ds, NULL, 0);
	if (err)
		goto out;

	err = dmu_objset_from_ds(ds, &os);
	if (err)
		goto out;

	/*
	 * remove the objects in open context, so that we won't
	 * have too much to do in syncing context.
	 */
	for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
	    ds->ds_phys->ds_prev_snap_txg)) {
		/*
		 * Ignore errors; if there is not enough disk space,
		 * we will deal with it in dsl_dataset_destroy_sync().
		 */
		(void) dmu_free_object(os, obj);
	}

	/*
	 * We need to sync out all in-flight IO before we try to evict
	 * (the dataset evict func is trying to clear the cached entries
	 * for this dataset in the ARC).
	 */
	txg_wait_synced(dd->dd_pool, 0);

	/*
	 * If we managed to free all the objects in open
	 * context, the user space accounting should be zero.
	 */
	if (ds->ds_phys->ds_bp.blk_fill == 0 &&
	    dmu_objset_userused_enabled(os)) {
		uint64_t count;

		ASSERT(zap_count(os, DMU_USERUSED_OBJECT, &count) != 0 ||
		    count == 0);
		ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT, &count) != 0 ||
		    count == 0);
	}

	if (err != ESRCH)
		goto out;

	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
	err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
	rw_exit(&dd->dd_pool->dp_config_rwlock);

	if (err)
		goto out;

	if (ds->ds_objset) {
		/*
		 * We need to sync out all in-flight IO before we try
		 * to evict (the dataset evict func is trying to clear
		 * the cached entries for this dataset in the ARC).
		 */
		txg_wait_synced(dd->dd_pool, 0);
	}

	/*
	 * Blow away the dsl_dir + head dataset.
	 */
	dsl_dataset_make_exclusive(ds, tag);
	if (ds->ds_objset) {
		dmu_objset_evict(ds->ds_objset);
		ds->ds_objset = NULL;
	}

	/*
	 * If we're removing a clone, we might also need to remove its
	 * origin.
	 */
	do {
		dsda.need_prep = B_FALSE;
		if (dsl_dir_is_clone(dd)) {
			err = dsl_dataset_origin_rm_prep(&dsda, tag);
			if (err) {
				dsl_dir_close(dd, FTAG);
				goto out;
			}
		}

		dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
		dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
		    dsl_dataset_destroy_sync, &dsda, tag, 0);
		dsl_sync_task_create(dstg, dsl_dir_destroy_check,
		    dsl_dir_destroy_sync, &dummy_ds, FTAG, 0);
		err = dsl_sync_task_group_wait(dstg);
		dsl_sync_task_group_destroy(dstg);

		/*
		 * We could be racing against 'zfs release' or 'zfs destroy -d'
		 * on the origin snap, in which case we can get EBUSY if we
		 * needed to destroy the origin snap but were not ready to
		 * do so.
		 */
		if (dsda.need_prep) {
			ASSERT(err == EBUSY);
			ASSERT(dsl_dir_is_clone(dd));
			ASSERT(dsda.rm_origin == NULL);
		}
	} while (dsda.need_prep);

	if (dsda.rm_origin != NULL)
		dsl_dataset_disown(dsda.rm_origin, tag);

	/* if it is successful, dsl_dir_destroy_sync will close the dd */
	if (err)
		dsl_dir_close(dd, FTAG);
out:
	dsl_dataset_disown(ds, tag);
	return (err);
}

blkptr_t *
dsl_dataset_get_blkptr(dsl_dataset_t *ds)
{
	return (&ds->ds_phys->ds_bp);
}

void
dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
{
	ASSERT(dmu_tx_is_syncing(tx));
	/* If it's the meta-objset, set dp_meta_rootbp */
	if (ds == NULL) {
		tx->tx_pool->dp_meta_rootbp = *bp;
	} else {
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		ds->ds_phys->ds_bp = *bp;
	}
}

spa_t *
dsl_dataset_get_spa(dsl_dataset_t *ds)
{
	return (ds->ds_dir->dd_pool->dp_spa);
}

void
dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
{
	dsl_pool_t *dp;

	if (ds == NULL) /* this is the meta-objset */
		return;

	ASSERT(ds->ds_objset != NULL);

	if (ds->ds_phys->ds_next_snap_obj != 0)
		panic("dirtying snapshot!");

	dp = ds->ds_dir->dd_pool;

	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
		/* up the hold count until we can be written out */
		dmu_buf_add_ref(ds->ds_dbuf, ds);
	}
}

/*
 * The unique space in the head dataset can be calculated by subtracting
 * the space used in the most recent snapshot that is still being used
 * in this file system from the space currently in use.  To figure out
 * the space in the most recent snapshot still in use, we need to take
 * the total space used in the snapshot and subtract out the space that
 * has been freed up since the snapshot was taken.
 */
static void
dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
{
	uint64_t mrs_used;
	uint64_t dlused, dlcomp, dluncomp;

	ASSERT(ds->ds_object == ds->ds_dir->dd_phys->dd_head_dataset_obj);

	if (ds->ds_phys->ds_prev_snap_obj != 0)
		mrs_used = ds->ds_prev->ds_phys->ds_used_bytes;
	else
		mrs_used = 0;

	VERIFY(0 == bplist_space(&ds->ds_deadlist, &dlused, &dlcomp,
	    &dluncomp));

	ASSERT3U(dlused, <=, mrs_used);
	ds->ds_phys->ds_unique_bytes =
	    ds->ds_phys->ds_used_bytes - (mrs_used - dlused);

	if (!DS_UNIQUE_IS_ACCURATE(ds) &&
	    spa_version(ds->ds_dir->dd_pool->dp_spa) >=
	    SPA_VERSION_UNIQUE_ACCURATE)
		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
}

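/*
 * Return the dataset's unique bytes, recalculating first for head
 * datasets whose cached value isn't marked accurate.
 */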
static uint64_t
dsl_dataset_unique(dsl_dataset_t *ds)
{
	if (!DS_UNIQUE_IS_ACCURATE(ds) && !dsl_dataset_is_snapshot(ds))
		dsl_dataset_recalc_head_uniq(ds);

	return (ds->ds_phys->ds_unique_bytes);
}

struct killarg {
	dsl_dataset_t *ds;
	dmu_tx_t *tx;
};

/* ARGSUSED */
static int
kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
{
	struct killarg *ka = arg;
	dmu_tx_t *tx = ka->tx;

	if (bp == NULL)
		return (0);

	if (zb->zb_level == ZB_ZIL_LEVEL) {
		ASSERT(zilog != NULL);
		/*
		 * It's a block in the intent log.  It has no
		 * accounting, so just free it.
		 */
		dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
	} else {
		ASSERT(zilog == NULL);
		ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
		(void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
	}

	return (0);
}

/* ARGSUSED */
static int
dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t count;
	int err;

	/*
	 * Can't delete a head dataset if there are snapshots of it.
	 * (Except if the only snapshots are from the branch we cloned
	 * from.)
	 */
	if (ds->ds_prev != NULL &&
	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
		return (EBUSY);

	/*
	 * This is really a dsl_dir thing, but check it here so that
	 * we'll be less likely to leave this dataset inconsistent &
	 * nearly destroyed.
	 */
	err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
	if (err)
		return (err);
	if (count != 0)
		return (EEXIST);

	return (0);
}

/* ARGSUSED */
static void
dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/* Mark it as inconsistent on-disk, in case we crash */
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;

	spa_history_internal_log(LOG_DS_DESTROY_BEGIN, dp->dp_spa, tx,
	    cr, "dataset = %llu", ds->ds_object);
}

static int
dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
    dmu_tx_t *tx)
{
	dsl_dataset_t *ds = dsda->ds;
	dsl_dataset_t *ds_prev = ds->ds_prev;

	if (dsl_dataset_might_destroy_origin(ds_prev)) {
		struct dsl_ds_destroyarg ndsda = {0};

		/*
		 * If we're not prepared to remove the origin, don't remove
		 * the clone either.
		 */
		if (dsda->rm_origin == NULL) {
			dsda->need_prep = B_TRUE;
			return (EBUSY);
		}

		ndsda.ds = ds_prev;
		ndsda.is_origin_rm = B_TRUE;
		return (dsl_dataset_destroy_check(&ndsda, tag, tx));
	}

	/*
	 * If we're not going to remove the origin after all,
	 * undo the open context setup.
	 */
	if (dsda->rm_origin != NULL) {
		dsl_dataset_disown(dsda->rm_origin, tag);
		dsda->rm_origin = NULL;
	}

	return (0);
}

/* ARGSUSED */
int
dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	struct dsl_ds_destroyarg *dsda = arg1;
	dsl_dataset_t *ds = dsda->ds;

	/* we have an owner hold, so no one else can destroy us */
	ASSERT(!DSL_DATASET_IS_DESTROYED(ds));

	/*
	 * Only allow deferred destroy on pools that support it.
	 * NOTE: deferred destroy is only supported on snapshots.
	 */
	if (dsda->defer) {
		if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
		    SPA_VERSION_USERREFS)
			return (ENOTSUP);
		ASSERT(dsl_dataset_is_snapshot(ds));
		return (0);
	}

	/*
	 * Can't delete a head dataset if there are snapshots of it.
	 * (Except if the only snapshots are from the branch we cloned
	 * from.)
	 */
	if (ds->ds_prev != NULL &&
	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
		return (EBUSY);

	/*
	 * If we made changes this txg, traverse_dsl_dataset won't find
	 * them.  Try again.
	 */
	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
		return (EAGAIN);

	if (dsl_dataset_is_snapshot(ds)) {
		/*
		 * If this snapshot has an elevated user reference count,
		 * we can't destroy it yet.
		 */
		if (ds->ds_userrefs > 0 && !dsda->releasing)
			return (EBUSY);

		mutex_enter(&ds->ds_lock);
		/*
		 * Can't delete a branch point. However, if we're destroying
		 * a clone and removing its origin due to it having a user
		 * hold count of 0 and having been marked for deferred destroy,
		 * it's OK for the origin to have a single clone.
		 */
		if (ds->ds_phys->ds_num_children >
		    (dsda->is_origin_rm ? 2 : 1)) {
			mutex_exit(&ds->ds_lock);
			return (EEXIST);
		}
		mutex_exit(&ds->ds_lock);
	} else if (dsl_dir_is_clone(ds->ds_dir)) {
		return (dsl_dataset_origin_check(dsda, arg2, tx));
	}

	/* XXX we should do some i/o error checking... */
	return (0);
}

struct refsarg {
	kmutex_t lock;
	boolean_t gone;
	kcondvar_t cv;
};

/* ARGSUSED */
static void
dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
{
	struct refsarg *arg = argv;

	mutex_enter(&arg->lock);
	arg->gone = TRUE;
	cv_signal(&arg->cv);
	mutex_exit(&arg->lock);
}

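/*
 * Wait for all outstanding references to this dataset's bonus buffer
 * to go away: swap in an evict callback that signals us, drop our own
 * reference, and block until the callback fires.
 */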
static void
dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
{
	struct refsarg arg;

	mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
	arg.gone = FALSE;
	(void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
	    dsl_dataset_refs_gone);
	dmu_buf_rele(ds->ds_dbuf, tag);
	mutex_enter(&arg.lock);
	while (!arg.gone)
		cv_wait(&arg.cv, &arg.lock);
	ASSERT(arg.gone);
	mutex_exit(&arg.lock);
	ds->ds_dbuf = NULL;
	ds->ds_phys = NULL;
	mutex_destroy(&arg.lock);
	cv_destroy(&arg.cv);
}

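/*
 * Remove 'obj' from this snapshot's next_clones ZAP.  ENOENT is
 * tolerated because of a bug in older clone-upgrade code; see the
 * comment below.
 */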
static void
remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t count;
	int err;

	ASSERT(ds->ds_phys->ds_num_children >= 2);
	err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
	/*
	 * The err should not be ENOENT, but a bug in a previous version
	 * of the code could cause upgrade_clones_cb() to not set
	 * ds_next_snap_obj when it should, leading to a missing entry.
	 * If we knew that the pool was created after
	 * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
	 * ENOENT.  However, at least we can check that we don't have
	 * too many entries in the next_clones_obj even after failing to
	 * remove this one.
	 */
	if (err != ENOENT) {
		VERIFY3U(err, ==, 0);
	}
	ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
	    &count));
	ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
}

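/*
 * Sync task that actually destroys the dataset: unlink it from its
 * neighboring snapshots, transfer or free its deadlist entries, remove
 * it from the snapshot namespace (or erase the dsl_dir's head link),
 * and finally free the object.  A deferred destroy that can't proceed
 * yet just sets DS_FLAG_DEFER_DESTROY and returns.
 */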
void
dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
{
	struct dsl_ds_destroyarg *dsda = arg1;
	dsl_dataset_t *ds = dsda->ds;
	int err;
	int after_branch_point = FALSE;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;
	dsl_dataset_t *ds_prev = NULL;
	uint64_t obj;

	ASSERT(ds->ds_owner);
	ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
	ASSERT(ds->ds_prev == NULL ||
	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);

	if (dsda->defer) {
		ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
		if (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1) {
			dmu_buf_will_dirty(ds->ds_dbuf, tx);
			ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
			return;
		}
	}

	/* signal any waiters that this dataset is going away */
	mutex_enter(&ds->ds_lock);
	ds->ds_owner = dsl_reaper;
	cv_broadcast(&ds->ds_exclusive_cv);
	mutex_exit(&ds->ds_lock);

	/* Remove our reservation */
	if (ds->ds_reserved != 0) {
		dsl_prop_setarg_t psa;
		uint64_t value = 0;

		dsl_prop_setarg_init_uint64(&psa, "refreservation",
		    (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
		    &value);
		psa.psa_effective_value = 0;	/* predict default value */

		dsl_dataset_set_reservation_sync(ds, &psa, cr, tx);
		ASSERT3U(ds->ds_reserved, ==, 0);
	}

	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));

	dsl_pool_ds_destroyed(ds, tx);

	obj = ds->ds_object;

	if (ds->ds_phys->ds_prev_snap_obj != 0) {
		if (ds->ds_prev) {
			ds_prev = ds->ds_prev;
		} else {
			VERIFY(0 == dsl_dataset_hold_obj(dp,
			    ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
		}
		after_branch_point =
		    (ds_prev->ds_phys->ds_next_snap_obj != obj);

		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
		if (after_branch_point &&
		    ds_prev->ds_phys->ds_next_clones_obj != 0) {
			remove_from_next_clones(ds_prev, obj, tx);
			if (ds->ds_phys->ds_next_snap_obj != 0) {
				VERIFY(0 == zap_add_int(mos,
				    ds_prev->ds_phys->ds_next_clones_obj,
				    ds->ds_phys->ds_next_snap_obj, tx));
			}
		}
		if (after_branch_point &&
		    ds->ds_phys->ds_next_snap_obj == 0) {
			/* This clone is toast. */
			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
			ds_prev->ds_phys->ds_num_children--;

			/*
			 * If the clone's origin has no other clones, no
			 * user holds, and has been marked for deferred
			 * deletion, then we should have done the necessary
			 * destroy setup for it.
			 */
			if (ds_prev->ds_phys->ds_num_children == 1 &&
			    ds_prev->ds_userrefs == 0 &&
			    DS_IS_DEFER_DESTROY(ds_prev)) {
				ASSERT3P(dsda->rm_origin, !=, NULL);
			} else {
				ASSERT3P(dsda->rm_origin, ==, NULL);
			}
		} else if (!after_branch_point) {
			ds_prev->ds_phys->ds_next_snap_obj =
			    ds->ds_phys->ds_next_snap_obj;
		}
	}

	if (ds->ds_phys->ds_next_snap_obj != 0) {
		blkptr_t bp;
		dsl_dataset_t *ds_next;
		uint64_t itor = 0;
		uint64_t old_unique;
		int64_t used = 0, compressed = 0, uncompressed = 0;

		VERIFY(0 == dsl_dataset_hold_obj(dp,
		    ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);

		old_unique = dsl_dataset_unique(ds_next);

		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
		ds_next->ds_phys->ds_prev_snap_obj =
		    ds->ds_phys->ds_prev_snap_obj;
		ds_next->ds_phys->ds_prev_snap_txg =
		    ds->ds_phys->ds_prev_snap_txg;
		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);

		/*
		 * Transfer to our deadlist (which will become next's
		 * new deadlist) any entries from next's current
		 * deadlist which were born before prev, and free the
		 * other entries.
		 *
		 * XXX we're doing this long task with the config lock held
		 */
		while (bplist_iterate(&ds_next->ds_deadlist, &itor, &bp) == 0) {
			if (bp.blk_birth <= ds->ds_phys->ds_prev_snap_txg) {
				VERIFY(0 == bplist_enqueue(&ds->ds_deadlist,
				    &bp, tx));
				if (ds_prev && !after_branch_point &&
				    bp.blk_birth >
				    ds_prev->ds_phys->ds_prev_snap_txg) {
					ds_prev->ds_phys->ds_unique_bytes +=
					    bp_get_dsize_sync(dp->dp_spa, &bp);
				}
			} else {
				used += bp_get_dsize_sync(dp->dp_spa, &bp);
				compressed += BP_GET_PSIZE(&bp);
				uncompressed += BP_GET_UCSIZE(&bp);
				dsl_free(dp, tx->tx_txg, &bp);
			}
		}

		ASSERT3U(used, ==, ds->ds_phys->ds_unique_bytes);

		/* change snapused */
		dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
		    -used, -compressed, -uncompressed, tx);

		/* free next's deadlist */
		bplist_close(&ds_next->ds_deadlist);
		bplist_destroy(mos, ds_next->ds_phys->ds_deadlist_obj, tx);

		/* set next's deadlist to our deadlist */
		bplist_close(&ds->ds_deadlist);
		ds_next->ds_phys->ds_deadlist_obj =
		    ds->ds_phys->ds_deadlist_obj;
		VERIFY(0 == bplist_open(&ds_next->ds_deadlist, mos,
		    ds_next->ds_phys->ds_deadlist_obj));
		ds->ds_phys->ds_deadlist_obj = 0;

		if (ds_next->ds_phys->ds_next_snap_obj != 0) {
			/*
			 * Update next's unique to include blocks which
			 * were previously shared by only this snapshot
			 * and it.  Those blocks will be born after the
			 * prev snap and before this snap, and will have
			 * died after the next snap and before the one
			 * after that (i.e. be on the snap after next's
			 * deadlist).
			 *
			 * XXX we're doing this long task with the
			 * config lock held
			 */
			dsl_dataset_t *ds_after_next;
			uint64_t space;

			VERIFY(0 == dsl_dataset_hold_obj(dp,
			    ds_next->ds_phys->ds_next_snap_obj,
			    FTAG, &ds_after_next));

			VERIFY(0 ==
			    bplist_space_birthrange(&ds_after_next->ds_deadlist,
			    ds->ds_phys->ds_prev_snap_txg,
			    ds->ds_phys->ds_creation_txg, &space));
			ds_next->ds_phys->ds_unique_bytes += space;

			dsl_dataset_rele(ds_after_next, FTAG);
			ASSERT3P(ds_next->ds_prev, ==, NULL);
		} else {
			ASSERT3P(ds_next->ds_prev, ==, ds);
			dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
			ds_next->ds_prev = NULL;
			if (ds_prev) {
				VERIFY(0 == dsl_dataset_get_ref(dp,
				    ds->ds_phys->ds_prev_snap_obj,
				    ds_next, &ds_next->ds_prev));
			}

			dsl_dataset_recalc_head_uniq(ds_next);

			/*
			 * Reduce the amount of our unconsumed refreservation
			 * being charged to our parent by the amount of
			 * new unique data we have gained.
			 */
			if (old_unique < ds_next->ds_reserved) {
				int64_t mrsdelta;
				uint64_t new_unique =
				    ds_next->ds_phys->ds_unique_bytes;

				ASSERT(old_unique <= new_unique);
				mrsdelta = MIN(new_unique - old_unique,
				    ds_next->ds_reserved - old_unique);
				dsl_dir_diduse_space(ds->ds_dir,
				    DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
			}
		}
		dsl_dataset_rele(ds_next, FTAG);
	} else {
		/*
		 * There's no next snapshot, so this is a head dataset.
		 * Destroy the deadlist.  Unless it's a clone, the
		 * deadlist should be empty.  (If it's a clone, it's
		 * safe to ignore the deadlist contents.)
		 */
		struct killarg ka;

		ASSERT(after_branch_point || bplist_empty(&ds->ds_deadlist));
		bplist_close(&ds->ds_deadlist);
		bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
		ds->ds_phys->ds_deadlist_obj = 0;

		/*
		 * Free everything that we point to (that's born after
		 * the previous snapshot, if we are a clone)
		 *
		 * NB: this should be very quick, because we already
		 * freed all the objects in open context.
		 */
		ka.ds = ds;
		ka.tx = tx;
		err = traverse_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
		    TRAVERSE_POST, kill_blkptr, &ka);
		ASSERT3U(err, ==, 0);
		ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
		    ds->ds_phys->ds_unique_bytes == 0);

		if (ds->ds_prev != NULL) {
			dsl_dataset_rele(ds->ds_prev, ds);
			ds->ds_prev = ds_prev = NULL;
		}
	}

	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
		/* Erase the link in the dir */
		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
		ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
		ASSERT(err == 0);
	} else {
		/* remove from snapshot namespace */
		dsl_dataset_t *ds_head;
		ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
		VERIFY(0 == dsl_dataset_hold_obj(dp,
		    ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
		VERIFY(0 == dsl_dataset_get_snapname(ds));
#ifdef ZFS_DEBUG
		{
			uint64_t val;

			err = dsl_dataset_snap_lookup(ds_head,
			    ds->ds_snapname, &val);
			ASSERT3U(err, ==, 0);
			ASSERT3U(val, ==, obj);
		}
#endif
		err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
		ASSERT(err == 0);
		dsl_dataset_rele(ds_head, FTAG);
	}

	if (ds_prev && ds->ds_prev != ds_prev)
		dsl_dataset_rele(ds_prev, FTAG);

	spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
	spa_history_internal_log(LOG_DS_DESTROY, dp->dp_spa, tx,
	    cr, "dataset = %llu", ds->ds_object);

	if (ds->ds_phys->ds_next_clones_obj != 0) {
		uint64_t count;
		ASSERT(0 == zap_count(mos,
		    ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
		VERIFY(0 == dmu_object_free(mos,
		    ds->ds_phys->ds_next_clones_obj, tx));
	}
	if (ds->ds_phys->ds_props_obj != 0)
		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
	if (ds->ds_phys->ds_userrefs_obj != 0)
		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
	dsl_dir_close(ds->ds_dir, ds);
	ds->ds_dir = NULL;
	dsl_dataset_drain_refs(ds, tag);
	VERIFY(0 == dmu_object_free(mos, obj, tx));

	if (dsda->rm_origin) {
		/*
		 * Remove the origin of the clone we just destroyed.
		 */
		struct dsl_ds_destroyarg ndsda = {0};

		ndsda.ds = dsda->rm_origin;
		dsl_dataset_destroy_sync(&ndsda, tag, cr, tx);
	}
}

static int
dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
{
	uint64_t asize;

	if (!dmu_tx_is_syncing(tx))
		return (0);

	/*
	 * If there's an fs-only reservation, any blocks that might become
	 * owned by the snapshot dataset must be accommodated by space
	 * outside of the reservation.
	 */
	asize = MIN(dsl_dataset_unique(ds), ds->ds_reserved);
	if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, FALSE))
		return (ENOSPC);

	/*
	 * Propagate any reserved space for this snapshot to other
	 * snapshot checks in this sync group.
	 */
	if (asize > 0)
		dsl_dir_willuse_space(ds->ds_dir, asize, tx);

	return (0);
}

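/*
 * Sync task check function for taking a snapshot: disallows more than
 * one snapshot of a dataset per txg, conflicting or over-long names,
 * and snapshots whose refreservation could not be honored.
 */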
/* ARGSUSED */
int
dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	const char *snapname = arg2;
	int err;
	uint64_t value;

	/*
	 * We don't allow multiple snapshots of the same txg.  If there
	 * is already one, try again.
	 */
	if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
		return (EAGAIN);

	/*
	 * Check for a conflicting snapshot name.
	 */
	err = dsl_dataset_snap_lookup(ds, snapname, &value);
	if (err == 0)
		return (EEXIST);
	if (err != ENOENT)
		return (err);

	/*
	 * Check that the dataset's name is not too long.  Name consists
	 * of the dataset's length + 1 for the @-sign + snapshot name's length
	 */
	if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
		return (ENAMETOOLONG);

	err = dsl_dataset_snapshot_reserve_space(ds, tx);
	if (err)
		return (err);

	ds->ds_trysnap_txg = tx->tx_txg;
	return (0);
}

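/*
 * Sync task that creates the snapshot: allocate the snapshot object,
 * copy the head dataset's phys state into it, splice it into the
 * snapshot list, hand the head's deadlist over to it, and record the
 * name in the snapnames ZAP.
 */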
1888void
1889dsl_dataset_snapshot_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
1890{
1891	dsl_dataset_t *ds = arg1;
1892	const char *snapname = arg2;
1893	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1894	dmu_buf_t *dbuf;
1895	dsl_dataset_phys_t *dsphys;
1896	uint64_t dsobj, crtxg;
1897	objset_t *mos = dp->dp_meta_objset;
1898	int err;
1899
1900	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1901
1902	/*
1903	 * The origin's ds_creation_txg has to be < TXG_INITIAL
1904	 */
1905	if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
1906		crtxg = 1;
1907	else
1908		crtxg = tx->tx_txg;
1909
1910	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
1911	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
1912	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
1913	dmu_buf_will_dirty(dbuf, tx);
1914	dsphys = dbuf->db_data;
1915	bzero(dsphys, sizeof (dsl_dataset_phys_t));
1916	dsphys->ds_dir_obj = ds->ds_dir->dd_object;
1917	dsphys->ds_fsid_guid = unique_create();
1918	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
1919	    sizeof (dsphys->ds_guid));
1920	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
1921	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
1922	dsphys->ds_next_snap_obj = ds->ds_object;
1923	dsphys->ds_num_children = 1;
1924	dsphys->ds_creation_time = gethrestime_sec();
1925	dsphys->ds_creation_txg = crtxg;
1926	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
1927	dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
1928	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
1929	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
1930	dsphys->ds_flags = ds->ds_phys->ds_flags;
1931	dsphys->ds_bp = ds->ds_phys->ds_bp;
1932	dmu_buf_rele(dbuf, FTAG);
1933
1934	ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
1935	if (ds->ds_prev) {
1936		uint64_t next_clones_obj =
1937		    ds->ds_prev->ds_phys->ds_next_clones_obj;
1938		ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
1939		    ds->ds_object ||
1940		    ds->ds_prev->ds_phys->ds_num_children > 1);
1941		if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
1942			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
1943			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1944			    ds->ds_prev->ds_phys->ds_creation_txg);
1945			ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
1946		} else if (next_clones_obj != 0) {
1947			remove_from_next_clones(ds->ds_prev,
1948			    dsphys->ds_next_snap_obj, tx);
1949			VERIFY3U(0, ==, zap_add_int(mos,
1950			    next_clones_obj, dsobj, tx));
1951		}
1952	}
1953
1954	/*
1955	 * If we have a reference-reservation on this dataset, we will
1956	 * need to increase the amount of refreservation being charged
1957	 * since our unique space is going to zero.
1958	 */
1959	if (ds->ds_reserved) {
1960		int64_t add = MIN(dsl_dataset_unique(ds), ds->ds_reserved);
1961		dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
1962		    add, 0, 0, tx);
1963	}
1964
1965	bplist_close(&ds->ds_deadlist);
1966	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1967	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
1968	ds->ds_phys->ds_prev_snap_obj = dsobj;
1969	ds->ds_phys->ds_prev_snap_txg = crtxg;
1970	ds->ds_phys->ds_unique_bytes = 0;
1971	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
1972		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
1973	ds->ds_phys->ds_deadlist_obj =
1974	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
1975	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
1976	    ds->ds_phys->ds_deadlist_obj));
1977
1978	dprintf("snap '%s' -> obj %llu\n", snapname, dsobj);
1979	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
1980	    snapname, 8, 1, &dsobj, tx);
1981	ASSERT(err == 0);
1982
1983	if (ds->ds_prev)
1984		dsl_dataset_drop_ref(ds->ds_prev, ds);
1985	VERIFY(0 == dsl_dataset_get_ref(dp,
1986	    ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
1987
1988	dsl_pool_ds_snapshotted(ds, tx);
1989
1990	dsl_dir_snap_cmtime_update(ds->ds_dir);
1991
1992	spa_history_internal_log(LOG_DS_SNAPSHOT, dp->dp_spa, tx, cr,
1993	    "dataset = %llu", dsobj);
1994}
1995
1996void
1997dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
1998{
1999	ASSERT(dmu_tx_is_syncing(tx));
2000	ASSERT(ds->ds_objset != NULL);
2001	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
2002
2003	/*
	 * In case we had to change ds_fsid_guid when we opened it,
2005	 * sync it out now.
2006	 */
2007	dmu_buf_will_dirty(ds->ds_dbuf, tx);
2008	ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
2009
2010	dsl_dir_dirty(ds->ds_dir, tx);
2011	dmu_objset_sync(ds->ds_objset, zio, tx);
2012}
2013
2014void
2015dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
2016{
2017	uint64_t refd, avail, uobjs, aobjs;
2018
2019	dsl_dir_stats(ds->ds_dir, nv);
2020
2021	dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
2022	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
2023	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
2024
2025	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
2026	    ds->ds_phys->ds_creation_time);
2027	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
2028	    ds->ds_phys->ds_creation_txg);
2029	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
2030	    ds->ds_quota);
2031	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
2032	    ds->ds_reserved);
2033	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
2034	    ds->ds_phys->ds_guid);
2035	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
2036	    dsl_dataset_unique(ds));
2037	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
2038	    ds->ds_object);
2039	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
2040	    ds->ds_userrefs);
2041	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
2042	    DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
2043
2044	if (ds->ds_phys->ds_next_snap_obj) {
2045		/*
2046		 * This is a snapshot; override the dd's space used with
2047		 * our unique space and compression ratio.
2048		 */
2049		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
2050		    ds->ds_phys->ds_unique_bytes);
2051		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO,
2052		    ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
2053		    (ds->ds_phys->ds_uncompressed_bytes * 100 /
2054		    ds->ds_phys->ds_compressed_bytes));
2055	}
2056}
2057
2058void
2059dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
2060{
2061	stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
2062	stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
2063	stat->dds_guid = ds->ds_phys->ds_guid;
2064	if (ds->ds_phys->ds_next_snap_obj) {
2065		stat->dds_is_snapshot = B_TRUE;
2066		stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
2067	} else {
2068		stat->dds_is_snapshot = B_FALSE;
2069		stat->dds_num_clones = 0;
2070	}
2071
2072	/* clone origin is really a dsl_dir thing... */
2073	rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2074	if (dsl_dir_is_clone(ds->ds_dir)) {
2075		dsl_dataset_t *ods;
2076
2077		VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
2078		    ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
2079		dsl_dataset_name(ods, stat->dds_origin);
2080		dsl_dataset_drop_ref(ods, FTAG);
2081	} else {
2082		stat->dds_origin[0] = '\0';
2083	}
2084	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2085}
2086
2087uint64_t
2088dsl_dataset_fsid_guid(dsl_dataset_t *ds)
2089{
2090	return (ds->ds_fsid_guid);
2091}
2092
2093void
2094dsl_dataset_space(dsl_dataset_t *ds,
2095    uint64_t *refdbytesp, uint64_t *availbytesp,
2096    uint64_t *usedobjsp, uint64_t *availobjsp)
2097{
2098	*refdbytesp = ds->ds_phys->ds_used_bytes;
2099	*availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2100	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2101		*availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2102	if (ds->ds_quota != 0) {
2103		/*
2104		 * Adjust available bytes according to refquota
2105		 */
2106		if (*refdbytesp < ds->ds_quota)
2107			*availbytesp = MIN(*availbytesp,
2108			    ds->ds_quota - *refdbytesp);
2109		else
2110			*availbytesp = 0;
2111	}
2112	*usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2113	*availobjsp = DN_MAX_OBJECT - *usedobjsp;
2114}
2115
2116boolean_t
2117dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
2118{
2119	dsl_pool_t *dp = ds->ds_dir->dd_pool;
2120
2121	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
2122	    dsl_pool_sync_context(dp));
2123	if (ds->ds_prev == NULL)
2124		return (B_FALSE);
2125	if (ds->ds_phys->ds_bp.blk_birth >
2126	    ds->ds_prev->ds_phys->ds_creation_txg)
2127		return (B_TRUE);
2128	return (B_FALSE);
2129}
2130
2131/* ARGSUSED */
2132static int
2133dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2134{
2135	dsl_dataset_t *ds = arg1;
2136	char *newsnapname = arg2;
2137	dsl_dir_t *dd = ds->ds_dir;
2138	dsl_dataset_t *hds;
2139	uint64_t val;
2140	int err;
2141
2142	err = dsl_dataset_hold_obj(dd->dd_pool,
2143	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2144	if (err)
2145		return (err);
2146
2147	/* new name better not be in use */
2148	err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2149	dsl_dataset_rele(hds, FTAG);
2150
2151	if (err == 0)
2152		err = EEXIST;
2153	else if (err == ENOENT)
2154		err = 0;
2155
2156	/* dataset name + 1 for the "@" + the new snapshot name must fit */
2157	if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2158		err = ENAMETOOLONG;
2159
2160	return (err);
2161}
2162
2163static void
2164dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2,
2165    cred_t *cr, dmu_tx_t *tx)
2166{
2167	dsl_dataset_t *ds = arg1;
2168	const char *newsnapname = arg2;
2169	dsl_dir_t *dd = ds->ds_dir;
2170	objset_t *mos = dd->dd_pool->dp_meta_objset;
2171	dsl_dataset_t *hds;
2172	int err;
2173
2174	ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
2175
2176	VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2177	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
2178
2179	VERIFY(0 == dsl_dataset_get_snapname(ds));
2180	err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
2181	ASSERT3U(err, ==, 0);
2182	mutex_enter(&ds->ds_lock);
2183	(void) strcpy(ds->ds_snapname, newsnapname);
2184	mutex_exit(&ds->ds_lock);
2185	err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
2186	    ds->ds_snapname, 8, 1, &ds->ds_object, tx);
2187	ASSERT3U(err, ==, 0);
2188
2189	spa_history_internal_log(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
2190	    cr, "dataset = %llu", ds->ds_object);
2191	dsl_dataset_rele(hds, FTAG);
2192}
2193
2194struct renamesnaparg {
2195	dsl_sync_task_group_t *dstg;
2196	char failed[MAXPATHLEN];
2197	char *oldsnap;
2198	char *newsnap;
2199};
2200
2201static int
2202dsl_snapshot_rename_one(const char *name, void *arg)
2203{
2204	struct renamesnaparg *ra = arg;
2205	dsl_dataset_t *ds = NULL;
2206	char *snapname;
2207	int err;
2208
2209	snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
2210	(void) strlcpy(ra->failed, snapname, sizeof (ra->failed));
2211
2212	/*
	 * For recursive snapshot renames the parent won't be changing,
	 * so we just pass name for both the to/from arguments.
2215	 */
2216	err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
2217	if (err != 0) {
2218		strfree(snapname);
2219		return (err == ENOENT ? 0 : err);
2220	}
2221
2222#ifdef _KERNEL
2223	/*
	 * Each snapshot undergoing rename must first be unmounted.
2225	 */
2226	(void) zfs_unmount_snap(snapname, NULL);
2227#endif
2228	err = dsl_dataset_hold(snapname, ra->dstg, &ds);
2229	strfree(snapname);
2230	if (err != 0)
2231		return (err == ENOENT ? 0 : err);
2232
2233	dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
2234	    dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
2235
2236	return (0);
2237}
2238
2239static int
2240dsl_recursive_rename(char *oldname, const char *newname)
2241{
2242	int err;
2243	struct renamesnaparg *ra;
2244	dsl_sync_task_t *dst;
2245	spa_t *spa;
2246	char *cp, *fsname = spa_strdup(oldname);
2247	int len = strlen(oldname) + 1;
2248
2249	/* truncate the snapshot name to get the fsname */
2250	cp = strchr(fsname, '@');
2251	*cp = '\0';
2252
2253	err = spa_open(fsname, &spa, FTAG);
2254	if (err) {
2255		kmem_free(fsname, len);
2256		return (err);
2257	}
2258	ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
2259	ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
2260
2261	ra->oldsnap = strchr(oldname, '@') + 1;
2262	ra->newsnap = strchr(newname, '@') + 1;
2263	*ra->failed = '\0';
2264
2265	err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
2266	    DS_FIND_CHILDREN);
2267	kmem_free(fsname, len);
2268
	if (err == 0)
		err = dsl_sync_task_group_wait(ra->dstg);
2272
2273	for (dst = list_head(&ra->dstg->dstg_tasks); dst;
2274	    dst = list_next(&ra->dstg->dstg_tasks, dst)) {
2275		dsl_dataset_t *ds = dst->dst_arg1;
2276		if (dst->dst_err) {
2277			dsl_dir_name(ds->ds_dir, ra->failed);
2278			(void) strlcat(ra->failed, "@", sizeof (ra->failed));
2279			(void) strlcat(ra->failed, ra->newsnap,
2280			    sizeof (ra->failed));
2281		}
2282		dsl_dataset_rele(ds, ra->dstg);
2283	}
2284
2285	if (err)
2286		(void) strlcpy(oldname, ra->failed, sizeof (ra->failed));
2287
2288	dsl_sync_task_group_destroy(ra->dstg);
2289	kmem_free(ra, sizeof (struct renamesnaparg));
2290	spa_close(spa, FTAG);
2291	return (err);
2292}
2293
2294static int
2295dsl_valid_rename(const char *oldname, void *arg)
2296{
2297	int delta = *(int *)arg;
2298
2299	if (strlen(oldname) + delta >= MAXNAMELEN)
2300		return (ENAMETOOLONG);
2301
2302	return (0);
2303}
2304
2305#pragma weak dmu_objset_rename = dsl_dataset_rename
2306int
2307dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
2308{
2309	dsl_dir_t *dd;
2310	dsl_dataset_t *ds;
2311	const char *tail;
2312	int err;
2313
2314	err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2315	if (err)
2316		return (err);
2317
2318	if (tail == NULL) {
2319		int delta = strlen(newname) - strlen(oldname);
2320
2321		/* if we're growing, validate child name lengths */
2322		if (delta > 0)
2323			err = dmu_objset_find(oldname, dsl_valid_rename,
2324			    &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2325
2326		if (!err) {
2327			/*
2328			 * If there are more than 2 references there may be
2329			 * holds hanging around that haven't been cleared
2330			 * out yet.
2331			 */
2332			if (dmu_buf_refcount(dd->dd_dbuf) > 2)
2333				txg_wait_synced(dd->dd_pool, 0);
2334
2335			err = dsl_dir_rename(dd, newname);
2336		}
2337		dsl_dir_close(dd, FTAG);
2338		return (err);
2339	}
2340
2341	if (tail[0] != '@') {
2342		/* the name ended in a nonexistent component */
2343		dsl_dir_close(dd, FTAG);
2344		return (ENOENT);
2345	}
2346
2347	dsl_dir_close(dd, FTAG);
2348
2349	/* new name must be snapshot in same filesystem */
2350	tail = strchr(newname, '@');
2351	if (tail == NULL)
2352		return (EINVAL);
2353	tail++;
2354	if (strncmp(oldname, newname, tail - newname) != 0)
2355		return (EXDEV);
2356
2357	if (recursive) {
2358		err = dsl_recursive_rename(oldname, newname);
2359	} else {
2360		err = dsl_dataset_hold(oldname, FTAG, &ds);
2361		if (err)
2362			return (err);
2363
2364		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2365		    dsl_dataset_snapshot_rename_check,
2366		    dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2367
2368		dsl_dataset_rele(ds, FTAG);
2369	}
2370
2371	return (err);
2372}
2373
2374struct promotenode {
2375	list_node_t link;
2376	dsl_dataset_t *ds;
2377};
2378
2379struct promotearg {
2380	list_t shared_snaps, origin_snaps, clone_snaps;
2381	dsl_dataset_t *origin_origin, *origin_head;
2382	uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
2383	char *err_ds;
2384};
2385
2386static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
2387
2388/* ARGSUSED */
2389static int
2390dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
2391{
2392	dsl_dataset_t *hds = arg1;
2393	struct promotearg *pa = arg2;
2394	struct promotenode *snap = list_head(&pa->shared_snaps);
2395	dsl_dataset_t *origin_ds = snap->ds;
2396	int err;
2397
2398	/* Check that it is a real clone */
2399	if (!dsl_dir_is_clone(hds->ds_dir))
2400		return (EINVAL);
2401
2402	/* Since this is so expensive, don't do the preliminary check */
2403	if (!dmu_tx_is_syncing(tx))
2404		return (0);
2405
2406	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
2407		return (EXDEV);
2408
2409	/* compute origin's new unique space */
2410	snap = list_tail(&pa->clone_snaps);
2411	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2412	err = bplist_space_birthrange(&snap->ds->ds_deadlist,
2413	    origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX, &pa->unique);
2414	if (err)
2415		return (err);
2416
2417	/*
2418	 * Walk the snapshots that we are moving
2419	 *
2420	 * Compute space to transfer.  Consider the incremental changes
2421	 * to used for each snapshot:
2422	 * (my used) = (prev's used) + (blocks born) - (blocks killed)
2423	 * So each snapshot gave birth to:
2424	 * (blocks born) = (my used) - (prev's used) + (blocks killed)
2425	 * So a sequence would look like:
2426	 * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
2427	 * Which simplifies to:
2428	 * uN + kN + kN-1 + ... + k1 + k0
	 * Note, however, that if we stop before we reach the ORIGIN we get:
2430	 * uN + kN + kN-1 + ... + kM - uM-1
2431	 */
2432	pa->used = origin_ds->ds_phys->ds_used_bytes;
2433	pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
2434	pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
2435	for (snap = list_head(&pa->shared_snaps); snap;
2436	    snap = list_next(&pa->shared_snaps, snap)) {
2437		uint64_t val, dlused, dlcomp, dluncomp;
2438		dsl_dataset_t *ds = snap->ds;
2439
2440		/* Check that the snapshot name does not conflict */
2441		VERIFY(0 == dsl_dataset_get_snapname(ds));
2442		err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
2443		if (err == 0) {
2444			err = EEXIST;
2445			goto out;
2446		}
2447		if (err != ENOENT)
2448			goto out;
2449
2450		/* The very first snapshot does not have a deadlist */
2451		if (ds->ds_phys->ds_prev_snap_obj == 0)
2452			continue;
2453
		if ((err = bplist_space(&ds->ds_deadlist,
		    &dlused, &dlcomp, &dluncomp)) != 0)
2456			goto out;
2457		pa->used += dlused;
2458		pa->comp += dlcomp;
2459		pa->uncomp += dluncomp;
2460	}
2461
2462	/*
2463	 * If we are a clone of a clone then we never reached ORIGIN,
2464	 * so we need to subtract out the clone origin's used space.
2465	 */
2466	if (pa->origin_origin) {
2467		pa->used -= pa->origin_origin->ds_phys->ds_used_bytes;
2468		pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
2469		pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
2470	}
2471
2472	/* Check that there is enough space here */
2473	err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
2474	    pa->used);
2475	if (err)
2476		return (err);
2477
2478	/*
2479	 * Compute the amounts of space that will be used by snapshots
2480	 * after the promotion (for both origin and clone).  For each,
2481	 * it is the amount of space that will be on all of their
2482	 * deadlists (that was not born before their new origin).
2483	 */
2484	if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2485		uint64_t space;
2486
2487		/*
2488		 * Note, typically this will not be a clone of a clone,
2489		 * so snap->ds->ds_origin_txg will be < TXG_INITIAL, so
2490		 * these snaplist_space() -> bplist_space_birthrange()
2491		 * calls will be fast because they do not have to
2492		 * iterate over all bps.
2493		 */
2494		snap = list_head(&pa->origin_snaps);
2495		err = snaplist_space(&pa->shared_snaps,
2496		    snap->ds->ds_origin_txg, &pa->cloneusedsnap);
2497		if (err)
2498			return (err);
2499
2500		err = snaplist_space(&pa->clone_snaps,
2501		    snap->ds->ds_origin_txg, &space);
2502		if (err)
2503			return (err);
2504		pa->cloneusedsnap += space;
2505	}
2506	if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2507		err = snaplist_space(&pa->origin_snaps,
2508		    origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
2509		if (err)
2510			return (err);
2511	}
2512
2513	return (0);
2514out:
	pa->err_ds = snap->ds->ds_snapname;
2516	return (err);
2517}
2518
2519static void
2520dsl_dataset_promote_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
2521{
2522	dsl_dataset_t *hds = arg1;
2523	struct promotearg *pa = arg2;
2524	struct promotenode *snap = list_head(&pa->shared_snaps);
2525	dsl_dataset_t *origin_ds = snap->ds;
2526	dsl_dataset_t *origin_head;
2527	dsl_dir_t *dd = hds->ds_dir;
2528	dsl_pool_t *dp = hds->ds_dir->dd_pool;
2529	dsl_dir_t *odd = NULL;
2530	uint64_t oldnext_obj;
2531	int64_t delta;
2532
2533	ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
2534
2535	snap = list_head(&pa->origin_snaps);
2536	origin_head = snap->ds;
2537
2538	/*
2539	 * We need to explicitly open odd, since origin_ds's dd will be
2540	 * changing.
2541	 */
2542	VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
2543	    NULL, FTAG, &odd));
2544
2545	/* change origin's next snap */
2546	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
2547	oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
2548	snap = list_tail(&pa->clone_snaps);
2549	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2550	origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;
2551
2552	/* change the origin's next clone */
2553	if (origin_ds->ds_phys->ds_next_clones_obj) {
2554		remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
2555		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2556		    origin_ds->ds_phys->ds_next_clones_obj,
2557		    oldnext_obj, tx));
2558	}
2559
2560	/* change origin */
2561	dmu_buf_will_dirty(dd->dd_dbuf, tx);
2562	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
2563	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
2564	hds->ds_origin_txg = origin_head->ds_origin_txg;
2565	dmu_buf_will_dirty(odd->dd_dbuf, tx);
2566	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
2567	origin_head->ds_origin_txg = origin_ds->ds_phys->ds_creation_txg;
2568
2569	/* move snapshots to this dir */
2570	for (snap = list_head(&pa->shared_snaps); snap;
2571	    snap = list_next(&pa->shared_snaps, snap)) {
2572		dsl_dataset_t *ds = snap->ds;
2573
2574		/* unregister props as dsl_dir is changing */
2575		if (ds->ds_objset) {
2576			dmu_objset_evict(ds->ds_objset);
2577			ds->ds_objset = NULL;
2578		}
2579		/* move snap name entry */
2580		VERIFY(0 == dsl_dataset_get_snapname(ds));
2581		VERIFY(0 == dsl_dataset_snap_remove(origin_head,
2582		    ds->ds_snapname, tx));
2583		VERIFY(0 == zap_add(dp->dp_meta_objset,
2584		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
2585		    8, 1, &ds->ds_object, tx));
2586		/* change containing dsl_dir */
2587		dmu_buf_will_dirty(ds->ds_dbuf, tx);
2588		ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
2589		ds->ds_phys->ds_dir_obj = dd->dd_object;
2590		ASSERT3P(ds->ds_dir, ==, odd);
2591		dsl_dir_close(ds->ds_dir, ds);
2592		VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
2593		    NULL, ds, &ds->ds_dir));
2594
2595		ASSERT3U(dsl_prop_numcb(ds), ==, 0);
2596	}
2597
2598	/*
2599	 * Change space accounting.
2600	 * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
2601	 * both be valid, or both be 0 (resulting in delta == 0).  This
2602	 * is true for each of {clone,origin} independently.
2603	 */
2604
2605	delta = pa->cloneusedsnap -
2606	    dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2607	ASSERT3S(delta, >=, 0);
2608	ASSERT3U(pa->used, >=, delta);
2609	dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
2610	dsl_dir_diduse_space(dd, DD_USED_HEAD,
2611	    pa->used - delta, pa->comp, pa->uncomp, tx);
2612
2613	delta = pa->originusedsnap -
2614	    odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2615	ASSERT3S(delta, <=, 0);
2616	ASSERT3U(pa->used, >=, -delta);
2617	dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
2618	dsl_dir_diduse_space(odd, DD_USED_HEAD,
2619	    -pa->used - delta, -pa->comp, -pa->uncomp, tx);
2620
2621	origin_ds->ds_phys->ds_unique_bytes = pa->unique;
2622
2623	/* log history record */
2624	spa_history_internal_log(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
2625	    cr, "dataset = %llu", hds->ds_object);
2626
2627	dsl_dir_close(odd, FTAG);
2628}
2629
2630static char *snaplist_tag = "snaplist";
2631/*
2632 * Make a list of dsl_dataset_t's for the snapshots between first_obj
2633 * (exclusive) and last_obj (inclusive).  The list will be in reverse
2634 * order (last_obj will be the list_head()).  If first_obj == 0, do all
2635 * snapshots back to this dataset's origin.
2636 */
2637static int
2638snaplist_make(dsl_pool_t *dp, boolean_t own,
2639    uint64_t first_obj, uint64_t last_obj, list_t *l)
2640{
2641	uint64_t obj = last_obj;
2642
2643	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
2644
2645	list_create(l, sizeof (struct promotenode),
2646	    offsetof(struct promotenode, link));
2647
2648	while (obj != first_obj) {
2649		dsl_dataset_t *ds;
2650		struct promotenode *snap;
2651		int err;
2652
2653		if (own) {
2654			err = dsl_dataset_own_obj(dp, obj,
2655			    0, snaplist_tag, &ds);
2656			if (err == 0)
2657				dsl_dataset_make_exclusive(ds, snaplist_tag);
2658		} else {
2659			err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
2660		}
2661		if (err == ENOENT) {
2662			/* lost race with snapshot destroy */
2663			struct promotenode *last = list_tail(l);
2664			ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
2665			obj = last->ds->ds_phys->ds_prev_snap_obj;
2666			continue;
2667		} else if (err) {
2668			return (err);
2669		}
2670
2671		if (first_obj == 0)
2672			first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
2673
2674		snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
2675		snap->ds = ds;
2676		list_insert_tail(l, snap);
2677		obj = ds->ds_phys->ds_prev_snap_obj;
2678	}
2679
2680	return (0);
2681}
2682
2683static int
2684snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
2685{
2686	struct promotenode *snap;
2687
2688	*spacep = 0;
2689	for (snap = list_head(l); snap; snap = list_next(l, snap)) {
2690		uint64_t used;
2691		int err = bplist_space_birthrange(&snap->ds->ds_deadlist,
2692		    mintxg, UINT64_MAX, &used);
2693		if (err)
2694			return (err);
2695		*spacep += used;
2696	}
2697	return (0);
2698}
2699
2700static void
2701snaplist_destroy(list_t *l, boolean_t own)
2702{
2703	struct promotenode *snap;
2704
2705	if (!l || !list_link_active(&l->list_head))
2706		return;
2707
2708	while ((snap = list_tail(l)) != NULL) {
2709		list_remove(l, snap);
2710		if (own)
2711			dsl_dataset_disown(snap->ds, snaplist_tag);
2712		else
2713			dsl_dataset_rele(snap->ds, snaplist_tag);
2714		kmem_free(snap, sizeof (struct promotenode));
2715	}
2716	list_destroy(l);
2717}
2718
2719/*
2720 * Promote a clone.  Nomenclature note:
2721 * "clone" or "cds": the original clone which is being promoted
2722 * "origin" or "ods": the snapshot which is originally clone's origin
2723 * "origin head" or "ohds": the dataset which is the head
2724 * (filesystem/volume) for the origin
2725 * "origin origin": the origin of the origin's filesystem (typically
2726 * NULL, indicating that the clone is not a clone of a clone).
2727 */
2728int
2729dsl_dataset_promote(const char *name, char *conflsnap)
2730{
2731	dsl_dataset_t *ds;
2732	dsl_dir_t *dd;
2733	dsl_pool_t *dp;
2734	dmu_object_info_t doi;
2735	struct promotearg pa = { 0 };
2736	struct promotenode *snap;
2737	int err;
2738
2739	err = dsl_dataset_hold(name, FTAG, &ds);
2740	if (err)
2741		return (err);
2742	dd = ds->ds_dir;
2743	dp = dd->dd_pool;
2744
2745	err = dmu_object_info(dp->dp_meta_objset,
2746	    ds->ds_phys->ds_snapnames_zapobj, &doi);
2747	if (err) {
2748		dsl_dataset_rele(ds, FTAG);
2749		return (err);
2750	}
2751
2752	if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
2753		dsl_dataset_rele(ds, FTAG);
2754		return (EINVAL);
2755	}
2756
2757	/*
2758	 * We are going to inherit all the snapshots taken before our
2759	 * origin (i.e., our new origin will be our parent's origin).
2760	 * Take ownership of them so that we can rename them into our
2761	 * namespace.
2762	 */
2763	rw_enter(&dp->dp_config_rwlock, RW_READER);
2764
2765	err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
2766	    &pa.shared_snaps);
2767	if (err != 0)
2768		goto out;
2769
2770	err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
2771	if (err != 0)
2772		goto out;
2773
2774	snap = list_head(&pa.shared_snaps);
2775	ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
2776	err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
2777	    snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
2778	if (err != 0)
2779		goto out;
2780
2781	if (dsl_dir_is_clone(snap->ds->ds_dir)) {
2782		err = dsl_dataset_own_obj(dp,
2783		    snap->ds->ds_dir->dd_phys->dd_origin_obj,
2784		    0, FTAG, &pa.origin_origin);
2785		if (err != 0)
2786			goto out;
2787	}
2788
2789out:
2790	rw_exit(&dp->dp_config_rwlock);
2791
2792	/*
2793	 * Add in 128x the snapnames zapobj size, since we will be moving
2794	 * a bunch of snapnames to the promoted ds, and dirtying their
2795	 * bonus buffers.
2796	 */
2797	if (err == 0) {
2798		err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
2799		    dsl_dataset_promote_sync, ds, &pa,
2800		    2 + 2 * doi.doi_physical_blocks_512);
2801		if (err && pa.err_ds && conflsnap)
2802			(void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
2803	}
2804
2805	snaplist_destroy(&pa.shared_snaps, B_TRUE);
2806	snaplist_destroy(&pa.clone_snaps, B_FALSE);
2807	snaplist_destroy(&pa.origin_snaps, B_FALSE);
2808	if (pa.origin_origin)
2809		dsl_dataset_disown(pa.origin_origin, FTAG);
2810	dsl_dataset_rele(ds, FTAG);
2811	return (err);
2812}
2813
2814struct cloneswaparg {
2815	dsl_dataset_t *cds; /* clone dataset */
2816	dsl_dataset_t *ohds; /* origin's head dataset */
2817	boolean_t force;
2818	int64_t unused_refres_delta; /* change in unconsumed refreservation */
2819};
2820
2821/* ARGSUSED */
2822static int
2823dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
2824{
2825	struct cloneswaparg *csa = arg1;
2826
2827	/* they should both be heads */
2828	if (dsl_dataset_is_snapshot(csa->cds) ||
2829	    dsl_dataset_is_snapshot(csa->ohds))
2830		return (EINVAL);
2831
2832	/* the branch point should be just before them */
2833	if (csa->cds->ds_prev != csa->ohds->ds_prev)
2834		return (EINVAL);
2835
2836	/* cds should be the clone (unless they are unrelated) */
2837	if (csa->cds->ds_prev != NULL &&
2838	    csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
2839	    csa->ohds->ds_object !=
2840	    csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
2841		return (EINVAL);
2842
2843	/* the clone should be a child of the origin */
2844	if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
2845		return (EINVAL);
2846
2847	/* ohds shouldn't be modified unless 'force' */
2848	if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
2849		return (ETXTBSY);
2850
2851	/* adjust amount of any unconsumed refreservation */
2852	csa->unused_refres_delta =
2853	    (int64_t)MIN(csa->ohds->ds_reserved,
2854	    csa->ohds->ds_phys->ds_unique_bytes) -
2855	    (int64_t)MIN(csa->ohds->ds_reserved,
2856	    csa->cds->ds_phys->ds_unique_bytes);
2857
2858	if (csa->unused_refres_delta > 0 &&
2859	    csa->unused_refres_delta >
2860	    dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
2861		return (ENOSPC);
2862
2863	if (csa->ohds->ds_quota != 0 &&
2864	    csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
2865		return (EDQUOT);
2866
2867	return (0);
2868}
2869
2870/* ARGSUSED */
2871static void
2872dsl_dataset_clone_swap_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
2873{
2874	struct cloneswaparg *csa = arg1;
2875	dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;
2876
2877	ASSERT(csa->cds->ds_reserved == 0);
2878	ASSERT(csa->ohds->ds_quota == 0 ||
2879	    csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);
2880
2881	dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
2882	dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);
2883
2884	if (csa->cds->ds_objset != NULL) {
2885		dmu_objset_evict(csa->cds->ds_objset);
2886		csa->cds->ds_objset = NULL;
2887	}
2888
2889	if (csa->ohds->ds_objset != NULL) {
2890		dmu_objset_evict(csa->ohds->ds_objset);
2891		csa->ohds->ds_objset = NULL;
2892	}
2893
2894	/*
2895	 * Reset origin's unique bytes, if it exists.
2896	 */
2897	if (csa->cds->ds_prev) {
2898		dsl_dataset_t *origin = csa->cds->ds_prev;
2899		dmu_buf_will_dirty(origin->ds_dbuf, tx);
2900		VERIFY(0 == bplist_space_birthrange(&csa->cds->ds_deadlist,
2901		    origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
2902		    &origin->ds_phys->ds_unique_bytes));
2903	}
2904
2905	/* swap blkptrs */
2906	{
2907		blkptr_t tmp;
2908		tmp = csa->ohds->ds_phys->ds_bp;
2909		csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
2910		csa->cds->ds_phys->ds_bp = tmp;
2911	}
2912
2913	/* set dd_*_bytes */
2914	{
2915		int64_t dused, dcomp, duncomp;
2916		uint64_t cdl_used, cdl_comp, cdl_uncomp;
2917		uint64_t odl_used, odl_comp, odl_uncomp;
2918
2919		ASSERT3U(csa->cds->ds_dir->dd_phys->
2920		    dd_used_breakdown[DD_USED_SNAP], ==, 0);
2921
2922		VERIFY(0 == bplist_space(&csa->cds->ds_deadlist, &cdl_used,
2923		    &cdl_comp, &cdl_uncomp));
2924		VERIFY(0 == bplist_space(&csa->ohds->ds_deadlist, &odl_used,
2925		    &odl_comp, &odl_uncomp));
2926
2927		dused = csa->cds->ds_phys->ds_used_bytes + cdl_used -
2928		    (csa->ohds->ds_phys->ds_used_bytes + odl_used);
2929		dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
2930		    (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
2931		duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
2932		    cdl_uncomp -
2933		    (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);
2934
2935		dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
2936		    dused, dcomp, duncomp, tx);
2937		dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
2938		    -dused, -dcomp, -duncomp, tx);
2939
2940		/*
2941		 * The difference in the space used by snapshots is the
2942		 * difference in snapshot space due to the head's
2943		 * deadlist (since that's the only thing that's
2944		 * changing that affects the snapused).
2945		 */
2946		VERIFY(0 == bplist_space_birthrange(&csa->cds->ds_deadlist,
2947		    csa->ohds->ds_origin_txg, UINT64_MAX, &cdl_used));
2948		VERIFY(0 == bplist_space_birthrange(&csa->ohds->ds_deadlist,
2949		    csa->ohds->ds_origin_txg, UINT64_MAX, &odl_used));
2950		dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
2951		    DD_USED_HEAD, DD_USED_SNAP, tx);
2952	}
2953
2954#define	SWITCH64(x, y) \
2955	{ \
2956		uint64_t __tmp = (x); \
2957		(x) = (y); \
2958		(y) = __tmp; \
2959	}
2960
2961	/* swap ds_*_bytes */
2962	SWITCH64(csa->ohds->ds_phys->ds_used_bytes,
2963	    csa->cds->ds_phys->ds_used_bytes);
2964	SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
2965	    csa->cds->ds_phys->ds_compressed_bytes);
2966	SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
2967	    csa->cds->ds_phys->ds_uncompressed_bytes);
2968	SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
2969	    csa->cds->ds_phys->ds_unique_bytes);
2970
2971	/* apply any parent delta for change in unconsumed refreservation */
2972	dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
2973	    csa->unused_refres_delta, 0, 0, tx);
2974
2975	/* swap deadlists */
2976	bplist_close(&csa->cds->ds_deadlist);
2977	bplist_close(&csa->ohds->ds_deadlist);
2978	SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
2979	    csa->cds->ds_phys->ds_deadlist_obj);
2980	VERIFY(0 == bplist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
2981	    csa->cds->ds_phys->ds_deadlist_obj));
2982	VERIFY(0 == bplist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
2983	    csa->ohds->ds_phys->ds_deadlist_obj));
2984
2985	dsl_pool_ds_clone_swapped(csa->ohds, csa->cds, tx);
2986}
2987
2988/*
2989 * Swap 'clone' with its origin head datasets.  Used at the end of "zfs
2990 * recv" into an existing fs to swizzle the file system to the new
2991 * version, and by "zfs rollback".  Can also be used to swap two
2992 * independent head datasets if neither has any snapshots.
2993 */
2994int
2995dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
2996    boolean_t force)
2997{
2998	struct cloneswaparg csa;
2999	int error;
3000
3001	ASSERT(clone->ds_owner);
3002	ASSERT(origin_head->ds_owner);
3003retry:
3004	/* Need exclusive access for the swap */
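	/*
	 * Take both ds_rwlocks as writer.  There is no fixed lock ordering
	 * between the two datasets, so if the second lock can't be taken
	 * without blocking, drop the first and retry in the opposite order
	 * rather than deadlock against a thread doing the same dance.
	 */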
3005	rw_enter(&clone->ds_rwlock, RW_WRITER);
3006	if (!rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
3007		rw_exit(&clone->ds_rwlock);
3008		rw_enter(&origin_head->ds_rwlock, RW_WRITER);
3009		if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
3010			rw_exit(&origin_head->ds_rwlock);
3011			goto retry;
3012		}
3013	}
3014	csa.cds = clone;
3015	csa.ohds = origin_head;
3016	csa.force = force;
3017	error = dsl_sync_task_do(clone->ds_dir->dd_pool,
3018	    dsl_dataset_clone_swap_check,
3019	    dsl_dataset_clone_swap_sync, &csa, NULL, 9);
3020	return (error);
3021}
3022
3023/*
3024 * Given a pool name and a dataset object number in that pool,
3025 * return the name of that dataset.
3026 */
3027int
3028dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3029{
3030	spa_t *spa;
3031	dsl_pool_t *dp;
3032	dsl_dataset_t *ds;
3033	int error;
3034
3035	if ((error = spa_open(pname, &spa, FTAG)) != 0)
3036		return (error);
3037	dp = spa_get_dsl(spa);
3038	rw_enter(&dp->dp_config_rwlock, RW_READER);
3039	if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3040		dsl_dataset_name(ds, buf);
3041		dsl_dataset_rele(ds, FTAG);
3042	}
3043	rw_exit(&dp->dp_config_rwlock);
3044	spa_close(spa, FTAG);
3045
3046	return (error);
3047}
3048
3049int
3050dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
3051    uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
3052{
3053	int error = 0;
3054
3055	ASSERT3S(asize, >, 0);
3056
3057	/*
3058	 * *ref_rsrv is the portion of asize that will come from any
3059	 * unconsumed refreservation space.
3060	 */
3061	*ref_rsrv = 0;
3062
3063	mutex_enter(&ds->ds_lock);
3064	/*
3065	 * Make a space adjustment for reserved bytes.
3066	 */
3067	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
3068		ASSERT3U(*used, >=,
3069		    ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3070		*used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3071		*ref_rsrv =
3072		    asize - MIN(asize, parent_delta(ds, asize + inflight));
3073	}
3074
3075	if (!check_quota || ds->ds_quota == 0) {
3076		mutex_exit(&ds->ds_lock);
3077		return (0);
3078	}
3079	/*
3080	 * If they are requesting more space, and our current estimate
3081	 * is over quota, they get to try again unless the actual
3082	 * on-disk is over quota and there are no pending changes (which
3083	 * may free up space for us).
3084	 */
3085	if (ds->ds_phys->ds_used_bytes + inflight >= ds->ds_quota) {
3086		if (inflight > 0 || ds->ds_phys->ds_used_bytes < ds->ds_quota)
3087			error = ERESTART;
3088		else
3089			error = EDQUOT;
3090	}
3091	mutex_exit(&ds->ds_lock);
3092
3093	return (error);
3094}
3095
3096/* ARGSUSED */
3097static int
3098dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3099{
3100	dsl_dataset_t *ds = arg1;
3101	dsl_prop_setarg_t *psa = arg2;
3102	int err;
3103
3104	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3105		return (ENOTSUP);
3106
3107	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3108		return (err);
3109
3110	if (psa->psa_effective_value == 0)
3111		return (0);
3112
3113	if (psa->psa_effective_value < ds->ds_phys->ds_used_bytes ||
3114	    psa->psa_effective_value < ds->ds_reserved)
3115		return (ENOSPC);
3116
3117	return (0);
3118}
3119
3120extern void dsl_prop_set_sync(void *, void *, cred_t *, dmu_tx_t *);
3121
3122void
3123dsl_dataset_set_quota_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
3124{
3125	dsl_dataset_t *ds = arg1;
3126	dsl_prop_setarg_t *psa = arg2;
3127	uint64_t effective_value = psa->psa_effective_value;
3128
3129	dsl_prop_set_sync(ds, psa, cr, tx);
3130	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3131
3132	if (ds->ds_quota != effective_value) {
3133		dmu_buf_will_dirty(ds->ds_dbuf, tx);
3134		ds->ds_quota = effective_value;
3135
3136		spa_history_internal_log(LOG_DS_REFQUOTA,
3137		    ds->ds_dir->dd_pool->dp_spa, tx, cr, "%lld dataset = %llu ",
3138		    (longlong_t)ds->ds_quota, ds->ds_object);
3139	}
3140}
3141
3142int
3143dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
3144{
3145	dsl_dataset_t *ds;
3146	dsl_prop_setarg_t psa;
3147	int err;
3148
3149	dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
3150
3151	err = dsl_dataset_hold(dsname, FTAG, &ds);
3152	if (err)
3153		return (err);
3154
3155	/*
3156	 * If someone removes a file, then tries to set the quota, we
3157	 * want to make sure the file freeing takes effect.
3158	 */
3159	txg_wait_open(ds->ds_dir->dd_pool, 0);
3160
3161	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3162	    dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3163	    ds, &psa, 0);
3164
3165	dsl_dataset_rele(ds, FTAG);
3166	return (err);
3167}
3168
3169static int
3170dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
3171{
3172	dsl_dataset_t *ds = arg1;
3173	dsl_prop_setarg_t *psa = arg2;
3174	uint64_t effective_value;
3175	uint64_t unique;
3176	int err;
3177
3178	if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
3179	    SPA_VERSION_REFRESERVATION)
3180		return (ENOTSUP);
3181
3182	if (dsl_dataset_is_snapshot(ds))
3183		return (EINVAL);
3184
3185	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3186		return (err);
3187
3188	effective_value = psa->psa_effective_value;
3189
3190	/*
3191	 * If we are doing the preliminary check in open context, the
3192	 * space estimates may be inaccurate.
3193	 */
3194	if (!dmu_tx_is_syncing(tx))
3195		return (0);
3196
3197	mutex_enter(&ds->ds_lock);
3198	unique = dsl_dataset_unique(ds);
3199	mutex_exit(&ds->ds_lock);
3200
3201	if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
3202		uint64_t delta = MAX(unique, effective_value) -
3203		    MAX(unique, ds->ds_reserved);
3204
3205		if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
3206			return (ENOSPC);
3207		if (ds->ds_quota > 0 &&
3208		    effective_value > ds->ds_quota)
3209			return (ENOSPC);
3210	}
3211
3212	return (0);
3213}
3214
3215/* ARGSUSED */
3216static void
3217dsl_dataset_set_reservation_sync(void *arg1, void *arg2, cred_t *cr,
3218    dmu_tx_t *tx)
3219{
3220	dsl_dataset_t *ds = arg1;
3221	dsl_prop_setarg_t *psa = arg2;
3222	uint64_t effective_value = psa->psa_effective_value;
3223	uint64_t unique;
3224	int64_t delta;
3225
3226	dsl_prop_set_sync(ds, psa, cr, tx);
3227	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3228
3229	dmu_buf_will_dirty(ds->ds_dbuf, tx);
3230
3231	mutex_enter(&ds->ds_dir->dd_lock);
3232	mutex_enter(&ds->ds_lock);
3233	unique = dsl_dataset_unique(ds);
3234	delta = MAX(0, (int64_t)(effective_value - unique)) -
3235	    MAX(0, (int64_t)(ds->ds_reserved - unique));
3236	ds->ds_reserved = effective_value;
3237	mutex_exit(&ds->ds_lock);
3238
3239	dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
3240	mutex_exit(&ds->ds_dir->dd_lock);
3241
3242	spa_history_internal_log(LOG_DS_REFRESERV,
3243	    ds->ds_dir->dd_pool->dp_spa, tx, cr, "%lld dataset = %llu",
3244	    (longlong_t)effective_value, ds->ds_object);
3245}
3246
3247int
3248dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
3249    uint64_t reservation)
3250{
3251	dsl_dataset_t *ds;
3252	dsl_prop_setarg_t psa;
3253	int err;
3254
3255	dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
3256	    &reservation);
3257
3258	err = dsl_dataset_hold(dsname, FTAG, &ds);
3259	if (err)
3260		return (err);
3261
3262	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3263	    dsl_dataset_set_reservation_check,
3264	    dsl_dataset_set_reservation_sync, ds, &psa, 0);
3265
3266	dsl_dataset_rele(ds, FTAG);
3267	return (err);
3268}
3269
3270struct dsl_ds_holdarg {
3271	dsl_sync_task_group_t *dstg;
3272	char *htag;
3273	char *snapname;
3274	boolean_t recursive;
3275	boolean_t gotone;
3276	boolean_t temphold;
3277	char failed[MAXPATHLEN];
3278};
3279
3280/*
3281 * The max length of a temporary tag prefix is the number of hex digits
3282 * required to express UINT64_MAX plus one for the hyphen.
3283 */
3284#define	MAX_TAG_PREFIX_LEN	17
3285
3286static int
3287dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
3288{
3289	dsl_dataset_t *ds = arg1;
3290	struct dsl_ds_holdarg *ha = arg2;
3291	char *htag = ha->htag;
3292	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3293	int error = 0;
3294
3295	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3296		return (ENOTSUP);
3297
3298	if (!dsl_dataset_is_snapshot(ds))
3299		return (EINVAL);
3300
3301	/* tags must be unique */
3302	mutex_enter(&ds->ds_lock);
	if (ds->ds_phys->ds_userrefs_obj) {
		uint64_t tmp;

		/* We only care whether the tag exists, not its value. */
		error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
		    8, 1, &tmp);
3306		if (error == 0)
3307			error = EEXIST;
3308		else if (error == ENOENT)
3309			error = 0;
3310	}
3311	mutex_exit(&ds->ds_lock);
3312
3313	if (error == 0 && ha->temphold &&
3314	    strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
3315		error = E2BIG;
3316
3317	return (error);
3318}
3319
3320static void
3321dsl_dataset_user_hold_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
3322{
3323	dsl_dataset_t *ds = arg1;
3324	struct dsl_ds_holdarg *ha = arg2;
3325	char *htag = ha->htag;
3326	dsl_pool_t *dp = ds->ds_dir->dd_pool;
3327	objset_t *mos = dp->dp_meta_objset;
3328	uint64_t now = gethrestime_sec();
3329	uint64_t zapobj;
3330
3331	mutex_enter(&ds->ds_lock);
3332	if (ds->ds_phys->ds_userrefs_obj == 0) {
3333		/*
3334		 * This is the first user hold for this dataset.  Create
3335		 * the userrefs zap object.
3336		 */
3337		dmu_buf_will_dirty(ds->ds_dbuf, tx);
3338		zapobj = ds->ds_phys->ds_userrefs_obj =
3339		    zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
3340	} else {
3341		zapobj = ds->ds_phys->ds_userrefs_obj;
3342	}
3343	ds->ds_userrefs++;
3344	mutex_exit(&ds->ds_lock);
3345
3346	VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
3347
3348	if (ha->temphold) {
3349		VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
3350		    htag, &now, tx));
3351	}
3352
3353	spa_history_internal_log(LOG_DS_USER_HOLD,
3354	    dp->dp_spa, tx, cr, "<%s> temp = %d dataset = %llu", htag,
3355	    (int)ha->temphold, ds->ds_object);
3356}
3357
3358static int
3359dsl_dataset_user_hold_one(const char *dsname, void *arg)
3360{
3361	struct dsl_ds_holdarg *ha = arg;
3362	dsl_dataset_t *ds;
3363	int error;
3364	char *name;
3365
	/* alloc a buffer to hold dsname@snapname plus the terminating NUL */
3367	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3368	error = dsl_dataset_hold(name, ha->dstg, &ds);
3369	strfree(name);
3370	if (error == 0) {
3371		ha->gotone = B_TRUE;
3372		dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
3373		    dsl_dataset_user_hold_sync, ds, ha, 0);
3374	} else if (error == ENOENT && ha->recursive) {
3375		error = 0;
3376	} else {
3377		(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3378	}
3379	return (error);
3380}
3381
3382int
3383dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
3384    boolean_t recursive, boolean_t temphold)
3385{
3386	struct dsl_ds_holdarg *ha;
3387	dsl_sync_task_t *dst;
3388	spa_t *spa;
3389	int error;
3390
3391	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3392
3393	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3394
3395	error = spa_open(dsname, &spa, FTAG);
3396	if (error) {
3397		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3398		return (error);
3399	}
3400
3401	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3402	ha->htag = htag;
3403	ha->snapname = snapname;
3404	ha->recursive = recursive;
3405	ha->temphold = temphold;
3406	if (recursive) {
3407		error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
3408		    ha, DS_FIND_CHILDREN);
3409	} else {
3410		error = dsl_dataset_user_hold_one(dsname, ha);
3411	}
3412	if (error == 0)
3413		error = dsl_sync_task_group_wait(ha->dstg);
3414
3415	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3416	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3417		dsl_dataset_t *ds = dst->dst_arg1;
3418
3419		if (dst->dst_err) {
3420			dsl_dataset_name(ds, ha->failed);
3421			*strchr(ha->failed, '@') = '\0';
3422		}
3423		dsl_dataset_rele(ds, ha->dstg);
3424	}
3425
3426	if (error == 0 && recursive && !ha->gotone)
3427		error = ENOENT;
3428
3429	if (error)
3430		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
3431
3432	dsl_sync_task_group_destroy(ha->dstg);
3433	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3434	spa_close(spa, FTAG);
3435	return (error);
3436}
3437
3438struct dsl_ds_releasearg {
3439	dsl_dataset_t *ds;
3440	const char *htag;
3441	boolean_t own;		/* do we own or just hold ds? */
3442};
3443
3444static int
3445dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
3446    boolean_t *might_destroy)
3447{
3448	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3449	uint64_t zapobj;
3450	uint64_t tmp;
3451	int error;
3452
3453	*might_destroy = B_FALSE;
3454
3455	mutex_enter(&ds->ds_lock);
3456	zapobj = ds->ds_phys->ds_userrefs_obj;
3457	if (zapobj == 0) {
3458		/* The tag can't possibly exist */
3459		mutex_exit(&ds->ds_lock);
3460		return (ESRCH);
3461	}
3462
3463	/* Make sure the tag exists */
3464	error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
3465	if (error) {
3466		mutex_exit(&ds->ds_lock);
3467		if (error == ENOENT)
3468			error = ESRCH;
3469		return (error);
3470	}
3471
3472	if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
3473	    DS_IS_DEFER_DESTROY(ds))
3474		*might_destroy = B_TRUE;
3475
3476	mutex_exit(&ds->ds_lock);
3477	return (0);
3478}
3479
3480static int
3481dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
3482{
3483	struct dsl_ds_releasearg *ra = arg1;
3484	dsl_dataset_t *ds = ra->ds;
3485	boolean_t might_destroy;
3486	int error;
3487
3488	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3489		return (ENOTSUP);
3490
3491	error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
3492	if (error)
3493		return (error);
3494
3495	if (might_destroy) {
3496		struct dsl_ds_destroyarg dsda = {0};
3497
3498		if (dmu_tx_is_syncing(tx)) {
3499			/*
3500			 * If we're not prepared to remove the snapshot,
3501			 * we can't allow the release to happen right now.
3502			 */
3503			if (!ra->own)
3504				return (EBUSY);
3505			if (ds->ds_objset) {
3506				dmu_objset_evict(ds->ds_objset);
3507				ds->ds_objset = NULL;
3508			}
3509		}
3510		dsda.ds = ds;
3511		dsda.releasing = B_TRUE;
3512		return (dsl_dataset_destroy_check(&dsda, tag, tx));
3513	}
3514
3515	return (0);
3516}
3517
3518static void
3519dsl_dataset_user_release_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
3520{
3521	struct dsl_ds_releasearg *ra = arg1;
3522	dsl_dataset_t *ds = ra->ds;
3523	dsl_pool_t *dp = ds->ds_dir->dd_pool;
3524	objset_t *mos = dp->dp_meta_objset;
3525	uint64_t zapobj;
3526	uint64_t dsobj = ds->ds_object;
3527	uint64_t refs;
3528	int error;
3529
3530	mutex_enter(&ds->ds_lock);
3531	ds->ds_userrefs--;
3532	refs = ds->ds_userrefs;
3533	mutex_exit(&ds->ds_lock);
3534	error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
3535	VERIFY(error == 0 || error == ENOENT);
3536	zapobj = ds->ds_phys->ds_userrefs_obj;
3537	VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
3538	if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
3539	    DS_IS_DEFER_DESTROY(ds)) {
3540		struct dsl_ds_destroyarg dsda = {0};
3541
3542		ASSERT(ra->own);
3543		dsda.ds = ds;
3544		dsda.releasing = B_TRUE;
3545		/* We already did the destroy_check */
3546		dsl_dataset_destroy_sync(&dsda, tag, cr, tx);
3547	}
3548
3549	spa_history_internal_log(LOG_DS_USER_RELEASE,
3550	    dp->dp_spa, tx, cr, "<%s> %lld dataset = %llu",
3551	    ra->htag, (longlong_t)refs, dsobj);
3552}
3553
3554static int
3555dsl_dataset_user_release_one(const char *dsname, void *arg)
3556{
3557	struct dsl_ds_holdarg *ha = arg;
3558	struct dsl_ds_releasearg *ra;
3559	dsl_dataset_t *ds;
3560	int error;
3561	void *dtag = ha->dstg;
3562	char *name;
3563	boolean_t own = B_FALSE;
3564	boolean_t might_destroy;
3565
	/* alloc a buffer to hold dsname@snapname, plus the terminating NUL */
3567	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3568	error = dsl_dataset_hold(name, dtag, &ds);
3569	strfree(name);
3570	if (error == ENOENT && ha->recursive)
3571		return (0);
3572	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3573	if (error)
3574		return (error);
3575
3576	ha->gotone = B_TRUE;
3577
3578	ASSERT(dsl_dataset_is_snapshot(ds));
3579
3580	error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
3581	if (error) {
3582		dsl_dataset_rele(ds, dtag);
3583		return (error);
3584	}
3585
	if (might_destroy) {
#ifdef _KERNEL
		/* name was freed above, so rebuild it for the unmount */
		name = kmem_asprintf("%s@%s", dsname, ha->snapname);
		error = zfs_unmount_snap(name, NULL);
		strfree(name);
		if (error) {
			dsl_dataset_rele(ds, dtag);
			return (error);
		}
#endif
3594		if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
3595			dsl_dataset_rele(ds, dtag);
3596			return (EBUSY);
3597		} else {
3598			own = B_TRUE;
3599			dsl_dataset_make_exclusive(ds, dtag);
3600		}
3601	}
3602
3603	ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
3604	ra->ds = ds;
3605	ra->htag = ha->htag;
3606	ra->own = own;
3607	dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
3608	    dsl_dataset_user_release_sync, ra, dtag, 0);
3609
3610	return (0);
3611}
3612
3613int
3614dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
3615    boolean_t recursive)
3616{
3617	struct dsl_ds_holdarg *ha;
3618	dsl_sync_task_t *dst;
3619	spa_t *spa;
3620	int error;
3621
3622top:
3623	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3624
3625	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3626
3627	error = spa_open(dsname, &spa, FTAG);
3628	if (error) {
3629		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3630		return (error);
3631	}
3632
3633	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3634	ha->htag = htag;
3635	ha->snapname = snapname;
3636	ha->recursive = recursive;
3637	if (recursive) {
3638		error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
3639		    ha, DS_FIND_CHILDREN);
3640	} else {
3641		error = dsl_dataset_user_release_one(dsname, ha);
3642	}
3643	if (error == 0)
3644		error = dsl_sync_task_group_wait(ha->dstg);
3645
3646	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3647	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3648		struct dsl_ds_releasearg *ra = dst->dst_arg1;
3649		dsl_dataset_t *ds = ra->ds;
3650
3651		if (dst->dst_err)
3652			dsl_dataset_name(ds, ha->failed);
3653
3654		if (ra->own)
3655			dsl_dataset_disown(ds, ha->dstg);
3656		else
3657			dsl_dataset_rele(ds, ha->dstg);
3658
3659		kmem_free(ra, sizeof (struct dsl_ds_releasearg));
3660	}
3661
3662	if (error == 0 && recursive && !ha->gotone)
3663		error = ENOENT;
3664
3665	if (error && error != EBUSY)
3666		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
3667
3668	dsl_sync_task_group_destroy(ha->dstg);
3669	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3670	spa_close(spa, FTAG);
3671
3672	/*
3673	 * We can get EBUSY if we were racing with deferred destroy and
3674	 * dsl_dataset_user_release_check() hadn't done the necessary
3675	 * open context setup.  We can also get EBUSY if we're racing
3676	 * with destroy and that thread is the ds_owner.  Either way
3677	 * the busy condition should be transient, and we should retry
3678	 * the release operation.
3679	 */
3680	if (error == EBUSY)
3681		goto top;
3682
3683	return (error);
3684}
3685
3686/*
3687 * Called at spa_load time to release a stale temporary user hold.
3688 */
3689int
3690dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag)
3691{
3692	dsl_dataset_t *ds;
3693	char *snap;
3694	char *name;
3695	int namelen;
3696	int error;
3697
3698	rw_enter(&dp->dp_config_rwlock, RW_READER);
3699	error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
3700	rw_exit(&dp->dp_config_rwlock);
3701	if (error)
3702		return (error);
	namelen = dsl_dataset_namelen(ds) + 1;
	name = kmem_alloc(namelen, KM_SLEEP);
	dsl_dataset_name(ds, name);
	dsl_dataset_rele(ds, FTAG);

	snap = strchr(name, '@');
	*snap = '\0';
	++snap;
	error = dsl_dataset_user_release(name, snap, htag, B_FALSE);
	kmem_free(name, namelen);
	return (error);
3712}
3713
3714int
3715dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
3716{
3717	dsl_dataset_t *ds;
3718	int err;
3719
3720	err = dsl_dataset_hold(dsname, FTAG, &ds);
3721	if (err)
3722		return (err);
3723
3724	VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
3725	if (ds->ds_phys->ds_userrefs_obj != 0) {
3726		zap_attribute_t *za;
3727		zap_cursor_t zc;
3728
3729		za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
3730		for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
3731		    ds->ds_phys->ds_userrefs_obj);
3732		    zap_cursor_retrieve(&zc, za) == 0;
3733		    zap_cursor_advance(&zc)) {
3734			VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
3735			    za->za_first_integer));
3736		}
3737		zap_cursor_fini(&zc);
3738		kmem_free(za, sizeof (zap_attribute_t));
3739	}
3740	dsl_dataset_rele(ds, FTAG);
3741	return (0);
3742}
3743
3744/*
 * Note, this function is used as the callback for dmu_objset_find().  We
3746 * always return 0 so that we will continue to find and process
3747 * inconsistent datasets, even if we encounter an error trying to
3748 * process one of them.
3749 */
3750/* ARGSUSED */
3751int
3752dsl_destroy_inconsistent(const char *dsname, void *arg)
3753{
3754	dsl_dataset_t *ds;
3755
3756	if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
3757		if (DS_IS_INCONSISTENT(ds))
3758			(void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
3759		else
3760			dsl_dataset_disown(ds, FTAG);
3761	}
3762	return (0);
3763}
3764