1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
24 * Copyright (c) 2012, 2018 by Delphix. All rights reserved.
25 * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
26 * Copyright (c) 2013, Joyent, Inc. All rights reserved.
27 * Copyright (c) 2014 Spectra Logic Corporation, All rights reserved.
28 * Copyright (c) 2014 Integros [integros.com]
29 */
30
31#include <sys/zfs_context.h>
32#include <sys/dmu.h>
33#include <sys/dmu_send.h>
34#include <sys/dmu_impl.h>
35#include <sys/dbuf.h>
36#include <sys/dmu_objset.h>
37#include <sys/dsl_dataset.h>
38#include <sys/dsl_dir.h>
39#include <sys/dmu_tx.h>
40#include <sys/spa.h>
41#include <sys/zio.h>
42#include <sys/dmu_zfetch.h>
43#include <sys/sa.h>
44#include <sys/sa_impl.h>
45#include <sys/zfeature.h>
46#include <sys/blkptr.h>
47#include <sys/range_tree.h>
48#include <sys/callb.h>
49#include <sys/abd.h>
50#include <sys/vdev.h>
51#include <sys/cityhash.h>
52#include <sys/spa_impl.h>
53
54kstat_t *dbuf_ksp;
55
56typedef struct dbuf_stats {
57	/*
58	 * Various statistics about the size of the dbuf cache.
59	 */
60	kstat_named_t cache_count;
61	kstat_named_t cache_size_bytes;
62	kstat_named_t cache_size_bytes_max;
63	/*
64	 * Statistics regarding the bounds on the dbuf cache size.
65	 */
66	kstat_named_t cache_target_bytes;
67	kstat_named_t cache_lowater_bytes;
68	kstat_named_t cache_hiwater_bytes;
69	/*
70	 * Total number of dbuf cache evictions that have occurred.
71	 */
72	kstat_named_t cache_total_evicts;
73	/*
74	 * The distribution of dbuf levels in the dbuf cache and
75	 * the total size of all dbufs at each level.
76	 */
77	kstat_named_t cache_levels[DN_MAX_LEVELS];
78	kstat_named_t cache_levels_bytes[DN_MAX_LEVELS];
79	/*
80	 * Statistics about the dbuf hash table.
81	 */
82	kstat_named_t hash_hits;
83	kstat_named_t hash_misses;
84	kstat_named_t hash_collisions;
85	kstat_named_t hash_elements;
86	kstat_named_t hash_elements_max;
87	/*
88	 * Number of sublists containing more than one dbuf in the dbuf
89	 * hash table. Keep track of the longest hash chain.
90	 */
91	kstat_named_t hash_chains;
92	kstat_named_t hash_chain_max;
	/*
	 * Number of times dbuf_create() discovers that a dbuf was
	 * already created and present in the dbuf hash table.
	 */
97	kstat_named_t hash_insert_race;
98	/*
99	 * Statistics about the size of the metadata dbuf cache.
100	 */
101	kstat_named_t metadata_cache_count;
102	kstat_named_t metadata_cache_size_bytes;
103	kstat_named_t metadata_cache_size_bytes_max;
104	/*
105	 * For diagnostic purposes, this is incremented whenever we can't add
106	 * something to the metadata cache because it's full, and instead put
107	 * the data in the regular dbuf cache.
108	 */
109	kstat_named_t metadata_cache_overflow;
110} dbuf_stats_t;
111
112dbuf_stats_t dbuf_stats = {
113	{ "cache_count",			KSTAT_DATA_UINT64 },
114	{ "cache_size_bytes",			KSTAT_DATA_UINT64 },
115	{ "cache_size_bytes_max",		KSTAT_DATA_UINT64 },
116	{ "cache_target_bytes",			KSTAT_DATA_UINT64 },
117	{ "cache_lowater_bytes",		KSTAT_DATA_UINT64 },
118	{ "cache_hiwater_bytes",		KSTAT_DATA_UINT64 },
119	{ "cache_total_evicts",			KSTAT_DATA_UINT64 },
120	{ { "cache_levels_N",			KSTAT_DATA_UINT64 } },
121	{ { "cache_levels_bytes_N",		KSTAT_DATA_UINT64 } },
122	{ "hash_hits",				KSTAT_DATA_UINT64 },
123	{ "hash_misses",			KSTAT_DATA_UINT64 },
124	{ "hash_collisions",			KSTAT_DATA_UINT64 },
125	{ "hash_elements",			KSTAT_DATA_UINT64 },
126	{ "hash_elements_max",			KSTAT_DATA_UINT64 },
127	{ "hash_chains",			KSTAT_DATA_UINT64 },
128	{ "hash_chain_max",			KSTAT_DATA_UINT64 },
129	{ "hash_insert_race",			KSTAT_DATA_UINT64 },
130	{ "metadata_cache_count",		KSTAT_DATA_UINT64 },
131	{ "metadata_cache_size_bytes",		KSTAT_DATA_UINT64 },
132	{ "metadata_cache_size_bytes_max",	KSTAT_DATA_UINT64 },
133	{ "metadata_cache_overflow",		KSTAT_DATA_UINT64 }
134};
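
/*
 * Note: the "cache_levels_N" and "cache_levels_bytes_N" entries above are
 * placeholders; dbuf_init() renames them to "cache_level_%d" and
 * "cache_level_%d_bytes" for each level before installing the kstat.
 */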
135
136#define	DBUF_STAT_INCR(stat, val)	\
137	atomic_add_64(&dbuf_stats.stat.value.ui64, (val));
138#define	DBUF_STAT_DECR(stat, val)	\
139	DBUF_STAT_INCR(stat, -(val));
140#define	DBUF_STAT_BUMP(stat)		\
141	DBUF_STAT_INCR(stat, 1);
142#define	DBUF_STAT_BUMPDOWN(stat)	\
143	DBUF_STAT_INCR(stat, -1);
144#define	DBUF_STAT_MAX(stat, v) {					\
145	uint64_t _m;							\
146	while ((v) > (_m = dbuf_stats.stat.value.ui64) &&		\
147	    (_m != atomic_cas_64(&dbuf_stats.stat.value.ui64, _m, (v))))\
148		continue;						\
149}
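
/*
 * DBUF_STAT_MAX() maintains a maximum-value statistic without a lock: it
 * rereads the current maximum and retries the compare-and-swap until either
 * the stored value is already >= v or the swap succeeds, so the statistic
 * only ever moves upward even under concurrent updates.
 */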
150
151struct dbuf_hold_impl_data {
152	/* Function arguments */
153	dnode_t *dh_dn;
154	uint8_t dh_level;
155	uint64_t dh_blkid;
156	boolean_t dh_fail_sparse;
157	boolean_t dh_fail_uncached;
158	void *dh_tag;
159	dmu_buf_impl_t **dh_dbp;
160	/* Local variables */
161	dmu_buf_impl_t *dh_db;
162	dmu_buf_impl_t *dh_parent;
163	blkptr_t *dh_bp;
164	int dh_err;
165	dbuf_dirty_record_t *dh_dr;
166	int dh_depth;
167};
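
/*
 * The arguments and local variables of a dbuf hold are packaged into this
 * structure so that the recursive walk up the indirect block tree in
 * __dbuf_hold_impl() can use preallocated state instead of growing the
 * kernel stack at each level; dh_depth tracks how deep that walk has gone.
 * (This description is inferred from the structure's fields.)
 */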
168
169static void __dbuf_hold_impl_init(struct dbuf_hold_impl_data *dh,
170    dnode_t *dn, uint8_t level, uint64_t blkid, boolean_t fail_sparse,
171	boolean_t fail_uncached,
172	void *tag, dmu_buf_impl_t **dbp, int depth);
173static int __dbuf_hold_impl(struct dbuf_hold_impl_data *dh);
174
175static boolean_t dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx);
176static void dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx);
177
178/*
179 * Global data structures and functions for the dbuf cache.
180 */
181static kmem_cache_t *dbuf_kmem_cache;
182static taskq_t *dbu_evict_taskq;
183
184static kthread_t *dbuf_cache_evict_thread;
185static kmutex_t dbuf_evict_lock;
186static kcondvar_t dbuf_evict_cv;
187static boolean_t dbuf_evict_thread_exit;
188
/*
 * There are two dbuf caches; each dbuf can only be in one of them at a time.
 *
 * 1. Cache of metadata dbufs, to help make read-heavy administrative commands
 *    from /sbin/zfs run faster. The "metadata cache" specifically stores dbufs
 *    that represent the metadata that describes filesystems/snapshots/
 *    bookmarks/properties/etc. We only evict from this cache when we export a
 *    pool, to short-circuit as much I/O as possible for all administrative
 *    commands that need the metadata. There is no eviction policy for this
 *    cache, because we try to only include types in it which would occupy a
 *    very small amount of space per object but create a large impact on the
 *    performance of these commands. Instead, after it reaches a maximum size
 *    (which should only happen on very small memory systems with a very large
 *    number of filesystem objects), we stop taking new dbufs into the
 *    metadata cache, instead putting them in the normal dbuf cache.
 *
 * 2. LRU cache of dbufs. The dbuf cache maintains a list of dbufs that
 *    are not currently held but have been recently released. These dbufs
 *    are not eligible for arc eviction until they are aged out of the cache.
 *    Dbufs that are aged out of the cache will be immediately destroyed and
 *    become eligible for arc eviction.
 *
 * Dbufs are added to these caches once the last hold is released. If a dbuf is
 * later accessed and still exists in the dbuf cache, then it will be removed
 * from the cache and later re-added to the head of the cache.
 *
 * If a given dbuf meets the requirements for the metadata cache, it will go
 * there, otherwise it will be considered for the generic LRU dbuf cache. The
 * caches and the refcounts tracking their sizes are stored in an array indexed
 * by those caches' matching enum values (from dbuf_cached_state_t).
 */
220typedef struct dbuf_cache {
221	multilist_t *cache;
222	zfs_refcount_t size;
223} dbuf_cache_t;
224dbuf_cache_t dbuf_caches[DB_CACHE_MAX];
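
/*
 * For example, the metadata cache and the refcount tracking its size are
 * dbuf_caches[DB_DBUF_METADATA_CACHE].cache and
 * dbuf_caches[DB_DBUF_METADATA_CACHE].size; a dbuf's db_caching_status
 * records which of these caches, if any, it currently resides in.
 */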
225
226/* Size limits for the caches */
227uint64_t dbuf_cache_max_bytes = 0;
228uint64_t dbuf_metadata_cache_max_bytes = 0;
/* Set the default sizes of the caches to a log2 fraction of the ARC size */
230int dbuf_cache_shift = 5;
231int dbuf_metadata_cache_shift = 6;
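
/*
 * Worked example: with the default shifts above and a 4 GiB ARC maximum,
 * the dbuf cache targets 4 GiB >> 5 = 128 MiB and the metadata cache
 * 4 GiB >> 6 = 64 MiB (see dbuf_cache_target_bytes() and dbuf_init()).
 */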
232
233/*
234 * For diagnostic purposes, this is incremented whenever we can't add
235 * something to the metadata cache because it's full, and instead put
236 * the data in the regular dbuf cache.
237 */
238uint64_t dbuf_metadata_cache_overflow;
239
240/*
241 * The LRU dbuf cache uses a three-stage eviction policy:
242 *	- A low water marker designates when the dbuf eviction thread
243 *	should stop evicting from the dbuf cache.
244 *	- When we reach the maximum size (aka mid water mark), we
245 *	signal the eviction thread to run.
246 *	- The high water mark indicates when the eviction thread
247 *	is unable to keep up with the incoming load and eviction must
248 *	happen in the context of the calling thread.
249 *
250 * The dbuf cache:
251 *                                                 (max size)
252 *                                      low water   mid water   hi water
253 * +----------------------------------------+----------+----------+
254 * |                                        |          |          |
255 * |                                        |          |          |
256 * |                                        |          |          |
257 * |                                        |          |          |
258 * +----------------------------------------+----------+----------+
259 *                                        stop        signal     evict
260 *                                      evicting     eviction   directly
261 *                                                    thread
262 *
 * The high and low water marks indicate the operating range for the eviction
 * thread. The low water mark is, by default, 90% of the total size of the
 * cache and the high water mark is at 110% (both of these percentages can be
 * changed by setting dbuf_cache_lowater_pct and dbuf_cache_hiwater_pct,
 * respectively). The eviction thread will try to ensure that the cache remains
 * within this range by waking up every second and checking if the cache is
 * above the low water mark. The thread can also be woken up by callers adding
 * elements into the cache if the cache is larger than the mid water (i.e. the
 * max cache size). Once the eviction thread is woken up and eviction is
 * required, it will continue evicting buffers until it's able to reduce the
 * cache size to the low water mark. If the cache size continues to grow and
 * hits the high water mark, then callers adding elements to the cache will
 * begin to evict directly from the cache until the cache is no longer above
 * the high water mark.
 */
278
/*
 * The percentages above and below the target cache size at which the high
 * and low water marks are placed.
 */
uint_t dbuf_cache_hiwater_pct = 10;
uint_t dbuf_cache_lowater_pct = 10;
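
/*
 * Worked example: with a cache target of 100 MiB and the default 10%
 * settings, dbuf_cache_hiwater_bytes() is 110 MiB and
 * dbuf_cache_lowater_bytes() is 90 MiB.  The evict thread keeps working
 * while the cache is above 90 MiB, and callers start evicting directly
 * once it grows past 110 MiB.
 */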
284
285SYSCTL_DECL(_vfs_zfs);
286SYSCTL_QUAD(_vfs_zfs, OID_AUTO, dbuf_cache_max_bytes, CTLFLAG_RWTUN,
287    &dbuf_cache_max_bytes, 0, "dbuf cache size in bytes");
288SYSCTL_QUAD(_vfs_zfs, OID_AUTO, dbuf_metadata_cache_max_bytes, CTLFLAG_RWTUN,
289    &dbuf_metadata_cache_max_bytes, 0, "dbuf metadata cache size in bytes");
290SYSCTL_INT(_vfs_zfs, OID_AUTO, dbuf_cache_shift, CTLFLAG_RDTUN,
291    &dbuf_cache_shift, 0, "dbuf cache size as log2 fraction of ARC");
292SYSCTL_INT(_vfs_zfs, OID_AUTO, dbuf_metadata_cache_shift, CTLFLAG_RDTUN,
293    &dbuf_metadata_cache_shift, 0,
294    "dbuf metadata cache size as log2 fraction of ARC");
295SYSCTL_QUAD(_vfs_zfs, OID_AUTO, dbuf_metadata_cache_overflow, CTLFLAG_RD,
296    &dbuf_metadata_cache_overflow, 0, "dbuf metadata cache overflow");
SYSCTL_UINT(_vfs_zfs, OID_AUTO, dbuf_cache_hiwater_pct, CTLFLAG_RWTUN,
    &dbuf_cache_hiwater_pct, 0, "max percentage above the dbuf cache size");
SYSCTL_UINT(_vfs_zfs, OID_AUTO, dbuf_cache_lowater_pct, CTLFLAG_RWTUN,
    &dbuf_cache_lowater_pct, 0, "max percentage below the dbuf cache size");
301
302/* ARGSUSED */
303static int
304dbuf_cons(void *vdb, void *unused, int kmflag)
305{
306	dmu_buf_impl_t *db = vdb;
307	bzero(db, sizeof (dmu_buf_impl_t));
308
309	mutex_init(&db->db_mtx, NULL, MUTEX_DEFAULT, NULL);
310	cv_init(&db->db_changed, NULL, CV_DEFAULT, NULL);
311	multilist_link_init(&db->db_cache_link);
312	zfs_refcount_create(&db->db_holds);
313
314	return (0);
315}
316
317/* ARGSUSED */
318static void
319dbuf_dest(void *vdb, void *unused)
320{
321	dmu_buf_impl_t *db = vdb;
322	mutex_destroy(&db->db_mtx);
323	cv_destroy(&db->db_changed);
324	ASSERT(!multilist_link_active(&db->db_cache_link));
325	zfs_refcount_destroy(&db->db_holds);
326}
327
328/*
329 * dbuf hash table routines
330 */
331static dbuf_hash_table_t dbuf_hash_table;
332
333static uint64_t dbuf_hash_count;
334
335/*
336 * We use Cityhash for this. It's fast, and has good hash properties without
337 * requiring any large static buffers.
338 */
339static uint64_t
340dbuf_hash(void *os, uint64_t obj, uint8_t lvl, uint64_t blkid)
341{
342	return (cityhash4((uintptr_t)os, obj, (uint64_t)lvl, blkid));
343}
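
/*
 * The hash value is mapped to a bucket by masking with hash_table_mask
 * (see dbuf_find() and dbuf_hash_insert()), which is why dbuf_init()
 * always sizes the table to a power of two.
 */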
344
345#define	DBUF_EQUAL(dbuf, os, obj, level, blkid)		\
346	((dbuf)->db.db_object == (obj) &&		\
347	(dbuf)->db_objset == (os) &&			\
348	(dbuf)->db_level == (level) &&			\
349	(dbuf)->db_blkid == (blkid))
350
351dmu_buf_impl_t *
352dbuf_find(objset_t *os, uint64_t obj, uint8_t level, uint64_t blkid)
353{
354	dbuf_hash_table_t *h = &dbuf_hash_table;
355	uint64_t hv = dbuf_hash(os, obj, level, blkid);
356	uint64_t idx = hv & h->hash_table_mask;
357	dmu_buf_impl_t *db;
358
359	mutex_enter(DBUF_HASH_MUTEX(h, idx));
360	for (db = h->hash_table[idx]; db != NULL; db = db->db_hash_next) {
361		if (DBUF_EQUAL(db, os, obj, level, blkid)) {
362			mutex_enter(&db->db_mtx);
363			if (db->db_state != DB_EVICTING) {
364				mutex_exit(DBUF_HASH_MUTEX(h, idx));
365				return (db);
366			}
367			mutex_exit(&db->db_mtx);
368		}
369	}
370	mutex_exit(DBUF_HASH_MUTEX(h, idx));
371	return (NULL);
372}
373
374static dmu_buf_impl_t *
375dbuf_find_bonus(objset_t *os, uint64_t object)
376{
377	dnode_t *dn;
378	dmu_buf_impl_t *db = NULL;
379
380	if (dnode_hold(os, object, FTAG, &dn) == 0) {
381		rw_enter(&dn->dn_struct_rwlock, RW_READER);
382		if (dn->dn_bonus != NULL) {
383			db = dn->dn_bonus;
384			mutex_enter(&db->db_mtx);
385		}
386		rw_exit(&dn->dn_struct_rwlock);
387		dnode_rele(dn, FTAG);
388	}
389	return (db);
390}
391
/*
 * Insert an entry into the hash table.  If there is already an element
 * equal to the one being inserted, the existing element will be returned
 * and the new element will not be inserted.  Otherwise returns NULL.
 */
398static dmu_buf_impl_t *
399dbuf_hash_insert(dmu_buf_impl_t *db)
400{
401	dbuf_hash_table_t *h = &dbuf_hash_table;
402	objset_t *os = db->db_objset;
403	uint64_t obj = db->db.db_object;
404	int level = db->db_level;
405	uint64_t blkid, hv, idx;
406	dmu_buf_impl_t *dbf;
407	uint32_t i;
408
409	blkid = db->db_blkid;
410	hv = dbuf_hash(os, obj, level, blkid);
411	idx = hv & h->hash_table_mask;
412
413	mutex_enter(DBUF_HASH_MUTEX(h, idx));
414	for (dbf = h->hash_table[idx], i = 0; dbf != NULL;
415	    dbf = dbf->db_hash_next, i++) {
416		if (DBUF_EQUAL(dbf, os, obj, level, blkid)) {
417			mutex_enter(&dbf->db_mtx);
418			if (dbf->db_state != DB_EVICTING) {
419				mutex_exit(DBUF_HASH_MUTEX(h, idx));
420				return (dbf);
421			}
422			mutex_exit(&dbf->db_mtx);
423		}
424	}
425
426	if (i > 0) {
427		DBUF_STAT_BUMP(hash_collisions);
428		if (i == 1)
429			DBUF_STAT_BUMP(hash_chains);
430
431		DBUF_STAT_MAX(hash_chain_max, i);
432	}
433
434	mutex_enter(&db->db_mtx);
435	db->db_hash_next = h->hash_table[idx];
436	h->hash_table[idx] = db;
437	mutex_exit(DBUF_HASH_MUTEX(h, idx));
438	atomic_inc_64(&dbuf_hash_count);
439	DBUF_STAT_MAX(hash_elements_max, dbuf_hash_count);
440
441	return (NULL);
442}
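
/*
 * A non-NULL return means the caller lost an insertion race; dbuf_create()
 * then uses the existing dbuf instead (this is what the hash_insert_race
 * statistic above counts).
 */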
443
444/*
445 * Remove an entry from the hash table.  It must be in the EVICTING state.
446 */
447static void
448dbuf_hash_remove(dmu_buf_impl_t *db)
449{
450	dbuf_hash_table_t *h = &dbuf_hash_table;
451	uint64_t hv, idx;
452	dmu_buf_impl_t *dbf, **dbp;
453
454	hv = dbuf_hash(db->db_objset, db->db.db_object,
455	    db->db_level, db->db_blkid);
456	idx = hv & h->hash_table_mask;
457
458	/*
459	 * We mustn't hold db_mtx to maintain lock ordering:
460	 * DBUF_HASH_MUTEX > db_mtx.
461	 */
462	ASSERT(zfs_refcount_is_zero(&db->db_holds));
463	ASSERT(db->db_state == DB_EVICTING);
464	ASSERT(!MUTEX_HELD(&db->db_mtx));
465
466	mutex_enter(DBUF_HASH_MUTEX(h, idx));
467	dbp = &h->hash_table[idx];
468	while ((dbf = *dbp) != db) {
469		dbp = &dbf->db_hash_next;
470		ASSERT(dbf != NULL);
471	}
472	*dbp = db->db_hash_next;
473	db->db_hash_next = NULL;
474	if (h->hash_table[idx] &&
475	    h->hash_table[idx]->db_hash_next == NULL)
476		DBUF_STAT_BUMPDOWN(hash_chains);
477	mutex_exit(DBUF_HASH_MUTEX(h, idx));
478	atomic_dec_64(&dbuf_hash_count);
479}
480
481typedef enum {
482	DBVU_EVICTING,
483	DBVU_NOT_EVICTING
484} dbvu_verify_type_t;
485
486static void
487dbuf_verify_user(dmu_buf_impl_t *db, dbvu_verify_type_t verify_type)
488{
489#ifdef ZFS_DEBUG
490	int64_t holds;
491
492	if (db->db_user == NULL)
493		return;
494
495	/* Only data blocks support the attachment of user data. */
496	ASSERT(db->db_level == 0);
497
498	/* Clients must resolve a dbuf before attaching user data. */
499	ASSERT(db->db.db_data != NULL);
500	ASSERT3U(db->db_state, ==, DB_CACHED);
501
502	holds = zfs_refcount_count(&db->db_holds);
503	if (verify_type == DBVU_EVICTING) {
504		/*
505		 * Immediate eviction occurs when holds == dirtycnt.
506		 * For normal eviction buffers, holds is zero on
507		 * eviction, except when dbuf_fix_old_data() calls
508		 * dbuf_clear_data().  However, the hold count can grow
509		 * during eviction even though db_mtx is held (see
510		 * dmu_bonus_hold() for an example), so we can only
511		 * test the generic invariant that holds >= dirtycnt.
512		 */
513		ASSERT3U(holds, >=, db->db_dirtycnt);
514	} else {
515		if (db->db_user_immediate_evict == TRUE)
516			ASSERT3U(holds, >=, db->db_dirtycnt);
517		else
518			ASSERT3U(holds, >, 0);
519	}
520#endif
521}
522
523static void
524dbuf_evict_user(dmu_buf_impl_t *db)
525{
526	dmu_buf_user_t *dbu = db->db_user;
527
528	ASSERT(MUTEX_HELD(&db->db_mtx));
529
530	if (dbu == NULL)
531		return;
532
533	dbuf_verify_user(db, DBVU_EVICTING);
534	db->db_user = NULL;
535
536#ifdef ZFS_DEBUG
537	if (dbu->dbu_clear_on_evict_dbufp != NULL)
538		*dbu->dbu_clear_on_evict_dbufp = NULL;
539#endif
540
541	/*
542	 * There are two eviction callbacks - one that we call synchronously
543	 * and one that we invoke via a taskq.  The async one is useful for
544	 * avoiding lock order reversals and limiting stack depth.
545	 *
546	 * Note that if we have a sync callback but no async callback,
547	 * it's likely that the sync callback will free the structure
548	 * containing the dbu.  In that case we need to take care to not
549	 * dereference dbu after calling the sync evict func.
550	 */
551	boolean_t has_async = (dbu->dbu_evict_func_async != NULL);
552
553	if (dbu->dbu_evict_func_sync != NULL)
554		dbu->dbu_evict_func_sync(dbu);
555
556	if (has_async) {
557		taskq_dispatch_ent(dbu_evict_taskq, dbu->dbu_evict_func_async,
558		    dbu, 0, &dbu->dbu_tqent);
559	}
560}
561
562boolean_t
563dbuf_is_metadata(dmu_buf_impl_t *db)
564{
565	if (db->db_level > 0) {
566		return (B_TRUE);
567	} else {
568		boolean_t is_metadata;
569
570		DB_DNODE_ENTER(db);
571		is_metadata = DMU_OT_IS_METADATA(DB_DNODE(db)->dn_type);
572		DB_DNODE_EXIT(db);
573
574		return (is_metadata);
575	}
576}
577
578/*
579 * This returns whether this dbuf should be stored in the metadata cache, which
580 * is based on whether it's from one of the dnode types that store data related
581 * to traversing dataset hierarchies.
582 */
583static boolean_t
584dbuf_include_in_metadata_cache(dmu_buf_impl_t *db)
585{
586	DB_DNODE_ENTER(db);
587	dmu_object_type_t type = DB_DNODE(db)->dn_type;
588	DB_DNODE_EXIT(db);
589
590	/* Check if this dbuf is one of the types we care about */
591	if (DMU_OT_IS_METADATA_CACHED(type)) {
592		/* If we hit this, then we set something up wrong in dmu_ot */
593		ASSERT(DMU_OT_IS_METADATA(type));
594
595		/*
596		 * Sanity check for small-memory systems: don't allocate too
597		 * much memory for this purpose.
598		 */
599		if (zfs_refcount_count(
600		    &dbuf_caches[DB_DBUF_METADATA_CACHE].size) >
601		    dbuf_metadata_cache_max_bytes) {
602			dbuf_metadata_cache_overflow++;
603			DTRACE_PROBE1(dbuf__metadata__cache__overflow,
604			    dmu_buf_impl_t *, db);
605			return (B_FALSE);
606		}
607
608		return (B_TRUE);
609	}
610
611	return (B_FALSE);
612}
613
614/*
615 * This function *must* return indices evenly distributed between all
616 * sublists of the multilist. This is needed due to how the dbuf eviction
617 * code is laid out; dbuf_evict_thread() assumes dbufs are evenly
618 * distributed between all sublists and uses this assumption when
619 * deciding which sublist to evict from and how much to evict from it.
620 */
621unsigned int
622dbuf_cache_multilist_index_func(multilist_t *ml, void *obj)
623{
624	dmu_buf_impl_t *db = obj;
625
	/*
	 * The assumption here is that the hash value for a given
	 * dmu_buf_impl_t will remain constant throughout its lifetime
	 * (i.e. its objset, object, level and blkid fields don't change).
	 * Thus, we don't need to store the dbuf's sublist index
	 * on insertion, as this index can be recalculated on removal.
	 *
	 * Also, the low order bits of the hash value are thought to be
	 * distributed evenly. Otherwise, in the case that the multilist
	 * has a power of two number of sublists, each sublist's usage
	 * would not be evenly distributed.
	 */
638	return (dbuf_hash(db->db_objset, db->db.db_object,
639	    db->db_level, db->db_blkid) %
640	    multilist_get_num_sublists(ml));
641}
642
static inline unsigned long
dbuf_cache_target_bytes(void)
{
	return (MIN(dbuf_cache_max_bytes,
	    arc_max_bytes() >> dbuf_cache_shift));
}
649
650static inline uint64_t
651dbuf_cache_hiwater_bytes(void)
652{
653	uint64_t dbuf_cache_target = dbuf_cache_target_bytes();
654	return (dbuf_cache_target +
655	    (dbuf_cache_target * dbuf_cache_hiwater_pct) / 100);
656}
657
658static inline uint64_t
659dbuf_cache_lowater_bytes(void)
660{
661	uint64_t dbuf_cache_target = dbuf_cache_target_bytes();
662	return (dbuf_cache_target -
663	    (dbuf_cache_target * dbuf_cache_lowater_pct) / 100);
664}
665
666static inline boolean_t
667dbuf_cache_above_lowater(void)
668{
669	return (zfs_refcount_count(&dbuf_caches[DB_DBUF_CACHE].size) >
670	    dbuf_cache_lowater_bytes());
671}
672
673/*
674 * Evict the oldest eligible dbuf from the dbuf cache.
675 */
676static void
677dbuf_evict_one(void)
678{
679	int idx = multilist_get_random_index(dbuf_caches[DB_DBUF_CACHE].cache);
680	multilist_sublist_t *mls = multilist_sublist_lock(
681	    dbuf_caches[DB_DBUF_CACHE].cache, idx);
682
683	ASSERT(!MUTEX_HELD(&dbuf_evict_lock));
684
685	dmu_buf_impl_t *db = multilist_sublist_tail(mls);
686	while (db != NULL && mutex_tryenter(&db->db_mtx) == 0) {
687		db = multilist_sublist_prev(mls, db);
688	}
689
690	DTRACE_PROBE2(dbuf__evict__one, dmu_buf_impl_t *, db,
691	    multilist_sublist_t *, mls);
692
693	if (db != NULL) {
694		multilist_sublist_remove(mls, db);
695		multilist_sublist_unlock(mls);
696		(void) zfs_refcount_remove_many(
697		    &dbuf_caches[DB_DBUF_CACHE].size,
698		    db->db.db_size, db);
699		DBUF_STAT_BUMPDOWN(cache_levels[db->db_level]);
700		DBUF_STAT_BUMPDOWN(cache_count);
701		DBUF_STAT_DECR(cache_levels_bytes[db->db_level],
702		    db->db.db_size);
703		ASSERT3U(db->db_caching_status, ==, DB_DBUF_CACHE);
704		db->db_caching_status = DB_NO_CACHE;
705		dbuf_destroy(db);
706		DBUF_STAT_BUMP(cache_total_evicts);
707	} else {
708		multilist_sublist_unlock(mls);
709	}
710}
711
/*
 * The dbuf evict thread is responsible for aging out dbufs from the
 * cache. Once the cache has reached its maximum size, dbufs are removed
 * and destroyed. The eviction thread will continue running until the size
 * of the dbuf cache is at or below the low water mark. Once a dbuf is aged
 * out of the cache it is destroyed and becomes eligible for arc eviction.
 */
719/* ARGSUSED */
720static void
721dbuf_evict_thread(void *unused __unused)
722{
723	callb_cpr_t cpr;
724
725	CALLB_CPR_INIT(&cpr, &dbuf_evict_lock, callb_generic_cpr, FTAG);
726
727	mutex_enter(&dbuf_evict_lock);
728	while (!dbuf_evict_thread_exit) {
729		while (!dbuf_cache_above_lowater() && !dbuf_evict_thread_exit) {
730			CALLB_CPR_SAFE_BEGIN(&cpr);
731			(void) cv_timedwait_hires(&dbuf_evict_cv,
732			    &dbuf_evict_lock, SEC2NSEC(1), MSEC2NSEC(1), 0);
733			CALLB_CPR_SAFE_END(&cpr, &dbuf_evict_lock);
734#ifdef __FreeBSD__
735			if (dbuf_ksp != NULL)
736				dbuf_ksp->ks_update(dbuf_ksp, KSTAT_READ);
737#endif
738		}
739		mutex_exit(&dbuf_evict_lock);
740
741		/*
742		 * Keep evicting as long as we're above the low water mark
743		 * for the cache. We do this without holding the locks to
744		 * minimize lock contention.
745		 */
746		while (dbuf_cache_above_lowater() && !dbuf_evict_thread_exit) {
747			dbuf_evict_one();
748		}
749
750		mutex_enter(&dbuf_evict_lock);
751	}
752
753	dbuf_evict_thread_exit = B_FALSE;
754	cv_broadcast(&dbuf_evict_cv);
755	CALLB_CPR_EXIT(&cpr);	/* drops dbuf_evict_lock */
756	thread_exit();
757}
758
/*
 * Wake up the dbuf eviction thread if the dbuf cache is over its max size.
 * If the dbuf cache is also over its high water mark, additionally evict a
 * dbuf from the dbuf cache in the caller's context.
 */
764static void
765dbuf_evict_notify(uint64_t size)
766{
767	/*
768	 * We check if we should evict without holding the dbuf_evict_lock,
769	 * because it's OK to occasionally make the wrong decision here,
770	 * and grabbing the lock results in massive lock contention.
771	 */
772	if (size > dbuf_cache_max_bytes) {
773		if (size > dbuf_cache_hiwater_bytes())
774			dbuf_evict_one();
775		cv_signal(&dbuf_evict_cv);
776	}
777}
778
779static int
780dbuf_kstat_update(kstat_t *ksp, int rw)
781{
782	dbuf_stats_t *ds = ksp->ks_data;
783
784	if (rw == KSTAT_WRITE) {
785		return (SET_ERROR(EACCES));
786	} else {
787		ds->metadata_cache_size_bytes.value.ui64 =
788		    zfs_refcount_count(&dbuf_caches[DB_DBUF_METADATA_CACHE].size);
789		ds->cache_size_bytes.value.ui64 =
790		    zfs_refcount_count(&dbuf_caches[DB_DBUF_CACHE].size);
791		ds->cache_target_bytes.value.ui64 = dbuf_cache_target_bytes();
792		ds->cache_hiwater_bytes.value.ui64 = dbuf_cache_hiwater_bytes();
793		ds->cache_lowater_bytes.value.ui64 = dbuf_cache_lowater_bytes();
794		ds->hash_elements.value.ui64 = dbuf_hash_count;
795	}
796
797	return (0);
798}
799
800void
801dbuf_init(void)
802{
803	uint64_t hsize = 1ULL << 16;
804	dbuf_hash_table_t *h = &dbuf_hash_table;
805	int i;
806
	/*
	 * The hash table is sized with one bucket for every 4K of physical
	 * memory (i.e. assuming an average 4K block size).  The table itself
	 * takes up totalmem*sizeof(void*)/4K, i.e. 2MB per GB of memory with
	 * 8-byte pointers.
	 */
812	while (hsize * 4096 < (uint64_t)physmem * PAGESIZE)
813		hsize <<= 1;
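
	/*
	 * For example, with 16 GiB of physical memory the loop above settles
	 * on hsize = 4M buckets (16 GiB / 4K), so the bucket array consumes
	 * 4M * 8 bytes = 32 MiB.
	 */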
814
815retry:
816	h->hash_table_mask = hsize - 1;
817	h->hash_table = kmem_zalloc(hsize * sizeof (void *), KM_NOSLEEP);
818	if (h->hash_table == NULL) {
819		/* XXX - we should really return an error instead of assert */
820		ASSERT(hsize > (1ULL << 10));
821		hsize >>= 1;
822		goto retry;
823	}
824
825	dbuf_kmem_cache = kmem_cache_create("dmu_buf_impl_t",
826	    sizeof (dmu_buf_impl_t),
827	    0, dbuf_cons, dbuf_dest, NULL, NULL, NULL, 0);
828
829	for (i = 0; i < DBUF_MUTEXES; i++)
830		mutex_init(&h->hash_mutexes[i], NULL, MUTEX_DEFAULT, NULL);
831
832	dbuf_stats_init(h);
	/*
	 * Set up the parameters for the dbuf caches. By default the sizes of
	 * the dbuf cache and the metadata cache are 1/32nd and 1/64th of the
	 * size of the ARC, respectively (dbuf_cache_shift = 5 and
	 * dbuf_metadata_cache_shift = 6). If either value has been set
	 * explicitly via a tunable and is not greater than the size of the
	 * ARC, we honor it.
	 */
840	if (dbuf_cache_max_bytes == 0 ||
841	    dbuf_cache_max_bytes >= arc_max_bytes())  {
842		dbuf_cache_max_bytes = arc_max_bytes() >> dbuf_cache_shift;
843	}
844	if (dbuf_metadata_cache_max_bytes == 0 ||
845	    dbuf_metadata_cache_max_bytes >= arc_max_bytes()) {
846		dbuf_metadata_cache_max_bytes =
847		    arc_max_bytes() >> dbuf_metadata_cache_shift;
848	}
849
850	/*
851	 * All entries are queued via taskq_dispatch_ent(), so min/maxalloc
852	 * configuration is not required.
853	 */
854	dbu_evict_taskq = taskq_create("dbu_evict", 1, minclsyspri, 0, 0, 0);
855
856	for (dbuf_cached_state_t dcs = 0; dcs < DB_CACHE_MAX; dcs++) {
857		dbuf_caches[dcs].cache =
858		    multilist_create(sizeof (dmu_buf_impl_t),
859		    offsetof(dmu_buf_impl_t, db_cache_link),
860		    dbuf_cache_multilist_index_func);
861		zfs_refcount_create(&dbuf_caches[dcs].size);
862	}
863
864	dbuf_evict_thread_exit = B_FALSE;
865	mutex_init(&dbuf_evict_lock, NULL, MUTEX_DEFAULT, NULL);
866	cv_init(&dbuf_evict_cv, NULL, CV_DEFAULT, NULL);
867	dbuf_cache_evict_thread = thread_create(NULL, 0, dbuf_evict_thread,
868	    NULL, 0, &p0, TS_RUN, minclsyspri);
869
870	dbuf_ksp = kstat_create("zfs", 0, "dbufstats", "misc",
871	    KSTAT_TYPE_NAMED, sizeof (dbuf_stats) / sizeof (kstat_named_t),
872	    KSTAT_FLAG_VIRTUAL);
873	if (dbuf_ksp != NULL) {
874		for (i = 0; i < DN_MAX_LEVELS; i++) {
875			snprintf(dbuf_stats.cache_levels[i].name,
876			    KSTAT_STRLEN, "cache_level_%d", i);
877			dbuf_stats.cache_levels[i].data_type =
878			    KSTAT_DATA_UINT64;
879			snprintf(dbuf_stats.cache_levels_bytes[i].name,
880			    KSTAT_STRLEN, "cache_level_%d_bytes", i);
881			dbuf_stats.cache_levels_bytes[i].data_type =
882			    KSTAT_DATA_UINT64;
883		}
884		dbuf_ksp->ks_data = &dbuf_stats;
885		dbuf_ksp->ks_update = dbuf_kstat_update;
886		kstat_install(dbuf_ksp);
887	}
888}
889
890void
891dbuf_fini(void)
892{
893	dbuf_hash_table_t *h = &dbuf_hash_table;
894	int i;
895
896	dbuf_stats_destroy();
897
898	for (i = 0; i < DBUF_MUTEXES; i++)
899		mutex_destroy(&h->hash_mutexes[i]);
900	kmem_free(h->hash_table, (h->hash_table_mask + 1) * sizeof (void *));
901	kmem_cache_destroy(dbuf_kmem_cache);
902	taskq_destroy(dbu_evict_taskq);
903
904	mutex_enter(&dbuf_evict_lock);
905	dbuf_evict_thread_exit = B_TRUE;
906	while (dbuf_evict_thread_exit) {
907		cv_signal(&dbuf_evict_cv);
908		cv_wait(&dbuf_evict_cv, &dbuf_evict_lock);
909	}
910	mutex_exit(&dbuf_evict_lock);
911
912	mutex_destroy(&dbuf_evict_lock);
913	cv_destroy(&dbuf_evict_cv);
914
915	for (dbuf_cached_state_t dcs = 0; dcs < DB_CACHE_MAX; dcs++) {
916		zfs_refcount_destroy(&dbuf_caches[dcs].size);
917		multilist_destroy(dbuf_caches[dcs].cache);
918	}
919
920	if (dbuf_ksp != NULL) {
921		kstat_delete(dbuf_ksp);
922		dbuf_ksp = NULL;
923	}
924}
925
926/*
927 * Other stuff.
928 */
929
930#ifdef ZFS_DEBUG
931static void
932dbuf_verify(dmu_buf_impl_t *db)
933{
934	dnode_t *dn;
935	dbuf_dirty_record_t *dr;
936
937	ASSERT(MUTEX_HELD(&db->db_mtx));
938
939	if (!(zfs_flags & ZFS_DEBUG_DBUF_VERIFY))
940		return;
941
942	ASSERT(db->db_objset != NULL);
943	DB_DNODE_ENTER(db);
944	dn = DB_DNODE(db);
945	if (dn == NULL) {
946		ASSERT(db->db_parent == NULL);
947		ASSERT(db->db_blkptr == NULL);
948	} else {
949		ASSERT3U(db->db.db_object, ==, dn->dn_object);
950		ASSERT3P(db->db_objset, ==, dn->dn_objset);
951		ASSERT3U(db->db_level, <, dn->dn_nlevels);
952		ASSERT(db->db_blkid == DMU_BONUS_BLKID ||
953		    db->db_blkid == DMU_SPILL_BLKID ||
954		    !avl_is_empty(&dn->dn_dbufs));
955	}
956	if (db->db_blkid == DMU_BONUS_BLKID) {
957		ASSERT(dn != NULL);
958		ASSERT3U(db->db.db_size, >=, dn->dn_bonuslen);
959		ASSERT3U(db->db.db_offset, ==, DMU_BONUS_BLKID);
960	} else if (db->db_blkid == DMU_SPILL_BLKID) {
961		ASSERT(dn != NULL);
962		ASSERT0(db->db.db_offset);
963	} else {
964		ASSERT3U(db->db.db_offset, ==, db->db_blkid * db->db.db_size);
965	}
966
967	for (dr = db->db_data_pending; dr != NULL; dr = dr->dr_next)
968		ASSERT(dr->dr_dbuf == db);
969
970	for (dr = db->db_last_dirty; dr != NULL; dr = dr->dr_next)
971		ASSERT(dr->dr_dbuf == db);
972
973	/*
974	 * We can't assert that db_size matches dn_datablksz because it
975	 * can be momentarily different when another thread is doing
976	 * dnode_set_blksz().
977	 */
978	if (db->db_level == 0 && db->db.db_object == DMU_META_DNODE_OBJECT) {
979		dr = db->db_data_pending;
980		/*
981		 * It should only be modified in syncing context, so
982		 * make sure we only have one copy of the data.
983		 */
984		ASSERT(dr == NULL || dr->dt.dl.dr_data == db->db_buf);
985	}
986
987	/* verify db->db_blkptr */
988	if (db->db_blkptr) {
989		if (db->db_parent == dn->dn_dbuf) {
990			/* db is pointed to by the dnode */
991			/* ASSERT3U(db->db_blkid, <, dn->dn_nblkptr); */
992			if (DMU_OBJECT_IS_SPECIAL(db->db.db_object))
993				ASSERT(db->db_parent == NULL);
994			else
995				ASSERT(db->db_parent != NULL);
996			if (db->db_blkid != DMU_SPILL_BLKID)
997				ASSERT3P(db->db_blkptr, ==,
998				    &dn->dn_phys->dn_blkptr[db->db_blkid]);
999		} else {
1000			/* db is pointed to by an indirect block */
1001			int epb = db->db_parent->db.db_size >> SPA_BLKPTRSHIFT;
1002			ASSERT3U(db->db_parent->db_level, ==, db->db_level+1);
1003			ASSERT3U(db->db_parent->db.db_object, ==,
1004			    db->db.db_object);
1005			/*
1006			 * dnode_grow_indblksz() can make this fail if we don't
1007			 * have the struct_rwlock.  XXX indblksz no longer
1008			 * grows.  safe to do this now?
1009			 */
1010			if (RW_WRITE_HELD(&dn->dn_struct_rwlock)) {
1011				ASSERT3P(db->db_blkptr, ==,
1012				    ((blkptr_t *)db->db_parent->db.db_data +
1013				    db->db_blkid % epb));
1014			}
1015		}
1016	}
1017	if ((db->db_blkptr == NULL || BP_IS_HOLE(db->db_blkptr)) &&
1018	    (db->db_buf == NULL || db->db_buf->b_data) &&
1019	    db->db.db_data && db->db_blkid != DMU_BONUS_BLKID &&
1020	    db->db_state != DB_FILL && !dn->dn_free_txg) {
1021		/*
1022		 * If the blkptr isn't set but they have nonzero data,
1023		 * it had better be dirty, otherwise we'll lose that
1024		 * data when we evict this buffer.
1025		 *
1026		 * There is an exception to this rule for indirect blocks; in
1027		 * this case, if the indirect block is a hole, we fill in a few
1028		 * fields on each of the child blocks (importantly, birth time)
1029		 * to prevent hole birth times from being lost when you
1030		 * partially fill in a hole.
1031		 */
1032		if (db->db_dirtycnt == 0) {
1033			if (db->db_level == 0) {
1034				uint64_t *buf = db->db.db_data;
1035				int i;
1036
1037				for (i = 0; i < db->db.db_size >> 3; i++) {
1038					ASSERT(buf[i] == 0);
1039				}
1040			} else {
1041				blkptr_t *bps = db->db.db_data;
1042				ASSERT3U(1 << DB_DNODE(db)->dn_indblkshift, ==,
1043				    db->db.db_size);
1044				/*
1045				 * We want to verify that all the blkptrs in the
1046				 * indirect block are holes, but we may have
1047				 * automatically set up a few fields for them.
1048				 * We iterate through each blkptr and verify
1049				 * they only have those fields set.
1050				 */
1051				for (int i = 0;
1052				    i < db->db.db_size / sizeof (blkptr_t);
1053				    i++) {
1054					blkptr_t *bp = &bps[i];
1055					ASSERT(ZIO_CHECKSUM_IS_ZERO(
1056					    &bp->blk_cksum));
1057					ASSERT(
1058					    DVA_IS_EMPTY(&bp->blk_dva[0]) &&
1059					    DVA_IS_EMPTY(&bp->blk_dva[1]) &&
1060					    DVA_IS_EMPTY(&bp->blk_dva[2]));
1061					ASSERT0(bp->blk_fill);
1062					ASSERT0(bp->blk_pad[0]);
1063					ASSERT0(bp->blk_pad[1]);
1064					ASSERT(!BP_IS_EMBEDDED(bp));
1065					ASSERT(BP_IS_HOLE(bp));
1066					ASSERT0(bp->blk_phys_birth);
1067				}
1068			}
1069		}
1070	}
1071	DB_DNODE_EXIT(db);
1072}
1073#endif
1074
1075static void
1076dbuf_clear_data(dmu_buf_impl_t *db)
1077{
1078	ASSERT(MUTEX_HELD(&db->db_mtx));
1079	dbuf_evict_user(db);
1080	ASSERT3P(db->db_buf, ==, NULL);
1081	db->db.db_data = NULL;
1082	if (db->db_state != DB_NOFILL)
1083		db->db_state = DB_UNCACHED;
1084}
1085
1086static void
1087dbuf_set_data(dmu_buf_impl_t *db, arc_buf_t *buf)
1088{
1089	ASSERT(MUTEX_HELD(&db->db_mtx));
1090	ASSERT(buf != NULL);
1091
1092	db->db_buf = buf;
1093	ASSERT(buf->b_data != NULL);
1094	db->db.db_data = buf->b_data;
1095}
1096
1097/*
1098 * Loan out an arc_buf for read.  Return the loaned arc_buf.
1099 */
1100arc_buf_t *
1101dbuf_loan_arcbuf(dmu_buf_impl_t *db)
1102{
1103	arc_buf_t *abuf;
1104
1105	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
1106	mutex_enter(&db->db_mtx);
1107	if (arc_released(db->db_buf) || zfs_refcount_count(&db->db_holds) > 1) {
1108		int blksz = db->db.db_size;
1109		spa_t *spa = db->db_objset->os_spa;
1110
1111		mutex_exit(&db->db_mtx);
1112		abuf = arc_loan_buf(spa, B_FALSE, blksz);
1113		bcopy(db->db.db_data, abuf->b_data, blksz);
1114	} else {
1115		abuf = db->db_buf;
1116		arc_loan_inuse_buf(abuf, db);
1117		db->db_buf = NULL;
1118		dbuf_clear_data(db);
1119		mutex_exit(&db->db_mtx);
1120	}
1121	return (abuf);
1122}
1123
1124/*
1125 * Calculate which level n block references the data at the level 0 offset
1126 * provided.
1127 */
1128uint64_t
1129dbuf_whichblock(dnode_t *dn, int64_t level, uint64_t offset)
1130{
1131	if (dn->dn_datablkshift != 0 && dn->dn_indblkshift != 0) {
1132		/*
1133		 * The level n blkid is equal to the level 0 blkid divided by
1134		 * the number of level 0s in a level n block.
1135		 *
1136		 * The level 0 blkid is offset >> datablkshift =
1137		 * offset / 2^datablkshift.
1138		 *
1139		 * The number of level 0s in a level n is the number of block
1140		 * pointers in an indirect block, raised to the power of level.
1141		 * This is 2^(indblkshift - SPA_BLKPTRSHIFT)^level =
1142		 * 2^(level*(indblkshift - SPA_BLKPTRSHIFT)).
1143		 *
1144		 * Thus, the level n blkid is: offset /
1145		 * ((2^datablkshift)*(2^(level*(indblkshift - SPA_BLKPTRSHIFT)))
1146		 * = offset / 2^(datablkshift + level *
1147		 *   (indblkshift - SPA_BLKPTRSHIFT))
1148		 * = offset >> (datablkshift + level *
1149		 *   (indblkshift - SPA_BLKPTRSHIFT))
1150		 */
1151		return (offset >> (dn->dn_datablkshift + level *
1152		    (dn->dn_indblkshift - SPA_BLKPTRSHIFT)));
1153	} else {
1154		ASSERT3U(offset, <, dn->dn_datablksz);
1155		return (0);
1156	}
1157}
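
/*
 * Worked example: with 128K data blocks (datablkshift = 17), 128K indirect
 * blocks (indblkshift = 17) and SPA_BLKPTRSHIFT = 7, each indirect block
 * holds 2^10 = 1024 block pointers.  For level = 1 the shift above is
 * 17 + 1 * (17 - 7) = 27, so a level-0 offset of 1 GiB (2^30) maps to
 * level-1 blkid 2^30 >> 27 = 8.
 */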
1158
1159static void
1160dbuf_read_done(zio_t *zio, const zbookmark_phys_t *zb, const blkptr_t *bp,
1161    arc_buf_t *buf, void *vdb)
1162{
1163	dmu_buf_impl_t *db = vdb;
1164
1165	mutex_enter(&db->db_mtx);
1166	ASSERT3U(db->db_state, ==, DB_READ);
1167	/*
1168	 * All reads are synchronous, so we must have a hold on the dbuf
1169	 */
1170	ASSERT(zfs_refcount_count(&db->db_holds) > 0);
1171	ASSERT(db->db_buf == NULL);
1172	ASSERT(db->db.db_data == NULL);
1173	if (buf == NULL) {
1174		/* i/o error */
1175		ASSERT(zio == NULL || zio->io_error != 0);
1176		ASSERT(db->db_blkid != DMU_BONUS_BLKID);
1177		ASSERT3P(db->db_buf, ==, NULL);
1178		db->db_state = DB_UNCACHED;
1179	} else if (db->db_level == 0 && db->db_freed_in_flight) {
1180		/* freed in flight */
1181		ASSERT(zio == NULL || zio->io_error == 0);
1182		if (buf == NULL) {
1183			buf = arc_alloc_buf(db->db_objset->os_spa,
1184			     db, DBUF_GET_BUFC_TYPE(db), db->db.db_size);
1185		}
1186		arc_release(buf, db);
1187		bzero(buf->b_data, db->db.db_size);
1188		arc_buf_freeze(buf);
1189		db->db_freed_in_flight = FALSE;
1190		dbuf_set_data(db, buf);
1191		db->db_state = DB_CACHED;
1192	} else {
1193		/* success */
1194		ASSERT(zio == NULL || zio->io_error == 0);
1195		dbuf_set_data(db, buf);
1196		db->db_state = DB_CACHED;
1197	}
1198	cv_broadcast(&db->db_changed);
1199	dbuf_rele_and_unlock(db, NULL, B_FALSE);
1200}
1201
1202static void
1203dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
1204{
1205	dnode_t *dn;
1206	zbookmark_phys_t zb;
1207	arc_flags_t aflags = ARC_FLAG_NOWAIT;
1208
1209	DB_DNODE_ENTER(db);
1210	dn = DB_DNODE(db);
1211	ASSERT(!zfs_refcount_is_zero(&db->db_holds));
1212	/* We need the struct_rwlock to prevent db_blkptr from changing. */
1213	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
1214	ASSERT(MUTEX_HELD(&db->db_mtx));
1215	ASSERT(db->db_state == DB_UNCACHED);
1216	ASSERT(db->db_buf == NULL);
1217
1218	if (db->db_blkid == DMU_BONUS_BLKID) {
1219		/*
1220		 * The bonus length stored in the dnode may be less than
1221		 * the maximum available space in the bonus buffer.
1222		 */
1223		int bonuslen = MIN(dn->dn_bonuslen, dn->dn_phys->dn_bonuslen);
1224		int max_bonuslen = DN_SLOTS_TO_BONUSLEN(dn->dn_num_slots);
1225
1226		ASSERT3U(bonuslen, <=, db->db.db_size);
1227		db->db.db_data = zio_buf_alloc(max_bonuslen);
1228		arc_space_consume(max_bonuslen, ARC_SPACE_BONUS);
1229		if (bonuslen < max_bonuslen)
1230			bzero(db->db.db_data, max_bonuslen);
1231		if (bonuslen)
1232			bcopy(DN_BONUS(dn->dn_phys), db->db.db_data, bonuslen);
1233		DB_DNODE_EXIT(db);
1234		db->db_state = DB_CACHED;
1235		mutex_exit(&db->db_mtx);
1236		return;
1237	}
1238
1239	/*
1240	 * Recheck BP_IS_HOLE() after dnode_block_freed() in case dnode_sync()
1241	 * processes the delete record and clears the bp while we are waiting
1242	 * for the dn_mtx (resulting in a "no" from block_freed).
1243	 */
1244	if (db->db_blkptr == NULL || BP_IS_HOLE(db->db_blkptr) ||
1245	    (db->db_level == 0 && (dnode_block_freed(dn, db->db_blkid) ||
1246	    BP_IS_HOLE(db->db_blkptr)))) {
1247		arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
1248
1249		dbuf_set_data(db, arc_alloc_buf(db->db_objset->os_spa, db, type,
1250		    db->db.db_size));
1251		bzero(db->db.db_data, db->db.db_size);
1252
1253		if (db->db_blkptr != NULL && db->db_level > 0 &&
1254		    BP_IS_HOLE(db->db_blkptr) &&
1255		    db->db_blkptr->blk_birth != 0) {
1256			blkptr_t *bps = db->db.db_data;
1257			for (int i = 0; i < ((1 <<
1258			    DB_DNODE(db)->dn_indblkshift) / sizeof (blkptr_t));
1259			    i++) {
1260				blkptr_t *bp = &bps[i];
1261				ASSERT3U(BP_GET_LSIZE(db->db_blkptr), ==,
1262				    1 << dn->dn_indblkshift);
1263				BP_SET_LSIZE(bp,
1264				    BP_GET_LEVEL(db->db_blkptr) == 1 ?
1265				    dn->dn_datablksz :
1266				    BP_GET_LSIZE(db->db_blkptr));
1267				BP_SET_TYPE(bp, BP_GET_TYPE(db->db_blkptr));
1268				BP_SET_LEVEL(bp,
1269				    BP_GET_LEVEL(db->db_blkptr) - 1);
1270				BP_SET_BIRTH(bp, db->db_blkptr->blk_birth, 0);
1271			}
1272		}
1273		DB_DNODE_EXIT(db);
1274		db->db_state = DB_CACHED;
1275		mutex_exit(&db->db_mtx);
1276		return;
1277	}
1278
1279	DB_DNODE_EXIT(db);
1280
1281	db->db_state = DB_READ;
1282	mutex_exit(&db->db_mtx);
1283
1284	if (DBUF_IS_L2CACHEABLE(db))
1285		aflags |= ARC_FLAG_L2CACHE;
1286
1287	SET_BOOKMARK(&zb, db->db_objset->os_dsl_dataset ?
1288	    db->db_objset->os_dsl_dataset->ds_object : DMU_META_OBJSET,
1289	    db->db.db_object, db->db_level, db->db_blkid);
1290
1291	dbuf_add_ref(db, NULL);
1292
1293	(void) arc_read(zio, db->db_objset->os_spa, db->db_blkptr,
1294	    dbuf_read_done, db, ZIO_PRIORITY_SYNC_READ,
1295	    (flags & DB_RF_CANFAIL) ? ZIO_FLAG_CANFAIL : ZIO_FLAG_MUSTSUCCEED,
1296	    &aflags, &zb);
1297}
1298
1299/*
1300 * This is our just-in-time copy function.  It makes a copy of buffers that
1301 * have been modified in a previous transaction group before we access them in
1302 * the current active group.
1303 *
1304 * This function is used in three places: when we are dirtying a buffer for the
1305 * first time in a txg, when we are freeing a range in a dnode that includes
1306 * this buffer, and when we are accessing a buffer which was received compressed
1307 * and later referenced in a WRITE_BYREF record.
1308 *
1309 * Note that when we are called from dbuf_free_range() we do not put a hold on
1310 * the buffer, we just traverse the active dbuf list for the dnode.
1311 */
1312static void
1313dbuf_fix_old_data(dmu_buf_impl_t *db, uint64_t txg)
1314{
1315	dbuf_dirty_record_t *dr = db->db_last_dirty;
1316
1317	ASSERT(MUTEX_HELD(&db->db_mtx));
1318	ASSERT(db->db.db_data != NULL);
1319	ASSERT(db->db_level == 0);
1320	ASSERT(db->db.db_object != DMU_META_DNODE_OBJECT);
1321
1322	if (dr == NULL ||
1323	    (dr->dt.dl.dr_data !=
1324	    ((db->db_blkid  == DMU_BONUS_BLKID) ? db->db.db_data : db->db_buf)))
1325		return;
1326
	/*
	 * If the last dirty record for this dbuf has not yet synced
	 * and it is referencing the dbuf data, either:
	 *	reset the reference to point to a new copy,
	 * or (if there are no active holders)
	 *	just null out the current db_data pointer.
	 */
1334	ASSERT(dr->dr_txg >= txg - 2);
1335	if (db->db_blkid == DMU_BONUS_BLKID) {
1336		/* Note that the data bufs here are zio_bufs */
1337		dnode_t *dn = DB_DNODE(db);
1338		int bonuslen = DN_SLOTS_TO_BONUSLEN(dn->dn_num_slots);
1339		dr->dt.dl.dr_data = zio_buf_alloc(bonuslen);
1340		arc_space_consume(bonuslen, ARC_SPACE_BONUS);
1341		bcopy(db->db.db_data, dr->dt.dl.dr_data, bonuslen);
1342	} else if (zfs_refcount_count(&db->db_holds) > db->db_dirtycnt) {
1343		int size = arc_buf_size(db->db_buf);
1344		arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
1345		spa_t *spa = db->db_objset->os_spa;
1346		enum zio_compress compress_type =
1347		    arc_get_compression(db->db_buf);
1348
1349		if (compress_type == ZIO_COMPRESS_OFF) {
1350			dr->dt.dl.dr_data = arc_alloc_buf(spa, db, type, size);
1351		} else {
1352			ASSERT3U(type, ==, ARC_BUFC_DATA);
1353			dr->dt.dl.dr_data = arc_alloc_compressed_buf(spa, db,
1354			    size, arc_buf_lsize(db->db_buf), compress_type);
1355		}
1356		bcopy(db->db.db_data, dr->dt.dl.dr_data->b_data, size);
1357	} else {
1358		db->db_buf = NULL;
1359		dbuf_clear_data(db);
1360	}
1361}
1362
1363int
1364dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
1365{
1366	int err = 0;
1367	boolean_t prefetch;
1368	dnode_t *dn;
1369
1370	/*
1371	 * We don't have to hold the mutex to check db_state because it
1372	 * can't be freed while we have a hold on the buffer.
1373	 */
1374	ASSERT(!zfs_refcount_is_zero(&db->db_holds));
1375
1376	if (db->db_state == DB_NOFILL)
1377		return (SET_ERROR(EIO));
1378
1379	DB_DNODE_ENTER(db);
1380	dn = DB_DNODE(db);
1381	if ((flags & DB_RF_HAVESTRUCT) == 0)
1382		rw_enter(&dn->dn_struct_rwlock, RW_READER);
1383
1384	prefetch = db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID &&
1385	    (flags & DB_RF_NOPREFETCH) == 0 && dn != NULL &&
1386	    DBUF_IS_CACHEABLE(db);
1387
1388	mutex_enter(&db->db_mtx);
1389	if (db->db_state == DB_CACHED) {
1390		/*
1391		 * If the arc buf is compressed, we need to decompress it to
1392		 * read the data. This could happen during the "zfs receive" of
1393		 * a stream which is compressed and deduplicated.
1394		 */
1395		if (db->db_buf != NULL &&
1396		    arc_get_compression(db->db_buf) != ZIO_COMPRESS_OFF) {
1397			dbuf_fix_old_data(db,
1398			    spa_syncing_txg(dmu_objset_spa(db->db_objset)));
1399			err = arc_decompress(db->db_buf);
1400			dbuf_set_data(db, db->db_buf);
1401		}
1402		mutex_exit(&db->db_mtx);
1403		if (prefetch)
1404			dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE);
1405		if ((flags & DB_RF_HAVESTRUCT) == 0)
1406			rw_exit(&dn->dn_struct_rwlock);
1407		DB_DNODE_EXIT(db);
1408		DBUF_STAT_BUMP(hash_hits);
1409	} else if (db->db_state == DB_UNCACHED) {
1410		spa_t *spa = dn->dn_objset->os_spa;
1411		boolean_t need_wait = B_FALSE;
1412
1413		if (zio == NULL &&
1414		    db->db_blkptr != NULL && !BP_IS_HOLE(db->db_blkptr)) {
1415			zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
1416			need_wait = B_TRUE;
1417		}
1418		dbuf_read_impl(db, zio, flags);
1419
1420		/* dbuf_read_impl has dropped db_mtx for us */
1421
1422		if (prefetch)
1423			dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE);
1424
1425		if ((flags & DB_RF_HAVESTRUCT) == 0)
1426			rw_exit(&dn->dn_struct_rwlock);
1427		DB_DNODE_EXIT(db);
1428		DBUF_STAT_BUMP(hash_misses);
1429
1430		if (need_wait)
1431			err = zio_wait(zio);
1432	} else {
1433		/*
1434		 * Another reader came in while the dbuf was in flight
1435		 * between UNCACHED and CACHED.  Either a writer will finish
1436		 * writing the buffer (sending the dbuf to CACHED) or the
1437		 * first reader's request will reach the read_done callback
1438		 * and send the dbuf to CACHED.  Otherwise, a failure
1439		 * occurred and the dbuf went to UNCACHED.
1440		 */
1441		mutex_exit(&db->db_mtx);
1442		if (prefetch)
1443			dmu_zfetch(&dn->dn_zfetch, db->db_blkid, 1, B_TRUE);
1444		if ((flags & DB_RF_HAVESTRUCT) == 0)
1445			rw_exit(&dn->dn_struct_rwlock);
1446		DB_DNODE_EXIT(db);
1447		DBUF_STAT_BUMP(hash_misses);
1448
1449		/* Skip the wait per the caller's request. */
1450		mutex_enter(&db->db_mtx);
1451		if ((flags & DB_RF_NEVERWAIT) == 0) {
1452			while (db->db_state == DB_READ ||
1453			    db->db_state == DB_FILL) {
1454				ASSERT(db->db_state == DB_READ ||
1455				    (flags & DB_RF_HAVESTRUCT) == 0);
1456				DTRACE_PROBE2(blocked__read, dmu_buf_impl_t *,
1457				    db, zio_t *, zio);
1458				cv_wait(&db->db_changed, &db->db_mtx);
1459			}
1460			if (db->db_state == DB_UNCACHED)
1461				err = SET_ERROR(EIO);
1462		}
1463		mutex_exit(&db->db_mtx);
1464	}
1465
1466	return (err);
1467}
1468
1469static void
1470dbuf_noread(dmu_buf_impl_t *db)
1471{
1472	ASSERT(!zfs_refcount_is_zero(&db->db_holds));
1473	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
1474	mutex_enter(&db->db_mtx);
1475	while (db->db_state == DB_READ || db->db_state == DB_FILL)
1476		cv_wait(&db->db_changed, &db->db_mtx);
1477	if (db->db_state == DB_UNCACHED) {
1478		arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
1479		spa_t *spa = db->db_objset->os_spa;
1480
1481		ASSERT(db->db_buf == NULL);
1482		ASSERT(db->db.db_data == NULL);
1483		dbuf_set_data(db, arc_alloc_buf(spa, db, type, db->db.db_size));
1484		db->db_state = DB_FILL;
1485	} else if (db->db_state == DB_NOFILL) {
1486		dbuf_clear_data(db);
1487	} else {
1488		ASSERT3U(db->db_state, ==, DB_CACHED);
1489	}
1490	mutex_exit(&db->db_mtx);
1491}
1492
1493void
1494dbuf_unoverride(dbuf_dirty_record_t *dr)
1495{
1496	dmu_buf_impl_t *db = dr->dr_dbuf;
1497	blkptr_t *bp = &dr->dt.dl.dr_overridden_by;
1498	uint64_t txg = dr->dr_txg;
1499
1500	ASSERT(MUTEX_HELD(&db->db_mtx));
1501	/*
1502	 * This assert is valid because dmu_sync() expects to be called by
1503	 * a zilog's get_data while holding a range lock.  This call only
1504	 * comes from dbuf_dirty() callers who must also hold a range lock.
1505	 */
1506	ASSERT(dr->dt.dl.dr_override_state != DR_IN_DMU_SYNC);
1507	ASSERT(db->db_level == 0);
1508
1509	if (db->db_blkid == DMU_BONUS_BLKID ||
1510	    dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN)
1511		return;
1512
1513	ASSERT(db->db_data_pending != dr);
1514
1515	/* free this block */
1516	if (!BP_IS_HOLE(bp) && !dr->dt.dl.dr_nopwrite)
1517		zio_free(db->db_objset->os_spa, txg, bp);
1518
1519	dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
1520	dr->dt.dl.dr_nopwrite = B_FALSE;
1521
1522	/*
1523	 * Release the already-written buffer, so we leave it in
1524	 * a consistent dirty state.  Note that all callers are
1525	 * modifying the buffer, so they will immediately do
1526	 * another (redundant) arc_release().  Therefore, leave
1527	 * the buf thawed to save the effort of freezing &
1528	 * immediately re-thawing it.
1529	 */
1530	arc_release(dr->dt.dl.dr_data, db);
1531}
1532
/*
 * Evict (if it's unreferenced) or clear (if it's referenced) any level-0
 * data blocks in the free range, so that any future readers will find
 * empty blocks.
 */
1538void
1539dbuf_free_range(dnode_t *dn, uint64_t start_blkid, uint64_t end_blkid,
1540    dmu_tx_t *tx)
1541{
1542	dmu_buf_impl_t db_search;
1543	dmu_buf_impl_t *db, *db_next;
1544	uint64_t txg = tx->tx_txg;
1545	avl_index_t where;
1546
1547	if (end_blkid > dn->dn_maxblkid &&
1548	    !(start_blkid == DMU_SPILL_BLKID || end_blkid == DMU_SPILL_BLKID))
1549		end_blkid = dn->dn_maxblkid;
1550	dprintf_dnode(dn, "start=%llu end=%llu\n", start_blkid, end_blkid);
1551
1552	db_search.db_level = 0;
1553	db_search.db_blkid = start_blkid;
1554	db_search.db_state = DB_SEARCH;
1555
1556	mutex_enter(&dn->dn_dbufs_mtx);
1557	db = avl_find(&dn->dn_dbufs, &db_search, &where);
1558	ASSERT3P(db, ==, NULL);
1559
1560	db = avl_nearest(&dn->dn_dbufs, where, AVL_AFTER);
1561
1562	for (; db != NULL; db = db_next) {
1563		db_next = AVL_NEXT(&dn->dn_dbufs, db);
1564		ASSERT(db->db_blkid != DMU_BONUS_BLKID);
1565
1566		if (db->db_level != 0 || db->db_blkid > end_blkid) {
1567			break;
1568		}
1569		ASSERT3U(db->db_blkid, >=, start_blkid);
1570
1571		/* found a level 0 buffer in the range */
1572		mutex_enter(&db->db_mtx);
1573		if (dbuf_undirty(db, tx)) {
1574			/* mutex has been dropped and dbuf destroyed */
1575			continue;
1576		}
1577
1578		if (db->db_state == DB_UNCACHED ||
1579		    db->db_state == DB_NOFILL ||
1580		    db->db_state == DB_EVICTING) {
1581			ASSERT(db->db.db_data == NULL);
1582			mutex_exit(&db->db_mtx);
1583			continue;
1584		}
1585		if (db->db_state == DB_READ || db->db_state == DB_FILL) {
1586			/* will be handled in dbuf_read_done or dbuf_rele */
1587			db->db_freed_in_flight = TRUE;
1588			mutex_exit(&db->db_mtx);
1589			continue;
1590		}
1591		if (zfs_refcount_count(&db->db_holds) == 0) {
1592			ASSERT(db->db_buf);
1593			dbuf_destroy(db);
1594			continue;
1595		}
1596		/* The dbuf is referenced */
1597
1598		if (db->db_last_dirty != NULL) {
1599			dbuf_dirty_record_t *dr = db->db_last_dirty;
1600
1601			if (dr->dr_txg == txg) {
1602				/*
1603				 * This buffer is "in-use", re-adjust the file
1604				 * size to reflect that this buffer may
1605				 * contain new data when we sync.
1606				 */
1607				if (db->db_blkid != DMU_SPILL_BLKID &&
1608				    db->db_blkid > dn->dn_maxblkid)
1609					dn->dn_maxblkid = db->db_blkid;
1610				dbuf_unoverride(dr);
1611			} else {
				/*
				 * This dbuf is not dirty in the open context.
				 * Either uncache it (if it's not referenced in
				 * the open context) or reset its contents to
				 * empty.
				 */
1618				dbuf_fix_old_data(db, txg);
1619			}
1620		}
		/* clear the contents if it's cached */
1622		if (db->db_state == DB_CACHED) {
1623			ASSERT(db->db.db_data != NULL);
1624			arc_release(db->db_buf, db);
1625			bzero(db->db.db_data, db->db.db_size);
1626			arc_buf_freeze(db->db_buf);
1627		}
1628
1629		mutex_exit(&db->db_mtx);
1630	}
1631	mutex_exit(&dn->dn_dbufs_mtx);
1632}
1633
1634void
1635dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
1636{
1637	arc_buf_t *buf, *obuf;
1638	int osize = db->db.db_size;
1639	arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
1640	dnode_t *dn;
1641
1642	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
1643
1644	DB_DNODE_ENTER(db);
1645	dn = DB_DNODE(db);
1646
1647	/* XXX does *this* func really need the lock? */
1648	ASSERT(RW_WRITE_HELD(&dn->dn_struct_rwlock));
1649
1650	/*
1651	 * This call to dmu_buf_will_dirty() with the dn_struct_rwlock held
1652	 * is OK, because there can be no other references to the db
1653	 * when we are changing its size, so no concurrent DB_FILL can
1654	 * be happening.
1655	 */
1656	/*
1657	 * XXX we should be doing a dbuf_read, checking the return
1658	 * value and returning that up to our callers
1659	 */
1660	dmu_buf_will_dirty(&db->db, tx);
1661
1662	/* create the data buffer for the new block */
1663	buf = arc_alloc_buf(dn->dn_objset->os_spa, db, type, size);
1664
1665	/* copy old block data to the new block */
1666	obuf = db->db_buf;
1667	bcopy(obuf->b_data, buf->b_data, MIN(osize, size));
1668	/* zero the remainder */
1669	if (size > osize)
1670		bzero((uint8_t *)buf->b_data + osize, size - osize);
1671
1672	mutex_enter(&db->db_mtx);
1673	dbuf_set_data(db, buf);
1674	arc_buf_destroy(obuf, db);
1675	db->db.db_size = size;
1676
1677	if (db->db_level == 0) {
1678		ASSERT3U(db->db_last_dirty->dr_txg, ==, tx->tx_txg);
1679		db->db_last_dirty->dt.dl.dr_data = buf;
1680	}
1681	mutex_exit(&db->db_mtx);
1682
1683	dmu_objset_willuse_space(dn->dn_objset, size - osize, tx);
1684	DB_DNODE_EXIT(db);
1685}
1686
1687void
1688dbuf_release_bp(dmu_buf_impl_t *db)
1689{
1690	objset_t *os = db->db_objset;
1691
1692	ASSERT(dsl_pool_sync_context(dmu_objset_pool(os)));
1693	ASSERT(arc_released(os->os_phys_buf) ||
1694	    list_link_active(&os->os_dsl_dataset->ds_synced_link));
1695	ASSERT(db->db_parent == NULL || arc_released(db->db_parent->db_buf));
1696
1697	(void) arc_release(db->db_buf, db);
1698}
1699
1700/*
1701 * We already have a dirty record for this TXG, and we are being
1702 * dirtied again.
1703 */
1704static void
1705dbuf_redirty(dbuf_dirty_record_t *dr)
1706{
1707	dmu_buf_impl_t *db = dr->dr_dbuf;
1708
1709	ASSERT(MUTEX_HELD(&db->db_mtx));
1710
1711	if (db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID) {
1712		/*
1713		 * If this buffer has already been written out,
1714		 * we now need to reset its state.
1715		 */
1716		dbuf_unoverride(dr);
1717		if (db->db.db_object != DMU_META_DNODE_OBJECT &&
1718		    db->db_state != DB_NOFILL) {
1719			/* Already released on initial dirty, so just thaw. */
1720			ASSERT(arc_released(db->db_buf));
1721			arc_buf_thaw(db->db_buf);
1722		}
1723	}
1724}
1725
1726dbuf_dirty_record_t *
1727dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
1728{
1729	dnode_t *dn;
1730	objset_t *os;
1731	dbuf_dirty_record_t **drp, *dr;
1732	int drop_struct_lock = FALSE;
1733	int txgoff = tx->tx_txg & TXG_MASK;
1734
1735	ASSERT(tx->tx_txg != 0);
1736	ASSERT(!zfs_refcount_is_zero(&db->db_holds));
1737	DMU_TX_DIRTY_BUF(tx, db);
1738
1739	DB_DNODE_ENTER(db);
1740	dn = DB_DNODE(db);
1741	/*
1742	 * Shouldn't dirty a regular buffer in syncing context.  Private
1743	 * objects may be dirtied in syncing context, but only if they
1744	 * were already pre-dirtied in open context.
1745	 */
1746#ifdef DEBUG
1747	if (dn->dn_objset->os_dsl_dataset != NULL) {
1748		rrw_enter(&dn->dn_objset->os_dsl_dataset->ds_bp_rwlock,
1749		    RW_READER, FTAG);
1750	}
1751	ASSERT(!dmu_tx_is_syncing(tx) ||
1752	    BP_IS_HOLE(dn->dn_objset->os_rootbp) ||
1753	    DMU_OBJECT_IS_SPECIAL(dn->dn_object) ||
1754	    dn->dn_objset->os_dsl_dataset == NULL);
1755	if (dn->dn_objset->os_dsl_dataset != NULL)
1756		rrw_exit(&dn->dn_objset->os_dsl_dataset->ds_bp_rwlock, FTAG);
1757#endif
1758	/*
1759	 * We make this assert for private objects as well, but after we
1760	 * check if we're already dirty.  They are allowed to re-dirty
1761	 * in syncing context.
1762	 */
1763	ASSERT(dn->dn_object == DMU_META_DNODE_OBJECT ||
1764	    dn->dn_dirtyctx == DN_UNDIRTIED || dn->dn_dirtyctx ==
1765	    (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN));
1766
1767	mutex_enter(&db->db_mtx);
1768	/*
1769	 * XXX make this true for indirects too?  The problem is that
1770	 * transactions created with dmu_tx_create_assigned() from
1771	 * syncing context don't bother holding ahead.
1772	 */
1773	ASSERT(db->db_level != 0 ||
1774	    db->db_state == DB_CACHED || db->db_state == DB_FILL ||
1775	    db->db_state == DB_NOFILL);
1776
1777	mutex_enter(&dn->dn_mtx);
1778	/*
1779	 * Don't set dirtyctx to SYNC if we're just modifying this as we
1780	 * initialize the objset.
1781	 */
1782	if (dn->dn_dirtyctx == DN_UNDIRTIED) {
1783		if (dn->dn_objset->os_dsl_dataset != NULL) {
1784			rrw_enter(&dn->dn_objset->os_dsl_dataset->ds_bp_rwlock,
1785			    RW_READER, FTAG);
1786		}
1787		if (!BP_IS_HOLE(dn->dn_objset->os_rootbp)) {
1788			dn->dn_dirtyctx = (dmu_tx_is_syncing(tx) ?
1789			    DN_DIRTY_SYNC : DN_DIRTY_OPEN);
1790			ASSERT(dn->dn_dirtyctx_firstset == NULL);
1791			dn->dn_dirtyctx_firstset = kmem_alloc(1, KM_SLEEP);
1792		}
1793		if (dn->dn_objset->os_dsl_dataset != NULL) {
1794			rrw_exit(&dn->dn_objset->os_dsl_dataset->ds_bp_rwlock,
1795			    FTAG);
1796		}
1797	}
1798
1799	if (tx->tx_txg > dn->dn_dirty_txg)
1800		dn->dn_dirty_txg = tx->tx_txg;
1801	mutex_exit(&dn->dn_mtx);
1802
1803	if (db->db_blkid == DMU_SPILL_BLKID)
1804		dn->dn_have_spill = B_TRUE;
1805
1806	/*
1807	 * If this buffer is already dirty, we're done.
1808	 */
1809	drp = &db->db_last_dirty;
1810	ASSERT(*drp == NULL || (*drp)->dr_txg <= tx->tx_txg ||
1811	    db->db.db_object == DMU_META_DNODE_OBJECT);
1812	while ((dr = *drp) != NULL && dr->dr_txg > tx->tx_txg)
1813		drp = &dr->dr_next;
1814	if (dr && dr->dr_txg == tx->tx_txg) {
1815		DB_DNODE_EXIT(db);
1816
1817		dbuf_redirty(dr);
1818		mutex_exit(&db->db_mtx);
1819		return (dr);
1820	}
1821
1822	/*
	 * These assertions are only valid if the dbuf is not already dirty.
1824	 */
1825	ASSERT(dn->dn_object == 0 ||
1826	    dn->dn_dirtyctx == DN_UNDIRTIED || dn->dn_dirtyctx ==
1827	    (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN));
1828
1829	ASSERT3U(dn->dn_nlevels, >, db->db_level);
1830
1831	/*
	 * We should only be dirtying in syncing context if it's the
	 * MOS, we're initializing the objset, or it's a special object.
1834	 * However, we are allowed to dirty in syncing context provided
1835	 * we already dirtied it in open context.  Hence we must make
1836	 * this assertion only if we're not already dirty.
1837	 */
1838	os = dn->dn_objset;
1839	VERIFY3U(tx->tx_txg, <=, spa_final_dirty_txg(os->os_spa));
1840#ifdef DEBUG
1841	if (dn->dn_objset->os_dsl_dataset != NULL)
1842		rrw_enter(&os->os_dsl_dataset->ds_bp_rwlock, RW_READER, FTAG);
1843	ASSERT(!dmu_tx_is_syncing(tx) || DMU_OBJECT_IS_SPECIAL(dn->dn_object) ||
1844	    os->os_dsl_dataset == NULL || BP_IS_HOLE(os->os_rootbp));
1845	if (dn->dn_objset->os_dsl_dataset != NULL)
1846		rrw_exit(&os->os_dsl_dataset->ds_bp_rwlock, FTAG);
1847#endif
1848	ASSERT(db->db.db_size != 0);
1849
1850	dprintf_dbuf(db, "size=%llx\n", (u_longlong_t)db->db.db_size);
1851
1852	if (db->db_blkid != DMU_BONUS_BLKID) {
1853		dmu_objset_willuse_space(os, db->db.db_size, tx);
1854	}
1855
1856	/*
1857	 * If this buffer is dirty in an old transaction group we need
1858	 * to make a copy of it so that the changes we make in this
1859	 * transaction group won't leak out when we sync the older txg.
1860	 */
1861	dr = kmem_zalloc(sizeof (dbuf_dirty_record_t), KM_SLEEP);
1862	list_link_init(&dr->dr_dirty_node);
1863	if (db->db_level == 0) {
1864		void *data_old = db->db_buf;
1865
1866		if (db->db_state != DB_NOFILL) {
1867			if (db->db_blkid == DMU_BONUS_BLKID) {
1868				dbuf_fix_old_data(db, tx->tx_txg);
1869				data_old = db->db.db_data;
1870			} else if (db->db.db_object != DMU_META_DNODE_OBJECT) {
1871				/*
1872				 * Release the data buffer from the cache so
1873				 * that we can modify it without impacting
1874				 * possible other users of this cached data
1875				 * block.  Note that indirect blocks and
1876				 * private objects are not released until the
1877				 * syncing state (since they are only modified
1878				 * then).
1879				 */
1880				arc_release(db->db_buf, db);
1881				dbuf_fix_old_data(db, tx->tx_txg);
1882				data_old = db->db_buf;
1883			}
1884			ASSERT(data_old != NULL);
1885		}
1886		dr->dt.dl.dr_data = data_old;
1887	} else {
1888		mutex_init(&dr->dt.di.dr_mtx, NULL, MUTEX_DEFAULT, NULL);
1889		list_create(&dr->dt.di.dr_children,
1890		    sizeof (dbuf_dirty_record_t),
1891		    offsetof(dbuf_dirty_record_t, dr_dirty_node));
1892	}
1893	if (db->db_blkid != DMU_BONUS_BLKID && os->os_dsl_dataset != NULL)
1894		dr->dr_accounted = db->db.db_size;
1895	dr->dr_dbuf = db;
1896	dr->dr_txg = tx->tx_txg;
1897	dr->dr_next = *drp;
1898	*drp = dr;
1899
1900	/*
1901	 * We could have been freed_in_flight between the dbuf_noread
1902	 * and dbuf_dirty.  We win, as though the dbuf_noread() had
1903	 * happened after the free.
1904	 */
1905	if (db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID &&
1906	    db->db_blkid != DMU_SPILL_BLKID) {
1907		mutex_enter(&dn->dn_mtx);
1908		if (dn->dn_free_ranges[txgoff] != NULL) {
1909			range_tree_clear(dn->dn_free_ranges[txgoff],
1910			    db->db_blkid, 1);
1911		}
1912		mutex_exit(&dn->dn_mtx);
1913		db->db_freed_in_flight = FALSE;
1914	}
1915
1916	/*
1917	 * This buffer is now part of this txg
1918	 */
1919	dbuf_add_ref(db, (void *)(uintptr_t)tx->tx_txg);
1920	db->db_dirtycnt += 1;
1921	ASSERT3U(db->db_dirtycnt, <=, 3);
1922
1923	mutex_exit(&db->db_mtx);
1924
1925	if (db->db_blkid == DMU_BONUS_BLKID ||
1926	    db->db_blkid == DMU_SPILL_BLKID) {
1927		mutex_enter(&dn->dn_mtx);
1928		ASSERT(!list_link_active(&dr->dr_dirty_node));
1929		list_insert_tail(&dn->dn_dirty_records[txgoff], dr);
1930		mutex_exit(&dn->dn_mtx);
1931		dnode_setdirty(dn, tx);
1932		DB_DNODE_EXIT(db);
1933		return (dr);
1934	}
1935
1936	/*
1937	 * The dn_struct_rwlock prevents db_blkptr from changing
1938	 * due to a write from syncing context completing
1939	 * while we are running, so we want to acquire it before
1940	 * looking at db_blkptr.
1941	 */
1942	if (!RW_WRITE_HELD(&dn->dn_struct_rwlock)) {
1943		rw_enter(&dn->dn_struct_rwlock, RW_READER);
1944		drop_struct_lock = TRUE;
1945	}
1946
1947	/*
1948	 * We need to hold the dn_struct_rwlock to make this assertion,
1949	 * because it protects dn_phys / dn_next_nlevels from changing.
1950	 */
1951	ASSERT((dn->dn_phys->dn_nlevels == 0 && db->db_level == 0) ||
1952	    dn->dn_phys->dn_nlevels > db->db_level ||
1953	    dn->dn_next_nlevels[txgoff] > db->db_level ||
1954	    dn->dn_next_nlevels[(tx->tx_txg-1) & TXG_MASK] > db->db_level ||
1955	    dn->dn_next_nlevels[(tx->tx_txg-2) & TXG_MASK] > db->db_level);
1956
1957	/*
1958	 * If we are overwriting a dedup BP, then unless it is snapshotted,
1959	 * when we get to syncing context we will need to decrement its
1960	 * refcount in the DDT.  Prefetch the relevant DDT block so that
1961	 * syncing context won't have to wait for the i/o.
1962	 */
1963	ddt_prefetch(os->os_spa, db->db_blkptr);
1964
1965	if (db->db_level == 0) {
1966		dnode_new_blkid(dn, db->db_blkid, tx, drop_struct_lock);
1967		ASSERT(dn->dn_maxblkid >= db->db_blkid);
1968	}
1969
1970	if (db->db_level+1 < dn->dn_nlevels) {
1971		dmu_buf_impl_t *parent = db->db_parent;
1972		dbuf_dirty_record_t *di;
1973		int parent_held = FALSE;
1974
1975		if (db->db_parent == NULL || db->db_parent == dn->dn_dbuf) {
1976			int epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
1977
1978			parent = dbuf_hold_level(dn, db->db_level+1,
1979			    db->db_blkid >> epbs, FTAG);
1980			ASSERT(parent != NULL);
1981			parent_held = TRUE;
1982		}
1983		if (drop_struct_lock)
1984			rw_exit(&dn->dn_struct_rwlock);
1985		ASSERT3U(db->db_level+1, ==, parent->db_level);
1986		di = dbuf_dirty(parent, tx);
1987		if (parent_held)
1988			dbuf_rele(parent, FTAG);
1989
1990		mutex_enter(&db->db_mtx);
1991		/*
1992		 * Since we've dropped the mutex, it's possible that
1993		 * dbuf_undirty() might have changed this out from under us.
1994		 */
1995		if (db->db_last_dirty == dr ||
1996		    dn->dn_object == DMU_META_DNODE_OBJECT) {
1997			mutex_enter(&di->dt.di.dr_mtx);
1998			ASSERT3U(di->dr_txg, ==, tx->tx_txg);
1999			ASSERT(!list_link_active(&dr->dr_dirty_node));
2000			list_insert_tail(&di->dt.di.dr_children, dr);
2001			mutex_exit(&di->dt.di.dr_mtx);
2002			dr->dr_parent = di;
2003		}
2004		mutex_exit(&db->db_mtx);
2005	} else {
2006		ASSERT(db->db_level+1 == dn->dn_nlevels);
2007		ASSERT(db->db_blkid < dn->dn_nblkptr);
2008		ASSERT(db->db_parent == NULL || db->db_parent == dn->dn_dbuf);
2009		mutex_enter(&dn->dn_mtx);
2010		ASSERT(!list_link_active(&dr->dr_dirty_node));
2011		list_insert_tail(&dn->dn_dirty_records[txgoff], dr);
2012		mutex_exit(&dn->dn_mtx);
2013		if (drop_struct_lock)
2014			rw_exit(&dn->dn_struct_rwlock);
2015	}
2016
2017	dnode_setdirty(dn, tx);
2018	DB_DNODE_EXIT(db);
2019	return (dr);
2020}
2021
2022/*
2023 * Undirty a buffer in the transaction group referenced by the given
2024 * transaction.  Return whether this evicted the dbuf.
2025 */
2026static boolean_t
2027dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
2028{
2029	dnode_t *dn;
2030	uint64_t txg = tx->tx_txg;
2031	dbuf_dirty_record_t *dr, **drp;
2032
2033	ASSERT(txg != 0);
2034
2035	/*
2036	 * Due to our use of dn_nlevels below, this can only be called
2037	 * in open context, unless we are operating on the MOS.
2038	 * From syncing context, dn_nlevels may be different from the
2039	 * dn_nlevels used when dbuf was dirtied.
2040	 */
2041	ASSERT(db->db_objset ==
2042	    dmu_objset_pool(db->db_objset)->dp_meta_objset ||
2043	    txg != spa_syncing_txg(dmu_objset_spa(db->db_objset)));
2044	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
2045	ASSERT0(db->db_level);
2046	ASSERT(MUTEX_HELD(&db->db_mtx));
2047
2048	/*
2049	 * If this buffer is not dirty, we're done.
2050	 */
2051	for (drp = &db->db_last_dirty; (dr = *drp) != NULL; drp = &dr->dr_next)
2052		if (dr->dr_txg <= txg)
2053			break;
2054	if (dr == NULL || dr->dr_txg < txg)
2055		return (B_FALSE);
2056	ASSERT(dr->dr_txg == txg);
2057	ASSERT(dr->dr_dbuf == db);
2058
2059	DB_DNODE_ENTER(db);
2060	dn = DB_DNODE(db);
2061
2062	dprintf_dbuf(db, "size=%llx\n", (u_longlong_t)db->db.db_size);
2063
2064	ASSERT(db->db.db_size != 0);
2065
2066	dsl_pool_undirty_space(dmu_objset_pool(dn->dn_objset),
2067	    dr->dr_accounted, txg);
2068
2069	*drp = dr->dr_next;
2070
2071	/*
2072	 * Note that there are three places in dbuf_dirty()
2073	 * where this dirty record may be put on a list.
2074	 * Make sure to do a list_remove corresponding to
2075	 * every one of those list_insert calls.
2076	 */
2077	if (dr->dr_parent) {
2078		mutex_enter(&dr->dr_parent->dt.di.dr_mtx);
2079		list_remove(&dr->dr_parent->dt.di.dr_children, dr);
2080		mutex_exit(&dr->dr_parent->dt.di.dr_mtx);
2081	} else if (db->db_blkid == DMU_SPILL_BLKID ||
2082	    db->db_level + 1 == dn->dn_nlevels) {
2083		ASSERT(db->db_blkptr == NULL || db->db_parent == dn->dn_dbuf);
2084		mutex_enter(&dn->dn_mtx);
2085		list_remove(&dn->dn_dirty_records[txg & TXG_MASK], dr);
2086		mutex_exit(&dn->dn_mtx);
2087	}
2088	DB_DNODE_EXIT(db);
2089
2090	if (db->db_state != DB_NOFILL) {
2091		dbuf_unoverride(dr);
2092
2093		ASSERT(db->db_buf != NULL);
2094		ASSERT(dr->dt.dl.dr_data != NULL);
2095		if (dr->dt.dl.dr_data != db->db_buf)
2096			arc_buf_destroy(dr->dt.dl.dr_data, db);
2097	}
2098
2099	kmem_free(dr, sizeof (dbuf_dirty_record_t));
2100
2101	ASSERT(db->db_dirtycnt > 0);
2102	db->db_dirtycnt -= 1;
2103
2104	if (zfs_refcount_remove(&db->db_holds, (void *)(uintptr_t)txg) == 0) {
2105		ASSERT(db->db_state == DB_NOFILL || arc_released(db->db_buf));
2106		dbuf_destroy(db);
2107		return (B_TRUE);
2108	}
2109
2110	return (B_FALSE);
2111}
2112
2113void
2114dmu_buf_will_dirty(dmu_buf_t *db_fake, dmu_tx_t *tx)
2115{
2116	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2117	int rf = DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH;
2118
2119	ASSERT(tx->tx_txg != 0);
2120	ASSERT(!zfs_refcount_is_zero(&db->db_holds));
2121
2122	/*
	 * Quick check for dirtiness.  For already dirty blocks, this
2124	 * reduces runtime of this function by >90%, and overall performance
2125	 * by 50% for some workloads (e.g. file deletion with indirect blocks
2126	 * cached).
2127	 */
2128	mutex_enter(&db->db_mtx);
2129	dbuf_dirty_record_t *dr;
2130	for (dr = db->db_last_dirty;
2131	    dr != NULL && dr->dr_txg >= tx->tx_txg; dr = dr->dr_next) {
2132		/*
2133		 * It's possible that it is already dirty but not cached,
2134		 * because there are some calls to dbuf_dirty() that don't
2135		 * go through dmu_buf_will_dirty().
2136		 */
2137		if (dr->dr_txg == tx->tx_txg && db->db_state == DB_CACHED) {
2138			/* This dbuf is already dirty and cached. */
2139			dbuf_redirty(dr);
2140			mutex_exit(&db->db_mtx);
2141			return;
2142		}
2143	}
2144	mutex_exit(&db->db_mtx);
2145
2146	DB_DNODE_ENTER(db);
2147	if (RW_WRITE_HELD(&DB_DNODE(db)->dn_struct_rwlock))
2148		rf |= DB_RF_HAVESTRUCT;
2149	DB_DNODE_EXIT(db);
2150	(void) dbuf_read(db, NULL, rf);
2151	(void) dbuf_dirty(db, tx);
2152}
2153
2154void
2155dmu_buf_will_not_fill(dmu_buf_t *db_fake, dmu_tx_t *tx)
2156{
2157	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2158
2159	db->db_state = DB_NOFILL;
2160
2161	dmu_buf_will_fill(db_fake, tx);
2162}
2163
2164void
2165dmu_buf_will_fill(dmu_buf_t *db_fake, dmu_tx_t *tx)
2166{
2167	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2168
2169	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
2170	ASSERT(tx->tx_txg != 0);
2171	ASSERT(db->db_level == 0);
2172	ASSERT(!zfs_refcount_is_zero(&db->db_holds));
2173
2174	ASSERT(db->db.db_object != DMU_META_DNODE_OBJECT ||
2175	    dmu_tx_private_ok(tx));
2176
2177	dbuf_noread(db);
2178	(void) dbuf_dirty(db, tx);
2179}
2180
2181#pragma weak dmu_buf_fill_done = dbuf_fill_done
2182/* ARGSUSED */
2183void
2184dbuf_fill_done(dmu_buf_impl_t *db, dmu_tx_t *tx)
2185{
2186	mutex_enter(&db->db_mtx);
2187	DBUF_VERIFY(db);
2188
2189	if (db->db_state == DB_FILL) {
2190		if (db->db_level == 0 && db->db_freed_in_flight) {
2191			ASSERT(db->db_blkid != DMU_BONUS_BLKID);
2192			/* we were freed while filling */
2193			/* XXX dbuf_undirty? */
2194			bzero(db->db.db_data, db->db.db_size);
2195			db->db_freed_in_flight = FALSE;
2196		}
2197		db->db_state = DB_CACHED;
2198		cv_broadcast(&db->db_changed);
2199	}
2200	mutex_exit(&db->db_mtx);
2201}
2202
2203void
2204dmu_buf_write_embedded(dmu_buf_t *dbuf, void *data,
2205    bp_embedded_type_t etype, enum zio_compress comp,
2206    int uncompressed_size, int compressed_size, int byteorder,
2207    dmu_tx_t *tx)
2208{
2209	dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbuf;
2210	struct dirty_leaf *dl;
2211	dmu_object_type_t type;
2212
2213	if (etype == BP_EMBEDDED_TYPE_DATA) {
2214		ASSERT(spa_feature_is_active(dmu_objset_spa(db->db_objset),
2215		    SPA_FEATURE_EMBEDDED_DATA));
2216	}
2217
2218	DB_DNODE_ENTER(db);
2219	type = DB_DNODE(db)->dn_type;
2220	DB_DNODE_EXIT(db);
2221
2222	ASSERT0(db->db_level);
2223	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
2224
2225	dmu_buf_will_not_fill(dbuf, tx);
2226
2227	ASSERT3U(db->db_last_dirty->dr_txg, ==, tx->tx_txg);
2228	dl = &db->db_last_dirty->dt.dl;
2229	encode_embedded_bp_compressed(&dl->dr_overridden_by,
2230	    data, comp, uncompressed_size, compressed_size);
2231	BPE_SET_ETYPE(&dl->dr_overridden_by, etype);
2232	BP_SET_TYPE(&dl->dr_overridden_by, type);
2233	BP_SET_LEVEL(&dl->dr_overridden_by, 0);
2234	BP_SET_BYTEORDER(&dl->dr_overridden_by, byteorder);
2235
2236	dl->dr_override_state = DR_OVERRIDDEN;
2237	dl->dr_overridden_by.blk_birth = db->db_last_dirty->dr_txg;
2238}
2239
2240/*
2241 * Directly assign a provided arc buf to a given dbuf if it's not referenced
2242 * by anybody except our caller. Otherwise copy arcbuf's contents to dbuf.
2243 */
2244void
2245dbuf_assign_arcbuf(dmu_buf_impl_t *db, arc_buf_t *buf, dmu_tx_t *tx)
2246{
2247	ASSERT(!zfs_refcount_is_zero(&db->db_holds));
2248	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
2249	ASSERT(db->db_level == 0);
2250	ASSERT3U(dbuf_is_metadata(db), ==, arc_is_metadata(buf));
2251	ASSERT(buf != NULL);
2252	ASSERT(arc_buf_lsize(buf) == db->db.db_size);
2253	ASSERT(tx->tx_txg != 0);
2254
2255	arc_return_buf(buf, db);
2256	ASSERT(arc_released(buf));
2257
2258	mutex_enter(&db->db_mtx);
2259
2260	while (db->db_state == DB_READ || db->db_state == DB_FILL)
2261		cv_wait(&db->db_changed, &db->db_mtx);
2262
2263	ASSERT(db->db_state == DB_CACHED || db->db_state == DB_UNCACHED);
2264
2265	if (db->db_state == DB_CACHED &&
2266	    zfs_refcount_count(&db->db_holds) - 1 > db->db_dirtycnt) {
2267		mutex_exit(&db->db_mtx);
2268		(void) dbuf_dirty(db, tx);
2269		bcopy(buf->b_data, db->db.db_data, db->db.db_size);
2270		arc_buf_destroy(buf, db);
2271		xuio_stat_wbuf_copied();
2272		return;
2273	}
2274
2275	xuio_stat_wbuf_nocopy();
2276	if (db->db_state == DB_CACHED) {
2277		dbuf_dirty_record_t *dr = db->db_last_dirty;
2278
2279		ASSERT(db->db_buf != NULL);
2280		if (dr != NULL && dr->dr_txg == tx->tx_txg) {
2281			ASSERT(dr->dt.dl.dr_data == db->db_buf);
2282			if (!arc_released(db->db_buf)) {
2283				ASSERT(dr->dt.dl.dr_override_state ==
2284				    DR_OVERRIDDEN);
2285				arc_release(db->db_buf, db);
2286			}
2287			dr->dt.dl.dr_data = buf;
2288			arc_buf_destroy(db->db_buf, db);
2289		} else if (dr == NULL || dr->dt.dl.dr_data != db->db_buf) {
2290			arc_release(db->db_buf, db);
2291			arc_buf_destroy(db->db_buf, db);
2292		}
2293		db->db_buf = NULL;
2294	}
2295	ASSERT(db->db_buf == NULL);
2296	dbuf_set_data(db, buf);
2297	db->db_state = DB_FILL;
2298	mutex_exit(&db->db_mtx);
2299	(void) dbuf_dirty(db, tx);
2300	dmu_buf_fill_done(&db->db, tx);
2301}
2302
2303void
2304dbuf_destroy(dmu_buf_impl_t *db)
2305{
2306	dnode_t *dn;
2307	dmu_buf_impl_t *parent = db->db_parent;
2308	dmu_buf_impl_t *dndb;
2309
2310	ASSERT(MUTEX_HELD(&db->db_mtx));
2311	ASSERT(zfs_refcount_is_zero(&db->db_holds));
2312
2313	if (db->db_buf != NULL) {
2314		arc_buf_destroy(db->db_buf, db);
2315		db->db_buf = NULL;
2316	}
2317
2318	if (db->db_blkid == DMU_BONUS_BLKID) {
2319		int slots = DB_DNODE(db)->dn_num_slots;
2320		int bonuslen = DN_SLOTS_TO_BONUSLEN(slots);
2321		if (db->db.db_data != NULL) {
2322			zio_buf_free(db->db.db_data, bonuslen);
2323			arc_space_return(bonuslen, ARC_SPACE_BONUS);
2324			db->db_state = DB_UNCACHED;
2325		}
2326	}
2327
2328	dbuf_clear_data(db);
2329
2330	if (multilist_link_active(&db->db_cache_link)) {
2331		ASSERT(db->db_caching_status == DB_DBUF_CACHE ||
2332		    db->db_caching_status == DB_DBUF_METADATA_CACHE);
2333
2334		multilist_remove(dbuf_caches[db->db_caching_status].cache, db);
2335		(void) zfs_refcount_remove_many(
2336		    &dbuf_caches[db->db_caching_status].size,
2337		    db->db.db_size, db);
2338
2339		if (db->db_caching_status == DB_DBUF_METADATA_CACHE) {
2340			DBUF_STAT_BUMPDOWN(metadata_cache_count);
2341		} else {
2342			DBUF_STAT_BUMPDOWN(cache_levels[db->db_level]);
2343			DBUF_STAT_BUMPDOWN(cache_count);
2344			DBUF_STAT_DECR(cache_levels_bytes[db->db_level],
2345			    db->db.db_size);
2346		}
2347		db->db_caching_status = DB_NO_CACHE;
2348	}
2349
2350	ASSERT(db->db_state == DB_UNCACHED || db->db_state == DB_NOFILL);
2351	ASSERT(db->db_data_pending == NULL);
2352
2353	db->db_state = DB_EVICTING;
2354	db->db_blkptr = NULL;
2355
2356	/*
2357	 * Now that db_state is DB_EVICTING, nobody else can find this via
2358	 * the hash table.  We can now drop db_mtx, which allows us to
2359	 * acquire the dn_dbufs_mtx.
2360	 */
2361	mutex_exit(&db->db_mtx);
2362
2363	DB_DNODE_ENTER(db);
2364	dn = DB_DNODE(db);
2365	dndb = dn->dn_dbuf;
2366	if (db->db_blkid != DMU_BONUS_BLKID) {
2367		boolean_t needlock = !MUTEX_HELD(&dn->dn_dbufs_mtx);
2368		if (needlock)
2369			mutex_enter(&dn->dn_dbufs_mtx);
2370		avl_remove(&dn->dn_dbufs, db);
2371		membar_producer();
2372		DB_DNODE_EXIT(db);
2373		if (needlock)
2374			mutex_exit(&dn->dn_dbufs_mtx);
2375		/*
2376		 * Decrementing the dbuf count means that the hold corresponding
2377		 * to the removed dbuf is no longer discounted in dnode_move(),
2378		 * so the dnode cannot be moved until after we release the hold.
2379		 * The membar_producer() ensures visibility of the decremented
2380		 * value in dnode_move(), since DB_DNODE_EXIT doesn't actually
2381		 * release any lock.
2382		 */
2383		mutex_enter(&dn->dn_mtx);
2384		dnode_rele_and_unlock(dn, db, B_TRUE);
2385		db->db_dnode_handle = NULL;
2386
2387		dbuf_hash_remove(db);
2388	} else {
2389		DB_DNODE_EXIT(db);
2390	}
2391
2392	ASSERT(zfs_refcount_is_zero(&db->db_holds));
2393
2394	db->db_parent = NULL;
2395
2396	ASSERT(db->db_buf == NULL);
2397	ASSERT(db->db.db_data == NULL);
2398	ASSERT(db->db_hash_next == NULL);
2399	ASSERT(db->db_blkptr == NULL);
2400	ASSERT(db->db_data_pending == NULL);
2401	ASSERT3U(db->db_caching_status, ==, DB_NO_CACHE);
2402	ASSERT(!multilist_link_active(&db->db_cache_link));
2403
2404	kmem_cache_free(dbuf_kmem_cache, db);
2405	arc_space_return(sizeof (dmu_buf_impl_t), ARC_SPACE_DBUF);
2406
2407	/*
2408	 * If this dbuf is referenced from an indirect dbuf,
2409	 * decrement the ref count on the indirect dbuf.
2410	 */
2411	if (parent && parent != dndb) {
2412		mutex_enter(&parent->db_mtx);
2413		dbuf_rele_and_unlock(parent, db, B_TRUE);
2414	}
2415}
2416
2417/*
2418 * Note: While bpp will always be updated if the function returns success,
2419 * parentp will not be updated if the dnode does not have dn_dbuf filled in;
2420 * this happens when the dnode is the meta-dnode, or a userused or groupused
2421 * object.
2422 */
2423__attribute__((always_inline))
2424static inline int
2425dbuf_findbp(dnode_t *dn, int level, uint64_t blkid, int fail_sparse,
2426    dmu_buf_impl_t **parentp, blkptr_t **bpp, struct dbuf_hold_impl_data *dh)
2427{
2428	*parentp = NULL;
2429	*bpp = NULL;
2430
2431	ASSERT(blkid != DMU_BONUS_BLKID);
2432
2433	if (blkid == DMU_SPILL_BLKID) {
2434		mutex_enter(&dn->dn_mtx);
2435		if (dn->dn_have_spill &&
2436		    (dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR))
2437			*bpp = DN_SPILL_BLKPTR(dn->dn_phys);
2438		else
2439			*bpp = NULL;
2440		dbuf_add_ref(dn->dn_dbuf, NULL);
2441		*parentp = dn->dn_dbuf;
2442		mutex_exit(&dn->dn_mtx);
2443		return (0);
2444	}
2445
2446	int nlevels =
2447	    (dn->dn_phys->dn_nlevels == 0) ? 1 : dn->dn_phys->dn_nlevels;
2448	int epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
2449
2450	ASSERT3U(level * epbs, <, 64);
2451	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
2452	/*
2453	 * This assertion shouldn't trip as long as the max indirect block size
2454	 * is less than 1M.  The reason for this is that up to that point,
2455	 * the number of levels required to address an entire object with blocks
2456	 * of size SPA_MINBLOCKSIZE satisfies nlevels * epbs + 1 <= 64.  In
2457	 * other words, if N * epbs + 1 > 64, then if (N-1) * epbs + 1 > 55
2458	 * (i.e. we can address the entire object), objects will all use at most
2459	 * N-1 levels and the assertion won't overflow.  However, once epbs is
2460	 * 13, 4 * 13 + 1 = 53, but 5 * 13 + 1 = 66.  Then, 4 levels will not be
2461	 * enough to address an entire object, so objects will have 5 levels,
2462	 * but then this assertion will overflow.
2463	 *
2464	 * All this is to say that if we ever increase DN_MAX_INDBLKSHIFT, we
2465	 * need to redo this logic to handle overflows.
2466	 */
2467	ASSERT(level >= nlevels ||
2468	    ((nlevels - level - 1) * epbs) +
2469	    highbit64(dn->dn_phys->dn_nblkptr) <= 64);
2470	if (level >= nlevels ||
2471	    blkid >= ((uint64_t)dn->dn_phys->dn_nblkptr <<
2472	    ((nlevels - level - 1) * epbs)) ||
2473	    (fail_sparse &&
2474	    blkid > (dn->dn_phys->dn_maxblkid >> (level * epbs)))) {
2475		/* the buffer has no parent yet */
2476		return (SET_ERROR(ENOENT));
2477	} else if (level < nlevels-1) {
2478		/* this block is referenced from an indirect block */
2479		int err;
2480		if (dh == NULL) {
2481			err = dbuf_hold_impl(dn, level+1,
2482			    blkid >> epbs, fail_sparse, FALSE, NULL, parentp);
2483		} else {
2484			__dbuf_hold_impl_init(dh + 1, dn, dh->dh_level + 1,
2485			    blkid >> epbs, fail_sparse, FALSE, NULL,
2486			    parentp, dh->dh_depth + 1);
2487			err = __dbuf_hold_impl(dh + 1);
2488		}
2489		if (err)
2490			return (err);
2491		err = dbuf_read(*parentp, NULL,
2492		    (DB_RF_HAVESTRUCT | DB_RF_NOPREFETCH | DB_RF_CANFAIL));
2493		if (err) {
2494			dbuf_rele(*parentp, NULL);
2495			*parentp = NULL;
2496			return (err);
2497		}
2498		*bpp = ((blkptr_t *)(*parentp)->db.db_data) +
2499		    (blkid & ((1ULL << epbs) - 1));
2500		if (blkid > (dn->dn_phys->dn_maxblkid >> (level * epbs)))
2501			ASSERT(BP_IS_HOLE(*bpp));
2502		return (0);
2503	} else {
2504		/* the block is referenced from the dnode */
2505		ASSERT3U(level, ==, nlevels-1);
2506		ASSERT(dn->dn_phys->dn_nblkptr == 0 ||
2507		    blkid < dn->dn_phys->dn_nblkptr);
2508		if (dn->dn_dbuf) {
2509			dbuf_add_ref(dn->dn_dbuf, NULL);
2510			*parentp = dn->dn_dbuf;
2511		}
2512		*bpp = &dn->dn_phys->dn_blkptr[blkid];
2513		return (0);
2514	}
2515}
2516
2517static dmu_buf_impl_t *
2518dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
2519    dmu_buf_impl_t *parent, blkptr_t *blkptr)
2520{
2521	objset_t *os = dn->dn_objset;
2522	dmu_buf_impl_t *db, *odb;
2523
2524	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
2525	ASSERT(dn->dn_type != DMU_OT_NONE);
2526
2527	db = kmem_cache_alloc(dbuf_kmem_cache, KM_SLEEP);
2528
2529	db->db_objset = os;
2530	db->db.db_object = dn->dn_object;
2531	db->db_level = level;
2532	db->db_blkid = blkid;
2533	db->db_last_dirty = NULL;
2534	db->db_dirtycnt = 0;
2535	db->db_dnode_handle = dn->dn_handle;
2536	db->db_parent = parent;
2537	db->db_blkptr = blkptr;
2538
2539	db->db_user = NULL;
2540	db->db_user_immediate_evict = FALSE;
2541	db->db_freed_in_flight = FALSE;
2542	db->db_pending_evict = FALSE;
2543
2544	if (blkid == DMU_BONUS_BLKID) {
2545		ASSERT3P(parent, ==, dn->dn_dbuf);
2546		db->db.db_size = DN_SLOTS_TO_BONUSLEN(dn->dn_num_slots) -
2547		    (dn->dn_nblkptr-1) * sizeof (blkptr_t);
2548		ASSERT3U(db->db.db_size, >=, dn->dn_bonuslen);
2549		db->db.db_offset = DMU_BONUS_BLKID;
2550		db->db_state = DB_UNCACHED;
2551		db->db_caching_status = DB_NO_CACHE;
2552		/* the bonus dbuf is not placed in the hash table */
2553		arc_space_consume(sizeof (dmu_buf_impl_t), ARC_SPACE_DBUF);
2554		return (db);
2555	} else if (blkid == DMU_SPILL_BLKID) {
2556		db->db.db_size = (blkptr != NULL) ?
2557		    BP_GET_LSIZE(blkptr) : SPA_MINBLOCKSIZE;
2558		db->db.db_offset = 0;
2559	} else {
2560		int blocksize =
2561		    db->db_level ? 1 << dn->dn_indblkshift : dn->dn_datablksz;
2562		db->db.db_size = blocksize;
2563		db->db.db_offset = db->db_blkid * blocksize;
2564	}
2565
2566	/*
	 * Hold the dn_dbufs_mtx while we insert the new dbuf into
	 * the hash table *and* add it to the dn_dbufs list.
	 * This prevents a possible deadlock with someone
	 * trying to look up this dbuf before it's added to the
2571	 * dn_dbufs list.
2572	 */
2573	mutex_enter(&dn->dn_dbufs_mtx);
2574	db->db_state = DB_EVICTING;
2575	if ((odb = dbuf_hash_insert(db)) != NULL) {
2576		/* someone else inserted it first */
2577		kmem_cache_free(dbuf_kmem_cache, db);
2578		mutex_exit(&dn->dn_dbufs_mtx);
2579		DBUF_STAT_BUMP(hash_insert_race);
2580		return (odb);
2581	}
2582	avl_add(&dn->dn_dbufs, db);
2583
2584	db->db_state = DB_UNCACHED;
2585	db->db_caching_status = DB_NO_CACHE;
2586	mutex_exit(&dn->dn_dbufs_mtx);
2587	arc_space_consume(sizeof (dmu_buf_impl_t), ARC_SPACE_DBUF);
2588
2589	if (parent && parent != dn->dn_dbuf)
2590		dbuf_add_ref(parent, db);
2591
2592	ASSERT(dn->dn_object == DMU_META_DNODE_OBJECT ||
2593	    zfs_refcount_count(&dn->dn_holds) > 0);
2594	(void) zfs_refcount_add(&dn->dn_holds, db);
2595
2596	dprintf_dbuf(db, "db=%p\n", db);
2597
2598	return (db);
2599}
2600
2601typedef struct dbuf_prefetch_arg {
2602	spa_t *dpa_spa;	/* The spa to issue the prefetch in. */
2603	zbookmark_phys_t dpa_zb; /* The target block to prefetch. */
2604	int dpa_epbs; /* Entries (blkptr_t's) Per Block Shift. */
2605	int dpa_curlevel; /* The current level that we're reading */
2606	dnode_t *dpa_dnode; /* The dnode associated with the prefetch */
2607	zio_priority_t dpa_prio; /* The priority I/Os should be issued at. */
2608	zio_t *dpa_zio; /* The parent zio_t for all prefetches. */
2609	arc_flags_t dpa_aflags; /* Flags to pass to the final prefetch. */
2610} dbuf_prefetch_arg_t;
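
/*
 * Lifecycle note (descriptive of the code below): a dbuf_prefetch_arg_t
 * is allocated in dbuf_prefetch() and freed exactly once, either right
 * after dbuf_issue_final_prefetch() has been issued (from dbuf_prefetch()
 * or dbuf_prefetch_indirect_done()) or on the error and hole paths in
 * dbuf_prefetch_indirect_done().
 */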
2611
2612/*
2613 * Actually issue the prefetch read for the block given.
2614 */
2615static void
2616dbuf_issue_final_prefetch(dbuf_prefetch_arg_t *dpa, blkptr_t *bp)
2617{
2618	if (BP_IS_HOLE(bp) || BP_IS_EMBEDDED(bp))
2619		return;
2620
2621	arc_flags_t aflags =
2622	    dpa->dpa_aflags | ARC_FLAG_NOWAIT | ARC_FLAG_PREFETCH;
2623
2624	ASSERT3U(dpa->dpa_curlevel, ==, BP_GET_LEVEL(bp));
2625	ASSERT3U(dpa->dpa_curlevel, ==, dpa->dpa_zb.zb_level);
2626	ASSERT(dpa->dpa_zio != NULL);
2627	(void) arc_read(dpa->dpa_zio, dpa->dpa_spa, bp, NULL, NULL,
2628	    dpa->dpa_prio, ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
2629	    &aflags, &dpa->dpa_zb);
2630}
2631
2632/*
2633 * Called when an indirect block above our prefetch target is read in.  This
2634 * will either read in the next indirect block down the tree or issue the actual
2635 * prefetch if the next block down is our target.
2636 */
2637static void
2638dbuf_prefetch_indirect_done(zio_t *zio, const zbookmark_phys_t *zb,
2639    const blkptr_t *iobp, arc_buf_t *abuf, void *private)
2640{
2641	dbuf_prefetch_arg_t *dpa = private;
2642
2643	ASSERT3S(dpa->dpa_zb.zb_level, <, dpa->dpa_curlevel);
2644	ASSERT3S(dpa->dpa_curlevel, >, 0);
2645
2646	if (abuf == NULL) {
2647		ASSERT(zio == NULL || zio->io_error != 0);
2648		kmem_free(dpa, sizeof (*dpa));
2649		return;
2650	}
2651	ASSERT(zio == NULL || zio->io_error == 0);
2652
2653	/*
2654	 * The dpa_dnode is only valid if we are called with a NULL
2655	 * zio. This indicates that the arc_read() returned without
2656	 * first calling zio_read() to issue a physical read. Once
2657	 * a physical read is made the dpa_dnode must be invalidated
2658	 * as the locks guarding it may have been dropped. If the
2659	 * dpa_dnode is still valid, then we want to add it to the dbuf
2660	 * cache. To do so, we must hold the dbuf associated with the block
2661	 * we just prefetched, read its contents so that we associate it
2662	 * with an arc_buf_t, and then release it.
2663	 */
2664	if (zio != NULL) {
2665		ASSERT3S(BP_GET_LEVEL(zio->io_bp), ==, dpa->dpa_curlevel);
2666		if (zio->io_flags & ZIO_FLAG_RAW) {
2667			ASSERT3U(BP_GET_PSIZE(zio->io_bp), ==, zio->io_size);
2668		} else {
2669			ASSERT3U(BP_GET_LSIZE(zio->io_bp), ==, zio->io_size);
2670		}
2671		ASSERT3P(zio->io_spa, ==, dpa->dpa_spa);
2672
2673		dpa->dpa_dnode = NULL;
2674	} else if (dpa->dpa_dnode != NULL) {
2675		uint64_t curblkid = dpa->dpa_zb.zb_blkid >>
2676		    (dpa->dpa_epbs * (dpa->dpa_curlevel -
2677		    dpa->dpa_zb.zb_level));
2678		dmu_buf_impl_t *db = dbuf_hold_level(dpa->dpa_dnode,
2679		    dpa->dpa_curlevel, curblkid, FTAG);
2680		(void) dbuf_read(db, NULL,
2681		    DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH | DB_RF_HAVESTRUCT);
2682		dbuf_rele(db, FTAG);
2683	}
2684
2690	dpa->dpa_curlevel--;
2691
2692	uint64_t nextblkid = dpa->dpa_zb.zb_blkid >>
2693	    (dpa->dpa_epbs * (dpa->dpa_curlevel - dpa->dpa_zb.zb_level));
2694	blkptr_t *bp = ((blkptr_t *)abuf->b_data) +
2695	    P2PHASE(nextblkid, 1ULL << dpa->dpa_epbs);
2696	if (BP_IS_HOLE(bp)) {
2697		kmem_free(dpa, sizeof (*dpa));
2698	} else if (dpa->dpa_curlevel == dpa->dpa_zb.zb_level) {
2699		ASSERT3U(nextblkid, ==, dpa->dpa_zb.zb_blkid);
2700		dbuf_issue_final_prefetch(dpa, bp);
2701		kmem_free(dpa, sizeof (*dpa));
2702	} else {
2703		arc_flags_t iter_aflags = ARC_FLAG_NOWAIT;
2704		zbookmark_phys_t zb;
2705
2706		/* flag if L2ARC eligible, l2arc_noprefetch then decides */
2707		if (dpa->dpa_aflags & ARC_FLAG_L2CACHE)
2708			iter_aflags |= ARC_FLAG_L2CACHE;
2709
2710		ASSERT3U(dpa->dpa_curlevel, ==, BP_GET_LEVEL(bp));
2711
2712		SET_BOOKMARK(&zb, dpa->dpa_zb.zb_objset,
2713		    dpa->dpa_zb.zb_object, dpa->dpa_curlevel, nextblkid);
2714
2715		(void) arc_read(dpa->dpa_zio, dpa->dpa_spa,
2716		    bp, dbuf_prefetch_indirect_done, dpa, dpa->dpa_prio,
2717		    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
2718		    &iter_aflags, &zb);
2719	}
2720
2721	arc_buf_destroy(abuf, private);
2722}
2723
2724/*
2725 * Issue prefetch reads for the given block on the given level.  If the indirect
2726 * blocks above that block are not in memory, we will read them in
2727 * asynchronously.  As a result, this call never blocks waiting for a read to
2728 * complete.
2729 */
2730void
2731dbuf_prefetch(dnode_t *dn, int64_t level, uint64_t blkid, zio_priority_t prio,
2732    arc_flags_t aflags)
2733{
2734	blkptr_t bp;
2735	int epbs, nlevels, curlevel;
2736	uint64_t curblkid;
2737
2738	ASSERT(blkid != DMU_BONUS_BLKID);
2739	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
2740
2741	if (blkid > dn->dn_maxblkid)
2742		return;
2743
2744	if (dnode_block_freed(dn, blkid))
2745		return;
2746
2747	/*
2748	 * This dnode hasn't been written to disk yet, so there's nothing to
2749	 * prefetch.
2750	 */
2751	nlevels = dn->dn_phys->dn_nlevels;
2752	if (level >= nlevels || dn->dn_phys->dn_nblkptr == 0)
2753		return;
2754
2755	epbs = dn->dn_phys->dn_indblkshift - SPA_BLKPTRSHIFT;
2756	if (dn->dn_phys->dn_maxblkid < blkid << (epbs * level))
2757		return;
2758
2759	dmu_buf_impl_t *db = dbuf_find(dn->dn_objset, dn->dn_object,
2760	    level, blkid);
2761	if (db != NULL) {
2762		mutex_exit(&db->db_mtx);
2763		/*
2764		 * This dbuf already exists.  It is either CACHED, or
2765		 * (we assume) about to be read or filled.
2766		 */
2767		return;
2768	}
2769
2770	/*
2771	 * Find the closest ancestor (indirect block) of the target block
2772	 * that is present in the cache.  In this indirect block, we will
2773	 * find the bp that is at curlevel, curblkid.
2774	 */
2775	curlevel = level;
2776	curblkid = blkid;
2777	while (curlevel < nlevels - 1) {
2778		int parent_level = curlevel + 1;
2779		uint64_t parent_blkid = curblkid >> epbs;
2780		dmu_buf_impl_t *db;
2781
2782		if (dbuf_hold_impl(dn, parent_level, parent_blkid,
2783		    FALSE, TRUE, FTAG, &db) == 0) {
2784			blkptr_t *bpp = db->db_buf->b_data;
2785			bp = bpp[P2PHASE(curblkid, 1 << epbs)];
2786			dbuf_rele(db, FTAG);
2787			break;
2788		}
2789
2790		curlevel = parent_level;
2791		curblkid = parent_blkid;
2792	}
2793
2794	if (curlevel == nlevels - 1) {
2795		/* No cached indirect blocks found. */
2796		ASSERT3U(curblkid, <, dn->dn_phys->dn_nblkptr);
2797		bp = dn->dn_phys->dn_blkptr[curblkid];
2798	}
2799	if (BP_IS_HOLE(&bp))
2800		return;
2801
2802	ASSERT3U(curlevel, ==, BP_GET_LEVEL(&bp));
2803
2804	zio_t *pio = zio_root(dmu_objset_spa(dn->dn_objset), NULL, NULL,
2805	    ZIO_FLAG_CANFAIL);
2806
2807	dbuf_prefetch_arg_t *dpa = kmem_zalloc(sizeof (*dpa), KM_SLEEP);
2808	dsl_dataset_t *ds = dn->dn_objset->os_dsl_dataset;
2809	SET_BOOKMARK(&dpa->dpa_zb, ds != NULL ? ds->ds_object : DMU_META_OBJSET,
2810	    dn->dn_object, level, blkid);
2811	dpa->dpa_curlevel = curlevel;
2812	dpa->dpa_prio = prio;
2813	dpa->dpa_aflags = aflags;
2814	dpa->dpa_spa = dn->dn_objset->os_spa;
2815	dpa->dpa_dnode = dn;
2816	dpa->dpa_epbs = epbs;
2817	dpa->dpa_zio = pio;
2818
2819	/* flag if L2ARC eligible, l2arc_noprefetch then decides */
2820	if (DNODE_LEVEL_IS_L2CACHEABLE(dn, level))
2821		dpa->dpa_aflags |= ARC_FLAG_L2CACHE;
2822
2823	/*
2824	 * If we have the indirect just above us, no need to do the asynchronous
2825	 * prefetch chain; we'll just run the last step ourselves.  If we're at
2826	 * a higher level, though, we want to issue the prefetches for all the
2827	 * indirect blocks asynchronously, so we can go on with whatever we were
2828	 * doing.
2829	 */
2830	if (curlevel == level) {
2831		ASSERT3U(curblkid, ==, blkid);
2832		dbuf_issue_final_prefetch(dpa, &bp);
2833		kmem_free(dpa, sizeof (*dpa));
2834	} else {
2835		arc_flags_t iter_aflags = ARC_FLAG_NOWAIT;
2836		zbookmark_phys_t zb;
2837
2838		/* flag if L2ARC eligible, l2arc_noprefetch then decides */
2839		if (DNODE_LEVEL_IS_L2CACHEABLE(dn, level))
2840			iter_aflags |= ARC_FLAG_L2CACHE;
2841
2842		SET_BOOKMARK(&zb, ds != NULL ? ds->ds_object : DMU_META_OBJSET,
2843		    dn->dn_object, curlevel, curblkid);
2844		(void) arc_read(dpa->dpa_zio, dpa->dpa_spa,
2845		    &bp, dbuf_prefetch_indirect_done, dpa, prio,
2846		    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
2847		    &iter_aflags, &zb);
2848	}
2849	/*
2850	 * We use pio here instead of dpa_zio since it's possible that
2851	 * dpa may have already been freed.
2852	 */
2853	zio_nowait(pio);
2854}
2855
2856#define	DBUF_HOLD_IMPL_MAX_DEPTH	20
2857
2858/*
 * Helper function for __dbuf_hold_impl() to copy a buffer.  It handles
 * both compressed and uncompressed buffers by allocating the new buffer
 * with arc_alloc_compressed_buf() or arc_alloc_buf(), respectively.
2863 *
2864 * NOTE: Declared noinline to avoid stack bloat in __dbuf_hold_impl().
2865 */
2866noinline static void
2867dbuf_hold_copy(struct dbuf_hold_impl_data *dh)
2868{
2869	dnode_t *dn = dh->dh_dn;
2870	dmu_buf_impl_t *db = dh->dh_db;
2871	dbuf_dirty_record_t *dr = dh->dh_dr;
2872	arc_buf_t *data = dr->dt.dl.dr_data;
2873
2874	enum zio_compress compress_type = arc_get_compression(data);
2875
2876	if (compress_type != ZIO_COMPRESS_OFF) {
2877		dbuf_set_data(db, arc_alloc_compressed_buf(
2878		    dn->dn_objset->os_spa, db, arc_buf_size(data),
2879		    arc_buf_lsize(data), compress_type));
2880	} else {
2881		dbuf_set_data(db, arc_alloc_buf(dn->dn_objset->os_spa, db,
2882		    DBUF_GET_BUFC_TYPE(db), db->db.db_size));
2883	}
2884
2885	bcopy(data->b_data, db->db.db_data, arc_buf_size(data));
2886}
2887
2888/*
2889 * Returns with db_holds incremented, and db_mtx not held.
2890 * Note: dn_struct_rwlock must be held.
2891 */
2892static int
2893__dbuf_hold_impl(struct dbuf_hold_impl_data *dh)
2894{
2895	ASSERT3S(dh->dh_depth, <, DBUF_HOLD_IMPL_MAX_DEPTH);
2896	dh->dh_parent = NULL;
2897
2898	ASSERT(dh->dh_blkid != DMU_BONUS_BLKID);
2899	ASSERT(RW_LOCK_HELD(&dh->dh_dn->dn_struct_rwlock));
2900	ASSERT3U(dh->dh_dn->dn_nlevels, >, dh->dh_level);
2901
2902	*(dh->dh_dbp) = NULL;
2903
2904	/* dbuf_find() returns with db_mtx held */
2905	dh->dh_db = dbuf_find(dh->dh_dn->dn_objset, dh->dh_dn->dn_object,
2906	    dh->dh_level, dh->dh_blkid);
2907
2908	if (dh->dh_db == NULL) {
2909		dh->dh_bp = NULL;
2910
2911		if (dh->dh_fail_uncached)
2912			return (SET_ERROR(ENOENT));
2913
2914		ASSERT3P(dh->dh_parent, ==, NULL);
2915		dh->dh_err = dbuf_findbp(dh->dh_dn, dh->dh_level, dh->dh_blkid,
2916		    dh->dh_fail_sparse, &dh->dh_parent, &dh->dh_bp, dh);
2917		if (dh->dh_fail_sparse) {
2918			if (dh->dh_err == 0 &&
2919			    dh->dh_bp && BP_IS_HOLE(dh->dh_bp))
2920				dh->dh_err = SET_ERROR(ENOENT);
2921			if (dh->dh_err) {
2922				if (dh->dh_parent)
2923					dbuf_rele(dh->dh_parent, NULL);
2924				return (dh->dh_err);
2925			}
2926		}
2927		if (dh->dh_err && dh->dh_err != ENOENT)
2928			return (dh->dh_err);
2929		dh->dh_db = dbuf_create(dh->dh_dn, dh->dh_level, dh->dh_blkid,
2930		    dh->dh_parent, dh->dh_bp);
2931	}
2932
2933	if (dh->dh_fail_uncached && dh->dh_db->db_state != DB_CACHED) {
2934		mutex_exit(&dh->dh_db->db_mtx);
2935		return (SET_ERROR(ENOENT));
2936	}
2937
2938	if (dh->dh_db->db_buf != NULL) {
2939		arc_buf_access(dh->dh_db->db_buf);
2940		ASSERT3P(dh->dh_db->db.db_data, ==, dh->dh_db->db_buf->b_data);
2941	}
2942
2943	ASSERT(dh->dh_db->db_buf == NULL || arc_referenced(dh->dh_db->db_buf));
2944
2945	/*
	 * If this buffer is currently syncing out, and we are
2947	 * still referencing it from db_data, we need to make a copy
2948	 * of it in case we decide we want to dirty it again in this txg.
2949	 */
2950	if (dh->dh_db->db_level == 0 &&
2951	    dh->dh_db->db_blkid != DMU_BONUS_BLKID &&
2952	    dh->dh_dn->dn_object != DMU_META_DNODE_OBJECT &&
2953	    dh->dh_db->db_state == DB_CACHED && dh->dh_db->db_data_pending) {
2954		dh->dh_dr = dh->dh_db->db_data_pending;
2955		if (dh->dh_dr->dt.dl.dr_data == dh->dh_db->db_buf)
2956			dbuf_hold_copy(dh);
2957	}
2958
2959	if (multilist_link_active(&dh->dh_db->db_cache_link)) {
2960		ASSERT(zfs_refcount_is_zero(&dh->dh_db->db_holds));
2961		ASSERT(dh->dh_db->db_caching_status == DB_DBUF_CACHE ||
2962		    dh->dh_db->db_caching_status == DB_DBUF_METADATA_CACHE);
2963
2964		multilist_remove(
2965		    dbuf_caches[dh->dh_db->db_caching_status].cache,
2966		    dh->dh_db);
2967		(void) zfs_refcount_remove_many(
2968		    &dbuf_caches[dh->dh_db->db_caching_status].size,
2969		    dh->dh_db->db.db_size, dh->dh_db);
2970
2971		if (dh->dh_db->db_caching_status == DB_DBUF_METADATA_CACHE) {
2972			DBUF_STAT_BUMPDOWN(metadata_cache_count);
2973		} else {
2974			DBUF_STAT_BUMPDOWN(cache_levels[dh->dh_db->db_level]);
2975			DBUF_STAT_BUMPDOWN(cache_count);
2976			DBUF_STAT_DECR(cache_levels_bytes[dh->dh_db->db_level],
2977			    dh->dh_db->db.db_size);
2978		}
2979		dh->dh_db->db_caching_status = DB_NO_CACHE;
2980	}
2981	(void) zfs_refcount_add(&dh->dh_db->db_holds, dh->dh_tag);
2982	DBUF_VERIFY(dh->dh_db);
2983	mutex_exit(&dh->dh_db->db_mtx);
2984
2985	/* NOTE: we can't rele the parent until after we drop the db_mtx */
2986	if (dh->dh_parent)
2987		dbuf_rele(dh->dh_parent, NULL);
2988
2989	ASSERT3P(DB_DNODE(dh->dh_db), ==, dh->dh_dn);
2990	ASSERT3U(dh->dh_db->db_blkid, ==, dh->dh_blkid);
2991	ASSERT3U(dh->dh_db->db_level, ==, dh->dh_level);
2992	*(dh->dh_dbp) = dh->dh_db;
2993
2994	return (0);
2995}
2996
2997/*
2998 * The following code preserves the recursive function dbuf_hold_impl()
2999 * but moves the local variables AND function arguments to the heap to
 * minimize the stack frame size.  Enough space for 20 levels of
 * recursion is allocated up front from the heap in a single allocation.
3002 */
3003int
3004dbuf_hold_impl(dnode_t *dn, uint8_t level, uint64_t blkid,
3005    boolean_t fail_sparse, boolean_t fail_uncached,
3006    void *tag, dmu_buf_impl_t **dbp)
3007{
3008	struct dbuf_hold_impl_data *dh;
3009	int error;
3010
3011	dh = kmem_alloc(sizeof (struct dbuf_hold_impl_data) *
3012	    DBUF_HOLD_IMPL_MAX_DEPTH, KM_SLEEP);
3013	__dbuf_hold_impl_init(dh, dn, level, blkid, fail_sparse,
3014	    fail_uncached, tag, dbp, 0);
3015
3016	error = __dbuf_hold_impl(dh);
3017
3018	kmem_free(dh, sizeof (struct dbuf_hold_impl_data) *
3019	    DBUF_HOLD_IMPL_MAX_DEPTH);
3020
3021	return (error);
3022}
3023
3024static void
3025__dbuf_hold_impl_init(struct dbuf_hold_impl_data *dh,
3026    dnode_t *dn, uint8_t level, uint64_t blkid,
3027    boolean_t fail_sparse, boolean_t fail_uncached,
3028    void *tag, dmu_buf_impl_t **dbp, int depth)
3029{
3030	dh->dh_dn = dn;
3031	dh->dh_level = level;
3032	dh->dh_blkid = blkid;
3033
3034	dh->dh_fail_sparse = fail_sparse;
3035	dh->dh_fail_uncached = fail_uncached;
3036
3037	dh->dh_tag = tag;
3038	dh->dh_dbp = dbp;
3039
3040	dh->dh_db = NULL;
3041	dh->dh_parent = NULL;
3042	dh->dh_bp = NULL;
3043	dh->dh_err = 0;
3044	dh->dh_dr = NULL;
3045
3046	dh->dh_depth = depth;
3047}
3048
3049dmu_buf_impl_t *
3050dbuf_hold(dnode_t *dn, uint64_t blkid, void *tag)
3051{
3052	return (dbuf_hold_level(dn, 0, blkid, tag));
3053}
3054
3055dmu_buf_impl_t *
3056dbuf_hold_level(dnode_t *dn, int level, uint64_t blkid, void *tag)
3057{
3058	dmu_buf_impl_t *db;
3059	int err = dbuf_hold_impl(dn, level, blkid, FALSE, FALSE, tag, &db);
3060	return (err ? NULL : db);
3061}
3062
3063void
3064dbuf_create_bonus(dnode_t *dn)
3065{
3066	ASSERT(RW_WRITE_HELD(&dn->dn_struct_rwlock));
3067
3068	ASSERT(dn->dn_bonus == NULL);
3069	dn->dn_bonus = dbuf_create(dn, 0, DMU_BONUS_BLKID, dn->dn_dbuf, NULL);
3070}
3071
3072int
3073dbuf_spill_set_blksz(dmu_buf_t *db_fake, uint64_t blksz, dmu_tx_t *tx)
3074{
3075	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
3076	dnode_t *dn;
3077
3078	if (db->db_blkid != DMU_SPILL_BLKID)
3079		return (SET_ERROR(ENOTSUP));
3080	if (blksz == 0)
3081		blksz = SPA_MINBLOCKSIZE;
3082	ASSERT3U(blksz, <=, spa_maxblocksize(dmu_objset_spa(db->db_objset)));
3083	blksz = P2ROUNDUP(blksz, SPA_MINBLOCKSIZE);
3084
3085	DB_DNODE_ENTER(db);
3086	dn = DB_DNODE(db);
3087	rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
3088	dbuf_new_size(db, blksz, tx);
3089	rw_exit(&dn->dn_struct_rwlock);
3090	DB_DNODE_EXIT(db);
3091
3092	return (0);
3093}
3094
3095void
3096dbuf_rm_spill(dnode_t *dn, dmu_tx_t *tx)
3097{
3098	dbuf_free_range(dn, DMU_SPILL_BLKID, DMU_SPILL_BLKID, tx);
3099}
3100
3101#pragma weak dmu_buf_add_ref = dbuf_add_ref
3102void
3103dbuf_add_ref(dmu_buf_impl_t *db, void *tag)
3104{
3105	int64_t holds = zfs_refcount_add(&db->db_holds, tag);
3106	ASSERT3S(holds, >, 1);
3107}
3108
3109#pragma weak dmu_buf_try_add_ref = dbuf_try_add_ref
3110boolean_t
3111dbuf_try_add_ref(dmu_buf_t *db_fake, objset_t *os, uint64_t obj, uint64_t blkid,
3112    void *tag)
3113{
3114	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
3115	dmu_buf_impl_t *found_db;
3116	boolean_t result = B_FALSE;
3117
3118	if (db->db_blkid == DMU_BONUS_BLKID)
3119		found_db = dbuf_find_bonus(os, obj);
3120	else
3121		found_db = dbuf_find(os, obj, 0, blkid);
3122
3123	if (found_db != NULL) {
3124		if (db == found_db && dbuf_refcount(db) > db->db_dirtycnt) {
3125			(void) zfs_refcount_add(&db->db_holds, tag);
3126			result = B_TRUE;
3127		}
3128		mutex_exit(&db->db_mtx);
3129	}
3130	return (result);
3131}
3132
3133/*
3134 * If you call dbuf_rele() you had better not be referencing the dnode handle
3135 * unless you have some other direct or indirect hold on the dnode. (An indirect
3136 * hold is a hold on one of the dnode's dbufs, including the bonus buffer.)
3137 * Without that, the dbuf_rele() could lead to a dnode_rele() followed by the
3138 * dnode's parent dbuf evicting its dnode handles.
3139 */
3140void
3141dbuf_rele(dmu_buf_impl_t *db, void *tag)
3142{
3143	mutex_enter(&db->db_mtx);
3144	dbuf_rele_and_unlock(db, tag, B_FALSE);
3145}
3146
3147void
3148dmu_buf_rele(dmu_buf_t *db, void *tag)
3149{
3150	dbuf_rele((dmu_buf_impl_t *)db, tag);
3151}
3152
3153/*
3154 * dbuf_rele() for an already-locked dbuf.  This is necessary to allow
3155 * db_dirtycnt and db_holds to be updated atomically.  The 'evicting'
3156 * argument should be set if we are already in the dbuf-evicting code
3157 * path, in which case we don't want to recursively evict.  This allows us to
3158 * avoid deeply nested stacks that would have a call flow similar to this:
3159 *
3160 * dbuf_rele()-->dbuf_rele_and_unlock()-->dbuf_evict_notify()
3161 *	^						|
3162 *	|						|
3163 *	+-----dbuf_destroy()<--dbuf_evict_one()<--------+
3164 *
3165 */
3166void
3167dbuf_rele_and_unlock(dmu_buf_impl_t *db, void *tag, boolean_t evicting)
3168{
3169	int64_t holds;
3170	uint64_t size;
3171
3172	ASSERT(MUTEX_HELD(&db->db_mtx));
3173	DBUF_VERIFY(db);
3174
3175	/*
3176	 * Remove the reference to the dbuf before removing its hold on the
3177	 * dnode so we can guarantee in dnode_move() that a referenced bonus
3178	 * buffer has a corresponding dnode hold.
3179	 */
3180	holds = zfs_refcount_remove(&db->db_holds, tag);
3181	ASSERT(holds >= 0);
3182
3183	/*
3184	 * We can't freeze indirects if there is a possibility that they
3185	 * may be modified in the current syncing context.
3186	 */
3187	if (db->db_buf != NULL &&
3188	    holds == (db->db_level == 0 ? db->db_dirtycnt : 0)) {
3189		arc_buf_freeze(db->db_buf);
3190	}
3191
3192	if (holds == db->db_dirtycnt &&
3193	    db->db_level == 0 && db->db_user_immediate_evict)
3194		dbuf_evict_user(db);
3195
3196	if (holds == 0) {
3197		if (db->db_blkid == DMU_BONUS_BLKID) {
3198			dnode_t *dn;
3199			boolean_t evict_dbuf = db->db_pending_evict;
3200
3201			/*
3202			 * If the dnode moves here, we cannot cross this
3203			 * barrier until the move completes.
3204			 */
3205			DB_DNODE_ENTER(db);
3206
3207			dn = DB_DNODE(db);
3208			atomic_dec_32(&dn->dn_dbufs_count);
3209
3210			/*
3211			 * Decrementing the dbuf count means that the bonus
3212			 * buffer's dnode hold is no longer discounted in
3213			 * dnode_move(). The dnode cannot move until after
3214			 * the dnode_rele() below.
3215			 */
3216			DB_DNODE_EXIT(db);
3217
3218			/*
3219			 * Do not reference db after its lock is dropped.
3220			 * Another thread may evict it.
3221			 */
3222			mutex_exit(&db->db_mtx);
3223
3224			if (evict_dbuf)
3225				dnode_evict_bonus(dn);
3226
3227			dnode_rele(dn, db);
3228		} else if (db->db_buf == NULL) {
3229			/*
3230			 * This is a special case: we never associated this
3231			 * dbuf with any data allocated from the ARC.
3232			 */
3233			ASSERT(db->db_state == DB_UNCACHED ||
3234			    db->db_state == DB_NOFILL);
3235			dbuf_destroy(db);
3236		} else if (arc_released(db->db_buf)) {
3237			/*
3238			 * This dbuf has anonymous data associated with it.
3239			 */
3240			dbuf_destroy(db);
3241		} else {
3242			boolean_t do_arc_evict = B_FALSE;
3243			blkptr_t bp;
3244			spa_t *spa = dmu_objset_spa(db->db_objset);
3245
3246			if (!DBUF_IS_CACHEABLE(db) &&
3247			    db->db_blkptr != NULL &&
3248			    !BP_IS_HOLE(db->db_blkptr) &&
3249			    !BP_IS_EMBEDDED(db->db_blkptr)) {
3250				do_arc_evict = B_TRUE;
3251				bp = *db->db_blkptr;
3252			}
3253
3254			if (!DBUF_IS_CACHEABLE(db) ||
3255			    db->db_pending_evict) {
3256				dbuf_destroy(db);
3257			} else if (!multilist_link_active(&db->db_cache_link)) {
3258				ASSERT3U(db->db_caching_status, ==,
3259				    DB_NO_CACHE);
3260
3261				dbuf_cached_state_t dcs =
3262				    dbuf_include_in_metadata_cache(db) ?
3263				    DB_DBUF_METADATA_CACHE : DB_DBUF_CACHE;
3264				db->db_caching_status = dcs;
3265
3266				multilist_insert(dbuf_caches[dcs].cache, db);
3267				size = zfs_refcount_add_many(
3268				    &dbuf_caches[dcs].size, db->db.db_size, db);
3269
3270				if (dcs == DB_DBUF_METADATA_CACHE) {
3271					DBUF_STAT_BUMP(metadata_cache_count);
3272					DBUF_STAT_MAX(
3273					    metadata_cache_size_bytes_max,
3274					    size);
3275				} else {
3276					DBUF_STAT_BUMP(
3277					    cache_levels[db->db_level]);
3278					DBUF_STAT_BUMP(cache_count);
3279					DBUF_STAT_INCR(
3280					    cache_levels_bytes[db->db_level],
3281					    db->db.db_size);
3282					DBUF_STAT_MAX(cache_size_bytes_max,
3283					    size);
3284				}
3285				mutex_exit(&db->db_mtx);
3286
3287				if (dcs == DB_DBUF_CACHE && !evicting)
3288					dbuf_evict_notify(size);
3289			}
3290
3291			if (do_arc_evict)
3292				arc_freed(spa, &bp);
3293		}
3294	} else {
3295		mutex_exit(&db->db_mtx);
3296	}
3297
3298}
3299
3300#pragma weak dmu_buf_refcount = dbuf_refcount
3301uint64_t
3302dbuf_refcount(dmu_buf_impl_t *db)
3303{
3304	return (zfs_refcount_count(&db->db_holds));
3305}
3306
3307void *
3308dmu_buf_replace_user(dmu_buf_t *db_fake, dmu_buf_user_t *old_user,
3309    dmu_buf_user_t *new_user)
3310{
3311	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
3312
3313	mutex_enter(&db->db_mtx);
3314	dbuf_verify_user(db, DBVU_NOT_EVICTING);
3315	if (db->db_user == old_user)
3316		db->db_user = new_user;
3317	else
3318		old_user = db->db_user;
3319	dbuf_verify_user(db, DBVU_NOT_EVICTING);
3320	mutex_exit(&db->db_mtx);
3321
3322	return (old_user);
3323}
3324
3325void *
3326dmu_buf_set_user(dmu_buf_t *db_fake, dmu_buf_user_t *user)
3327{
3328	return (dmu_buf_replace_user(db_fake, NULL, user));
3329}
3330
3331void *
3332dmu_buf_set_user_ie(dmu_buf_t *db_fake, dmu_buf_user_t *user)
3333{
3334	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
3335
3336	db->db_user_immediate_evict = TRUE;
3337	return (dmu_buf_set_user(db_fake, user));
3338}
3339
3340void *
3341dmu_buf_remove_user(dmu_buf_t *db_fake, dmu_buf_user_t *user)
3342{
3343	return (dmu_buf_replace_user(db_fake, user, NULL));
3344}
3345
3346void *
3347dmu_buf_get_user(dmu_buf_t *db_fake)
3348{
3349	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
3350
3351	dbuf_verify_user(db, DBVU_NOT_EVICTING);
3352	return (db->db_user);
3353}
3354
3355void
3356dmu_buf_user_evict_wait()
3357{
3358	taskq_wait(dbu_evict_taskq);
3359}
3360
3361blkptr_t *
3362dmu_buf_get_blkptr(dmu_buf_t *db)
3363{
3364	dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
3365	return (dbi->db_blkptr);
3366}
3367
3368objset_t *
3369dmu_buf_get_objset(dmu_buf_t *db)
3370{
3371	dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
3372	return (dbi->db_objset);
3373}
3374
3375dnode_t *
3376dmu_buf_dnode_enter(dmu_buf_t *db)
3377{
3378	dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
3379	DB_DNODE_ENTER(dbi);
3380	return (DB_DNODE(dbi));
3381}
3382
3383void
3384dmu_buf_dnode_exit(dmu_buf_t *db)
3385{
3386	dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
3387	DB_DNODE_EXIT(dbi);
3388}
3389
3390static void
3391dbuf_check_blkptr(dnode_t *dn, dmu_buf_impl_t *db)
3392{
	/* ASSERT(dmu_tx_is_syncing(tx)) */
3394	ASSERT(MUTEX_HELD(&db->db_mtx));
3395
3396	if (db->db_blkptr != NULL)
3397		return;
3398
3399	if (db->db_blkid == DMU_SPILL_BLKID) {
3400		db->db_blkptr = DN_SPILL_BLKPTR(dn->dn_phys);
3401		BP_ZERO(db->db_blkptr);
3402		return;
3403	}
3404	if (db->db_level == dn->dn_phys->dn_nlevels-1) {
3405		/*
		 * This buffer was allocated at a time when there were
		 * no available blkptrs from the dnode, or it was
		 * inappropriate to hook it in (i.e., nlevels mismatch).
3409		 */
3410		ASSERT(db->db_blkid < dn->dn_phys->dn_nblkptr);
3411		ASSERT(db->db_parent == NULL);
3412		db->db_parent = dn->dn_dbuf;
3413		db->db_blkptr = &dn->dn_phys->dn_blkptr[db->db_blkid];
3414		DBUF_VERIFY(db);
3415	} else {
3416		dmu_buf_impl_t *parent = db->db_parent;
3417		int epbs = dn->dn_phys->dn_indblkshift - SPA_BLKPTRSHIFT;
3418
3419		ASSERT(dn->dn_phys->dn_nlevels > 1);
3420		if (parent == NULL) {
3421			mutex_exit(&db->db_mtx);
3422			rw_enter(&dn->dn_struct_rwlock, RW_READER);
3423			parent = dbuf_hold_level(dn, db->db_level + 1,
3424			    db->db_blkid >> epbs, db);
3425			rw_exit(&dn->dn_struct_rwlock);
3426			mutex_enter(&db->db_mtx);
3427			db->db_parent = parent;
3428		}
3429		db->db_blkptr = (blkptr_t *)parent->db.db_data +
3430		    (db->db_blkid & ((1ULL << epbs) - 1));
3431		DBUF_VERIFY(db);
3432	}
3433}
3434
3435/*
 * dbuf_sync_indirect() is called recursively from dbuf_sync_list(), so it
 * is critical that we not allow the compiler to inline this function into
 * dbuf_sync_list(), thereby drastically bloating the stack usage.
3439 */
3440noinline static void
3441dbuf_sync_indirect(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
3442{
3443	dmu_buf_impl_t *db = dr->dr_dbuf;
3444	dnode_t *dn;
3445	zio_t *zio;
3446
3447	ASSERT(dmu_tx_is_syncing(tx));
3448
3449	dprintf_dbuf_bp(db, db->db_blkptr, "blkptr=%p", db->db_blkptr);
3450
3451	mutex_enter(&db->db_mtx);
3452
3453	ASSERT(db->db_level > 0);
3454	DBUF_VERIFY(db);
3455
3456	/* Read the block if it hasn't been read yet. */
3457	if (db->db_buf == NULL) {
3458		mutex_exit(&db->db_mtx);
3459		(void) dbuf_read(db, NULL, DB_RF_MUST_SUCCEED);
3460		mutex_enter(&db->db_mtx);
3461	}
3462	ASSERT3U(db->db_state, ==, DB_CACHED);
3463	ASSERT(db->db_buf != NULL);
3464
3465	DB_DNODE_ENTER(db);
3466	dn = DB_DNODE(db);
3467	/* Indirect block size must match what the dnode thinks it is. */
3468	ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
3469	dbuf_check_blkptr(dn, db);
3470	DB_DNODE_EXIT(db);
3471
3472	/* Provide the pending dirty record to child dbufs */
3473	db->db_data_pending = dr;
3474
3475	mutex_exit(&db->db_mtx);
3476
3477	dbuf_write(dr, db->db_buf, tx);
3478
3479	zio = dr->dr_zio;
3480	mutex_enter(&dr->dt.di.dr_mtx);
3481	dbuf_sync_list(&dr->dt.di.dr_children, db->db_level - 1, tx);
3482	ASSERT(list_head(&dr->dt.di.dr_children) == NULL);
3483	mutex_exit(&dr->dt.di.dr_mtx);
3484	zio_nowait(zio);
3485}
3486
3487/*
 * dbuf_sync_leaf() is called recursively from dbuf_sync_list(), so it is
 * critical that we not allow the compiler to inline this function into
 * dbuf_sync_list(), which would drastically bloat the stack usage.
3491 */
3492noinline static void
3493dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
3494{
3495	arc_buf_t **datap = &dr->dt.dl.dr_data;
3496	dmu_buf_impl_t *db = dr->dr_dbuf;
3497	dnode_t *dn;
3498	objset_t *os;
3499	uint64_t txg = tx->tx_txg;
3500
3501	ASSERT(dmu_tx_is_syncing(tx));
3502
3503	dprintf_dbuf_bp(db, db->db_blkptr, "blkptr=%p", db->db_blkptr);
3504
3505	mutex_enter(&db->db_mtx);
3506	/*
3507	 * To be synced, we must be dirtied.  But we
3508	 * might have been freed after the dirty.
3509	 */
3510	if (db->db_state == DB_UNCACHED) {
3511		/* This buffer has been freed since it was dirtied */
3512		ASSERT(db->db.db_data == NULL);
3513	} else if (db->db_state == DB_FILL) {
3514		/* This buffer was freed and is now being re-filled */
3515		ASSERT(db->db.db_data != dr->dt.dl.dr_data);
3516	} else {
3517		ASSERT(db->db_state == DB_CACHED || db->db_state == DB_NOFILL);
3518	}
3519	DBUF_VERIFY(db);
3520
3521	DB_DNODE_ENTER(db);
3522	dn = DB_DNODE(db);
3523
3524	if (db->db_blkid == DMU_SPILL_BLKID) {
3525		mutex_enter(&dn->dn_mtx);
3526		if (!(dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR)) {
3527			/*
			 * In the previous transaction group, the bonus buffer
			 * was entirely used to store the attributes for the
			 * dnode, which overrode the dn_spill field.  However,
			 * when more attributes were added to the file, a spill
			 * block was required to hold the extra attributes.
			 *
			 * Make sure to clear the garbage left in the dn_spill
			 * field from the previous attributes in the bonus
			 * buffer.  Otherwise, after writing out the spill
			 * block to the newly allocated dva, we would free
			 * the old block pointed to by the invalid dn_spill.
3539			 */
3540			db->db_blkptr = NULL;
3541		}
3542		dn->dn_phys->dn_flags |= DNODE_FLAG_SPILL_BLKPTR;
3543		mutex_exit(&dn->dn_mtx);
3544	}
3545
3546	/*
3547	 * If this is a bonus buffer, simply copy the bonus data into the
3548	 * dnode.  It will be written out when the dnode is synced (and it
3549	 * will be synced, since it must have been dirty for dbuf_sync to
3550	 * be called).
3551	 */
3552	if (db->db_blkid == DMU_BONUS_BLKID) {
3553		dbuf_dirty_record_t **drp;
3554
3555		ASSERT(*datap != NULL);
3556		ASSERT0(db->db_level);
3557		ASSERT3U(DN_MAX_BONUS_LEN(dn->dn_phys), <=,
3558		    DN_SLOTS_TO_BONUSLEN(dn->dn_phys->dn_extra_slots + 1));
3559		bcopy(*datap, DN_BONUS(dn->dn_phys),
3560		    DN_MAX_BONUS_LEN(dn->dn_phys));
		/*
		 * Capture dn_num_slots before DB_DNODE_EXIT(); the dnode
		 * must not be dereferenced once its handle is released.
		 */
		int slots = dn->dn_num_slots;
		DB_DNODE_EXIT(db);

		if (*datap != db->db.db_data) {
			int bonuslen = DN_SLOTS_TO_BONUSLEN(slots);
			zio_buf_free(*datap, bonuslen);
			arc_space_return(bonuslen, ARC_SPACE_BONUS);
		}
3569		db->db_data_pending = NULL;
3570		drp = &db->db_last_dirty;
3571		while (*drp != dr)
3572			drp = &(*drp)->dr_next;
3573		ASSERT(dr->dr_next == NULL);
3574		ASSERT(dr->dr_dbuf == db);
3575		*drp = dr->dr_next;
3576		if (dr->dr_dbuf->db_level != 0) {
3577			mutex_destroy(&dr->dt.di.dr_mtx);
3578			list_destroy(&dr->dt.di.dr_children);
3579		}
3580		kmem_free(dr, sizeof (dbuf_dirty_record_t));
3581		ASSERT(db->db_dirtycnt > 0);
3582		db->db_dirtycnt -= 1;
3583		dbuf_rele_and_unlock(db, (void *)(uintptr_t)txg, B_FALSE);
3584		return;
3585	}
3586
3587	os = dn->dn_objset;
3588
3589	/*
	 * dbuf_check_blkptr() may drop and reacquire the db_mtx lock,
	 * allowing a dmu_sync operation to sneak in. As a result, we must
	 * not check dr_override_state until dbuf_check_blkptr() has
	 * returned.
3594	 */
3595	dbuf_check_blkptr(dn, db);
3596
3597	/*
3598	 * If this buffer is in the middle of an immediate write,
3599	 * wait for the synchronous IO to complete.
3600	 */
3601	while (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
3602		ASSERT(dn->dn_object != DMU_META_DNODE_OBJECT);
3603		cv_wait(&db->db_changed, &db->db_mtx);
3604		ASSERT(dr->dt.dl.dr_override_state != DR_NOT_OVERRIDDEN);
3605	}
3606
3607	if (db->db_state != DB_NOFILL &&
3608	    dn->dn_object != DMU_META_DNODE_OBJECT &&
3609	    zfs_refcount_count(&db->db_holds) > 1 &&
3610	    dr->dt.dl.dr_override_state != DR_OVERRIDDEN &&
3611	    *datap == db->db_buf) {
3612		/*
3613		 * If this buffer is currently "in use" (i.e., there
3614		 * are active holds and db_data still references it),
3615		 * then make a copy before we start the write so that
3616		 * any modifications from the open txg will not leak
3617		 * into this write.
3618		 *
		 * NOTE: this copy does not need to be made for
		 * objects that are only modified in the syncing
		 * context (e.g. blocks of the meta-dnode object).
3622		 */
3623		int psize = arc_buf_size(*datap);
3624		arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
3625		enum zio_compress compress_type = arc_get_compression(*datap);
3626
3627		if (compress_type == ZIO_COMPRESS_OFF) {
3628			*datap = arc_alloc_buf(os->os_spa, db, type, psize);
3629		} else {
3630			ASSERT3U(type, ==, ARC_BUFC_DATA);
3631			int lsize = arc_buf_lsize(*datap);
3632			*datap = arc_alloc_compressed_buf(os->os_spa, db,
3633			    psize, lsize, compress_type);
3634		}
3635		bcopy(db->db.db_data, (*datap)->b_data, psize);
3636	}
3637	db->db_data_pending = dr;
3638
3639	mutex_exit(&db->db_mtx);
3640
3641	dbuf_write(dr, *datap, tx);
3642
3643	ASSERT(!list_link_active(&dr->dr_dirty_node));
3644	if (dn->dn_object == DMU_META_DNODE_OBJECT) {
3645		list_insert_tail(&dn->dn_dirty_records[txg&TXG_MASK], dr);
3646		DB_DNODE_EXIT(db);
3647	} else {
3648		/*
		 * Although zio_nowait() does not "wait for an IO", it does
		 * initiate the IO, and for an empty write the IO could
		 * complete before zio_nowait() even returns. We must
		 * DB_DNODE_EXIT() first in case zio_nowait() invalidates
		 * the dbuf.
3654		 */
3655		DB_DNODE_EXIT(db);
3656		zio_nowait(dr->dr_zio);
3657	}
3658}
3659
3660void
3661dbuf_sync_list(list_t *list, int level, dmu_tx_t *tx)
3662{
3663	dbuf_dirty_record_t *dr;
3664
	while ((dr = list_head(list)) != NULL) {
3666		if (dr->dr_zio != NULL) {
3667			/*
3668			 * If we find an already initialized zio then we
3669			 * are processing the meta-dnode, and we have finished.
3670			 * The dbufs for all dnodes are put back on the list
3671			 * during processing, so that we can zio_wait()
3672			 * these IOs after initiating all child IOs.
3673			 */
3674			ASSERT3U(dr->dr_dbuf->db.db_object, ==,
3675			    DMU_META_DNODE_OBJECT);
3676			break;
3677		}
3678		if (dr->dr_dbuf->db_blkid != DMU_BONUS_BLKID &&
3679		    dr->dr_dbuf->db_blkid != DMU_SPILL_BLKID) {
3680			VERIFY3U(dr->dr_dbuf->db_level, ==, level);
3681		}
3682		list_remove(list, dr);
3683		if (dr->dr_dbuf->db_level > 0)
3684			dbuf_sync_indirect(dr, tx);
3685		else
3686			dbuf_sync_leaf(dr, tx);
3687	}
3688}
3689
3690/* ARGSUSED */
3691static void
3692dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
3693{
3694	dmu_buf_impl_t *db = vdb;
3695	dnode_t *dn;
3696	blkptr_t *bp = zio->io_bp;
3697	blkptr_t *bp_orig = &zio->io_bp_orig;
3698	spa_t *spa = zio->io_spa;
3699	int64_t delta;
3700	uint64_t fill = 0;
3701	int i;
3702
3703	ASSERT3P(db->db_blkptr, !=, NULL);
3704	ASSERT3P(&db->db_data_pending->dr_bp_copy, ==, bp);
3705
3706	DB_DNODE_ENTER(db);
3707	dn = DB_DNODE(db);
3708	delta = bp_get_dsize_sync(spa, bp) - bp_get_dsize_sync(spa, bp_orig);
3709	dnode_diduse_space(dn, delta - zio->io_prev_space_delta);
3710	zio->io_prev_space_delta = delta;
3711
3712	if (bp->blk_birth != 0) {
3713		ASSERT((db->db_blkid != DMU_SPILL_BLKID &&
3714		    BP_GET_TYPE(bp) == dn->dn_type) ||
3715		    (db->db_blkid == DMU_SPILL_BLKID &&
3716		    BP_GET_TYPE(bp) == dn->dn_bonustype) ||
3717		    BP_IS_EMBEDDED(bp));
3718		ASSERT(BP_GET_LEVEL(bp) == db->db_level);
3719	}
3720
3721	mutex_enter(&db->db_mtx);
3722
3723#ifdef ZFS_DEBUG
3724	if (db->db_blkid == DMU_SPILL_BLKID) {
3725		ASSERT(dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR);
3726		ASSERT(!(BP_IS_HOLE(bp)) &&
3727		    db->db_blkptr == DN_SPILL_BLKPTR(dn->dn_phys));
3728	}
3729#endif
3730
3731	if (db->db_level == 0) {
3732		mutex_enter(&dn->dn_mtx);
3733		if (db->db_blkid > dn->dn_phys->dn_maxblkid &&
3734		    db->db_blkid != DMU_SPILL_BLKID)
3735			dn->dn_phys->dn_maxblkid = db->db_blkid;
3736		mutex_exit(&dn->dn_mtx);
3737
3738		if (dn->dn_type == DMU_OT_DNODE) {
3739			i = 0;
3740			while (i < db->db.db_size) {
3741				dnode_phys_t *dnp =
3742				    (void *)(((char *)db->db.db_data) + i);
3743
3744				i += DNODE_MIN_SIZE;
3745				if (dnp->dn_type != DMU_OT_NONE) {
3746					fill++;
3747					i += dnp->dn_extra_slots *
3748					    DNODE_MIN_SIZE;
3749				}
3750			}
3751		} else {
3752			if (BP_IS_HOLE(bp)) {
3753				fill = 0;
3754			} else {
3755				fill = 1;
3756			}
3757		}
3758	} else {
3759		blkptr_t *ibp = db->db.db_data;
3760		ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
3761		for (i = db->db.db_size >> SPA_BLKPTRSHIFT; i > 0; i--, ibp++) {
3762			if (BP_IS_HOLE(ibp))
3763				continue;
3764			fill += BP_GET_FILL(ibp);
3765		}
3766	}
3767	DB_DNODE_EXIT(db);
3768
3769	if (!BP_IS_EMBEDDED(bp))
3770		bp->blk_fill = fill;
3771
3772	mutex_exit(&db->db_mtx);
3773
3774	rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
3775	*db->db_blkptr = *bp;
3776	rw_exit(&dn->dn_struct_rwlock);
3777}
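
/*
 * Example of the fill accounting above (illustrative numbers): a 16K
 * dnode block holds 16384 / DNODE_MIN_SIZE = 32 slots; if it contains
 * ten single-slot dnodes and four two-slot dnodes, the loop counts
 * fill = 14 (dnodes, not slots).  For an indirect block, fill is the
 * sum of BP_GET_FILL() over its non-hole children, so a level-1 block
 * pointing at 100 written level-0 blocks ends up with blk_fill = 100.
 */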
3778
3779/* ARGSUSED */
3780/*
 * This function gets called just prior to running through the compression
 * stage of the zio pipeline. If this is an indirect block composed entirely
 * of holes, we want it to be compressed away to a hole. In order to do
 * that, we must zero out any information about the holes that this
 * indirect points to before we try to compress it.
3786 */
3787static void
3788dbuf_write_children_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
3789{
3790	dmu_buf_impl_t *db = vdb;
3791	dnode_t *dn;
3792	blkptr_t *bp;
3793	unsigned int epbs, i;
3794
3795	ASSERT3U(db->db_level, >, 0);
3796	DB_DNODE_ENTER(db);
3797	dn = DB_DNODE(db);
3798	epbs = dn->dn_phys->dn_indblkshift - SPA_BLKPTRSHIFT;
3799	ASSERT3U(epbs, <, 31);
3800
3801	/* Determine if all our children are holes */
3802	for (i = 0, bp = db->db.db_data; i < 1 << epbs; i++, bp++) {
3803		if (!BP_IS_HOLE(bp))
3804			break;
3805	}
3806
3807	/*
	 * If all the children are holes, then zero them all out so that
	 * this block may be compressed away.
3810	 */
3811	if (i == 1 << epbs) {
3812		/*
3813		 * We only found holes. Grab the rwlock to prevent
3814		 * anybody from reading the blocks we're about to
3815		 * zero out.
3816		 */
3817		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
3818		bzero(db->db.db_data, db->db.db_size);
3819		rw_exit(&dn->dn_struct_rwlock);
3820	}
3821	DB_DNODE_EXIT(db);
3822}
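
/*
 * For example, with the hole_birth feature a freed child leaves behind a
 * hole blkptr whose birth time is nonzero, so an indirect block full of
 * such holes is not all-zero and would not compress away on its own;
 * zeroing the buffer above is what lets the compression stage turn the
 * entire indirect block into a hole.
 */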
3823
3824/*
3825 * The SPA will call this callback several times for each zio - once
3826 * for every physical child i/o (zio->io_phys_children times).  This
3827 * allows the DMU to monitor the progress of each logical i/o.  For example,
3828 * there may be 2 copies of an indirect block, or many fragments of a RAID-Z
3829 * block.  There may be a long delay before all copies/fragments are completed,
3830 * so this callback allows us to retire dirty space gradually, as the physical
3831 * i/os complete.
3832 */
3833/* ARGSUSED */
3834static void
3835dbuf_write_physdone(zio_t *zio, arc_buf_t *buf, void *arg)
3836{
3837	dmu_buf_impl_t *db = arg;
3838	objset_t *os = db->db_objset;
3839	dsl_pool_t *dp = dmu_objset_pool(os);
3840	dbuf_dirty_record_t *dr;
3841	int delta = 0;
3842
3843	dr = db->db_data_pending;
3844	ASSERT3U(dr->dr_txg, ==, zio->io_txg);
3845
3846	/*
3847	 * The callback will be called io_phys_children times.  Retire one
3848	 * portion of our dirty space each time we are called.  Any rounding
3849	 * error will be cleaned up by dsl_pool_sync()'s call to
3850	 * dsl_pool_undirty_space().
3851	 */
3852	delta = dr->dr_accounted / zio->io_phys_children;
3853	dsl_pool_undirty_space(dp, delta, zio->io_txg);
3854}
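
/*
 * For example, if dr_accounted is 131072 bytes and the block was written
 * with two physical children (e.g. two copies), each invocation of
 * dbuf_write_physdone() retires 131072 / 2 = 65536 bytes of dirty space;
 * any remainder from the integer division is swept up later by
 * dsl_pool_sync().
 */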
3855
3856/* ARGSUSED */
3857static void
3858dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
3859{
3860	dmu_buf_impl_t *db = vdb;
3861	blkptr_t *bp_orig = &zio->io_bp_orig;
3862	blkptr_t *bp = db->db_blkptr;
3863	objset_t *os = db->db_objset;
3864	dmu_tx_t *tx = os->os_synctx;
3865	dbuf_dirty_record_t **drp, *dr;
3866
3867	ASSERT0(zio->io_error);
3868	ASSERT(db->db_blkptr == bp);
3869
3870	/*
3871	 * For nopwrites and rewrites we ensure that the bp matches our
3872	 * original and bypass all the accounting.
3873	 */
3874	if (zio->io_flags & (ZIO_FLAG_IO_REWRITE | ZIO_FLAG_NOPWRITE)) {
3875		ASSERT(BP_EQUAL(bp, bp_orig));
3876	} else {
3877		dsl_dataset_t *ds = os->os_dsl_dataset;
3878		(void) dsl_dataset_block_kill(ds, bp_orig, tx, B_TRUE);
3879		dsl_dataset_block_born(ds, bp, tx);
3880	}
3881
3882	mutex_enter(&db->db_mtx);
3883
3884	DBUF_VERIFY(db);
3885
3886	drp = &db->db_last_dirty;
3887	while ((dr = *drp) != db->db_data_pending)
3888		drp = &dr->dr_next;
3889	ASSERT(!list_link_active(&dr->dr_dirty_node));
3890	ASSERT(dr->dr_dbuf == db);
3891	ASSERT(dr->dr_next == NULL);
3892	*drp = dr->dr_next;
3893
3894#ifdef ZFS_DEBUG
3895	if (db->db_blkid == DMU_SPILL_BLKID) {
3896		dnode_t *dn;
3897
3898		DB_DNODE_ENTER(db);
3899		dn = DB_DNODE(db);
3900		ASSERT(dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR);
3901		ASSERT(!(BP_IS_HOLE(db->db_blkptr)) &&
3902		    db->db_blkptr == DN_SPILL_BLKPTR(dn->dn_phys));
3903		DB_DNODE_EXIT(db);
3904	}
3905#endif
3906
3907	if (db->db_level == 0) {
3908		ASSERT(db->db_blkid != DMU_BONUS_BLKID);
3909		ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
3910		if (db->db_state != DB_NOFILL) {
3911			if (dr->dt.dl.dr_data != db->db_buf)
3912				arc_buf_destroy(dr->dt.dl.dr_data, db);
3913		}
3914	} else {
3915		dnode_t *dn;
3916
3917		DB_DNODE_ENTER(db);
3918		dn = DB_DNODE(db);
3919		ASSERT(list_head(&dr->dt.di.dr_children) == NULL);
3920		ASSERT3U(db->db.db_size, ==, 1 << dn->dn_phys->dn_indblkshift);
3921		if (!BP_IS_HOLE(db->db_blkptr)) {
3922			int epbs =
3923			    dn->dn_phys->dn_indblkshift - SPA_BLKPTRSHIFT;
3924			ASSERT3U(db->db_blkid, <=,
3925			    dn->dn_phys->dn_maxblkid >> (db->db_level * epbs));
3926			ASSERT3U(BP_GET_LSIZE(db->db_blkptr), ==,
3927			    db->db.db_size);
3928		}
3929		DB_DNODE_EXIT(db);
3930		mutex_destroy(&dr->dt.di.dr_mtx);
3931		list_destroy(&dr->dt.di.dr_children);
3932	}
3933	kmem_free(dr, sizeof (dbuf_dirty_record_t));
3934
3935	cv_broadcast(&db->db_changed);
3936	ASSERT(db->db_dirtycnt > 0);
3937	db->db_dirtycnt -= 1;
3938	db->db_data_pending = NULL;
3939	dbuf_rele_and_unlock(db, (void *)(uintptr_t)tx->tx_txg, B_FALSE);
3940}
3941
3942static void
3943dbuf_write_nofill_ready(zio_t *zio)
3944{
3945	dbuf_write_ready(zio, NULL, zio->io_private);
3946}
3947
3948static void
3949dbuf_write_nofill_done(zio_t *zio)
3950{
3951	dbuf_write_done(zio, NULL, zio->io_private);
3952}
3953
3954static void
3955dbuf_write_override_ready(zio_t *zio)
3956{
3957	dbuf_dirty_record_t *dr = zio->io_private;
3958	dmu_buf_impl_t *db = dr->dr_dbuf;
3959
3960	dbuf_write_ready(zio, NULL, db);
3961}
3962
3963static void
3964dbuf_write_override_done(zio_t *zio)
3965{
3966	dbuf_dirty_record_t *dr = zio->io_private;
3967	dmu_buf_impl_t *db = dr->dr_dbuf;
3968	blkptr_t *obp = &dr->dt.dl.dr_overridden_by;
3969
3970	mutex_enter(&db->db_mtx);
3971	if (!BP_EQUAL(zio->io_bp, obp)) {
3972		if (!BP_IS_HOLE(obp))
3973			dsl_free(spa_get_dsl(zio->io_spa), zio->io_txg, obp);
3974		arc_release(dr->dt.dl.dr_data, db);
3975	}
3976	mutex_exit(&db->db_mtx);
3977	dbuf_write_done(zio, NULL, db);
3978
3979	if (zio->io_abd != NULL)
3980		abd_put(zio->io_abd);
3981}
3982
3983typedef struct dbuf_remap_impl_callback_arg {
3984	objset_t	*drica_os;
3985	uint64_t	drica_blk_birth;
3986	dmu_tx_t	*drica_tx;
3987} dbuf_remap_impl_callback_arg_t;
3988
3989static void
3990dbuf_remap_impl_callback(uint64_t vdev, uint64_t offset, uint64_t size,
3991    void *arg)
3992{
3993	dbuf_remap_impl_callback_arg_t *drica = arg;
3994	objset_t *os = drica->drica_os;
3995	spa_t *spa = dmu_objset_spa(os);
3996	dmu_tx_t *tx = drica->drica_tx;
3997
3998	ASSERT(dsl_pool_sync_context(spa_get_dsl(spa)));
3999
4000	if (os == spa_meta_objset(spa)) {
4001		spa_vdev_indirect_mark_obsolete(spa, vdev, offset, size, tx);
4002	} else {
4003		dsl_dataset_block_remapped(dmu_objset_ds(os), vdev, offset,
4004		    size, drica->drica_blk_birth, tx);
4005	}
4006}
4007
4008static void
4009dbuf_remap_impl(dnode_t *dn, blkptr_t *bp, dmu_tx_t *tx)
4010{
4011	blkptr_t bp_copy = *bp;
4012	spa_t *spa = dmu_objset_spa(dn->dn_objset);
4013	dbuf_remap_impl_callback_arg_t drica;
4014
4015	ASSERT(dsl_pool_sync_context(spa_get_dsl(spa)));
4016
4017	drica.drica_os = dn->dn_objset;
4018	drica.drica_blk_birth = bp->blk_birth;
4019	drica.drica_tx = tx;
4020	if (spa_remap_blkptr(spa, &bp_copy, dbuf_remap_impl_callback,
4021	    &drica)) {
4022		/*
4023		 * The struct_rwlock prevents dbuf_read_impl() from
4024		 * dereferencing the BP while we are changing it.  To
4025		 * avoid lock contention, only grab it when we are actually
4026		 * changing the BP.
4027		 */
4028		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
4029		*bp = bp_copy;
4030		rw_exit(&dn->dn_struct_rwlock);
4031	}
4032}
4033
4034/*
4035 * Returns true if a dbuf_remap would modify the dbuf. We do this by attempting
4036 * to remap a copy of every bp in the dbuf.
4037 */
4038boolean_t
4039dbuf_can_remap(const dmu_buf_impl_t *db)
4040{
4041	spa_t *spa = dmu_objset_spa(db->db_objset);
4042	blkptr_t *bp = db->db.db_data;
4043	boolean_t ret = B_FALSE;
4044
4045	ASSERT3U(db->db_level, >, 0);
4046	ASSERT3S(db->db_state, ==, DB_CACHED);
4047
4048	ASSERT(spa_feature_is_active(spa, SPA_FEATURE_DEVICE_REMOVAL));
4049
4050	spa_config_enter(spa, SCL_VDEV, FTAG, RW_READER);
4051	for (int i = 0; i < db->db.db_size >> SPA_BLKPTRSHIFT; i++) {
4052		blkptr_t bp_copy = bp[i];
4053		if (spa_remap_blkptr(spa, &bp_copy, NULL, NULL)) {
4054			ret = B_TRUE;
4055			break;
4056		}
4057	}
4058	spa_config_exit(spa, SCL_VDEV, FTAG);
4059
4060	return (ret);
4061}
4062
4063boolean_t
4064dnode_needs_remap(const dnode_t *dn)
4065{
4066	spa_t *spa = dmu_objset_spa(dn->dn_objset);
4067	boolean_t ret = B_FALSE;
4068
4069	if (dn->dn_phys->dn_nlevels == 0) {
4070		return (B_FALSE);
4071	}
4072
4073	ASSERT(spa_feature_is_active(spa, SPA_FEATURE_DEVICE_REMOVAL));
4074
4075	spa_config_enter(spa, SCL_VDEV, FTAG, RW_READER);
4076	for (int j = 0; j < dn->dn_phys->dn_nblkptr; j++) {
4077		blkptr_t bp_copy = dn->dn_phys->dn_blkptr[j];
4078		if (spa_remap_blkptr(spa, &bp_copy, NULL, NULL)) {
4079			ret = B_TRUE;
4080			break;
4081		}
4082	}
4083	spa_config_exit(spa, SCL_VDEV, FTAG);
4084
4085	return (ret);
4086}
4087
4088/*
4089 * Remap any existing BP's to concrete vdevs, if possible.
4090 */
4091static void
4092dbuf_remap(dnode_t *dn, dmu_buf_impl_t *db, dmu_tx_t *tx)
4093{
4094	spa_t *spa = dmu_objset_spa(db->db_objset);
4095	ASSERT(dsl_pool_sync_context(spa_get_dsl(spa)));
4096
4097	if (!spa_feature_is_active(spa, SPA_FEATURE_DEVICE_REMOVAL))
4098		return;
4099
4100	if (db->db_level > 0) {
4101		blkptr_t *bp = db->db.db_data;
4102		for (int i = 0; i < db->db.db_size >> SPA_BLKPTRSHIFT; i++) {
4103			dbuf_remap_impl(dn, &bp[i], tx);
4104		}
4105	} else if (db->db.db_object == DMU_META_DNODE_OBJECT) {
4106		dnode_phys_t *dnp = db->db.db_data;
4107		ASSERT3U(db->db_dnode_handle->dnh_dnode->dn_type, ==,
4108		    DMU_OT_DNODE);
4109		for (int i = 0; i < db->db.db_size >> DNODE_SHIFT;
4110		    i += dnp[i].dn_extra_slots + 1) {
4111			for (int j = 0; j < dnp[i].dn_nblkptr; j++) {
4112				dbuf_remap_impl(dn, &dnp[i].dn_blkptr[j], tx);
4113			}
4114		}
4115	}
4116}
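
/*
 * Example of the meta-dnode walk above (illustrative numbers): a 16K
 * dbuf of the meta-dnode covers 16384 >> DNODE_SHIFT = 32 slots.  A
 * large dnode occupying two slots has dn_extra_slots == 1, so the loop
 * advances i by 2 and never misinterprets the extra slot as a separate
 * dnode while remapping each dnode's dn_blkptr array.
 */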
4117
4118
4119/* Issue I/O to commit a dirty buffer to disk. */
4120static void
4121dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx)
4122{
4123	dmu_buf_impl_t *db = dr->dr_dbuf;
4124	dnode_t *dn;
4125	objset_t *os;
4126	dmu_buf_impl_t *parent = db->db_parent;
4127	uint64_t txg = tx->tx_txg;
4128	zbookmark_phys_t zb;
4129	zio_prop_t zp;
4130	zio_t *zio;
4131	int wp_flag = 0;
4132
4133	ASSERT(dmu_tx_is_syncing(tx));
4134
4135	DB_DNODE_ENTER(db);
4136	dn = DB_DNODE(db);
4137	os = dn->dn_objset;
4138
4139	if (db->db_state != DB_NOFILL) {
4140		if (db->db_level > 0 || dn->dn_type == DMU_OT_DNODE) {
4141			/*
4142			 * Private object buffers are released here rather
4143			 * than in dbuf_dirty() since they are only modified
4144			 * in the syncing context and we don't want the
4145			 * overhead of making multiple copies of the data.
4146			 */
4147			if (BP_IS_HOLE(db->db_blkptr)) {
4148				arc_buf_thaw(data);
4149			} else {
4150				dbuf_release_bp(db);
4151			}
4152			dbuf_remap(dn, db, tx);
4153		}
4154	}
4155
4156	if (parent != dn->dn_dbuf) {
4157		/* Our parent is an indirect block. */
4158		/* We have a dirty parent that has been scheduled for write. */
4159		ASSERT(parent && parent->db_data_pending);
4160		/* Our parent's buffer is one level closer to the dnode. */
4161		ASSERT(db->db_level == parent->db_level-1);
4162		/*
4163		 * We're about to modify our parent's db_data by modifying
4164		 * our block pointer, so the parent must be released.
4165		 */
4166		ASSERT(arc_released(parent->db_buf));
4167		zio = parent->db_data_pending->dr_zio;
4168	} else {
4169		/* Our parent is the dnode itself. */
4170		ASSERT((db->db_level == dn->dn_phys->dn_nlevels-1 &&
4171		    db->db_blkid != DMU_SPILL_BLKID) ||
4172		    (db->db_blkid == DMU_SPILL_BLKID && db->db_level == 0));
4173		if (db->db_blkid != DMU_SPILL_BLKID)
4174			ASSERT3P(db->db_blkptr, ==,
4175			    &dn->dn_phys->dn_blkptr[db->db_blkid]);
4176		zio = dn->dn_zio;
4177	}
4178
4179	ASSERT(db->db_level == 0 || data == db->db_buf);
4180	ASSERT3U(db->db_blkptr->blk_birth, <=, txg);
4181	ASSERT(zio);
4182
4183	SET_BOOKMARK(&zb, os->os_dsl_dataset ?
4184	    os->os_dsl_dataset->ds_object : DMU_META_OBJSET,
4185	    db->db.db_object, db->db_level, db->db_blkid);
4186
4187	if (db->db_blkid == DMU_SPILL_BLKID)
4188		wp_flag = WP_SPILL;
4189	wp_flag |= (db->db_state == DB_NOFILL) ? WP_NOFILL : 0;
4190
4191	dmu_write_policy(os, dn, db->db_level, wp_flag, &zp);
4192	DB_DNODE_EXIT(db);
4193
4194	/*
4195	 * We copy the blkptr now (rather than when we instantiate the dirty
4196	 * record), because its value can change between open context and
4197	 * syncing context. We do not need to hold dn_struct_rwlock to read
4198	 * db_blkptr because we are in syncing context.
4199	 */
4200	dr->dr_bp_copy = *db->db_blkptr;
4201
4202	if (db->db_level == 0 &&
4203	    dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
4204		/*
4205		 * The BP for this block has been provided by open context
4206		 * (by dmu_sync() or dmu_buf_write_embedded()).
4207		 */
4208		abd_t *contents = (data != NULL) ?
4209		    abd_get_from_buf(data->b_data, arc_buf_size(data)) : NULL;
4210
4211		dr->dr_zio = zio_write(zio, os->os_spa, txg, &dr->dr_bp_copy,
4212		    contents, db->db.db_size, db->db.db_size, &zp,
4213		    dbuf_write_override_ready, NULL, NULL,
4214		    dbuf_write_override_done,
4215		    dr, ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
4216		mutex_enter(&db->db_mtx);
4217		dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
4218		zio_write_override(dr->dr_zio, &dr->dt.dl.dr_overridden_by,
4219		    dr->dt.dl.dr_copies, dr->dt.dl.dr_nopwrite);
4220		mutex_exit(&db->db_mtx);
4221	} else if (db->db_state == DB_NOFILL) {
4222		ASSERT(zp.zp_checksum == ZIO_CHECKSUM_OFF ||
4223		    zp.zp_checksum == ZIO_CHECKSUM_NOPARITY);
4224		dr->dr_zio = zio_write(zio, os->os_spa, txg,
4225		    &dr->dr_bp_copy, NULL, db->db.db_size, db->db.db_size, &zp,
4226		    dbuf_write_nofill_ready, NULL, NULL,
4227		    dbuf_write_nofill_done, db,
4228		    ZIO_PRIORITY_ASYNC_WRITE,
4229		    ZIO_FLAG_MUSTSUCCEED | ZIO_FLAG_NODATA, &zb);
4230	} else {
4231		ASSERT(arc_released(data));
4232
4233		/*
4234		 * For indirect blocks, we want to setup the children
4235		 * ready callback so that we can properly handle an indirect
4236		 * block that only contains holes.
4237		 */
4238		arc_write_done_func_t *children_ready_cb = NULL;
4239		if (db->db_level != 0)
4240			children_ready_cb = dbuf_write_children_ready;
4241
4242		dr->dr_zio = arc_write(zio, os->os_spa, txg,
4243		    &dr->dr_bp_copy, data, DBUF_IS_L2CACHEABLE(db),
4244		    &zp, dbuf_write_ready, children_ready_cb,
4245		    dbuf_write_physdone, dbuf_write_done, db,
4246		    ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
4247	}
4248}
4249