dsl_pool.c revision 1.1
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/dsl_pool.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_synctask.h>
#include <sys/dmu_tx.h>
#include <sys/dmu_objset.h>
#include <sys/arc.h>
#include <sys/zap.h>
#include <sys/zio.h>
#include <sys/zfs_context.h>
#include <sys/fs/zfs.h>
#include <sys/zfs_znode.h>
#include <sys/spa_impl.h>

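/*
 * Tunables for the per-txg write throttle; see dsl_pool_sync() and
 * dsl_pool_tempreserve_space() below for how they are applied.
 */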
int zfs_no_write_throttle = 0;
int zfs_write_limit_shift = 3;			/* 1/8th of physical memory */
int zfs_txg_synctime = 5;			/* target secs to sync a txg */

uint64_t zfs_write_limit_min = 32 << 20;	/* min write limit is 32MB */
uint64_t zfs_write_limit_max = 0;		/* max data payload per txg */
uint64_t zfs_write_limit_inflated = 0;
uint64_t zfs_write_limit_override = 0;

kmutex_t zfs_write_limit_lock;

static pgcnt_t old_physmem = 0;

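/*
 * Look up the named child of the root dsl_dir in the MOS and open it;
 * used for the pool's special directories, e.g. $MOS and $ORIGIN.
 */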
static int
dsl_pool_open_special_dir(dsl_pool_t *dp, const char *name, dsl_dir_t **ddp)
{
	uint64_t obj;
	int err;

	err = zap_lookup(dp->dp_meta_objset,
	    dp->dp_root_dir->dd_phys->dd_child_dir_zapobj,
	    name, sizeof (obj), 1, &obj);
	if (err)
		return (err);

	return (dsl_dir_open_obj(dp, obj, name, dp, ddp));
}

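/*
 * Allocate and initialize the in-core pool structure; this is the
 * common setup shared by dsl_pool_open() and dsl_pool_create().
 */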
static dsl_pool_t *
dsl_pool_open_impl(spa_t *spa, uint64_t txg)
{
	dsl_pool_t *dp;
	blkptr_t *bp = spa_get_rootblkptr(spa);

	dp = kmem_zalloc(sizeof (dsl_pool_t), KM_SLEEP);
	dp->dp_spa = spa;
	dp->dp_meta_rootbp = *bp;
	rw_init(&dp->dp_config_rwlock, NULL, RW_DEFAULT, NULL);
	dp->dp_write_limit = zfs_write_limit_min;
	txg_init(dp, txg);

	txg_list_create(&dp->dp_dirty_datasets,
	    offsetof(dsl_dataset_t, ds_dirty_link));
	txg_list_create(&dp->dp_dirty_dirs,
	    offsetof(dsl_dir_t, dd_dirty_link));
	txg_list_create(&dp->dp_sync_tasks,
	    offsetof(dsl_sync_task_group_t, dstg_node));
	list_create(&dp->dp_synced_datasets, sizeof (dsl_dataset_t),
	    offsetof(dsl_dataset_t, ds_synced_link));

	mutex_init(&dp->dp_lock, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&dp->dp_scrub_cancel_lock, NULL, MUTEX_DEFAULT, NULL);

	return (dp);
}

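/*
 * Open an existing pool: the MOS, the root and $MOS dsl_dirs, the
 * $ORIGIN snapshot (on sufficiently new pool versions), and any
 * scrub state recorded in the pool directory.
 */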
int
dsl_pool_open(spa_t *spa, uint64_t txg, dsl_pool_t **dpp)
{
	int err;
	dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
	dsl_dir_t *dd;
	dsl_dataset_t *ds;
	objset_impl_t *osi;

	rw_enter(&dp->dp_config_rwlock, RW_WRITER);
	err = dmu_objset_open_impl(spa, NULL, &dp->dp_meta_rootbp, &osi);
	if (err)
		goto out;
	dp->dp_meta_objset = &osi->os;

	err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
	    DMU_POOL_ROOT_DATASET, sizeof (uint64_t), 1,
	    &dp->dp_root_dir_obj);
	if (err)
		goto out;

	err = dsl_dir_open_obj(dp, dp->dp_root_dir_obj,
	    NULL, dp, &dp->dp_root_dir);
	if (err)
		goto out;

	err = dsl_pool_open_special_dir(dp, MOS_DIR_NAME, &dp->dp_mos_dir);
	if (err)
		goto out;

	if (spa_version(spa) >= SPA_VERSION_ORIGIN) {
		err = dsl_pool_open_special_dir(dp, ORIGIN_DIR_NAME, &dd);
		if (err)
			goto out;
		err = dsl_dataset_hold_obj(dp, dd->dd_phys->dd_head_dataset_obj,
		    FTAG, &ds);
		if (err)
			goto out;
		err = dsl_dataset_hold_obj(dp, ds->ds_phys->ds_prev_snap_obj,
		    dp, &dp->dp_origin_snap);
		if (err)
			goto out;
		dsl_dataset_rele(ds, FTAG);
		dsl_dir_close(dd, dp);
	}

	/* get scrub status */
	err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
	    DMU_POOL_SCRUB_FUNC, sizeof (uint32_t), 1,
	    &dp->dp_scrub_func);
	if (err == 0) {
		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
		    DMU_POOL_SCRUB_QUEUE, sizeof (uint64_t), 1,
		    &dp->dp_scrub_queue_obj);
		if (err)
			goto out;
		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
		    DMU_POOL_SCRUB_MIN_TXG, sizeof (uint64_t), 1,
		    &dp->dp_scrub_min_txg);
		if (err)
			goto out;
		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
		    DMU_POOL_SCRUB_MAX_TXG, sizeof (uint64_t), 1,
		    &dp->dp_scrub_max_txg);
		if (err)
			goto out;
		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
		    DMU_POOL_SCRUB_BOOKMARK, sizeof (uint64_t), 4,
		    &dp->dp_scrub_bookmark);
		if (err)
			goto out;
		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
		    DMU_POOL_SCRUB_ERRORS, sizeof (uint64_t), 1,
		    &spa->spa_scrub_errors);
		if (err)
			goto out;
		if (spa_version(spa) < SPA_VERSION_DSL_SCRUB) {
			/*
			 * A new-type scrub was in progress on an old
			 * pool.  Restart from the beginning, since the
			 * old software may have changed the pool in the
			 * meantime.
			 */
			dsl_pool_scrub_restart(dp);
		}
	} else {
		/*
		 * It's OK if there is no scrub in progress (and if
		 * there was an I/O error, ignore it).
		 */
		err = 0;
	}

out:
	rw_exit(&dp->dp_config_rwlock);
	if (err)
		dsl_pool_close(dp);
	else
		*dpp = dp;

	return (err);
}

void
dsl_pool_close(dsl_pool_t *dp)
{
	/* drop our references from dsl_pool_open() */

	/*
	 * Since we held the origin_snap from "syncing" context (which
	 * includes pool-opening context), it actually only got a "ref"
	 * and not a hold, so just drop that here.
	 */
	if (dp->dp_origin_snap)
		dsl_dataset_drop_ref(dp->dp_origin_snap, dp);
	if (dp->dp_mos_dir)
		dsl_dir_close(dp->dp_mos_dir, dp);
	if (dp->dp_root_dir)
		dsl_dir_close(dp->dp_root_dir, dp);

	/* undo the dmu_objset_open_impl(mos) from dsl_pool_open() */
	if (dp->dp_meta_objset)
		dmu_objset_evict(NULL, dp->dp_meta_objset->os);

	txg_list_destroy(&dp->dp_dirty_datasets);
	txg_list_destroy(&dp->dp_dirty_dirs);
	list_destroy(&dp->dp_synced_datasets);

	arc_flush(dp->dp_spa);
	txg_fini(dp);
	rw_destroy(&dp->dp_config_rwlock);
	mutex_destroy(&dp->dp_lock);
	mutex_destroy(&dp->dp_scrub_cancel_lock);
	if (dp->dp_blkstats)
		kmem_free(dp->dp_blkstats, sizeof (zfs_all_blkstats_t));
	kmem_free(dp, sizeof (dsl_pool_t));
}

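/*
 * Create a new pool in the given initial txg: the MOS, the object
 * directory, the root and $MOS dsl_dirs, the $ORIGIN snapshot (if
 * the pool version supports it), and the root dataset and objset.
 */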
dsl_pool_t *
dsl_pool_create(spa_t *spa, nvlist_t *zplprops, uint64_t txg)
{
	int err;
	dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
	dmu_tx_t *tx = dmu_tx_create_assigned(dp, txg);
	objset_impl_t *osip;
	dsl_dataset_t *ds;
	uint64_t dsobj;

	/* create and open the MOS (meta-objset) */
	dp->dp_meta_objset = &dmu_objset_create_impl(spa,
	    NULL, &dp->dp_meta_rootbp, DMU_OST_META, tx)->os;

	/* create the pool directory */
	err = zap_create_claim(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
	    DMU_OT_OBJECT_DIRECTORY, DMU_OT_NONE, 0, tx);
	ASSERT3U(err, ==, 0);

	/* create and open the root dir */
	dp->dp_root_dir_obj = dsl_dir_create_sync(dp, NULL, NULL, tx);
	VERIFY(0 == dsl_dir_open_obj(dp, dp->dp_root_dir_obj,
	    NULL, dp, &dp->dp_root_dir));

	/* create and open the meta-objset dir */
	(void) dsl_dir_create_sync(dp, dp->dp_root_dir, MOS_DIR_NAME, tx);
	VERIFY(0 == dsl_pool_open_special_dir(dp,
	    MOS_DIR_NAME, &dp->dp_mos_dir));

	if (spa_version(spa) >= SPA_VERSION_DSL_SCRUB)
		dsl_pool_create_origin(dp, tx);

	/* create the root dataset */
	dsobj = dsl_dataset_create_sync_dd(dp->dp_root_dir, NULL, 0, tx);

	/* create the root objset */
	VERIFY(0 == dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
	osip = dmu_objset_create_impl(dp->dp_spa, ds,
	    dsl_dataset_get_blkptr(ds), DMU_OST_ZFS, tx);
#ifdef _KERNEL
	zfs_create_fs(&osip->os, kcred, zplprops, tx);
#endif
	dsl_dataset_rele(ds, FTAG);

	dmu_tx_commit(tx);

	return (dp);
}

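/*
 * Sync out everything dirtied in this txg: datasets, sync tasks,
 * dsl_dirs, and finally the MOS itself; then recompute the write
 * limit from the throughput observed during this sync.
 */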
void
dsl_pool_sync(dsl_pool_t *dp, uint64_t txg)
{
	zio_t *zio;
	dmu_tx_t *tx;
	dsl_dir_t *dd;
	dsl_dataset_t *ds;
	dsl_sync_task_group_t *dstg;
	objset_impl_t *mosi = dp->dp_meta_objset->os;
	hrtime_t start, write_time;
	uint64_t data_written;
	int err;

	tx = dmu_tx_create_assigned(dp, txg);

	dp->dp_read_overhead = 0;
	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
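	/*
	 * Move each dirty dataset's hold onto the synced list (so that
	 * dsl_pool_zil_clean() can find it); if the dataset was
	 * re-dirtied and is already on the list, drop the extra hold.
	 */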
	while (ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) {
		if (!list_link_active(&ds->ds_synced_link))
			list_insert_tail(&dp->dp_synced_datasets, ds);
		else
			dmu_buf_rele(ds->ds_dbuf, ds);
		dsl_dataset_sync(ds, zio, tx);
	}
	DTRACE_PROBE(pool_sync__1setup);

	start = gethrtime();
	err = zio_wait(zio);
	write_time = gethrtime() - start;
	ASSERT(err == 0);
	DTRACE_PROBE(pool_sync__2rootzio);

	while (dstg = txg_list_remove(&dp->dp_sync_tasks, txg))
		dsl_sync_task_group_sync(dstg, tx);
	DTRACE_PROBE(pool_sync__3task);

	start = gethrtime();
	while (dd = txg_list_remove(&dp->dp_dirty_dirs, txg))
		dsl_dir_sync(dd, tx);
	write_time += gethrtime() - start;

	if (spa_sync_pass(dp->dp_spa) == 1)
		dsl_pool_scrub_sync(dp, tx);

	start = gethrtime();
	if (list_head(&mosi->os_dirty_dnodes[txg & TXG_MASK]) != NULL ||
	    list_head(&mosi->os_free_dnodes[txg & TXG_MASK]) != NULL) {
		zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
		dmu_objset_sync(mosi, zio, tx);
		err = zio_wait(zio);
		ASSERT(err == 0);
		dprintf_bp(&dp->dp_meta_rootbp, "meta objset rootbp is %s", "");
		spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
	}
	write_time += gethrtime() - start;
	DTRACE_PROBE2(pool_sync__4io, hrtime_t, write_time,
	    hrtime_t, dp->dp_read_overhead);
	write_time -= dp->dp_read_overhead;

	dmu_tx_commit(tx);

	data_written = dp->dp_space_towrite[txg & TXG_MASK];
	dp->dp_space_towrite[txg & TXG_MASK] = 0;
	ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);

	/*
	 * If the write limit max has not been explicitly set, set it
	 * to a fraction of available physical memory (default 1/8th).
	 * Note that we must inflate the limit because the spa
	 * inflates write sizes to account for data replication.
	 * Check this each sync phase to catch changing memory size.
	 */
	if (physmem != old_physmem && zfs_write_limit_shift) {
		mutex_enter(&zfs_write_limit_lock);
		old_physmem = physmem;
		zfs_write_limit_max = ptob(physmem) >> zfs_write_limit_shift;
		zfs_write_limit_inflated = MAX(zfs_write_limit_min,
		    spa_get_asize(dp->dp_spa, zfs_write_limit_max));
		mutex_exit(&zfs_write_limit_lock);
	}

	/*
	 * Attempt to keep the sync time consistent by adjusting the
	 * amount of write traffic allowed into each transaction group.
	 * Weight the throughput calculation towards the current value:
	 * 	thru = 3/4 old_thru + 1/4 new_thru
	 */
	ASSERT(zfs_write_limit_min > 0);
	if (data_written > zfs_write_limit_min / 8 && write_time > 0) {
		uint64_t throughput = (data_written * NANOSEC) / write_time;
		if (dp->dp_throughput)
			dp->dp_throughput = throughput / 4 +
			    3 * dp->dp_throughput / 4;
		else
			dp->dp_throughput = throughput;
		dp->dp_write_limit = MIN(zfs_write_limit_inflated,
		    MAX(zfs_write_limit_min,
		    dp->dp_throughput * zfs_txg_synctime));
	}
}

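/*
 * Clean the ZIL of every dataset synced in the last txg, dropping
 * the holds that dsl_pool_sync() left on the synced list.
 */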
void
dsl_pool_zil_clean(dsl_pool_t *dp)
{
	dsl_dataset_t *ds;

	while (ds = list_head(&dp->dp_synced_datasets)) {
		list_remove(&dp->dp_synced_datasets, ds);
		ASSERT(ds->ds_user_ptr != NULL);
		zil_clean(((objset_impl_t *)ds->ds_user_ptr)->os_zil);
		dmu_buf_rele(ds->ds_dbuf, ds);
	}
}

/*
 * TRUE if the current thread is the tx_sync_thread or if we
 * are being called from SPA context during pool initialization.
 */
int
dsl_pool_sync_context(dsl_pool_t *dp)
{
	return (curthread == dp->dp_tx.tx_sync_thread ||
	    spa_get_dsl(dp->dp_spa) == NULL);
}

uint64_t
dsl_pool_adjustedsize(dsl_pool_t *dp, boolean_t netfree)
{
	uint64_t space, resv;

	/*
	 * Reserve about 1.6% (1/64), or at least 32MB, for allocation
	 * efficiency.
	 * XXX The intent log is not accounted for, so it must fit
	 * within this slop.
	 *
	 * If we're trying to assess whether it's OK to do a free,
	 * cut the reservation in half to allow forward progress
	 * (e.g. make it possible to rm(1) files from a full pool).
	 */
	space = spa_get_dspace(dp->dp_spa);
	resv = MAX(space >> 6, SPA_MINDEVSIZE >> 1);
	if (netfree)
		resv >>= 1;

	return (space - resv);
}

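/*
 * Reserve space for a dmu_tx in the currently open txg; returns
 * ERESTART (to force the tx to wait) once the txg's write limit
 * has been reached.
 */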
int
dsl_pool_tempreserve_space(dsl_pool_t *dp, uint64_t space, dmu_tx_t *tx)
{
	uint64_t reserved = 0;
	uint64_t write_limit = (zfs_write_limit_override ?
	    zfs_write_limit_override : dp->dp_write_limit);

	if (zfs_no_write_throttle) {
		atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK],
		    space);
		return (0);
	}

	/*
	 * Check to see if we have exceeded the maximum allowed IO for
	 * this transaction group.  We can do this without locks since
	 * a little slop here is ok.  Note that we do the reserved check
	 * with only half the requested reserve: this is because the
	 * reserve requests are worst-case, and we really don't want to
	 * throttle based off of worst-case estimates.
	 */
	if (write_limit > 0) {
		reserved = dp->dp_space_towrite[tx->tx_txg & TXG_MASK]
		    + dp->dp_tempreserved[tx->tx_txg & TXG_MASK] / 2;

		if (reserved && reserved > write_limit)
			return (ERESTART);
	}

	atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK], space);

	/*
	 * If this transaction group is over 7/8ths capacity, delay
	 * the caller 1 clock tick.  This will slow down the "fill"
	 * rate until the sync process can catch up with us.
	 */
	if (reserved && reserved > (write_limit - (write_limit >> 3)))
		txg_delay(dp, tx->tx_txg, 1);

	return (0);
}

void
dsl_pool_tempreserve_clear(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
{
	ASSERT(dp->dp_tempreserved[tx->tx_txg & TXG_MASK] >= space);
	atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK], -space);
}

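/*
 * Called under memory pressure: opportunistically shrink the write
 * limit toward the amount of dirty data actually outstanding, but
 * never below the zfs_write_limit_min floor.
 */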
void
dsl_pool_memory_pressure(dsl_pool_t *dp)
{
	uint64_t space_inuse = 0;
	int i;

	if (dp->dp_write_limit == zfs_write_limit_min)
		return;

	for (i = 0; i < TXG_SIZE; i++) {
		space_inuse += dp->dp_space_towrite[i];
		space_inuse += dp->dp_tempreserved[i];
	}
	dp->dp_write_limit = MAX(zfs_write_limit_min,
	    MIN(dp->dp_write_limit, space_inuse / 4));
}

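/*
 * Record that 'space' bytes are expected to be written in this txg,
 * for use by the write-throttle checks above.
 */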
void
dsl_pool_willuse_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
{
	if (space > 0) {
		mutex_enter(&dp->dp_lock);
		dp->dp_space_towrite[tx->tx_txg & TXG_MASK] += space;
		mutex_exit(&dp->dp_lock);
	}
}

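/*
 * Callback for dsl_pool_upgrade_clones(): find this dataset's origin
 * snapshot (attaching non-clones to the $ORIGIN snapshot first) and
 * record the dataset in that snapshot's next-clones zap object.
 */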
/* ARGSUSED */
static int
upgrade_clones_cb(spa_t *spa, uint64_t dsobj, const char *dsname, void *arg)
{
	dmu_tx_t *tx = arg;
	dsl_dataset_t *ds, *prev = NULL;
	int err;
	dsl_pool_t *dp = spa_get_dsl(spa);

	err = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
	if (err)
		return (err);

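	/*
	 * Walk back through this dataset's snapshots, stopping at the
	 * clone boundary (where the older snapshot belongs to another
	 * dataset's chain).
	 */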
	while (ds->ds_phys->ds_prev_snap_obj != 0) {
		err = dsl_dataset_hold_obj(dp, ds->ds_phys->ds_prev_snap_obj,
		    FTAG, &prev);
		if (err) {
			dsl_dataset_rele(ds, FTAG);
			return (err);
		}

		if (prev->ds_phys->ds_next_snap_obj != ds->ds_object)
			break;
		dsl_dataset_rele(ds, FTAG);
		ds = prev;
		prev = NULL;
	}

	if (prev == NULL) {
		prev = dp->dp_origin_snap;

		/*
		 * The $ORIGIN can't have any data, or the accounting
		 * will be wrong.
		 */
		ASSERT(prev->ds_phys->ds_bp.blk_birth == 0);

		/* The origin doesn't get attached to itself */
		if (ds->ds_object == prev->ds_object) {
			dsl_dataset_rele(ds, FTAG);
			return (0);
		}

		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		ds->ds_phys->ds_prev_snap_obj = prev->ds_object;
		ds->ds_phys->ds_prev_snap_txg = prev->ds_phys->ds_creation_txg;

		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
		ds->ds_dir->dd_phys->dd_origin_obj = prev->ds_object;

		dmu_buf_will_dirty(prev->ds_dbuf, tx);
		prev->ds_phys->ds_num_children++;

		if (ds->ds_phys->ds_next_snap_obj == 0) {
			ASSERT(ds->ds_prev == NULL);
			VERIFY(0 == dsl_dataset_hold_obj(dp,
			    ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
		}
	}

	ASSERT(ds->ds_dir->dd_phys->dd_origin_obj == prev->ds_object);
	ASSERT(ds->ds_phys->ds_prev_snap_obj == prev->ds_object);

	if (prev->ds_phys->ds_next_clones_obj == 0) {
		prev->ds_phys->ds_next_clones_obj =
		    zap_create(dp->dp_meta_objset,
		    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
	}
	VERIFY(0 == zap_add_int(dp->dp_meta_objset,
	    prev->ds_phys->ds_next_clones_obj, ds->ds_object, tx));

	dsl_dataset_rele(ds, FTAG);
	if (prev != dp->dp_origin_snap)
		dsl_dataset_rele(prev, FTAG);
	return (0);
}

void
dsl_pool_upgrade_clones(dsl_pool_t *dp, dmu_tx_t *tx)
{
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(dp->dp_origin_snap != NULL);

	(void) dmu_objset_find_spa(dp->dp_spa, NULL, upgrade_clones_cb,
	    tx, DS_FIND_CHILDREN);
}

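/*
 * Create the $ORIGIN dataset and its lone snapshot, keeping a hold
 * on that snapshot in dp_origin_snap for the life of the pool.
 */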
void
dsl_pool_create_origin(dsl_pool_t *dp, dmu_tx_t *tx)
{
	uint64_t dsobj;
	dsl_dataset_t *ds;

	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(dp->dp_origin_snap == NULL);

	/* create the origin dir, ds, & snap-ds */
	rw_enter(&dp->dp_config_rwlock, RW_WRITER);
	dsobj = dsl_dataset_create_sync(dp->dp_root_dir, ORIGIN_DIR_NAME,
	    NULL, 0, kcred, tx);
	VERIFY(0 == dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
	dsl_dataset_snapshot_sync(ds, ORIGIN_DIR_NAME, kcred, tx);
	VERIFY(0 == dsl_dataset_hold_obj(dp, ds->ds_phys->ds_prev_snap_obj,
	    dp, &dp->dp_origin_snap));
	dsl_dataset_rele(ds, FTAG);
	rw_exit(&dp->dp_config_rwlock);
}
614