dmu.c revision 323748
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
24 */
25/* Copyright (c) 2013 by Saso Kiselkov. All rights reserved. */
26/* Copyright (c) 2013, Joyent, Inc. All rights reserved. */
27/* Copyright (c) 2014, Nexenta Systems, Inc. All rights reserved. */
28
29#include <sys/dmu.h>
30#include <sys/dmu_impl.h>
31#include <sys/dmu_tx.h>
32#include <sys/dbuf.h>
33#include <sys/dnode.h>
34#include <sys/zfs_context.h>
35#include <sys/dmu_objset.h>
36#include <sys/dmu_traverse.h>
37#include <sys/dsl_dataset.h>
38#include <sys/dsl_dir.h>
39#include <sys/dsl_pool.h>
40#include <sys/dsl_synctask.h>
41#include <sys/dsl_prop.h>
42#include <sys/dmu_zfetch.h>
43#include <sys/zfs_ioctl.h>
44#include <sys/zap.h>
45#include <sys/zio_checksum.h>
46#include <sys/zio_compress.h>
47#include <sys/sa.h>
48#include <sys/zfeature.h>
49#include <sys/abd.h>
50#ifdef _KERNEL
51#include <sys/racct.h>
52#include <sys/vm.h>
53#include <sys/zfs_znode.h>
54#endif
55
56/*
57 * Enable/disable nopwrite feature.
58 */
59int zfs_nopwrite_enabled = 1;
60SYSCTL_DECL(_vfs_zfs);
61SYSCTL_INT(_vfs_zfs, OID_AUTO, nopwrite_enabled, CTLFLAG_RDTUN,
62    &zfs_nopwrite_enabled, 0, "Enable nopwrite feature");
63
64/*
65 * Tunable to control percentage of dirtied blocks from frees in one TXG.
66 * After this threshold is crossed, additional dirty blocks from frees
67 * wait until the next TXG.
68 * A value of zero will disable this throttle.
69 */
70uint32_t zfs_per_txg_dirty_frees_percent = 30;
71SYSCTL_INT(_vfs_zfs, OID_AUTO, per_txg_dirty_frees_percent, CTLFLAG_RWTUN,
72	&zfs_per_txg_dirty_frees_percent, 0, "Percentage of dirtied blocks from frees in one txg");
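/*
 * Illustrative note (annotation, not part of this revision): with the
 * default of 30, dmu_free_long_range_impl() below computes
 *
 *	dirty_frees_threshold = 30 * zfs_dirty_data_max / 100;
 *
 * so frees may keep roughly 30% of zfs_dirty_data_max dirty across
 * outstanding TXGs before further frees wait for the next TXG to open.
 * The knob is exposed on FreeBSD as a read-write sysctl, e.g.:
 *
 *	# sysctl vfs.zfs.per_txg_dirty_frees_percent=5
 *
 * A value of 0 disables the throttle; values above 100 fall back to
 * zfs_dirty_data_max / 4.
 */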
73
74const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
75	{	DMU_BSWAP_UINT8,	TRUE,	"unallocated"		},
76	{	DMU_BSWAP_ZAP,		TRUE,	"object directory"	},
77	{	DMU_BSWAP_UINT64,	TRUE,	"object array"		},
78	{	DMU_BSWAP_UINT8,	TRUE,	"packed nvlist"		},
79	{	DMU_BSWAP_UINT64,	TRUE,	"packed nvlist size"	},
80	{	DMU_BSWAP_UINT64,	TRUE,	"bpobj"			},
81	{	DMU_BSWAP_UINT64,	TRUE,	"bpobj header"		},
82	{	DMU_BSWAP_UINT64,	TRUE,	"SPA space map header"	},
83	{	DMU_BSWAP_UINT64,	TRUE,	"SPA space map"		},
84	{	DMU_BSWAP_UINT64,	TRUE,	"ZIL intent log"	},
85	{	DMU_BSWAP_DNODE,	TRUE,	"DMU dnode"		},
86	{	DMU_BSWAP_OBJSET,	TRUE,	"DMU objset"		},
87	{	DMU_BSWAP_UINT64,	TRUE,	"DSL directory"		},
88	{	DMU_BSWAP_ZAP,		TRUE,	"DSL directory child map"},
89	{	DMU_BSWAP_ZAP,		TRUE,	"DSL dataset snap map"	},
90	{	DMU_BSWAP_ZAP,		TRUE,	"DSL props"		},
91	{	DMU_BSWAP_UINT64,	TRUE,	"DSL dataset"		},
92	{	DMU_BSWAP_ZNODE,	TRUE,	"ZFS znode"		},
93	{	DMU_BSWAP_OLDACL,	TRUE,	"ZFS V0 ACL"		},
94	{	DMU_BSWAP_UINT8,	FALSE,	"ZFS plain file"	},
95	{	DMU_BSWAP_ZAP,		TRUE,	"ZFS directory"		},
96	{	DMU_BSWAP_ZAP,		TRUE,	"ZFS master node"	},
97	{	DMU_BSWAP_ZAP,		TRUE,	"ZFS delete queue"	},
98	{	DMU_BSWAP_UINT8,	FALSE,	"zvol object"		},
99	{	DMU_BSWAP_ZAP,		TRUE,	"zvol prop"		},
100	{	DMU_BSWAP_UINT8,	FALSE,	"other uint8[]"		},
101	{	DMU_BSWAP_UINT64,	FALSE,	"other uint64[]"	},
102	{	DMU_BSWAP_ZAP,		TRUE,	"other ZAP"		},
103	{	DMU_BSWAP_ZAP,		TRUE,	"persistent error log"	},
104	{	DMU_BSWAP_UINT8,	TRUE,	"SPA history"		},
105	{	DMU_BSWAP_UINT64,	TRUE,	"SPA history offsets"	},
106	{	DMU_BSWAP_ZAP,		TRUE,	"Pool properties"	},
107	{	DMU_BSWAP_ZAP,		TRUE,	"DSL permissions"	},
108	{	DMU_BSWAP_ACL,		TRUE,	"ZFS ACL"		},
109	{	DMU_BSWAP_UINT8,	TRUE,	"ZFS SYSACL"		},
110	{	DMU_BSWAP_UINT8,	TRUE,	"FUID table"		},
111	{	DMU_BSWAP_UINT64,	TRUE,	"FUID table size"	},
112	{	DMU_BSWAP_ZAP,		TRUE,	"DSL dataset next clones"},
113	{	DMU_BSWAP_ZAP,		TRUE,	"scan work queue"	},
114	{	DMU_BSWAP_ZAP,		TRUE,	"ZFS user/group used"	},
115	{	DMU_BSWAP_ZAP,		TRUE,	"ZFS user/group quota"	},
116	{	DMU_BSWAP_ZAP,		TRUE,	"snapshot refcount tags"},
117	{	DMU_BSWAP_ZAP,		TRUE,	"DDT ZAP algorithm"	},
118	{	DMU_BSWAP_ZAP,		TRUE,	"DDT statistics"	},
119	{	DMU_BSWAP_UINT8,	TRUE,	"System attributes"	},
120	{	DMU_BSWAP_ZAP,		TRUE,	"SA master node"	},
121	{	DMU_BSWAP_ZAP,		TRUE,	"SA attr registration"	},
122	{	DMU_BSWAP_ZAP,		TRUE,	"SA attr layouts"	},
123	{	DMU_BSWAP_ZAP,		TRUE,	"scan translations"	},
124	{	DMU_BSWAP_UINT8,	FALSE,	"deduplicated block"	},
125	{	DMU_BSWAP_ZAP,		TRUE,	"DSL deadlist map"	},
126	{	DMU_BSWAP_UINT64,	TRUE,	"DSL deadlist map hdr"	},
127	{	DMU_BSWAP_ZAP,		TRUE,	"DSL dir clones"	},
128	{	DMU_BSWAP_UINT64,	TRUE,	"bpobj subobj"		}
129};
130
131const dmu_object_byteswap_info_t dmu_ot_byteswap[DMU_BSWAP_NUMFUNCS] = {
132	{	byteswap_uint8_array,	"uint8"		},
133	{	byteswap_uint16_array,	"uint16"	},
134	{	byteswap_uint32_array,	"uint32"	},
135	{	byteswap_uint64_array,	"uint64"	},
136	{	zap_byteswap,		"zap"		},
137	{	dnode_buf_byteswap,	"dnode"		},
138	{	dmu_objset_byteswap,	"objset"	},
139	{	zfs_znode_byteswap,	"znode"		},
140	{	zfs_oldacl_byteswap,	"oldacl"	},
141	{	zfs_acl_byteswap,	"acl"		}
142};
143
144int
145dmu_buf_hold_noread_by_dnode(dnode_t *dn, uint64_t offset,
146    void *tag, dmu_buf_t **dbp)
147{
148	uint64_t blkid;
149	dmu_buf_impl_t *db;
150
151	blkid = dbuf_whichblock(dn, 0, offset);
152	rw_enter(&dn->dn_struct_rwlock, RW_READER);
153	db = dbuf_hold(dn, blkid, tag);
154	rw_exit(&dn->dn_struct_rwlock);
155
156	if (db == NULL) {
157		*dbp = NULL;
158		return (SET_ERROR(EIO));
159	}
160
161	*dbp = &db->db;
162	return (0);
163}
164int
165dmu_buf_hold_noread(objset_t *os, uint64_t object, uint64_t offset,
166    void *tag, dmu_buf_t **dbp)
167{
168	dnode_t *dn;
169	uint64_t blkid;
170	dmu_buf_impl_t *db;
171	int err;
172
173	err = dnode_hold(os, object, FTAG, &dn);
174	if (err)
175		return (err);
176	blkid = dbuf_whichblock(dn, 0, offset);
177	rw_enter(&dn->dn_struct_rwlock, RW_READER);
178	db = dbuf_hold(dn, blkid, tag);
179	rw_exit(&dn->dn_struct_rwlock);
180	dnode_rele(dn, FTAG);
181
182	if (db == NULL) {
183		*dbp = NULL;
184		return (SET_ERROR(EIO));
185	}
186
187	*dbp = &db->db;
188	return (err);
189}
190
191int
192dmu_buf_hold_by_dnode(dnode_t *dn, uint64_t offset,
193    void *tag, dmu_buf_t **dbp, int flags)
194{
195	int err;
196	int db_flags = DB_RF_CANFAIL;
197
198	if (flags & DMU_READ_NO_PREFETCH)
199		db_flags |= DB_RF_NOPREFETCH;
200
201	err = dmu_buf_hold_noread_by_dnode(dn, offset, tag, dbp);
202	if (err == 0) {
203		dmu_buf_impl_t *db = (dmu_buf_impl_t *)(*dbp);
204		err = dbuf_read(db, NULL, db_flags);
205		if (err != 0) {
206			dbuf_rele(db, tag);
207			*dbp = NULL;
208		}
209	}
210
211	return (err);
212}
213
214int
215dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
216    void *tag, dmu_buf_t **dbp, int flags)
217{
218	int err;
219	int db_flags = DB_RF_CANFAIL;
220
221	if (flags & DMU_READ_NO_PREFETCH)
222		db_flags |= DB_RF_NOPREFETCH;
223
224	err = dmu_buf_hold_noread(os, object, offset, tag, dbp);
225	if (err == 0) {
226		dmu_buf_impl_t *db = (dmu_buf_impl_t *)(*dbp);
227		err = dbuf_read(db, NULL, db_flags);
228		if (err != 0) {
229			dbuf_rele(db, tag);
230			*dbp = NULL;
231		}
232	}
233
234	return (err);
235}
236
237int
238dmu_bonus_max(void)
239{
240	return (DN_MAX_BONUSLEN);
241}
242
243int
244dmu_set_bonus(dmu_buf_t *db_fake, int newsize, dmu_tx_t *tx)
245{
246	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
247	dnode_t *dn;
248	int error;
249
250	DB_DNODE_ENTER(db);
251	dn = DB_DNODE(db);
252
253	if (dn->dn_bonus != db) {
254		error = SET_ERROR(EINVAL);
255	} else if (newsize < 0 || newsize > db_fake->db_size) {
256		error = SET_ERROR(EINVAL);
257	} else {
258		dnode_setbonuslen(dn, newsize, tx);
259		error = 0;
260	}
261
262	DB_DNODE_EXIT(db);
263	return (error);
264}
265
266int
267dmu_set_bonustype(dmu_buf_t *db_fake, dmu_object_type_t type, dmu_tx_t *tx)
268{
269	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
270	dnode_t *dn;
271	int error;
272
273	DB_DNODE_ENTER(db);
274	dn = DB_DNODE(db);
275
276	if (!DMU_OT_IS_VALID(type)) {
277		error = SET_ERROR(EINVAL);
278	} else if (dn->dn_bonus != db) {
279		error = SET_ERROR(EINVAL);
280	} else {
281		dnode_setbonus_type(dn, type, tx);
282		error = 0;
283	}
284
285	DB_DNODE_EXIT(db);
286	return (error);
287}
288
289dmu_object_type_t
290dmu_get_bonustype(dmu_buf_t *db_fake)
291{
292	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
293	dnode_t *dn;
294	dmu_object_type_t type;
295
296	DB_DNODE_ENTER(db);
297	dn = DB_DNODE(db);
298	type = dn->dn_bonustype;
299	DB_DNODE_EXIT(db);
300
301	return (type);
302}
303
304int
305dmu_rm_spill(objset_t *os, uint64_t object, dmu_tx_t *tx)
306{
307	dnode_t *dn;
308	int error;
309
310	error = dnode_hold(os, object, FTAG, &dn);
311	dbuf_rm_spill(dn, tx);
312	rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
313	dnode_rm_spill(dn, tx);
314	rw_exit(&dn->dn_struct_rwlock);
315	dnode_rele(dn, FTAG);
316	return (error);
317}
318
319/*
320 * returns ENOENT, EIO, or 0.
321 */
322int
323dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
324{
325	dnode_t *dn;
326	dmu_buf_impl_t *db;
327	int error;
328
329	error = dnode_hold(os, object, FTAG, &dn);
330	if (error)
331		return (error);
332
333	rw_enter(&dn->dn_struct_rwlock, RW_READER);
334	if (dn->dn_bonus == NULL) {
335		rw_exit(&dn->dn_struct_rwlock);
336		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
337		if (dn->dn_bonus == NULL)
338			dbuf_create_bonus(dn);
339	}
340	db = dn->dn_bonus;
341
342	/* as long as the bonus buf is held, the dnode will be held */
343	if (refcount_add(&db->db_holds, tag) == 1) {
344		VERIFY(dnode_add_ref(dn, db));
345		atomic_inc_32(&dn->dn_dbufs_count);
346	}
347
348	/*
349	 * Wait to drop dn_struct_rwlock until after adding the bonus dbuf's
350	 * hold and incrementing the dbuf count to ensure that dnode_move() sees
351	 * a dnode hold for every dbuf.
352	 */
353	rw_exit(&dn->dn_struct_rwlock);
354
355	dnode_rele(dn, FTAG);
356
357	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH));
358
359	*dbp = &db->db;
360	return (0);
361}
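/*
 * Usage sketch (annotation, not part of this revision): a typical consumer
 * holds the bonus buffer, inspects the per-object data stored in it
 * (db->db_data, at most dmu_bonus_max() bytes, current length db->db_size),
 * and releases it with dmu_buf_rele():
 *
 *	dmu_buf_t *db;
 *	int error = dmu_bonus_hold(os, object, FTAG, &db);
 *	if (error == 0) {
 *		... read db->db_data ...
 *		dmu_buf_rele(db, FTAG);
 *	}
 */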
362
363/*
364 * returns ENOENT, EIO, or 0.
365 *
366 * This interface will allocate a blank spill dbuf when a spill blk
367 * doesn't already exist on the dnode.
368 *
369 * if you only want to find an already existing spill db, then
370 * dmu_spill_hold_existing() should be used.
371 */
372int
373dmu_spill_hold_by_dnode(dnode_t *dn, uint32_t flags, void *tag, dmu_buf_t **dbp)
374{
375	dmu_buf_impl_t *db = NULL;
376	int err;
377
378	if ((flags & DB_RF_HAVESTRUCT) == 0)
379		rw_enter(&dn->dn_struct_rwlock, RW_READER);
380
381	db = dbuf_hold(dn, DMU_SPILL_BLKID, tag);
382
383	if ((flags & DB_RF_HAVESTRUCT) == 0)
384		rw_exit(&dn->dn_struct_rwlock);
385
386	ASSERT(db != NULL);
387	err = dbuf_read(db, NULL, flags);
388	if (err == 0)
389		*dbp = &db->db;
390	else
391		dbuf_rele(db, tag);
392	return (err);
393}
394
395int
396dmu_spill_hold_existing(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
397{
398	dmu_buf_impl_t *db = (dmu_buf_impl_t *)bonus;
399	dnode_t *dn;
400	int err;
401
402	DB_DNODE_ENTER(db);
403	dn = DB_DNODE(db);
404
405	if (spa_version(dn->dn_objset->os_spa) < SPA_VERSION_SA) {
406		err = SET_ERROR(EINVAL);
407	} else {
408		rw_enter(&dn->dn_struct_rwlock, RW_READER);
409
410		if (!dn->dn_have_spill) {
411			err = SET_ERROR(ENOENT);
412		} else {
413			err = dmu_spill_hold_by_dnode(dn,
414			    DB_RF_HAVESTRUCT | DB_RF_CANFAIL, tag, dbp);
415		}
416
417		rw_exit(&dn->dn_struct_rwlock);
418	}
419
420	DB_DNODE_EXIT(db);
421	return (err);
422}
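/*
 * Usage sketch (annotation, not part of this revision): system-attribute
 * consumers typically hold the bonus buffer first and then look for an
 * existing spill block:
 *
 *	dmu_buf_t *bonus, *spill;
 *	VERIFY0(dmu_bonus_hold(os, object, FTAG, &bonus));
 *	error = dmu_spill_hold_existing(bonus, FTAG, &spill);
 *	if (error == 0)
 *		dmu_buf_rele(spill, FTAG);
 *	dmu_buf_rele(bonus, FTAG);
 *
 * ENOENT here just means the dnode has no spill block;
 * dmu_spill_hold_by_dnode() above is the variant that will create a blank
 * spill dbuf in that case.
 */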
423
424int
425dmu_spill_hold_by_bonus(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
426{
427	dmu_buf_impl_t *db = (dmu_buf_impl_t *)bonus;
428	dnode_t *dn;
429	int err;
430
431	DB_DNODE_ENTER(db);
432	dn = DB_DNODE(db);
433	err = dmu_spill_hold_by_dnode(dn, DB_RF_CANFAIL, tag, dbp);
434	DB_DNODE_EXIT(db);
435
436	return (err);
437}
438
439/*
440 * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
441 * to take a held dnode rather than <os, object> -- the lookup is wasteful,
442 * and can induce severe lock contention when writing to several files
443 * whose dnodes are in the same block.
444 */
445static int
446dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
447    boolean_t read, void *tag, int *numbufsp, dmu_buf_t ***dbpp, uint32_t flags)
448{
449	dmu_buf_t **dbp;
450	uint64_t blkid, nblks, i;
451	uint32_t dbuf_flags;
452	int err;
453	zio_t *zio;
454
455	ASSERT(length <= DMU_MAX_ACCESS);
456
457	/*
458	 * Note: We directly notify the prefetch code of this read, so that
459	 * we can tell it about the multi-block read.  dbuf_read() only knows
460	 * about the one block it is accessing.
461	 */
462	dbuf_flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT | DB_RF_HAVESTRUCT |
463	    DB_RF_NOPREFETCH;
464
465	rw_enter(&dn->dn_struct_rwlock, RW_READER);
466	if (dn->dn_datablkshift) {
467		int blkshift = dn->dn_datablkshift;
468		nblks = (P2ROUNDUP(offset + length, 1ULL << blkshift) -
469		    P2ALIGN(offset, 1ULL << blkshift)) >> blkshift;
470	} else {
471		if (offset + length > dn->dn_datablksz) {
472			zfs_panic_recover("zfs: accessing past end of object "
473			    "%llx/%llx (size=%u access=%llu+%llu)",
474			    (longlong_t)dn->dn_objset->
475			    os_dsl_dataset->ds_object,
476			    (longlong_t)dn->dn_object, dn->dn_datablksz,
477			    (longlong_t)offset, (longlong_t)length);
478			rw_exit(&dn->dn_struct_rwlock);
479			return (SET_ERROR(EIO));
480		}
481		nblks = 1;
482	}
483	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
484
485#if defined(_KERNEL) && defined(RACCT)
486	if (racct_enable && !read) {
487		PROC_LOCK(curproc);
488		racct_add_force(curproc, RACCT_WRITEBPS, length);
489		racct_add_force(curproc, RACCT_WRITEIOPS, nblks);
490		PROC_UNLOCK(curproc);
491	}
492#endif
493
494	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
495	blkid = dbuf_whichblock(dn, 0, offset);
496	for (i = 0; i < nblks; i++) {
497		dmu_buf_impl_t *db = dbuf_hold(dn, blkid + i, tag);
498		if (db == NULL) {
499			rw_exit(&dn->dn_struct_rwlock);
500			dmu_buf_rele_array(dbp, nblks, tag);
501			zio_nowait(zio);
502			return (SET_ERROR(EIO));
503		}
504
505		/* initiate async i/o */
506		if (read)
507			(void) dbuf_read(db, zio, dbuf_flags);
508#ifdef _KERNEL
509		else
510			curthread->td_ru.ru_oublock++;
511#endif
512		dbp[i] = &db->db;
513	}
514
515	if ((flags & DMU_READ_NO_PREFETCH) == 0 &&
516	    DNODE_META_IS_CACHEABLE(dn) && length <= zfetch_array_rd_sz) {
517		dmu_zfetch(&dn->dn_zfetch, blkid, nblks,
518		    read && DNODE_IS_CACHEABLE(dn));
519	}
520	rw_exit(&dn->dn_struct_rwlock);
521
522	/* wait for async i/o */
523	err = zio_wait(zio);
524	if (err) {
525		dmu_buf_rele_array(dbp, nblks, tag);
526		return (err);
527	}
528
529	/* wait for other io to complete */
530	if (read) {
531		for (i = 0; i < nblks; i++) {
532			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
533			mutex_enter(&db->db_mtx);
534			while (db->db_state == DB_READ ||
535			    db->db_state == DB_FILL)
536				cv_wait(&db->db_changed, &db->db_mtx);
537			if (db->db_state == DB_UNCACHED)
538				err = SET_ERROR(EIO);
539			mutex_exit(&db->db_mtx);
540			if (err) {
541				dmu_buf_rele_array(dbp, nblks, tag);
542				return (err);
543			}
544		}
545	}
546
547	*numbufsp = nblks;
548	*dbpp = dbp;
549	return (0);
550}
551
552static int
553dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
554    uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
555{
556	dnode_t *dn;
557	int err;
558
559	err = dnode_hold(os, object, FTAG, &dn);
560	if (err)
561		return (err);
562
563	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
564	    numbufsp, dbpp, DMU_READ_PREFETCH);
565
566	dnode_rele(dn, FTAG);
567
568	return (err);
569}
570
571int
572dmu_buf_hold_array_by_bonus(dmu_buf_t *db_fake, uint64_t offset,
573    uint64_t length, boolean_t read, void *tag, int *numbufsp,
574    dmu_buf_t ***dbpp)
575{
576	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
577	dnode_t *dn;
578	int err;
579
580	DB_DNODE_ENTER(db);
581	dn = DB_DNODE(db);
582	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
583	    numbufsp, dbpp, DMU_READ_PREFETCH);
584	DB_DNODE_EXIT(db);
585
586	return (err);
587}
588
589void
590dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
591{
592	int i;
593	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
594
595	if (numbufs == 0)
596		return;
597
598	for (i = 0; i < numbufs; i++) {
599		if (dbp[i])
600			dbuf_rele(dbp[i], tag);
601	}
602
603	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
604}
605
606/*
607 * Issue prefetch i/os for the given blocks.  If level is greater than 0, the
608 * indirect blocks prefetched will be those that point to the blocks containing
609 * the data starting at offset, and continuing to offset + len.
610 *
611 * Note that if the indirect blocks above the blocks being prefetched are not in
612 * cache, they will be asynchronously read in.
613 */
614void
615dmu_prefetch(objset_t *os, uint64_t object, int64_t level, uint64_t offset,
616    uint64_t len, zio_priority_t pri)
617{
618	dnode_t *dn;
619	uint64_t blkid;
620	int nblks, err;
621
622	if (len == 0) {  /* they're interested in the bonus buffer */
623		dn = DMU_META_DNODE(os);
624
625		if (object == 0 || object >= DN_MAX_OBJECT)
626			return;
627
628		rw_enter(&dn->dn_struct_rwlock, RW_READER);
629		blkid = dbuf_whichblock(dn, level,
630		    object * sizeof (dnode_phys_t));
631		dbuf_prefetch(dn, level, blkid, pri, 0);
632		rw_exit(&dn->dn_struct_rwlock);
633		return;
634	}
635
636	/*
637	 * XXX - Note, if the dnode for the requested object is not
638	 * already cached, we will do a *synchronous* read in the
639	 * dnode_hold() call.  The same is true for any indirects.
640	 */
641	err = dnode_hold(os, object, FTAG, &dn);
642	if (err != 0)
643		return;
644
645	rw_enter(&dn->dn_struct_rwlock, RW_READER);
646	/*
647	 * offset + len - 1 is the last byte we want to prefetch for, and offset
648	 * is the first.  Then dbuf_whichblock(dn, level, offset + len - 1) is the
649	 * last block we want to prefetch, and dbuf_whichblock(dn, level,
650	 * offset) is the first.  Then the number we need to prefetch is the
651	 * last - first + 1.
652	 */
653	if (level > 0 || dn->dn_datablkshift != 0) {
654		nblks = dbuf_whichblock(dn, level, offset + len - 1) -
655		    dbuf_whichblock(dn, level, offset) + 1;
656	} else {
657		nblks = (offset < dn->dn_datablksz);
658	}
659
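	/*
	 * Worked example (annotation, not part of this revision): with
	 * level 0 and 128K data blocks (dn_datablkshift == 17), a request
	 * for offset 300000 and len 200000 covers bytes 300000..499999.
	 * The first block is 300000 >> 17 == 2 and the last is
	 * 499999 >> 17 == 3, so nblks == 3 - 2 + 1 == 2 blocks are
	 * prefetched below.
	 */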
660	if (nblks != 0) {
661		blkid = dbuf_whichblock(dn, level, offset);
662		for (int i = 0; i < nblks; i++)
663			dbuf_prefetch(dn, level, blkid + i, pri, 0);
664	}
665
666	rw_exit(&dn->dn_struct_rwlock);
667
668	dnode_rele(dn, FTAG);
669}
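/*
 * Usage sketch (annotation, not part of this revision): a caller that
 * expects to read a range soon can kick off the prefetch without holding
 * any buffers:
 *
 *	dmu_prefetch(os, object, 0, offset, length, ZIO_PRIORITY_SYNC_READ);
 *
 * Passing len == 0 instead prefetches the block of the meta-dnode that
 * contains the object's dnode (the bonus-buffer case handled at the top
 * of dmu_prefetch() above).
 */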
670
671/*
672 * Get the next "chunk" of file data to free.  We traverse the file from
673 * the end so that the file gets shorter over time (if we crash in the
674 * middle, this will leave us in a better state).  We find allocated file
675 * data by simply searching the allocated level 1 indirects.
676 *
677 * On input, *start should be the first offset that does not need to be
678 * freed (e.g. "offset + length").  On return, *start will be the first
679 * offset that should be freed.
680 */
681static int
682get_next_chunk(dnode_t *dn, uint64_t *start, uint64_t minimum)
683{
684	uint64_t maxblks = DMU_MAX_ACCESS >> (dn->dn_indblkshift + 1);
685	/* bytes of data covered by a level-1 indirect block */
686	uint64_t iblkrange =
687	    dn->dn_datablksz * EPB(dn->dn_indblkshift, SPA_BLKPTRSHIFT);
688
689	ASSERT3U(minimum, <=, *start);
690
691	if (*start - minimum <= iblkrange * maxblks) {
692		*start = minimum;
693		return (0);
694	}
695	ASSERT(ISP2(iblkrange));
696
697	for (uint64_t blks = 0; *start > minimum && blks < maxblks; blks++) {
698		int err;
699
700		/*
701		 * dnode_next_offset(BACKWARDS) will find an allocated L1
702		 * indirect block at or before the input offset.  We must
703		 * decrement *start so that it is at the end of the region
704		 * to search.
705		 */
706		(*start)--;
707		err = dnode_next_offset(dn,
708		    DNODE_FIND_BACKWARDS, start, 2, 1, 0);
709
710		/* if there are no indirect blocks before start, we are done */
711		if (err == ESRCH) {
712			*start = minimum;
713			break;
714		} else if (err != 0) {
715			return (err);
716		}
717
718		/* set start to the beginning of this L1 indirect */
719		*start = P2ALIGN(*start, iblkrange);
720	}
721	if (*start < minimum)
722		*start = minimum;
723	return (0);
724}
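/*
 * Worked example (annotation, not part of this revision): with 128K data
 * blocks and 128K indirect blocks (dn_indblkshift == 17), each L1 indirect
 * holds 2^(17 - SPA_BLKPTRSHIFT) == 1024 block pointers, so iblkrange ==
 * 131072 * 1024 == 128MB of file data per L1 indirect.  get_next_chunk()
 * therefore steps *start back over at most maxblks allocated L1 regions
 * per call, bounding how much one free transaction can dirty.
 */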
725
726static int
727dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
728    uint64_t length)
729{
730	uint64_t object_size = (dn->dn_maxblkid + 1) * dn->dn_datablksz;
731	int err;
732	uint64_t dirty_frees_threshold;
733	dsl_pool_t *dp = dmu_objset_pool(os);
734
735	if (offset >= object_size)
736		return (0);
737
738	if (zfs_per_txg_dirty_frees_percent <= 100)
739		dirty_frees_threshold =
740		    zfs_per_txg_dirty_frees_percent * zfs_dirty_data_max / 100;
741	else
742		dirty_frees_threshold = zfs_dirty_data_max / 4;
743
744	if (length == DMU_OBJECT_END || offset + length > object_size)
745		length = object_size - offset;
746
747	while (length != 0) {
748		uint64_t chunk_end, chunk_begin, chunk_len;
749		uint64_t long_free_dirty_all_txgs = 0;
750		dmu_tx_t *tx;
751
752		chunk_end = chunk_begin = offset + length;
753
754		/* move chunk_begin backwards to the beginning of this chunk */
755		err = get_next_chunk(dn, &chunk_begin, offset);
756		if (err)
757			return (err);
758		ASSERT3U(chunk_begin, >=, offset);
759		ASSERT3U(chunk_begin, <=, chunk_end);
760
761		chunk_len = chunk_end - chunk_begin;
762
763		mutex_enter(&dp->dp_lock);
764		for (int t = 0; t < TXG_SIZE; t++) {
765			long_free_dirty_all_txgs +=
766			    dp->dp_long_free_dirty_pertxg[t];
767		}
768		mutex_exit(&dp->dp_lock);
769
770		/*
771		 * To avoid filling up a TXG with just frees, wait for
772		 * the next TXG to open before freeing more chunks if
773		 * we have reached the threshold of frees.
774		 */
775		if (dirty_frees_threshold != 0 &&
776		    long_free_dirty_all_txgs >= dirty_frees_threshold) {
777			txg_wait_open(dp, 0);
778			continue;
779		}
780
781		tx = dmu_tx_create(os);
782		dmu_tx_hold_free(tx, dn->dn_object, chunk_begin, chunk_len);
783
784		/*
785		 * Mark this transaction as typically resulting in a net
786		 * reduction in space used.
787		 */
788		dmu_tx_mark_netfree(tx);
789		err = dmu_tx_assign(tx, TXG_WAIT);
790		if (err) {
791			dmu_tx_abort(tx);
792			return (err);
793		}
794
795		mutex_enter(&dp->dp_lock);
796		dp->dp_long_free_dirty_pertxg[dmu_tx_get_txg(tx) & TXG_MASK] +=
797		    chunk_len;
798		mutex_exit(&dp->dp_lock);
799		DTRACE_PROBE3(free__long__range,
800		    uint64_t, long_free_dirty_all_txgs, uint64_t, chunk_len,
801		    uint64_t, dmu_tx_get_txg(tx));
802		dnode_free_range(dn, chunk_begin, chunk_len, tx);
803		dmu_tx_commit(tx);
804
805		length -= chunk_len;
806	}
807	return (0);
808}
809
810int
811dmu_free_long_range(objset_t *os, uint64_t object,
812    uint64_t offset, uint64_t length)
813{
814	dnode_t *dn;
815	int err;
816
817	err = dnode_hold(os, object, FTAG, &dn);
818	if (err != 0)
819		return (err);
820	err = dmu_free_long_range_impl(os, dn, offset, length);
821
822	/*
823	 * It is important to zero out the maxblkid when freeing the entire
824	 * file, so that (a) subsequent calls to dmu_free_long_range_impl()
825	 * will take the fast path, and (b) dnode_reallocate() can verify
826	 * that the entire file has been freed.
827	 */
828	if (err == 0 && offset == 0 && length == DMU_OBJECT_END)
829		dn->dn_maxblkid = 0;
830
831	dnode_rele(dn, FTAG);
832	return (err);
833}
834
835int
836dmu_free_long_object(objset_t *os, uint64_t object)
837{
838	dmu_tx_t *tx;
839	int err;
840
841	err = dmu_free_long_range(os, object, 0, DMU_OBJECT_END);
842	if (err != 0)
843		return (err);
844
845	tx = dmu_tx_create(os);
846	dmu_tx_hold_bonus(tx, object);
847	dmu_tx_hold_free(tx, object, 0, DMU_OBJECT_END);
848	dmu_tx_mark_netfree(tx);
849	err = dmu_tx_assign(tx, TXG_WAIT);
850	if (err == 0) {
851		err = dmu_object_free(os, object, tx);
852		dmu_tx_commit(tx);
853	} else {
854		dmu_tx_abort(tx);
855	}
856
857	return (err);
858}
859
860int
861dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
862    uint64_t size, dmu_tx_t *tx)
863{
864	dnode_t *dn;
865	int err = dnode_hold(os, object, FTAG, &dn);
866	if (err)
867		return (err);
868	ASSERT(offset < UINT64_MAX);
869	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
870	dnode_free_range(dn, offset, size, tx);
871	dnode_rele(dn, FTAG);
872	return (0);
873}
874
875static int
876dmu_read_impl(dnode_t *dn, uint64_t offset, uint64_t size,
877    void *buf, uint32_t flags)
878{
879	dmu_buf_t **dbp;
880	int numbufs, err = 0;
881
882	/*
883	 * Deal with odd block sizes, where there can't be data past the first
884	 * block.  If we ever do the tail block optimization, we will need to
885	 * handle that here as well.
886	 */
887	if (dn->dn_maxblkid == 0) {
888		int newsz = offset > dn->dn_datablksz ? 0 :
889		    MIN(size, dn->dn_datablksz - offset);
890		bzero((char *)buf + newsz, size - newsz);
891		size = newsz;
892	}
893
894	while (size > 0) {
895		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
896		int i;
897
898		/*
899		 * NB: we could do this block-at-a-time, but it's nice
900		 * to be reading in parallel.
901		 */
902		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
903		    TRUE, FTAG, &numbufs, &dbp, flags);
904		if (err)
905			break;
906
907		for (i = 0; i < numbufs; i++) {
908			int tocpy;
909			int bufoff;
910			dmu_buf_t *db = dbp[i];
911
912			ASSERT(size > 0);
913
914			bufoff = offset - db->db_offset;
915			tocpy = (int)MIN(db->db_size - bufoff, size);
916
917			bcopy((char *)db->db_data + bufoff, buf, tocpy);
918
919			offset += tocpy;
920			size -= tocpy;
921			buf = (char *)buf + tocpy;
922		}
923		dmu_buf_rele_array(dbp, numbufs, FTAG);
924	}
925	return (err);
926}
927
928int
929dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
930    void *buf, uint32_t flags)
931{
932	dnode_t *dn;
933	int err;
934
935	err = dnode_hold(os, object, FTAG, &dn);
936	if (err != 0)
937		return (err);
938
939	err = dmu_read_impl(dn, offset, size, buf, flags);
940	dnode_rele(dn, FTAG);
941	return (err);
942}
943
944int
945dmu_read_by_dnode(dnode_t *dn, uint64_t offset, uint64_t size, void *buf,
946    uint32_t flags)
947{
948	return (dmu_read_impl(dn, offset, size, buf, flags));
949}
950
951static void
952dmu_write_impl(dmu_buf_t **dbp, int numbufs, uint64_t offset, uint64_t size,
953    const void *buf, dmu_tx_t *tx)
954{
955	int i;
956
957	for (i = 0; i < numbufs; i++) {
958		int tocpy;
959		int bufoff;
960		dmu_buf_t *db = dbp[i];
961
962		ASSERT(size > 0);
963
964		bufoff = offset - db->db_offset;
965		tocpy = (int)MIN(db->db_size - bufoff, size);
966
967		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
968
969		if (tocpy == db->db_size)
970			dmu_buf_will_fill(db, tx);
971		else
972			dmu_buf_will_dirty(db, tx);
973
974		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
975
976		if (tocpy == db->db_size)
977			dmu_buf_fill_done(db, tx);
978
979		offset += tocpy;
980		size -= tocpy;
981		buf = (char *)buf + tocpy;
982	}
983}
984
985void
986dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
987    const void *buf, dmu_tx_t *tx)
988{
989	dmu_buf_t **dbp;
990	int numbufs;
991
992	if (size == 0)
993		return;
994
995	VERIFY0(dmu_buf_hold_array(os, object, offset, size,
996	    FALSE, FTAG, &numbufs, &dbp));
997	dmu_write_impl(dbp, numbufs, offset, size, buf, tx);
998	dmu_buf_rele_array(dbp, numbufs, FTAG);
999}
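/*
 * Usage sketch (annotation, not part of this revision): dmu_read() needs no
 * transaction, while dmu_write() must be called with an assigned transaction
 * covering the range.  A hypothetical caller might do:
 *
 *	char buf[4096];
 *	int error = dmu_read(os, object, 0, sizeof (buf), buf,
 *	    DMU_READ_PREFETCH);
 *
 *	dmu_tx_t *tx = dmu_tx_create(os);
 *	dmu_tx_hold_write(tx, object, 0, sizeof (buf));
 *	error = dmu_tx_assign(tx, TXG_WAIT);
 *	if (error == 0) {
 *		dmu_write(os, object, 0, sizeof (buf), buf, tx);
 *		dmu_tx_commit(tx);
 *	} else {
 *		dmu_tx_abort(tx);
 *	}
 *
 * dmu_tx_hold_write() lives with the other dmu_tx_hold_*() routines and is
 * the usual way to reserve space for an overwrite; it is not used in this
 * file itself.
 */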
1000
1001void
1002dmu_write_by_dnode(dnode_t *dn, uint64_t offset, uint64_t size,
1003    const void *buf, dmu_tx_t *tx)
1004{
1005	dmu_buf_t **dbp;
1006	int numbufs;
1007
1008	if (size == 0)
1009		return;
1010
1011	VERIFY0(dmu_buf_hold_array_by_dnode(dn, offset, size,
1012	    FALSE, FTAG, &numbufs, &dbp, DMU_READ_PREFETCH));
1013	dmu_write_impl(dbp, numbufs, offset, size, buf, tx);
1014	dmu_buf_rele_array(dbp, numbufs, FTAG);
1015}
1016
1017void
1018dmu_prealloc(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1019    dmu_tx_t *tx)
1020{
1021	dmu_buf_t **dbp;
1022	int numbufs, i;
1023
1024	if (size == 0)
1025		return;
1026
1027	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
1028	    FALSE, FTAG, &numbufs, &dbp));
1029
1030	for (i = 0; i < numbufs; i++) {
1031		dmu_buf_t *db = dbp[i];
1032
1033		dmu_buf_will_not_fill(db, tx);
1034	}
1035	dmu_buf_rele_array(dbp, numbufs, FTAG);
1036}
1037
1038void
1039dmu_write_embedded(objset_t *os, uint64_t object, uint64_t offset,
1040    void *data, uint8_t etype, uint8_t comp, int uncompressed_size,
1041    int compressed_size, int byteorder, dmu_tx_t *tx)
1042{
1043	dmu_buf_t *db;
1044
1045	ASSERT3U(etype, <, NUM_BP_EMBEDDED_TYPES);
1046	ASSERT3U(comp, <, ZIO_COMPRESS_FUNCTIONS);
1047	VERIFY0(dmu_buf_hold_noread(os, object, offset,
1048	    FTAG, &db));
1049
1050	dmu_buf_write_embedded(db,
1051	    data, (bp_embedded_type_t)etype, (enum zio_compress)comp,
1052	    uncompressed_size, compressed_size, byteorder, tx);
1053
1054	dmu_buf_rele(db, FTAG);
1055}
1056
1057/*
1058 * DMU support for xuio
1059 */
1060kstat_t *xuio_ksp = NULL;
1061
1062int
1063dmu_xuio_init(xuio_t *xuio, int nblk)
1064{
1065	dmu_xuio_t *priv;
1066	uio_t *uio = &xuio->xu_uio;
1067
1068	uio->uio_iovcnt = nblk;
1069	uio->uio_iov = kmem_zalloc(nblk * sizeof (iovec_t), KM_SLEEP);
1070
1071	priv = kmem_zalloc(sizeof (dmu_xuio_t), KM_SLEEP);
1072	priv->cnt = nblk;
1073	priv->bufs = kmem_zalloc(nblk * sizeof (arc_buf_t *), KM_SLEEP);
1074	priv->iovp = uio->uio_iov;
1075	XUIO_XUZC_PRIV(xuio) = priv;
1076
1077	if (XUIO_XUZC_RW(xuio) == UIO_READ)
1078		XUIOSTAT_INCR(xuiostat_onloan_rbuf, nblk);
1079	else
1080		XUIOSTAT_INCR(xuiostat_onloan_wbuf, nblk);
1081
1082	return (0);
1083}
1084
1085void
1086dmu_xuio_fini(xuio_t *xuio)
1087{
1088	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1089	int nblk = priv->cnt;
1090
1091	kmem_free(priv->iovp, nblk * sizeof (iovec_t));
1092	kmem_free(priv->bufs, nblk * sizeof (arc_buf_t *));
1093	kmem_free(priv, sizeof (dmu_xuio_t));
1094
1095	if (XUIO_XUZC_RW(xuio) == UIO_READ)
1096		XUIOSTAT_INCR(xuiostat_onloan_rbuf, -nblk);
1097	else
1098		XUIOSTAT_INCR(xuiostat_onloan_wbuf, -nblk);
1099}
1100
1101/*
1102 * Initialize iov[priv->next] and priv->bufs[priv->next] with { off, n, abuf }
1103 * and increase priv->next by 1.
1104 */
1105int
1106dmu_xuio_add(xuio_t *xuio, arc_buf_t *abuf, offset_t off, size_t n)
1107{
1108	struct iovec *iov;
1109	uio_t *uio = &xuio->xu_uio;
1110	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1111	int i = priv->next++;
1112
1113	ASSERT(i < priv->cnt);
1114	ASSERT(off + n <= arc_buf_lsize(abuf));
1115	iov = uio->uio_iov + i;
1116	iov->iov_base = (char *)abuf->b_data + off;
1117	iov->iov_len = n;
1118	priv->bufs[i] = abuf;
1119	return (0);
1120}
1121
1122int
1123dmu_xuio_cnt(xuio_t *xuio)
1124{
1125	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1126	return (priv->cnt);
1127}
1128
1129arc_buf_t *
1130dmu_xuio_arcbuf(xuio_t *xuio, int i)
1131{
1132	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1133
1134	ASSERT(i < priv->cnt);
1135	return (priv->bufs[i]);
1136}
1137
1138void
1139dmu_xuio_clear(xuio_t *xuio, int i)
1140{
1141	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1142
1143	ASSERT(i < priv->cnt);
1144	priv->bufs[i] = NULL;
1145}
1146
1147static void
1148xuio_stat_init(void)
1149{
1150	xuio_ksp = kstat_create("zfs", 0, "xuio_stats", "misc",
1151	    KSTAT_TYPE_NAMED, sizeof (xuio_stats) / sizeof (kstat_named_t),
1152	    KSTAT_FLAG_VIRTUAL);
1153	if (xuio_ksp != NULL) {
1154		xuio_ksp->ks_data = &xuio_stats;
1155		kstat_install(xuio_ksp);
1156	}
1157}
1158
1159static void
1160xuio_stat_fini(void)
1161{
1162	if (xuio_ksp != NULL) {
1163		kstat_delete(xuio_ksp);
1164		xuio_ksp = NULL;
1165	}
1166}
1167
1168void
1169xuio_stat_wbuf_copied(void)
1170{
1171	XUIOSTAT_BUMP(xuiostat_wbuf_copied);
1172}
1173
1174void
1175xuio_stat_wbuf_nocopy(void)
1176{
1177	XUIOSTAT_BUMP(xuiostat_wbuf_nocopy);
1178}
1179
1180#ifdef _KERNEL
1181static int
1182dmu_read_uio_dnode(dnode_t *dn, uio_t *uio, uint64_t size)
1183{
1184	dmu_buf_t **dbp;
1185	int numbufs, i, err;
1186	xuio_t *xuio = NULL;
1187
1188	/*
1189	 * NB: we could do this block-at-a-time, but it's nice
1190	 * to be reading in parallel.
1191	 */
1192	err = dmu_buf_hold_array_by_dnode(dn, uio->uio_loffset, size,
1193	    TRUE, FTAG, &numbufs, &dbp, 0);
1194	if (err)
1195		return (err);
1196
1197#ifdef UIO_XUIO
1198	if (uio->uio_extflg == UIO_XUIO)
1199		xuio = (xuio_t *)uio;
1200#endif
1201
1202	for (i = 0; i < numbufs; i++) {
1203		int tocpy;
1204		int bufoff;
1205		dmu_buf_t *db = dbp[i];
1206
1207		ASSERT(size > 0);
1208
1209		bufoff = uio->uio_loffset - db->db_offset;
1210		tocpy = (int)MIN(db->db_size - bufoff, size);
1211
1212		if (xuio) {
1213			dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
1214			arc_buf_t *dbuf_abuf = dbi->db_buf;
1215			arc_buf_t *abuf = dbuf_loan_arcbuf(dbi);
1216			err = dmu_xuio_add(xuio, abuf, bufoff, tocpy);
1217			if (!err) {
1218				uio->uio_resid -= tocpy;
1219				uio->uio_loffset += tocpy;
1220			}
1221
1222			if (abuf == dbuf_abuf)
1223				XUIOSTAT_BUMP(xuiostat_rbuf_nocopy);
1224			else
1225				XUIOSTAT_BUMP(xuiostat_rbuf_copied);
1226		} else {
1227#ifdef illumos
1228			err = uiomove((char *)db->db_data + bufoff, tocpy,
1229			    UIO_READ, uio);
1230#else
1231			err = vn_io_fault_uiomove((char *)db->db_data + bufoff,
1232			    tocpy, uio);
1233#endif
1234		}
1235		if (err)
1236			break;
1237
1238		size -= tocpy;
1239	}
1240	dmu_buf_rele_array(dbp, numbufs, FTAG);
1241
1242	return (err);
1243}
1244
1245/*
1246 * Read 'size' bytes into the uio buffer.
1247 * From object zdb->db_object.
1248 * Starting at offset uio->uio_loffset.
1249 *
1250 * If the caller already has a dbuf in the target object
1251 * (e.g. its bonus buffer), this routine is faster than dmu_read_uio(),
1252 * because we don't have to find the dnode_t for the object.
1253 */
1254int
1255dmu_read_uio_dbuf(dmu_buf_t *zdb, uio_t *uio, uint64_t size)
1256{
1257	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zdb;
1258	dnode_t *dn;
1259	int err;
1260
1261	if (size == 0)
1262		return (0);
1263
1264	DB_DNODE_ENTER(db);
1265	dn = DB_DNODE(db);
1266	err = dmu_read_uio_dnode(dn, uio, size);
1267	DB_DNODE_EXIT(db);
1268
1269	return (err);
1270}
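/*
 * Usage sketch (annotation, not part of this revision; the names uio, db,
 * resid and max_chunk are hypothetical): a read path that already holds a
 * dbuf of the target object can loop over the request like this:
 *
 *	while (resid > 0) {
 *		uint64_t nbytes = MIN(resid, max_chunk);
 *		error = dmu_read_uio_dbuf(db, uio, nbytes);
 *		if (error != 0)
 *			break;
 *		resid -= nbytes;
 *	}
 *
 * which skips the dnode lookup that dmu_read_uio() would repeat on every
 * iteration.
 */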
1271
1272/*
1273 * Read 'size' bytes into the uio buffer.
1274 * From the specified object
1275 * Starting at offset uio->uio_loffset.
1276 */
1277int
1278dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
1279{
1280	dnode_t *dn;
1281	int err;
1282
1283	if (size == 0)
1284		return (0);
1285
1286	err = dnode_hold(os, object, FTAG, &dn);
1287	if (err)
1288		return (err);
1289
1290	err = dmu_read_uio_dnode(dn, uio, size);
1291
1292	dnode_rele(dn, FTAG);
1293
1294	return (err);
1295}
1296
1297static int
1298dmu_write_uio_dnode(dnode_t *dn, uio_t *uio, uint64_t size, dmu_tx_t *tx)
1299{
1300	dmu_buf_t **dbp;
1301	int numbufs;
1302	int err = 0;
1303	int i;
1304
1305	err = dmu_buf_hold_array_by_dnode(dn, uio->uio_loffset, size,
1306	    FALSE, FTAG, &numbufs, &dbp, DMU_READ_PREFETCH);
1307	if (err)
1308		return (err);
1309
1310	for (i = 0; i < numbufs; i++) {
1311		int tocpy;
1312		int bufoff;
1313		dmu_buf_t *db = dbp[i];
1314
1315		ASSERT(size > 0);
1316
1317		bufoff = uio->uio_loffset - db->db_offset;
1318		tocpy = (int)MIN(db->db_size - bufoff, size);
1319
1320		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1321
1322		if (tocpy == db->db_size)
1323			dmu_buf_will_fill(db, tx);
1324		else
1325			dmu_buf_will_dirty(db, tx);
1326
1327#ifdef illumos
1328		/*
1329		 * XXX uiomove could block forever (eg. nfs-backed
1330		 * pages).  There needs to be a uiolockdown() function
1331		 * to lock the pages in memory, so that uiomove won't
1332		 * block.
1333		 */
1334		err = uiomove((char *)db->db_data + bufoff, tocpy,
1335		    UIO_WRITE, uio);
1336#else
1337		err = vn_io_fault_uiomove((char *)db->db_data + bufoff, tocpy,
1338		    uio);
1339#endif
1340
1341		if (tocpy == db->db_size)
1342			dmu_buf_fill_done(db, tx);
1343
1344		if (err)
1345			break;
1346
1347		size -= tocpy;
1348	}
1349
1350	dmu_buf_rele_array(dbp, numbufs, FTAG);
1351	return (err);
1352}
1353
1354/*
1355 * Write 'size' bytes from the uio buffer.
1356 * To object zdb->db_object.
1357 * Starting at offset uio->uio_loffset.
1358 *
1359 * If the caller already has a dbuf in the target object
1360 * (e.g. its bonus buffer), this routine is faster than dmu_write_uio(),
1361 * because we don't have to find the dnode_t for the object.
1362 */
1363int
1364dmu_write_uio_dbuf(dmu_buf_t *zdb, uio_t *uio, uint64_t size,
1365    dmu_tx_t *tx)
1366{
1367	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zdb;
1368	dnode_t *dn;
1369	int err;
1370
1371	if (size == 0)
1372		return (0);
1373
1374	DB_DNODE_ENTER(db);
1375	dn = DB_DNODE(db);
1376	err = dmu_write_uio_dnode(dn, uio, size, tx);
1377	DB_DNODE_EXIT(db);
1378
1379	return (err);
1380}
1381
1382/*
1383 * Write 'size' bytes from the uio buffer.
1384 * To the specified object.
1385 * Starting at offset uio->uio_loffset.
1386 */
1387int
1388dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
1389    dmu_tx_t *tx)
1390{
1391	dnode_t *dn;
1392	int err;
1393
1394	if (size == 0)
1395		return (0);
1396
1397	err = dnode_hold(os, object, FTAG, &dn);
1398	if (err)
1399		return (err);
1400
1401	err = dmu_write_uio_dnode(dn, uio, size, tx);
1402
1403	dnode_rele(dn, FTAG);
1404
1405	return (err);
1406}
1407
1408#ifdef illumos
1409int
1410dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1411    page_t *pp, dmu_tx_t *tx)
1412{
1413	dmu_buf_t **dbp;
1414	int numbufs, i;
1415	int err;
1416
1417	if (size == 0)
1418		return (0);
1419
1420	err = dmu_buf_hold_array(os, object, offset, size,
1421	    FALSE, FTAG, &numbufs, &dbp);
1422	if (err)
1423		return (err);
1424
1425	for (i = 0; i < numbufs; i++) {
1426		int tocpy, copied, thiscpy;
1427		int bufoff;
1428		dmu_buf_t *db = dbp[i];
1429		caddr_t va;
1430
1431		ASSERT(size > 0);
1432		ASSERT3U(db->db_size, >=, PAGESIZE);
1433
1434		bufoff = offset - db->db_offset;
1435		tocpy = (int)MIN(db->db_size - bufoff, size);
1436
1437		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1438
1439		if (tocpy == db->db_size)
1440			dmu_buf_will_fill(db, tx);
1441		else
1442			dmu_buf_will_dirty(db, tx);
1443
1444		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
1445			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
1446			thiscpy = MIN(PAGESIZE, tocpy - copied);
1447			va = zfs_map_page(pp, S_READ);
1448			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
1449			zfs_unmap_page(pp, va);
1450			pp = pp->p_next;
1451			bufoff += PAGESIZE;
1452		}
1453
1454		if (tocpy == db->db_size)
1455			dmu_buf_fill_done(db, tx);
1456
1457		offset += tocpy;
1458		size -= tocpy;
1459	}
1460	dmu_buf_rele_array(dbp, numbufs, FTAG);
1461	return (err);
1462}
1463
1464#else	/* !illumos */
1465
1466int
1467dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1468    vm_page_t *ma, dmu_tx_t *tx)
1469{
1470	dmu_buf_t **dbp;
1471	struct sf_buf *sf;
1472	int numbufs, i;
1473	int err;
1474
1475	if (size == 0)
1476		return (0);
1477
1478	err = dmu_buf_hold_array(os, object, offset, size,
1479	    FALSE, FTAG, &numbufs, &dbp);
1480	if (err)
1481		return (err);
1482
1483	for (i = 0; i < numbufs; i++) {
1484		int tocpy, copied, thiscpy;
1485		int bufoff;
1486		dmu_buf_t *db = dbp[i];
1487		caddr_t va;
1488
1489		ASSERT(size > 0);
1490		ASSERT3U(db->db_size, >=, PAGESIZE);
1491
1492		bufoff = offset - db->db_offset;
1493		tocpy = (int)MIN(db->db_size - bufoff, size);
1494
1495		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1496
1497		if (tocpy == db->db_size)
1498			dmu_buf_will_fill(db, tx);
1499		else
1500			dmu_buf_will_dirty(db, tx);
1501
1502		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
1503			ASSERT3U(ptoa((*ma)->pindex), ==, db->db_offset + bufoff);
1504			thiscpy = MIN(PAGESIZE, tocpy - copied);
1505			va = zfs_map_page(*ma, &sf);
1506			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
1507			zfs_unmap_page(sf);
1508			ma += 1;
1509			bufoff += PAGESIZE;
1510		}
1511
1512		if (tocpy == db->db_size)
1513			dmu_buf_fill_done(db, tx);
1514
1515		offset += tocpy;
1516		size -= tocpy;
1517	}
1518	dmu_buf_rele_array(dbp, numbufs, FTAG);
1519	return (err);
1520}
1521#endif	/* illumos */
1522#endif	/* _KERNEL */
1523
1524/*
1525 * Allocate a loaned anonymous arc buffer.
1526 */
1527arc_buf_t *
1528dmu_request_arcbuf(dmu_buf_t *handle, int size)
1529{
1530	dmu_buf_impl_t *db = (dmu_buf_impl_t *)handle;
1531
1532	return (arc_loan_buf(db->db_objset->os_spa, B_FALSE, size));
1533}
1534
1535/*
1536 * Free a loaned arc buffer.
1537 */
1538void
1539dmu_return_arcbuf(arc_buf_t *buf)
1540{
1541	arc_return_buf(buf, FTAG);
1542	arc_buf_destroy(buf, FTAG);
1543}
1544
1545/*
1546 * When possible directly assign passed loaned arc buffer to a dbuf.
1547 * If this is not possible copy the contents of passed arc buf via
1548 * dmu_write().
1549 */
1550void
1551dmu_assign_arcbuf(dmu_buf_t *handle, uint64_t offset, arc_buf_t *buf,
1552    dmu_tx_t *tx)
1553{
1554	dmu_buf_impl_t *dbuf = (dmu_buf_impl_t *)handle;
1555	dnode_t *dn;
1556	dmu_buf_impl_t *db;
1557	uint32_t blksz = (uint32_t)arc_buf_lsize(buf);
1558	uint64_t blkid;
1559
1560	DB_DNODE_ENTER(dbuf);
1561	dn = DB_DNODE(dbuf);
1562	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1563	blkid = dbuf_whichblock(dn, 0, offset);
1564	VERIFY((db = dbuf_hold(dn, blkid, FTAG)) != NULL);
1565	rw_exit(&dn->dn_struct_rwlock);
1566	DB_DNODE_EXIT(dbuf);
1567
1568	/*
1569	 * We can only assign if the offset is aligned, the arc buf is the
1570	 * same size as the dbuf, and the dbuf is not metadata.
1571	 */
1572	if (offset == db->db.db_offset && blksz == db->db.db_size) {
1573#ifdef _KERNEL
1574		curthread->td_ru.ru_oublock++;
1575#ifdef RACCT
1576		if (racct_enable) {
1577			PROC_LOCK(curproc);
1578			racct_add_force(curproc, RACCT_WRITEBPS, blksz);
1579			racct_add_force(curproc, RACCT_WRITEIOPS, 1);
1580			PROC_UNLOCK(curproc);
1581		}
1582#endif /* RACCT */
1583#endif /* _KERNEL */
1584		dbuf_assign_arcbuf(db, buf, tx);
1585		dbuf_rele(db, FTAG);
1586	} else {
1587		objset_t *os;
1588		uint64_t object;
1589
1590		/* compressed bufs must always be assignable to their dbuf */
1591		ASSERT3U(arc_get_compression(buf), ==, ZIO_COMPRESS_OFF);
1592		ASSERT(!(buf->b_flags & ARC_BUF_FLAG_COMPRESSED));
1593
1594		DB_DNODE_ENTER(dbuf);
1595		dn = DB_DNODE(dbuf);
1596		os = dn->dn_objset;
1597		object = dn->dn_object;
1598		DB_DNODE_EXIT(dbuf);
1599
1600		dbuf_rele(db, FTAG);
1601		dmu_write(os, object, offset, blksz, buf->b_data, tx);
1602		dmu_return_arcbuf(buf);
1603		XUIOSTAT_BUMP(xuiostat_wbuf_copied);
1604	}
1605}
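/*
 * Usage sketch (annotation, not part of this revision; handle, os, object,
 * offset and blksz are hypothetical): the loaned-buffer path above lets a
 * caller fill a whole block without an extra copy:
 *
 *	arc_buf_t *abuf = dmu_request_arcbuf(handle, blksz);
 *	... generate or copy blksz bytes into abuf->b_data ...
 *	dmu_tx_t *tx = dmu_tx_create(os);
 *	dmu_tx_hold_write(tx, object, offset, blksz);
 *	if (dmu_tx_assign(tx, TXG_WAIT) == 0) {
 *		dmu_assign_arcbuf(handle, offset, abuf, tx);
 *		dmu_tx_commit(tx);
 *	} else {
 *		dmu_return_arcbuf(abuf);
 *		dmu_tx_abort(tx);
 *	}
 *
 * If offset or blksz does not line up with the target dbuf,
 * dmu_assign_arcbuf() itself falls back to dmu_write() and returns the
 * loaned buffer, as shown above.
 */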
1606
1607typedef struct {
1608	dbuf_dirty_record_t	*dsa_dr;
1609	dmu_sync_cb_t		*dsa_done;
1610	zgd_t			*dsa_zgd;
1611	dmu_tx_t		*dsa_tx;
1612} dmu_sync_arg_t;
1613
1614/* ARGSUSED */
1615static void
1616dmu_sync_ready(zio_t *zio, arc_buf_t *buf, void *varg)
1617{
1618	dmu_sync_arg_t *dsa = varg;
1619	dmu_buf_t *db = dsa->dsa_zgd->zgd_db;
1620	blkptr_t *bp = zio->io_bp;
1621
1622	if (zio->io_error == 0) {
1623		if (BP_IS_HOLE(bp)) {
1624			/*
1625			 * A block of zeros may compress to a hole, but the
1626			 * block size still needs to be known for replay.
1627			 */
1628			BP_SET_LSIZE(bp, db->db_size);
1629		} else if (!BP_IS_EMBEDDED(bp)) {
1630			ASSERT(BP_GET_LEVEL(bp) == 0);
1631			bp->blk_fill = 1;
1632		}
1633	}
1634}
1635
1636static void
1637dmu_sync_late_arrival_ready(zio_t *zio)
1638{
1639	dmu_sync_ready(zio, NULL, zio->io_private);
1640}
1641
1642/* ARGSUSED */
1643static void
1644dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
1645{
1646	dmu_sync_arg_t *dsa = varg;
1647	dbuf_dirty_record_t *dr = dsa->dsa_dr;
1648	dmu_buf_impl_t *db = dr->dr_dbuf;
1649
1650	mutex_enter(&db->db_mtx);
1651	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
1652	if (zio->io_error == 0) {
1653		dr->dt.dl.dr_nopwrite = !!(zio->io_flags & ZIO_FLAG_NOPWRITE);
1654		if (dr->dt.dl.dr_nopwrite) {
1655			blkptr_t *bp = zio->io_bp;
1656			blkptr_t *bp_orig = &zio->io_bp_orig;
1657			uint8_t chksum = BP_GET_CHECKSUM(bp_orig);
1658
1659			ASSERT(BP_EQUAL(bp, bp_orig));
1660			VERIFY(BP_EQUAL(bp, db->db_blkptr));
1661			ASSERT(zio->io_prop.zp_compress != ZIO_COMPRESS_OFF);
1662			ASSERT(zio_checksum_table[chksum].ci_flags &
1663			    ZCHECKSUM_FLAG_NOPWRITE);
1664		}
1665		dr->dt.dl.dr_overridden_by = *zio->io_bp;
1666		dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
1667		dr->dt.dl.dr_copies = zio->io_prop.zp_copies;
1668
1669		/*
1670		 * Old style holes are filled with all zeros, whereas
1671		 * new-style holes maintain their lsize, type, level,
1672		 * and birth time (see zio_write_compress). While we
1673		 * need to reset the BP_SET_LSIZE() call that happened
1674		 * in dmu_sync_ready for old style holes, we do *not*
1675		 * want to wipe out the information contained in new
1676		 * style holes. Thus, only zero out the block pointer if
1677		 * it's an old style hole.
1678		 */
1679		if (BP_IS_HOLE(&dr->dt.dl.dr_overridden_by) &&
1680		    dr->dt.dl.dr_overridden_by.blk_birth == 0)
1681			BP_ZERO(&dr->dt.dl.dr_overridden_by);
1682	} else {
1683		dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
1684	}
1685	cv_broadcast(&db->db_changed);
1686	mutex_exit(&db->db_mtx);
1687
1688	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);
1689
1690	kmem_free(dsa, sizeof (*dsa));
1691}
1692
1693static void
1694dmu_sync_late_arrival_done(zio_t *zio)
1695{
1696	blkptr_t *bp = zio->io_bp;
1697	dmu_sync_arg_t *dsa = zio->io_private;
1698	blkptr_t *bp_orig = &zio->io_bp_orig;
1699
1700	if (zio->io_error == 0 && !BP_IS_HOLE(bp)) {
1701		ASSERT(!(zio->io_flags & ZIO_FLAG_NOPWRITE));
1702		ASSERT(BP_IS_HOLE(bp_orig) || !BP_EQUAL(bp, bp_orig));
1703		ASSERT(zio->io_bp->blk_birth == zio->io_txg);
1704		ASSERT(zio->io_txg > spa_syncing_txg(zio->io_spa));
1705		zio_free(zio->io_spa, zio->io_txg, zio->io_bp);
1706	}
1707
1708	dmu_tx_commit(dsa->dsa_tx);
1709
1710	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);
1711
1712	abd_put(zio->io_abd);
1713	kmem_free(dsa, sizeof (*dsa));
1714}
1715
1716static int
1717dmu_sync_late_arrival(zio_t *pio, objset_t *os, dmu_sync_cb_t *done, zgd_t *zgd,
1718    zio_prop_t *zp, zbookmark_phys_t *zb)
1719{
1720	dmu_sync_arg_t *dsa;
1721	dmu_tx_t *tx;
1722
1723	tx = dmu_tx_create(os);
1724	dmu_tx_hold_space(tx, zgd->zgd_db->db_size);
1725	if (dmu_tx_assign(tx, TXG_WAIT) != 0) {
1726		dmu_tx_abort(tx);
1727		/* Make zl_get_data do txg_wait_synced() */
1728		return (SET_ERROR(EIO));
1729	}
1730
1731	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
1732	dsa->dsa_dr = NULL;
1733	dsa->dsa_done = done;
1734	dsa->dsa_zgd = zgd;
1735	dsa->dsa_tx = tx;
1736
1737	/*
1738	 * Since we are currently syncing this txg, it's nontrivial to
1739	 * determine what BP to nopwrite against, so we disable nopwrite.
1740	 *
1741	 * When syncing, the db_blkptr is initially the BP of the previous
1742	 * txg.  We can not nopwrite against it because it will be changed
1743	 * (this is similar to the non-late-arrival case where the dbuf is
1744	 * dirty in a future txg).
1745	 *
1746	 * Then dbuf_write_ready() sets db_blkptr to the location we will write.
1747	 * We can not nopwrite against it because although the BP will not
1748	 * (typically) be changed, the data has not yet been persisted to this
1749	 * location.
1750	 *
1751	 * Finally, when dbuf_write_done() is called, it is theoretically
1752	 * possible to always nopwrite, because the data that was written in
1753	 * this txg is the same data that we are trying to write.  However we
1754	 * would need to check that this dbuf is not dirty in any future
1755	 * txg's (as we do in the normal dmu_sync() path). For simplicity, we
1756	 * don't nopwrite in this case.
1757	 */
1758	zp->zp_nopwrite = B_FALSE;
1759
1760	zio_nowait(zio_write(pio, os->os_spa, dmu_tx_get_txg(tx), zgd->zgd_bp,
1761	    abd_get_from_buf(zgd->zgd_db->db_data, zgd->zgd_db->db_size),
1762	    zgd->zgd_db->db_size, zgd->zgd_db->db_size, zp,
1763	    dmu_sync_late_arrival_ready, NULL, NULL, dmu_sync_late_arrival_done,
1764	    dsa, ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, zb));
1765
1766	return (0);
1767}
1768
1769/*
1770 * Intent log support: sync the block associated with db to disk.
1771 * N.B. and XXX: the caller is responsible for making sure that the
1772 * data isn't changing while dmu_sync() is writing it.
1773 *
1774 * Return values:
1775 *
1776 *	EEXIST: this txg has already been synced, so there's nothing to do.
1777 *		The caller should not log the write.
1778 *
1779 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
1780 *		The caller should not log the write.
1781 *
1782 *	EALREADY: this block is already in the process of being synced.
1783 *		The caller should track its progress (somehow).
1784 *
1785 *	EIO: could not do the I/O.
1786 *		The caller should do a txg_wait_synced().
1787 *
1788 *	0: the I/O has been initiated.
1789 *		The caller should log this blkptr in the done callback.
1790 *		It is possible that the I/O will fail, in which case
1791 *		the error will be reported to the done callback and
1792 *		propagated to pio from zio_done().
1793 */
1794int
1795dmu_sync(zio_t *pio, uint64_t txg, dmu_sync_cb_t *done, zgd_t *zgd)
1796{
1797	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zgd->zgd_db;
1798	objset_t *os = db->db_objset;
1799	dsl_dataset_t *ds = os->os_dsl_dataset;
1800	dbuf_dirty_record_t *dr;
1801	dmu_sync_arg_t *dsa;
1802	zbookmark_phys_t zb;
1803	zio_prop_t zp;
1804	dnode_t *dn;
1805
1806	ASSERT(pio != NULL);
1807	ASSERT(txg != 0);
1808
1809	SET_BOOKMARK(&zb, ds->ds_object,
1810	    db->db.db_object, db->db_level, db->db_blkid);
1811
1812	DB_DNODE_ENTER(db);
1813	dn = DB_DNODE(db);
1814	dmu_write_policy(os, dn, db->db_level, WP_DMU_SYNC, &zp);
1815	DB_DNODE_EXIT(db);
1816
1817	/*
1818	 * If we're frozen (running ziltest), we always need to generate a bp.
1819	 */
1820	if (txg > spa_freeze_txg(os->os_spa))
1821		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));
1822
1823	/*
1824	 * Grabbing db_mtx now provides a barrier between dbuf_sync_leaf()
1825	 * and us.  If we determine that this txg is not yet syncing,
1826	 * but it begins to sync a moment later, that's OK because the
1827	 * sync thread will block in dbuf_sync_leaf() until we drop db_mtx.
1828	 */
1829	mutex_enter(&db->db_mtx);
1830
1831	if (txg <= spa_last_synced_txg(os->os_spa)) {
1832		/*
1833		 * This txg has already synced.  There's nothing to do.
1834		 */
1835		mutex_exit(&db->db_mtx);
1836		return (SET_ERROR(EEXIST));
1837	}
1838
1839	if (txg <= spa_syncing_txg(os->os_spa)) {
1840		/*
1841		 * This txg is currently syncing, so we can't mess with
1842		 * the dirty record anymore; just write a new log block.
1843		 */
1844		mutex_exit(&db->db_mtx);
1845		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));
1846	}
1847
1848	dr = db->db_last_dirty;
1849	while (dr && dr->dr_txg != txg)
1850		dr = dr->dr_next;
1851
1852	if (dr == NULL) {
1853		/*
1854		 * There's no dr for this dbuf, so it must have been freed.
1855		 * There's no need to log writes to freed blocks, so we're done.
1856		 */
1857		mutex_exit(&db->db_mtx);
1858		return (SET_ERROR(ENOENT));
1859	}
1860
1861	ASSERT(dr->dr_next == NULL || dr->dr_next->dr_txg < txg);
1862
1863	if (db->db_blkptr != NULL) {
1864		/*
1865		 * We need to fill in zgd_bp with the current blkptr so that
1866		 * the nopwrite code can check if we're writing the same
1867		 * data that's already on disk.  We can only nopwrite if we
1868		 * are sure that after making the copy, db_blkptr will not
1869		 * change until our i/o completes.  We ensure this by
1870		 * holding the db_mtx, and only allowing nopwrite if the
1871		 * block is not already dirty (see below).  This is verified
1872		 * by dmu_sync_done(), which VERIFYs that the db_blkptr has
1873		 * not changed.
1874		 */
1875		*zgd->zgd_bp = *db->db_blkptr;
1876	}
1877
1878	/*
1879	 * Assume the on-disk data is X, the current syncing data (in
1880	 * txg - 1) is Y, and the current in-memory data is Z (currently
1881	 * in dmu_sync).
1882	 *
1883	 * We usually want to perform a nopwrite if X and Z are the
1884	 * same.  However, if Y is different (i.e. the BP is going to
1885	 * change before this write takes effect), then a nopwrite will
1886	 * be incorrect - we would override with X, which could have
1887	 * been freed when Y was written.
1888	 *
1889	 * (Note that this is not a concern when we are nop-writing from
1890	 * syncing context, because X and Y must be identical, because
1891	 * all previous txgs have been synced.)
1892	 *
1893	 * Therefore, we disable nopwrite if the current BP could change
1894	 * before this TXG.  There are two ways it could change: by
1895	 * being dirty (dr_next is non-NULL), or by being freed
1896	 * (dnode_block_freed()).  This behavior is verified by
1897	 * zio_done(), which VERIFYs that the override BP is identical
1898	 * to the on-disk BP.
1899	 */
1900	DB_DNODE_ENTER(db);
1901	dn = DB_DNODE(db);
1902	if (dr->dr_next != NULL || dnode_block_freed(dn, db->db_blkid))
1903		zp.zp_nopwrite = B_FALSE;
1904	DB_DNODE_EXIT(db);
1905
1906	ASSERT(dr->dr_txg == txg);
1907	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC ||
1908	    dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
1909		/*
1910		 * We have already issued a sync write for this buffer,
1911		 * or this buffer has already been synced.  It could not
1912		 * have been dirtied since, or we would have cleared the state.
1913		 */
1914		mutex_exit(&db->db_mtx);
1915		return (SET_ERROR(EALREADY));
1916	}
1917
1918	ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
1919	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
1920	mutex_exit(&db->db_mtx);
1921
1922	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
1923	dsa->dsa_dr = dr;
1924	dsa->dsa_done = done;
1925	dsa->dsa_zgd = zgd;
1926	dsa->dsa_tx = NULL;
1927
1928	zio_nowait(arc_write(pio, os->os_spa, txg,
1929	    zgd->zgd_bp, dr->dt.dl.dr_data, DBUF_IS_L2CACHEABLE(db),
1930	    &zp, dmu_sync_ready, NULL, NULL, dmu_sync_done, dsa,
1931	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, &zb));
1932
1933	return (0);
1934}
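/*
 * Usage sketch (annotation, not part of this revision; pio, txg, done_cb,
 * zgd and os are placeholders for whatever the intent-log caller passes):
 * the return values documented above are typically handled roughly as
 *
 *	error = dmu_sync(pio, txg, done_cb, zgd);
 *	if (error == EEXIST || error == ENOENT || error == EALREADY)
 *		error = 0;
 *	else if (error == EIO)
 *		txg_wait_synced(dmu_objset_pool(os), txg);
 *
 * with 0 meaning the write was issued and done_cb() will log the resulting
 * blkptr.  A real caller also distinguishes EALREADY so it can track the
 * copy that is already in flight, as the comment above notes.
 */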
1935
1936int
1937dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
1938    dmu_tx_t *tx)
1939{
1940	dnode_t *dn;
1941	int err;
1942
1943	err = dnode_hold(os, object, FTAG, &dn);
1944	if (err)
1945		return (err);
1946	err = dnode_set_blksz(dn, size, ibs, tx);
1947	dnode_rele(dn, FTAG);
1948	return (err);
1949}
1950
1951void
1952dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
1953    dmu_tx_t *tx)
1954{
1955	dnode_t *dn;
1956
1957	/*
1958	 * Send streams include each object's checksum function.  This
1959	 * check ensures that the receiving system can understand the
1960	 * checksum function transmitted.
1961	 */
1962	ASSERT3U(checksum, <, ZIO_CHECKSUM_LEGACY_FUNCTIONS);
1963
1964	VERIFY0(dnode_hold(os, object, FTAG, &dn));
1965	ASSERT3U(checksum, <, ZIO_CHECKSUM_FUNCTIONS);
1966	dn->dn_checksum = checksum;
1967	dnode_setdirty(dn, tx);
1968	dnode_rele(dn, FTAG);
1969}
1970
1971void
1972dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
1973    dmu_tx_t *tx)
1974{
1975	dnode_t *dn;
1976
1977	/*
1978	 * Send streams include each object's compression function.  This
1979	 * check ensures that the receiving system can understand the
1980	 * compression function transmitted.
1981	 */
1982	ASSERT3U(compress, <, ZIO_COMPRESS_LEGACY_FUNCTIONS);
1983
1984	VERIFY0(dnode_hold(os, object, FTAG, &dn));
1985	dn->dn_compress = compress;
1986	dnode_setdirty(dn, tx);
1987	dnode_rele(dn, FTAG);
1988}
1989
1990int zfs_mdcomp_disable = 0;
1991SYSCTL_INT(_vfs_zfs, OID_AUTO, mdcomp_disable, CTLFLAG_RWTUN,
1992    &zfs_mdcomp_disable, 0, "Disable metadata compression");
1993
1994/*
1995 * When the "redundant_metadata" property is set to "most", only indirect
1996 * blocks of this level and higher will have an additional ditto block.
1997 */
1998int zfs_redundant_metadata_most_ditto_level = 2;
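
/*
 * Worked example (illustrative): with the default of 2 and
 * redundant_metadata=most, dmu_write_policy() below gives copies + 1
 * to level >= 2 indirect blocks, to every block of a metadata-type
 * object, and to spill blocks, while a level-1 indirect block of
 * ordinary file data keeps the plain copies value.  With
 * redundant_metadata=all, every metadata block gets the extra copy.
 */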
1999
2000void
2001dmu_write_policy(objset_t *os, dnode_t *dn, int level, int wp, zio_prop_t *zp)
2002{
2003	dmu_object_type_t type = dn ? dn->dn_type : DMU_OT_OBJSET;
2004	boolean_t ismd = (level > 0 || DMU_OT_IS_METADATA(type) ||
2005	    (wp & WP_SPILL));
2006	enum zio_checksum checksum = os->os_checksum;
2007	enum zio_compress compress = os->os_compress;
2008	enum zio_checksum dedup_checksum = os->os_dedup_checksum;
2009	boolean_t dedup = B_FALSE;
2010	boolean_t nopwrite = B_FALSE;
2011	boolean_t dedup_verify = os->os_dedup_verify;
2012	int copies = os->os_copies;
2013
2014	/*
2015	 * We maintain different write policies for each of the following
2016	 * types of data:
2017	 *	 1. metadata
2018	 *	 2. preallocated blocks (i.e. level-0 blocks of a dump device)
2019	 *	 3. all other level 0 blocks
2020	 */
2021	if (ismd) {
2022		if (zfs_mdcomp_disable) {
2023			compress = ZIO_COMPRESS_EMPTY;
2024		} else {
2025			/*
2026			 * XXX -- we should design a compression algorithm
2027			 * that specializes in arrays of bps.
2028			 */
2029			compress = zio_compress_select(os->os_spa,
2030			    ZIO_COMPRESS_ON, ZIO_COMPRESS_ON);
2031		}
2032
2033		/*
2034		 * Metadata always gets checksummed.  If the data
2035		 * checksum is multi-bit correctable, and it's not a
2036		 * ZBT-style checksum, then it's suitable for metadata
2037		 * as well.  Otherwise, the metadata checksum defaults
2038		 * to fletcher4.
2039		 */
2040		if (!(zio_checksum_table[checksum].ci_flags &
2041		    ZCHECKSUM_FLAG_METADATA) ||
2042		    (zio_checksum_table[checksum].ci_flags &
2043		    ZCHECKSUM_FLAG_EMBEDDED))
2044			checksum = ZIO_CHECKSUM_FLETCHER_4;
2045
2046		if (os->os_redundant_metadata == ZFS_REDUNDANT_METADATA_ALL ||
2047		    (os->os_redundant_metadata ==
2048		    ZFS_REDUNDANT_METADATA_MOST &&
2049		    (level >= zfs_redundant_metadata_most_ditto_level ||
2050		    DMU_OT_IS_METADATA(type) || (wp & WP_SPILL))))
2051			copies++;
2052	} else if (wp & WP_NOFILL) {
2053		ASSERT(level == 0);
2054
2055		/*
2056		 * If we're writing preallocated blocks, we aren't actually
2057		 * writing them, so don't set any policy properties.  These
2058		 * blocks are currently only used by an external subsystem
2059		 * outside of zfs (i.e. dump) and not written by the zio
2060		 * pipeline.
2061		 */
2062		compress = ZIO_COMPRESS_OFF;
2063		checksum = ZIO_CHECKSUM_NOPARITY;
2064	} else {
2065		compress = zio_compress_select(os->os_spa, dn->dn_compress,
2066		    compress);
2067
2068		checksum = (dedup_checksum == ZIO_CHECKSUM_OFF) ?
2069		    zio_checksum_select(dn->dn_checksum, checksum) :
2070		    dedup_checksum;
2071
2072		/*
2073		 * Determine dedup setting.  If we are in dmu_sync(),
2074		 * we won't actually dedup now because that's all
2075		 * done in syncing context; but we do want to use the
2076		 * dedup checksum.  If the checksum is not strong
2077		 * enough to ensure unique signatures, force
2078		 * dedup_verify.
2079		 */
2080		if (dedup_checksum != ZIO_CHECKSUM_OFF) {
2081			dedup = (wp & WP_DMU_SYNC) ? B_FALSE : B_TRUE;
2082			if (!(zio_checksum_table[checksum].ci_flags &
2083			    ZCHECKSUM_FLAG_DEDUP))
2084				dedup_verify = B_TRUE;
2085		}
2086
2087		/*
2088		 * Enable nopwrite if we have a secure enough checksum
2089		 * algorithm (see comment in zio_nop_write) and
2090		 * compression is enabled.  We don't enable nopwrite if
2091		 * dedup is enabled, as the two features are mutually
2092		 * exclusive.
2093		 */
2094		nopwrite = (!dedup && (zio_checksum_table[checksum].ci_flags &
2095		    ZCHECKSUM_FLAG_NOPWRITE) &&
2096		    compress != ZIO_COMPRESS_OFF && zfs_nopwrite_enabled);
2097	}
2098
2099	zp->zp_checksum = checksum;
2100	zp->zp_compress = compress;
2101	ASSERT3U(zp->zp_compress, !=, ZIO_COMPRESS_INHERIT);
2102
2103	zp->zp_type = (wp & WP_SPILL) ? dn->dn_bonustype : type;
2104	zp->zp_level = level;
2105	zp->zp_copies = MIN(copies, spa_max_replication(os->os_spa));
2106	zp->zp_dedup = dedup;
2107	zp->zp_dedup_verify = dedup && dedup_verify;
2108	zp->zp_nopwrite = nopwrite;
2109}
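
/*
 * Illustrative usage (an assumption, not code from this file): writers
 * fill a zio_prop_t from the dnode and objset state just before
 * issuing an arc_write()/zio_write(), roughly as the dbuf sync path
 * and dmu_sync() above do:
 *
 *	zio_prop_t zp;
 *
 *	DB_DNODE_ENTER(db);
 *	dmu_write_policy(os, DB_DNODE(db), db->db_level, WP_DMU_SYNC, &zp);
 *	DB_DNODE_EXIT(db);
 *	(zp now carries the checksum, compression, copies, dedup and
 *	nopwrite settings for the block about to be written)
 */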
2110
2111int
2112dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
2113{
2114	dnode_t *dn;
2115	int err;
2116
2117	/*
2118	 * Sync any current changes before
2119	 * we go trundling through the block pointers.
2120	 */
2121	err = dmu_object_wait_synced(os, object);
2122	if (err) {
2123		return (err);
2124	}
2125
2126	err = dnode_hold(os, object, FTAG, &dn);
2127	if (err) {
2128		return (err);
2129	}
2130
2131	err = dnode_next_offset(dn, (hole ? DNODE_FIND_HOLE : 0), off, 1, 1, 0);
2132	dnode_rele(dn, FTAG);
2133
2134	return (err);
2135}
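
/*
 * Illustrative usage (an assumption, not code from this file): this is
 * the DMU half of SEEK_HOLE/SEEK_DATA.  The caller passes the starting
 * offset in *off and hole = B_TRUE to look for the next hole (B_FALSE
 * for the next data block):
 *
 *	uint64_t noff = start_offset;
 *
 *	error = dmu_offset_next(os, object, B_TRUE, &noff);
 *	if (error == ESRCH)
 *		(no hole past start_offset)
 *	else if (error == 0)
 *		(noff is the offset of the next hole)
 */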
2136
2137/*
2138	 * Given a ZFS object, if its dnode is dirty in any TXG, this
2139	 * function flushes all of its dirty blocks to disk so that the
2140	 * DMU object info is up to date.  A more efficient
2141 * future version might just find the TXG with the maximum
2142 * ID and wait for that to be synced.
2143 */
2144int
2145dmu_object_wait_synced(objset_t *os, uint64_t object)
2146{
2147	dnode_t *dn;
2148	int error, i;
2149
2150	error = dnode_hold(os, object, FTAG, &dn);
2151	if (error) {
2152		return (error);
2153	}
2154
2155	for (i = 0; i < TXG_SIZE; i++) {
2156		if (list_link_active(&dn->dn_dirty_link[i])) {
2157			break;
2158		}
2159	}
2160	dnode_rele(dn, FTAG);
2161	if (i != TXG_SIZE) {
2162		txg_wait_synced(dmu_objset_pool(os), 0);
2163	}
2164
2165	return (0);
2166}
2167
2168void
2169dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
2170{
2171	dnode_phys_t *dnp;
2172
2173	rw_enter(&dn->dn_struct_rwlock, RW_READER);
2174	mutex_enter(&dn->dn_mtx);
2175
2176	dnp = dn->dn_phys;
2177
2178	doi->doi_data_block_size = dn->dn_datablksz;
2179	doi->doi_metadata_block_size = dn->dn_indblkshift ?
2180	    1ULL << dn->dn_indblkshift : 0;
2181	doi->doi_type = dn->dn_type;
2182	doi->doi_bonus_type = dn->dn_bonustype;
2183	doi->doi_bonus_size = dn->dn_bonuslen;
2184	doi->doi_indirection = dn->dn_nlevels;
2185	doi->doi_checksum = dn->dn_checksum;
2186	doi->doi_compress = dn->dn_compress;
2187	doi->doi_nblkptr = dn->dn_nblkptr;
2188	doi->doi_physical_blocks_512 = (DN_USED_BYTES(dnp) + 256) >> 9;
2189	doi->doi_max_offset = (dn->dn_maxblkid + 1) * dn->dn_datablksz;
2190	doi->doi_fill_count = 0;
2191	for (int i = 0; i < dnp->dn_nblkptr; i++)
2192		doi->doi_fill_count += BP_GET_FILL(&dnp->dn_blkptr[i]);
2193
2194	mutex_exit(&dn->dn_mtx);
2195	rw_exit(&dn->dn_struct_rwlock);
2196}
2197
2198/*
2199 * Get information on a DMU object.
2200 * If doi is NULL, just indicates whether the object exists.
2201	 * If doi is NULL, this just checks whether the object exists.
2202int
2203dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
2204{
2205	dnode_t *dn;
2206	int err = dnode_hold(os, object, FTAG, &dn);
2207
2208	if (err)
2209		return (err);
2210
2211	if (doi != NULL)
2212		dmu_object_info_from_dnode(dn, doi);
2213
2214	dnode_rele(dn, FTAG);
2215	return (0);
2216}
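
/*
 * Illustrative usage (an assumption, not code from this file): passing
 * a NULL doi makes this a cheap existence check; otherwise the doi is
 * filled in as by dmu_object_info_from_dnode() above.
 *
 *	dmu_object_info_t doi;
 *
 *	if (dmu_object_info(os, object, NULL) == ENOENT)
 *		(object number is not allocated)
 *
 *	error = dmu_object_info(os, object, &doi);
 *	if (error == 0 && doi.doi_type == DMU_OT_PLAIN_FILE_CONTENTS)
 *		(object exists and holds plain file contents)
 */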
2217
2218/*
2219 * As above, but faster; can be used when you have a held dbuf in hand.
2220 */
2221void
2222dmu_object_info_from_db(dmu_buf_t *db_fake, dmu_object_info_t *doi)
2223{
2224	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2225
2226	DB_DNODE_ENTER(db);
2227	dmu_object_info_from_dnode(DB_DNODE(db), doi);
2228	DB_DNODE_EXIT(db);
2229}
2230
2231/*
2232 * Faster still when you only care about the size.
2233 * This is specifically optimized for zfs_getattr().
2234 */
2235void
2236dmu_object_size_from_db(dmu_buf_t *db_fake, uint32_t *blksize,
2237    u_longlong_t *nblk512)
2238{
2239	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2240	dnode_t *dn;
2241
2242	DB_DNODE_ENTER(db);
2243	dn = DB_DNODE(db);
2244
2245	*blksize = dn->dn_datablksz;
2246	/* add 1 for dnode space */
2247	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
2248	    SPA_MINBLOCKSHIFT) + 1;
2249	DB_DNODE_EXIT(db);
2250}
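
/*
 * Worked example (illustrative): for an object whose on-disk usage is
 * DN_USED_BYTES(dn_phys) == 131072 bytes and whose data block size is
 * 128K, the caller gets back:
 *
 *	*blksize = 131072
 *	*nblk512 = ((131072 + 256) >> 9) + 1 = 256 + 1 = 257
 *
 * i.e. 256 512-byte blocks of data plus one for the dnode itself,
 * which is what zfs_getattr() reports as the file's block count.
 */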
2251
2252void
2253byteswap_uint64_array(void *vbuf, size_t size)
2254{
2255	uint64_t *buf = vbuf;
2256	size_t count = size >> 3;
2257	int i;
2258
2259	ASSERT((size & 7) == 0);
2260
2261	for (i = 0; i < count; i++)
2262		buf[i] = BSWAP_64(buf[i]);
2263}
2264
2265void
2266byteswap_uint32_array(void *vbuf, size_t size)
2267{
2268	uint32_t *buf = vbuf;
2269	size_t count = size >> 2;
2270	int i;
2271
2272	ASSERT((size & 3) == 0);
2273
2274	for (i = 0; i < count; i++)
2275		buf[i] = BSWAP_32(buf[i]);
2276}
2277
2278void
2279byteswap_uint16_array(void *vbuf, size_t size)
2280{
2281	uint16_t *buf = vbuf;
2282	size_t count = size >> 1;
2283	int i;
2284
2285	ASSERT((size & 1) == 0);
2286
2287	for (i = 0; i < count; i++)
2288		buf[i] = BSWAP_16(buf[i]);
2289}
2290
2291/* ARGSUSED */
2292void
2293byteswap_uint8_array(void *vbuf, size_t size)
2294{
2295}
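
/*
 * Illustrative usage (an assumption, not code from this file): these
 * helpers back the generic DMU byteswap table and operate in place on
 * whole buffers whose size is a multiple of the element size:
 *
 *	uint64_t buf[2] = { 0x0102030405060708ULL, 0x1122334455667788ULL };
 *
 *	byteswap_uint64_array(buf, sizeof (buf));
 *	(buf[0] is now 0x0807060504030201, buf[1] is 0x8877665544332211)
 */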
2296
2297void
2298dmu_init(void)
2299{
2300	abd_init();
2301	zfs_dbgmsg_init();
2302	sa_cache_init();
2303	xuio_stat_init();
2304	dmu_objset_init();
2305	dnode_init();
2306	zfetch_init();
2307	zio_compress_init();
2308	l2arc_init();
2309	arc_init();
2310	dbuf_init();
2311}
2312
2313void
2314dmu_fini(void)
2315{
2316	arc_fini(); /* arc depends on l2arc, so arc must go first */
2317	l2arc_fini();
2318	zfetch_fini();
2319	zio_compress_fini();
2320	dbuf_fini();
2321	dnode_fini();
2322	dmu_objset_fini();
2323	xuio_stat_fini();
2324	sa_cache_fini();
2325	zfs_dbgmsg_fini();
2326	abd_fini();
2327}
2328