1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright (c) 2011, 2016 by Delphix. All rights reserved.
24 */
25/* Copyright (c) 2013 by Saso Kiselkov. All rights reserved. */
26/* Copyright (c) 2013, Joyent, Inc. All rights reserved. */
27/* Copyright (c) 2014, Nexenta Systems, Inc. All rights reserved. */
28
29#include <sys/dmu.h>
30#include <sys/dmu_impl.h>
31#include <sys/dmu_tx.h>
32#include <sys/dbuf.h>
33#include <sys/dnode.h>
34#include <sys/zfs_context.h>
35#include <sys/dmu_objset.h>
36#include <sys/dmu_traverse.h>
37#include <sys/dsl_dataset.h>
38#include <sys/dsl_dir.h>
39#include <sys/dsl_pool.h>
40#include <sys/dsl_synctask.h>
41#include <sys/dsl_prop.h>
42#include <sys/dmu_zfetch.h>
43#include <sys/zfs_ioctl.h>
44#include <sys/zap.h>
45#include <sys/zio_checksum.h>
46#include <sys/zio_compress.h>
47#include <sys/sa.h>
48#include <sys/zfeature.h>
49#ifdef _KERNEL
50#include <sys/racct.h>
51#include <sys/vm.h>
52#include <sys/zfs_znode.h>
53#endif
54
55/*
56 * Enable/disable nopwrite feature.
57 */
58int zfs_nopwrite_enabled = 1;
59SYSCTL_DECL(_vfs_zfs);
60SYSCTL_INT(_vfs_zfs, OID_AUTO, nopwrite_enabled, CTLFLAG_RDTUN,
61    &zfs_nopwrite_enabled, 0, "Enable nopwrite feature");
62
63/*
64 * Tunable to control percentage of dirtied blocks from frees in one TXG.
65 * After this threshold is crossed, additional dirty blocks from frees
66 * wait until the next TXG.
67 * A value of zero will disable this throttle.
68 */
69uint32_t zfs_per_txg_dirty_frees_percent = 30;
70SYSCTL_INT(_vfs_zfs, OID_AUTO, per_txg_dirty_frees_percent, CTLFLAG_RWTUN,
71	&zfs_per_txg_dirty_frees_percent, 0, "Percentage of dirtied blocks from frees in one txg");
72
73const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
74	{	DMU_BSWAP_UINT8,	TRUE,	"unallocated"		},
75	{	DMU_BSWAP_ZAP,		TRUE,	"object directory"	},
76	{	DMU_BSWAP_UINT64,	TRUE,	"object array"		},
77	{	DMU_BSWAP_UINT8,	TRUE,	"packed nvlist"		},
78	{	DMU_BSWAP_UINT64,	TRUE,	"packed nvlist size"	},
79	{	DMU_BSWAP_UINT64,	TRUE,	"bpobj"			},
80	{	DMU_BSWAP_UINT64,	TRUE,	"bpobj header"		},
81	{	DMU_BSWAP_UINT64,	TRUE,	"SPA space map header"	},
82	{	DMU_BSWAP_UINT64,	TRUE,	"SPA space map"		},
83	{	DMU_BSWAP_UINT64,	TRUE,	"ZIL intent log"	},
84	{	DMU_BSWAP_DNODE,	TRUE,	"DMU dnode"		},
85	{	DMU_BSWAP_OBJSET,	TRUE,	"DMU objset"		},
86	{	DMU_BSWAP_UINT64,	TRUE,	"DSL directory"		},
87	{	DMU_BSWAP_ZAP,		TRUE,	"DSL directory child map"},
88	{	DMU_BSWAP_ZAP,		TRUE,	"DSL dataset snap map"	},
89	{	DMU_BSWAP_ZAP,		TRUE,	"DSL props"		},
90	{	DMU_BSWAP_UINT64,	TRUE,	"DSL dataset"		},
91	{	DMU_BSWAP_ZNODE,	TRUE,	"ZFS znode"		},
92	{	DMU_BSWAP_OLDACL,	TRUE,	"ZFS V0 ACL"		},
93	{	DMU_BSWAP_UINT8,	FALSE,	"ZFS plain file"	},
94	{	DMU_BSWAP_ZAP,		TRUE,	"ZFS directory"		},
95	{	DMU_BSWAP_ZAP,		TRUE,	"ZFS master node"	},
96	{	DMU_BSWAP_ZAP,		TRUE,	"ZFS delete queue"	},
97	{	DMU_BSWAP_UINT8,	FALSE,	"zvol object"		},
98	{	DMU_BSWAP_ZAP,		TRUE,	"zvol prop"		},
99	{	DMU_BSWAP_UINT8,	FALSE,	"other uint8[]"		},
100	{	DMU_BSWAP_UINT64,	FALSE,	"other uint64[]"	},
101	{	DMU_BSWAP_ZAP,		TRUE,	"other ZAP"		},
102	{	DMU_BSWAP_ZAP,		TRUE,	"persistent error log"	},
103	{	DMU_BSWAP_UINT8,	TRUE,	"SPA history"		},
104	{	DMU_BSWAP_UINT64,	TRUE,	"SPA history offsets"	},
105	{	DMU_BSWAP_ZAP,		TRUE,	"Pool properties"	},
106	{	DMU_BSWAP_ZAP,		TRUE,	"DSL permissions"	},
107	{	DMU_BSWAP_ACL,		TRUE,	"ZFS ACL"		},
108	{	DMU_BSWAP_UINT8,	TRUE,	"ZFS SYSACL"		},
109	{	DMU_BSWAP_UINT8,	TRUE,	"FUID table"		},
110	{	DMU_BSWAP_UINT64,	TRUE,	"FUID table size"	},
111	{	DMU_BSWAP_ZAP,		TRUE,	"DSL dataset next clones"},
112	{	DMU_BSWAP_ZAP,		TRUE,	"scan work queue"	},
113	{	DMU_BSWAP_ZAP,		TRUE,	"ZFS user/group used"	},
114	{	DMU_BSWAP_ZAP,		TRUE,	"ZFS user/group quota"	},
115	{	DMU_BSWAP_ZAP,		TRUE,	"snapshot refcount tags"},
116	{	DMU_BSWAP_ZAP,		TRUE,	"DDT ZAP algorithm"	},
117	{	DMU_BSWAP_ZAP,		TRUE,	"DDT statistics"	},
118	{	DMU_BSWAP_UINT8,	TRUE,	"System attributes"	},
119	{	DMU_BSWAP_ZAP,		TRUE,	"SA master node"	},
120	{	DMU_BSWAP_ZAP,		TRUE,	"SA attr registration"	},
121	{	DMU_BSWAP_ZAP,		TRUE,	"SA attr layouts"	},
122	{	DMU_BSWAP_ZAP,		TRUE,	"scan translations"	},
123	{	DMU_BSWAP_UINT8,	FALSE,	"deduplicated block"	},
124	{	DMU_BSWAP_ZAP,		TRUE,	"DSL deadlist map"	},
125	{	DMU_BSWAP_UINT64,	TRUE,	"DSL deadlist map hdr"	},
126	{	DMU_BSWAP_ZAP,		TRUE,	"DSL dir clones"	},
127	{	DMU_BSWAP_UINT64,	TRUE,	"bpobj subobj"		}
128};
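
/*
 * Note: the table above is indexed by dmu_object_type_t.  For example,
 * dmu_ot[DMU_OT_PLAIN_FILE_CONTENTS] is the { DMU_BSWAP_UINT8, FALSE,
 * "ZFS plain file" } entry, which is how a legacy object type is resolved
 * to its byteswap routine, metadata flag, and display name.
 */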
129
130const dmu_object_byteswap_info_t dmu_ot_byteswap[DMU_BSWAP_NUMFUNCS] = {
131	{	byteswap_uint8_array,	"uint8"		},
132	{	byteswap_uint16_array,	"uint16"	},
133	{	byteswap_uint32_array,	"uint32"	},
134	{	byteswap_uint64_array,	"uint64"	},
135	{	zap_byteswap,		"zap"		},
136	{	dnode_buf_byteswap,	"dnode"		},
137	{	dmu_objset_byteswap,	"objset"	},
138	{	zfs_znode_byteswap,	"znode"		},
139	{	zfs_oldacl_byteswap,	"oldacl"	},
140	{	zfs_acl_byteswap,	"acl"		}
141};
142
143int
144dmu_buf_hold_noread_by_dnode(dnode_t *dn, uint64_t offset,
145    void *tag, dmu_buf_t **dbp)
146{
147	uint64_t blkid;
148	dmu_buf_impl_t *db;
149
150	blkid = dbuf_whichblock(dn, 0, offset);
151	rw_enter(&dn->dn_struct_rwlock, RW_READER);
152	db = dbuf_hold(dn, blkid, tag);
153	rw_exit(&dn->dn_struct_rwlock);
154
155	if (db == NULL) {
156		*dbp = NULL;
157		return (SET_ERROR(EIO));
158	}
159
160	*dbp = &db->db;
161	return (0);
162}

int
164dmu_buf_hold_noread(objset_t *os, uint64_t object, uint64_t offset,
165    void *tag, dmu_buf_t **dbp)
166{
167	dnode_t *dn;
168	uint64_t blkid;
169	dmu_buf_impl_t *db;
170	int err;
171
172	err = dnode_hold(os, object, FTAG, &dn);
173	if (err)
174		return (err);
175	blkid = dbuf_whichblock(dn, 0, offset);
176	rw_enter(&dn->dn_struct_rwlock, RW_READER);
177	db = dbuf_hold(dn, blkid, tag);
178	rw_exit(&dn->dn_struct_rwlock);
179	dnode_rele(dn, FTAG);
180
181	if (db == NULL) {
182		*dbp = NULL;
183		return (SET_ERROR(EIO));
184	}
185
186	*dbp = &db->db;
187	return (err);
188}
189
190int
191dmu_buf_hold_by_dnode(dnode_t *dn, uint64_t offset,
192    void *tag, dmu_buf_t **dbp, int flags)
193{
194	int err;
195	int db_flags = DB_RF_CANFAIL;
196
197	if (flags & DMU_READ_NO_PREFETCH)
198		db_flags |= DB_RF_NOPREFETCH;
199
200	err = dmu_buf_hold_noread_by_dnode(dn, offset, tag, dbp);
201	if (err == 0) {
202		dmu_buf_impl_t *db = (dmu_buf_impl_t *)(*dbp);
203		err = dbuf_read(db, NULL, db_flags);
204		if (err != 0) {
205			dbuf_rele(db, tag);
206			*dbp = NULL;
207		}
208	}
209
210	return (err);
211}
212
213int
214dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
215    void *tag, dmu_buf_t **dbp, int flags)
216{
217	int err;
218	int db_flags = DB_RF_CANFAIL;
219
220	if (flags & DMU_READ_NO_PREFETCH)
221		db_flags |= DB_RF_NOPREFETCH;
222
223	err = dmu_buf_hold_noread(os, object, offset, tag, dbp);
224	if (err == 0) {
225		dmu_buf_impl_t *db = (dmu_buf_impl_t *)(*dbp);
226		err = dbuf_read(db, NULL, db_flags);
227		if (err != 0) {
228			dbuf_rele(db, tag);
229			*dbp = NULL;
230		}
231	}
232
233	return (err);
234}
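
/*
 * Illustrative sketch (not part of the DMU API): the typical pattern for a
 * single-block hold.  The objset, object number, and offset are assumed to
 * come from the caller.
 *
 *	dmu_buf_t *db;
 *	int err = dmu_buf_hold(os, object, offset, FTAG, &db,
 *	    DMU_READ_PREFETCH);
 *	if (err == 0) {
 *		... consume up to db->db_size bytes at db->db_data ...
 *		dmu_buf_rele(db, FTAG);
 *	}
 */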
235
236int
237dmu_bonus_max(void)
238{
239	return (DN_MAX_BONUSLEN);
240}
241
242int
243dmu_set_bonus(dmu_buf_t *db_fake, int newsize, dmu_tx_t *tx)
244{
245	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
246	dnode_t *dn;
247	int error;
248
249	DB_DNODE_ENTER(db);
250	dn = DB_DNODE(db);
251
252	if (dn->dn_bonus != db) {
253		error = SET_ERROR(EINVAL);
254	} else if (newsize < 0 || newsize > db_fake->db_size) {
255		error = SET_ERROR(EINVAL);
256	} else {
257		dnode_setbonuslen(dn, newsize, tx);
258		error = 0;
259	}
260
261	DB_DNODE_EXIT(db);
262	return (error);
263}
264
265int
266dmu_set_bonustype(dmu_buf_t *db_fake, dmu_object_type_t type, dmu_tx_t *tx)
267{
268	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
269	dnode_t *dn;
270	int error;
271
272	DB_DNODE_ENTER(db);
273	dn = DB_DNODE(db);
274
275	if (!DMU_OT_IS_VALID(type)) {
276		error = SET_ERROR(EINVAL);
277	} else if (dn->dn_bonus != db) {
278		error = SET_ERROR(EINVAL);
279	} else {
280		dnode_setbonus_type(dn, type, tx);
281		error = 0;
282	}
283
284	DB_DNODE_EXIT(db);
285	return (error);
286}
287
288dmu_object_type_t
289dmu_get_bonustype(dmu_buf_t *db_fake)
290{
291	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
292	dnode_t *dn;
293	dmu_object_type_t type;
294
295	DB_DNODE_ENTER(db);
296	dn = DB_DNODE(db);
297	type = dn->dn_bonustype;
298	DB_DNODE_EXIT(db);
299
300	return (type);
301}
302
303int
304dmu_rm_spill(objset_t *os, uint64_t object, dmu_tx_t *tx)
305{
306	dnode_t *dn;
307	int error;
308
	error = dnode_hold(os, object, FTAG, &dn);
	if (error)
		return (error);
	dbuf_rm_spill(dn, tx);
311	rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
312	dnode_rm_spill(dn, tx);
313	rw_exit(&dn->dn_struct_rwlock);
314	dnode_rele(dn, FTAG);
315	return (error);
316}
317
318/*
319 * returns ENOENT, EIO, or 0.
320 */
321int
322dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
323{
324	dnode_t *dn;
325	dmu_buf_impl_t *db;
326	int error;
327
328	error = dnode_hold(os, object, FTAG, &dn);
329	if (error)
330		return (error);
331
332	rw_enter(&dn->dn_struct_rwlock, RW_READER);
333	if (dn->dn_bonus == NULL) {
334		rw_exit(&dn->dn_struct_rwlock);
335		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
336		if (dn->dn_bonus == NULL)
337			dbuf_create_bonus(dn);
338	}
339	db = dn->dn_bonus;
340
341	/* as long as the bonus buf is held, the dnode will be held */
342	if (refcount_add(&db->db_holds, tag) == 1) {
343		VERIFY(dnode_add_ref(dn, db));
344		atomic_inc_32(&dn->dn_dbufs_count);
345	}
346
347	/*
348	 * Wait to drop dn_struct_rwlock until after adding the bonus dbuf's
349	 * hold and incrementing the dbuf count to ensure that dnode_move() sees
350	 * a dnode hold for every dbuf.
351	 */
352	rw_exit(&dn->dn_struct_rwlock);
353
354	dnode_rele(dn, FTAG);
355
356	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH));
357
358	*dbp = &db->db;
359	return (0);
360}
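
/*
 * Illustrative sketch (not part of the DMU API): holding and releasing a
 * bonus buffer; error handling is abbreviated and the object is assumed.
 *
 *	dmu_buf_t *db;
 *	if (dmu_bonus_hold(os, object, FTAG, &db) == 0) {
 *		... interpret db->db_data as the object's bonus area ...
 *		dmu_buf_rele(db, FTAG);
 *	}
 */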
361
362/*
363 * returns ENOENT, EIO, or 0.
364 *
365 * This interface will allocate a blank spill dbuf when a spill blk
366 * doesn't already exist on the dnode.
367 *
368 * if you only want to find an already existing spill db, then
369 * dmu_spill_hold_existing() should be used.
370 */
371int
372dmu_spill_hold_by_dnode(dnode_t *dn, uint32_t flags, void *tag, dmu_buf_t **dbp)
373{
374	dmu_buf_impl_t *db = NULL;
375	int err;
376
377	if ((flags & DB_RF_HAVESTRUCT) == 0)
378		rw_enter(&dn->dn_struct_rwlock, RW_READER);
379
380	db = dbuf_hold(dn, DMU_SPILL_BLKID, tag);
381
382	if ((flags & DB_RF_HAVESTRUCT) == 0)
383		rw_exit(&dn->dn_struct_rwlock);
384
385	ASSERT(db != NULL);
386	err = dbuf_read(db, NULL, flags);
387	if (err == 0)
388		*dbp = &db->db;
389	else
390		dbuf_rele(db, tag);
391	return (err);
392}
393
394int
395dmu_spill_hold_existing(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
396{
397	dmu_buf_impl_t *db = (dmu_buf_impl_t *)bonus;
398	dnode_t *dn;
399	int err;
400
401	DB_DNODE_ENTER(db);
402	dn = DB_DNODE(db);
403
404	if (spa_version(dn->dn_objset->os_spa) < SPA_VERSION_SA) {
405		err = SET_ERROR(EINVAL);
406	} else {
407		rw_enter(&dn->dn_struct_rwlock, RW_READER);
408
409		if (!dn->dn_have_spill) {
410			err = SET_ERROR(ENOENT);
411		} else {
412			err = dmu_spill_hold_by_dnode(dn,
413			    DB_RF_HAVESTRUCT | DB_RF_CANFAIL, tag, dbp);
414		}
415
416		rw_exit(&dn->dn_struct_rwlock);
417	}
418
419	DB_DNODE_EXIT(db);
420	return (err);
421}
422
423int
424dmu_spill_hold_by_bonus(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
425{
426	dmu_buf_impl_t *db = (dmu_buf_impl_t *)bonus;
427	dnode_t *dn;
428	int err;
429
430	DB_DNODE_ENTER(db);
431	dn = DB_DNODE(db);
432	err = dmu_spill_hold_by_dnode(dn, DB_RF_CANFAIL, tag, dbp);
433	DB_DNODE_EXIT(db);
434
435	return (err);
436}
437
438/*
439 * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
440 * to take a held dnode rather than <os, object> -- the lookup is wasteful,
441 * and can induce severe lock contention when writing to several files
442 * whose dnodes are in the same block.
443 */
444static int
445dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
446    boolean_t read, void *tag, int *numbufsp, dmu_buf_t ***dbpp, uint32_t flags)
447{
448	dmu_buf_t **dbp;
449	uint64_t blkid, nblks, i;
450	uint32_t dbuf_flags;
451	int err;
452	zio_t *zio;
453
454	ASSERT(length <= DMU_MAX_ACCESS);
455
456	/*
457	 * Note: We directly notify the prefetch code of this read, so that
458	 * we can tell it about the multi-block read.  dbuf_read() only knows
459	 * about the one block it is accessing.
460	 */
461	dbuf_flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT | DB_RF_HAVESTRUCT |
462	    DB_RF_NOPREFETCH;
463
464	rw_enter(&dn->dn_struct_rwlock, RW_READER);
465	if (dn->dn_datablkshift) {
466		int blkshift = dn->dn_datablkshift;
467		nblks = (P2ROUNDUP(offset + length, 1ULL << blkshift) -
468		    P2ALIGN(offset, 1ULL << blkshift)) >> blkshift;
469	} else {
470		if (offset + length > dn->dn_datablksz) {
471			zfs_panic_recover("zfs: accessing past end of object "
472			    "%llx/%llx (size=%u access=%llu+%llu)",
473			    (longlong_t)dn->dn_objset->
474			    os_dsl_dataset->ds_object,
475			    (longlong_t)dn->dn_object, dn->dn_datablksz,
476			    (longlong_t)offset, (longlong_t)length);
477			rw_exit(&dn->dn_struct_rwlock);
478			return (SET_ERROR(EIO));
479		}
480		nblks = 1;
481	}
482	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
483
484#if defined(_KERNEL) && defined(RACCT)
485	if (racct_enable && !read) {
486		PROC_LOCK(curproc);
487		racct_add_force(curproc, RACCT_WRITEBPS, length);
488		racct_add_force(curproc, RACCT_WRITEIOPS, nblks);
489		PROC_UNLOCK(curproc);
490	}
491#endif
492
493	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
494	blkid = dbuf_whichblock(dn, 0, offset);
495	for (i = 0; i < nblks; i++) {
496		dmu_buf_impl_t *db = dbuf_hold(dn, blkid + i, tag);
497		if (db == NULL) {
498			rw_exit(&dn->dn_struct_rwlock);
499			dmu_buf_rele_array(dbp, nblks, tag);
500			zio_nowait(zio);
501			return (SET_ERROR(EIO));
502		}
503
504		/* initiate async i/o */
505		if (read)
506			(void) dbuf_read(db, zio, dbuf_flags);
507
508#ifdef _KERNEL
509		else
510			curthread->td_ru.ru_oublock++;
511#endif
512		dbp[i] = &db->db;
513	}
514
515	if ((flags & DMU_READ_NO_PREFETCH) == 0 &&
516	    DNODE_META_IS_CACHEABLE(dn) && length <= zfetch_array_rd_sz) {
517		dmu_zfetch(&dn->dn_zfetch, blkid, nblks,
518		    read && DNODE_IS_CACHEABLE(dn));
519	}
520	rw_exit(&dn->dn_struct_rwlock);
521
522	/* wait for async i/o */
523	err = zio_wait(zio);
524	if (err) {
525		dmu_buf_rele_array(dbp, nblks, tag);
526		return (err);
527	}
528
529	/* wait for other io to complete */
530	if (read) {
531		for (i = 0; i < nblks; i++) {
532			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
533			mutex_enter(&db->db_mtx);
534			while (db->db_state == DB_READ ||
535			    db->db_state == DB_FILL)
536				cv_wait(&db->db_changed, &db->db_mtx);
537			if (db->db_state == DB_UNCACHED)
538				err = SET_ERROR(EIO);
539			mutex_exit(&db->db_mtx);
540			if (err) {
541				dmu_buf_rele_array(dbp, nblks, tag);
542				return (err);
543			}
544		}
545	}
546
547	*numbufsp = nblks;
548	*dbpp = dbp;
549	return (0);
550}
551
552static int
553dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
554    uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
555{
556	dnode_t *dn;
557	int err;
558
559	err = dnode_hold(os, object, FTAG, &dn);
560	if (err)
561		return (err);
562
563	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
564	    numbufsp, dbpp, DMU_READ_PREFETCH);
565
566	dnode_rele(dn, FTAG);
567
568	return (err);
569}
570
571int
572dmu_buf_hold_array_by_bonus(dmu_buf_t *db_fake, uint64_t offset,
573    uint64_t length, boolean_t read, void *tag, int *numbufsp,
574    dmu_buf_t ***dbpp)
575{
576	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
577	dnode_t *dn;
578	int err;
579
580	DB_DNODE_ENTER(db);
581	dn = DB_DNODE(db);
582	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
583	    numbufsp, dbpp, DMU_READ_PREFETCH);
584	DB_DNODE_EXIT(db);
585
586	return (err);
587}
588
589void
590dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
591{
592	int i;
593	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
594
595	if (numbufs == 0)
596		return;
597
598	for (i = 0; i < numbufs; i++) {
599		if (dbp[i])
600			dbuf_rele(dbp[i], tag);
601	}
602
603	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
604}
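
/*
 * Illustrative sketch (not part of the DMU API): a multi-block hold is
 * always paired with dmu_buf_rele_array().  The bonus dbuf, offset, and
 * length below are assumed.
 *
 *	dmu_buf_t **dbp;
 *	int numbufs;
 *	int err = dmu_buf_hold_array_by_bonus(bonus_db, offset, length,
 *	    B_TRUE, FTAG, &numbufs, &dbp);
 *	if (err == 0) {
 *		... use dbp[0] through dbp[numbufs - 1] ...
 *		dmu_buf_rele_array(dbp, numbufs, FTAG);
 *	}
 */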
605
606/*
607 * Issue prefetch i/os for the given blocks.  If level is greater than 0, the
 * indirect blocks prefetched will be those that point to the blocks containing
609 * the data starting at offset, and continuing to offset + len.
610 *
611 * Note that if the indirect blocks above the blocks being prefetched are not in
 * cache, they will be asynchronously read in.
613 */
614void
615dmu_prefetch(objset_t *os, uint64_t object, int64_t level, uint64_t offset,
616    uint64_t len, zio_priority_t pri)
617{
618	dnode_t *dn;
619	uint64_t blkid;
620	int nblks, err;
621
622	if (len == 0) {  /* they're interested in the bonus buffer */
623		dn = DMU_META_DNODE(os);
624
625		if (object == 0 || object >= DN_MAX_OBJECT)
626			return;
627
628		rw_enter(&dn->dn_struct_rwlock, RW_READER);
629		blkid = dbuf_whichblock(dn, level,
630		    object * sizeof (dnode_phys_t));
631		dbuf_prefetch(dn, level, blkid, pri, 0);
632		rw_exit(&dn->dn_struct_rwlock);
633		return;
634	}
635
636	/*
637	 * XXX - Note, if the dnode for the requested object is not
638	 * already cached, we will do a *synchronous* read in the
639	 * dnode_hold() call.  The same is true for any indirects.
640	 */
641	err = dnode_hold(os, object, FTAG, &dn);
642	if (err != 0)
643		return;
644
645	rw_enter(&dn->dn_struct_rwlock, RW_READER);
646	/*
647	 * offset + len - 1 is the last byte we want to prefetch for, and offset
	 * is the first.  Then dbuf_whichblock(dn, level, offset + len - 1) is
	 * the last block we want to prefetch, and dbuf_whichblock(dn, level,
	 * offset) is the first.  Then the number we need to prefetch is the
651	 * last - first + 1.
652	 */
653	if (level > 0 || dn->dn_datablkshift != 0) {
654		nblks = dbuf_whichblock(dn, level, offset + len - 1) -
655		    dbuf_whichblock(dn, level, offset) + 1;
656	} else {
657		nblks = (offset < dn->dn_datablksz);
658	}
659
660	if (nblks != 0) {
661		blkid = dbuf_whichblock(dn, level, offset);
662		for (int i = 0; i < nblks; i++)
663			dbuf_prefetch(dn, level, blkid + i, pri, 0);
664	}
665
666	rw_exit(&dn->dn_struct_rwlock);
667
668	dnode_rele(dn, FTAG);
669}
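
/*
 * Illustrative sketch (not part of the DMU API): asking for level-0
 * read-ahead of a byte range ahead of a series of dmu_read() calls; the
 * object and range are assumed.
 *
 *	dmu_prefetch(os, object, 0, offset, length, ZIO_PRIORITY_ASYNC_READ);
 */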
670
671/*
672 * Get the next "chunk" of file data to free.  We traverse the file from
 * the end so that the file gets shorter over time (if we crash in the
674 * middle, this will leave us in a better state).  We find allocated file
675 * data by simply searching the allocated level 1 indirects.
676 *
677 * On input, *start should be the first offset that does not need to be
678 * freed (e.g. "offset + length").  On return, *start will be the first
679 * offset that should be freed.
680 */
681static int
682get_next_chunk(dnode_t *dn, uint64_t *start, uint64_t minimum)
683{
684	uint64_t maxblks = DMU_MAX_ACCESS >> (dn->dn_indblkshift + 1);
685	/* bytes of data covered by a level-1 indirect block */
686	uint64_t iblkrange =
687	    dn->dn_datablksz * EPB(dn->dn_indblkshift, SPA_BLKPTRSHIFT);
688
689	ASSERT3U(minimum, <=, *start);
690
691	if (*start - minimum <= iblkrange * maxblks) {
692		*start = minimum;
693		return (0);
694	}
695	ASSERT(ISP2(iblkrange));
696
697	for (uint64_t blks = 0; *start > minimum && blks < maxblks; blks++) {
698		int err;
699
700		/*
701		 * dnode_next_offset(BACKWARDS) will find an allocated L1
702		 * indirect block at or before the input offset.  We must
703		 * decrement *start so that it is at the end of the region
704		 * to search.
705		 */
706		(*start)--;
707		err = dnode_next_offset(dn,
708		    DNODE_FIND_BACKWARDS, start, 2, 1, 0);
709
710		/* if there are no indirect blocks before start, we are done */
711		if (err == ESRCH) {
712			*start = minimum;
713			break;
714		} else if (err != 0) {
715			return (err);
716		}
717
718		/* set start to the beginning of this L1 indirect */
719		*start = P2ALIGN(*start, iblkrange);
720	}
721	if (*start < minimum)
722		*start = minimum;
723	return (0);
724}
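
/*
 * Worked example of the chunking arithmetic above, under assumed but typical
 * values: with a 128K data block size and dn_indblkshift == 17 (128K
 * indirect blocks), EPB(17, SPA_BLKPTRSHIFT) == 1024 block pointers per L1
 * indirect, so iblkrange == 128K * 1024 == 128M of file data covered by each
 * L1 block searched.
 */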
725
726static int
727dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
728    uint64_t length)
729{
730	uint64_t object_size = (dn->dn_maxblkid + 1) * dn->dn_datablksz;
731	int err;
732	uint64_t dirty_frees_threshold;
733	dsl_pool_t *dp = dmu_objset_pool(os);
734
735	if (offset >= object_size)
736		return (0);
737
738	if (zfs_per_txg_dirty_frees_percent <= 100)
739		dirty_frees_threshold =
740		    zfs_per_txg_dirty_frees_percent * zfs_dirty_data_max / 100;
741	else
742		dirty_frees_threshold = zfs_dirty_data_max / 4;
743
744	if (length == DMU_OBJECT_END || offset + length > object_size)
745		length = object_size - offset;
746
747	while (length != 0) {
748		uint64_t chunk_end, chunk_begin, chunk_len;
749		uint64_t long_free_dirty_all_txgs = 0;
750		dmu_tx_t *tx;
751
752		chunk_end = chunk_begin = offset + length;
753
754		/* move chunk_begin backwards to the beginning of this chunk */
755		err = get_next_chunk(dn, &chunk_begin, offset);
756		if (err)
757			return (err);
758		ASSERT3U(chunk_begin, >=, offset);
759		ASSERT3U(chunk_begin, <=, chunk_end);
760
761		chunk_len = chunk_end - chunk_begin;
762
763		mutex_enter(&dp->dp_lock);
764		for (int t = 0; t < TXG_SIZE; t++) {
765			long_free_dirty_all_txgs +=
766			    dp->dp_long_free_dirty_pertxg[t];
767		}
768		mutex_exit(&dp->dp_lock);
769
770		/*
		 * To avoid filling up a TXG with just frees, wait for
		 * the next TXG to open before freeing more chunks if
		 * we have reached the threshold of frees.
774		 */
775		if (dirty_frees_threshold != 0 &&
776		    long_free_dirty_all_txgs >= dirty_frees_threshold) {
777			txg_wait_open(dp, 0);
778			continue;
779		}
780
781		tx = dmu_tx_create(os);
782		dmu_tx_hold_free(tx, dn->dn_object, chunk_begin, chunk_len);
783
784		/*
785		 * Mark this transaction as typically resulting in a net
786		 * reduction in space used.
787		 */
788		dmu_tx_mark_netfree(tx);
789		err = dmu_tx_assign(tx, TXG_WAIT);
790		if (err) {
791			dmu_tx_abort(tx);
792			return (err);
793		}
794
795		mutex_enter(&dp->dp_lock);
796		dp->dp_long_free_dirty_pertxg[dmu_tx_get_txg(tx) & TXG_MASK] +=
797		    chunk_len;
798		mutex_exit(&dp->dp_lock);
799		DTRACE_PROBE3(free__long__range,
800		    uint64_t, long_free_dirty_all_txgs, uint64_t, chunk_len,
801		    uint64_t, dmu_tx_get_txg(tx));
802		dnode_free_range(dn, chunk_begin, chunk_len, tx);
803		dmu_tx_commit(tx);
804
805		length -= chunk_len;
806	}
807	return (0);
808}
809
810int
811dmu_free_long_range(objset_t *os, uint64_t object,
812    uint64_t offset, uint64_t length)
813{
814	dnode_t *dn;
815	int err;
816
817	err = dnode_hold(os, object, FTAG, &dn);
818	if (err != 0)
819		return (err);
820	err = dmu_free_long_range_impl(os, dn, offset, length);
821
822	/*
823	 * It is important to zero out the maxblkid when freeing the entire
824	 * file, so that (a) subsequent calls to dmu_free_long_range_impl()
825	 * will take the fast path, and (b) dnode_reallocate() can verify
826	 * that the entire file has been freed.
827	 */
828	if (err == 0 && offset == 0 && length == DMU_OBJECT_END)
829		dn->dn_maxblkid = 0;
830
831	dnode_rele(dn, FTAG);
832	return (err);
833}
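
/*
 * Illustrative sketch (not part of the DMU API): freeing all of an object's
 * data without freeing the object itself; DMU_OBJECT_END means "to the end
 * of the object".  dmu_free_long_object() below goes one step further and
 * also frees the dnode.
 *
 *	int err = dmu_free_long_range(os, object, 0, DMU_OBJECT_END);
 */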
834
835int
836dmu_free_long_object(objset_t *os, uint64_t object)
837{
838	dmu_tx_t *tx;
839	int err;
840
841	err = dmu_free_long_range(os, object, 0, DMU_OBJECT_END);
842	if (err != 0)
843		return (err);
844
845	tx = dmu_tx_create(os);
846	dmu_tx_hold_bonus(tx, object);
847	dmu_tx_hold_free(tx, object, 0, DMU_OBJECT_END);
848	dmu_tx_mark_netfree(tx);
849	err = dmu_tx_assign(tx, TXG_WAIT);
850	if (err == 0) {
851		err = dmu_object_free(os, object, tx);
852		dmu_tx_commit(tx);
853	} else {
854		dmu_tx_abort(tx);
855	}
856
857	return (err);
858}
859
860int
861dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
862    uint64_t size, dmu_tx_t *tx)
863{
864	dnode_t *dn;
865	int err = dnode_hold(os, object, FTAG, &dn);
866	if (err)
867		return (err);
868	ASSERT(offset < UINT64_MAX);
869	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
870	dnode_free_range(dn, offset, size, tx);
871	dnode_rele(dn, FTAG);
872	return (0);
873}
874
875int
876dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
877    void *buf, uint32_t flags)
878{
879	dnode_t *dn;
880	dmu_buf_t **dbp;
881	int numbufs, err;
882
883	err = dnode_hold(os, object, FTAG, &dn);
884	if (err)
885		return (err);
886
887	/*
888	 * Deal with odd block sizes, where there can't be data past the first
889	 * block.  If we ever do the tail block optimization, we will need to
890	 * handle that here as well.
891	 */
892	if (dn->dn_maxblkid == 0) {
893		int newsz = offset > dn->dn_datablksz ? 0 :
894		    MIN(size, dn->dn_datablksz - offset);
895		bzero((char *)buf + newsz, size - newsz);
896		size = newsz;
897	}
898
899	while (size > 0) {
900		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
901		int i;
902
903		/*
904		 * NB: we could do this block-at-a-time, but it's nice
905		 * to be reading in parallel.
906		 */
907		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
908		    TRUE, FTAG, &numbufs, &dbp, flags);
909		if (err)
910			break;
911
912		for (i = 0; i < numbufs; i++) {
913			int tocpy;
914			int bufoff;
915			dmu_buf_t *db = dbp[i];
916
917			ASSERT(size > 0);
918
919			bufoff = offset - db->db_offset;
920			tocpy = (int)MIN(db->db_size - bufoff, size);
921
922			bcopy((char *)db->db_data + bufoff, buf, tocpy);
923
924			offset += tocpy;
925			size -= tocpy;
926			buf = (char *)buf + tocpy;
927		}
928		dmu_buf_rele_array(dbp, numbufs, FTAG);
929	}
930	dnode_rele(dn, FTAG);
931	return (err);
932}
933
934void
935dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
936    const void *buf, dmu_tx_t *tx)
937{
938	dmu_buf_t **dbp;
939	int numbufs, i;
940
941	if (size == 0)
942		return;
943
944	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
945	    FALSE, FTAG, &numbufs, &dbp));
946
947	for (i = 0; i < numbufs; i++) {
948		int tocpy;
949		int bufoff;
950		dmu_buf_t *db = dbp[i];
951
952		ASSERT(size > 0);
953
954		bufoff = offset - db->db_offset;
955		tocpy = (int)MIN(db->db_size - bufoff, size);
956
957		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
958
959		if (tocpy == db->db_size)
960			dmu_buf_will_fill(db, tx);
961		else
962			dmu_buf_will_dirty(db, tx);
963
964		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
965
966		if (tocpy == db->db_size)
967			dmu_buf_fill_done(db, tx);
968
969		offset += tocpy;
970		size -= tocpy;
971		buf = (char *)buf + tocpy;
972	}
973	dmu_buf_rele_array(dbp, numbufs, FTAG);
974}
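
/*
 * Illustrative sketch (not part of the DMU API): the usual transactional
 * pattern around dmu_write().  The object, offset, size, and buf are
 * assumed to come from the caller.
 *
 *	dmu_tx_t *tx = dmu_tx_create(os);
 *	dmu_tx_hold_write(tx, object, offset, size);
 *	int err = dmu_tx_assign(tx, TXG_WAIT);
 *	if (err != 0) {
 *		dmu_tx_abort(tx);
 *		return (err);
 *	}
 *	dmu_write(os, object, offset, size, buf, tx);
 *	dmu_tx_commit(tx);
 */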
975
976void
977dmu_prealloc(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
978    dmu_tx_t *tx)
979{
980	dmu_buf_t **dbp;
981	int numbufs, i;
982
983	if (size == 0)
984		return;
985
986	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
987	    FALSE, FTAG, &numbufs, &dbp));
988
989	for (i = 0; i < numbufs; i++) {
990		dmu_buf_t *db = dbp[i];
991
992		dmu_buf_will_not_fill(db, tx);
993	}
994	dmu_buf_rele_array(dbp, numbufs, FTAG);
995}
996
997void
998dmu_write_embedded(objset_t *os, uint64_t object, uint64_t offset,
999    void *data, uint8_t etype, uint8_t comp, int uncompressed_size,
1000    int compressed_size, int byteorder, dmu_tx_t *tx)
1001{
1002	dmu_buf_t *db;
1003
1004	ASSERT3U(etype, <, NUM_BP_EMBEDDED_TYPES);
1005	ASSERT3U(comp, <, ZIO_COMPRESS_FUNCTIONS);
1006	VERIFY0(dmu_buf_hold_noread(os, object, offset,
1007	    FTAG, &db));
1008
1009	dmu_buf_write_embedded(db,
1010	    data, (bp_embedded_type_t)etype, (enum zio_compress)comp,
1011	    uncompressed_size, compressed_size, byteorder, tx);
1012
1013	dmu_buf_rele(db, FTAG);
1014}
1015
1016/*
1017 * DMU support for xuio
1018 */
1019kstat_t *xuio_ksp = NULL;
1020
1021int
1022dmu_xuio_init(xuio_t *xuio, int nblk)
1023{
1024	dmu_xuio_t *priv;
1025	uio_t *uio = &xuio->xu_uio;
1026
1027	uio->uio_iovcnt = nblk;
1028	uio->uio_iov = kmem_zalloc(nblk * sizeof (iovec_t), KM_SLEEP);
1029
1030	priv = kmem_zalloc(sizeof (dmu_xuio_t), KM_SLEEP);
1031	priv->cnt = nblk;
1032	priv->bufs = kmem_zalloc(nblk * sizeof (arc_buf_t *), KM_SLEEP);
1033	priv->iovp = uio->uio_iov;
1034	XUIO_XUZC_PRIV(xuio) = priv;
1035
1036	if (XUIO_XUZC_RW(xuio) == UIO_READ)
1037		XUIOSTAT_INCR(xuiostat_onloan_rbuf, nblk);
1038	else
1039		XUIOSTAT_INCR(xuiostat_onloan_wbuf, nblk);
1040
1041	return (0);
1042}
1043
1044void
1045dmu_xuio_fini(xuio_t *xuio)
1046{
1047	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1048	int nblk = priv->cnt;
1049
1050	kmem_free(priv->iovp, nblk * sizeof (iovec_t));
1051	kmem_free(priv->bufs, nblk * sizeof (arc_buf_t *));
1052	kmem_free(priv, sizeof (dmu_xuio_t));
1053
1054	if (XUIO_XUZC_RW(xuio) == UIO_READ)
1055		XUIOSTAT_INCR(xuiostat_onloan_rbuf, -nblk);
1056	else
1057		XUIOSTAT_INCR(xuiostat_onloan_wbuf, -nblk);
1058}
1059
1060/*
1061 * Initialize iov[priv->next] and priv->bufs[priv->next] with { off, n, abuf }
1062 * and increase priv->next by 1.
1063 */
1064int
1065dmu_xuio_add(xuio_t *xuio, arc_buf_t *abuf, offset_t off, size_t n)
1066{
1067	struct iovec *iov;
1068	uio_t *uio = &xuio->xu_uio;
1069	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1070	int i = priv->next++;
1071
1072	ASSERT(i < priv->cnt);
1073	ASSERT(off + n <= arc_buf_size(abuf));
1074	iov = uio->uio_iov + i;
1075	iov->iov_base = (char *)abuf->b_data + off;
1076	iov->iov_len = n;
1077	priv->bufs[i] = abuf;
1078	return (0);
1079}
1080
1081int
1082dmu_xuio_cnt(xuio_t *xuio)
1083{
1084	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1085	return (priv->cnt);
1086}
1087
1088arc_buf_t *
1089dmu_xuio_arcbuf(xuio_t *xuio, int i)
1090{
1091	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1092
1093	ASSERT(i < priv->cnt);
1094	return (priv->bufs[i]);
1095}
1096
1097void
1098dmu_xuio_clear(xuio_t *xuio, int i)
1099{
1100	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1101
1102	ASSERT(i < priv->cnt);
1103	priv->bufs[i] = NULL;
1104}
1105
1106static void
1107xuio_stat_init(void)
1108{
1109	xuio_ksp = kstat_create("zfs", 0, "xuio_stats", "misc",
1110	    KSTAT_TYPE_NAMED, sizeof (xuio_stats) / sizeof (kstat_named_t),
1111	    KSTAT_FLAG_VIRTUAL);
1112	if (xuio_ksp != NULL) {
1113		xuio_ksp->ks_data = &xuio_stats;
1114		kstat_install(xuio_ksp);
1115	}
1116}
1117
1118static void
1119xuio_stat_fini(void)
1120{
1121	if (xuio_ksp != NULL) {
1122		kstat_delete(xuio_ksp);
1123		xuio_ksp = NULL;
1124	}
1125}
1126
1127void
1128xuio_stat_wbuf_copied()
1129{
1130	XUIOSTAT_BUMP(xuiostat_wbuf_copied);
1131}
1132
1133void
1134xuio_stat_wbuf_nocopy()
1135{
1136	XUIOSTAT_BUMP(xuiostat_wbuf_nocopy);
1137}
1138
1139#ifdef _KERNEL
1140static int
1141dmu_read_uio_dnode(dnode_t *dn, uio_t *uio, uint64_t size)
1142{
1143	dmu_buf_t **dbp;
1144	int numbufs, i, err;
1145	xuio_t *xuio = NULL;
1146
1147	/*
1148	 * NB: we could do this block-at-a-time, but it's nice
1149	 * to be reading in parallel.
1150	 */
1151	err = dmu_buf_hold_array_by_dnode(dn, uio->uio_loffset, size,
1152	    TRUE, FTAG, &numbufs, &dbp, 0);
1153	if (err)
1154		return (err);
1155
1156#ifdef UIO_XUIO
1157	if (uio->uio_extflg == UIO_XUIO)
1158		xuio = (xuio_t *)uio;
1159#endif
1160
1161	for (i = 0; i < numbufs; i++) {
1162		int tocpy;
1163		int bufoff;
1164		dmu_buf_t *db = dbp[i];
1165
1166		ASSERT(size > 0);
1167
1168		bufoff = uio->uio_loffset - db->db_offset;
1169		tocpy = (int)MIN(db->db_size - bufoff, size);
1170
1171		if (xuio) {
1172			dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
1173			arc_buf_t *dbuf_abuf = dbi->db_buf;
1174			arc_buf_t *abuf = dbuf_loan_arcbuf(dbi);
1175			err = dmu_xuio_add(xuio, abuf, bufoff, tocpy);
1176			if (!err) {
1177				uio->uio_resid -= tocpy;
1178				uio->uio_loffset += tocpy;
1179			}
1180
1181			if (abuf == dbuf_abuf)
1182				XUIOSTAT_BUMP(xuiostat_rbuf_nocopy);
1183			else
1184				XUIOSTAT_BUMP(xuiostat_rbuf_copied);
1185		} else {
1186#ifdef illumos
1187			err = uiomove((char *)db->db_data + bufoff, tocpy,
1188			    UIO_READ, uio);
1189#endif
1190#ifdef __FreeBSD__
1191			err = vn_io_fault_uiomove((char *)db->db_data + bufoff,
1192			    tocpy, uio);
1193#endif
1194#ifdef __NetBSD__
1195			err = uiomove((char *)db->db_data + bufoff, tocpy,
1196			    UIO_READ, uio);
1197#endif
1198		}
1199		if (err)
1200			break;
1201
1202		size -= tocpy;
1203	}
1204	dmu_buf_rele_array(dbp, numbufs, FTAG);
1205
1206	return (err);
1207}
1208
1209/*
1210 * Read 'size' bytes into the uio buffer.
1211 * From object zdb->db_object.
1212 * Starting at offset uio->uio_loffset.
1213 *
1214 * If the caller already has a dbuf in the target object
1215 * (e.g. its bonus buffer), this routine is faster than dmu_read_uio(),
1216 * because we don't have to find the dnode_t for the object.
1217 */
1218int
1219dmu_read_uio_dbuf(dmu_buf_t *zdb, uio_t *uio, uint64_t size)
1220{
1221	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zdb;
1222	dnode_t *dn;
1223	int err;
1224
1225	if (size == 0)
1226		return (0);
1227
1228	DB_DNODE_ENTER(db);
1229	dn = DB_DNODE(db);
1230	err = dmu_read_uio_dnode(dn, uio, size);
1231	DB_DNODE_EXIT(db);
1232
1233	return (err);
1234}
1235
1236/*
1237 * Read 'size' bytes into the uio buffer.
1238 * From the specified object
1239 * Starting at offset uio->uio_loffset.
1240 */
1241int
1242dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
1243{
1244	dnode_t *dn;
1245	int err;
1246
1247	if (size == 0)
1248		return (0);
1249
1250	err = dnode_hold(os, object, FTAG, &dn);
1251	if (err)
1252		return (err);
1253
1254	err = dmu_read_uio_dnode(dn, uio, size);
1255
1256	dnode_rele(dn, FTAG);
1257
1258	return (err);
1259}
1260
1261static int
1262dmu_write_uio_dnode(dnode_t *dn, uio_t *uio, uint64_t size, dmu_tx_t *tx)
1263{
1264	dmu_buf_t **dbp;
1265	int numbufs;
1266	int err = 0;
1267	int i;
1268
1269	err = dmu_buf_hold_array_by_dnode(dn, uio->uio_loffset, size,
1270	    FALSE, FTAG, &numbufs, &dbp, DMU_READ_PREFETCH);
1271	if (err)
1272		return (err);
1273
1274	for (i = 0; i < numbufs; i++) {
1275		int tocpy;
1276		int bufoff;
1277		dmu_buf_t *db = dbp[i];
1278
1279		ASSERT(size > 0);
1280
1281		bufoff = uio->uio_loffset - db->db_offset;
1282		tocpy = (int)MIN(db->db_size - bufoff, size);
1283
1284		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1285
1286		if (tocpy == db->db_size)
1287			dmu_buf_will_fill(db, tx);
1288		else
1289			dmu_buf_will_dirty(db, tx);
1290
1291#ifdef illumos
1292		/*
1293		 * XXX uiomove could block forever (eg. nfs-backed
1294		 * pages).  There needs to be a uiolockdown() function
1295		 * to lock the pages in memory, so that uiomove won't
1296		 * block.
1297		 */
1298		err = uiomove((char *)db->db_data + bufoff, tocpy,
1299		    UIO_WRITE, uio);
1300#endif
1301#ifdef __FreeBSD__
1302		err = vn_io_fault_uiomove((char *)db->db_data + bufoff, tocpy,
1303		    uio);
1304#endif
1305#ifdef __NetBSD__
1306		err = uiomove((char *)db->db_data + bufoff, tocpy,
1307		    UIO_WRITE, uio);
1308#endif
1309
1310		if (tocpy == db->db_size)
1311			dmu_buf_fill_done(db, tx);
1312
1313		if (err)
1314			break;
1315
1316		size -= tocpy;
1317	}
1318
1319	dmu_buf_rele_array(dbp, numbufs, FTAG);
1320	return (err);
1321}
1322
1323/*
1324 * Write 'size' bytes from the uio buffer.
1325 * To object zdb->db_object.
1326 * Starting at offset uio->uio_loffset.
1327 *
1328 * If the caller already has a dbuf in the target object
1329 * (e.g. its bonus buffer), this routine is faster than dmu_write_uio(),
1330 * because we don't have to find the dnode_t for the object.
1331 */
1332int
1333dmu_write_uio_dbuf(dmu_buf_t *zdb, uio_t *uio, uint64_t size,
1334    dmu_tx_t *tx)
1335{
1336	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zdb;
1337	dnode_t *dn;
1338	int err;
1339
1340	if (size == 0)
1341		return (0);
1342
1343	DB_DNODE_ENTER(db);
1344	dn = DB_DNODE(db);
1345	err = dmu_write_uio_dnode(dn, uio, size, tx);
1346	DB_DNODE_EXIT(db);
1347
1348	return (err);
1349}
1350
1351/*
1352 * Write 'size' bytes from the uio buffer.
1353 * To the specified object.
1354 * Starting at offset uio->uio_loffset.
1355 */
1356int
1357dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
1358    dmu_tx_t *tx)
1359{
1360	dnode_t *dn;
1361	int err;
1362
1363	if (size == 0)
1364		return (0);
1365
1366	err = dnode_hold(os, object, FTAG, &dn);
1367	if (err)
1368		return (err);
1369
1370	err = dmu_write_uio_dnode(dn, uio, size, tx);
1371
1372	dnode_rele(dn, FTAG);
1373
1374	return (err);
1375}
1376
1377#ifdef illumos
1378int
1379dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1380    page_t *pp, dmu_tx_t *tx)
1381{
1382	dmu_buf_t **dbp;
1383	int numbufs, i;
1384	int err;
1385
1386	if (size == 0)
1387		return (0);
1388
1389	err = dmu_buf_hold_array(os, object, offset, size,
1390	    FALSE, FTAG, &numbufs, &dbp);
1391	if (err)
1392		return (err);
1393
1394	for (i = 0; i < numbufs; i++) {
1395		int tocpy, copied, thiscpy;
1396		int bufoff;
1397		dmu_buf_t *db = dbp[i];
1398		caddr_t va;
1399
1400		ASSERT(size > 0);
1401		ASSERT3U(db->db_size, >=, PAGESIZE);
1402
1403		bufoff = offset - db->db_offset;
1404		tocpy = (int)MIN(db->db_size - bufoff, size);
1405
1406		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1407
1408		if (tocpy == db->db_size)
1409			dmu_buf_will_fill(db, tx);
1410		else
1411			dmu_buf_will_dirty(db, tx);
1412
1413		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
1414			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
1415			thiscpy = MIN(PAGESIZE, tocpy - copied);
1416			va = zfs_map_page(pp, S_READ);
1417			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
1418			zfs_unmap_page(pp, va);
1419			pp = pp->p_next;
1420			bufoff += PAGESIZE;
1421		}
1422
1423		if (tocpy == db->db_size)
1424			dmu_buf_fill_done(db, tx);
1425
1426		offset += tocpy;
1427		size -= tocpy;
1428	}
1429	dmu_buf_rele_array(dbp, numbufs, FTAG);
1430	return (err);
1431}
1432#endif /* illumos */
1433
1434#ifdef __FreeBSD__
1435int
1436dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1437    vm_page_t *ma, dmu_tx_t *tx)
1438{
1439	dmu_buf_t **dbp;
1440	struct sf_buf *sf;
1441	int numbufs, i;
1442	int err;
1443
1444	if (size == 0)
1445		return (0);
1446
1447	err = dmu_buf_hold_array(os, object, offset, size,
1448	    FALSE, FTAG, &numbufs, &dbp);
1449	if (err)
1450		return (err);
1451
1452	for (i = 0; i < numbufs; i++) {
1453		int tocpy, copied, thiscpy;
1454		int bufoff;
1455		dmu_buf_t *db = dbp[i];
1456		caddr_t va;
1457
1458		ASSERT(size > 0);
1459		ASSERT3U(db->db_size, >=, PAGESIZE);
1460
1461		bufoff = offset - db->db_offset;
1462		tocpy = (int)MIN(db->db_size - bufoff, size);
1463
1464		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1465
1466		if (tocpy == db->db_size)
1467			dmu_buf_will_fill(db, tx);
1468		else
1469			dmu_buf_will_dirty(db, tx);
1470
1471		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
1472			ASSERT3U(ptoa((*ma)->pindex), ==, db->db_offset + bufoff);
1473			thiscpy = MIN(PAGESIZE, tocpy - copied);
1474			va = zfs_map_page(*ma, &sf);
1475			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
1476			zfs_unmap_page(sf);
1477			ma += 1;
1478			bufoff += PAGESIZE;
1479		}
1480
1481		if (tocpy == db->db_size)
1482			dmu_buf_fill_done(db, tx);
1483
1484		offset += tocpy;
1485		size -= tocpy;
1486	}
1487	dmu_buf_rele_array(dbp, numbufs, FTAG);
1488	return (err);
1489}
1490#endif	/* __FreeBSD__ */
1491
1492#ifdef __NetBSD__
1493int
1494dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1495    struct vm_page **pgs, dmu_tx_t *tx)
1496{
1497	dmu_buf_t **dbp;
1498	int numbufs, i;
1499	int err;
1500
1501	if (size == 0)
1502		return (0);
1503
1504	err = dmu_buf_hold_array(os, object, offset, size,
1505	    FALSE, FTAG, &numbufs, &dbp);
1506	if (err)
1507		return (err);
1508
1509	for (i = 0; i < numbufs; i++) {
1510		int tocpy, copied, thiscpy;
1511		int bufoff;
1512		dmu_buf_t *db = dbp[i];
1513		caddr_t va;
1514
1515		ASSERT(size > 0);
1516		ASSERT3U(db->db_size, >=, PAGESIZE);
1517
1518		bufoff = offset - db->db_offset;
1519		tocpy = (int)MIN(db->db_size - bufoff, size);
1520
1521		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1522
1523		if (tocpy == db->db_size)
1524			dmu_buf_will_fill(db, tx);
1525		else
1526			dmu_buf_will_dirty(db, tx);
1527
1528		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
1529			ASSERT3U((*pgs)->offset, ==, db->db_offset + bufoff);
1530			thiscpy = MIN(PAGESIZE, tocpy - copied);
1531			va = zfs_map_page(*pgs, S_READ);
1532			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
1533			zfs_unmap_page(*pgs, va);
1534			pgs++;
1535			bufoff += PAGESIZE;
1536		}
1537
1538		if (tocpy == db->db_size)
1539			dmu_buf_fill_done(db, tx);
1540
1541		offset += tocpy;
1542		size -= tocpy;
1543	}
1544	dmu_buf_rele_array(dbp, numbufs, FTAG);
1545	return (err);
1546}
1547#endif
1548#endif	/* _KERNEL */
1549
1550/*
1551 * Allocate a loaned anonymous arc buffer.
1552 */
1553arc_buf_t *
1554dmu_request_arcbuf(dmu_buf_t *handle, int size)
1555{
1556	dmu_buf_impl_t *db = (dmu_buf_impl_t *)handle;
1557
1558	return (arc_loan_buf(db->db_objset->os_spa, size));
1559}
1560
1561/*
1562 * Free a loaned arc buffer.
1563 */
1564void
1565dmu_return_arcbuf(arc_buf_t *buf)
1566{
1567	arc_return_buf(buf, FTAG);
1568	arc_buf_destroy(buf, FTAG);
1569}
1570
1571/*
 * When possible, directly assign the passed loaned arc buffer to a dbuf.
 * If this is not possible, copy the contents of the passed arc buf via
 * dmu_write().
1575 */
1576void
1577dmu_assign_arcbuf(dmu_buf_t *handle, uint64_t offset, arc_buf_t *buf,
1578    dmu_tx_t *tx)
1579{
1580	dmu_buf_impl_t *dbuf = (dmu_buf_impl_t *)handle;
1581	dnode_t *dn;
1582	dmu_buf_impl_t *db;
1583	uint32_t blksz = (uint32_t)arc_buf_size(buf);
1584	uint64_t blkid;
1585
1586	DB_DNODE_ENTER(dbuf);
1587	dn = DB_DNODE(dbuf);
1588	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1589	blkid = dbuf_whichblock(dn, 0, offset);
1590	VERIFY((db = dbuf_hold(dn, blkid, FTAG)) != NULL);
1591	rw_exit(&dn->dn_struct_rwlock);
1592	DB_DNODE_EXIT(dbuf);
1593
1594	/*
1595	 * We can only assign if the offset is aligned, the arc buf is the
1596	 * same size as the dbuf, and the dbuf is not metadata.  It
1597	 * can't be metadata because the loaned arc buf comes from the
1598	 * user-data kmem arena.
1599	 */
1600	if (offset == db->db.db_offset && blksz == db->db.db_size &&
1601	    DBUF_GET_BUFC_TYPE(db) == ARC_BUFC_DATA) {
1602#ifdef _KERNEL
1603		curthread->td_ru.ru_oublock++;
1604#ifdef RACCT
1605		if (racct_enable) {
1606			PROC_LOCK(curproc);
1607			racct_add_force(curproc, RACCT_WRITEBPS, blksz);
1608			racct_add_force(curproc, RACCT_WRITEIOPS, 1);
1609			PROC_UNLOCK(curproc);
1610		}
1611#endif /* RACCT */
1612#endif /* _KERNEL */
1613		dbuf_assign_arcbuf(db, buf, tx);
1614		dbuf_rele(db, FTAG);
1615	} else {
1616		objset_t *os;
1617		uint64_t object;
1618
1619		DB_DNODE_ENTER(dbuf);
1620		dn = DB_DNODE(dbuf);
1621		os = dn->dn_objset;
1622		object = dn->dn_object;
1623		DB_DNODE_EXIT(dbuf);
1624
1625		dbuf_rele(db, FTAG);
1626		dmu_write(os, object, offset, blksz, buf->b_data, tx);
1627		dmu_return_arcbuf(buf);
1628		XUIOSTAT_BUMP(xuiostat_wbuf_copied);
1629	}
1630}
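
/*
 * Illustrative sketch (not part of the DMU API): the loaned-buffer write
 * path.  The caller borrows an arc buf sized to the block, fills it, and
 * hands it back inside an assigned transaction; bonus_db, offset, and blksz
 * are assumed.
 *
 *	arc_buf_t *abuf = dmu_request_arcbuf(bonus_db, blksz);
 *	... copy blksz bytes into abuf->b_data ...
 *	dmu_assign_arcbuf(bonus_db, offset, abuf, tx);
 *
 * If the write is abandoned before the assignment, the buffer is given back
 * with dmu_return_arcbuf(abuf).
 */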
1631
1632typedef struct {
1633	dbuf_dirty_record_t	*dsa_dr;
1634	dmu_sync_cb_t		*dsa_done;
1635	zgd_t			*dsa_zgd;
1636	dmu_tx_t		*dsa_tx;
1637} dmu_sync_arg_t;
1638
1639/* ARGSUSED */
1640static void
1641dmu_sync_ready(zio_t *zio, arc_buf_t *buf, void *varg)
1642{
1643	dmu_sync_arg_t *dsa = varg;
1644	dmu_buf_t *db = dsa->dsa_zgd->zgd_db;
1645	blkptr_t *bp = zio->io_bp;
1646
1647	if (zio->io_error == 0) {
1648		if (BP_IS_HOLE(bp)) {
1649			/*
1650			 * A block of zeros may compress to a hole, but the
1651			 * block size still needs to be known for replay.
1652			 */
1653			BP_SET_LSIZE(bp, db->db_size);
1654		} else if (!BP_IS_EMBEDDED(bp)) {
1655			ASSERT(BP_GET_LEVEL(bp) == 0);
1656			bp->blk_fill = 1;
1657		}
1658	}
1659}
1660
1661static void
1662dmu_sync_late_arrival_ready(zio_t *zio)
1663{
1664	dmu_sync_ready(zio, NULL, zio->io_private);
1665}
1666
1667/* ARGSUSED */
1668static void
1669dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
1670{
1671	dmu_sync_arg_t *dsa = varg;
1672	dbuf_dirty_record_t *dr = dsa->dsa_dr;
1673	dmu_buf_impl_t *db = dr->dr_dbuf;
1674
1675	mutex_enter(&db->db_mtx);
1676	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
1677	if (zio->io_error == 0) {
1678		dr->dt.dl.dr_nopwrite = !!(zio->io_flags & ZIO_FLAG_NOPWRITE);
1679		if (dr->dt.dl.dr_nopwrite) {
1680			blkptr_t *bp = zio->io_bp;
1681			blkptr_t *bp_orig = &zio->io_bp_orig;
1682			uint8_t chksum = BP_GET_CHECKSUM(bp_orig);
1683
1684			ASSERT(BP_EQUAL(bp, bp_orig));
1685			ASSERT(zio->io_prop.zp_compress != ZIO_COMPRESS_OFF);
1686			ASSERT(zio_checksum_table[chksum].ci_flags &
1687			    ZCHECKSUM_FLAG_NOPWRITE);
1688		}
1689		dr->dt.dl.dr_overridden_by = *zio->io_bp;
1690		dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
1691		dr->dt.dl.dr_copies = zio->io_prop.zp_copies;
1692
1693		/*
1694		 * Old style holes are filled with all zeros, whereas
1695		 * new-style holes maintain their lsize, type, level,
1696		 * and birth time (see zio_write_compress). While we
1697		 * need to reset the BP_SET_LSIZE() call that happened
1698		 * in dmu_sync_ready for old style holes, we do *not*
1699		 * want to wipe out the information contained in new
1700		 * style holes. Thus, only zero out the block pointer if
1701		 * it's an old style hole.
1702		 */
1703		if (BP_IS_HOLE(&dr->dt.dl.dr_overridden_by) &&
1704		    dr->dt.dl.dr_overridden_by.blk_birth == 0)
1705			BP_ZERO(&dr->dt.dl.dr_overridden_by);
1706	} else {
1707		dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
1708	}
1709	cv_broadcast(&db->db_changed);
1710	mutex_exit(&db->db_mtx);
1711
1712	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);
1713
1714	kmem_free(dsa, sizeof (*dsa));
1715}
1716
1717static void
1718dmu_sync_late_arrival_done(zio_t *zio)
1719{
1720	blkptr_t *bp = zio->io_bp;
1721	dmu_sync_arg_t *dsa = zio->io_private;
1722	blkptr_t *bp_orig = &zio->io_bp_orig;
1723
1724	if (zio->io_error == 0 && !BP_IS_HOLE(bp)) {
1725		/*
1726		 * If we didn't allocate a new block (i.e. ZIO_FLAG_NOPWRITE)
1727		 * then there is nothing to do here. Otherwise, free the
1728		 * newly allocated block in this txg.
1729		 */
1730		if (zio->io_flags & ZIO_FLAG_NOPWRITE) {
1731			ASSERT(BP_EQUAL(bp, bp_orig));
1732		} else {
1733			ASSERT(BP_IS_HOLE(bp_orig) || !BP_EQUAL(bp, bp_orig));
1734			ASSERT(zio->io_bp->blk_birth == zio->io_txg);
1735			ASSERT(zio->io_txg > spa_syncing_txg(zio->io_spa));
1736			zio_free(zio->io_spa, zio->io_txg, zio->io_bp);
1737		}
1738	}
1739
1740	dmu_tx_commit(dsa->dsa_tx);
1741
1742	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);
1743
1744	kmem_free(dsa, sizeof (*dsa));
1745}
1746
1747static int
1748dmu_sync_late_arrival(zio_t *pio, objset_t *os, dmu_sync_cb_t *done, zgd_t *zgd,
1749    zio_prop_t *zp, zbookmark_phys_t *zb)
1750{
1751	dmu_sync_arg_t *dsa;
1752	dmu_tx_t *tx;
1753
1754	tx = dmu_tx_create(os);
1755	dmu_tx_hold_space(tx, zgd->zgd_db->db_size);
1756	if (dmu_tx_assign(tx, TXG_WAIT) != 0) {
1757		dmu_tx_abort(tx);
		/* Make zl_get_data do txg_wait_synced() */
1759		return (SET_ERROR(EIO));
1760	}
1761
1762	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
1763	dsa->dsa_dr = NULL;
1764	dsa->dsa_done = done;
1765	dsa->dsa_zgd = zgd;
1766	dsa->dsa_tx = tx;
1767
1768	zio_nowait(zio_write(pio, os->os_spa, dmu_tx_get_txg(tx),
1769	    zgd->zgd_bp, zgd->zgd_db->db_data, zgd->zgd_db->db_size,
1770	    zp, dmu_sync_late_arrival_ready, NULL,
1771	    NULL, dmu_sync_late_arrival_done, dsa, ZIO_PRIORITY_SYNC_WRITE,
1772	    ZIO_FLAG_CANFAIL, zb));
1773
1774	return (0);
1775}
1776
1777/*
1778 * Intent log support: sync the block associated with db to disk.
1779 * N.B. and XXX: the caller is responsible for making sure that the
1780 * data isn't changing while dmu_sync() is writing it.
1781 *
1782 * Return values:
1783 *
1784 *	EEXIST: this txg has already been synced, so there's nothing to do.
1785 *		The caller should not log the write.
1786 *
1787 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
1788 *		The caller should not log the write.
1789 *
1790 *	EALREADY: this block is already in the process of being synced.
1791 *		The caller should track its progress (somehow).
1792 *
1793 *	EIO: could not do the I/O.
1794 *		The caller should do a txg_wait_synced().
1795 *
1796 *	0: the I/O has been initiated.
1797 *		The caller should log this blkptr in the done callback.
1798 *		It is possible that the I/O will fail, in which case
1799 *		the error will be reported to the done callback and
1800 *		propagated to pio from zio_done().
1801 */
1802int
1803dmu_sync(zio_t *pio, uint64_t txg, dmu_sync_cb_t *done, zgd_t *zgd)
1804{
1805	blkptr_t *bp = zgd->zgd_bp;
1806	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zgd->zgd_db;
1807	objset_t *os = db->db_objset;
1808	dsl_dataset_t *ds = os->os_dsl_dataset;
1809	dbuf_dirty_record_t *dr;
1810	dmu_sync_arg_t *dsa;
1811	zbookmark_phys_t zb;
1812	zio_prop_t zp;
1813	dnode_t *dn;
1814
1815	ASSERT(pio != NULL);
1816	ASSERT(txg != 0);
1817
1818	SET_BOOKMARK(&zb, ds->ds_object,
1819	    db->db.db_object, db->db_level, db->db_blkid);
1820
1821	DB_DNODE_ENTER(db);
1822	dn = DB_DNODE(db);
1823	dmu_write_policy(os, dn, db->db_level, WP_DMU_SYNC, &zp);
1824	DB_DNODE_EXIT(db);
1825
1826	/*
1827	 * If we're frozen (running ziltest), we always need to generate a bp.
1828	 */
1829	if (txg > spa_freeze_txg(os->os_spa))
1830		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));
1831
1832	/*
1833	 * Grabbing db_mtx now provides a barrier between dbuf_sync_leaf()
1834	 * and us.  If we determine that this txg is not yet syncing,
1835	 * but it begins to sync a moment later, that's OK because the
1836	 * sync thread will block in dbuf_sync_leaf() until we drop db_mtx.
1837	 */
1838	mutex_enter(&db->db_mtx);
1839
1840	if (txg <= spa_last_synced_txg(os->os_spa)) {
1841		/*
1842		 * This txg has already synced.  There's nothing to do.
1843		 */
1844		mutex_exit(&db->db_mtx);
1845		return (SET_ERROR(EEXIST));
1846	}
1847
1848	if (txg <= spa_syncing_txg(os->os_spa)) {
1849		/*
1850		 * This txg is currently syncing, so we can't mess with
1851		 * the dirty record anymore; just write a new log block.
1852		 */
1853		mutex_exit(&db->db_mtx);
1854		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));
1855	}
1856
1857	dr = db->db_last_dirty;
1858	while (dr && dr->dr_txg != txg)
1859		dr = dr->dr_next;
1860
1861	if (dr == NULL) {
1862		/*
1863		 * There's no dr for this dbuf, so it must have been freed.
1864		 * There's no need to log writes to freed blocks, so we're done.
1865		 */
1866		mutex_exit(&db->db_mtx);
1867		return (SET_ERROR(ENOENT));
1868	}
1869
1870	ASSERT(dr->dr_next == NULL || dr->dr_next->dr_txg < txg);
1871
1872	/*
1873	 * Assume the on-disk data is X, the current syncing data (in
1874	 * txg - 1) is Y, and the current in-memory data is Z (currently
1875	 * in dmu_sync).
1876	 *
1877	 * We usually want to perform a nopwrite if X and Z are the
1878	 * same.  However, if Y is different (i.e. the BP is going to
1879	 * change before this write takes effect), then a nopwrite will
1880	 * be incorrect - we would override with X, which could have
1881	 * been freed when Y was written.
1882	 *
1883	 * (Note that this is not a concern when we are nop-writing from
1884	 * syncing context, because X and Y must be identical, because
1885	 * all previous txgs have been synced.)
1886	 *
1887	 * Therefore, we disable nopwrite if the current BP could change
1888	 * before this TXG.  There are two ways it could change: by
1889	 * being dirty (dr_next is non-NULL), or by being freed
1890	 * (dnode_block_freed()).  This behavior is verified by
1891	 * zio_done(), which VERIFYs that the override BP is identical
1892	 * to the on-disk BP.
1893	 */
1894	DB_DNODE_ENTER(db);
1895	dn = DB_DNODE(db);
1896	if (dr->dr_next != NULL || dnode_block_freed(dn, db->db_blkid))
1897		zp.zp_nopwrite = B_FALSE;
1898	DB_DNODE_EXIT(db);
1899
1900	ASSERT(dr->dr_txg == txg);
1901	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC ||
1902	    dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
1903		/*
1904		 * We have already issued a sync write for this buffer,
1905		 * or this buffer has already been synced.  It could not
1906		 * have been dirtied since, or we would have cleared the state.
1907		 */
1908		mutex_exit(&db->db_mtx);
1909		return (SET_ERROR(EALREADY));
1910	}
1911
1912	ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
1913	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
1914	mutex_exit(&db->db_mtx);
1915
1916	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
1917	dsa->dsa_dr = dr;
1918	dsa->dsa_done = done;
1919	dsa->dsa_zgd = zgd;
1920	dsa->dsa_tx = NULL;
1921
1922	zio_nowait(arc_write(pio, os->os_spa, txg,
1923	    bp, dr->dt.dl.dr_data, DBUF_IS_L2CACHEABLE(db),
1924	    &zp, dmu_sync_ready, NULL, NULL, dmu_sync_done, dsa,
1925	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, &zb));
1926
1927	return (0);
1928}
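
/*
 * Illustrative sketch (not part of the DMU API): how a ZIL get_data callback
 * might act on dmu_sync()'s return value, per the contract documented above;
 * pio, txg, done_cb, zgd, and os are assumed.
 *
 *	int err = dmu_sync(pio, txg, done_cb, zgd);
 *	if (err == EEXIST || err == ENOENT)
 *		... do not log the write (already synced, or block freed) ...
 *	else if (err == EALREADY)
 *		... an earlier dmu_sync() of this block is in flight ...
 *	else if (err == EIO)
 *		txg_wait_synced(dmu_objset_pool(os), txg);
 */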
1929
1930int
1931dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
1932    dmu_tx_t *tx)
1933{
1934	dnode_t *dn;
1935	int err;
1936
1937	err = dnode_hold(os, object, FTAG, &dn);
1938	if (err)
1939		return (err);
1940	err = dnode_set_blksz(dn, size, ibs, tx);
1941	dnode_rele(dn, FTAG);
1942	return (err);
1943}

void
dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
    dmu_tx_t *tx)
{
	dnode_t *dn;

	/*
	 * Send streams include each object's checksum function.  This
	 * check ensures that the receiving system can understand the
	 * checksum function transmitted.
	 */
	ASSERT3U(checksum, <, ZIO_CHECKSUM_LEGACY_FUNCTIONS);

	VERIFY0(dnode_hold(os, object, FTAG, &dn));
	ASSERT3U(checksum, <, ZIO_CHECKSUM_FUNCTIONS);
	dn->dn_checksum = checksum;
	dnode_setdirty(dn, tx);
	dnode_rele(dn, FTAG);
}

void
dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
    dmu_tx_t *tx)
{
	dnode_t *dn;

	/*
	 * Send streams include each object's compression function.  This
	 * check ensures that the receiving system can understand the
	 * compression function transmitted.
	 */
	ASSERT3U(compress, <, ZIO_COMPRESS_LEGACY_FUNCTIONS);

	VERIFY0(dnode_hold(os, object, FTAG, &dn));
	dn->dn_compress = compress;
	dnode_setdirty(dn, tx);
	dnode_rele(dn, FTAG);
}

int zfs_mdcomp_disable = 0;
SYSCTL_INT(_vfs_zfs, OID_AUTO, mdcomp_disable, CTLFLAG_RWTUN,
    &zfs_mdcomp_disable, 0, "Disable metadata compression");

/*
 * When the "redundant_metadata" property is set to "most", only indirect
 * blocks of this level and higher will have an additional ditto block.
 */
int zfs_redundant_metadata_most_ditto_level = 2;

void
dmu_write_policy(objset_t *os, dnode_t *dn, int level, int wp, zio_prop_t *zp)
{
	dmu_object_type_t type = dn ? dn->dn_type : DMU_OT_OBJSET;
	boolean_t ismd = (level > 0 || DMU_OT_IS_METADATA(type) ||
	    (wp & WP_SPILL));
	enum zio_checksum checksum = os->os_checksum;
	enum zio_compress compress = os->os_compress;
	enum zio_checksum dedup_checksum = os->os_dedup_checksum;
	boolean_t dedup = B_FALSE;
	boolean_t nopwrite = B_FALSE;
	boolean_t dedup_verify = os->os_dedup_verify;
	int copies = os->os_copies;

	/*
	 * We maintain different write policies for each of the following
	 * types of data:
	 *	 1. metadata
	 *	 2. preallocated blocks (i.e. level-0 blocks of a dump device)
	 *	 3. all other level 0 blocks
	 */
	if (ismd) {
		if (zfs_mdcomp_disable) {
			compress = ZIO_COMPRESS_EMPTY;
		} else {
			/*
			 * XXX -- we should design a compression algorithm
			 * that specializes in arrays of bps.
			 */
			compress = zio_compress_select(os->os_spa,
			    ZIO_COMPRESS_ON, ZIO_COMPRESS_ON);
		}

		/*
		 * Metadata always gets checksummed.  If the data
		 * checksum is multi-bit correctable, and it's not an
		 * embedded (ZBT-style) checksum, then it's suitable
		 * for metadata as well.  Otherwise, the metadata
		 * checksum defaults to fletcher4.
		 */
		if (!(zio_checksum_table[checksum].ci_flags &
		    ZCHECKSUM_FLAG_METADATA) ||
		    (zio_checksum_table[checksum].ci_flags &
		    ZCHECKSUM_FLAG_EMBEDDED))
			checksum = ZIO_CHECKSUM_FLETCHER_4;

		if (os->os_redundant_metadata == ZFS_REDUNDANT_METADATA_ALL ||
		    (os->os_redundant_metadata ==
		    ZFS_REDUNDANT_METADATA_MOST &&
		    (level >= zfs_redundant_metadata_most_ditto_level ||
		    DMU_OT_IS_METADATA(type) || (wp & WP_SPILL))))
			copies++;
	} else if (wp & WP_NOFILL) {
		ASSERT(level == 0);

		/*
		 * Preallocated blocks are not written through the zio
		 * pipeline, so don't set any policy properties for them.
		 * These blocks are currently only used by an external
		 * subsystem outside of zfs (i.e. dump).
		 */
		compress = ZIO_COMPRESS_OFF;
		checksum = ZIO_CHECKSUM_NOPARITY;
	} else {
		compress = zio_compress_select(os->os_spa, dn->dn_compress,
		    compress);

		checksum = (dedup_checksum == ZIO_CHECKSUM_OFF) ?
		    zio_checksum_select(dn->dn_checksum, checksum) :
		    dedup_checksum;

		/*
		 * Determine dedup setting.  If we are in dmu_sync(),
		 * we won't actually dedup now because that's all
		 * done in syncing context; but we do want to use the
		 * dedup checksum.  If the checksum is not strong
		 * enough to ensure unique signatures, force
		 * dedup_verify.
		 */
		if (dedup_checksum != ZIO_CHECKSUM_OFF) {
			dedup = (wp & WP_DMU_SYNC) ? B_FALSE : B_TRUE;
			if (!(zio_checksum_table[checksum].ci_flags &
			    ZCHECKSUM_FLAG_DEDUP))
				dedup_verify = B_TRUE;
		}

		/*
		 * Enable nopwrite if we have a secure enough checksum
		 * algorithm (see comment in zio_nop_write) and
		 * compression is enabled.  We don't enable nopwrite if
		 * dedup is enabled as the two features are mutually
		 * exclusive.
		 */
		nopwrite = (!dedup && (zio_checksum_table[checksum].ci_flags &
		    ZCHECKSUM_FLAG_NOPWRITE) &&
		    compress != ZIO_COMPRESS_OFF && zfs_nopwrite_enabled);
	}

	zp->zp_checksum = checksum;
	zp->zp_compress = compress;
	zp->zp_type = (wp & WP_SPILL) ? dn->dn_bonustype : type;
	zp->zp_level = level;
	zp->zp_copies = MIN(copies, spa_max_replication(os->os_spa));
	zp->zp_dedup = dedup;
	zp->zp_dedup_verify = dedup && dedup_verify;
	zp->zp_nopwrite = nopwrite;
}
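
/*
 * Example (illustrative sketch): a caller fills in a zio_prop_t just before
 * issuing a write; the resulting zp carries the effective checksum,
 * compression, copies, dedup and nopwrite settings for that block.  For a
 * syncing write of a dbuf driven by dmu_sync(), which passes WP_DMU_SYNC,
 * this is simply:
 *
 *	zio_prop_t zp;
 *
 *	dmu_write_policy(os, dn, db->db_level, WP_DMU_SYNC, &zp);
 */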

int
dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
{
	dnode_t *dn;
	int err;

	/*
	 * Sync any current changes before
	 * we go trundling through the block pointers.
	 */
	err = dmu_object_wait_synced(os, object);
	if (err) {
		return (err);
	}

	err = dnode_hold(os, object, FTAG, &dn);
	if (err) {
		return (err);
	}

	err = dnode_next_offset(dn, (hole ? DNODE_FIND_HOLE : 0), off, 1, 1, 0);
	dnode_rele(dn, FTAG);

	return (err);
}
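
/*
 * Example (illustrative sketch): SEEK_HOLE/SEEK_DATA style lookups can be
 * built on dmu_offset_next().  "*off" is both the starting offset and, on
 * success, the offset of the next hole (or data block); ESRCH from the
 * underlying dnode_next_offset() means nothing matching was found past the
 * given offset.  "start_offset" and "next_hole" are placeholder names.
 *
 *	uint64_t off = start_offset;
 *	error = dmu_offset_next(os, object, B_TRUE, &off);
 *	if (error == 0)
 *		next_hole = off;
 */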

/*
 * If the given object contains any dirty nodes, flush all of its dirty
 * blocks to disk so that the DMU object info is up to date.  A more
 * efficient future version might just find the TXG with the maximum
 * ID and wait for that to be synced.
 */
int
dmu_object_wait_synced(objset_t *os, uint64_t object)
{
	dnode_t *dn;
	int error, i;

	error = dnode_hold(os, object, FTAG, &dn);
	if (error) {
		return (error);
	}

	for (i = 0; i < TXG_SIZE; i++) {
		if (list_link_active(&dn->dn_dirty_link[i]) ||
		    !list_is_empty(&dn->dn_dirty_records[i])) {
			break;
		}
	}
	dnode_rele(dn, FTAG);
	if (i != TXG_SIZE) {
		txg_wait_synced(dmu_objset_pool(os), 0);
	}

	return (0);
}

void
dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
{
	dnode_phys_t *dnp;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	mutex_enter(&dn->dn_mtx);

	dnp = dn->dn_phys;

	doi->doi_data_block_size = dn->dn_datablksz;
	doi->doi_metadata_block_size = dn->dn_indblkshift ?
	    1ULL << dn->dn_indblkshift : 0;
	doi->doi_type = dn->dn_type;
	doi->doi_bonus_type = dn->dn_bonustype;
	doi->doi_bonus_size = dn->dn_bonuslen;
	doi->doi_indirection = dn->dn_nlevels;
	doi->doi_checksum = dn->dn_checksum;
	doi->doi_compress = dn->dn_compress;
	doi->doi_nblkptr = dn->dn_nblkptr;
	doi->doi_physical_blocks_512 = (DN_USED_BYTES(dnp) + 256) >> 9;
	doi->doi_max_offset = (dn->dn_maxblkid + 1) * dn->dn_datablksz;
	doi->doi_fill_count = 0;
	for (int i = 0; i < dnp->dn_nblkptr; i++)
		doi->doi_fill_count += BP_GET_FILL(&dnp->dn_blkptr[i]);

	mutex_exit(&dn->dn_mtx);
	rw_exit(&dn->dn_struct_rwlock);
}

/*
 * Get information on a DMU object.
 * If doi is NULL, this just checks whether the object exists.
 */
int
dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
{
	dnode_t *dn;
	int err = dnode_hold(os, object, FTAG, &dn);

	if (err)
		return (err);

	if (doi != NULL)
		dmu_object_info_from_dnode(dn, doi);

	dnode_rele(dn, FTAG);
	return (0);
}
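
/*
 * Example (illustrative sketch): passing a NULL doi turns dmu_object_info()
 * into a simple existence check, since dnode_hold() fails with ENOENT for
 * an unallocated object.
 *
 *	if (dmu_object_info(os, object, NULL) == ENOENT) {
 *		... the object does not exist ...
 *	}
 */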

/*
 * As above, but faster; can be used when you have a held dbuf in hand.
 */
void
dmu_object_info_from_db(dmu_buf_t *db_fake, dmu_object_info_t *doi)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;

	DB_DNODE_ENTER(db);
	dmu_object_info_from_dnode(DB_DNODE(db), doi);
	DB_DNODE_EXIT(db);
}

/*
 * Faster still when you only care about the size.
 * This is specifically optimized for zfs_getattr().
 */
void
dmu_object_size_from_db(dmu_buf_t *db_fake, uint32_t *blksize,
    u_longlong_t *nblk512)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	dnode_t *dn;

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);

	*blksize = dn->dn_datablksz;
	/* add 1 for dnode space */
	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
	    SPA_MINBLOCKSHIFT) + 1;
	DB_DNODE_EXIT(db);
}

void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	int i;

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}

void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	int i;

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}

void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	int i;

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}

/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
}

void
dmu_init(void)
{
	zfs_dbgmsg_init();
	sa_cache_init();
	xuio_stat_init();
	dmu_objset_init();
	dnode_init();
	zfetch_init();
	zio_compress_init();
	l2arc_init();
	arc_init();
	dbuf_init();
}

void
dmu_fini(void)
{
	arc_fini(); /* arc depends on l2arc, so arc must go first */
	l2arc_fini();
	zfetch_fini();
	zio_compress_fini();
	dbuf_fini();
	dnode_fini();
	dmu_objset_fini();
	xuio_stat_fini();
	sa_cache_fini();
	zfs_dbgmsg_fini();
}