dsl_dataset.c revision 219089
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 */
24
25#include <sys/dmu_objset.h>
26#include <sys/dsl_dataset.h>
27#include <sys/dsl_dir.h>
28#include <sys/dsl_prop.h>
29#include <sys/dsl_synctask.h>
30#include <sys/dmu_traverse.h>
31#include <sys/dmu_tx.h>
32#include <sys/arc.h>
33#include <sys/zio.h>
34#include <sys/zap.h>
35#include <sys/unique.h>
36#include <sys/zfs_context.h>
37#include <sys/zfs_ioctl.h>
38#include <sys/spa.h>
39#include <sys/zfs_znode.h>
40#include <sys/zfs_onexit.h>
41#include <sys/zvol.h>
42#include <sys/dsl_scan.h>
43#include <sys/dsl_deadlist.h>
44
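/*
 * Sentinel ds_owner value: while a dataset is being destroyed its
 * ds_owner is pointed at dsl_reaper (see DSL_DATASET_IS_DESTROYED below).
 */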
45static char *dsl_reaper = "the grim reaper";
46
47static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
48static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
49static dsl_syncfunc_t dsl_dataset_set_reservation_sync;
50
51#define	SWITCH64(x, y) \
52	{ \
53		uint64_t __tmp = (x); \
54		(x) = (y); \
55		(y) = __tmp; \
56	}
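/* Swap two uint64_t lvalues in place (used below to exchange deadlist objects). */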
57
58#define	DS_REF_MAX	(1ULL << 62)
59
60#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE
61
62#define	DSL_DATASET_IS_DESTROYED(ds)	((ds)->ds_owner == dsl_reaper)
63
64
65/*
66 * Figure out how much of this delta should be propagated to the dsl_dir
67 * layer.  If there's a refreservation, that space has already been
68 * partially accounted for in our ancestors.
69 */
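/*
 * For example (illustrative numbers): with ds_reserved == 10M and
 * ds_unique_bytes == 2M, a delta of +3M leaves both MAX() terms below
 * at 10M, so none of the new usage is charged to the parent; that
 * space is already covered by the refreservation.
 */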
70static int64_t
71parent_delta(dsl_dataset_t *ds, int64_t delta)
72{
73	uint64_t old_bytes, new_bytes;
74
75	if (ds->ds_reserved == 0)
76		return (delta);
77
78	old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
79	new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
80
81	ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
82	return (new_bytes - old_bytes);
83}
84
85void
86dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
87{
88	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
89	int compressed = BP_GET_PSIZE(bp);
90	int uncompressed = BP_GET_UCSIZE(bp);
91	int64_t delta;
92
93	dprintf_bp(bp, "ds=%p", ds);
94
95	ASSERT(dmu_tx_is_syncing(tx));
96	/* It could have been compressed away to nothing */
97	if (BP_IS_HOLE(bp))
98		return;
99	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
100	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
101	if (ds == NULL) {
102		/*
103		 * Account for the meta-objset space in its placeholder
104		 * dsl_dir.
105		 */
106		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
107		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
108		    used, compressed, uncompressed, tx);
109		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
110		return;
111	}
112	dmu_buf_will_dirty(ds->ds_dbuf, tx);
113
114	mutex_enter(&ds->ds_dir->dd_lock);
115	mutex_enter(&ds->ds_lock);
116	delta = parent_delta(ds, used);
117	ds->ds_phys->ds_used_bytes += used;
118	ds->ds_phys->ds_compressed_bytes += compressed;
119	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
120	ds->ds_phys->ds_unique_bytes += used;
121	mutex_exit(&ds->ds_lock);
122	dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
123	    compressed, uncompressed, tx);
124	dsl_dir_transfer_space(ds->ds_dir, used - delta,
125	    DD_USED_REFRSRV, DD_USED_HEAD, tx);
126	mutex_exit(&ds->ds_dir->dd_lock);
127}
128
129int
130dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
131    boolean_t async)
132{
133	if (BP_IS_HOLE(bp))
134		return (0);
135
136	ASSERT(dmu_tx_is_syncing(tx));
137	ASSERT(bp->blk_birth <= tx->tx_txg);
138
139	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
140	int compressed = BP_GET_PSIZE(bp);
141	int uncompressed = BP_GET_UCSIZE(bp);
142
143	ASSERT(used > 0);
144	if (ds == NULL) {
145		/*
146		 * Account for the meta-objset space in its placeholder
147		 * dataset.
148		 */
149		dsl_free(tx->tx_pool, tx->tx_txg, bp);
150
151		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
152		    -used, -compressed, -uncompressed, tx);
153		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
154		return (used);
155	}
156	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
157
158	ASSERT(!dsl_dataset_is_snapshot(ds));
159	dmu_buf_will_dirty(ds->ds_dbuf, tx);
160
161	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
162		int64_t delta;
163
164		dprintf_bp(bp, "freeing ds=%llu", ds->ds_object);
165		dsl_free(tx->tx_pool, tx->tx_txg, bp);
166
167		mutex_enter(&ds->ds_dir->dd_lock);
168		mutex_enter(&ds->ds_lock);
169		ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
170		    !DS_UNIQUE_IS_ACCURATE(ds));
171		delta = parent_delta(ds, -used);
172		ds->ds_phys->ds_unique_bytes -= used;
173		mutex_exit(&ds->ds_lock);
174		dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
175		    delta, -compressed, -uncompressed, tx);
176		dsl_dir_transfer_space(ds->ds_dir, -used - delta,
177		    DD_USED_REFRSRV, DD_USED_HEAD, tx);
178		mutex_exit(&ds->ds_dir->dd_lock);
179	} else {
180		dprintf_bp(bp, "putting on dead list: %s", "");
181		if (async) {
182			/*
183			 * We are here as part of zio's write done callback,
184			 * which means we're a zio interrupt thread.  We can't
185			 * call dsl_deadlist_insert() now because it may block
186			 * waiting for I/O.  Instead, put bp on the deferred
187			 * queue and let dsl_pool_sync() finish the job.
188			 */
189			bplist_append(&ds->ds_pending_deadlist, bp);
190		} else {
191			dsl_deadlist_insert(&ds->ds_deadlist, bp, tx);
192		}
193		ASSERT3U(ds->ds_prev->ds_object, ==,
194		    ds->ds_phys->ds_prev_snap_obj);
195		ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
196		/* if born after prev's prev snap txg, it's now unique to prev */
197		if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
198		    ds->ds_object && bp->blk_birth >
199		    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
200			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
201			mutex_enter(&ds->ds_prev->ds_lock);
202			ds->ds_prev->ds_phys->ds_unique_bytes += used;
203			mutex_exit(&ds->ds_prev->ds_lock);
204		}
205		if (bp->blk_birth > ds->ds_dir->dd_origin_txg) {
206			dsl_dir_transfer_space(ds->ds_dir, used,
207			    DD_USED_HEAD, DD_USED_SNAP, tx);
208		}
209	}
210	mutex_enter(&ds->ds_lock);
211	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
212	ds->ds_phys->ds_used_bytes -= used;
213	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
214	ds->ds_phys->ds_compressed_bytes -= compressed;
215	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
216	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
217	mutex_exit(&ds->ds_lock);
218
219	return (used);
220}
221
222uint64_t
223dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
224{
225	uint64_t trysnap = 0;
226
227	if (ds == NULL)
228		return (0);
229	/*
230	 * The snapshot creation could fail, but that would cause an
231	 * incorrect FALSE return, which would only result in an
232	 * overestimation of the amount of space that an operation would
233	 * consume, which is OK.
234	 *
235	 * There's also a small window where we could miss a pending
236	 * snapshot, because we could set the sync task in the quiescing
237	 * phase.  So this should only be used as a guess.
238	 */
239	if (ds->ds_trysnap_txg >
240	    spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
241		trysnap = ds->ds_trysnap_txg;
242	return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
243}
244
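/*
 * A block born at or before the most recent snapshot is still referenced
 * by that snapshot and is therefore not freeable.  E.g. (illustrative):
 * with prev_snap_txg == 100, a block with blk_birth == 90 is pinned by
 * the snapshot, while one born in txg 101 can be freed.
 */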
245boolean_t
246dsl_dataset_block_freeable(dsl_dataset_t *ds, const blkptr_t *bp,
247    uint64_t blk_birth)
248{
249	if (blk_birth <= dsl_dataset_prev_snap_txg(ds))
250		return (B_FALSE);
251
252	ddt_prefetch(dsl_dataset_get_spa(ds), bp);
253
254	return (B_TRUE);
255}
256
257/* ARGSUSED */
258static void
259dsl_dataset_evict(dmu_buf_t *db, void *dsv)
260{
261	dsl_dataset_t *ds = dsv;
262
263	ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));
264
265	unique_remove(ds->ds_fsid_guid);
266
267	if (ds->ds_objset != NULL)
268		dmu_objset_evict(ds->ds_objset);
269
270	if (ds->ds_prev) {
271		dsl_dataset_drop_ref(ds->ds_prev, ds);
272		ds->ds_prev = NULL;
273	}
274
275	bplist_destroy(&ds->ds_pending_deadlist);
276	if (db != NULL) {
277		dsl_deadlist_close(&ds->ds_deadlist);
278	} else {
279		ASSERT(ds->ds_deadlist.dl_dbuf == NULL);
280		ASSERT(!ds->ds_deadlist.dl_oldfmt);
281	}
282	if (ds->ds_dir)
283		dsl_dir_close(ds->ds_dir, ds);
284
285	ASSERT(!list_link_active(&ds->ds_synced_link));
286
287	if (mutex_owned(&ds->ds_lock))
288		mutex_exit(&ds->ds_lock);
289	mutex_destroy(&ds->ds_lock);
290	mutex_destroy(&ds->ds_recvlock);
291	if (mutex_owned(&ds->ds_opening_lock))
292		mutex_exit(&ds->ds_opening_lock);
293	mutex_destroy(&ds->ds_opening_lock);
294	rw_destroy(&ds->ds_rwlock);
295	cv_destroy(&ds->ds_exclusive_cv);
296
297	kmem_free(ds, sizeof (dsl_dataset_t));
298}
299
300static int
301dsl_dataset_get_snapname(dsl_dataset_t *ds)
302{
303	dsl_dataset_phys_t *headphys;
304	int err;
305	dmu_buf_t *headdbuf;
306	dsl_pool_t *dp = ds->ds_dir->dd_pool;
307	objset_t *mos = dp->dp_meta_objset;
308
309	if (ds->ds_snapname[0])
310		return (0);
311	if (ds->ds_phys->ds_next_snap_obj == 0)
312		return (0);
313
314	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
315	    FTAG, &headdbuf);
316	if (err)
317		return (err);
318	headphys = headdbuf->db_data;
319	err = zap_value_search(dp->dp_meta_objset,
320	    headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
321	dmu_buf_rele(headdbuf, FTAG);
322	return (err);
323}
324
325static int
326dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
327{
328	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
329	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
330	matchtype_t mt;
331	int err;
332
333	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
334		mt = MT_FIRST;
335	else
336		mt = MT_EXACT;
337
338	err = zap_lookup_norm(mos, snapobj, name, 8, 1,
339	    value, mt, NULL, 0, NULL);
340	if (err == ENOTSUP && mt == MT_FIRST)
341		err = zap_lookup(mos, snapobj, name, 8, 1, value);
342	return (err);
343}
344
345static int
346dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
347{
348	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
349	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
350	matchtype_t mt;
351	int err;
352
353	dsl_dir_snap_cmtime_update(ds->ds_dir);
354
355	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
356		mt = MT_FIRST;
357	else
358		mt = MT_EXACT;
359
360	err = zap_remove_norm(mos, snapobj, name, mt, tx);
361	if (err == ENOTSUP && mt == MT_FIRST)
362		err = zap_remove(mos, snapobj, name, tx);
363	return (err);
364}
365
366static int
367dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
368    dsl_dataset_t **dsp)
369{
370	objset_t *mos = dp->dp_meta_objset;
371	dmu_buf_t *dbuf;
372	dsl_dataset_t *ds;
373	int err;
374	dmu_object_info_t doi;
375
376	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
377	    dsl_pool_sync_context(dp));
378
379	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
380	if (err)
381		return (err);
382
383	/* Make sure dsobj has the correct object type. */
384	dmu_object_info_from_db(dbuf, &doi);
385	if (doi.doi_type != DMU_OT_DSL_DATASET) {
		/* don't leak the bonus buffer hold on failure */
		dmu_buf_rele(dbuf, tag);
		return (EINVAL);
	}
387
388	ds = dmu_buf_get_user(dbuf);
389	if (ds == NULL) {
390		dsl_dataset_t *winner;
391
392		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
393		ds->ds_dbuf = dbuf;
394		ds->ds_object = dsobj;
395		ds->ds_phys = dbuf->db_data;
396
397		mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
398		mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
399		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
400		rw_init(&ds->ds_rwlock, 0, 0, 0);
401		cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
402
403		bplist_create(&ds->ds_pending_deadlist);
404		dsl_deadlist_open(&ds->ds_deadlist,
405		    mos, ds->ds_phys->ds_deadlist_obj);
406
407		if (err == 0) {
408			err = dsl_dir_open_obj(dp,
409			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
410		}
411		if (err) {
412			mutex_destroy(&ds->ds_lock);
413			mutex_destroy(&ds->ds_recvlock);
414			mutex_destroy(&ds->ds_opening_lock);
415			rw_destroy(&ds->ds_rwlock);
416			cv_destroy(&ds->ds_exclusive_cv);
417			bplist_destroy(&ds->ds_pending_deadlist);
418			dsl_deadlist_close(&ds->ds_deadlist);
419			kmem_free(ds, sizeof (dsl_dataset_t));
420			dmu_buf_rele(dbuf, tag);
421			return (err);
422		}
423
424		if (!dsl_dataset_is_snapshot(ds)) {
425			ds->ds_snapname[0] = '\0';
426			if (ds->ds_phys->ds_prev_snap_obj) {
427				err = dsl_dataset_get_ref(dp,
428				    ds->ds_phys->ds_prev_snap_obj,
429				    ds, &ds->ds_prev);
430			}
431		} else {
432			if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
433				err = dsl_dataset_get_snapname(ds);
434			if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
435				err = zap_count(
436				    ds->ds_dir->dd_pool->dp_meta_objset,
437				    ds->ds_phys->ds_userrefs_obj,
438				    &ds->ds_userrefs);
439			}
440		}
441
442		if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
443			/*
444			 * In sync context, we're called with either no lock
445			 * or with the write lock.  If we're not syncing,
446			 * we're always called with the read lock held.
447			 */
448			boolean_t need_lock =
449			    !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
450			    dsl_pool_sync_context(dp);
451
452			if (need_lock)
453				rw_enter(&dp->dp_config_rwlock, RW_READER);
454
455			err = dsl_prop_get_ds(ds,
456			    "refreservation", sizeof (uint64_t), 1,
457			    &ds->ds_reserved, NULL);
458			if (err == 0) {
459				err = dsl_prop_get_ds(ds,
460				    "refquota", sizeof (uint64_t), 1,
461				    &ds->ds_quota, NULL);
462			}
463
464			if (need_lock)
465				rw_exit(&dp->dp_config_rwlock);
466		} else {
467			ds->ds_reserved = ds->ds_quota = 0;
468		}
469
470		if (err == 0) {
471			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
472			    dsl_dataset_evict);
473		}
474		if (err || winner) {
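		/*
		 * dmu_buf_set_user_ie() returns the existing user, if any:
		 * non-NULL means another thread raced us and attached its
		 * dsl_dataset_t first, so we tear ours down below and use
		 * the winner's instead.
		 */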
475			bplist_destroy(&ds->ds_pending_deadlist);
476			dsl_deadlist_close(&ds->ds_deadlist);
477			if (ds->ds_prev)
478				dsl_dataset_drop_ref(ds->ds_prev, ds);
479			dsl_dir_close(ds->ds_dir, ds);
480			mutex_destroy(&ds->ds_lock);
481			mutex_destroy(&ds->ds_recvlock);
482			mutex_destroy(&ds->ds_opening_lock);
483			rw_destroy(&ds->ds_rwlock);
484			cv_destroy(&ds->ds_exclusive_cv);
485			kmem_free(ds, sizeof (dsl_dataset_t));
486			if (err) {
487				dmu_buf_rele(dbuf, tag);
488				return (err);
489			}
490			ds = winner;
491		} else {
492			ds->ds_fsid_guid =
493			    unique_insert(ds->ds_phys->ds_fsid_guid);
494		}
495	}
496	ASSERT3P(ds->ds_dbuf, ==, dbuf);
497	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
498	ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
499	    spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
500	    dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
501	mutex_enter(&ds->ds_lock);
502	if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
503		mutex_exit(&ds->ds_lock);
504		dmu_buf_rele(ds->ds_dbuf, tag);
505		return (ENOENT);
506	}
507	mutex_exit(&ds->ds_lock);
508	*dsp = ds;
509	return (0);
510}
511
512static int
513dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
514{
515	dsl_pool_t *dp = ds->ds_dir->dd_pool;
516
517	/*
518	 * In syncing context we don't want to take the rwlock: there
519	 * may be an existing writer waiting for sync phase to
520	 * finish.  We don't need to worry about such writers, since
521	 * sync phase is single-threaded, so the writer can't be
522	 * doing anything while we are active.
523	 */
524	if (dsl_pool_sync_context(dp)) {
525		ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
526		return (0);
527	}
528
529	/*
530	 * Normal users will hold the ds_rwlock as a READER until they
531	 * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
532	 * drop their READER lock after they set the ds_owner field.
533	 *
534	 * If the dataset is being destroyed, the destroy thread will
535	 * obtain a WRITER lock for exclusive access after it's done its
536	 * open-context work and then change the ds_owner to
537	 * dsl_reaper once destruction is assured.  So threads
538	 * may block here temporarily, until the "destructability" of
539	 * the dataset is determined.
540	 */
541	ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
542	mutex_enter(&ds->ds_lock);
543	while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
544		rw_exit(&dp->dp_config_rwlock);
545		cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
546		if (DSL_DATASET_IS_DESTROYED(ds)) {
547			mutex_exit(&ds->ds_lock);
548			dsl_dataset_drop_ref(ds, tag);
549			rw_enter(&dp->dp_config_rwlock, RW_READER);
550			return (ENOENT);
551		}
552		/*
553		 * The dp_config_rwlock lives above the ds_lock. And
554		 * we need to check DSL_DATASET_IS_DESTROYED() while
555		 * holding the ds_lock, so we have to drop and reacquire
556		 * the ds_lock here.
557		 */
558		mutex_exit(&ds->ds_lock);
559		rw_enter(&dp->dp_config_rwlock, RW_READER);
560		mutex_enter(&ds->ds_lock);
561	}
562	mutex_exit(&ds->ds_lock);
563	return (0);
564}
565
566int
567dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
568    dsl_dataset_t **dsp)
569{
570	int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
571
572	if (err)
573		return (err);
574	return (dsl_dataset_hold_ref(*dsp, tag));
575}
576
577int
578dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
579    void *tag, dsl_dataset_t **dsp)
580{
581	int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
582	if (err)
583		return (err);
584	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
585		dsl_dataset_rele(*dsp, tag);
586		*dsp = NULL;
587		return (EBUSY);
588	}
589	return (0);
590}
591
592int
593dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
594{
595	dsl_dir_t *dd;
596	dsl_pool_t *dp;
597	const char *snapname;
598	uint64_t obj;
599	int err = 0;
600
601	err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
602	if (err)
603		return (err);
604
605	dp = dd->dd_pool;
606	obj = dd->dd_phys->dd_head_dataset_obj;
607	rw_enter(&dp->dp_config_rwlock, RW_READER);
608	if (obj)
609		err = dsl_dataset_get_ref(dp, obj, tag, dsp);
610	else
611		err = ENOENT;
612	if (err)
613		goto out;
614
615	err = dsl_dataset_hold_ref(*dsp, tag);
616
617	/* we may be looking for a snapshot */
618	if (err == 0 && snapname != NULL) {
619		dsl_dataset_t *ds = NULL;
620
621		if (*snapname++ != '@') {
622			dsl_dataset_rele(*dsp, tag);
623			err = ENOENT;
624			goto out;
625		}
626
627		dprintf("looking for snapshot '%s'\n", snapname);
628		err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
629		if (err == 0)
630			err = dsl_dataset_get_ref(dp, obj, tag, &ds);
631		dsl_dataset_rele(*dsp, tag);
632
633		ASSERT3U((err == 0), ==, (ds != NULL));
634
635		if (ds) {
636			mutex_enter(&ds->ds_lock);
637			if (ds->ds_snapname[0] == 0)
638				(void) strlcpy(ds->ds_snapname, snapname,
639				    sizeof (ds->ds_snapname));
640			mutex_exit(&ds->ds_lock);
641			err = dsl_dataset_hold_ref(ds, tag);
642			*dsp = err ? NULL : ds;
643		}
644	}
645out:
646	rw_exit(&dp->dp_config_rwlock);
647	dsl_dir_close(dd, FTAG);
648	return (err);
649}
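
/*
 * Illustrative usage sketch (hypothetical dataset name, error handling
 * abbreviated): hold a snapshot by name, use it, then release it.
 *
 *	dsl_dataset_t *ds;
 *	if (dsl_dataset_hold("tank/fs@snap", FTAG, &ds) == 0) {
 *		(void) dsl_dataset_get_blkptr(ds);
 *		dsl_dataset_rele(ds, FTAG);
 *	}
 */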
650
651int
652dsl_dataset_own(const char *name, boolean_t inconsistentok,
653    void *tag, dsl_dataset_t **dsp)
654{
655	int err = dsl_dataset_hold(name, tag, dsp);
656	if (err)
657		return (err);
658	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
659		dsl_dataset_rele(*dsp, tag);
660		return (EBUSY);
661	}
662	return (0);
663}
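
/*
 * Note: a successful dsl_dataset_own()/dsl_dataset_own_obj() must be
 * balanced by dsl_dataset_disown(), not dsl_dataset_rele();
 * dsl_dataset_tryown() has already dropped the READER lock that
 * dsl_dataset_rele() would release.
 */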
664
665void
666dsl_dataset_name(dsl_dataset_t *ds, char *name)
667{
668	if (ds == NULL) {
669		(void) strcpy(name, "mos");
670	} else {
671		dsl_dir_name(ds->ds_dir, name);
672		VERIFY(0 == dsl_dataset_get_snapname(ds));
673		if (ds->ds_snapname[0]) {
674			(void) strcat(name, "@");
675			/*
676			 * We use a "recursive" mutex so that we
677			 * can call dprintf_ds() with ds_lock held.
678			 */
679			if (!MUTEX_HELD(&ds->ds_lock)) {
680				mutex_enter(&ds->ds_lock);
681				(void) strcat(name, ds->ds_snapname);
682				mutex_exit(&ds->ds_lock);
683			} else {
684				(void) strcat(name, ds->ds_snapname);
685			}
686		}
687	}
688}
689
690static int
691dsl_dataset_namelen(dsl_dataset_t *ds)
692{
693	int result;
694
695	if (ds == NULL) {
696		result = 3;	/* "mos" */
697	} else {
698		result = dsl_dir_namelen(ds->ds_dir);
699		VERIFY(0 == dsl_dataset_get_snapname(ds));
700		if (ds->ds_snapname[0]) {
701			++result;	/* adding one for the @-sign */
702			if (!MUTEX_HELD(&ds->ds_lock)) {
703				mutex_enter(&ds->ds_lock);
704				result += strlen(ds->ds_snapname);
705				mutex_exit(&ds->ds_lock);
706			} else {
707				result += strlen(ds->ds_snapname);
708			}
709		}
710	}
711
712	return (result);
713}
714
715void
716dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
717{
718	dmu_buf_rele(ds->ds_dbuf, tag);
719}
720
721void
722dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
723{
724	if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
725		rw_exit(&ds->ds_rwlock);
726	}
727	dsl_dataset_drop_ref(ds, tag);
728}
729
730void
731dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
732{
733	ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
734	    (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));
735
736	mutex_enter(&ds->ds_lock);
737	ds->ds_owner = NULL;
738	if (RW_WRITE_HELD(&ds->ds_rwlock)) {
739		rw_exit(&ds->ds_rwlock);
740		cv_broadcast(&ds->ds_exclusive_cv);
741	}
742	mutex_exit(&ds->ds_lock);
743	if (ds->ds_dbuf)
744		dsl_dataset_drop_ref(ds, tag);
745	else
746		dsl_dataset_evict(NULL, ds);
747}
748
749boolean_t
750dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
751{
752	boolean_t gotit = FALSE;
753
754	mutex_enter(&ds->ds_lock);
755	if (ds->ds_owner == NULL &&
756	    (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
757		ds->ds_owner = tag;
758		if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
759			rw_exit(&ds->ds_rwlock);
760		gotit = TRUE;
761	}
762	mutex_exit(&ds->ds_lock);
763	return (gotit);
764}
765
766void
767dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
768{
769	ASSERT3P(owner, ==, ds->ds_owner);
770	if (!RW_WRITE_HELD(&ds->ds_rwlock))
771		rw_enter(&ds->ds_rwlock, RW_WRITER);
772}
773
774uint64_t
775dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
776    uint64_t flags, dmu_tx_t *tx)
777{
778	dsl_pool_t *dp = dd->dd_pool;
779	dmu_buf_t *dbuf;
780	dsl_dataset_phys_t *dsphys;
781	uint64_t dsobj;
782	objset_t *mos = dp->dp_meta_objset;
783
784	if (origin == NULL)
785		origin = dp->dp_origin_snap;
786
787	ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
788	ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
789	ASSERT(dmu_tx_is_syncing(tx));
790	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);
791
792	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
793	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
794	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
795	dmu_buf_will_dirty(dbuf, tx);
796	dsphys = dbuf->db_data;
797	bzero(dsphys, sizeof (dsl_dataset_phys_t));
798	dsphys->ds_dir_obj = dd->dd_object;
799	dsphys->ds_flags = flags;
800	dsphys->ds_fsid_guid = unique_create();
801	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
802	    sizeof (dsphys->ds_guid));
803	dsphys->ds_snapnames_zapobj =
804	    zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
805	    DMU_OT_NONE, 0, tx);
806	dsphys->ds_creation_time = gethrestime_sec();
807	dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;
808
809	if (origin == NULL) {
810		dsphys->ds_deadlist_obj = dsl_deadlist_alloc(mos, tx);
811	} else {
812		dsl_dataset_t *ohds;
813
814		dsphys->ds_prev_snap_obj = origin->ds_object;
815		dsphys->ds_prev_snap_txg =
816		    origin->ds_phys->ds_creation_txg;
817		dsphys->ds_used_bytes =
818		    origin->ds_phys->ds_used_bytes;
819		dsphys->ds_compressed_bytes =
820		    origin->ds_phys->ds_compressed_bytes;
821		dsphys->ds_uncompressed_bytes =
822		    origin->ds_phys->ds_uncompressed_bytes;
823		dsphys->ds_bp = origin->ds_phys->ds_bp;
824		dsphys->ds_flags |= origin->ds_phys->ds_flags;
825
826		dmu_buf_will_dirty(origin->ds_dbuf, tx);
827		origin->ds_phys->ds_num_children++;
828
829		VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
830		    origin->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ohds));
831		dsphys->ds_deadlist_obj = dsl_deadlist_clone(&ohds->ds_deadlist,
832		    dsphys->ds_prev_snap_txg, dsphys->ds_prev_snap_obj, tx);
833		dsl_dataset_rele(ohds, FTAG);
834
835		if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
836			if (origin->ds_phys->ds_next_clones_obj == 0) {
837				origin->ds_phys->ds_next_clones_obj =
838				    zap_create(mos,
839				    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
840			}
841			VERIFY(0 == zap_add_int(mos,
842			    origin->ds_phys->ds_next_clones_obj,
843			    dsobj, tx));
844		}
845
846		dmu_buf_will_dirty(dd->dd_dbuf, tx);
847		dd->dd_phys->dd_origin_obj = origin->ds_object;
848		if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
849			if (origin->ds_dir->dd_phys->dd_clones == 0) {
850				dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
851				origin->ds_dir->dd_phys->dd_clones =
852				    zap_create(mos,
853				    DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
854			}
855			VERIFY3U(0, ==, zap_add_int(mos,
856			    origin->ds_dir->dd_phys->dd_clones, dsobj, tx));
857		}
858	}
859
860	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
861		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
862
863	dmu_buf_rele(dbuf, FTAG);
864
865	dmu_buf_will_dirty(dd->dd_dbuf, tx);
866	dd->dd_phys->dd_head_dataset_obj = dsobj;
867
868	return (dsobj);
869}
870
871uint64_t
872dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
873    dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
874{
875	dsl_pool_t *dp = pdd->dd_pool;
876	uint64_t dsobj, ddobj;
877	dsl_dir_t *dd;
878
879	ASSERT(lastname[0] != '@');
880
881	ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
882	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
883
884	dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);
885
886	dsl_deleg_set_create_perms(dd, tx, cr);
887
888	dsl_dir_close(dd, FTAG);
889
890	/*
891	 * If we are creating a clone, make sure we zero out any stale
892	 * data from the origin snapshot's ZIL header.
893	 */
894	if (origin != NULL) {
895		dsl_dataset_t *ds;
896		objset_t *os;
897
898		VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
899		VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
900		bzero(&os->os_zil_header, sizeof (os->os_zil_header));
901		dsl_dataset_dirty(ds, tx);
902		dsl_dataset_rele(ds, FTAG);
903	}
904
905	return (dsobj);
906}
907
908struct destroyarg {
909	dsl_sync_task_group_t *dstg;
910	char *snapname;
911	char *failed;
912	boolean_t defer;
913};
914
915static int
916dsl_snapshot_destroy_one(const char *name, void *arg)
917{
918	struct destroyarg *da = arg;
919	dsl_dataset_t *ds;
920	int err;
921	char *dsname;
922
923	dsname = kmem_asprintf("%s@%s", name, da->snapname);
924	err = dsl_dataset_own(dsname, B_TRUE, da->dstg, &ds);
925	strfree(dsname);
926	if (err == 0) {
927		struct dsl_ds_destroyarg *dsda;
928
929		dsl_dataset_make_exclusive(ds, da->dstg);
930		dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg), KM_SLEEP);
931		dsda->ds = ds;
932		dsda->defer = da->defer;
933		dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
934		    dsl_dataset_destroy_sync, dsda, da->dstg, 0);
935	} else if (err == ENOENT) {
936		err = 0;
937	} else {
938		(void) strcpy(da->failed, name);
939	}
940	return (err);
941}
942
943/*
944 * Destroy 'snapname' in all descendants of 'fsname'.
945 */
946#pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
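/* dmu_snapshots_destroy() is exported as a weak alias for this function. */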
947int
948dsl_snapshots_destroy(char *fsname, char *snapname, boolean_t defer)
949{
950	int err;
951	struct destroyarg da;
952	dsl_sync_task_t *dst;
953	spa_t *spa;
954
955	err = spa_open(fsname, &spa, FTAG);
956	if (err)
957		return (err);
958	da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
959	da.snapname = snapname;
960	da.failed = fsname;
961	da.defer = defer;
962
963	err = dmu_objset_find(fsname,
964	    dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);
965
966	if (err == 0)
967		err = dsl_sync_task_group_wait(da.dstg);
968
969	for (dst = list_head(&da.dstg->dstg_tasks); dst;
970	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
971		struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
972		dsl_dataset_t *ds = dsda->ds;
973
974		/*
975		 * Return the file system name that triggered the error.
976		 */
977		if (dst->dst_err) {
978			dsl_dataset_name(ds, fsname);
979			*strchr(fsname, '@') = '\0';
980		}
981		ASSERT3P(dsda->rm_origin, ==, NULL);
982		dsl_dataset_disown(ds, da.dstg);
983		kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
984	}
985
986	dsl_sync_task_group_destroy(da.dstg);
987	spa_close(spa, FTAG);
988	return (err);
989}
990
991static boolean_t
992dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
993{
994	boolean_t might_destroy = B_FALSE;
995
996	mutex_enter(&ds->ds_lock);
997	if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
998	    DS_IS_DEFER_DESTROY(ds))
999		might_destroy = B_TRUE;
1000	mutex_exit(&ds->ds_lock);
1001
1002	return (might_destroy);
1003}
1004
1005/*
1006 * If we're removing a clone, and these three conditions are true:
1007 *	1) the clone's origin has no other children
1008 *	2) the clone's origin has no user references
1009 *	3) the clone's origin has been marked for deferred destruction
1010 * Then, prepare to remove the origin as part of this sync task group.
1011 */
1012static int
1013dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
1014{
1015	dsl_dataset_t *ds = dsda->ds;
1016	dsl_dataset_t *origin = ds->ds_prev;
1017
1018	if (dsl_dataset_might_destroy_origin(origin)) {
1019		char *name;
1020		int namelen;
1021		int error;
1022
1023		namelen = dsl_dataset_namelen(origin) + 1;
1024		name = kmem_alloc(namelen, KM_SLEEP);
1025		dsl_dataset_name(origin, name);
1026#ifdef _KERNEL
1027		error = zfs_unmount_snap(name, NULL);
1028		if (error) {
1029			kmem_free(name, namelen);
1030			return (error);
1031		}
1032#endif
1033		error = dsl_dataset_own(name, B_TRUE, tag, &origin);
1034		kmem_free(name, namelen);
1035		if (error)
1036			return (error);
1037		dsda->rm_origin = origin;
1038		dsl_dataset_make_exclusive(origin, tag);
1039	}
1040
1041	return (0);
1042}
1043
1044/*
1045 * ds must be opened as OWNER.  On return (whether successful or not),
1046 * ds will be closed and caller can no longer dereference it.
1047 */
1048int
1049dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
1050{
1051	int err;
1052	dsl_sync_task_group_t *dstg;
1053	objset_t *os;
1054	dsl_dir_t *dd;
1055	uint64_t obj;
1056	struct dsl_ds_destroyarg dsda = { 0 };
1057	dsl_dataset_t dummy_ds = { 0 };
1058
1059	dsda.ds = ds;
1060
1061	if (dsl_dataset_is_snapshot(ds)) {
1062		/* Destroying a snapshot is simpler */
1063		dsl_dataset_make_exclusive(ds, tag);
1064
1065		dsda.defer = defer;
1066		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
1067		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
1068		    &dsda, tag, 0);
1069		ASSERT3P(dsda.rm_origin, ==, NULL);
1070		goto out;
1071	} else if (defer) {
1072		err = EINVAL;
1073		goto out;
1074	}
1075
1076	dd = ds->ds_dir;
1077	dummy_ds.ds_dir = dd;
1078	dummy_ds.ds_object = ds->ds_object;
1079
1080	/*
1081	 * Check for errors and mark this ds as inconsistent, in
1082	 * case we crash while freeing the objects.
1083	 */
1084	err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
1085	    dsl_dataset_destroy_begin_sync, ds, NULL, 0);
1086	if (err)
1087		goto out;
1088
1089	err = dmu_objset_from_ds(ds, &os);
1090	if (err)
1091		goto out;
1092
1093	/*
1094	 * remove the objects in open context, so that we won't
1095	 * have too much to do in syncing context.
1096	 */
1097	for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
1098	    ds->ds_phys->ds_prev_snap_txg)) {
1099		/*
1100		 * Ignore errors; if there is not enough disk space
1101		 * we will deal with it in dsl_dataset_destroy_sync().
1102		 */
1103		(void) dmu_free_object(os, obj);
1104	}
1105	if (err != ESRCH)
1106		goto out;
1107
1108	/*
1109	 * Only the ZIL knows how to free log blocks.
1110	 */
1111	zil_destroy(dmu_objset_zil(os), B_FALSE);
1112
1113	/*
1114	 * Sync out all in-flight IO.
1115	 */
1116	txg_wait_synced(dd->dd_pool, 0);
1117
1118	/*
1119	 * If we managed to free all the objects in open
1120	 * context, the user space accounting should be zero.
1121	 */
1122	if (ds->ds_phys->ds_bp.blk_fill == 0 &&
1123	    dmu_objset_userused_enabled(os)) {
1124		uint64_t count;
1125
1126		ASSERT(zap_count(os, DMU_USERUSED_OBJECT, &count) != 0 ||
1127		    count == 0);
1128		ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT, &count) != 0 ||
1129		    count == 0);
1130	}
1131
1132	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
1133	err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
1134	rw_exit(&dd->dd_pool->dp_config_rwlock);
1135
1136	if (err)
1137		goto out;
1138
1139	/*
1140	 * Blow away the dsl_dir + head dataset.
1141	 */
1142	dsl_dataset_make_exclusive(ds, tag);
1143	/*
1144	 * If we're removing a clone, we might also need to remove its
1145	 * origin.
1146	 */
1147	do {
1148		dsda.need_prep = B_FALSE;
1149		if (dsl_dir_is_clone(dd)) {
1150			err = dsl_dataset_origin_rm_prep(&dsda, tag);
1151			if (err) {
1152				dsl_dir_close(dd, FTAG);
1153				goto out;
1154			}
1155		}
1156
1157		dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
1158		dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
1159		    dsl_dataset_destroy_sync, &dsda, tag, 0);
1160		dsl_sync_task_create(dstg, dsl_dir_destroy_check,
1161		    dsl_dir_destroy_sync, &dummy_ds, FTAG, 0);
1162		err = dsl_sync_task_group_wait(dstg);
1163		dsl_sync_task_group_destroy(dstg);
1164
1165		/*
1166		 * We could be racing against 'zfs release' or 'zfs destroy -d'
1167		 * on the origin snap, in which case we can get EBUSY if we
1168		 * needed to destroy the origin snap but were not ready to
1169		 * do so.
1170		 */
1171		if (dsda.need_prep) {
1172			ASSERT(err == EBUSY);
1173			ASSERT(dsl_dir_is_clone(dd));
1174			ASSERT(dsda.rm_origin == NULL);
1175		}
1176	} while (dsda.need_prep);
1177
1178	if (dsda.rm_origin != NULL)
1179		dsl_dataset_disown(dsda.rm_origin, tag);
1180
1181	/* if it is successful, dsl_dir_destroy_sync will close the dd */
1182	if (err)
1183		dsl_dir_close(dd, FTAG);
1184out:
1185	dsl_dataset_disown(ds, tag);
1186	return (err);
1187}
1188
1189blkptr_t *
1190dsl_dataset_get_blkptr(dsl_dataset_t *ds)
1191{
1192	return (&ds->ds_phys->ds_bp);
1193}
1194
1195void
1196dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
1197{
1198	ASSERT(dmu_tx_is_syncing(tx));
1199	/* If it's the meta-objset, set dp_meta_rootbp */
1200	if (ds == NULL) {
1201		tx->tx_pool->dp_meta_rootbp = *bp;
1202	} else {
1203		dmu_buf_will_dirty(ds->ds_dbuf, tx);
1204		ds->ds_phys->ds_bp = *bp;
1205	}
1206}
1207
1208spa_t *
1209dsl_dataset_get_spa(dsl_dataset_t *ds)
1210{
1211	return (ds->ds_dir->dd_pool->dp_spa);
1212}
1213
1214void
1215dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
1216{
1217	dsl_pool_t *dp;
1218
1219	if (ds == NULL) /* this is the meta-objset */
1220		return;
1221
1222	ASSERT(ds->ds_objset != NULL);
1223
1224	if (ds->ds_phys->ds_next_snap_obj != 0)
1225		panic("dirtying snapshot!");
1226
1227	dp = ds->ds_dir->dd_pool;
1228
1229	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
1230		/* up the hold count until we can be written out */
1231		dmu_buf_add_ref(ds->ds_dbuf, ds);
1232	}
1233}
1234
1235/*
1236 * The unique space in the head dataset can be calculated by subtracting
1237 * the space used in the most recent snapshot, that is still being used
1238 * in this file system, from the space currently in use.  To figure out
1239 * the space in the most recent snapshot still in use, we need to take
1240 * the total space used in the snapshot and subtract out the space that
1241 * has been freed up since the snapshot was taken.
1242 */
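/*
 * Worked example (illustrative numbers): if the head references 100M,
 * the most recent snapshot referenced 80M, and 30M of that has since
 * been freed (i.e. is on the deadlist), then
 * unique = 100M - (80M - 30M) = 50M.
 */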
1243static void
1244dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
1245{
1246	uint64_t mrs_used;
1247	uint64_t dlused, dlcomp, dluncomp;
1248
1249	ASSERT(!dsl_dataset_is_snapshot(ds));
1250
1251	if (ds->ds_phys->ds_prev_snap_obj != 0)
1252		mrs_used = ds->ds_prev->ds_phys->ds_used_bytes;
1253	else
1254		mrs_used = 0;
1255
1256	dsl_deadlist_space(&ds->ds_deadlist, &dlused, &dlcomp, &dluncomp);
1257
1258	ASSERT3U(dlused, <=, mrs_used);
1259	ds->ds_phys->ds_unique_bytes =
1260	    ds->ds_phys->ds_used_bytes - (mrs_used - dlused);
1261
1262	if (spa_version(ds->ds_dir->dd_pool->dp_spa) >=
1263	    SPA_VERSION_UNIQUE_ACCURATE)
1264		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
1265}
1266
1267struct killarg {
1268	dsl_dataset_t *ds;
1269	dmu_tx_t *tx;
1270};
1271
1272/* ARGSUSED */
1273static int
1274kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp, arc_buf_t *pbuf,
1275    const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
1276{
1277	struct killarg *ka = arg;
1278	dmu_tx_t *tx = ka->tx;
1279
1280	if (bp == NULL)
1281		return (0);
1282
1283	if (zb->zb_level == ZB_ZIL_LEVEL) {
1284		ASSERT(zilog != NULL);
1285		/*
1286		 * It's a block in the intent log.  It has no
1287		 * accounting, so just free it.
1288		 */
1289		dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
1290	} else {
1291		ASSERT(zilog == NULL);
1292		ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
1293		(void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
1294	}
1295
1296	return (0);
1297}
1298
1299/* ARGSUSED */
1300static int
1301dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
1302{
1303	dsl_dataset_t *ds = arg1;
1304	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1305	uint64_t count;
1306	int err;
1307
1308	/*
1309	 * Can't delete a head dataset if there are snapshots of it.
1310	 * (Except if the only snapshots are from the branch we cloned
1311	 * from.)
1312	 */
1313	if (ds->ds_prev != NULL &&
1314	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1315		return (EBUSY);
1316
1317	/*
1318	 * This is really a dsl_dir thing, but check it here so that
1319	 * we'll be less likely to leave this dataset inconsistent &
1320	 * nearly destroyed.
1321	 */
1322	err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
1323	if (err)
1324		return (err);
1325	if (count != 0)
1326		return (EEXIST);
1327
1328	return (0);
1329}
1330
1331/* ARGSUSED */
1332static void
1333dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, dmu_tx_t *tx)
1334{
1335	dsl_dataset_t *ds = arg1;
1336	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1337
1338	/* Mark it as inconsistent on-disk, in case we crash */
1339	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1340	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
1341
1342	spa_history_log_internal(LOG_DS_DESTROY_BEGIN, dp->dp_spa, tx,
1343	    "dataset = %llu", ds->ds_object);
1344}
1345
1346static int
1347dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
1348    dmu_tx_t *tx)
1349{
1350	dsl_dataset_t *ds = dsda->ds;
1351	dsl_dataset_t *ds_prev = ds->ds_prev;
1352
1353	if (dsl_dataset_might_destroy_origin(ds_prev)) {
1354		struct dsl_ds_destroyarg ndsda = {0};
1355
1356		/*
1357		 * If we're not prepared to remove the origin, don't remove
1358		 * the clone either.
1359		 */
1360		if (dsda->rm_origin == NULL) {
1361			dsda->need_prep = B_TRUE;
1362			return (EBUSY);
1363		}
1364
1365		ndsda.ds = ds_prev;
1366		ndsda.is_origin_rm = B_TRUE;
1367		return (dsl_dataset_destroy_check(&ndsda, tag, tx));
1368	}
1369
1370	/*
1371	 * If we're not going to remove the origin after all,
1372	 * undo the open context setup.
1373	 */
1374	if (dsda->rm_origin != NULL) {
1375		dsl_dataset_disown(dsda->rm_origin, tag);
1376		dsda->rm_origin = NULL;
1377	}
1378
1379	return (0);
1380}
1381
1382/*
1383 * If you add new checks here, you may need to add
1384 * additional checks to the "temporary" case in
1385 * snapshot_check() in dmu_objset.c.
1386 */
1387/* ARGSUSED */
1388int
1389dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
1390{
1391	struct dsl_ds_destroyarg *dsda = arg1;
1392	dsl_dataset_t *ds = dsda->ds;
1393
1394	/* we have an owner hold, so no one else can destroy us */
1395	ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
1396
1397	/*
1398	 * Only allow deferred destroy on pools that support it.
1399	 * NOTE: deferred destroy is only supported on snapshots.
1400	 */
1401	if (dsda->defer) {
1402		if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
1403		    SPA_VERSION_USERREFS)
1404			return (ENOTSUP);
1405		ASSERT(dsl_dataset_is_snapshot(ds));
1406		return (0);
1407	}
1408
1409	/*
1410	 * Can't delete a head dataset if there are snapshots of it.
1411	 * (Except if the only snapshots are from the branch we cloned
1412	 * from.)
1413	 */
1414	if (ds->ds_prev != NULL &&
1415	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1416		return (EBUSY);
1417
1418	/*
1419	 * If we made changes this txg, traverse_dsl_dataset won't find
1420	 * them.  Try again.
1421	 */
1422	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
1423		return (EAGAIN);
1424
1425	if (dsl_dataset_is_snapshot(ds)) {
1426		/*
1427		 * If this snapshot has an elevated user reference count,
1428		 * we can't destroy it yet.
1429		 */
1430		if (ds->ds_userrefs > 0 && !dsda->releasing)
1431			return (EBUSY);
1432
1433		mutex_enter(&ds->ds_lock);
1434		/*
1435		 * Can't delete a branch point. However, if we're destroying
1436		 * a clone and removing its origin due to it having a user
1437		 * hold count of 0 and having been marked for deferred destroy,
1438		 * it's OK for the origin to have a single clone.
1439		 */
1440		if (ds->ds_phys->ds_num_children >
1441		    (dsda->is_origin_rm ? 2 : 1)) {
1442			mutex_exit(&ds->ds_lock);
1443			return (EEXIST);
1444		}
1445		mutex_exit(&ds->ds_lock);
1446	} else if (dsl_dir_is_clone(ds->ds_dir)) {
1447		return (dsl_dataset_origin_check(dsda, arg2, tx));
1448	}
1449
1450	/* XXX we should do some i/o error checking... */
1451	return (0);
1452}
1453
1454struct refsarg {
1455	kmutex_t lock;
1456	boolean_t gone;
1457	kcondvar_t cv;
1458};
1459
1460/* ARGSUSED */
1461static void
1462dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
1463{
1464	struct refsarg *arg = argv;
1465
1466	mutex_enter(&arg->lock);
1467	arg->gone = TRUE;
1468	cv_signal(&arg->cv);
1469	mutex_exit(&arg->lock);
1470}
1471
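/*
 * Wait for the last hold on ds's dbuf to be released.  We swap in
 * dsl_dataset_refs_gone() as the eviction callback, drop our own hold,
 * then sleep on arg.cv until the callback signals that all other
 * references are gone.
 */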
1472static void
1473dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
1474{
1475	struct refsarg arg;
1476
1477	bzero(&arg, sizeof (arg));
1478	mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
1479	cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
1480	arg.gone = FALSE;
1481	(void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
1482	    dsl_dataset_refs_gone);
1483	dmu_buf_rele(ds->ds_dbuf, tag);
1484	mutex_enter(&arg.lock);
1485	while (!arg.gone)
1486		cv_wait(&arg.cv, &arg.lock);
1487	ASSERT(arg.gone);
1488	mutex_exit(&arg.lock);
1489	ds->ds_dbuf = NULL;
1490	ds->ds_phys = NULL;
1491	mutex_destroy(&arg.lock);
1492	cv_destroy(&arg.cv);
1493}
1494
1495static void
1496remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
1497{
1498	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1499	uint64_t count;
1500	int err;
1501
1502	ASSERT(ds->ds_phys->ds_num_children >= 2);
1503	err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
1504	/*
1505	 * The err should not be ENOENT, but a bug in a previous version
1506	 * of the code could cause upgrade_clones_cb() to not set
1507	 * ds_next_snap_obj when it should, leading to a missing entry.
1508	 * If we knew that the pool was created after
1509	 * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
1510	 * ENOENT.  However, at least we can check that we don't have
1511	 * too many entries in the next_clones_obj even after failing to
1512	 * remove this one.
1513	 */
1514	if (err != ENOENT) {
1515		VERIFY3U(err, ==, 0);
1516	}
1517	ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
1518	    &count));
1519	ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
1520}
1521
1522static void
1523dsl_dataset_remove_clones_key(dsl_dataset_t *ds, uint64_t mintxg, dmu_tx_t *tx)
1524{
1525	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1526	zap_cursor_t zc;
1527	zap_attribute_t za;
1528
1529	/*
1530	 * If it is the old version, dd_clones doesn't exist so we can't
1531	 * find the clones, but dsl_deadlist_remove_key() is a no-op so it
1532	 * doesn't matter.
1533	 */
1534	if (ds->ds_dir->dd_phys->dd_clones == 0)
1535		return;
1536
1537	for (zap_cursor_init(&zc, mos, ds->ds_dir->dd_phys->dd_clones);
1538	    zap_cursor_retrieve(&zc, &za) == 0;
1539	    zap_cursor_advance(&zc)) {
1540		dsl_dataset_t *clone;
1541
1542		VERIFY3U(0, ==, dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
1543		    za.za_first_integer, FTAG, &clone));
1544		if (clone->ds_dir->dd_origin_txg > mintxg) {
1545			dsl_deadlist_remove_key(&clone->ds_deadlist,
1546			    mintxg, tx);
1547			dsl_dataset_remove_clones_key(clone, mintxg, tx);
1548		}
1549		dsl_dataset_rele(clone, FTAG);
1550	}
1551	zap_cursor_fini(&zc);
1552}
1553
1554struct process_old_arg {
1555	dsl_dataset_t *ds;
1556	dsl_dataset_t *ds_prev;
1557	boolean_t after_branch_point;
1558	zio_t *pio;
1559	uint64_t used, comp, uncomp;
1560};
1561
1562static int
1563process_old_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
1564{
1565	struct process_old_arg *poa = arg;
1566	dsl_pool_t *dp = poa->ds->ds_dir->dd_pool;
1567
1568	if (bp->blk_birth <= poa->ds->ds_phys->ds_prev_snap_txg) {
1569		dsl_deadlist_insert(&poa->ds->ds_deadlist, bp, tx);
1570		if (poa->ds_prev && !poa->after_branch_point &&
1571		    bp->blk_birth >
1572		    poa->ds_prev->ds_phys->ds_prev_snap_txg) {
1573			poa->ds_prev->ds_phys->ds_unique_bytes +=
1574			    bp_get_dsize_sync(dp->dp_spa, bp);
1575		}
1576	} else {
1577		poa->used += bp_get_dsize_sync(dp->dp_spa, bp);
1578		poa->comp += BP_GET_PSIZE(bp);
1579		poa->uncomp += BP_GET_UCSIZE(bp);
1580		dsl_free_sync(poa->pio, dp, tx->tx_txg, bp);
1581	}
1582	return (0);
1583}
1584
1585static void
1586process_old_deadlist(dsl_dataset_t *ds, dsl_dataset_t *ds_prev,
1587    dsl_dataset_t *ds_next, boolean_t after_branch_point, dmu_tx_t *tx)
1588{
1589	struct process_old_arg poa = { 0 };
1590	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1591	objset_t *mos = dp->dp_meta_objset;
1592
1593	ASSERT(ds->ds_deadlist.dl_oldfmt);
1594	ASSERT(ds_next->ds_deadlist.dl_oldfmt);
1595
1596	poa.ds = ds;
1597	poa.ds_prev = ds_prev;
1598	poa.after_branch_point = after_branch_point;
1599	poa.pio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1600	VERIFY3U(0, ==, bpobj_iterate(&ds_next->ds_deadlist.dl_bpobj,
1601	    process_old_cb, &poa, tx));
1602	VERIFY3U(zio_wait(poa.pio), ==, 0);
1603	ASSERT3U(poa.used, ==, ds->ds_phys->ds_unique_bytes);
1604
1605	/* change snapused */
1606	dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1607	    -poa.used, -poa.comp, -poa.uncomp, tx);
1608
1609	/* swap next's deadlist to our deadlist */
1610	dsl_deadlist_close(&ds->ds_deadlist);
1611	dsl_deadlist_close(&ds_next->ds_deadlist);
1612	SWITCH64(ds_next->ds_phys->ds_deadlist_obj,
1613	    ds->ds_phys->ds_deadlist_obj);
1614	dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
1615	dsl_deadlist_open(&ds_next->ds_deadlist, mos,
1616	    ds_next->ds_phys->ds_deadlist_obj);
1617}
1618
1619void
1620dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
1621{
1622	struct dsl_ds_destroyarg *dsda = arg1;
1623	dsl_dataset_t *ds = dsda->ds;
1624	int err;
1625	int after_branch_point = FALSE;
1626	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1627	objset_t *mos = dp->dp_meta_objset;
1628	dsl_dataset_t *ds_prev = NULL;
1629	boolean_t wont_destroy;
1630	uint64_t obj;
1631
1632	wont_destroy = (dsda->defer &&
1633	    (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1));
1634
1635	ASSERT(ds->ds_owner || wont_destroy);
1636	ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
1637	ASSERT(ds->ds_prev == NULL ||
1638	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
1639	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
1640
1641	if (wont_destroy) {
1642		ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1643		dmu_buf_will_dirty(ds->ds_dbuf, tx);
1644		ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
1645		return;
1646	}
1647
1648	/* signal any waiters that this dataset is going away */
1649	mutex_enter(&ds->ds_lock);
1650	ds->ds_owner = dsl_reaper;
1651	cv_broadcast(&ds->ds_exclusive_cv);
1652	mutex_exit(&ds->ds_lock);
1653
1654	/* Remove our reservation */
1655	if (ds->ds_reserved != 0) {
1656		dsl_prop_setarg_t psa;
1657		uint64_t value = 0;
1658
1659		dsl_prop_setarg_init_uint64(&psa, "refreservation",
1660		    (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
1661		    &value);
1662		psa.psa_effective_value = 0;	/* predict default value */
1663
1664		dsl_dataset_set_reservation_sync(ds, &psa, tx);
1665		ASSERT3U(ds->ds_reserved, ==, 0);
1666	}
1667
1668	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1669
1670	dsl_scan_ds_destroyed(ds, tx);
1671
1672	obj = ds->ds_object;
1673
1674	if (ds->ds_phys->ds_prev_snap_obj != 0) {
1675		if (ds->ds_prev) {
1676			ds_prev = ds->ds_prev;
1677		} else {
1678			VERIFY(0 == dsl_dataset_hold_obj(dp,
1679			    ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
1680		}
1681		after_branch_point =
1682		    (ds_prev->ds_phys->ds_next_snap_obj != obj);
1683
1684		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1685		if (after_branch_point &&
1686		    ds_prev->ds_phys->ds_next_clones_obj != 0) {
1687			remove_from_next_clones(ds_prev, obj, tx);
1688			if (ds->ds_phys->ds_next_snap_obj != 0) {
1689				VERIFY(0 == zap_add_int(mos,
1690				    ds_prev->ds_phys->ds_next_clones_obj,
1691				    ds->ds_phys->ds_next_snap_obj, tx));
1692			}
1693		}
1694		if (after_branch_point &&
1695		    ds->ds_phys->ds_next_snap_obj == 0) {
1696			/* This clone is toast. */
1697			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1698			ds_prev->ds_phys->ds_num_children--;
1699
1700			/*
1701			 * If the clone's origin has no other clones, no
1702			 * user holds, and has been marked for deferred
1703			 * deletion, then we should have done the necessary
1704			 * destroy setup for it.
1705			 */
1706			if (ds_prev->ds_phys->ds_num_children == 1 &&
1707			    ds_prev->ds_userrefs == 0 &&
1708			    DS_IS_DEFER_DESTROY(ds_prev)) {
1709				ASSERT3P(dsda->rm_origin, !=, NULL);
1710			} else {
1711				ASSERT3P(dsda->rm_origin, ==, NULL);
1712			}
1713		} else if (!after_branch_point) {
1714			ds_prev->ds_phys->ds_next_snap_obj =
1715			    ds->ds_phys->ds_next_snap_obj;
1716		}
1717	}
1718
1719	if (dsl_dataset_is_snapshot(ds)) {
1720		dsl_dataset_t *ds_next;
1721		uint64_t old_unique;
1722		uint64_t used = 0, comp = 0, uncomp = 0;
1723
1724		VERIFY(0 == dsl_dataset_hold_obj(dp,
1725		    ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
1726		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1727
1728		old_unique = ds_next->ds_phys->ds_unique_bytes;
1729
1730		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1731		ds_next->ds_phys->ds_prev_snap_obj =
1732		    ds->ds_phys->ds_prev_snap_obj;
1733		ds_next->ds_phys->ds_prev_snap_txg =
1734		    ds->ds_phys->ds_prev_snap_txg;
1735		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1736		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1737
1738
1739		if (ds_next->ds_deadlist.dl_oldfmt) {
1740			process_old_deadlist(ds, ds_prev, ds_next,
1741			    after_branch_point, tx);
1742		} else {
1743			/* Adjust prev's unique space. */
1744			if (ds_prev && !after_branch_point) {
1745				dsl_deadlist_space_range(&ds_next->ds_deadlist,
1746				    ds_prev->ds_phys->ds_prev_snap_txg,
1747				    ds->ds_phys->ds_prev_snap_txg,
1748				    &used, &comp, &uncomp);
1749				ds_prev->ds_phys->ds_unique_bytes += used;
1750			}
1751
1752			/* Adjust snapused. */
1753			dsl_deadlist_space_range(&ds_next->ds_deadlist,
1754			    ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
1755			    &used, &comp, &uncomp);
1756			dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1757			    -used, -comp, -uncomp, tx);
1758
1759			/* Move blocks to be freed to pool's free list. */
1760			dsl_deadlist_move_bpobj(&ds_next->ds_deadlist,
1761			    &dp->dp_free_bpobj, ds->ds_phys->ds_prev_snap_txg,
1762			    tx);
1763			dsl_dir_diduse_space(tx->tx_pool->dp_free_dir,
1764			    DD_USED_HEAD, used, comp, uncomp, tx);
1765			dsl_dir_dirty(tx->tx_pool->dp_free_dir, tx);
1766
1767			/* Merge our deadlist into next's and free it. */
1768			dsl_deadlist_merge(&ds_next->ds_deadlist,
1769			    ds->ds_phys->ds_deadlist_obj, tx);
1770		}
1771		dsl_deadlist_close(&ds->ds_deadlist);
1772		dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1773
1774		/* Collapse range in clone heads */
1775		dsl_dataset_remove_clones_key(ds,
1776		    ds->ds_phys->ds_creation_txg, tx);
1777
1778		if (dsl_dataset_is_snapshot(ds_next)) {
1779			dsl_dataset_t *ds_nextnext;
1780
1781			/*
1782			 * Update next's unique to include blocks which
1783			 * were previously shared by only this snapshot
1784			 * and it.  Those blocks will be born after the
1785			 * prev snap and before this snap, and will have
1786			 * died after the next snap and before the one
1787	 * after that (i.e. be on the snap after next's
1788			 * deadlist).
1789			 */
1790			VERIFY(0 == dsl_dataset_hold_obj(dp,
1791			    ds_next->ds_phys->ds_next_snap_obj,
1792			    FTAG, &ds_nextnext));
1793			dsl_deadlist_space_range(&ds_nextnext->ds_deadlist,
1794			    ds->ds_phys->ds_prev_snap_txg,
1795			    ds->ds_phys->ds_creation_txg,
1796			    &used, &comp, &uncomp);
1797			ds_next->ds_phys->ds_unique_bytes += used;
1798			dsl_dataset_rele(ds_nextnext, FTAG);
1799			ASSERT3P(ds_next->ds_prev, ==, NULL);
1800
1801			/* Collapse range in this head. */
1802			dsl_dataset_t *hds;
1803			VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
1804			    ds->ds_dir->dd_phys->dd_head_dataset_obj,
1805			    FTAG, &hds));
1806			dsl_deadlist_remove_key(&hds->ds_deadlist,
1807			    ds->ds_phys->ds_creation_txg, tx);
1808			dsl_dataset_rele(hds, FTAG);
1809
1810		} else {
1811			ASSERT3P(ds_next->ds_prev, ==, ds);
1812			dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
1813			ds_next->ds_prev = NULL;
1814			if (ds_prev) {
1815				VERIFY(0 == dsl_dataset_get_ref(dp,
1816				    ds->ds_phys->ds_prev_snap_obj,
1817				    ds_next, &ds_next->ds_prev));
1818			}
1819
1820			dsl_dataset_recalc_head_uniq(ds_next);
1821
1822			/*
1823		 * Reduce the amount of our unconsumed refreservation
1824			 * being charged to our parent by the amount of
1825			 * new unique data we have gained.
1826			 */
1827			if (old_unique < ds_next->ds_reserved) {
1828				int64_t mrsdelta;
1829				uint64_t new_unique =
1830				    ds_next->ds_phys->ds_unique_bytes;
1831
1832				ASSERT(old_unique <= new_unique);
1833				mrsdelta = MIN(new_unique - old_unique,
1834				    ds_next->ds_reserved - old_unique);
1835				dsl_dir_diduse_space(ds->ds_dir,
1836				    DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
1837			}
1838		}
1839		dsl_dataset_rele(ds_next, FTAG);
1840	} else {
1841		/*
1842		 * There's no next snapshot, so this is a head dataset.
1843		 * Destroy the deadlist.  Unless it's a clone, the
1844		 * deadlist should be empty.  (If it's a clone, it's
1845		 * safe to ignore the deadlist contents.)
1846		 */
1847		struct killarg ka;
1848
1849		dsl_deadlist_close(&ds->ds_deadlist);
1850		dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1851		ds->ds_phys->ds_deadlist_obj = 0;
1852
1853		/*
1854		 * Free everything that we point to (that's born after
1855		 * the previous snapshot, if we are a clone)
1856		 *
1857		 * NB: this should be very quick, because we already
1858		 * freed all the objects in open context.
1859		 */
1860		ka.ds = ds;
1861		ka.tx = tx;
1862		err = traverse_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
1863		    TRAVERSE_POST, kill_blkptr, &ka);
1864		ASSERT3U(err, ==, 0);
1865		ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
1866		    ds->ds_phys->ds_unique_bytes == 0);
1867
1868		if (ds->ds_prev != NULL) {
1869			if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
1870				VERIFY3U(0, ==, zap_remove_int(mos,
1871				    ds->ds_prev->ds_dir->dd_phys->dd_clones,
1872				    ds->ds_object, tx));
1873			}
1874			dsl_dataset_rele(ds->ds_prev, ds);
1875			ds->ds_prev = ds_prev = NULL;
1876		}
1877	}
1878
1879	/*
1880	 * This must be done after the traverse_dataset() above, because
1881	 * the traversal will re-open the objset.
1882	 */
1883	if (ds->ds_objset) {
1884		dmu_objset_evict(ds->ds_objset);
1885		ds->ds_objset = NULL;
1886	}
1887
1888	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1889		/* Erase the link in the dir */
1890		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
1891		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
1892		ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
1893		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1894		ASSERT(err == 0);
1895	} else {
1896		/* remove from snapshot namespace */
1897		dsl_dataset_t *ds_head;
1898		ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
1899		VERIFY(0 == dsl_dataset_hold_obj(dp,
1900		    ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
1901		VERIFY(0 == dsl_dataset_get_snapname(ds));
1902#ifdef ZFS_DEBUG
1903		{
1904			uint64_t val;
1905
1906			err = dsl_dataset_snap_lookup(ds_head,
1907			    ds->ds_snapname, &val);
1908			ASSERT3U(err, ==, 0);
1909			ASSERT3U(val, ==, obj);
1910		}
1911#endif
1912		err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
1913		ASSERT(err == 0);
1914		dsl_dataset_rele(ds_head, FTAG);
1915	}
1916
1917	if (ds_prev && ds->ds_prev != ds_prev)
1918		dsl_dataset_rele(ds_prev, FTAG);
1919
1920	spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
1921	spa_history_log_internal(LOG_DS_DESTROY, dp->dp_spa, tx,
1922	    "dataset = %llu", ds->ds_object);
1923
1924	if (ds->ds_phys->ds_next_clones_obj != 0) {
1925		uint64_t count;
1926		ASSERT(0 == zap_count(mos,
1927		    ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
1928		VERIFY(0 == dmu_object_free(mos,
1929		    ds->ds_phys->ds_next_clones_obj, tx));
1930	}
1931	if (ds->ds_phys->ds_props_obj != 0)
1932		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
1933	if (ds->ds_phys->ds_userrefs_obj != 0)
1934		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
1935	dsl_dir_close(ds->ds_dir, ds);
1936	ds->ds_dir = NULL;
1937	dsl_dataset_drain_refs(ds, tag);
1938	VERIFY(0 == dmu_object_free(mos, obj, tx));
1939
1940	if (dsda->rm_origin) {
1941		/*
1942		 * Remove the origin of the clone we just destroyed.
1943		 */
1944		struct dsl_ds_destroyarg ndsda = {0};
1945
1946		ndsda.ds = dsda->rm_origin;
1947		dsl_dataset_destroy_sync(&ndsda, tag, tx);
1948	}
1949}
1950
1951static int
1952dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
1953{
1954	uint64_t asize;
1955
1956	if (!dmu_tx_is_syncing(tx))
1957		return (0);
1958
1959	/*
1960	 * If there's an fs-only reservation, any blocks that might become
1961	 * owned by the snapshot dataset must be accommodated by space
1962	 * outside of the reservation.
1963	 */
1964	ASSERT(ds->ds_reserved == 0 || DS_UNIQUE_IS_ACCURATE(ds));
1965	asize = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
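	/*
	 * For example, with refreservation=10G and 4G of unique data,
	 * up to 4G could become owned by the snapshot, and that much
	 * must be available outside the reservation.
	 */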
1966	if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
1967		return (ENOSPC);
1968
1969	/*
1970	 * Propagate any reserved space for this snapshot to other
1971	 * snapshot checks in this sync group.
1972	 */
1973	if (asize > 0)
1974		dsl_dir_willuse_space(ds->ds_dir, asize, tx);
1975
1976	return (0);
1977}
1978
1979int
1980dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
1981{
1982	dsl_dataset_t *ds = arg1;
1983	const char *snapname = arg2;
1984	int err;
1985	uint64_t value;
1986
1987	/*
1988	 * We don't allow multiple snapshots of the same txg.  If there
1989	 * is already one, try again.
1990	 */
1991	if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
1992		return (EAGAIN);
1993
1994	/*
1995	 * Check for a conflicting snapshot name.
1996	 */
1997	err = dsl_dataset_snap_lookup(ds, snapname, &value);
1998	if (err == 0)
1999		return (EEXIST);
2000	if (err != ENOENT)
2001		return (err);
2002
2003	/*
2004	 * Check that the full snapshot name is not too long: the dataset
2005	 * name's length + 1 for the @-sign + the snapshot name's length.
2006	 */
2007	if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
2008		return (ENAMETOOLONG);
2009
2010	err = dsl_dataset_snapshot_reserve_space(ds, tx);
2011	if (err)
2012		return (err);
2013
2014	ds->ds_trysnap_txg = tx->tx_txg;
2015	return (0);
2016}
2017
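/*
 * Sync task to create the snapshot "snapname" of ds: allocate the new
 * dataset object, hand the head's current deadlist over to it, reset
 * the head's unique bytes, and enter the snapshot into the head's
 * snapnames zap.
 */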
2018void
2019dsl_dataset_snapshot_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2020{
2021	dsl_dataset_t *ds = arg1;
2022	const char *snapname = arg2;
2023	dsl_pool_t *dp = ds->ds_dir->dd_pool;
2024	dmu_buf_t *dbuf;
2025	dsl_dataset_phys_t *dsphys;
2026	uint64_t dsobj, crtxg;
2027	objset_t *mos = dp->dp_meta_objset;
2028	int err;
2029
2030	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
2031
2032	/*
2033	 * The origin's ds_creation_txg has to be < TXG_INITIAL
2034	 */
2035	if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
2036		crtxg = 1;
2037	else
2038		crtxg = tx->tx_txg;
2039
2040	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
2041	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
2042	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
2043	dmu_buf_will_dirty(dbuf, tx);
2044	dsphys = dbuf->db_data;
2045	bzero(dsphys, sizeof (dsl_dataset_phys_t));
2046	dsphys->ds_dir_obj = ds->ds_dir->dd_object;
2047	dsphys->ds_fsid_guid = unique_create();
2048	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
2049	    sizeof (dsphys->ds_guid));
2050	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
2051	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
2052	dsphys->ds_next_snap_obj = ds->ds_object;
2053	dsphys->ds_num_children = 1;
2054	dsphys->ds_creation_time = gethrestime_sec();
2055	dsphys->ds_creation_txg = crtxg;
2056	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
2057	dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
2058	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
2059	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
2060	dsphys->ds_flags = ds->ds_phys->ds_flags;
2061	dsphys->ds_bp = ds->ds_phys->ds_bp;
2062	dmu_buf_rele(dbuf, FTAG);
2063
2064	ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
2065	if (ds->ds_prev) {
2066		uint64_t next_clones_obj =
2067		    ds->ds_prev->ds_phys->ds_next_clones_obj;
2068		ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
2069		    ds->ds_object ||
2070		    ds->ds_prev->ds_phys->ds_num_children > 1);
2071		if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
2072			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2073			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
2074			    ds->ds_prev->ds_phys->ds_creation_txg);
2075			ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
2076		} else if (next_clones_obj != 0) {
2077			remove_from_next_clones(ds->ds_prev,
2078			    dsphys->ds_next_snap_obj, tx);
2079			VERIFY3U(0, ==, zap_add_int(mos,
2080			    next_clones_obj, dsobj, tx));
2081		}
2082	}
2083
2084	/*
2085	 * If we have a reference-reservation on this dataset, we will
2086	 * need to increase the amount of refreservation being charged
2087	 * since our unique space is going to zero.
2088	 */
2089	if (ds->ds_reserved) {
2090		int64_t delta;
2091		ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
2092		delta = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2093		dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
2094		    delta, 0, 0, tx);
2095	}
2096
2097	dmu_buf_will_dirty(ds->ds_dbuf, tx);
2098	zfs_dbgmsg("taking snapshot %s@%s/%llu; newkey=%llu",
2099	    ds->ds_dir->dd_myname, snapname, dsobj,
2100	    ds->ds_phys->ds_prev_snap_txg);
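	/*
	 * The new snapshot takes over the head's current deadlist
	 * object (assigned to dsphys->ds_deadlist_obj above); the head
	 * continues with a clone of it, keyed by its remaining
	 * snapshots.
	 */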
2101	ds->ds_phys->ds_deadlist_obj = dsl_deadlist_clone(&ds->ds_deadlist,
2102	    UINT64_MAX, ds->ds_phys->ds_prev_snap_obj, tx);
2103	dsl_deadlist_close(&ds->ds_deadlist);
2104	dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
2105	dsl_deadlist_add_key(&ds->ds_deadlist,
2106	    ds->ds_phys->ds_prev_snap_txg, tx);
2107
2108	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
2109	ds->ds_phys->ds_prev_snap_obj = dsobj;
2110	ds->ds_phys->ds_prev_snap_txg = crtxg;
2111	ds->ds_phys->ds_unique_bytes = 0;
2112	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
2113		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
2114
2115	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
2116	    snapname, 8, 1, &dsobj, tx);
2117	ASSERT(err == 0);
2118
2119	if (ds->ds_prev)
2120		dsl_dataset_drop_ref(ds->ds_prev, ds);
2121	VERIFY(0 == dsl_dataset_get_ref(dp,
2122	    ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
2123
2124	dsl_scan_ds_snapshotted(ds, tx);
2125
2126	dsl_dir_snap_cmtime_update(ds->ds_dir);
2127
2128	spa_history_log_internal(LOG_DS_SNAPSHOT, dp->dp_spa, tx,
2129	    "dataset = %llu", dsobj);
2130}
2131
2132void
2133dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
2134{
2135	ASSERT(dmu_tx_is_syncing(tx));
2136	ASSERT(ds->ds_objset != NULL);
2137	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
2138
2139	/*
2140	 * in case we had to change ds_fsid_guid when we opened it,
2141	 * sync it out now.
2142	 */
2143	dmu_buf_will_dirty(ds->ds_dbuf, tx);
2144	ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
2145
2146	dsl_dir_dirty(ds->ds_dir, tx);
2147	dmu_objset_sync(ds->ds_objset, zio, tx);
2148}
2149
2150void
2151dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
2152{
2153	uint64_t refd, avail, uobjs, aobjs;
2154
2155	dsl_dir_stats(ds->ds_dir, nv);
2156
2157	dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
2158	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
2159	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
2160
2161	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
2162	    ds->ds_phys->ds_creation_time);
2163	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
2164	    ds->ds_phys->ds_creation_txg);
2165	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
2166	    ds->ds_quota);
2167	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
2168	    ds->ds_reserved);
2169	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
2170	    ds->ds_phys->ds_guid);
2171	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
2172	    ds->ds_phys->ds_unique_bytes);
2173	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
2174	    ds->ds_object);
2175	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
2176	    ds->ds_userrefs);
2177	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
2178	    DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
2179
2180	if (ds->ds_phys->ds_next_snap_obj) {
2181		/*
2182		 * This is a snapshot; override the dd's space used with
2183		 * our unique space and compression ratio.
2184		 */
2185		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
2186		    ds->ds_phys->ds_unique_bytes);
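		/*
		 * The ratio is scaled by 100, so a stored value of 250
		 * means a 2.50x compression ratio.
		 */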
2187		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO,
2188		    ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
2189		    (ds->ds_phys->ds_uncompressed_bytes * 100 /
2190		    ds->ds_phys->ds_compressed_bytes));
2191	}
2192}
2193
2194void
2195dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
2196{
2197	stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
2198	stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
2199	stat->dds_guid = ds->ds_phys->ds_guid;
2200	if (ds->ds_phys->ds_next_snap_obj) {
2201		stat->dds_is_snapshot = B_TRUE;
2202		stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
2203	} else {
2204		stat->dds_is_snapshot = B_FALSE;
2205		stat->dds_num_clones = 0;
2206	}
2207
2208	/* clone origin is really a dsl_dir thing... */
2209	rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2210	if (dsl_dir_is_clone(ds->ds_dir)) {
2211		dsl_dataset_t *ods;
2212
2213		VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
2214		    ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
2215		dsl_dataset_name(ods, stat->dds_origin);
2216		dsl_dataset_drop_ref(ods, FTAG);
2217	} else {
2218		stat->dds_origin[0] = '\0';
2219	}
2220	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2221}
2222
2223uint64_t
2224dsl_dataset_fsid_guid(dsl_dataset_t *ds)
2225{
2226	return (ds->ds_fsid_guid);
2227}
2228
2229void
2230dsl_dataset_space(dsl_dataset_t *ds,
2231    uint64_t *refdbytesp, uint64_t *availbytesp,
2232    uint64_t *usedobjsp, uint64_t *availobjsp)
2233{
2234	*refdbytesp = ds->ds_phys->ds_used_bytes;
2235	*availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2236	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2237		*availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2238	if (ds->ds_quota != 0) {
2239		/*
2240		 * Adjust available bytes according to refquota
2241		 */
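		/*
		 * E.g. with refquota=10G and 8G referenced, at most
		 * 2G remains available to this dataset.
		 */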
2242		if (*refdbytesp < ds->ds_quota)
2243			*availbytesp = MIN(*availbytesp,
2244			    ds->ds_quota - *refdbytesp);
2245		else
2246			*availbytesp = 0;
2247	}
2248	*usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2249	*availobjsp = DN_MAX_OBJECT - *usedobjsp;
2250}
2251
2252boolean_t
2253dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
2254{
2255	dsl_pool_t *dp = ds->ds_dir->dd_pool;
2256
2257	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
2258	    dsl_pool_sync_context(dp));
2259	if (ds->ds_prev == NULL)
2260		return (B_FALSE);
2261	if (ds->ds_phys->ds_bp.blk_birth >
2262	    ds->ds_prev->ds_phys->ds_creation_txg) {
2263		objset_t *os, *os_prev;
2264		/*
2265		 * It may be that only the ZIL differs, because it was
2266		 * reset in the head.  Don't count that as being
2267		 * modified.
2268		 */
2269		if (dmu_objset_from_ds(ds, &os) != 0)
2270			return (B_TRUE);
2271		if (dmu_objset_from_ds(ds->ds_prev, &os_prev) != 0)
2272			return (B_TRUE);
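		/*
		 * Compare only the meta-dnodes; os_zil_header is
		 * deliberately excluded, so a ZIL-only change does not
		 * count as a modification.
		 */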
2273		return (bcmp(&os->os_phys->os_meta_dnode,
2274		    &os_prev->os_phys->os_meta_dnode,
2275		    sizeof (os->os_phys->os_meta_dnode)) != 0);
2276	}
2277	return (B_FALSE);
2278}
2279
2280/* ARGSUSED */
2281static int
2282dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2283{
2284	dsl_dataset_t *ds = arg1;
2285	char *newsnapname = arg2;
2286	dsl_dir_t *dd = ds->ds_dir;
2287	dsl_dataset_t *hds;
2288	uint64_t val;
2289	int err;
2290
2291	err = dsl_dataset_hold_obj(dd->dd_pool,
2292	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2293	if (err)
2294		return (err);
2295
2296	/* new name better not be in use */
2297	err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2298	dsl_dataset_rele(hds, FTAG);
2299
2300	if (err == 0)
2301		err = EEXIST;
2302	else if (err == ENOENT)
2303		err = 0;
2304
2305	/* dataset name + 1 for the "@" + the new snapshot name must fit */
2306	if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2307		err = ENAMETOOLONG;
2308
2309	return (err);
2310}
2311
2312static void
2313dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2314{
2315	dsl_dataset_t *ds = arg1;
2316	const char *newsnapname = arg2;
2317	dsl_dir_t *dd = ds->ds_dir;
2318	objset_t *mos = dd->dd_pool->dp_meta_objset;
2319	dsl_dataset_t *hds;
2320	int err;
2321
2322	ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
2323
2324	VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2325	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
2326
2327	VERIFY(0 == dsl_dataset_get_snapname(ds));
2328	err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
2329	ASSERT3U(err, ==, 0);
2330	mutex_enter(&ds->ds_lock);
2331	(void) strcpy(ds->ds_snapname, newsnapname);
2332	mutex_exit(&ds->ds_lock);
2333	err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
2334	    ds->ds_snapname, 8, 1, &ds->ds_object, tx);
2335	ASSERT3U(err, ==, 0);
2336
2337	spa_history_log_internal(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
2338	    "dataset = %llu", ds->ds_object);
2339	dsl_dataset_rele(hds, FTAG);
2340}
2341
2342struct renamesnaparg {
2343	dsl_sync_task_group_t *dstg;
2344	char failed[MAXPATHLEN];
2345	char *oldsnap;
2346	char *newsnap;
2347};
2348
2349static int
2350dsl_snapshot_rename_one(const char *name, void *arg)
2351{
2352	struct renamesnaparg *ra = arg;
2353	dsl_dataset_t *ds = NULL;
2354	char *snapname;
2355	int err;
2356
2357	snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
2358	(void) strlcpy(ra->failed, snapname, sizeof (ra->failed));
2359
2360	/*
2361	 * For recursive snapshot renames the parent won't be changing
2362	 * so we just pass name for both the to/from arguments.
2363	 */
2364	err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
2365	if (err != 0) {
2366		strfree(snapname);
2367		return (err == ENOENT ? 0 : err);
2368	}
2369
2370#ifdef _KERNEL
2371	/*
2372	 * For each filesystem undergoing rename, unmount its snapshot first.
2373	 */
2374	(void) zfs_unmount_snap(snapname, NULL);
2375#endif
2376	err = dsl_dataset_hold(snapname, ra->dstg, &ds);
2377	strfree(snapname);
2378	if (err != 0)
2379		return (err == ENOENT ? 0 : err);
2380
2381	dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
2382	    dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
2383
2384	return (0);
2385}
2386
2387static int
2388dsl_recursive_rename(char *oldname, const char *newname)
2389{
2390	int err;
2391	struct renamesnaparg *ra;
2392	dsl_sync_task_t *dst;
2393	spa_t *spa;
2394	char *cp, *fsname = spa_strdup(oldname);
2395	int len = strlen(oldname) + 1;
2396
2397	/* truncate the snapshot name to get the fsname */
2398	cp = strchr(fsname, '@');
2399	*cp = '\0';
2400
2401	err = spa_open(fsname, &spa, FTAG);
2402	if (err) {
2403		kmem_free(fsname, len);
2404		return (err);
2405	}
2406	ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
2407	ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
2408
2409	ra->oldsnap = strchr(oldname, '@') + 1;
2410	ra->newsnap = strchr(newname, '@') + 1;
2411	*ra->failed = '\0';
2412
2413	err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
2414	    DS_FIND_CHILDREN);
2415	kmem_free(fsname, len);
2416
2417	if (err == 0) {
2418		err = dsl_sync_task_group_wait(ra->dstg);
2419	}
2420
2421	for (dst = list_head(&ra->dstg->dstg_tasks); dst;
2422	    dst = list_next(&ra->dstg->dstg_tasks, dst)) {
2423		dsl_dataset_t *ds = dst->dst_arg1;
2424		if (dst->dst_err) {
2425			dsl_dir_name(ds->ds_dir, ra->failed);
2426			(void) strlcat(ra->failed, "@", sizeof (ra->failed));
2427			(void) strlcat(ra->failed, ra->newsnap,
2428			    sizeof (ra->failed));
2429		}
2430		dsl_dataset_rele(ds, ra->dstg);
2431	}
2432
2433	if (err)
2434		(void) strlcpy(oldname, ra->failed, sizeof (ra->failed));
2435
2436	dsl_sync_task_group_destroy(ra->dstg);
2437	kmem_free(ra, sizeof (struct renamesnaparg));
2438	spa_close(spa, FTAG);
2439	return (err);
2440}
2441
2442static int
2443dsl_valid_rename(const char *oldname, void *arg)
2444{
2445	int delta = *(int *)arg;
2446
2447	if (strlen(oldname) + delta >= MAXNAMELEN)
2448		return (ENAMETOOLONG);
2449
2450	return (0);
2451}
2452
2453#pragma weak dmu_objset_rename = dsl_dataset_rename
2454int
2455dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
2456{
2457	dsl_dir_t *dd;
2458	dsl_dataset_t *ds;
2459	const char *tail;
2460	int err;
2461
2462	err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2463	if (err)
2464		return (err);
2465
2466	if (tail == NULL) {
2467		int delta = strlen(newname) - strlen(oldname);
2468
2469		/* if we're growing, validate child name lengths */
2470		if (delta > 0)
2471			err = dmu_objset_find(oldname, dsl_valid_rename,
2472			    &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2473
2474		if (err == 0)
2475			err = dsl_dir_rename(dd, newname);
2476		dsl_dir_close(dd, FTAG);
2477		return (err);
2478	}
2479
2480	if (tail[0] != '@') {
2481		/* the name ended in a nonexistent component */
2482		dsl_dir_close(dd, FTAG);
2483		return (ENOENT);
2484	}
2485
2486	dsl_dir_close(dd, FTAG);
2487
2488	/* new name must be snapshot in same filesystem */
2489	tail = strchr(newname, '@');
2490	if (tail == NULL)
2491		return (EINVAL);
2492	tail++;
2493	if (strncmp(oldname, newname, tail - newname) != 0)
2494		return (EXDEV);
2495
2496	if (recursive) {
2497		err = dsl_recursive_rename(oldname, newname);
2498	} else {
2499		err = dsl_dataset_hold(oldname, FTAG, &ds);
2500		if (err)
2501			return (err);
2502
2503		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2504		    dsl_dataset_snapshot_rename_check,
2505		    dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2506
2507		dsl_dataset_rele(ds, FTAG);
2508	}
2509
2510	return (err);
2511}
2512
2513struct promotenode {
2514	list_node_t link;
2515	dsl_dataset_t *ds;
2516};
2517
2518struct promotearg {
2519	list_t shared_snaps, origin_snaps, clone_snaps;
2520	dsl_dataset_t *origin_origin;
2521	uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
2522	char *err_ds;
2523};
2524
2525static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
2526static boolean_t snaplist_unstable(list_t *l);
2527
2528static int
2529dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
2530{
2531	dsl_dataset_t *hds = arg1;
2532	struct promotearg *pa = arg2;
2533	struct promotenode *snap = list_head(&pa->shared_snaps);
2534	dsl_dataset_t *origin_ds = snap->ds;
2535	int err;
2536	uint64_t unused;
2537
2538	/* Check that it is a real clone */
2539	if (!dsl_dir_is_clone(hds->ds_dir))
2540		return (EINVAL);
2541
2542	/* Since this is so expensive, don't do the preliminary check */
2543	if (!dmu_tx_is_syncing(tx))
2544		return (0);
2545
2546	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
2547		return (EXDEV);
2548
2549	/* compute origin's new unique space */
2550	snap = list_tail(&pa->clone_snaps);
2551	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2552	dsl_deadlist_space_range(&snap->ds->ds_deadlist,
2553	    origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
2554	    &pa->unique, &unused, &unused);
2555
2556	/*
2557	 * Walk the snapshots that we are moving
2558	 *
2559	 * Compute space to transfer.  Consider the incremental changes
2560	 * to used for each snapshot:
2561	 * (my used) = (prev's used) + (blocks born) - (blocks killed)
2562	 * So each snapshot gave birth to:
2563	 * (blocks born) = (my used) - (prev's used) + (blocks killed)
2564	 * So a sequence would look like:
2565	 * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
2566	 * Which simplifies to:
2567	 * uN + kN + k(N-1) + ... + k1 + k0
2568	 * Note however, if we stop before we reach the ORIGIN we get:
2569	 * uN + kN + k(N-1) + ... + kM - u(M-1)
2570	 */
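	/*
	 * For illustration: with three snapshots whose used bytes are
	 * u = {5, 7, 6} and whose deadlists hold k = {1, 2, 3}
	 * (oldest first), the space transferred is
	 * 6 + 3 + 2 + 1 = 12, i.e. (6-7+3) + (7-5+2) + (5-0+1).
	 */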
2571	pa->used = origin_ds->ds_phys->ds_used_bytes;
2572	pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
2573	pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
2574	for (snap = list_head(&pa->shared_snaps); snap;
2575	    snap = list_next(&pa->shared_snaps, snap)) {
2576		uint64_t val, dlused, dlcomp, dluncomp;
2577		dsl_dataset_t *ds = snap->ds;
2578
2579		/* Check that the snapshot name does not conflict */
2580		VERIFY(0 == dsl_dataset_get_snapname(ds));
2581		err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
2582		if (err == 0) {
2583			err = EEXIST;
2584			goto out;
2585		}
2586		if (err != ENOENT)
2587			goto out;
2588
2589		/* The very first snapshot does not have a deadlist */
2590		if (ds->ds_phys->ds_prev_snap_obj == 0)
2591			continue;
2592
2593		dsl_deadlist_space(&ds->ds_deadlist,
2594		    &dlused, &dlcomp, &dluncomp);
2595		pa->used += dlused;
2596		pa->comp += dlcomp;
2597		pa->uncomp += dluncomp;
2598	}
2599
2600	/*
2601	 * If we are a clone of a clone then we never reached ORIGIN,
2602	 * so we need to subtract out the clone origin's used space.
2603	 */
2604	if (pa->origin_origin) {
2605		pa->used -= pa->origin_origin->ds_phys->ds_used_bytes;
2606		pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
2607		pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
2608	}
2609
2610	/* Check that there is enough space here */
2611	err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
2612	    pa->used);
2613	if (err)
2614		return (err);
2615
2616	/*
2617	 * Compute the amounts of space that will be used by snapshots
2618	 * after the promotion (for both origin and clone).  For each,
2619	 * it is the amount of space that will be on all of their
2620	 * deadlists (that was not born before their new origin).
2621	 */
2622	if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2623		uint64_t space;
2624
2625		/*
2626		 * Note, typically this will not be a clone of a clone,
2627		 * so dd_origin_txg will be < TXG_INITIAL, so
2628		 * these snaplist_space() -> dsl_deadlist_space_range()
2629		 * calls will be fast because they do not have to
2630		 * iterate over all bps.
2631		 */
2632		snap = list_head(&pa->origin_snaps);
2633		err = snaplist_space(&pa->shared_snaps,
2634		    snap->ds->ds_dir->dd_origin_txg, &pa->cloneusedsnap);
2635		if (err)
2636			return (err);
2637
2638		err = snaplist_space(&pa->clone_snaps,
2639		    snap->ds->ds_dir->dd_origin_txg, &space);
2640		if (err)
2641			return (err);
2642		pa->cloneusedsnap += space;
2643	}
2644	if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2645		err = snaplist_space(&pa->origin_snaps,
2646		    origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
2647		if (err)
2648			return (err);
2649	}
2650
2651	return (0);
2652out:
2653	pa->err_ds = snap->ds->ds_snapname;
2654	return (err);
2655}
2656
2657static void
2658dsl_dataset_promote_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2659{
2660	dsl_dataset_t *hds = arg1;
2661	struct promotearg *pa = arg2;
2662	struct promotenode *snap = list_head(&pa->shared_snaps);
2663	dsl_dataset_t *origin_ds = snap->ds;
2664	dsl_dataset_t *origin_head;
2665	dsl_dir_t *dd = hds->ds_dir;
2666	dsl_pool_t *dp = hds->ds_dir->dd_pool;
2667	dsl_dir_t *odd = NULL;
2668	uint64_t oldnext_obj;
2669	int64_t delta;
2670
2671	ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
2672
2673	snap = list_head(&pa->origin_snaps);
2674	origin_head = snap->ds;
2675
2676	/*
2677	 * We need to explicitly open odd, since origin_ds's dd will be
2678	 * changing.
2679	 */
2680	VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
2681	    NULL, FTAG, &odd));
2682
2683	/* change origin's next snap */
2684	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
2685	oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
2686	snap = list_tail(&pa->clone_snaps);
2687	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2688	origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;
2689
2690	/* change the origin's next clone */
2691	if (origin_ds->ds_phys->ds_next_clones_obj) {
2692		remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
2693		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2694		    origin_ds->ds_phys->ds_next_clones_obj,
2695		    oldnext_obj, tx));
2696	}
2697
2698	/* change origin */
2699	dmu_buf_will_dirty(dd->dd_dbuf, tx);
2700	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
2701	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
2702	dd->dd_origin_txg = origin_head->ds_dir->dd_origin_txg;
2703	dmu_buf_will_dirty(odd->dd_dbuf, tx);
2704	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
2705	origin_head->ds_dir->dd_origin_txg =
2706	    origin_ds->ds_phys->ds_creation_txg;
2707
2708	/* change dd_clone entries */
2709	if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
2710		VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2711		    odd->dd_phys->dd_clones, hds->ds_object, tx));
2712		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2713		    pa->origin_origin->ds_dir->dd_phys->dd_clones,
2714		    hds->ds_object, tx));
2715
2716		VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2717		    pa->origin_origin->ds_dir->dd_phys->dd_clones,
2718		    origin_head->ds_object, tx));
2719		if (dd->dd_phys->dd_clones == 0) {
2720			dd->dd_phys->dd_clones = zap_create(dp->dp_meta_objset,
2721			    DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
2722		}
2723		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2724		    dd->dd_phys->dd_clones, origin_head->ds_object, tx));
2725
2726	}
2727
2728	/* move snapshots to this dir */
2729	for (snap = list_head(&pa->shared_snaps); snap;
2730	    snap = list_next(&pa->shared_snaps, snap)) {
2731		dsl_dataset_t *ds = snap->ds;
2732
2733		/* unregister props as dsl_dir is changing */
2734		if (ds->ds_objset) {
2735			dmu_objset_evict(ds->ds_objset);
2736			ds->ds_objset = NULL;
2737		}
2738		/* move snap name entry */
2739		VERIFY(0 == dsl_dataset_get_snapname(ds));
2740		VERIFY(0 == dsl_dataset_snap_remove(origin_head,
2741		    ds->ds_snapname, tx));
2742		VERIFY(0 == zap_add(dp->dp_meta_objset,
2743		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
2744		    8, 1, &ds->ds_object, tx));
2745
2746		/* change containing dsl_dir */
2747		dmu_buf_will_dirty(ds->ds_dbuf, tx);
2748		ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
2749		ds->ds_phys->ds_dir_obj = dd->dd_object;
2750		ASSERT3P(ds->ds_dir, ==, odd);
2751		dsl_dir_close(ds->ds_dir, ds);
2752		VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
2753		    NULL, ds, &ds->ds_dir));
2754
2755		/* move any clone references */
2756		if (ds->ds_phys->ds_next_clones_obj &&
2757		    spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
2758			zap_cursor_t zc;
2759			zap_attribute_t za;
2760
2761			for (zap_cursor_init(&zc, dp->dp_meta_objset,
2762			    ds->ds_phys->ds_next_clones_obj);
2763			    zap_cursor_retrieve(&zc, &za) == 0;
2764			    zap_cursor_advance(&zc)) {
2765				dsl_dataset_t *cnds;
2766				uint64_t o;
2767
2768				if (za.za_first_integer == oldnext_obj) {
2769					/*
2770					 * We've already moved the
2771					 * origin's reference.
2772					 */
2773					continue;
2774				}
2775
2776				VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
2777				    za.za_first_integer, FTAG, &cnds));
2778				o = cnds->ds_dir->dd_phys->dd_head_dataset_obj;
2779
2780				VERIFY3U(zap_remove_int(dp->dp_meta_objset,
2781				    odd->dd_phys->dd_clones, o, tx), ==, 0);
2782				VERIFY3U(zap_add_int(dp->dp_meta_objset,
2783				    dd->dd_phys->dd_clones, o, tx), ==, 0);
2784				dsl_dataset_rele(cnds, FTAG);
2785			}
2786			zap_cursor_fini(&zc);
2787		}
2788
2789		ASSERT3U(dsl_prop_numcb(ds), ==, 0);
2790	}
2791
2792	/*
2793	 * Change space accounting.
2794	 * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
2795	 * both be valid, or both be 0 (resulting in delta == 0).  This
2796	 * is true for each of {clone,origin} independently.
2797	 */
2798
2799	delta = pa->cloneusedsnap -
2800	    dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2801	ASSERT3S(delta, >=, 0);
2802	ASSERT3U(pa->used, >=, delta);
2803	dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
2804	dsl_dir_diduse_space(dd, DD_USED_HEAD,
2805	    pa->used - delta, pa->comp, pa->uncomp, tx);
2806
2807	delta = pa->originusedsnap -
2808	    odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2809	ASSERT3S(delta, <=, 0);
2810	ASSERT3U(pa->used, >=, -delta);
2811	dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
2812	dsl_dir_diduse_space(odd, DD_USED_HEAD,
2813	    -pa->used - delta, -pa->comp, -pa->uncomp, tx);
2814
2815	origin_ds->ds_phys->ds_unique_bytes = pa->unique;
2816
2817	/* log history record */
2818	spa_history_log_internal(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
2819	    "dataset = %llu", hds->ds_object);
2820
2821	dsl_dir_close(odd, FTAG);
2822}
2823
2824static char *snaplist_tag = "snaplist";
2825/*
2826 * Make a list of dsl_dataset_t's for the snapshots between first_obj
2827 * (exclusive) and last_obj (inclusive).  The list will be in reverse
2828 * order (last_obj will be the list_head()).  If first_obj == 0, do all
2829 * snapshots back to this dataset's origin.
2830 */
2831static int
2832snaplist_make(dsl_pool_t *dp, boolean_t own,
2833    uint64_t first_obj, uint64_t last_obj, list_t *l)
2834{
2835	uint64_t obj = last_obj;
2836
2837	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
2838
2839	list_create(l, sizeof (struct promotenode),
2840	    offsetof(struct promotenode, link));
2841
2842	while (obj != first_obj) {
2843		dsl_dataset_t *ds;
2844		struct promotenode *snap;
2845		int err;
2846
2847		if (own) {
2848			err = dsl_dataset_own_obj(dp, obj,
2849			    0, snaplist_tag, &ds);
2850			if (err == 0)
2851				dsl_dataset_make_exclusive(ds, snaplist_tag);
2852		} else {
2853			err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
2854		}
2855		if (err == ENOENT) {
2856			/* lost race with snapshot destroy */
2857			struct promotenode *last = list_tail(l);
2858			ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
2859			obj = last->ds->ds_phys->ds_prev_snap_obj;
2860			continue;
2861		} else if (err) {
2862			return (err);
2863		}
2864
2865		if (first_obj == 0)
2866			first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
2867
2868		snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
2869		snap->ds = ds;
2870		list_insert_tail(l, snap);
2871		obj = ds->ds_phys->ds_prev_snap_obj;
2872	}
2873
2874	return (0);
2875}
2876
2877static int
2878snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
2879{
2880	struct promotenode *snap;
2881
2882	*spacep = 0;
2883	for (snap = list_head(l); snap; snap = list_next(l, snap)) {
2884		uint64_t used, comp, uncomp;
2885		dsl_deadlist_space_range(&snap->ds->ds_deadlist,
2886		    mintxg, UINT64_MAX, &used, &comp, &uncomp);
2887		*spacep += used;
2888	}
2889	return (0);
2890}
2891
2892static void
2893snaplist_destroy(list_t *l, boolean_t own)
2894{
2895	struct promotenode *snap;
2896
2897	if (!l || !list_link_active(&l->list_head))
2898		return;
2899
2900	while ((snap = list_tail(l)) != NULL) {
2901		list_remove(l, snap);
2902		if (own)
2903			dsl_dataset_disown(snap->ds, snaplist_tag);
2904		else
2905			dsl_dataset_rele(snap->ds, snaplist_tag);
2906		kmem_free(snap, sizeof (struct promotenode));
2907	}
2908	list_destroy(l);
2909}
2910
2911/*
2912 * Promote a clone.  Nomenclature note:
2913 * "clone" or "cds": the original clone which is being promoted
2914 * "origin" or "ods": the snapshot which is originally clone's origin
2915 * "origin head" or "ohds": the dataset which is the head
2916 * (filesystem/volume) for the origin
2917 * "origin origin": the origin of the origin's filesystem (typically
2918 * NULL, indicating that the clone is not a clone of a clone).
2919 */
2920int
2921dsl_dataset_promote(const char *name, char *conflsnap)
2922{
2923	dsl_dataset_t *ds;
2924	dsl_dir_t *dd;
2925	dsl_pool_t *dp;
2926	dmu_object_info_t doi;
2927	struct promotearg pa = { 0 };
2928	struct promotenode *snap;
2929	int err;
2930
2931	err = dsl_dataset_hold(name, FTAG, &ds);
2932	if (err)
2933		return (err);
2934	dd = ds->ds_dir;
2935	dp = dd->dd_pool;
2936
2937	err = dmu_object_info(dp->dp_meta_objset,
2938	    ds->ds_phys->ds_snapnames_zapobj, &doi);
2939	if (err) {
2940		dsl_dataset_rele(ds, FTAG);
2941		return (err);
2942	}
2943
2944	if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
2945		dsl_dataset_rele(ds, FTAG);
2946		return (EINVAL);
2947	}
2948
2949	/*
2950	 * We are going to inherit all the snapshots taken before our
2951	 * origin (i.e., our new origin will be our parent's origin).
2952	 * Take ownership of them so that we can rename them into our
2953	 * namespace.
2954	 */
2955	rw_enter(&dp->dp_config_rwlock, RW_READER);
2956
2957	err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
2958	    &pa.shared_snaps);
2959	if (err != 0)
2960		goto out;
2961
2962	err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
2963	if (err != 0)
2964		goto out;
2965
2966	snap = list_head(&pa.shared_snaps);
2967	ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
2968	err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
2969	    snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
2970	if (err != 0)
2971		goto out;
2972
2973	if (snap->ds->ds_dir->dd_phys->dd_origin_obj != 0) {
2974		err = dsl_dataset_hold_obj(dp,
2975		    snap->ds->ds_dir->dd_phys->dd_origin_obj,
2976		    FTAG, &pa.origin_origin);
2977		if (err != 0)
2978			goto out;
2979	}
2980
2981out:
2982	rw_exit(&dp->dp_config_rwlock);
2983
2984	/*
2985	 * Add in 128x the snapnames zapobj size, since we will be moving
2986	 * a bunch of snapnames to the promoted ds, and dirtying their
2987	 * bonus buffers.
2988	 */
2989	if (err == 0) {
2990		err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
2991		    dsl_dataset_promote_sync, ds, &pa,
2992		    2 + 2 * doi.doi_physical_blocks_512);
2993		if (err && pa.err_ds && conflsnap)
2994			(void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
2995	}
2996
2997	snaplist_destroy(&pa.shared_snaps, B_TRUE);
2998	snaplist_destroy(&pa.clone_snaps, B_FALSE);
2999	snaplist_destroy(&pa.origin_snaps, B_FALSE);
3000	if (pa.origin_origin)
3001		dsl_dataset_rele(pa.origin_origin, FTAG);
3002	dsl_dataset_rele(ds, FTAG);
3003	return (err);
3004}
3005
3006struct cloneswaparg {
3007	dsl_dataset_t *cds; /* clone dataset */
3008	dsl_dataset_t *ohds; /* origin's head dataset */
3009	boolean_t force;
3010	int64_t unused_refres_delta; /* change in unconsumed refreservation */
3011};
3012
3013/* ARGSUSED */
3014static int
3015dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
3016{
3017	struct cloneswaparg *csa = arg1;
3018
3019	/* they should both be heads */
3020	if (dsl_dataset_is_snapshot(csa->cds) ||
3021	    dsl_dataset_is_snapshot(csa->ohds))
3022		return (EINVAL);
3023
3024	/* the branch point should be just before them */
3025	if (csa->cds->ds_prev != csa->ohds->ds_prev)
3026		return (EINVAL);
3027
3028	/* cds should be the clone (unless they are unrelated) */
3029	if (csa->cds->ds_prev != NULL &&
3030	    csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
3031	    csa->ohds->ds_object !=
3032	    csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
3033		return (EINVAL);
3034
3035	/* the clone should be a child of the origin */
3036	if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
3037		return (EINVAL);
3038
3039	/* ohds shouldn't be modified unless 'force' */
3040	if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
3041		return (ETXTBSY);
3042
3043	/* adjust amount of any unconsumed refreservation */
3044	csa->unused_refres_delta =
3045	    (int64_t)MIN(csa->ohds->ds_reserved,
3046	    csa->ohds->ds_phys->ds_unique_bytes) -
3047	    (int64_t)MIN(csa->ohds->ds_reserved,
3048	    csa->cds->ds_phys->ds_unique_bytes);
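	/*
	 * A positive delta means the swap will leave more of the
	 * refreservation unconsumed, which must be backed by free
	 * space; a negative delta releases reservation.
	 */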
3049
3050	if (csa->unused_refres_delta > 0 &&
3051	    csa->unused_refres_delta >
3052	    dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
3053		return (ENOSPC);
3054
3055	if (csa->ohds->ds_quota != 0 &&
3056	    csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
3057		return (EDQUOT);
3058
3059	return (0);
3060}
3061
3062/* ARGSUSED */
3063static void
3064dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3065{
3066	struct cloneswaparg *csa = arg1;
3067	dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;
3068
3069	ASSERT(csa->cds->ds_reserved == 0);
3070	ASSERT(csa->ohds->ds_quota == 0 ||
3071	    csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);
3072
3073	dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
3074	dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);
3075
3076	if (csa->cds->ds_objset != NULL) {
3077		dmu_objset_evict(csa->cds->ds_objset);
3078		csa->cds->ds_objset = NULL;
3079	}
3080
3081	if (csa->ohds->ds_objset != NULL) {
3082		dmu_objset_evict(csa->ohds->ds_objset);
3083		csa->ohds->ds_objset = NULL;
3084	}
3085
3086	/*
3087	 * Reset origin's unique bytes, if it exists.
3088	 */
3089	if (csa->cds->ds_prev) {
3090		dsl_dataset_t *origin = csa->cds->ds_prev;
3091		uint64_t comp, uncomp;
3092
3093		dmu_buf_will_dirty(origin->ds_dbuf, tx);
3094		dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3095		    origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
3096		    &origin->ds_phys->ds_unique_bytes, &comp, &uncomp);
3097	}
3098
3099	/* swap blkptrs */
3100	{
3101		blkptr_t tmp;
3102		tmp = csa->ohds->ds_phys->ds_bp;
3103		csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
3104		csa->cds->ds_phys->ds_bp = tmp;
3105	}
3106
3107	/* set dd_*_bytes */
3108	{
3109		int64_t dused, dcomp, duncomp;
3110		uint64_t cdl_used, cdl_comp, cdl_uncomp;
3111		uint64_t odl_used, odl_comp, odl_uncomp;
3112
3113		ASSERT3U(csa->cds->ds_dir->dd_phys->
3114		    dd_used_breakdown[DD_USED_SNAP], ==, 0);
3115
3116		dsl_deadlist_space(&csa->cds->ds_deadlist,
3117		    &cdl_used, &cdl_comp, &cdl_uncomp);
3118		dsl_deadlist_space(&csa->ohds->ds_deadlist,
3119		    &odl_used, &odl_comp, &odl_uncomp);
3120
3121		dused = csa->cds->ds_phys->ds_used_bytes + cdl_used -
3122		    (csa->ohds->ds_phys->ds_used_bytes + odl_used);
3123		dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
3124		    (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
3125		duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
3126		    cdl_uncomp -
3127		    (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);
3128
3129		dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
3130		    dused, dcomp, duncomp, tx);
3131		dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
3132		    -dused, -dcomp, -duncomp, tx);
3133
3134		/*
3135		 * The difference in the space used by snapshots is the
3136		 * difference in snapshot space due to the head's
3137		 * deadlist (since that's the only thing that's
3138		 * changing that affects the snapused).
3139		 */
3140		dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3141		    csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3142		    &cdl_used, &cdl_comp, &cdl_uncomp);
3143		dsl_deadlist_space_range(&csa->ohds->ds_deadlist,
3144		    csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3145		    &odl_used, &odl_comp, &odl_uncomp);
3146		dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
3147		    DD_USED_HEAD, DD_USED_SNAP, tx);
3148	}
3149
3150	/* swap ds_*_bytes */
3151	SWITCH64(csa->ohds->ds_phys->ds_used_bytes,
3152	    csa->cds->ds_phys->ds_used_bytes);
3153	SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
3154	    csa->cds->ds_phys->ds_compressed_bytes);
3155	SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
3156	    csa->cds->ds_phys->ds_uncompressed_bytes);
3157	SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
3158	    csa->cds->ds_phys->ds_unique_bytes);
3159
3160	/* apply any parent delta for change in unconsumed refreservation */
3161	dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
3162	    csa->unused_refres_delta, 0, 0, tx);
3163
3164	/*
3165	 * Swap deadlists.
3166	 */
3167	dsl_deadlist_close(&csa->cds->ds_deadlist);
3168	dsl_deadlist_close(&csa->ohds->ds_deadlist);
3169	SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
3170	    csa->cds->ds_phys->ds_deadlist_obj);
3171	dsl_deadlist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
3172	    csa->cds->ds_phys->ds_deadlist_obj);
3173	dsl_deadlist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
3174	    csa->ohds->ds_phys->ds_deadlist_obj);
3175
3176	dsl_scan_ds_clone_swapped(csa->ohds, csa->cds, tx);
3177}
3178
3179/*
3180 * Swap 'clone' with its origin head dataset.  Used at the end of "zfs
3181 * recv" into an existing fs to swizzle the file system to the new
3182 * version, and by "zfs rollback".  Can also be used to swap two
3183 * independent head datasets if neither has any snapshots.
3184 */
3185int
3186dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
3187    boolean_t force)
3188{
3189	struct cloneswaparg csa;
3190	int error;
3191
3192	ASSERT(clone->ds_owner);
3193	ASSERT(origin_head->ds_owner);
3194retry:
3195	/*
3196	 * Need exclusive access for the swap. If we're swapping these
3197	 * datasets back after an error, we already hold the locks.
3198	 */
3199	if (!RW_WRITE_HELD(&clone->ds_rwlock))
3200		rw_enter(&clone->ds_rwlock, RW_WRITER);
3201	if (!RW_WRITE_HELD(&origin_head->ds_rwlock) &&
3202	    !rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
3203		rw_exit(&clone->ds_rwlock);
3204		rw_enter(&origin_head->ds_rwlock, RW_WRITER);
3205		if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
3206			rw_exit(&origin_head->ds_rwlock);
3207			goto retry;
3208		}
3209	}
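	/*
	 * Both ds_rwlocks are now write-held; the tryenter loop above
	 * avoids deadlocking against another thread taking the same
	 * locks in the opposite order.
	 */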
3210	csa.cds = clone;
3211	csa.ohds = origin_head;
3212	csa.force = force;
3213	error = dsl_sync_task_do(clone->ds_dir->dd_pool,
3214	    dsl_dataset_clone_swap_check,
3215	    dsl_dataset_clone_swap_sync, &csa, NULL, 9);
3216	return (error);
3217}
3218
3219/*
3220 * Given a pool name and a dataset object number in that pool,
3221 * return the name of that dataset.
3222 */
3223int
3224dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3225{
3226	spa_t *spa;
3227	dsl_pool_t *dp;
3228	dsl_dataset_t *ds;
3229	int error;
3230
3231	if ((error = spa_open(pname, &spa, FTAG)) != 0)
3232		return (error);
3233	dp = spa_get_dsl(spa);
3234	rw_enter(&dp->dp_config_rwlock, RW_READER);
3235	if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3236		dsl_dataset_name(ds, buf);
3237		dsl_dataset_rele(ds, FTAG);
3238	}
3239	rw_exit(&dp->dp_config_rwlock);
3240	spa_close(spa, FTAG);
3241
3242	return (error);
3243}
3244
3245int
3246dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
3247    uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
3248{
3249	int error = 0;
3250
3251	ASSERT3S(asize, >, 0);
3252
3253	/*
3254	 * *ref_rsrv is the portion of asize that will come from any
3255	 * unconsumed refreservation space.
3256	 */
3257	*ref_rsrv = 0;
3258
3259	mutex_enter(&ds->ds_lock);
3260	/*
3261	 * Deduct the unconsumed portion of the refreservation from *used.
3262	 */
3263	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
3264		ASSERT3U(*used, >=,
3265		    ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3266		*used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
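		/*
		 * parent_delta() reports how much of this write would
		 * actually be charged to the parent; the remainder is
		 * covered by the unconsumed refreservation.
		 */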
3267		*ref_rsrv =
3268		    asize - MIN(asize, parent_delta(ds, asize + inflight));
3269	}
3270
3271	if (!check_quota || ds->ds_quota == 0) {
3272		mutex_exit(&ds->ds_lock);
3273		return (0);
3274	}
3275	/*
3276	 * If they are requesting more space, and our current estimate
3277	 * is over quota, they get to try again unless the actual
3278	 * on-disk is over quota and there are no pending changes (which
3279	 * may free up space for us).
3280	 */
3281	if (ds->ds_phys->ds_used_bytes + inflight >= ds->ds_quota) {
3282		if (inflight > 0 || ds->ds_phys->ds_used_bytes < ds->ds_quota)
3283			error = ERESTART;
3284		else
3285			error = EDQUOT;
3286	}
3287	mutex_exit(&ds->ds_lock);
3288
3289	return (error);
3290}
3291
3292/* ARGSUSED */
3293static int
3294dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3295{
3296	dsl_dataset_t *ds = arg1;
3297	dsl_prop_setarg_t *psa = arg2;
3298	int err;
3299
3300	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3301		return (ENOTSUP);
3302
3303	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3304		return (err);
3305
3306	if (psa->psa_effective_value == 0)
3307		return (0);
3308
3309	if (psa->psa_effective_value < ds->ds_phys->ds_used_bytes ||
3310	    psa->psa_effective_value < ds->ds_reserved)
3311		return (ENOSPC);
3312
3313	return (0);
3314}
3315
3316extern void dsl_prop_set_sync(void *, void *, dmu_tx_t *);
3317
3318void
3319dsl_dataset_set_quota_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3320{
3321	dsl_dataset_t *ds = arg1;
3322	dsl_prop_setarg_t *psa = arg2;
3323	uint64_t effective_value = psa->psa_effective_value;
3324
3325	dsl_prop_set_sync(ds, psa, tx);
3326	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3327
3328	if (ds->ds_quota != effective_value) {
3329		dmu_buf_will_dirty(ds->ds_dbuf, tx);
3330		ds->ds_quota = effective_value;
3331
3332		spa_history_log_internal(LOG_DS_REFQUOTA,
3333		    ds->ds_dir->dd_pool->dp_spa, tx, "%lld dataset = %llu ",
3334		    (longlong_t)ds->ds_quota, ds->ds_object);
3335	}
3336}
3337
3338int
3339dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
3340{
3341	dsl_dataset_t *ds;
3342	dsl_prop_setarg_t psa;
3343	int err;
3344
3345	dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
3346
3347	err = dsl_dataset_hold(dsname, FTAG, &ds);
3348	if (err)
3349		return (err);
3350
3351	/*
3352	 * If someone removes a file, then tries to set the quota, we
3353	 * want to make sure the file freeing takes effect.
3354	 */
3355	txg_wait_open(ds->ds_dir->dd_pool, 0);
3356
3357	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3358	    dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3359	    ds, &psa, 0);
3360
3361	dsl_dataset_rele(ds, FTAG);
3362	return (err);
3363}
3364
3365static int
3366dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
3367{
3368	dsl_dataset_t *ds = arg1;
3369	dsl_prop_setarg_t *psa = arg2;
3370	uint64_t effective_value;
3371	uint64_t unique;
3372	int err;
3373
3374	if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
3375	    SPA_VERSION_REFRESERVATION)
3376		return (ENOTSUP);
3377
3378	if (dsl_dataset_is_snapshot(ds))
3379		return (EINVAL);
3380
3381	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3382		return (err);
3383
3384	effective_value = psa->psa_effective_value;
3385
3386	/*
3387	 * If we are doing the preliminary check in open context, the
3388	 * space estimates may be inaccurate.
3389	 */
3390	if (!dmu_tx_is_syncing(tx))
3391		return (0);
3392
3393	mutex_enter(&ds->ds_lock);
3394	if (!DS_UNIQUE_IS_ACCURATE(ds))
3395		dsl_dataset_recalc_head_uniq(ds);
3396	unique = ds->ds_phys->ds_unique_bytes;
3397	mutex_exit(&ds->ds_lock);
3398
3399	if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
3400		uint64_t delta = MAX(unique, effective_value) -
3401		    MAX(unique, ds->ds_reserved);
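		/*
		 * E.g. raising refreservation from 1G to 3G with 2G of
		 * unique data needs 1G of new space:
		 * MAX(2G, 3G) - MAX(2G, 1G) = 1G.
		 */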
3402
3403		if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
3404			return (ENOSPC);
3405		if (ds->ds_quota > 0 &&
3406		    effective_value > ds->ds_quota)
3407			return (ENOSPC);
3408	}
3409
3410	return (0);
3411}
3412
3413static void
3414dsl_dataset_set_reservation_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3415{
3416	dsl_dataset_t *ds = arg1;
3417	dsl_prop_setarg_t *psa = arg2;
3418	uint64_t effective_value = psa->psa_effective_value;
3419	uint64_t unique;
3420	int64_t delta;
3421
3422	dsl_prop_set_sync(ds, psa, tx);
3423	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3424
3425	dmu_buf_will_dirty(ds->ds_dbuf, tx);
3426
3427	mutex_enter(&ds->ds_dir->dd_lock);
3428	mutex_enter(&ds->ds_lock);
3429	ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
3430	unique = ds->ds_phys->ds_unique_bytes;
3431	delta = MAX(0, (int64_t)(effective_value - unique)) -
3432	    MAX(0, (int64_t)(ds->ds_reserved - unique));
3433	ds->ds_reserved = effective_value;
3434	mutex_exit(&ds->ds_lock);
3435
3436	dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
3437	mutex_exit(&ds->ds_dir->dd_lock);
3438
3439	spa_history_log_internal(LOG_DS_REFRESERV,
3440	    ds->ds_dir->dd_pool->dp_spa, tx, "%lld dataset = %llu",
3441	    (longlong_t)effective_value, ds->ds_object);
3442}
3443
3444int
3445dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
3446    uint64_t reservation)
3447{
3448	dsl_dataset_t *ds;
3449	dsl_prop_setarg_t psa;
3450	int err;
3451
3452	dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
3453	    &reservation);
3454
3455	err = dsl_dataset_hold(dsname, FTAG, &ds);
3456	if (err)
3457		return (err);
3458
3459	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3460	    dsl_dataset_set_reservation_check,
3461	    dsl_dataset_set_reservation_sync, ds, &psa, 0);
3462
3463	dsl_dataset_rele(ds, FTAG);
3464	return (err);
3465}
3466
3467typedef struct zfs_hold_cleanup_arg {
3468	dsl_pool_t *dp;
3469	uint64_t dsobj;
3470	char htag[MAXNAMELEN];
3471} zfs_hold_cleanup_arg_t;
3472
3473static void
3474dsl_dataset_user_release_onexit(void *arg)
3475{
3476	zfs_hold_cleanup_arg_t *ca = arg;
3477
3478	(void) dsl_dataset_user_release_tmp(ca->dp, ca->dsobj, ca->htag,
3479	    B_TRUE);
3480	kmem_free(ca, sizeof (zfs_hold_cleanup_arg_t));
3481}
3482
3483void
3484dsl_register_onexit_hold_cleanup(dsl_dataset_t *ds, const char *htag,
3485    minor_t minor)
3486{
3487	zfs_hold_cleanup_arg_t *ca;
3488
3489	ca = kmem_alloc(sizeof (zfs_hold_cleanup_arg_t), KM_SLEEP);
3490	ca->dp = ds->ds_dir->dd_pool;
3491	ca->dsobj = ds->ds_object;
3492	(void) strlcpy(ca->htag, htag, sizeof (ca->htag));
3493	VERIFY3U(0, ==, zfs_onexit_add_cb(minor,
3494	    dsl_dataset_user_release_onexit, ca, NULL));
3495}
3496
3497/*
3498 * If you add new checks here, you may need to add
3499 * additional checks to the "temporary" case in
3500 * snapshot_check() in dmu_objset.c.
3501 */
3502static int
3503dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
3504{
3505	dsl_dataset_t *ds = arg1;
3506	struct dsl_ds_holdarg *ha = arg2;
3507	char *htag = ha->htag;
3508	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3509	int error = 0;
3510
3511	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3512		return (ENOTSUP);
3513
3514	if (!dsl_dataset_is_snapshot(ds))
3515		return (EINVAL);
3516
3517	/* tags must be unique */
3518	mutex_enter(&ds->ds_lock);
3519	if (ds->ds_phys->ds_userrefs_obj) {
		uint64_t tmp;

		/* We only care whether the tag exists, not its value. */
3520		error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
3521		    8, 1, &tmp);
3522		if (error == 0)
3523			error = EEXIST;
3524		else if (error == ENOENT)
3525			error = 0;
3526	}
3527	mutex_exit(&ds->ds_lock);
3528
3529	if (error == 0 && ha->temphold &&
3530	    strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
3531		error = E2BIG;
3532
3533	return (error);
3534}
3535
3536void
3537dsl_dataset_user_hold_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3538{
3539	dsl_dataset_t *ds = arg1;
3540	struct dsl_ds_holdarg *ha = arg2;
3541	char *htag = ha->htag;
3542	dsl_pool_t *dp = ds->ds_dir->dd_pool;
3543	objset_t *mos = dp->dp_meta_objset;
3544	uint64_t now = gethrestime_sec();
3545	uint64_t zapobj;
3546
3547	mutex_enter(&ds->ds_lock);
3548	if (ds->ds_phys->ds_userrefs_obj == 0) {
3549		/*
3550		 * This is the first user hold for this dataset.  Create
3551		 * the userrefs zap object.
3552		 */
3553		dmu_buf_will_dirty(ds->ds_dbuf, tx);
3554		zapobj = ds->ds_phys->ds_userrefs_obj =
3555		    zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
3556	} else {
3557		zapobj = ds->ds_phys->ds_userrefs_obj;
3558	}
3559	ds->ds_userrefs++;
3560	mutex_exit(&ds->ds_lock);
3561
3562	VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
3563
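	/*
	 * Temporary holds are also recorded pool-wide, so they can be
	 * cleaned up even if the holder goes away unexpectedly.
	 */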
3564	if (ha->temphold) {
3565		VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
3566		    htag, &now, tx));
3567	}
3568
3569	spa_history_log_internal(LOG_DS_USER_HOLD,
3570	    dp->dp_spa, tx, "<%s> temp = %d dataset = %llu", htag,
3571	    (int)ha->temphold, ds->ds_object);
3572}
3573
3574static int
3575dsl_dataset_user_hold_one(const char *dsname, void *arg)
3576{
3577	struct dsl_ds_holdarg *ha = arg;
3578	dsl_dataset_t *ds;
3579	int error;
3580	char *name;
3581
3582	/* alloc a buffer to hold dsname@snapname, plus the terminating NUL */
3583	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3584	error = dsl_dataset_hold(name, ha->dstg, &ds);
3585	strfree(name);
3586	if (error == 0) {
3587		ha->gotone = B_TRUE;
3588		dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
3589		    dsl_dataset_user_hold_sync, ds, ha, 0);
3590	} else if (error == ENOENT && ha->recursive) {
3591		error = 0;
3592	} else {
3593		(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3594	}
3595	return (error);
3596}
3597
3598int
3599dsl_dataset_user_hold_for_send(dsl_dataset_t *ds, char *htag,
3600    boolean_t temphold)
3601{
3602	struct dsl_ds_holdarg *ha;
3603	int error;
3604
3605	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3606	ha->htag = htag;
3607	ha->temphold = temphold;
3608	error = dsl_sync_task_do(ds->ds_dir->dd_pool,
3609	    dsl_dataset_user_hold_check, dsl_dataset_user_hold_sync,
3610	    ds, ha, 0);
3611	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3612
3613	return (error);
3614}
3615
3616int
3617dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
3618    boolean_t recursive, boolean_t temphold, int cleanup_fd)
3619{
3620	struct dsl_ds_holdarg *ha;
3621	dsl_sync_task_t *dst;
3622	spa_t *spa;
3623	int error;
3624	minor_t minor = 0;
3625
3626	if (cleanup_fd != -1) {
3627		/* Currently we only support cleanup-on-exit of tempholds. */
3628		if (!temphold)
3629			return (EINVAL);
3630		error = zfs_onexit_fd_hold(cleanup_fd, &minor);
3631		if (error)
3632			return (error);
3633	}
3634
3635	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3636
3637	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3638
3639	error = spa_open(dsname, &spa, FTAG);
3640	if (error) {
3641		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3642		if (cleanup_fd != -1)
3643			zfs_onexit_fd_rele(cleanup_fd);
3644		return (error);
3645	}
3646
3647	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3648	ha->htag = htag;
3649	ha->snapname = snapname;
3650	ha->recursive = recursive;
3651	ha->temphold = temphold;
3652
3653	if (recursive) {
3654		error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
3655		    ha, DS_FIND_CHILDREN);
3656	} else {
3657		error = dsl_dataset_user_hold_one(dsname, ha);
3658	}
3659	if (error == 0)
3660		error = dsl_sync_task_group_wait(ha->dstg);
3661
3662	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3663	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3664		dsl_dataset_t *ds = dst->dst_arg1;
3665
3666		if (dst->dst_err) {
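			/*
			 * Record the failed dataset's name; truncate
			 * at the '@' so only the dataset is reported.
			 */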
3667			dsl_dataset_name(ds, ha->failed);
3668			*strchr(ha->failed, '@') = '\0';
3669		} else if (error == 0 && minor != 0 && temphold) {
3670			/*
3671			 * If this hold is to be released upon process exit,
3672			 * register that action now.
3673			 */
3674			dsl_register_onexit_hold_cleanup(ds, htag, minor);
3675		}
3676		dsl_dataset_rele(ds, ha->dstg);
3677	}
3678
3679	if (error == 0 && recursive && !ha->gotone)
3680		error = ENOENT;
3681
3682	if (error)
3683		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
3684
3685	dsl_sync_task_group_destroy(ha->dstg);
3686
3687	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3688	spa_close(spa, FTAG);
3689	if (cleanup_fd != -1)
3690		zfs_onexit_fd_rele(cleanup_fd);
3691	return (error);
3692}
3693
3694struct dsl_ds_releasearg {
3695	dsl_dataset_t *ds;
3696	const char *htag;
3697	boolean_t own;		/* do we own or just hold ds? */
3698};
3699
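/*
 * Determine whether releasing <htag> could destroy this snapshot:
 * that is the case when it is the last remaining user hold, the
 * snapshot has no clones, and a deferred destroy is pending.
 * Returns ESRCH if the tag does not exist.
 */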
3700static int
3701dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
3702    boolean_t *might_destroy)
3703{
3704	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3705	uint64_t zapobj;
3706	uint64_t tmp;
3707	int error;
3708
3709	*might_destroy = B_FALSE;
3710
3711	mutex_enter(&ds->ds_lock);
3712	zapobj = ds->ds_phys->ds_userrefs_obj;
3713	if (zapobj == 0) {
3714		/* The tag can't possibly exist */
3715		mutex_exit(&ds->ds_lock);
3716		return (ESRCH);
3717	}
3718
3719	/* Make sure the tag exists */
3720	error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
3721	if (error) {
3722		mutex_exit(&ds->ds_lock);
3723		if (error == ENOENT)
3724			error = ESRCH;
3725		return (error);
3726	}
3727
3728	if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
3729	    DS_IS_DEFER_DESTROY(ds))
3730		*might_destroy = B_TRUE;
3731
3732	mutex_exit(&ds->ds_lock);
3733	return (0);
3734}
3735
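/*
 * Sync task check function for a user release.  If dropping the hold
 * would destroy the snapshot, defer to dsl_dataset_destroy_check().
 * In syncing context that additionally requires that we own the
 * dataset; otherwise fail with EBUSY so the caller can retry.
 */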
3736static int
3737dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
3738{
3739	struct dsl_ds_releasearg *ra = arg1;
3740	dsl_dataset_t *ds = ra->ds;
3741	boolean_t might_destroy;
3742	int error;
3743
3744	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3745		return (ENOTSUP);
3746
3747	error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
3748	if (error)
3749		return (error);
3750
3751	if (might_destroy) {
3752		struct dsl_ds_destroyarg dsda = {0};
3753
3754		if (dmu_tx_is_syncing(tx)) {
3755			/*
3756			 * If we're not prepared to remove the snapshot,
3757			 * we can't allow the release to happen right now.
3758			 */
3759			if (!ra->own)
3760				return (EBUSY);
3761		}
3762		dsda.ds = ds;
3763		dsda.releasing = B_TRUE;
3764		return (dsl_dataset_destroy_check(&dsda, tag, tx));
3765	}
3766
3767	return (0);
3768}
3769
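/*
 * Sync task that drops the hold: remove the tag's ZAP entry and any
 * pool-wide temphold entry, then destroy the snapshot if this was
 * the last user hold on a defer-destroyed snapshot.
 */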
3770static void
3771dsl_dataset_user_release_sync(void *arg1, void *tag, dmu_tx_t *tx)
3772{
3773	struct dsl_ds_releasearg *ra = arg1;
3774	dsl_dataset_t *ds = ra->ds;
3775	dsl_pool_t *dp = ds->ds_dir->dd_pool;
3776	objset_t *mos = dp->dp_meta_objset;
3777	uint64_t zapobj;
3778	uint64_t dsobj = ds->ds_object;
3779	uint64_t refs;
3780	int error;
3781
3782	mutex_enter(&ds->ds_lock);
3783	ds->ds_userrefs--;
3784	refs = ds->ds_userrefs;
3785	mutex_exit(&ds->ds_lock);
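	/*
	 * Remove any matching pool-wide temphold entry; ENOENT just
	 * means this was not a temporary hold.
	 */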
3786	error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
3787	VERIFY(error == 0 || error == ENOENT);
3788	zapobj = ds->ds_phys->ds_userrefs_obj;
3789	VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
3790	if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
3791	    DS_IS_DEFER_DESTROY(ds)) {
3792		struct dsl_ds_destroyarg dsda = {0};
3793
3794		ASSERT(ra->own);
3795		dsda.ds = ds;
3796		dsda.releasing = B_TRUE;
3797		/* We already did the destroy_check */
3798		dsl_dataset_destroy_sync(&dsda, tag, tx);
3799	}
3800
3801	spa_history_log_internal(LOG_DS_USER_RELEASE,
3802	    dp->dp_spa, tx, "<%s> %lld dataset = %llu",
3803	    ra->htag, (longlong_t)refs, dsobj);
3804}
3805
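/*
 * dmu_objset_find() callback: arrange for <htag> to be released on
 * one snapshot.  If the release might destroy the snapshot, unmount
 * it and take ownership of it before queueing the release sync task.
 */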
3806static int
3807dsl_dataset_user_release_one(const char *dsname, void *arg)
3808{
3809	struct dsl_ds_holdarg *ha = arg;
3810	struct dsl_ds_releasearg *ra;
3811	dsl_dataset_t *ds;
3812	int error;
3813	void *dtag = ha->dstg;
3814	char *name;
3815	boolean_t own = B_FALSE;
3816	boolean_t might_destroy;
3817
3818	/* alloc a buffer to hold dsname@snapname, plus the terminating NUL */
3819	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3820	error = dsl_dataset_hold(name, dtag, &ds);
3821	strfree(name);
3822	if (error == ENOENT && ha->recursive)
3823		return (0);
3824	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3825	if (error)
3826		return (error);
3827
3828	ha->gotone = B_TRUE;
3829
3830	ASSERT(dsl_dataset_is_snapshot(ds));
3831
3832	error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
3833	if (error) {
3834		dsl_dataset_rele(ds, dtag);
3835		return (error);
3836	}
3837
3838	if (might_destroy) {
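		/*
		 * The snapshot may be destroyed by the release, so it
		 * must be unmounted and exclusively owned first.
		 */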
3839#ifdef _KERNEL
3840		name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3841		error = zfs_unmount_snap(name, NULL);
3842		strfree(name);
3843		if (error) {
3844			dsl_dataset_rele(ds, dtag);
3845			return (error);
3846		}
3847#endif
3848		if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
3849			dsl_dataset_rele(ds, dtag);
3850			return (EBUSY);
3851		} else {
3852			own = B_TRUE;
3853			dsl_dataset_make_exclusive(ds, dtag);
3854		}
3855	}
3856
3857	ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
3858	ra->ds = ds;
3859	ra->htag = ha->htag;
3860	ra->own = own;
3861	dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
3862	    dsl_dataset_user_release_sync, ra, dtag, 0);
3863
3864	return (0);
3865}
3866
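/*
 * Release the user hold <htag> from the snapshot <dsname>@<snapname>,
 * or, if "recursive" is set, from that snapshot of every descendant
 * of <dsname>.  Transient EBUSY errors are retried; see the comment
 * near the end of this function.
 */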
3867int
3868dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
3869    boolean_t recursive)
3870{
3871	struct dsl_ds_holdarg *ha;
3872	dsl_sync_task_t *dst;
3873	spa_t *spa;
3874	int error;
3875
3876top:
3877	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3878
3879	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3880
3881	error = spa_open(dsname, &spa, FTAG);
3882	if (error) {
3883		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3884		return (error);
3885	}
3886
3887	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3888	ha->htag = htag;
3889	ha->snapname = snapname;
3890	ha->recursive = recursive;
3891	if (recursive) {
3892		error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
3893		    ha, DS_FIND_CHILDREN);
3894	} else {
3895		error = dsl_dataset_user_release_one(dsname, ha);
3896	}
3897	if (error == 0)
3898		error = dsl_sync_task_group_wait(ha->dstg);
3899
3900	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3901	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3902		struct dsl_ds_releasearg *ra = dst->dst_arg1;
3903		dsl_dataset_t *ds = ra->ds;
3904
3905		if (dst->dst_err)
3906			dsl_dataset_name(ds, ha->failed);
3907
3908		if (ra->own)
3909			dsl_dataset_disown(ds, ha->dstg);
3910		else
3911			dsl_dataset_rele(ds, ha->dstg);
3912
3913		kmem_free(ra, sizeof (struct dsl_ds_releasearg));
3914	}
3915
3916	if (error == 0 && recursive && !ha->gotone)
3917		error = ENOENT;
3918
3919	if (error && error != EBUSY)
3920		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
3921
3922	dsl_sync_task_group_destroy(ha->dstg);
3923	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3924	spa_close(spa, FTAG);
3925
3926	/*
3927	 * We can get EBUSY if we were racing with deferred destroy and
3928	 * dsl_dataset_user_release_check() hadn't done the necessary
3929	 * open context setup.  We can also get EBUSY if we're racing
3930	 * with destroy and that thread is the ds_owner.  Either way
3931	 * the busy condition should be transient, and we should retry
3932	 * the release operation.
3933	 */
3934	if (error == EBUSY)
3935		goto top;
3936
3937	return (error);
3938}
3939
3940/*
3941 * Called at spa_load time (with retry == B_FALSE) to release a stale
3942 * temporary user hold. Also called by the onexit code (with retry == B_TRUE).
3943 */
3944int
3945dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag,
3946    boolean_t retry)
3947{
3948	dsl_dataset_t *ds;
3949	char *snap;
3950	char *name;
3951	int namelen;
3952	int error;
3953
3954	do {
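		/* Reconstruct the snapshot's full name from its object. */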
3955		rw_enter(&dp->dp_config_rwlock, RW_READER);
3956		error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
3957		rw_exit(&dp->dp_config_rwlock);
3958		if (error)
3959			return (error);
3960		namelen = dsl_dataset_namelen(ds) + 1;
3961		name = kmem_alloc(namelen, KM_SLEEP);
3962		dsl_dataset_name(ds, name);
3963		dsl_dataset_rele(ds, FTAG);
3964
3965		snap = strchr(name, '@');
3966		*snap = '\0';
3967		++snap;
3968		error = dsl_dataset_user_release(name, snap, htag, B_FALSE);
3969		kmem_free(name, namelen);
3970
3971		/*
3972		 * The object can't have been destroyed because we have a hold,
3973		 * but it might have been renamed, resulting in ENOENT.  Retry
3974		 * if we've been requested to do so.
3975		 *
3976		 * It would be nice if we could use the dsobj all the way
3977		 * through and avoid ENOENT entirely.  But we might need to
3978		 * unmount the snapshot, and there's currently no way to lookup
3979		 * a vfsp using a ZFS object id.
3980		 */
3981	} while ((error == ENOENT) && retry);
3982
3983	return (error);
3984}
3985
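/*
 * Return an nvlist mapping each user hold tag on the named snapshot
 * to the time (in seconds since the epoch) at which the hold was
 * taken.
 */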
3986int
3987dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
3988{
3989	dsl_dataset_t *ds;
3990	int err;
3991
3992	err = dsl_dataset_hold(dsname, FTAG, &ds);
3993	if (err)
3994		return (err);
3995
3996	VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
3997	if (ds->ds_phys->ds_userrefs_obj != 0) {
3998		zap_attribute_t *za;
3999		zap_cursor_t zc;
4000
4001		za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
4002		for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
4003		    ds->ds_phys->ds_userrefs_obj);
4004		    zap_cursor_retrieve(&zc, za) == 0;
4005		    zap_cursor_advance(&zc)) {
4006			VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
4007			    za->za_first_integer));
4008		}
4009		zap_cursor_fini(&zc);
4010		kmem_free(za, sizeof (zap_attribute_t));
4011	}
4012	dsl_dataset_rele(ds, FTAG);
4013	return (0);
4014}
4015
4016/*
4017 * Note: this function is used as the callback for dmu_objset_find().  We
4018 * always return 0 so that we will continue to find and process
4019 * inconsistent datasets, even if we encounter an error trying to
4020 * process one of them.
4021 */
4022/* ARGSUSED */
4023int
4024dsl_destroy_inconsistent(const char *dsname, void *arg)
4025{
4026	dsl_dataset_t *ds;
4027
4028	if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
4029		if (DS_IS_INCONSISTENT(ds))
4030			(void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
4031		else
4032			dsl_dataset_disown(ds, FTAG);
4033	}
4034	return (0);
4035}
4036