1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
24 * Copyright (c) 2019 Datto Inc.
25 */
26/* Copyright (c) 2013 by Saso Kiselkov. All rights reserved. */
27/* Copyright (c) 2013, Joyent, Inc. All rights reserved. */
28/* Copyright 2016 Nexenta Systems, Inc. All rights reserved. */
29
30#include <sys/dmu.h>
31#include <sys/dmu_impl.h>
32#include <sys/dmu_tx.h>
33#include <sys/dbuf.h>
34#include <sys/dnode.h>
35#include <sys/zfs_context.h>
36#include <sys/dmu_objset.h>
37#include <sys/dmu_traverse.h>
38#include <sys/dsl_dataset.h>
39#include <sys/dsl_dir.h>
40#include <sys/dsl_pool.h>
41#include <sys/dsl_synctask.h>
42#include <sys/dsl_prop.h>
43#include <sys/dmu_zfetch.h>
44#include <sys/zfs_ioctl.h>
45#include <sys/zap.h>
46#include <sys/zio_checksum.h>
47#include <sys/zio_compress.h>
48#include <sys/sa.h>
49#include <sys/zfeature.h>
50#include <sys/abd.h>
51#ifdef _KERNEL
52#include <sys/racct.h>
53#include <sys/vm.h>
54#include <sys/zfs_znode.h>
55#endif
56
57/*
58 * Enable/disable nopwrite feature.
59 */
60int zfs_nopwrite_enabled = 1;
61SYSCTL_DECL(_vfs_zfs);
62SYSCTL_INT(_vfs_zfs, OID_AUTO, nopwrite_enabled, CTLFLAG_RDTUN,
63    &zfs_nopwrite_enabled, 0, "Enable nopwrite feature");
64
65/*
66 * Tunable to control percentage of dirtied L1 blocks from frees allowed into
67 * one TXG. After this threshold is crossed, additional dirty blocks from frees
68 * will wait until the next TXG.
69 * A value of zero will disable this throttle.
70 */
71uint32_t zfs_per_txg_dirty_frees_percent = 5;
72SYSCTL_INT(_vfs_zfs, OID_AUTO, per_txg_dirty_frees_percent, CTLFLAG_RWTUN,
73	&zfs_per_txg_dirty_frees_percent, 0,
74	"Percentage of dirtied indirect blocks from frees allowed in one txg");
75
76/*
77 * This can be used for testing, to ensure that certain actions happen
78 * while in the middle of a remap (which might otherwise complete too
79 * quickly).
80 */
81int zfs_object_remap_one_indirect_delay_ticks = 0;
82
/*
 * Limit the amount of data that can be prefetched by a single call to this
 * value.  This helps to limit the amount of memory that can be used by
 * prefetching.  Larger objects should be prefetched a bit at a time.
 */
88uint64_t dmu_prefetch_max = 8 * SPA_MAXBLOCKSIZE;
89
90const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
91	{ DMU_BSWAP_UINT8,  TRUE,  FALSE,  "unallocated"		},
92	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,   "object directory"		},
93	{ DMU_BSWAP_UINT64, TRUE,  TRUE,   "object array"		},
94	{ DMU_BSWAP_UINT8,  TRUE,  FALSE,  "packed nvlist"		},
95	{ DMU_BSWAP_UINT64, TRUE,  FALSE,  "packed nvlist size"		},
96	{ DMU_BSWAP_UINT64, TRUE,  FALSE,  "bpobj"			},
97	{ DMU_BSWAP_UINT64, TRUE,  FALSE,  "bpobj header"		},
98	{ DMU_BSWAP_UINT64, TRUE,  FALSE,  "SPA space map header"	},
99	{ DMU_BSWAP_UINT64, TRUE,  FALSE,  "SPA space map"		},
100	{ DMU_BSWAP_UINT64, TRUE,  FALSE,  "ZIL intent log"		},
101	{ DMU_BSWAP_DNODE,  TRUE,  FALSE,  "DMU dnode"			},
102	{ DMU_BSWAP_OBJSET, TRUE,  TRUE,   "DMU objset"			},
103	{ DMU_BSWAP_UINT64, TRUE,  TRUE,   "DSL directory"		},
104	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL directory child map"	},
105	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL dataset snap map"	},
106	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL props"			},
107	{ DMU_BSWAP_UINT64, TRUE,  TRUE,   "DSL dataset"		},
108	{ DMU_BSWAP_ZNODE,  TRUE,  FALSE,  "ZFS znode"			},
109	{ DMU_BSWAP_OLDACL, TRUE,  FALSE,  "ZFS V0 ACL"			},
110	{ DMU_BSWAP_UINT8,  FALSE, FALSE,  "ZFS plain file"		},
111	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "ZFS directory"		},
112	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "ZFS master node"		},
113	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "ZFS delete queue"		},
114	{ DMU_BSWAP_UINT8,  FALSE, FALSE,  "zvol object"		},
115	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "zvol prop"			},
116	{ DMU_BSWAP_UINT8,  FALSE, FALSE,  "other uint8[]"		},
117	{ DMU_BSWAP_UINT64, FALSE, FALSE,  "other uint64[]"		},
118	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "other ZAP"			},
119	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "persistent error log"	},
120	{ DMU_BSWAP_UINT8,  TRUE,  FALSE,  "SPA history"		},
121	{ DMU_BSWAP_UINT64, TRUE,  FALSE,  "SPA history offsets"	},
122	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,   "Pool properties"		},
123	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL permissions"		},
124	{ DMU_BSWAP_ACL,    TRUE,  FALSE,  "ZFS ACL"			},
125	{ DMU_BSWAP_UINT8,  TRUE,  FALSE,  "ZFS SYSACL"			},
126	{ DMU_BSWAP_UINT8,  TRUE,  FALSE,  "FUID table"			},
127	{ DMU_BSWAP_UINT64, TRUE,  FALSE,  "FUID table size"		},
128	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL dataset next clones"	},
129	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "scan work queue"		},
130	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "ZFS user/group used"	},
131	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "ZFS user/group quota"	},
132	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,   "snapshot refcount tags"	},
133	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "DDT ZAP algorithm"		},
134	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "DDT statistics"		},
135	{ DMU_BSWAP_UINT8,  TRUE,  FALSE,  "System attributes"		},
136	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "SA master node"		},
137	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "SA attr registration"	},
138	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "SA attr layouts"		},
139	{ DMU_BSWAP_ZAP,    TRUE,  FALSE,  "scan translations"		},
140	{ DMU_BSWAP_UINT8,  FALSE, FALSE,  "deduplicated block"		},
141	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL deadlist map"		},
142	{ DMU_BSWAP_UINT64, TRUE,  TRUE,   "DSL deadlist map hdr"	},
143	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL dir clones"		},
144	{ DMU_BSWAP_UINT64, TRUE,  FALSE,  "bpobj subobj"		}
145};
146
147const dmu_object_byteswap_info_t dmu_ot_byteswap[DMU_BSWAP_NUMFUNCS] = {
148	{	byteswap_uint8_array,	"uint8"		},
149	{	byteswap_uint16_array,	"uint16"	},
150	{	byteswap_uint32_array,	"uint32"	},
151	{	byteswap_uint64_array,	"uint64"	},
152	{	zap_byteswap,		"zap"		},
153	{	dnode_buf_byteswap,	"dnode"		},
154	{	dmu_objset_byteswap,	"objset"	},
155	{	zfs_znode_byteswap,	"znode"		},
156	{	zfs_oldacl_byteswap,	"oldacl"	},
157	{	zfs_acl_byteswap,	"acl"		}
158};
159
160int
161dmu_buf_hold_noread_by_dnode(dnode_t *dn, uint64_t offset,
162    void *tag, dmu_buf_t **dbp)
163{
164	uint64_t blkid;
165	dmu_buf_impl_t *db;
166
167	blkid = dbuf_whichblock(dn, 0, offset);
168	rw_enter(&dn->dn_struct_rwlock, RW_READER);
169	db = dbuf_hold(dn, blkid, tag);
170	rw_exit(&dn->dn_struct_rwlock);
171
172	if (db == NULL) {
173		*dbp = NULL;
174		return (SET_ERROR(EIO));
175	}
176
177	*dbp = &db->db;
178	return (0);
179}

int
181dmu_buf_hold_noread(objset_t *os, uint64_t object, uint64_t offset,
182    void *tag, dmu_buf_t **dbp)
183{
184	dnode_t *dn;
185	uint64_t blkid;
186	dmu_buf_impl_t *db;
187	int err;
188
189	err = dnode_hold(os, object, FTAG, &dn);
190	if (err)
191		return (err);
192	blkid = dbuf_whichblock(dn, 0, offset);
193	rw_enter(&dn->dn_struct_rwlock, RW_READER);
194	db = dbuf_hold(dn, blkid, tag);
195	rw_exit(&dn->dn_struct_rwlock);
196	dnode_rele(dn, FTAG);
197
198	if (db == NULL) {
199		*dbp = NULL;
200		return (SET_ERROR(EIO));
201	}
202
203	*dbp = &db->db;
204	return (err);
205}
206
207int
208dmu_buf_hold_by_dnode(dnode_t *dn, uint64_t offset,
209    void *tag, dmu_buf_t **dbp, int flags)
210{
211	int err;
212	int db_flags = DB_RF_CANFAIL;
213
214	if (flags & DMU_READ_NO_PREFETCH)
215		db_flags |= DB_RF_NOPREFETCH;
216
217	err = dmu_buf_hold_noread_by_dnode(dn, offset, tag, dbp);
218	if (err == 0) {
219		dmu_buf_impl_t *db = (dmu_buf_impl_t *)(*dbp);
220		err = dbuf_read(db, NULL, db_flags);
221		if (err != 0) {
222			dbuf_rele(db, tag);
223			*dbp = NULL;
224		}
225	}
226
227	return (err);
228}
229
230int
231dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
232    void *tag, dmu_buf_t **dbp, int flags)
233{
234	int err;
235	int db_flags = DB_RF_CANFAIL;
236
237	if (flags & DMU_READ_NO_PREFETCH)
238		db_flags |= DB_RF_NOPREFETCH;
239
240	err = dmu_buf_hold_noread(os, object, offset, tag, dbp);
241	if (err == 0) {
242		dmu_buf_impl_t *db = (dmu_buf_impl_t *)(*dbp);
243		err = dbuf_read(db, NULL, db_flags);
244		if (err != 0) {
245			dbuf_rele(db, tag);
246			*dbp = NULL;
247		}
248	}
249
250	return (err);
251}
252
253int
254dmu_bonus_max(void)
255{
256	return (DN_OLD_MAX_BONUSLEN);
257}
258
259int
260dmu_set_bonus(dmu_buf_t *db_fake, int newsize, dmu_tx_t *tx)
261{
262	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
263	dnode_t *dn;
264	int error;
265
266	DB_DNODE_ENTER(db);
267	dn = DB_DNODE(db);
268
269	if (dn->dn_bonus != db) {
270		error = SET_ERROR(EINVAL);
271	} else if (newsize < 0 || newsize > db_fake->db_size) {
272		error = SET_ERROR(EINVAL);
273	} else {
274		dnode_setbonuslen(dn, newsize, tx);
275		error = 0;
276	}
277
278	DB_DNODE_EXIT(db);
279	return (error);
280}
281
282int
283dmu_set_bonustype(dmu_buf_t *db_fake, dmu_object_type_t type, dmu_tx_t *tx)
284{
285	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
286	dnode_t *dn;
287	int error;
288
289	DB_DNODE_ENTER(db);
290	dn = DB_DNODE(db);
291
292	if (!DMU_OT_IS_VALID(type)) {
293		error = SET_ERROR(EINVAL);
294	} else if (dn->dn_bonus != db) {
295		error = SET_ERROR(EINVAL);
296	} else {
297		dnode_setbonus_type(dn, type, tx);
298		error = 0;
299	}
300
301	DB_DNODE_EXIT(db);
302	return (error);
303}
304
305dmu_object_type_t
306dmu_get_bonustype(dmu_buf_t *db_fake)
307{
308	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
309	dnode_t *dn;
310	dmu_object_type_t type;
311
312	DB_DNODE_ENTER(db);
313	dn = DB_DNODE(db);
314	type = dn->dn_bonustype;
315	DB_DNODE_EXIT(db);
316
317	return (type);
318}
319
320int
321dmu_rm_spill(objset_t *os, uint64_t object, dmu_tx_t *tx)
322{
323	dnode_t *dn;
324	int error;
325
	error = dnode_hold(os, object, FTAG, &dn);
	if (error)
		return (error);
	dbuf_rm_spill(dn, tx);
	rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
	dnode_rm_spill(dn, tx);
	rw_exit(&dn->dn_struct_rwlock);
	dnode_rele(dn, FTAG);
	return (error);
333}
334
335/*
336 * returns ENOENT, EIO, or 0.
337 */
338int
339dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
340{
341	dnode_t *dn;
342	dmu_buf_impl_t *db;
343	int error;
344
345	error = dnode_hold(os, object, FTAG, &dn);
346	if (error)
347		return (error);
348
349	rw_enter(&dn->dn_struct_rwlock, RW_READER);
350	if (dn->dn_bonus == NULL) {
351		rw_exit(&dn->dn_struct_rwlock);
352		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
353		if (dn->dn_bonus == NULL)
354			dbuf_create_bonus(dn);
355	}
356	db = dn->dn_bonus;
357
358	/* as long as the bonus buf is held, the dnode will be held */
359	if (zfs_refcount_add(&db->db_holds, tag) == 1) {
360		VERIFY(dnode_add_ref(dn, db));
361		atomic_inc_32(&dn->dn_dbufs_count);
362	}
363
364	/*
365	 * Wait to drop dn_struct_rwlock until after adding the bonus dbuf's
366	 * hold and incrementing the dbuf count to ensure that dnode_move() sees
367	 * a dnode hold for every dbuf.
368	 */
369	rw_exit(&dn->dn_struct_rwlock);
370
371	dnode_rele(dn, FTAG);
372
373	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH));
374
375	*dbp = &db->db;
376	return (0);
377}
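
/*
 * Illustrative sketch of a bonus buffer consumer; object is a placeholder
 * and the layout of db->db_data depends on the object's bonus type (see
 * dmu_get_bonustype() above):
 *
 *	dmu_buf_t *db;
 *	if (dmu_bonus_hold(os, object, FTAG, &db) == 0) {
 *		... interpret db->db_data as the bonus payload ...
 *		dmu_buf_rele(db, FTAG);
 *	}
 */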
378
379/*
380 * returns ENOENT, EIO, or 0.
381 *
382 * This interface will allocate a blank spill dbuf when a spill blk
383 * doesn't already exist on the dnode.
384 *
385 * if you only want to find an already existing spill db, then
386 * dmu_spill_hold_existing() should be used.
387 */
388int
389dmu_spill_hold_by_dnode(dnode_t *dn, uint32_t flags, void *tag, dmu_buf_t **dbp)
390{
391	dmu_buf_impl_t *db = NULL;
392	int err;
393
394	if ((flags & DB_RF_HAVESTRUCT) == 0)
395		rw_enter(&dn->dn_struct_rwlock, RW_READER);
396
397	db = dbuf_hold(dn, DMU_SPILL_BLKID, tag);
398
399	if ((flags & DB_RF_HAVESTRUCT) == 0)
400		rw_exit(&dn->dn_struct_rwlock);
401
402	ASSERT(db != NULL);
403	err = dbuf_read(db, NULL, flags);
404	if (err == 0)
405		*dbp = &db->db;
406	else
407		dbuf_rele(db, tag);
408	return (err);
409}
410
411int
412dmu_spill_hold_existing(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
413{
414	dmu_buf_impl_t *db = (dmu_buf_impl_t *)bonus;
415	dnode_t *dn;
416	int err;
417
418	DB_DNODE_ENTER(db);
419	dn = DB_DNODE(db);
420
421	if (spa_version(dn->dn_objset->os_spa) < SPA_VERSION_SA) {
422		err = SET_ERROR(EINVAL);
423	} else {
424		rw_enter(&dn->dn_struct_rwlock, RW_READER);
425
426		if (!dn->dn_have_spill) {
427			err = SET_ERROR(ENOENT);
428		} else {
429			err = dmu_spill_hold_by_dnode(dn,
430			    DB_RF_HAVESTRUCT | DB_RF_CANFAIL, tag, dbp);
431		}
432
433		rw_exit(&dn->dn_struct_rwlock);
434	}
435
436	DB_DNODE_EXIT(db);
437	return (err);
438}
439
440int
441dmu_spill_hold_by_bonus(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
442{
443	dmu_buf_impl_t *db = (dmu_buf_impl_t *)bonus;
444	dnode_t *dn;
445	int err;
446
447	DB_DNODE_ENTER(db);
448	dn = DB_DNODE(db);
449	err = dmu_spill_hold_by_dnode(dn, DB_RF_CANFAIL, tag, dbp);
450	DB_DNODE_EXIT(db);
451
452	return (err);
453}
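
/*
 * Illustrative sketch of looking up data stored in an existing spill block
 * via a held bonus dbuf (bonus_db is a placeholder for the caller's bonus
 * buffer hold):
 *
 *	dmu_buf_t *spill_db;
 *	if (dmu_spill_hold_existing(bonus_db, FTAG, &spill_db) == 0) {
 *		... consume spill_db->db_data ...
 *		dmu_buf_rele(spill_db, FTAG);
 *	}
 */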
454
455/*
456 * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
457 * to take a held dnode rather than <os, object> -- the lookup is wasteful,
458 * and can induce severe lock contention when writing to several files
459 * whose dnodes are in the same block.
460 */
461int
462dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
463    boolean_t read, void *tag, int *numbufsp, dmu_buf_t ***dbpp, uint32_t flags)
464{
465	dmu_buf_t **dbp;
466	uint64_t blkid, nblks, i;
467	uint32_t dbuf_flags;
468	int err;
469	zio_t *zio;
470
471	ASSERT(length <= DMU_MAX_ACCESS);
472
473	/*
474	 * Note: We directly notify the prefetch code of this read, so that
475	 * we can tell it about the multi-block read.  dbuf_read() only knows
476	 * about the one block it is accessing.
477	 */
478	dbuf_flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT | DB_RF_HAVESTRUCT |
479	    DB_RF_NOPREFETCH;
480
481	rw_enter(&dn->dn_struct_rwlock, RW_READER);
482	if (dn->dn_datablkshift) {
483		int blkshift = dn->dn_datablkshift;
484		nblks = (P2ROUNDUP(offset + length, 1ULL << blkshift) -
485		    P2ALIGN(offset, 1ULL << blkshift)) >> blkshift;
486	} else {
487		if (offset + length > dn->dn_datablksz) {
488			zfs_panic_recover("zfs: accessing past end of object "
489			    "%llx/%llx (size=%u access=%llu+%llu)",
490			    (longlong_t)dn->dn_objset->
491			    os_dsl_dataset->ds_object,
492			    (longlong_t)dn->dn_object, dn->dn_datablksz,
493			    (longlong_t)offset, (longlong_t)length);
494			rw_exit(&dn->dn_struct_rwlock);
495			return (SET_ERROR(EIO));
496		}
497		nblks = 1;
498	}
499	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
500
501#if defined(_KERNEL) && defined(RACCT)
502	if (racct_enable && !read) {
503		PROC_LOCK(curproc);
504		racct_add_force(curproc, RACCT_WRITEBPS, length);
505		racct_add_force(curproc, RACCT_WRITEIOPS, nblks);
506		PROC_UNLOCK(curproc);
507	}
508#endif
509
510	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
511	blkid = dbuf_whichblock(dn, 0, offset);
512	for (i = 0; i < nblks; i++) {
513		dmu_buf_impl_t *db = dbuf_hold(dn, blkid + i, tag);
514		if (db == NULL) {
515			rw_exit(&dn->dn_struct_rwlock);
516			dmu_buf_rele_array(dbp, nblks, tag);
517			zio_nowait(zio);
518			return (SET_ERROR(EIO));
519		}
520
521		/* initiate async i/o */
522		if (read)
523			(void) dbuf_read(db, zio, dbuf_flags);
524#ifdef _KERNEL
525		else
526			curthread->td_ru.ru_oublock++;
527#endif
528		dbp[i] = &db->db;
529	}
530
531	if ((flags & DMU_READ_NO_PREFETCH) == 0 &&
532	    DNODE_META_IS_CACHEABLE(dn) && length <= zfetch_array_rd_sz) {
533		dmu_zfetch(&dn->dn_zfetch, blkid, nblks,
534		    read && DNODE_IS_CACHEABLE(dn));
535	}
536	rw_exit(&dn->dn_struct_rwlock);
537
538	/* wait for async i/o */
539	err = zio_wait(zio);
540	if (err) {
541		dmu_buf_rele_array(dbp, nblks, tag);
542		return (err);
543	}
544
545	/* wait for other io to complete */
546	if (read) {
547		for (i = 0; i < nblks; i++) {
548			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
549			mutex_enter(&db->db_mtx);
550			while (db->db_state == DB_READ ||
551			    db->db_state == DB_FILL)
552				cv_wait(&db->db_changed, &db->db_mtx);
553			if (db->db_state == DB_UNCACHED)
554				err = SET_ERROR(EIO);
555			mutex_exit(&db->db_mtx);
556			if (err) {
557				dmu_buf_rele_array(dbp, nblks, tag);
558				return (err);
559			}
560		}
561	}
562
563	*numbufsp = nblks;
564	*dbpp = dbp;
565	return (0);
566}
567
568static int
569dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
570    uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
571{
572	dnode_t *dn;
573	int err;
574
575	err = dnode_hold(os, object, FTAG, &dn);
576	if (err)
577		return (err);
578
579	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
580	    numbufsp, dbpp, DMU_READ_PREFETCH);
581
582	dnode_rele(dn, FTAG);
583
584	return (err);
585}
586
587int
588dmu_buf_hold_array_by_bonus(dmu_buf_t *db_fake, uint64_t offset,
589    uint64_t length, boolean_t read, void *tag, int *numbufsp,
590    dmu_buf_t ***dbpp)
591{
592	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
593	dnode_t *dn;
594	int err;
595
596	DB_DNODE_ENTER(db);
597	dn = DB_DNODE(db);
598	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
599	    numbufsp, dbpp, DMU_READ_PREFETCH);
600	DB_DNODE_EXIT(db);
601
602	return (err);
603}
604
605void
606dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
607{
608	int i;
609	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
610
611	if (numbufs == 0)
612		return;
613
614	for (i = 0; i < numbufs; i++) {
615		if (dbp[i])
616			dbuf_rele(dbp[i], tag);
617	}
618
619	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
620}
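
/*
 * Illustrative sketch of the array hold/release pattern with a held dnode;
 * dn, offset and length are placeholders and length must not exceed
 * DMU_MAX_ACCESS:
 *
 *	dmu_buf_t **dbp;
 *	int numbufs;
 *	int err = dmu_buf_hold_array_by_dnode(dn, offset, length, TRUE,
 *	    FTAG, &numbufs, &dbp, DMU_READ_PREFETCH);
 *	if (err == 0) {
 *		for (int i = 0; i < numbufs; i++)
 *			... consume dbp[i]->db_data ...
 *		dmu_buf_rele_array(dbp, numbufs, FTAG);
 *	}
 */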
621
/*
 * Issue prefetch i/os for the given blocks.  If level is greater than 0, the
 * indirect blocks prefetched will be those that point to the blocks containing
 * the data starting at offset, and continuing to offset + len.
 *
 * Note that if the indirect blocks above the blocks being prefetched are not
 * in cache, they will be asynchronously read in.
 */
630void
631dmu_prefetch(objset_t *os, uint64_t object, int64_t level, uint64_t offset,
632    uint64_t len, zio_priority_t pri)
633{
634	dnode_t *dn;
635	uint64_t blkid;
636	int nblks, err;
637
638	if (len == 0) {  /* they're interested in the bonus buffer */
639		dn = DMU_META_DNODE(os);
640
641		if (object == 0 || object >= DN_MAX_OBJECT)
642			return;
643
644		rw_enter(&dn->dn_struct_rwlock, RW_READER);
645		blkid = dbuf_whichblock(dn, level,
646		    object * sizeof (dnode_phys_t));
647		dbuf_prefetch(dn, level, blkid, pri, 0);
648		rw_exit(&dn->dn_struct_rwlock);
649		return;
650	}
651
652	/*
653	 * See comment before the definition of dmu_prefetch_max.
654	 */
655	len = MIN(len, dmu_prefetch_max);
656
657	/*
658	 * XXX - Note, if the dnode for the requested object is not
659	 * already cached, we will do a *synchronous* read in the
660	 * dnode_hold() call.  The same is true for any indirects.
661	 */
662	err = dnode_hold(os, object, FTAG, &dn);
663	if (err != 0)
664		return;
665
666	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	/*
	 * offset + len - 1 is the last byte we want to prefetch for, and offset
	 * is the first.  Then dbuf_whichblock(dn, level, offset + len - 1) is
	 * the last block we want to prefetch, and dbuf_whichblock(dn, level,
	 * offset) is the first.  Then the number we need to prefetch is the
	 * last - first + 1.
	 */
674	if (level > 0 || dn->dn_datablkshift != 0) {
675		nblks = dbuf_whichblock(dn, level, offset + len - 1) -
676		    dbuf_whichblock(dn, level, offset) + 1;
677	} else {
678		nblks = (offset < dn->dn_datablksz);
679	}
680
681	if (nblks != 0) {
682		blkid = dbuf_whichblock(dn, level, offset);
683		for (int i = 0; i < nblks; i++)
684			dbuf_prefetch(dn, level, blkid + i, pri, 0);
685	}
686
687	rw_exit(&dn->dn_struct_rwlock);
688
689	dnode_rele(dn, FTAG);
690}
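
/*
 * Illustrative sketch: warm the cache with the first 1 MiB of an object's
 * level-0 data before a sequential read (os and object are placeholders):
 *
 *	dmu_prefetch(os, object, 0, 0, 1 << 20, ZIO_PRIORITY_ASYNC_READ);
 */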
691
692/*
693 * Get the next "chunk" of file data to free.  We traverse the file from
 * the end so that the file gets shorter over time (if we crash in the
695 * middle, this will leave us in a better state).  We find allocated file
696 * data by simply searching the allocated level 1 indirects.
697 *
698 * On input, *start should be the first offset that does not need to be
699 * freed (e.g. "offset + length").  On return, *start will be the first
700 * offset that should be freed and l1blks is set to the number of level 1
701 * indirect blocks found within the chunk.
702 */
703static int
704get_next_chunk(dnode_t *dn, uint64_t *start, uint64_t minimum, uint64_t *l1blks)
705{
706	uint64_t blks;
707	uint64_t maxblks = DMU_MAX_ACCESS >> (dn->dn_indblkshift + 1);
708	/* bytes of data covered by a level-1 indirect block */
709	uint64_t iblkrange =
710	    dn->dn_datablksz * EPB(dn->dn_indblkshift, SPA_BLKPTRSHIFT);
711
712	ASSERT3U(minimum, <=, *start);
713
714	/*
715	 * Check if we can free the entire range assuming that all of the
716	 * L1 blocks in this range have data. If we can, we use this
717	 * worst case value as an estimate so we can avoid having to look
718	 * at the object's actual data.
719	 */
720	uint64_t total_l1blks =
721	    (roundup(*start, iblkrange) - (minimum / iblkrange * iblkrange)) /
722	    iblkrange;
723	if (total_l1blks <= maxblks) {
724		*l1blks = total_l1blks;
725		*start = minimum;
726		return (0);
727	}
728	ASSERT(ISP2(iblkrange));
729
730	for (blks = 0; *start > minimum && blks < maxblks; blks++) {
731		int err;
732
733		/*
734		 * dnode_next_offset(BACKWARDS) will find an allocated L1
735		 * indirect block at or before the input offset.  We must
736		 * decrement *start so that it is at the end of the region
737		 * to search.
738		 */
739		(*start)--;
740
741		err = dnode_next_offset(dn,
742		    DNODE_FIND_BACKWARDS, start, 2, 1, 0);
743
744		/* if there are no indirect blocks before start, we are done */
745		if (err == ESRCH) {
746			*start = minimum;
747			break;
748		} else if (err != 0) {
749			*l1blks = blks;
750			return (err);
751		}
752
753		/* set start to the beginning of this L1 indirect */
754		*start = P2ALIGN(*start, iblkrange);
755	}
756	if (*start < minimum)
757		*start = minimum;
758	*l1blks = blks;
759
760	return (0);
761}
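
/*
 * For example, with 128 KiB data blocks and a dn_indblkshift of 17 (128 KiB
 * indirect blocks holding EPB() == 1024 block pointers each), iblkrange is
 * 128 MiB, so each backwards step of the loop above covers up to 128 MiB of
 * file data per L1 indirect examined.
 */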
762
/*
 * If this objset is of type DMU_OST_ZFS, return true if the vfs's unmounted
 * flag is set, otherwise return false.
 * Used below in dmu_free_long_range_impl() to enable an abort when unmounting.
 */
768/*ARGSUSED*/
769static boolean_t
770dmu_objset_zfs_unmounting(objset_t *os)
771{
772#ifdef _KERNEL
773	if (dmu_objset_type(os) == DMU_OST_ZFS)
774		return (zfs_get_vfs_flag_unmounted(os));
775#endif
776	return (B_FALSE);
777}
778
779static int
780dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
781    uint64_t length)
782{
783	uint64_t object_size = (dn->dn_maxblkid + 1) * dn->dn_datablksz;
784	int err;
785	uint64_t dirty_frees_threshold;
786	dsl_pool_t *dp = dmu_objset_pool(os);
787
788	if (offset >= object_size)
789		return (0);
790
791	if (zfs_per_txg_dirty_frees_percent <= 100)
792		dirty_frees_threshold =
793		    zfs_per_txg_dirty_frees_percent * zfs_dirty_data_max / 100;
794	else
795		dirty_frees_threshold = zfs_dirty_data_max / 20;
796
797	if (length == DMU_OBJECT_END || offset + length > object_size)
798		length = object_size - offset;
799
800	while (length != 0) {
801		uint64_t chunk_end, chunk_begin, chunk_len;
802		uint64_t l1blks;
803		dmu_tx_t *tx;
804
805		if (dmu_objset_zfs_unmounting(dn->dn_objset))
806			return (SET_ERROR(EINTR));
807
808		chunk_end = chunk_begin = offset + length;
809
810		/* move chunk_begin backwards to the beginning of this chunk */
811		err = get_next_chunk(dn, &chunk_begin, offset, &l1blks);
812		if (err)
813			return (err);
814		ASSERT3U(chunk_begin, >=, offset);
815		ASSERT3U(chunk_begin, <=, chunk_end);
816
817		chunk_len = chunk_end - chunk_begin;
818
819		tx = dmu_tx_create(os);
820		dmu_tx_hold_free(tx, dn->dn_object, chunk_begin, chunk_len);
821
822		/*
823		 * Mark this transaction as typically resulting in a net
824		 * reduction in space used.
825		 */
826		dmu_tx_mark_netfree(tx);
827		err = dmu_tx_assign(tx, TXG_WAIT);
828		if (err) {
829			dmu_tx_abort(tx);
830			return (err);
831		}
832
833		uint64_t txg = dmu_tx_get_txg(tx);
834
835		mutex_enter(&dp->dp_lock);
836		uint64_t long_free_dirty =
837		    dp->dp_long_free_dirty_pertxg[txg & TXG_MASK];
838		mutex_exit(&dp->dp_lock);
839
840		/*
841		 * To avoid filling up a TXG with just frees, wait for
842		 * the next TXG to open before freeing more chunks if
843		 * we have reached the threshold of frees.
844		 */
845		if (dirty_frees_threshold != 0 &&
846		    long_free_dirty >= dirty_frees_threshold) {
847			dmu_tx_commit(tx);
848			txg_wait_open(dp, 0);
849			continue;
850		}
851
852		/*
853		 * In order to prevent unnecessary write throttling, for each
854		 * TXG, we track the cumulative size of L1 blocks being dirtied
855		 * in dnode_free_range() below. We compare this number to a
856		 * tunable threshold, past which we prevent new L1 dirty freeing
857		 * blocks from being added into the open TXG. See
858		 * dmu_free_long_range_impl() for details. The threshold
859		 * prevents write throttle activation due to dirty freeing L1
860		 * blocks taking up a large percentage of zfs_dirty_data_max.
861		 */
862		mutex_enter(&dp->dp_lock);
863		dp->dp_long_free_dirty_pertxg[txg & TXG_MASK] +=
864		    l1blks << dn->dn_indblkshift;
865		mutex_exit(&dp->dp_lock);
866		DTRACE_PROBE3(free__long__range,
867		    uint64_t, long_free_dirty, uint64_t, chunk_len,
868		    uint64_t, txg);
869		dnode_free_range(dn, chunk_begin, chunk_len, tx);
870		dmu_tx_commit(tx);
871
872		length -= chunk_len;
873	}
874	return (0);
875}
876
877int
878dmu_free_long_range(objset_t *os, uint64_t object,
879    uint64_t offset, uint64_t length)
880{
881	dnode_t *dn;
882	int err;
883
884	err = dnode_hold(os, object, FTAG, &dn);
885	if (err != 0)
886		return (err);
887	err = dmu_free_long_range_impl(os, dn, offset, length);
888
889	/*
890	 * It is important to zero out the maxblkid when freeing the entire
891	 * file, so that (a) subsequent calls to dmu_free_long_range_impl()
892	 * will take the fast path, and (b) dnode_reallocate() can verify
893	 * that the entire file has been freed.
894	 */
895	if (err == 0 && offset == 0 && length == DMU_OBJECT_END)
896		dn->dn_maxblkid = 0;
897
898	dnode_rele(dn, FTAG);
899	return (err);
900}
901
902int
903dmu_free_long_object(objset_t *os, uint64_t object)
904{
905	dmu_tx_t *tx;
906	int err;
907
908	err = dmu_free_long_range(os, object, 0, DMU_OBJECT_END);
909	if (err != 0)
910		return (err);
911
912	tx = dmu_tx_create(os);
913	dmu_tx_hold_bonus(tx, object);
914	dmu_tx_hold_free(tx, object, 0, DMU_OBJECT_END);
915	dmu_tx_mark_netfree(tx);
916	err = dmu_tx_assign(tx, TXG_WAIT);
917	if (err == 0) {
918		err = dmu_object_free(os, object, tx);
919		dmu_tx_commit(tx);
920	} else {
921		dmu_tx_abort(tx);
922	}
923
924	return (err);
925}
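
/*
 * Illustrative sketch: punch a hole in one object and destroy another
 * entirely; both calls manage their own transactions internally (os,
 * object, other_object, off and len are placeholders):
 *
 *	(void) dmu_free_long_range(os, object, off, len);
 *	(void) dmu_free_long_object(os, other_object);
 */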
926
927int
928dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
929    uint64_t size, dmu_tx_t *tx)
930{
931	dnode_t *dn;
932	int err = dnode_hold(os, object, FTAG, &dn);
933	if (err)
934		return (err);
935	ASSERT(offset < UINT64_MAX);
936	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
937	dnode_free_range(dn, offset, size, tx);
938	dnode_rele(dn, FTAG);
939	return (0);
940}
941
942static int
943dmu_read_impl(dnode_t *dn, uint64_t offset, uint64_t size,
944    void *buf, uint32_t flags)
945{
946	dmu_buf_t **dbp;
947	int numbufs, err = 0;
948
949	/*
950	 * Deal with odd block sizes, where there can't be data past the first
951	 * block.  If we ever do the tail block optimization, we will need to
952	 * handle that here as well.
953	 */
954	if (dn->dn_maxblkid == 0) {
955		int newsz = offset > dn->dn_datablksz ? 0 :
956		    MIN(size, dn->dn_datablksz - offset);
957		bzero((char *)buf + newsz, size - newsz);
958		size = newsz;
959	}
960
961	while (size > 0) {
962		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
963		int i;
964
965		/*
966		 * NB: we could do this block-at-a-time, but it's nice
967		 * to be reading in parallel.
968		 */
969		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
970		    TRUE, FTAG, &numbufs, &dbp, flags);
971		if (err)
972			break;
973
974		for (i = 0; i < numbufs; i++) {
975			int tocpy;
976			int bufoff;
977			dmu_buf_t *db = dbp[i];
978
979			ASSERT(size > 0);
980
981			bufoff = offset - db->db_offset;
982			tocpy = (int)MIN(db->db_size - bufoff, size);
983
984			bcopy((char *)db->db_data + bufoff, buf, tocpy);
985
986			offset += tocpy;
987			size -= tocpy;
988			buf = (char *)buf + tocpy;
989		}
990		dmu_buf_rele_array(dbp, numbufs, FTAG);
991	}
992	return (err);
993}
994
995int
996dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
997    void *buf, uint32_t flags)
998{
999	dnode_t *dn;
1000	int err;
1001
1002	err = dnode_hold(os, object, FTAG, &dn);
1003	if (err != 0)
1004		return (err);
1005
1006	err = dmu_read_impl(dn, offset, size, buf, flags);
1007	dnode_rele(dn, FTAG);
1008	return (err);
1009}
1010
1011int
1012dmu_read_by_dnode(dnode_t *dn, uint64_t offset, uint64_t size, void *buf,
1013    uint32_t flags)
1014{
1015	return (dmu_read_impl(dn, offset, size, buf, flags));
1016}
1017
1018static void
1019dmu_write_impl(dmu_buf_t **dbp, int numbufs, uint64_t offset, uint64_t size,
1020    const void *buf, dmu_tx_t *tx)
1021{
1022	int i;
1023
1024	for (i = 0; i < numbufs; i++) {
1025		int tocpy;
1026		int bufoff;
1027		dmu_buf_t *db = dbp[i];
1028
1029		ASSERT(size > 0);
1030
1031		bufoff = offset - db->db_offset;
1032		tocpy = (int)MIN(db->db_size - bufoff, size);
1033
1034		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1035
1036		if (tocpy == db->db_size)
1037			dmu_buf_will_fill(db, tx);
1038		else
1039			dmu_buf_will_dirty(db, tx);
1040
1041		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
1042
1043		if (tocpy == db->db_size)
1044			dmu_buf_fill_done(db, tx);
1045
1046		offset += tocpy;
1047		size -= tocpy;
1048		buf = (char *)buf + tocpy;
1049	}
1050}
1051
1052void
1053dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1054    const void *buf, dmu_tx_t *tx)
1055{
1056	dmu_buf_t **dbp;
1057	int numbufs;
1058
1059	if (size == 0)
1060		return;
1061
1062	VERIFY0(dmu_buf_hold_array(os, object, offset, size,
1063	    FALSE, FTAG, &numbufs, &dbp));
1064	dmu_write_impl(dbp, numbufs, offset, size, buf, tx);
1065	dmu_buf_rele_array(dbp, numbufs, FTAG);
1066}
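
/*
 * Illustrative sketch of a simple buffered write: the caller creates and
 * assigns its own transaction, copies the data in with dmu_write(), and
 * commits (os, object, off, len and buf are placeholders):
 *
 *	dmu_tx_t *tx = dmu_tx_create(os);
 *	dmu_tx_hold_write(tx, object, off, len);
 *	if (dmu_tx_assign(tx, TXG_WAIT) == 0) {
 *		dmu_write(os, object, off, len, buf, tx);
 *		dmu_tx_commit(tx);
 *	} else {
 *		dmu_tx_abort(tx);
 *	}
 */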
1067
1068void
1069dmu_write_by_dnode(dnode_t *dn, uint64_t offset, uint64_t size,
1070    const void *buf, dmu_tx_t *tx)
1071{
1072	dmu_buf_t **dbp;
1073	int numbufs;
1074
1075	if (size == 0)
1076		return;
1077
1078	VERIFY0(dmu_buf_hold_array_by_dnode(dn, offset, size,
1079	    FALSE, FTAG, &numbufs, &dbp, DMU_READ_PREFETCH));
1080	dmu_write_impl(dbp, numbufs, offset, size, buf, tx);
1081	dmu_buf_rele_array(dbp, numbufs, FTAG);
1082}
1083
1084static int
1085dmu_object_remap_one_indirect(objset_t *os, dnode_t *dn,
1086    uint64_t last_removal_txg, uint64_t offset)
1087{
1088	uint64_t l1blkid = dbuf_whichblock(dn, 1, offset);
1089	int err = 0;
1090
1091	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1092	dmu_buf_impl_t *dbuf = dbuf_hold_level(dn, 1, l1blkid, FTAG);
1093	ASSERT3P(dbuf, !=, NULL);
1094
1095	/*
1096	 * If the block hasn't been written yet, this default will ensure
1097	 * we don't try to remap it.
1098	 */
1099	uint64_t birth = UINT64_MAX;
1100	ASSERT3U(last_removal_txg, !=, UINT64_MAX);
1101	if (dbuf->db_blkptr != NULL)
1102		birth = dbuf->db_blkptr->blk_birth;
1103	rw_exit(&dn->dn_struct_rwlock);
1104
1105	/*
1106	 * If this L1 was already written after the last removal, then we've
1107	 * already tried to remap it.
1108	 */
1109	if (birth <= last_removal_txg &&
1110	    dbuf_read(dbuf, NULL, DB_RF_MUST_SUCCEED) == 0 &&
1111	    dbuf_can_remap(dbuf)) {
1112		dmu_tx_t *tx = dmu_tx_create(os);
1113		dmu_tx_hold_remap_l1indirect(tx, dn->dn_object);
1114		err = dmu_tx_assign(tx, TXG_WAIT);
1115		if (err == 0) {
1116			(void) dbuf_dirty(dbuf, tx);
1117			dmu_tx_commit(tx);
1118		} else {
1119			dmu_tx_abort(tx);
1120		}
1121	}
1122
1123	dbuf_rele(dbuf, FTAG);
1124
1125	delay(zfs_object_remap_one_indirect_delay_ticks);
1126
1127	return (err);
1128}
1129
1130/*
1131 * Remap all blockpointers in the object, if possible, so that they reference
1132 * only concrete vdevs.
1133 *
1134 * To do this, iterate over the L0 blockpointers and remap any that reference
1135 * an indirect vdev. Note that we only examine L0 blockpointers; since we
 * cannot guarantee that we can remap all blockpointers anyway (due to split
1137 * blocks), we do not want to make the code unnecessarily complicated to
1138 * catch the unlikely case that there is an L1 block on an indirect vdev that
1139 * contains no indirect blockpointers.
1140 */
1141int
1142dmu_object_remap_indirects(objset_t *os, uint64_t object,
1143    uint64_t last_removal_txg)
1144{
1145	uint64_t offset, l1span;
1146	int err;
1147	dnode_t *dn;
1148
1149	err = dnode_hold(os, object, FTAG, &dn);
1150	if (err != 0) {
1151		return (err);
1152	}
1153
1154	if (dn->dn_nlevels <= 1) {
1155		if (issig(JUSTLOOKING) && issig(FORREAL)) {
1156			err = SET_ERROR(EINTR);
1157		}
1158
1159		/*
1160		 * If the dnode has no indirect blocks, we cannot dirty them.
1161		 * We still want to remap the blkptr(s) in the dnode if
1162		 * appropriate, so mark it as dirty.
1163		 */
1164		if (err == 0 && dnode_needs_remap(dn)) {
1165			dmu_tx_t *tx = dmu_tx_create(os);
1166			dmu_tx_hold_bonus(tx, dn->dn_object);
1167			if ((err = dmu_tx_assign(tx, TXG_WAIT)) == 0) {
1168				dnode_setdirty(dn, tx);
1169				dmu_tx_commit(tx);
1170			} else {
1171				dmu_tx_abort(tx);
1172			}
1173		}
1174
1175		dnode_rele(dn, FTAG);
1176		return (err);
1177	}
1178
1179	offset = 0;
1180	l1span = 1ULL << (dn->dn_indblkshift - SPA_BLKPTRSHIFT +
1181	    dn->dn_datablkshift);
1182	/*
1183	 * Find the next L1 indirect that is not a hole.
1184	 */
1185	while (dnode_next_offset(dn, 0, &offset, 2, 1, 0) == 0) {
1186		if (issig(JUSTLOOKING) && issig(FORREAL)) {
1187			err = SET_ERROR(EINTR);
1188			break;
1189		}
1190		if ((err = dmu_object_remap_one_indirect(os, dn,
1191		    last_removal_txg, offset)) != 0) {
1192			break;
1193		}
1194		offset += l1span;
1195	}
1196
1197	dnode_rele(dn, FTAG);
1198	return (err);
1199}
1200
1201void
1202dmu_prealloc(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1203    dmu_tx_t *tx)
1204{
1205	dmu_buf_t **dbp;
1206	int numbufs, i;
1207
1208	if (size == 0)
1209		return;
1210
1211	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
1212	    FALSE, FTAG, &numbufs, &dbp));
1213
1214	for (i = 0; i < numbufs; i++) {
1215		dmu_buf_t *db = dbp[i];
1216
1217		dmu_buf_will_not_fill(db, tx);
1218	}
1219	dmu_buf_rele_array(dbp, numbufs, FTAG);
1220}
1221
1222void
1223dmu_write_embedded(objset_t *os, uint64_t object, uint64_t offset,
1224    void *data, uint8_t etype, uint8_t comp, int uncompressed_size,
1225    int compressed_size, int byteorder, dmu_tx_t *tx)
1226{
1227	dmu_buf_t *db;
1228
1229	ASSERT3U(etype, <, NUM_BP_EMBEDDED_TYPES);
1230	ASSERT3U(comp, <, ZIO_COMPRESS_FUNCTIONS);
1231	VERIFY0(dmu_buf_hold_noread(os, object, offset,
1232	    FTAG, &db));
1233
1234	dmu_buf_write_embedded(db,
1235	    data, (bp_embedded_type_t)etype, (enum zio_compress)comp,
1236	    uncompressed_size, compressed_size, byteorder, tx);
1237
1238	dmu_buf_rele(db, FTAG);
1239}
1240
1241/*
1242 * DMU support for xuio
1243 */
1244kstat_t *xuio_ksp = NULL;
1245
1246int
1247dmu_xuio_init(xuio_t *xuio, int nblk)
1248{
1249	dmu_xuio_t *priv;
1250	uio_t *uio = &xuio->xu_uio;
1251
1252	uio->uio_iovcnt = nblk;
1253	uio->uio_iov = kmem_zalloc(nblk * sizeof (iovec_t), KM_SLEEP);
1254
1255	priv = kmem_zalloc(sizeof (dmu_xuio_t), KM_SLEEP);
1256	priv->cnt = nblk;
1257	priv->bufs = kmem_zalloc(nblk * sizeof (arc_buf_t *), KM_SLEEP);
1258	priv->iovp = uio->uio_iov;
1259	XUIO_XUZC_PRIV(xuio) = priv;
1260
1261	if (XUIO_XUZC_RW(xuio) == UIO_READ)
1262		XUIOSTAT_INCR(xuiostat_onloan_rbuf, nblk);
1263	else
1264		XUIOSTAT_INCR(xuiostat_onloan_wbuf, nblk);
1265
1266	return (0);
1267}
1268
1269void
1270dmu_xuio_fini(xuio_t *xuio)
1271{
1272	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1273	int nblk = priv->cnt;
1274
1275	kmem_free(priv->iovp, nblk * sizeof (iovec_t));
1276	kmem_free(priv->bufs, nblk * sizeof (arc_buf_t *));
1277	kmem_free(priv, sizeof (dmu_xuio_t));
1278
1279	if (XUIO_XUZC_RW(xuio) == UIO_READ)
1280		XUIOSTAT_INCR(xuiostat_onloan_rbuf, -nblk);
1281	else
1282		XUIOSTAT_INCR(xuiostat_onloan_wbuf, -nblk);
1283}
1284
1285/*
1286 * Initialize iov[priv->next] and priv->bufs[priv->next] with { off, n, abuf }
1287 * and increase priv->next by 1.
1288 */
1289int
1290dmu_xuio_add(xuio_t *xuio, arc_buf_t *abuf, offset_t off, size_t n)
1291{
1292	struct iovec *iov;
1293	uio_t *uio = &xuio->xu_uio;
1294	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1295	int i = priv->next++;
1296
1297	ASSERT(i < priv->cnt);
1298	ASSERT(off + n <= arc_buf_lsize(abuf));
1299	iov = uio->uio_iov + i;
1300	iov->iov_base = (char *)abuf->b_data + off;
1301	iov->iov_len = n;
1302	priv->bufs[i] = abuf;
1303	return (0);
1304}
1305
1306int
1307dmu_xuio_cnt(xuio_t *xuio)
1308{
1309	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1310	return (priv->cnt);
1311}
1312
1313arc_buf_t *
1314dmu_xuio_arcbuf(xuio_t *xuio, int i)
1315{
1316	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1317
1318	ASSERT(i < priv->cnt);
1319	return (priv->bufs[i]);
1320}
1321
1322void
1323dmu_xuio_clear(xuio_t *xuio, int i)
1324{
1325	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1326
1327	ASSERT(i < priv->cnt);
1328	priv->bufs[i] = NULL;
1329}
1330
1331static void
1332xuio_stat_init(void)
1333{
1334	xuio_ksp = kstat_create("zfs", 0, "xuio_stats", "misc",
1335	    KSTAT_TYPE_NAMED, sizeof (xuio_stats) / sizeof (kstat_named_t),
1336	    KSTAT_FLAG_VIRTUAL);
1337	if (xuio_ksp != NULL) {
1338		xuio_ksp->ks_data = &xuio_stats;
1339		kstat_install(xuio_ksp);
1340	}
1341}
1342
1343static void
1344xuio_stat_fini(void)
1345{
1346	if (xuio_ksp != NULL) {
1347		kstat_delete(xuio_ksp);
1348		xuio_ksp = NULL;
1349	}
1350}
1351
1352void
1353xuio_stat_wbuf_copied(void)
1354{
1355	XUIOSTAT_BUMP(xuiostat_wbuf_copied);
1356}
1357
1358void
1359xuio_stat_wbuf_nocopy(void)
1360{
1361	XUIOSTAT_BUMP(xuiostat_wbuf_nocopy);
1362}
1363
1364#ifdef _KERNEL
1365int
1366dmu_read_uio_dnode(dnode_t *dn, uio_t *uio, uint64_t size)
1367{
1368	dmu_buf_t **dbp;
1369	int numbufs, i, err;
1370	xuio_t *xuio = NULL;
1371
1372	/*
1373	 * NB: we could do this block-at-a-time, but it's nice
1374	 * to be reading in parallel.
1375	 */
1376	err = dmu_buf_hold_array_by_dnode(dn, uio->uio_loffset, size,
1377	    TRUE, FTAG, &numbufs, &dbp, 0);
1378	if (err)
1379		return (err);
1380
1381#ifdef UIO_XUIO
1382	if (uio->uio_extflg == UIO_XUIO)
1383		xuio = (xuio_t *)uio;
1384#endif
1385
1386	for (i = 0; i < numbufs; i++) {
1387		int tocpy;
1388		int bufoff;
1389		dmu_buf_t *db = dbp[i];
1390
1391		ASSERT(size > 0);
1392
1393		bufoff = uio->uio_loffset - db->db_offset;
1394		tocpy = (int)MIN(db->db_size - bufoff, size);
1395
1396		if (xuio) {
1397			dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
1398			arc_buf_t *dbuf_abuf = dbi->db_buf;
1399			arc_buf_t *abuf = dbuf_loan_arcbuf(dbi);
1400			err = dmu_xuio_add(xuio, abuf, bufoff, tocpy);
1401			if (!err) {
1402				uio->uio_resid -= tocpy;
1403				uio->uio_loffset += tocpy;
1404			}
1405
1406			if (abuf == dbuf_abuf)
1407				XUIOSTAT_BUMP(xuiostat_rbuf_nocopy);
1408			else
1409				XUIOSTAT_BUMP(xuiostat_rbuf_copied);
1410		} else {
1411#ifdef illumos
1412			err = uiomove((char *)db->db_data + bufoff, tocpy,
1413			    UIO_READ, uio);
1414#else
1415			err = vn_io_fault_uiomove((char *)db->db_data + bufoff,
1416			    tocpy, uio);
1417#endif
1418		}
1419		if (err)
1420			break;
1421
1422		size -= tocpy;
1423	}
1424	dmu_buf_rele_array(dbp, numbufs, FTAG);
1425
1426	return (err);
1427}
1428
1429/*
1430 * Read 'size' bytes into the uio buffer.
1431 * From object zdb->db_object.
1432 * Starting at offset uio->uio_loffset.
1433 *
1434 * If the caller already has a dbuf in the target object
1435 * (e.g. its bonus buffer), this routine is faster than dmu_read_uio(),
1436 * because we don't have to find the dnode_t for the object.
1437 */
1438int
1439dmu_read_uio_dbuf(dmu_buf_t *zdb, uio_t *uio, uint64_t size)
1440{
1441	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zdb;
1442	dnode_t *dn;
1443	int err;
1444
1445	if (size == 0)
1446		return (0);
1447
1448	DB_DNODE_ENTER(db);
1449	dn = DB_DNODE(db);
1450	err = dmu_read_uio_dnode(dn, uio, size);
1451	DB_DNODE_EXIT(db);
1452
1453	return (err);
1454}
1455
1456/*
1457 * Read 'size' bytes into the uio buffer.
1458 * From the specified object
1459 * Starting at offset uio->uio_loffset.
1460 */
1461int
1462dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
1463{
1464	dnode_t *dn;
1465	int err;
1466
1467	if (size == 0)
1468		return (0);
1469
1470	err = dnode_hold(os, object, FTAG, &dn);
1471	if (err)
1472		return (err);
1473
1474	err = dmu_read_uio_dnode(dn, uio, size);
1475
1476	dnode_rele(dn, FTAG);
1477
1478	return (err);
1479}
1480
1481int
1482dmu_write_uio_dnode(dnode_t *dn, uio_t *uio, uint64_t size, dmu_tx_t *tx)
1483{
1484	dmu_buf_t **dbp;
1485	int numbufs;
1486	int err = 0;
1487	int i;
1488
1489	err = dmu_buf_hold_array_by_dnode(dn, uio->uio_loffset, size,
1490	    FALSE, FTAG, &numbufs, &dbp, DMU_READ_PREFETCH);
1491	if (err)
1492		return (err);
1493
1494	for (i = 0; i < numbufs; i++) {
1495		int tocpy;
1496		int bufoff;
1497		dmu_buf_t *db = dbp[i];
1498
1499		ASSERT(size > 0);
1500
1501		bufoff = uio->uio_loffset - db->db_offset;
1502		tocpy = (int)MIN(db->db_size - bufoff, size);
1503
1504		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1505
1506		if (tocpy == db->db_size)
1507			dmu_buf_will_fill(db, tx);
1508		else
1509			dmu_buf_will_dirty(db, tx);
1510
1511#ifdef illumos
1512		/*
1513		 * XXX uiomove could block forever (eg. nfs-backed
1514		 * pages).  There needs to be a uiolockdown() function
1515		 * to lock the pages in memory, so that uiomove won't
1516		 * block.
1517		 */
1518		err = uiomove((char *)db->db_data + bufoff, tocpy,
1519		    UIO_WRITE, uio);
1520#else
1521		err = vn_io_fault_uiomove((char *)db->db_data + bufoff, tocpy,
1522		    uio);
1523#endif
1524
1525		if (tocpy == db->db_size)
1526			dmu_buf_fill_done(db, tx);
1527
1528		if (err)
1529			break;
1530
1531		size -= tocpy;
1532	}
1533
1534	dmu_buf_rele_array(dbp, numbufs, FTAG);
1535	return (err);
1536}
1537
1538/*
1539 * Write 'size' bytes from the uio buffer.
1540 * To object zdb->db_object.
1541 * Starting at offset uio->uio_loffset.
1542 *
1543 * If the caller already has a dbuf in the target object
1544 * (e.g. its bonus buffer), this routine is faster than dmu_write_uio(),
1545 * because we don't have to find the dnode_t for the object.
1546 */
1547int
1548dmu_write_uio_dbuf(dmu_buf_t *zdb, uio_t *uio, uint64_t size,
1549    dmu_tx_t *tx)
1550{
1551	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zdb;
1552	dnode_t *dn;
1553	int err;
1554
1555	if (size == 0)
1556		return (0);
1557
1558	DB_DNODE_ENTER(db);
1559	dn = DB_DNODE(db);
1560	err = dmu_write_uio_dnode(dn, uio, size, tx);
1561	DB_DNODE_EXIT(db);
1562
1563	return (err);
1564}
1565
1566/*
1567 * Write 'size' bytes from the uio buffer.
1568 * To the specified object.
1569 * Starting at offset uio->uio_loffset.
1570 */
1571int
1572dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
1573    dmu_tx_t *tx)
1574{
1575	dnode_t *dn;
1576	int err;
1577
1578	if (size == 0)
1579		return (0);
1580
1581	err = dnode_hold(os, object, FTAG, &dn);
1582	if (err)
1583		return (err);
1584
1585	err = dmu_write_uio_dnode(dn, uio, size, tx);
1586
1587	dnode_rele(dn, FTAG);
1588
1589	return (err);
1590}
1591
1592#ifdef illumos
1593int
1594dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1595    page_t *pp, dmu_tx_t *tx)
1596{
1597	dmu_buf_t **dbp;
1598	int numbufs, i;
1599	int err;
1600
1601	if (size == 0)
1602		return (0);
1603
1604	err = dmu_buf_hold_array(os, object, offset, size,
1605	    FALSE, FTAG, &numbufs, &dbp);
1606	if (err)
1607		return (err);
1608
1609	for (i = 0; i < numbufs; i++) {
1610		int tocpy, copied, thiscpy;
1611		int bufoff;
1612		dmu_buf_t *db = dbp[i];
1613		caddr_t va;
1614
1615		ASSERT(size > 0);
1616		ASSERT3U(db->db_size, >=, PAGESIZE);
1617
1618		bufoff = offset - db->db_offset;
1619		tocpy = (int)MIN(db->db_size - bufoff, size);
1620
1621		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1622
1623		if (tocpy == db->db_size)
1624			dmu_buf_will_fill(db, tx);
1625		else
1626			dmu_buf_will_dirty(db, tx);
1627
1628		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
1629			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
1630			thiscpy = MIN(PAGESIZE, tocpy - copied);
1631			va = zfs_map_page(pp, S_READ);
1632			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
1633			zfs_unmap_page(pp, va);
1634			pp = pp->p_next;
1635			bufoff += PAGESIZE;
1636		}
1637
1638		if (tocpy == db->db_size)
1639			dmu_buf_fill_done(db, tx);
1640
1641		offset += tocpy;
1642		size -= tocpy;
1643	}
1644	dmu_buf_rele_array(dbp, numbufs, FTAG);
1645	return (err);
1646}
1647
1648#else	/* !illumos */
1649
1650int
1651dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1652    vm_page_t *ma, dmu_tx_t *tx)
1653{
1654	dmu_buf_t **dbp;
1655	struct sf_buf *sf;
1656	int numbufs, i;
1657	int err;
1658
1659	if (size == 0)
1660		return (0);
1661
1662	err = dmu_buf_hold_array(os, object, offset, size,
1663	    FALSE, FTAG, &numbufs, &dbp);
1664	if (err)
1665		return (err);
1666
1667	for (i = 0; i < numbufs; i++) {
1668		int tocpy, copied, thiscpy;
1669		int bufoff;
1670		dmu_buf_t *db = dbp[i];
1671		caddr_t va;
1672
1673		ASSERT(size > 0);
1674		ASSERT3U(db->db_size, >=, PAGESIZE);
1675
1676		bufoff = offset - db->db_offset;
1677		tocpy = (int)MIN(db->db_size - bufoff, size);
1678
1679		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1680
1681		if (tocpy == db->db_size)
1682			dmu_buf_will_fill(db, tx);
1683		else
1684			dmu_buf_will_dirty(db, tx);
1685
1686		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
1687			ASSERT3U(ptoa((*ma)->pindex), ==, db->db_offset + bufoff);
1688			thiscpy = MIN(PAGESIZE, tocpy - copied);
1689			va = zfs_map_page(*ma, &sf);
1690			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
1691			zfs_unmap_page(sf);
1692			ma += 1;
1693			bufoff += PAGESIZE;
1694		}
1695
1696		if (tocpy == db->db_size)
1697			dmu_buf_fill_done(db, tx);
1698
1699		offset += tocpy;
1700		size -= tocpy;
1701	}
1702	dmu_buf_rele_array(dbp, numbufs, FTAG);
1703	return (err);
1704}
1705
1706int
1707dmu_read_pages(objset_t *os, uint64_t object, vm_page_t *ma, int count,
1708    int *rbehind, int *rahead, int last_size)
1709{
1710	struct sf_buf *sf;
1711	vm_object_t vmobj;
1712	vm_page_t m;
1713	dmu_buf_t **dbp;
1714	dmu_buf_t *db;
1715	caddr_t va;
1716	int numbufs, i;
1717	int bufoff, pgoff, tocpy;
1718	int mi, di;
1719	int err;
1720
1721	ASSERT3U(ma[0]->pindex + count - 1, ==, ma[count - 1]->pindex);
1722	ASSERT(last_size <= PAGE_SIZE);
1723
1724	err = dmu_buf_hold_array(os, object, IDX_TO_OFF(ma[0]->pindex),
1725	    IDX_TO_OFF(count - 1) + last_size, TRUE, FTAG, &numbufs, &dbp);
1726	if (err != 0)
1727		return (err);
1728
1729#ifdef DEBUG
1730	IMPLY(last_size < PAGE_SIZE, *rahead == 0);
1731	if (dbp[0]->db_offset != 0 || numbufs > 1) {
1732		for (i = 0; i < numbufs; i++) {
1733			ASSERT(ISP2(dbp[i]->db_size));
1734			ASSERT((dbp[i]->db_offset % dbp[i]->db_size) == 0);
1735			ASSERT3U(dbp[i]->db_size, ==, dbp[0]->db_size);
1736		}
1737	}
1738#endif
1739
1740	vmobj = ma[0]->object;
1741	zfs_vmobject_wlock(vmobj);
1742
1743	db = dbp[0];
1744	for (i = 0; i < *rbehind; i++) {
1745		m = vm_page_grab(vmobj, ma[0]->pindex - 1 - i,
1746		    VM_ALLOC_NORMAL | VM_ALLOC_NOWAIT | VM_ALLOC_NOBUSY);
1747		if (m == NULL)
1748			break;
1749		if (m->valid != 0) {
1750			ASSERT3U(m->valid, ==, VM_PAGE_BITS_ALL);
1751			break;
1752		}
1753		ASSERT(m->dirty == 0);
1754		ASSERT(!pmap_page_is_mapped(m));
1755
1756		ASSERT(db->db_size > PAGE_SIZE);
1757		bufoff = IDX_TO_OFF(m->pindex) % db->db_size;
1758		va = zfs_map_page(m, &sf);
1759		bcopy((char *)db->db_data + bufoff, va, PAGESIZE);
1760		zfs_unmap_page(sf);
1761		m->valid = VM_PAGE_BITS_ALL;
1762		vm_page_lock(m);
1763		if ((m->busy_lock & VPB_BIT_WAITERS) != 0)
1764			vm_page_activate(m);
1765		else
1766			vm_page_deactivate(m);
1767		vm_page_unlock(m);
1768	}
1769	*rbehind = i;
1770
1771	bufoff = IDX_TO_OFF(ma[0]->pindex) % db->db_size;
1772	pgoff = 0;
1773	for (mi = 0, di = 0; mi < count && di < numbufs; ) {
1774		if (pgoff == 0) {
1775			m = ma[mi];
1776			if (m != bogus_page) {
1777				vm_page_assert_xbusied(m);
1778				ASSERT(m->valid == 0);
1779				ASSERT(m->dirty == 0);
1780				ASSERT(!pmap_page_is_mapped(m));
1781				va = zfs_map_page(m, &sf);
1782			}
1783		}
1784		if (bufoff == 0)
1785			db = dbp[di];
1786
1787		if (m != bogus_page) {
1788			ASSERT3U(IDX_TO_OFF(m->pindex) + pgoff, ==,
1789			    db->db_offset + bufoff);
1790		}
1791
1792		/*
1793		 * We do not need to clamp the copy size by the file
1794		 * size as the last block is zero-filled beyond the
1795		 * end of file anyway.
1796		 */
1797		tocpy = MIN(db->db_size - bufoff, PAGESIZE - pgoff);
1798		if (m != bogus_page)
1799			bcopy((char *)db->db_data + bufoff, va + pgoff, tocpy);
1800
1801		pgoff += tocpy;
1802		ASSERT(pgoff <= PAGESIZE);
1803		if (pgoff == PAGESIZE) {
1804			if (m != bogus_page) {
1805				zfs_unmap_page(sf);
1806				m->valid = VM_PAGE_BITS_ALL;
1807			}
1808			ASSERT(mi < count);
1809			mi++;
1810			pgoff = 0;
1811		}
1812
1813		bufoff += tocpy;
1814		ASSERT(bufoff <= db->db_size);
1815		if (bufoff == db->db_size) {
1816			ASSERT(di < numbufs);
1817			di++;
1818			bufoff = 0;
1819		}
1820	}
1821
1822#ifdef DEBUG
1823	/*
1824	 * Three possibilities:
	 * - last requested page ends at a buffer boundary and, thus,
1826	 *   all pages and buffers have been iterated;
1827	 * - all requested pages are filled, but the last buffer
1828	 *   has not been exhausted;
1829	 *   the read-ahead is possible only in this case;
1830	 * - all buffers have been read, but the last page has not been
1831	 *   fully filled;
1832	 *   this is only possible if the file has only a single buffer
1833	 *   with a size that is not a multiple of the page size.
1834	 */
1835	if (mi == count) {
1836		ASSERT(di >= numbufs - 1);
1837		IMPLY(*rahead != 0, di == numbufs - 1);
1838		IMPLY(*rahead != 0, bufoff != 0);
1839		ASSERT(pgoff == 0);
1840	}
1841	if (di == numbufs) {
1842		ASSERT(mi >= count - 1);
1843		ASSERT(*rahead == 0);
1844		IMPLY(pgoff == 0, mi == count);
1845		if (pgoff != 0) {
1846			ASSERT(mi == count - 1);
1847			ASSERT((dbp[0]->db_size & PAGE_MASK) != 0);
1848		}
1849	}
1850#endif
1851	if (pgoff != 0) {
1852		ASSERT(m != bogus_page);
1853		bzero(va + pgoff, PAGESIZE - pgoff);
1854		zfs_unmap_page(sf);
1855		m->valid = VM_PAGE_BITS_ALL;
1856	}
1857
1858	for (i = 0; i < *rahead; i++) {
1859		m = vm_page_grab(vmobj, ma[count - 1]->pindex + 1 + i,
1860		    VM_ALLOC_NORMAL | VM_ALLOC_NOWAIT | VM_ALLOC_NOBUSY);
1861		if (m == NULL)
1862			break;
1863		if (m->valid != 0) {
1864			ASSERT3U(m->valid, ==, VM_PAGE_BITS_ALL);
1865			break;
1866		}
1867		ASSERT(m->dirty == 0);
1868		ASSERT(!pmap_page_is_mapped(m));
1869
1870		ASSERT(db->db_size > PAGE_SIZE);
1871		bufoff = IDX_TO_OFF(m->pindex) % db->db_size;
1872		tocpy = MIN(db->db_size - bufoff, PAGESIZE);
1873		va = zfs_map_page(m, &sf);
1874		bcopy((char *)db->db_data + bufoff, va, tocpy);
1875		if (tocpy < PAGESIZE) {
1876			ASSERT(i == *rahead - 1);
1877			ASSERT((db->db_size & PAGE_MASK) != 0);
1878			bzero(va + tocpy, PAGESIZE - tocpy);
1879		}
1880		zfs_unmap_page(sf);
1881		m->valid = VM_PAGE_BITS_ALL;
1882		vm_page_lock(m);
1883		if ((m->busy_lock & VPB_BIT_WAITERS) != 0)
1884			vm_page_activate(m);
1885		else
1886			vm_page_deactivate(m);
1887		vm_page_unlock(m);
1888	}
1889	*rahead = i;
1890	zfs_vmobject_wunlock(vmobj);
1891
1892	dmu_buf_rele_array(dbp, numbufs, FTAG);
1893	return (0);
1894}
1895#endif	/* illumos */
1896#endif	/* _KERNEL */
1897
1898/*
1899 * Allocate a loaned anonymous arc buffer.
1900 */
1901arc_buf_t *
1902dmu_request_arcbuf(dmu_buf_t *handle, int size)
1903{
1904	dmu_buf_impl_t *db = (dmu_buf_impl_t *)handle;
1905
1906	return (arc_loan_buf(db->db_objset->os_spa, B_FALSE, size));
1907}
1908
1909/*
1910 * Free a loaned arc buffer.
1911 */
1912void
1913dmu_return_arcbuf(arc_buf_t *buf)
1914{
1915	arc_return_buf(buf, FTAG);
1916	arc_buf_destroy(buf, FTAG);
1917}
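
/*
 * Illustrative sketch of the loaned-buffer write path, which avoids an
 * extra copy when the buffer matches the target block: borrow an anonymous
 * arc buffer, fill it, and hand it to dmu_assign_arcbuf() below (handle,
 * off, len and tx are placeholders; len should equal the object's block
 * size for the zero-copy assignment to apply):
 *
 *	arc_buf_t *abuf = dmu_request_arcbuf(handle, len);
 *	... fill abuf->b_data with len bytes ...
 *	dmu_assign_arcbuf(handle, off, abuf, tx);
 */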
1918
1919/*
1920 * When possible directly assign passed loaned arc buffer to a dbuf.
1921 * If this is not possible copy the contents of passed arc buf via
1922 * dmu_write().
1923 */
1924void
1925dmu_assign_arcbuf_dnode(dnode_t *dn, uint64_t offset, arc_buf_t *buf,
1926    dmu_tx_t *tx)
1927{
1928	dmu_buf_impl_t *db;
1929	uint32_t blksz = (uint32_t)arc_buf_lsize(buf);
1930	uint64_t blkid;
1931
1932	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1933	blkid = dbuf_whichblock(dn, 0, offset);
1934	VERIFY((db = dbuf_hold(dn, blkid, FTAG)) != NULL);
1935	rw_exit(&dn->dn_struct_rwlock);
1936
1937	/*
1938	 * We can only assign if the offset is aligned, the arc buf is the
1939	 * same size as the dbuf, and the dbuf is not metadata.
1940	 */
1941	if (offset == db->db.db_offset && blksz == db->db.db_size) {
1942#ifdef _KERNEL
1943		curthread->td_ru.ru_oublock++;
1944#ifdef RACCT
1945		if (racct_enable) {
1946			PROC_LOCK(curproc);
1947			racct_add_force(curproc, RACCT_WRITEBPS, blksz);
1948			racct_add_force(curproc, RACCT_WRITEIOPS, 1);
1949			PROC_UNLOCK(curproc);
1950		}
1951#endif /* RACCT */
1952#endif /* _KERNEL */
1953		dbuf_assign_arcbuf(db, buf, tx);
1954		dbuf_rele(db, FTAG);
1955	} else {
1956		objset_t *os;
1957		uint64_t object;
1958
1959		/* compressed bufs must always be assignable to their dbuf */
1960		ASSERT3U(arc_get_compression(buf), ==, ZIO_COMPRESS_OFF);
1961		ASSERT(!(buf->b_flags & ARC_BUF_FLAG_COMPRESSED));
1962
1963		os = dn->dn_objset;
1964		object = dn->dn_object;
1965
1966		dbuf_rele(db, FTAG);
1967		dmu_write(os, object, offset, blksz, buf->b_data, tx);
1968		dmu_return_arcbuf(buf);
1969		XUIOSTAT_BUMP(xuiostat_wbuf_copied);
1970	}
1971}
1972
1973void
1974dmu_assign_arcbuf(dmu_buf_t *handle, uint64_t offset, arc_buf_t *buf,
1975    dmu_tx_t *tx)
1976{
1977	dmu_buf_impl_t *dbuf = (dmu_buf_impl_t *)handle;
1978
1979	DB_DNODE_ENTER(dbuf);
1980	dmu_assign_arcbuf_dnode(DB_DNODE(dbuf), offset, buf, tx);
1981	DB_DNODE_EXIT(dbuf);
1982}

typedef struct {
	dbuf_dirty_record_t	*dsa_dr;
	dmu_sync_cb_t		*dsa_done;
	zgd_t			*dsa_zgd;
	dmu_tx_t		*dsa_tx;
} dmu_sync_arg_t;

/* ARGSUSED */
static void
dmu_sync_ready(zio_t *zio, arc_buf_t *buf, void *varg)
{
	dmu_sync_arg_t *dsa = varg;
	dmu_buf_t *db = dsa->dsa_zgd->zgd_db;
	blkptr_t *bp = zio->io_bp;

	if (zio->io_error == 0) {
		if (BP_IS_HOLE(bp)) {
			/*
			 * A block of zeros may compress to a hole, but the
			 * block size still needs to be known for replay.
			 */
			BP_SET_LSIZE(bp, db->db_size);
		} else if (!BP_IS_EMBEDDED(bp)) {
			ASSERT(BP_GET_LEVEL(bp) == 0);
			bp->blk_fill = 1;
		}
	}
}

static void
dmu_sync_late_arrival_ready(zio_t *zio)
{
	dmu_sync_ready(zio, NULL, zio->io_private);
}

/* ARGSUSED */
static void
dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
{
	dmu_sync_arg_t *dsa = varg;
	dbuf_dirty_record_t *dr = dsa->dsa_dr;
	dmu_buf_impl_t *db = dr->dr_dbuf;
	zgd_t *zgd = dsa->dsa_zgd;

	/*
	 * Record the vdev(s) backing this blkptr so they can be flushed after
	 * the writes for the lwb have completed.
	 */
	if (zio->io_error == 0) {
		zil_lwb_add_block(zgd->zgd_lwb, zgd->zgd_bp);
	}

	mutex_enter(&db->db_mtx);
	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
	if (zio->io_error == 0) {
		dr->dt.dl.dr_nopwrite = !!(zio->io_flags & ZIO_FLAG_NOPWRITE);
		if (dr->dt.dl.dr_nopwrite) {
			blkptr_t *bp = zio->io_bp;
			blkptr_t *bp_orig = &zio->io_bp_orig;
			uint8_t chksum = BP_GET_CHECKSUM(bp_orig);

			ASSERT(BP_EQUAL(bp, bp_orig));
			VERIFY(BP_EQUAL(bp, db->db_blkptr));
			ASSERT(zio->io_prop.zp_compress != ZIO_COMPRESS_OFF);
			ASSERT(zio_checksum_table[chksum].ci_flags &
			    ZCHECKSUM_FLAG_NOPWRITE);
		}
		dr->dt.dl.dr_overridden_by = *zio->io_bp;
		dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
		dr->dt.dl.dr_copies = zio->io_prop.zp_copies;

		/*
		 * Old style holes are filled with all zeros, whereas
		 * new-style holes maintain their lsize, type, level,
		 * and birth time (see zio_write_compress). While we
		 * need to reset the BP_SET_LSIZE() call that happened
		 * in dmu_sync_ready for old style holes, we do *not*
		 * want to wipe out the information contained in new
		 * style holes. Thus, only zero out the block pointer if
		 * it's an old style hole.
		 */
		if (BP_IS_HOLE(&dr->dt.dl.dr_overridden_by) &&
		    dr->dt.dl.dr_overridden_by.blk_birth == 0)
			BP_ZERO(&dr->dt.dl.dr_overridden_by);
	} else {
		dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
	}
	cv_broadcast(&db->db_changed);
	mutex_exit(&db->db_mtx);

	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);

	kmem_free(dsa, sizeof (*dsa));
}

static void
dmu_sync_late_arrival_done(zio_t *zio)
{
	blkptr_t *bp = zio->io_bp;
	dmu_sync_arg_t *dsa = zio->io_private;
	blkptr_t *bp_orig = &zio->io_bp_orig;
	zgd_t *zgd = dsa->dsa_zgd;

	if (zio->io_error == 0) {
		/*
		 * Record the vdev(s) backing this blkptr so they can be
		 * flushed after the writes for the lwb have completed.
		 */
		zil_lwb_add_block(zgd->zgd_lwb, zgd->zgd_bp);

		if (!BP_IS_HOLE(bp)) {
			ASSERT(!(zio->io_flags & ZIO_FLAG_NOPWRITE));
			ASSERT(BP_IS_HOLE(bp_orig) || !BP_EQUAL(bp, bp_orig));
			ASSERT(zio->io_bp->blk_birth == zio->io_txg);
			ASSERT(zio->io_txg > spa_syncing_txg(zio->io_spa));
			zio_free(zio->io_spa, zio->io_txg, zio->io_bp);
		}
	}

	dmu_tx_commit(dsa->dsa_tx);

	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);

	abd_put(zio->io_abd);
	kmem_free(dsa, sizeof (*dsa));
}

static int
dmu_sync_late_arrival(zio_t *pio, objset_t *os, dmu_sync_cb_t *done, zgd_t *zgd,
    zio_prop_t *zp, zbookmark_phys_t *zb)
{
	dmu_sync_arg_t *dsa;
	dmu_tx_t *tx;

	tx = dmu_tx_create(os);
	dmu_tx_hold_space(tx, zgd->zgd_db->db_size);
	if (dmu_tx_assign(tx, TXG_WAIT) != 0) {
		dmu_tx_abort(tx);
		/* Make zl_get_data do txg_wait_synced() */
		return (SET_ERROR(EIO));
	}

	/*
	 * In order to prevent the zgd's lwb from being freed prior to
	 * dmu_sync_late_arrival_done() being called, we have to ensure
	 * the lwb's "max txg" takes this tx's txg into account.
	 */
	zil_lwb_add_txg(zgd->zgd_lwb, dmu_tx_get_txg(tx));

	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
	dsa->dsa_dr = NULL;
	dsa->dsa_done = done;
	dsa->dsa_zgd = zgd;
	dsa->dsa_tx = tx;

	/*
	 * Since we are currently syncing this txg, it's nontrivial to
	 * determine what BP to nopwrite against, so we disable nopwrite.
	 *
	 * When syncing, the db_blkptr is initially the BP of the previous
	 * txg.  We cannot nopwrite against it because it will be changed
	 * (this is similar to the non-late-arrival case where the dbuf is
	 * dirty in a future txg).
	 *
	 * Then dbuf_write_ready() sets db_blkptr to the location we will
	 * write.  We cannot nopwrite against it because although the BP
	 * will not (typically) be changed, the data has not yet been
	 * persisted to this location.
	 *
	 * Finally, when dbuf_write_done() is called, it is theoretically
	 * possible to always nopwrite, because the data that was written
	 * in this txg is the same data that we are trying to write.  However,
	 * we would need to check that this dbuf is not dirty in any future
	 * txgs (as we do in the normal dmu_sync() path).  For simplicity, we
	 * don't nopwrite in this case.
	 */
	zp->zp_nopwrite = B_FALSE;

	zio_nowait(zio_write(pio, os->os_spa, dmu_tx_get_txg(tx), zgd->zgd_bp,
	    abd_get_from_buf(zgd->zgd_db->db_data, zgd->zgd_db->db_size),
	    zgd->zgd_db->db_size, zgd->zgd_db->db_size, zp,
	    dmu_sync_late_arrival_ready, NULL, NULL, dmu_sync_late_arrival_done,
	    dsa, ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, zb));

	return (0);
}

/*
 * Intent log support: sync the block associated with db to disk.
 * N.B. and XXX: the caller is responsible for making sure that the
 * data isn't changing while dmu_sync() is writing it.
 *
 * Return values:
 *
 *	EEXIST: this txg has already been synced, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	EALREADY: this block is already in the process of being synced.
 *		The caller should track its progress (somehow).
 *
 *	EIO: could not do the I/O.
 *		The caller should do a txg_wait_synced().
 *
 *	0: the I/O has been initiated.
 *		The caller should log this blkptr in the done callback.
 *		It is possible that the I/O will fail, in which case
 *		the error will be reported to the done callback and
 *		propagated to pio from zio_done().
 */
int
dmu_sync(zio_t *pio, uint64_t txg, dmu_sync_cb_t *done, zgd_t *zgd)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zgd->zgd_db;
	objset_t *os = db->db_objset;
	dsl_dataset_t *ds = os->os_dsl_dataset;
	dbuf_dirty_record_t *dr;
	dmu_sync_arg_t *dsa;
	zbookmark_phys_t zb;
	zio_prop_t zp;
	dnode_t *dn;

	ASSERT(pio != NULL);
	ASSERT(txg != 0);

	SET_BOOKMARK(&zb, ds->ds_object,
	    db->db.db_object, db->db_level, db->db_blkid);

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);
	dmu_write_policy(os, dn, db->db_level, WP_DMU_SYNC, &zp);
	DB_DNODE_EXIT(db);

	/*
	 * If we're frozen (running ziltest), we always need to generate a bp.
	 */
	if (txg > spa_freeze_txg(os->os_spa))
		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));

	/*
	 * Grabbing db_mtx now provides a barrier between dbuf_sync_leaf()
	 * and us.  If we determine that this txg is not yet syncing,
	 * but it begins to sync a moment later, that's OK because the
	 * sync thread will block in dbuf_sync_leaf() until we drop db_mtx.
	 */
	mutex_enter(&db->db_mtx);

	if (txg <= spa_last_synced_txg(os->os_spa)) {
		/*
		 * This txg has already synced.  There's nothing to do.
		 */
		mutex_exit(&db->db_mtx);
		return (SET_ERROR(EEXIST));
	}

	if (txg <= spa_syncing_txg(os->os_spa)) {
		/*
		 * This txg is currently syncing, so we can't mess with
		 * the dirty record anymore; just write a new log block.
		 */
		mutex_exit(&db->db_mtx);
		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));
	}

	dr = db->db_last_dirty;
	while (dr && dr->dr_txg != txg)
		dr = dr->dr_next;

	if (dr == NULL) {
		/*
		 * There's no dr for this dbuf, so it must have been freed.
		 * There's no need to log writes to freed blocks, so we're done.
		 */
		mutex_exit(&db->db_mtx);
		return (SET_ERROR(ENOENT));
	}

	ASSERT(dr->dr_next == NULL || dr->dr_next->dr_txg < txg);

	if (db->db_blkptr != NULL) {
		/*
		 * We need to fill in zgd_bp with the current blkptr so that
		 * the nopwrite code can check if we're writing the same
		 * data that's already on disk.  We can only nopwrite if we
		 * are sure that after making the copy, db_blkptr will not
		 * change until our i/o completes.  We ensure this by
		 * holding the db_mtx, and only allowing nopwrite if the
		 * block is not already dirty (see below).  This is verified
		 * by dmu_sync_done(), which VERIFYs that the db_blkptr has
		 * not changed.
		 */
		*zgd->zgd_bp = *db->db_blkptr;
	}

	/*
	 * Assume the on-disk data is X, the current syncing data (in
	 * txg - 1) is Y, and the current in-memory data is Z (currently
	 * in dmu_sync).
	 *
	 * We usually want to perform a nopwrite if X and Z are the
	 * same.  However, if Y is different (i.e. the BP is going to
	 * change before this write takes effect), then a nopwrite will
	 * be incorrect - we would override with X, which could have
	 * been freed when Y was written.
	 *
	 * (Note that this is not a concern when we are nop-writing from
	 * syncing context, because X and Y must be identical, because
	 * all previous txgs have been synced.)
	 *
	 * Therefore, we disable nopwrite if the current BP could change
	 * before this TXG.  There are two ways it could change: by
	 * being dirty (dr_next is non-NULL), or by being freed
	 * (dnode_block_freed()).  This behavior is verified by
	 * zio_done(), which VERIFYs that the override BP is identical
	 * to the on-disk BP.
	 */
	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);
	if (dr->dr_next != NULL || dnode_block_freed(dn, db->db_blkid))
		zp.zp_nopwrite = B_FALSE;
	DB_DNODE_EXIT(db);

	ASSERT(dr->dr_txg == txg);
	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC ||
	    dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
		/*
		 * We have already issued a sync write for this buffer,
		 * or this buffer has already been synced.  It could not
		 * have been dirtied since, or we would have cleared the state.
		 */
		mutex_exit(&db->db_mtx);
		return (SET_ERROR(EALREADY));
	}

	ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
	mutex_exit(&db->db_mtx);

	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
	dsa->dsa_dr = dr;
	dsa->dsa_done = done;
	dsa->dsa_zgd = zgd;
	dsa->dsa_tx = NULL;

	zio_nowait(arc_write(pio, os->os_spa, txg,
	    zgd->zgd_bp, dr->dt.dl.dr_data, DBUF_IS_L2CACHEABLE(db),
	    &zp, dmu_sync_ready, NULL, NULL, dmu_sync_done, dsa,
	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, &zb));

	return (0);
}
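
/*
 * Sketch of how an intent-log get_data callback might consume the return
 * values documented above.  This is a simplified, hypothetical fragment,
 * not the actual zfs_get_data() implementation; "zio" is the lwb's write
 * zio, "zgd" is fully initialized, "done_cb" is the caller's dmu_sync_cb_t,
 * and "lr" is the write record being logged:
 *
 *	error = dmu_sync(zio, lr->lr_common.lrc_txg, done_cb, zgd);
 *	if (error == EALREADY) {
 *		// an in-flight sync write already covers this data
 *		lr->lr_common.lrc_txtype = TX_WRITE2;
 *		error = 0;
 *	}
 *	// EEXIST/ENOENT: nothing to log; EIO: fall back to txg_wait_synced().
 */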

int
dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
    dmu_tx_t *tx)
{
	dnode_t *dn;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);
	err = dnode_set_blksz(dn, size, ibs, tx);
	dnode_rele(dn, FTAG);
	return (err);
}

void
dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
    dmu_tx_t *tx)
{
	dnode_t *dn;

	/*
	 * Send streams include each object's checksum function.  This
	 * check ensures that the receiving system can understand the
	 * checksum function transmitted.
	 */
	ASSERT3U(checksum, <, ZIO_CHECKSUM_LEGACY_FUNCTIONS);

	VERIFY0(dnode_hold(os, object, FTAG, &dn));
	ASSERT3U(checksum, <, ZIO_CHECKSUM_FUNCTIONS);
	dn->dn_checksum = checksum;
	dnode_setdirty(dn, tx);
	dnode_rele(dn, FTAG);
}

void
dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
    dmu_tx_t *tx)
{
	dnode_t *dn;

	/*
	 * Send streams include each object's compression function.  This
	 * check ensures that the receiving system can understand the
	 * compression function transmitted.
	 */
	ASSERT3U(compress, <, ZIO_COMPRESS_LEGACY_FUNCTIONS);

	VERIFY0(dnode_hold(os, object, FTAG, &dn));
	dn->dn_compress = compress;
	dnode_setdirty(dn, tx);
	dnode_rele(dn, FTAG);
}
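
/*
 * Hypothetical sketch (in the spirit of the receive path, not copied from
 * it): a receiver applying per-object properties from a send stream's
 * object record "drro" would call the two setters above from within an
 * assigned transaction:
 *
 *	dmu_object_set_checksum(os, drro->drr_object,
 *	    drro->drr_checksumtype, tx);
 *	dmu_object_set_compress(os, drro->drr_object, drro->drr_compress, tx);
 */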

int zfs_mdcomp_disable = 0;
SYSCTL_INT(_vfs_zfs, OID_AUTO, mdcomp_disable, CTLFLAG_RWTUN,
    &zfs_mdcomp_disable, 0, "Disable metadata compression");

/*
 * When the "redundant_metadata" property is set to "most", only indirect
 * blocks of this level and higher will have an additional ditto block.
 */
int zfs_redundant_metadata_most_ditto_level = 2;

void
dmu_write_policy(objset_t *os, dnode_t *dn, int level, int wp, zio_prop_t *zp)
{
	dmu_object_type_t type = dn ? dn->dn_type : DMU_OT_OBJSET;
	boolean_t ismd = (level > 0 || DMU_OT_IS_METADATA(type) ||
	    (wp & WP_SPILL));
	enum zio_checksum checksum = os->os_checksum;
	enum zio_compress compress = os->os_compress;
	enum zio_checksum dedup_checksum = os->os_dedup_checksum;
	boolean_t dedup = B_FALSE;
	boolean_t nopwrite = B_FALSE;
	boolean_t dedup_verify = os->os_dedup_verify;
	int copies = os->os_copies;

	/*
	 * We maintain different write policies for each of the following
	 * types of data:
	 *	 1. metadata
	 *	 2. preallocated blocks (i.e. level-0 blocks of a dump device)
	 *	 3. all other level 0 blocks
	 */
	if (ismd) {
		if (zfs_mdcomp_disable) {
			compress = ZIO_COMPRESS_EMPTY;
		} else {
			/*
			 * XXX -- we should design a compression algorithm
			 * that specializes in arrays of bps.
			 */
			compress = zio_compress_select(os->os_spa,
			    ZIO_COMPRESS_ON, ZIO_COMPRESS_ON);
		}

		/*
		 * Metadata always gets checksummed.  If the data
		 * checksum is multi-bit correctable, and it's not a
		 * ZBT-style checksum, then it's suitable for metadata
		 * as well.  Otherwise, the metadata checksum defaults
		 * to fletcher4.
		 */
		if (!(zio_checksum_table[checksum].ci_flags &
		    ZCHECKSUM_FLAG_METADATA) ||
		    (zio_checksum_table[checksum].ci_flags &
		    ZCHECKSUM_FLAG_EMBEDDED))
			checksum = ZIO_CHECKSUM_FLETCHER_4;

		if (os->os_redundant_metadata == ZFS_REDUNDANT_METADATA_ALL ||
		    (os->os_redundant_metadata ==
		    ZFS_REDUNDANT_METADATA_MOST &&
		    (level >= zfs_redundant_metadata_most_ditto_level ||
		    DMU_OT_IS_METADATA(type) || (wp & WP_SPILL))))
			copies++;
	} else if (wp & WP_NOFILL) {
		ASSERT(level == 0);

		/*
		 * If we're writing preallocated blocks, we aren't actually
		 * writing them, so don't set any policy properties.  These
		 * blocks are currently only used by an external subsystem
		 * outside of zfs (i.e. dump) and not written by the zio
		 * pipeline.
		 */
		compress = ZIO_COMPRESS_OFF;
		checksum = ZIO_CHECKSUM_NOPARITY;
	} else {
		compress = zio_compress_select(os->os_spa, dn->dn_compress,
		    compress);

		checksum = (dedup_checksum == ZIO_CHECKSUM_OFF) ?
		    zio_checksum_select(dn->dn_checksum, checksum) :
		    dedup_checksum;

		/*
		 * Determine dedup setting.  If we are in dmu_sync(),
		 * we won't actually dedup now because that's all
		 * done in syncing context; but we do want to use the
		 * dedup checksum.  If the checksum is not strong
		 * enough to ensure unique signatures, force
		 * dedup_verify.
		 */
		if (dedup_checksum != ZIO_CHECKSUM_OFF) {
			dedup = (wp & WP_DMU_SYNC) ? B_FALSE : B_TRUE;
			if (!(zio_checksum_table[checksum].ci_flags &
			    ZCHECKSUM_FLAG_DEDUP))
				dedup_verify = B_TRUE;
		}

		/*
		 * Enable nopwrite if we have a secure enough checksum
		 * algorithm (see comment in zio_nop_write) and
		 * compression is enabled.  We don't enable nopwrite if
		 * dedup is enabled, as the two features are mutually
		 * exclusive.
		 */
		nopwrite = (!dedup && (zio_checksum_table[checksum].ci_flags &
		    ZCHECKSUM_FLAG_NOPWRITE) &&
		    compress != ZIO_COMPRESS_OFF && zfs_nopwrite_enabled);
	}

	zp->zp_checksum = checksum;
	zp->zp_compress = compress;
	ASSERT3U(zp->zp_compress, !=, ZIO_COMPRESS_INHERIT);

	zp->zp_type = (wp & WP_SPILL) ? dn->dn_bonustype : type;
	zp->zp_level = level;
	zp->zp_copies = MIN(copies, spa_max_replication(os->os_spa));
	zp->zp_dedup = dedup;
	zp->zp_dedup_verify = dedup && dedup_verify;
	zp->zp_nopwrite = nopwrite;
	zp->zp_zpl_smallblk = DMU_OT_IS_FILE(zp->zp_type) ?
	    os->os_zpl_special_smallblock : 0;
}
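
/*
 * Worked example of the policy above (illustrative only): for an ordinary
 * level-0 file block on a dataset with checksum=sha256, compression=lz4
 * and dedup=off, the non-metadata branch selects zp_checksum =
 * ZIO_CHECKSUM_SHA256 and zp_compress = ZIO_COMPRESS_LZ4, leaves zp_dedup =
 * B_FALSE, and, because SHA-256 carries ZCHECKSUM_FLAG_NOPWRITE, compression
 * is on, and zfs_nopwrite_enabled is set, produces zp_nopwrite = B_TRUE.
 * dmu_sync() may still clear zp_nopwrite if the block is dirty in a later
 * txg or has been freed.
 */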

int
dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
{
	dnode_t *dn;
	int err;

	/*
	 * Sync any current changes before
	 * we go trundling through the block pointers.
	 */
	err = dmu_object_wait_synced(os, object);
	if (err) {
		return (err);
	}

	err = dnode_hold(os, object, FTAG, &dn);
	if (err) {
		return (err);
	}

	err = dnode_next_offset(dn, (hole ? DNODE_FIND_HOLE : 0), off, 1, 1, 0);
	dnode_rele(dn, FTAG);

	return (err);
}
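
/*
 * Example (a hypothetical SEEK_HOLE-style caller, not the actual zfs_holey()
 * code): dnode_next_offset() returns ESRCH when no hole or data exists past
 * the given offset, which callers typically map to a virtual hole at the
 * end of the file:
 *
 *	uint64_t off = start;
 *	error = dmu_offset_next(os, object, B_TRUE, &off);
 *	if (error == ESRCH)
 *		off = file_size;	// no hole found before EOF
 *	else if (error != 0)
 *		return (error);
 */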

/*
 * If the given ZFS object has any dirty nodes, wait for those dirty
 * blocks to be synced to disk so that the DMU object info is up to
 * date.  A more efficient future version might just find the TXG with
 * the maximum ID and wait for that to be synced.
 */
int
dmu_object_wait_synced(objset_t *os, uint64_t object)
{
	dnode_t *dn;
	int error, i;

	error = dnode_hold(os, object, FTAG, &dn);
	if (error) {
		return (error);
	}

	for (i = 0; i < TXG_SIZE; i++) {
		if (list_link_active(&dn->dn_dirty_link[i])) {
			break;
		}
	}
	dnode_rele(dn, FTAG);
	if (i != TXG_SIZE) {
		txg_wait_synced(dmu_objset_pool(os), 0);
	}

	return (0);
}

void
__dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
{
	dnode_phys_t *dnp = dn->dn_phys;

	doi->doi_data_block_size = dn->dn_datablksz;
	doi->doi_metadata_block_size = dn->dn_indblkshift ?
	    1ULL << dn->dn_indblkshift : 0;
	doi->doi_type = dn->dn_type;
	doi->doi_bonus_type = dn->dn_bonustype;
	doi->doi_bonus_size = dn->dn_bonuslen;
	doi->doi_dnodesize = dn->dn_num_slots << DNODE_SHIFT;
	doi->doi_indirection = dn->dn_nlevels;
	doi->doi_checksum = dn->dn_checksum;
	doi->doi_compress = dn->dn_compress;
	doi->doi_nblkptr = dn->dn_nblkptr;
	doi->doi_physical_blocks_512 = (DN_USED_BYTES(dnp) + 256) >> 9;
	doi->doi_max_offset = (dn->dn_maxblkid + 1) * dn->dn_datablksz;
	doi->doi_fill_count = 0;
	for (int i = 0; i < dnp->dn_nblkptr; i++)
		doi->doi_fill_count += BP_GET_FILL(&dnp->dn_blkptr[i]);
}

void
dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
{
	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	mutex_enter(&dn->dn_mtx);

	__dmu_object_info_from_dnode(dn, doi);

	mutex_exit(&dn->dn_mtx);
	rw_exit(&dn->dn_struct_rwlock);
}

/*
 * Get information on a DMU object.
 * If doi is NULL, the call simply checks whether the object exists.
 */
int
dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
{
	dnode_t *dn;
	int err = dnode_hold(os, object, FTAG, &dn);

	if (err)
		return (err);

	if (doi != NULL)
		dmu_object_info_from_dnode(dn, doi);

	dnode_rele(dn, FTAG);
	return (0);
}
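
/*
 * As noted above, passing a NULL doi turns this into a cheap existence
 * check, e.g. (illustrative):
 *
 *	boolean_t exists = (dmu_object_info(os, object, NULL) == 0);
 */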

/*
 * As above, but faster; can be used when you have a held dbuf in hand.
 */
void
dmu_object_info_from_db(dmu_buf_t *db_fake, dmu_object_info_t *doi)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;

	DB_DNODE_ENTER(db);
	dmu_object_info_from_dnode(DB_DNODE(db), doi);
	DB_DNODE_EXIT(db);
}

/*
 * Faster still when you only care about the size.
 * This is specifically optimized for zfs_getattr().
 */
void
dmu_object_size_from_db(dmu_buf_t *db_fake, uint32_t *blksize,
    u_longlong_t *nblk512)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	dnode_t *dn;

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);

	*blksize = dn->dn_datablksz;
	/* add in number of slots used for the dnode itself */
	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
	    SPA_MINBLOCKSHIFT) + dn->dn_num_slots;
	DB_DNODE_EXIT(db);
}

void
dmu_object_dnsize_from_db(dmu_buf_t *db_fake, int *dnsize)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	dnode_t *dn;

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);
	*dnsize = dn->dn_num_slots << DNODE_SHIFT;
	DB_DNODE_EXIT(db);
}

void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	int i;

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}

void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	int i;

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}

void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	int i;

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}

/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
}

void
dmu_init(void)
{
	abd_init();
	zfs_dbgmsg_init();
	sa_cache_init();
	xuio_stat_init();
	dmu_objset_init();
	dnode_init();
	zfetch_init();
	zio_compress_init();
	l2arc_init();
	arc_init();
	dbuf_init();
}

void
dmu_fini(void)
{
	arc_fini(); /* arc depends on l2arc, so arc must go first */
	l2arc_fini();
	zfetch_fini();
	zio_compress_fini();
	dbuf_fini();
	dnode_fini();
	dmu_objset_fini();
	xuio_stat_fini();
	sa_cache_fini();
	zfs_dbgmsg_fini();
	abd_fini();
}
