dsl_scan.c revision 358600
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
24 * Copyright 2016 Gary Mills
25 * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
26 * Copyright 2017 Joyent, Inc.
27 * Copyright (c) 2017 Datto Inc.
28 */
29
30#include <sys/dsl_scan.h>
31#include <sys/dsl_pool.h>
32#include <sys/dsl_dataset.h>
33#include <sys/dsl_prop.h>
34#include <sys/dsl_dir.h>
35#include <sys/dsl_synctask.h>
36#include <sys/dnode.h>
37#include <sys/dmu_tx.h>
38#include <sys/dmu_objset.h>
39#include <sys/arc.h>
40#include <sys/zap.h>
41#include <sys/zio.h>
42#include <sys/zfs_context.h>
43#include <sys/fs/zfs.h>
44#include <sys/zfs_znode.h>
45#include <sys/spa_impl.h>
46#include <sys/vdev_impl.h>
47#include <sys/zil_impl.h>
48#include <sys/zio_checksum.h>
49#include <sys/ddt.h>
50#include <sys/sa.h>
51#include <sys/sa_impl.h>
52#include <sys/zfeature.h>
53#include <sys/abd.h>
54#include <sys/range_tree.h>
55#ifdef _KERNEL
56#include <sys/zfs_vfsops.h>
57#endif
58
59/*
60 * Grand theory statement on scan queue sorting
61 *
62 * Scanning is implemented by recursively traversing all indirection levels
63 * in an object and reading all blocks referenced from said objects. This
64 * results in us approximately traversing the object from lowest logical
65 * offset to the highest. For best performance, we would want the logical
66 * blocks to be physically contiguous. However, this is frequently not the
67 * case with pools given the allocation patterns of copy-on-write filesystems.
68 * So instead, we put the I/Os into a reordering queue and issue them in a
69 * way that will most benefit physical disks (LBA-order).
70 *
71 * Queue management:
72 *
73 * Ideally, we would want to scan all metadata and queue up all block I/O
74 * prior to starting to issue it, because that allows us to do an optimal
75 * sorting job. This can however consume large amounts of memory. Therefore
76 * we continuously monitor the size of the queues and constrain them to 5%
77 * (zfs_scan_mem_lim_fact) of physmem. If the queues grow larger than this
78 * limit, we clear out a few of the largest extents at the head of the queues
79 * to make room for more scanning. Hopefully, these extents will be fairly
80 * large and contiguous, allowing us to approach sequential I/O throughput
81 * even without a fully sorted tree.
82 *
83 * Metadata scanning takes place in dsl_scan_visit(), which is called from
84 * dsl_scan_sync() every spa_sync(). If we have either fully scanned all
85 * metadata on the pool, or we need to make room in memory because our
86 * queues are too large, dsl_scan_visit() is postponed and
87 * scan_io_queues_run() is called from dsl_scan_sync() instead. This implies
88 * that metadata scanning and queued I/O issuing are mutually exclusive. This
89 * allows us to provide maximum sequential I/O throughput for the majority of
90 * I/Os issued, since sequential I/O performance is significantly degraded
91 * when interleaved with random I/O (see the simplified sketch below).
92 *
93 * Implementation Notes
94 *
95 * One side effect of the queued scanning algorithm is that the scanning code
96 * needs to be notified whenever a block is freed. This is needed to allow
97 * the scanning code to remove these I/Os from the issuing queue. Additionally,
98 * we do not attempt to queue gang blocks to be issued sequentially since this
99 * is very hard to do and would have an extremely limited performance benefit.
100 * Instead, we simply issue gang I/Os as soon as we find them using the legacy
101 * algorithm.
102 *
103 * Backwards compatibility
104 *
105 * This new algorithm is backwards compatible with the legacy on-disk data
106 * structures (and therefore does not require a new feature flag).
107 * Periodically during scanning (see zfs_scan_checkpoint_intval), the scan
108 * will stop scanning metadata (in logical order) and wait for all outstanding
109 * sorted I/O to complete. Once this is done, we write out a checkpoint
110 * bookmark, indicating that we have scanned everything logically before it.
111 * If the pool is imported on a machine without the new sorting algorithm,
112 * the scan simply resumes from the last checkpoint using the legacy algorithm.
113 */
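/*
 * A simplified sketch of the per-spa_sync() decision described above
 * (illustrative only; the authoritative logic lives in dsl_scan_sync()):
 *
 *	if (scan queues are over their memory limit ||
 *	    all metadata has been scanned)
 *		scan_io_queues_run();	 issue the sorted, queued I/O
 *	else
 *		dsl_scan_visit();	 continue scanning metadata
 */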
114
115typedef int (scan_cb_t)(dsl_pool_t *, const blkptr_t *,
116    const zbookmark_phys_t *);
117
118static scan_cb_t dsl_scan_scrub_cb;
119
120static int scan_ds_queue_compare(const void *a, const void *b);
121static int scan_prefetch_queue_compare(const void *a, const void *b);
122static void scan_ds_queue_clear(dsl_scan_t *scn);
123static boolean_t scan_ds_queue_contains(dsl_scan_t *scn, uint64_t dsobj,
124    uint64_t *txg);
125static void scan_ds_queue_insert(dsl_scan_t *scn, uint64_t dsobj, uint64_t txg);
126static void scan_ds_queue_remove(dsl_scan_t *scn, uint64_t dsobj);
127static void scan_ds_queue_sync(dsl_scan_t *scn, dmu_tx_t *tx);
128static uint64_t dsl_scan_count_leaves(vdev_t *vd);
129
130extern int zfs_vdev_async_write_active_min_dirty_percent;
131
132/*
133 * By default zfs will check to ensure it is not over the hard memory
134 * limit before each txg. If finer-grained control of this is needed
135 * this value can be set to 1 to enable checking before scanning each
136 * block.
137 */
138int zfs_scan_strict_mem_lim = B_FALSE;
139
140/*
141 * Maximum number of concurrently executing I/Os per top-level vdev.
142 * Tune with care. Very high settings (hundreds) are known to trigger
143 * some firmware bugs and resets on certain SSDs.
144 */
145int zfs_top_maxinflight = 32;		/* maximum I/Os per top-level */
146unsigned int zfs_resilver_delay = 2;	/* number of ticks to delay resilver -- 2 is a good number */
147unsigned int zfs_scrub_delay = 4;	/* number of ticks to delay scrub -- 4 is a good number */
148unsigned int zfs_scan_idle = 50;	/* idle window in clock ticks */
149
150/*
151 * Maximum number of concurrently issued bytes per leaf vdev. We attempt
152 * to strike a balance here between keeping the vdev queues full of I/Os
153 * at all times and not overflowing the queues, which would cause high
154 * latency and long txg sync times. No matter what, we will not
155 * overload the drives with I/O, since that is protected by
156 * zfs_vdev_scrub_max_active.
157 */
158unsigned long zfs_scan_vdev_limit = 4 << 20;
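/*
 * Note: 4 << 20 is 4 MiB per leaf vdev; dsl_scan_init() multiplies this by
 * the number of leaf vdevs (with a 1 MiB floor) to compute
 * scn_maxinflight_bytes for the metadata scanning phase.
 */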
159
160int zfs_scan_issue_strategy = 0;
161int zfs_scan_legacy = B_FALSE;	/* don't queue & sort zios, go direct */
162uint64_t zfs_scan_max_ext_gap = 2 << 20;	/* in bytes */
163
164unsigned int zfs_scan_checkpoint_intval = 7200;	/* seconds */
165#define	ZFS_SCAN_CHECKPOINT_INTVAL	SEC_TO_TICK(zfs_scan_checkpoint_intval)
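/* With the 7200-second default, a checkpoint is written roughly every two hours. */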
166
167/*
168 * fill_weight is non-tunable at runtime, so we copy it at module init from
169 * zfs_scan_fill_weight. Runtime adjustments to zfs_scan_fill_weight would
170 * break queue sorting.
171 */
172uint64_t zfs_scan_fill_weight = 3;
173static uint64_t fill_weight;
174
175/* See dsl_scan_should_clear() for details on the memory limit tunables */
176uint64_t zfs_scan_mem_lim_min = 16 << 20;	/* bytes */
177uint64_t zfs_scan_mem_lim_soft_max = 128 << 20;	/* bytes */
178int zfs_scan_mem_lim_fact = 20;		/* fraction of physmem */
179int zfs_scan_mem_lim_soft_fact = 20;	/* fraction of mem lim above */
180
181unsigned int zfs_scrub_min_time_ms = 1000; /* min millisecs to scrub per txg */
182unsigned int zfs_free_min_time_ms = 1000; /* min millisecs to free per txg */
183unsigned int zfs_obsolete_min_time_ms = 500; /* min millisecs to obsolete per txg */
184unsigned int zfs_resilver_min_time_ms = 3000; /* min millisecs to resilver per txg */
185boolean_t zfs_no_scrub_io = B_FALSE; /* set to disable scrub i/o */
186boolean_t zfs_no_scrub_prefetch = B_FALSE; /* set to disable scrub prefetch */
187
188SYSCTL_DECL(_vfs_zfs);
189SYSCTL_UINT(_vfs_zfs, OID_AUTO, top_maxinflight, CTLFLAG_RWTUN,
190    &zfs_top_maxinflight, 0, "Maximum I/Os per top-level vdev");
191SYSCTL_UINT(_vfs_zfs, OID_AUTO, resilver_delay, CTLFLAG_RWTUN,
192    &zfs_resilver_delay, 0, "Number of ticks to delay resilver");
193SYSCTL_UINT(_vfs_zfs, OID_AUTO, scrub_delay, CTLFLAG_RWTUN,
194    &zfs_scrub_delay, 0, "Number of ticks to delay scrub");
195SYSCTL_UINT(_vfs_zfs, OID_AUTO, scan_idle, CTLFLAG_RWTUN,
196    &zfs_scan_idle, 0, "Idle scan window in clock ticks");
197SYSCTL_UINT(_vfs_zfs, OID_AUTO, scan_min_time_ms, CTLFLAG_RWTUN,
198    &zfs_scrub_min_time_ms, 0, "Min millisecs to scrub per txg");
199SYSCTL_UINT(_vfs_zfs, OID_AUTO, free_min_time_ms, CTLFLAG_RWTUN,
200    &zfs_free_min_time_ms, 0, "Min millisecs to free per txg");
201SYSCTL_UINT(_vfs_zfs, OID_AUTO, resilver_min_time_ms, CTLFLAG_RWTUN,
202    &zfs_resilver_min_time_ms, 0, "Min millisecs to resilver per txg");
203SYSCTL_INT(_vfs_zfs, OID_AUTO, no_scrub_io, CTLFLAG_RWTUN,
204    &zfs_no_scrub_io, 0, "Disable scrub I/O");
205SYSCTL_INT(_vfs_zfs, OID_AUTO, no_scrub_prefetch, CTLFLAG_RWTUN,
206    &zfs_no_scrub_prefetch, 0, "Disable scrub prefetching");
207SYSCTL_UINT(_vfs_zfs, OID_AUTO, zfs_scan_legacy, CTLFLAG_RWTUN,
208    &zfs_scan_legacy, 0, "Scrub using legacy non-sequential method");
209SYSCTL_UINT(_vfs_zfs, OID_AUTO, zfs_scan_checkpoint_interval, CTLFLAG_RWTUN,
210    &zfs_scan_checkpoint_intval, 0, "Scan progress on-disk checkpointing interval");
211
212enum ddt_class zfs_scrub_ddt_class_max = DDT_CLASS_DUPLICATE;
213/* max number of blocks to free in a single TXG */
214uint64_t zfs_async_block_max_blocks = UINT64_MAX;
215SYSCTL_UQUAD(_vfs_zfs, OID_AUTO, free_max_blocks, CTLFLAG_RWTUN,
216    &zfs_async_block_max_blocks, 0, "Maximum number of blocks to free in one TXG");
217
218/*
219 * We wait a few txgs after importing a pool to begin scanning so that
220 * the import / mounting code isn't held up by scrub / resilver IO.
221 * Unfortunately, it is a bit difficult to determine exactly how long
222 * this will take since userspace will trigger fs mounts asynchronously
223 * and the kernel will create zvol minors asynchronously. As a result,
224 * the value provided here is a bit arbitrary, but represents a
225 * reasonable estimate of how many txgs it will take to finish fully
226 * importing a pool
227 */
228#define        SCAN_IMPORT_WAIT_TXGS           5
229
230
231#define	DSL_SCAN_IS_SCRUB_RESILVER(scn) \
232	((scn)->scn_phys.scn_func == POOL_SCAN_SCRUB || \
233	(scn)->scn_phys.scn_func == POOL_SCAN_RESILVER)
234
235extern int zfs_txg_timeout;
236
237/*
238 * Enable/disable the processing of the free_bpobj object.
239 */
240boolean_t zfs_free_bpobj_enabled = B_TRUE;
241
242SYSCTL_INT(_vfs_zfs, OID_AUTO, free_bpobj_enabled, CTLFLAG_RWTUN,
243    &zfs_free_bpobj_enabled, 0, "Enable free_bpobj processing");
244
245/* the order has to match pool_scan_type */
246static scan_cb_t *scan_funcs[POOL_SCAN_FUNCS] = {
247	NULL,
248	dsl_scan_scrub_cb,	/* POOL_SCAN_SCRUB */
249	dsl_scan_scrub_cb,	/* POOL_SCAN_RESILVER */
250};
251
252/* In core node for the scn->scn_queue. Represents a dataset to be scanned */
253typedef struct {
254	uint64_t	sds_dsobj;
255	uint64_t	sds_txg;
256	avl_node_t	sds_node;
257} scan_ds_t;
258
259/*
260 * This controls what conditions are placed on dsl_scan_sync_state():
261 * SYNC_OPTIONAL) write out scn_phys iff scn_bytes_pending == 0
262 * SYNC_MANDATORY) write out scn_phys always. scn_bytes_pending must be 0.
263 * SYNC_CACHED) if scn_bytes_pending == 0, write out scn_phys. Otherwise
264 *	write out the scn_phys_cached version.
265 * See dsl_scan_sync_state for details.
266 */
267typedef enum {
268	SYNC_OPTIONAL,
269	SYNC_MANDATORY,
270	SYNC_CACHED
271} state_sync_type_t;
272
273/*
274 * This struct represents the minimum information needed to reconstruct a
275 * zio for sequential scanning. This is useful because many of these will
276 * accumulate in the sequential IO queues before being issued, so saving
277 * memory matters here.
278 */
279typedef struct scan_io {
280	/* fields from blkptr_t */
281	uint64_t		sio_offset;
282	uint64_t		sio_blk_prop;
283	uint64_t		sio_phys_birth;
284	uint64_t		sio_birth;
285	zio_cksum_t		sio_cksum;
286	uint32_t		sio_asize;
287
288	/* fields from zio_t */
289	int			sio_flags;
290	zbookmark_phys_t	sio_zb;
291
292	/* members for queue sorting */
293	union {
294		avl_node_t	sio_addr_node; /* link into issuing queue */
295		list_node_t	sio_list_node; /* link for issuing to disk */
296	} sio_nodes;
297} scan_io_t;
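/*
 * Memory accounting note: each block queued for sorted issue costs one
 * scan_io_t plus a share of a range_seg_t in the extent tree; see
 * dsl_scan_should_clear() for how these sizes feed the scan memory limits.
 */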
298
299struct dsl_scan_io_queue {
300	dsl_scan_t	*q_scn; /* associated dsl_scan_t */
301	vdev_t		*q_vd; /* top-level vdev that this queue represents */
302
303	/* trees used for sorting I/Os and extents of I/Os */
304	range_tree_t	*q_exts_by_addr;
305	avl_tree_t	q_exts_by_size;
306	avl_tree_t	q_sios_by_addr;
307
308	/* members for zio rate limiting */
309	uint64_t	q_maxinflight_bytes;
310	uint64_t	q_inflight_bytes;
311	kcondvar_t	q_zio_cv; /* used under vd->vdev_scan_io_queue_lock */
312
313	/* per txg statistics */
314	uint64_t	q_total_seg_size_this_txg;
315	uint64_t	q_segs_this_txg;
316	uint64_t	q_total_zio_size_this_txg;
317	uint64_t	q_zios_this_txg;
318};
319
320/* private data for dsl_scan_prefetch_cb() */
321typedef struct scan_prefetch_ctx {
322	refcount_t spc_refcnt;		/* refcount for memory management */
323	dsl_scan_t *spc_scn;		/* dsl_scan_t for the pool */
324	boolean_t spc_root;		/* is this prefetch for an objset? */
325	uint8_t spc_indblkshift;	/* dn_indblkshift of current dnode */
326	uint16_t spc_datablkszsec;	/* dn_datablkszsec of current dnode */
327} scan_prefetch_ctx_t;
328
329/* private data for dsl_scan_prefetch() */
330typedef struct scan_prefetch_issue_ctx {
331	avl_node_t spic_avl_node;	/* link into scn->scn_prefetch_queue */
332	scan_prefetch_ctx_t *spic_spc;	/* spc for the callback */
333	blkptr_t spic_bp;		/* bp to prefetch */
334	zbookmark_phys_t spic_zb;	/* bookmark to prefetch */
335} scan_prefetch_issue_ctx_t;
336
337static void scan_exec_io(dsl_pool_t *dp, const blkptr_t *bp, int zio_flags,
338    const zbookmark_phys_t *zb, dsl_scan_io_queue_t *queue);
339static void scan_io_queue_insert_impl(dsl_scan_io_queue_t *queue,
340    scan_io_t *sio);
341
342static dsl_scan_io_queue_t *scan_io_queue_create(vdev_t *vd);
343static void scan_io_queues_destroy(dsl_scan_t *scn);
344
345static kmem_cache_t *sio_cache;
346
347void
348scan_init(void)
349{
350	/*
351	 * This is used in ext_size_compare() to weight segments
352	 * based on how sparse they are. This cannot be changed
353	 * mid-scan, and the tree comparison functions don't currently
354	 * have a mechanism for passing additional context to the
355	 * compare functions. Thus we store this value globally and
356	 * only allow it to be set at module initialization time.
357	 */
358	fill_weight = zfs_scan_fill_weight;
359
360	sio_cache = kmem_cache_create("sio_cache",
361	    sizeof (scan_io_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
362}
363
364void
365scan_fini(void)
366{
367	kmem_cache_destroy(sio_cache);
368}
369
370static inline boolean_t
371dsl_scan_is_running(const dsl_scan_t *scn)
372{
373	return (scn->scn_phys.scn_state == DSS_SCANNING);
374}
375
376boolean_t
377dsl_scan_resilvering(dsl_pool_t *dp)
378{
379	return (dsl_scan_is_running(dp->dp_scan) &&
380	    dp->dp_scan->scn_phys.scn_func == POOL_SCAN_RESILVER);
381}
382
383static inline void
384sio2bp(const scan_io_t *sio, blkptr_t *bp, uint64_t vdev_id)
385{
386	bzero(bp, sizeof (*bp));
387	DVA_SET_ASIZE(&bp->blk_dva[0], sio->sio_asize);
388	DVA_SET_VDEV(&bp->blk_dva[0], vdev_id);
389	DVA_SET_OFFSET(&bp->blk_dva[0], sio->sio_offset);
390	bp->blk_prop = sio->sio_blk_prop;
391	bp->blk_phys_birth = sio->sio_phys_birth;
392	bp->blk_birth = sio->sio_birth;
393	bp->blk_fill = 1;	/* we only ever work with data pointers */
394	bp->blk_cksum = sio->sio_cksum;
395}
396
397static inline void
398bp2sio(const blkptr_t *bp, scan_io_t *sio, int dva_i)
399{
400	/* we discard the vdev id, since we can deduce it from the queue */
401	sio->sio_offset = DVA_GET_OFFSET(&bp->blk_dva[dva_i]);
402	sio->sio_asize = DVA_GET_ASIZE(&bp->blk_dva[dva_i]);
403	sio->sio_blk_prop = bp->blk_prop;
404	sio->sio_phys_birth = bp->blk_phys_birth;
405	sio->sio_birth = bp->blk_birth;
406	sio->sio_cksum = bp->blk_cksum;
407}
408
409void
410dsl_scan_global_init(void)
411{
412	/*
413	 * This is used in ext_size_compare() to weight segments
414	 * based on how sparse they are. This cannot be changed
415	 * mid-scan, and the tree comparison functions don't currently
416	 * have a mechanism for passing additional context to the
417	 * compare functions. Thus we store this value globally and
418	 * only allow it to be set at module initialization time.
419	 */
420	fill_weight = zfs_scan_fill_weight;
421}
422
423int
424dsl_scan_init(dsl_pool_t *dp, uint64_t txg)
425{
426	int err;
427	dsl_scan_t *scn;
428	spa_t *spa = dp->dp_spa;
429	uint64_t f;
430
431	scn = dp->dp_scan = kmem_zalloc(sizeof (dsl_scan_t), KM_SLEEP);
432	scn->scn_dp = dp;
433
434	/*
435	 * It's possible that we're resuming a scan after a reboot so
436	 * make sure that the scan_async_destroying flag is initialized
437	 * appropriately.
438	 */
439	ASSERT(!scn->scn_async_destroying);
440	scn->scn_async_destroying = spa_feature_is_active(dp->dp_spa,
441	    SPA_FEATURE_ASYNC_DESTROY);
442
443	/*
444	 * Calculate the max number of in-flight bytes for pool-wide
445	 * scanning operations (minimum 1MB). Limits for the issuing
446	 * phase are done per top-level vdev and are handled separately.
447	 */
448	scn->scn_maxinflight_bytes = MAX(zfs_scan_vdev_limit *
449	    dsl_scan_count_leaves(spa->spa_root_vdev), 1ULL << 20);
450
451	bcopy(&scn->scn_phys, &scn->scn_phys_cached, sizeof (scn->scn_phys));
452	avl_create(&scn->scn_queue, scan_ds_queue_compare, sizeof (scan_ds_t),
453	    offsetof(scan_ds_t, sds_node));
454	avl_create(&scn->scn_prefetch_queue, scan_prefetch_queue_compare,
455	    sizeof (scan_prefetch_issue_ctx_t),
456	    offsetof(scan_prefetch_issue_ctx_t, spic_avl_node));
457
458	err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
459	    "scrub_func", sizeof (uint64_t), 1, &f);
460	if (err == 0) {
461		/*
462		 * There was an old-style scrub in progress.  Restart a
463		 * new-style scrub from the beginning.
464		 */
465		scn->scn_restart_txg = txg;
466		zfs_dbgmsg("old-style scrub was in progress; "
467		    "restarting new-style scrub in txg %llu",
468		    (longlong_t)scn->scn_restart_txg);
469
470		/*
471		 * Load the queue obj from the old location so that it
472		 * can be freed by dsl_scan_done().
473		 */
474		(void) zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
475		    "scrub_queue", sizeof (uint64_t), 1,
476		    &scn->scn_phys.scn_queue_obj);
477	} else {
478		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
479		    DMU_POOL_SCAN, sizeof (uint64_t), SCAN_PHYS_NUMINTS,
480		    &scn->scn_phys);
481		if (err == ENOENT)
482			return (0);
483		else if (err)
484			return (err);
485
486		/*
487		 * We might be restarting after a reboot, so jump the issued
488		 * counter to how far we've scanned. We know we're consistent
489		 * up to here.
490		 */
491		scn->scn_issued_before_pass = scn->scn_phys.scn_examined;
492
493		if (dsl_scan_is_running(scn) &&
494		    spa_prev_software_version(dp->dp_spa) < SPA_VERSION_SCAN) {
495			/*
496			 * A new-type scrub was in progress on an old
497			 * pool, and the pool was accessed by old
498			 * software.  Restart from the beginning, since
499			 * the old software may have changed the pool in
500			 * the meantime.
501			 */
502			scn->scn_restart_txg = txg;
503			zfs_dbgmsg("new-style scrub was modified "
504			    "by old software; restarting in txg %llu",
505			    (longlong_t)scn->scn_restart_txg);
506		}
507	}
508
509	/* reload the queue into the in-core state */
510	if (scn->scn_phys.scn_queue_obj != 0) {
511		zap_cursor_t zc;
512		zap_attribute_t za;
513
514		for (zap_cursor_init(&zc, dp->dp_meta_objset,
515		    scn->scn_phys.scn_queue_obj);
516		    zap_cursor_retrieve(&zc, &za) == 0;
517		    (void) zap_cursor_advance(&zc)) {
518			scan_ds_queue_insert(scn,
519			    zfs_strtonum(za.za_name, NULL),
520			    za.za_first_integer);
521		}
522		zap_cursor_fini(&zc);
523	}
524
525	spa_scan_stat_init(spa);
526	return (0);
527}
528
529void
530dsl_scan_fini(dsl_pool_t *dp)
531{
532	if (dp->dp_scan != NULL) {
533		dsl_scan_t *scn = dp->dp_scan;
534
535		if (scn->scn_taskq != NULL)
536			taskq_destroy(scn->scn_taskq);
537		scan_ds_queue_clear(scn);
538		avl_destroy(&scn->scn_queue);
539		avl_destroy(&scn->scn_prefetch_queue);
540
541		kmem_free(dp->dp_scan, sizeof (dsl_scan_t));
542		dp->dp_scan = NULL;
543	}
544}
545
546static boolean_t
547dsl_scan_restarting(dsl_scan_t *scn, dmu_tx_t *tx)
548{
549	return (scn->scn_restart_txg != 0 &&
550	    scn->scn_restart_txg <= tx->tx_txg);
551}
552
553boolean_t
554dsl_scan_scrubbing(const dsl_pool_t *dp)
555{
556	dsl_scan_phys_t *scn_phys = &dp->dp_scan->scn_phys;
557
558	return (scn_phys->scn_state == DSS_SCANNING &&
559	    scn_phys->scn_func == POOL_SCAN_SCRUB);
560}
561
562boolean_t
563dsl_scan_is_paused_scrub(const dsl_scan_t *scn)
564{
565	return (dsl_scan_scrubbing(scn->scn_dp) &&
566	    scn->scn_phys.scn_flags & DSF_SCRUB_PAUSED);
567}
568
569/*
570 * Writes out a persistent dsl_scan_phys_t record to the pool directory.
571 * Because we can be running in the block sorting algorithm, we do not always
572 * want to write out the record, only when it is "safe" to do so. This safety
573 * condition is achieved by making sure that the sorting queues are empty
574 * (scn_bytes_pending == 0). When this condition is not true, the sync'd state
575 * is inconsistent with how much actual scanning progress has been made. The
576 * kind of sync to be performed is specified by the sync_type argument. If the
577 * sync is optional, we only sync if the queues are empty. If the sync is
578 * mandatory, we do a hard ASSERT to make sure that the queues are empty. The
579 * third possible state is a "cached" sync. This is done in response to:
580 * 1) The dataset that was in the last sync'd dsl_scan_phys_t having been
581 *	destroyed, so we wouldn't be able to restart scanning from it.
582 * 2) The snapshot that was in the last sync'd dsl_scan_phys_t having been
583 *	superseded by a newer snapshot.
584 * 3) The dataset that was in the last sync'd dsl_scan_phys_t having been
585 *	swapped with its clone.
586 * In all cases, a cached sync simply rewrites the last record we've written,
587 * just slightly modified. For the modifications that are performed to the
588 * last written dsl_scan_phys_t, see dsl_scan_ds_destroyed,
589 * dsl_scan_ds_snapshotted and dsl_scan_ds_clone_swapped.
590 */
591static void
592dsl_scan_sync_state(dsl_scan_t *scn, dmu_tx_t *tx, state_sync_type_t sync_type)
593{
594	int i;
595	spa_t *spa = scn->scn_dp->dp_spa;
596
597	ASSERT(sync_type != SYNC_MANDATORY || scn->scn_bytes_pending == 0);
598	if (scn->scn_bytes_pending == 0) {
599		for (i = 0; i < spa->spa_root_vdev->vdev_children; i++) {
600			vdev_t *vd = spa->spa_root_vdev->vdev_child[i];
601			dsl_scan_io_queue_t *q = vd->vdev_scan_io_queue;
602
603			if (q == NULL)
604				continue;
605
606			mutex_enter(&vd->vdev_scan_io_queue_lock);
607			ASSERT3P(avl_first(&q->q_sios_by_addr), ==, NULL);
608			ASSERT3P(avl_first(&q->q_exts_by_size), ==, NULL);
609			ASSERT3P(range_tree_first(q->q_exts_by_addr), ==, NULL);
610			mutex_exit(&vd->vdev_scan_io_queue_lock);
611		}
612
613		if (scn->scn_phys.scn_queue_obj != 0)
614			scan_ds_queue_sync(scn, tx);
615		VERIFY0(zap_update(scn->scn_dp->dp_meta_objset,
616		    DMU_POOL_DIRECTORY_OBJECT,
617		    DMU_POOL_SCAN, sizeof (uint64_t), SCAN_PHYS_NUMINTS,
618		    &scn->scn_phys, tx));
619		bcopy(&scn->scn_phys, &scn->scn_phys_cached,
620		    sizeof (scn->scn_phys));
621
622		if (scn->scn_checkpointing)
623			zfs_dbgmsg("finish scan checkpoint");
624
625		scn->scn_checkpointing = B_FALSE;
626		scn->scn_last_checkpoint = ddi_get_lbolt();
627	} else if (sync_type == SYNC_CACHED) {
628		VERIFY0(zap_update(scn->scn_dp->dp_meta_objset,
629		    DMU_POOL_DIRECTORY_OBJECT,
630		    DMU_POOL_SCAN, sizeof (uint64_t), SCAN_PHYS_NUMINTS,
631		    &scn->scn_phys_cached, tx));
632	}
633}
634
635/* ARGSUSED */
636static int
637dsl_scan_setup_check(void *arg, dmu_tx_t *tx)
638{
639	dsl_scan_t *scn = dmu_tx_pool(tx)->dp_scan;
640
641	if (dsl_scan_is_running(scn))
642		return (SET_ERROR(EBUSY));
643
644	return (0);
645}
646
647static void
648dsl_scan_setup_sync(void *arg, dmu_tx_t *tx)
649{
650	dsl_scan_t *scn = dmu_tx_pool(tx)->dp_scan;
651	pool_scan_func_t *funcp = arg;
652	dmu_object_type_t ot = 0;
653	dsl_pool_t *dp = scn->scn_dp;
654	spa_t *spa = dp->dp_spa;
655
656	ASSERT(!dsl_scan_is_running(scn));
657	ASSERT(*funcp > POOL_SCAN_NONE && *funcp < POOL_SCAN_FUNCS);
658	bzero(&scn->scn_phys, sizeof (scn->scn_phys));
659	scn->scn_phys.scn_func = *funcp;
660	scn->scn_phys.scn_state = DSS_SCANNING;
661	scn->scn_phys.scn_min_txg = 0;
662	scn->scn_phys.scn_max_txg = tx->tx_txg;
663	scn->scn_phys.scn_ddt_class_max = DDT_CLASSES - 1; /* the entire DDT */
664	scn->scn_phys.scn_start_time = gethrestime_sec();
665	scn->scn_phys.scn_errors = 0;
666	scn->scn_phys.scn_to_examine = spa->spa_root_vdev->vdev_stat.vs_alloc;
667	scn->scn_issued_before_pass = 0;
668	scn->scn_restart_txg = 0;
669	scn->scn_done_txg = 0;
670	scn->scn_last_checkpoint = 0;
671	scn->scn_checkpointing = B_FALSE;
672	spa_scan_stat_init(spa);
673
674	if (DSL_SCAN_IS_SCRUB_RESILVER(scn)) {
675		scn->scn_phys.scn_ddt_class_max = zfs_scrub_ddt_class_max;
676
677		/* rewrite all disk labels */
678		vdev_config_dirty(spa->spa_root_vdev);
679
680		if (vdev_resilver_needed(spa->spa_root_vdev,
681		    &scn->scn_phys.scn_min_txg, &scn->scn_phys.scn_max_txg)) {
682			spa_event_notify(spa, NULL, NULL,
683			    ESC_ZFS_RESILVER_START);
684		} else {
685			spa_event_notify(spa, NULL, NULL, ESC_ZFS_SCRUB_START);
686		}
687
688		spa->spa_scrub_started = B_TRUE;
689		/*
690		 * If this is an incremental scrub, limit the DDT scrub phase
691		 * to just the auto-ditto class (for correctness); the rest
692		 * of the scrub should go faster using top-down pruning.
693		 */
694		if (scn->scn_phys.scn_min_txg > TXG_INITIAL)
695			scn->scn_phys.scn_ddt_class_max = DDT_CLASS_DITTO;
696
697	}
698
699	/* back to the generic stuff */
700
701	if (dp->dp_blkstats == NULL) {
702		dp->dp_blkstats =
703		    kmem_alloc(sizeof (zfs_all_blkstats_t), KM_SLEEP);
704		mutex_init(&dp->dp_blkstats->zab_lock, NULL,
705		    MUTEX_DEFAULT, NULL);
706	}
707	bzero(&dp->dp_blkstats->zab_type, sizeof (dp->dp_blkstats->zab_type));
708
709	if (spa_version(spa) < SPA_VERSION_DSL_SCRUB)
710		ot = DMU_OT_ZAP_OTHER;
711
712	scn->scn_phys.scn_queue_obj = zap_create(dp->dp_meta_objset,
713	    ot ? ot : DMU_OT_SCAN_QUEUE, DMU_OT_NONE, 0, tx);
714
715	bcopy(&scn->scn_phys, &scn->scn_phys_cached, sizeof (scn->scn_phys));
716
717	dsl_scan_sync_state(scn, tx, SYNC_MANDATORY);
718
719	spa_history_log_internal(spa, "scan setup", tx,
720	    "func=%u mintxg=%llu maxtxg=%llu",
721	    *funcp, scn->scn_phys.scn_min_txg, scn->scn_phys.scn_max_txg);
722}
723
724/*
725 * Called by the ZFS_IOC_POOL_SCAN ioctl to start a scrub or resilver.
726 * Can also be called to resume a paused scrub.
727 */
728int
729dsl_scan(dsl_pool_t *dp, pool_scan_func_t func)
730{
731	spa_t *spa = dp->dp_spa;
732	dsl_scan_t *scn = dp->dp_scan;
733
734	/*
735	 * Purge all vdev caches and probe all devices.  We do this here
736	 * rather than in sync context because this requires a writer lock
737	 * on the spa_config lock, which we can't do from sync context.  The
738	 * spa_scrub_reopen flag indicates that vdev_open() should not
739	 * attempt to start another scrub.
740	 */
741	spa_vdev_state_enter(spa, SCL_NONE);
742	spa->spa_scrub_reopen = B_TRUE;
743	vdev_reopen(spa->spa_root_vdev);
744	spa->spa_scrub_reopen = B_FALSE;
745	(void) spa_vdev_state_exit(spa, NULL, 0);
746
747	if (func == POOL_SCAN_SCRUB && dsl_scan_is_paused_scrub(scn)) {
748		/* got scrub start cmd, resume paused scrub */
749		int err = dsl_scrub_set_pause_resume(scn->scn_dp,
750		    POOL_SCRUB_NORMAL);
751		if (err == 0) {
752			spa_event_notify(spa, NULL, NULL, ESC_ZFS_SCRUB_RESUME);
753			return (ECANCELED);
754		}
755		return (SET_ERROR(err));
756	}
757
758	return (dsl_sync_task(spa_name(spa), dsl_scan_setup_check,
759	    dsl_scan_setup_sync, &func, 0, ZFS_SPACE_CHECK_EXTRA_RESERVED));
760}
761
762/* ARGSUSED */
763static void
764dsl_scan_done(dsl_scan_t *scn, boolean_t complete, dmu_tx_t *tx)
765{
766	static const char *old_names[] = {
767		"scrub_bookmark",
768		"scrub_ddt_bookmark",
769		"scrub_ddt_class_max",
770		"scrub_queue",
771		"scrub_min_txg",
772		"scrub_max_txg",
773		"scrub_func",
774		"scrub_errors",
775		NULL
776	};
777
778	dsl_pool_t *dp = scn->scn_dp;
779	spa_t *spa = dp->dp_spa;
780	int i;
781
782	/* Remove any remnants of an old-style scrub. */
783	for (i = 0; old_names[i]; i++) {
784		(void) zap_remove(dp->dp_meta_objset,
785		    DMU_POOL_DIRECTORY_OBJECT, old_names[i], tx);
786	}
787
788	if (scn->scn_phys.scn_queue_obj != 0) {
789		VERIFY0(dmu_object_free(dp->dp_meta_objset,
790		    scn->scn_phys.scn_queue_obj, tx));
791		scn->scn_phys.scn_queue_obj = 0;
792	}
793	scan_ds_queue_clear(scn);
794
795	scn->scn_phys.scn_flags &= ~DSF_SCRUB_PAUSED;
796
797	/*
798	 * If we were "restarted" from a stopped state, don't bother
799	 * with anything else.
800	 */
801	if (!dsl_scan_is_running(scn)) {
802		ASSERT(!scn->scn_is_sorted);
803		return;
804	}
805
806	if (scn->scn_is_sorted) {
807		scan_io_queues_destroy(scn);
808		scn->scn_is_sorted = B_FALSE;
809
810		if (scn->scn_taskq != NULL) {
811			taskq_destroy(scn->scn_taskq);
812			scn->scn_taskq = NULL;
813		}
814	}
815
816	scn->scn_phys.scn_state = complete ? DSS_FINISHED : DSS_CANCELED;
817
818	if (dsl_scan_restarting(scn, tx))
819		spa_history_log_internal(spa, "scan aborted, restarting", tx,
820		    "errors=%llu", spa_get_errlog_size(spa));
821	else if (!complete)
822		spa_history_log_internal(spa, "scan cancelled", tx,
823		    "errors=%llu", spa_get_errlog_size(spa));
824	else
825		spa_history_log_internal(spa, "scan done", tx,
826		    "errors=%llu", spa_get_errlog_size(spa));
827
828	if (DSL_SCAN_IS_SCRUB_RESILVER(scn)) {
829		spa->spa_scrub_started = B_FALSE;
830		spa->spa_scrub_active = B_FALSE;
831
832		/*
833		 * If the scrub/resilver completed, update all DTLs to
834		 * reflect this.  Whether it succeeded or not, vacate
835		 * all temporary scrub DTLs.
836		 *
837		 * As the scrub does not currently support traversing
838		 * data that have been freed but are part of a checkpoint,
839		 * we don't mark the scrub as done in the DTLs as faults
840		 * may still exist in those vdevs.
841		 */
842		if (complete &&
843		    !spa_feature_is_active(spa, SPA_FEATURE_POOL_CHECKPOINT)) {
844			vdev_dtl_reassess(spa->spa_root_vdev, tx->tx_txg,
845			    scn->scn_phys.scn_max_txg, B_TRUE);
846
847			spa_event_notify(spa, NULL, NULL,
848			    scn->scn_phys.scn_min_txg ?
849			    ESC_ZFS_RESILVER_FINISH : ESC_ZFS_SCRUB_FINISH);
850		} else {
851			vdev_dtl_reassess(spa->spa_root_vdev, tx->tx_txg,
852			    0, B_TRUE);
853		}
854		spa_errlog_rotate(spa);
855
856		/*
857		 * We may have finished replacing a device.
858		 * Let the async thread assess this and handle the detach.
859		 */
860		spa_async_request(spa, SPA_ASYNC_RESILVER_DONE);
861	}
862
863	scn->scn_phys.scn_end_time = gethrestime_sec();
864
865	ASSERT(!dsl_scan_is_running(scn));
866}
867
868/* ARGSUSED */
869static int
870dsl_scan_cancel_check(void *arg, dmu_tx_t *tx)
871{
872	dsl_scan_t *scn = dmu_tx_pool(tx)->dp_scan;
873
874	if (!dsl_scan_is_running(scn))
875		return (SET_ERROR(ENOENT));
876	return (0);
877}
878
879/* ARGSUSED */
880static void
881dsl_scan_cancel_sync(void *arg, dmu_tx_t *tx)
882{
883	dsl_scan_t *scn = dmu_tx_pool(tx)->dp_scan;
884
885	dsl_scan_done(scn, B_FALSE, tx);
886	dsl_scan_sync_state(scn, tx, SYNC_MANDATORY);
887	spa_event_notify(scn->scn_dp->dp_spa, NULL, NULL, ESC_ZFS_SCRUB_ABORT);
888}
889
890int
891dsl_scan_cancel(dsl_pool_t *dp)
892{
893	return (dsl_sync_task(spa_name(dp->dp_spa), dsl_scan_cancel_check,
894	    dsl_scan_cancel_sync, NULL, 3, ZFS_SPACE_CHECK_RESERVED));
895}
896
897static int
898dsl_scrub_pause_resume_check(void *arg, dmu_tx_t *tx)
899{
900	pool_scrub_cmd_t *cmd = arg;
901	dsl_pool_t *dp = dmu_tx_pool(tx);
902	dsl_scan_t *scn = dp->dp_scan;
903
904	if (*cmd == POOL_SCRUB_PAUSE) {
905		/* can't pause a scrub when there is no in-progress scrub */
906		if (!dsl_scan_scrubbing(dp))
907			return (SET_ERROR(ENOENT));
908
909		/* can't pause a paused scrub */
910		if (dsl_scan_is_paused_scrub(scn))
911			return (SET_ERROR(EBUSY));
912	} else if (*cmd != POOL_SCRUB_NORMAL) {
913		return (SET_ERROR(ENOTSUP));
914	}
915
916	return (0);
917}
918
919static void
920dsl_scrub_pause_resume_sync(void *arg, dmu_tx_t *tx)
921{
922	pool_scrub_cmd_t *cmd = arg;
923	dsl_pool_t *dp = dmu_tx_pool(tx);
924	spa_t *spa = dp->dp_spa;
925	dsl_scan_t *scn = dp->dp_scan;
926
927	if (*cmd == POOL_SCRUB_PAUSE) {
928		/* record the pause time and mark the in-progress scrub as paused */
929		spa->spa_scan_pass_scrub_pause = gethrestime_sec();
930		scn->scn_phys.scn_flags |= DSF_SCRUB_PAUSED;
931		dsl_scan_sync_state(scn, tx, SYNC_CACHED);
932		spa_event_notify(spa, NULL, NULL, ESC_ZFS_SCRUB_PAUSED);
933	} else {
934		ASSERT3U(*cmd, ==, POOL_SCRUB_NORMAL);
935		if (dsl_scan_is_paused_scrub(scn)) {
936			/*
937			 * We need to keep track of how much time we spend
938			 * paused per pass so that we can adjust the scrub rate
939			 * shown in the output of 'zpool status'
940			 */
941			spa->spa_scan_pass_scrub_spent_paused +=
942			    gethrestime_sec() - spa->spa_scan_pass_scrub_pause;
943			spa->spa_scan_pass_scrub_pause = 0;
944			scn->scn_phys.scn_flags &= ~DSF_SCRUB_PAUSED;
945			dsl_scan_sync_state(scn, tx, SYNC_CACHED);
946		}
947	}
948}
949
950/*
951 * Set scrub pause/resume state if it makes sense to do so
952 */
953int
954dsl_scrub_set_pause_resume(const dsl_pool_t *dp, pool_scrub_cmd_t cmd)
955{
956	return (dsl_sync_task(spa_name(dp->dp_spa),
957	    dsl_scrub_pause_resume_check, dsl_scrub_pause_resume_sync, &cmd, 3,
958	    ZFS_SPACE_CHECK_RESERVED));
959}
960
961
962/* start a new scan, or restart an existing one. */
963void
964dsl_resilver_restart(dsl_pool_t *dp, uint64_t txg)
965{
966	if (txg == 0) {
967		dmu_tx_t *tx;
968		tx = dmu_tx_create_dd(dp->dp_mos_dir);
969		VERIFY(0 == dmu_tx_assign(tx, TXG_WAIT));
970
971		txg = dmu_tx_get_txg(tx);
972		dp->dp_scan->scn_restart_txg = txg;
973		dmu_tx_commit(tx);
974	} else {
975		dp->dp_scan->scn_restart_txg = txg;
976	}
977	zfs_dbgmsg("restarting resilver txg=%llu", txg);
978}
979
980void
981dsl_free(dsl_pool_t *dp, uint64_t txg, const blkptr_t *bp)
982{
983	zio_free(dp->dp_spa, txg, bp);
984}
985
986void
987dsl_free_sync(zio_t *pio, dsl_pool_t *dp, uint64_t txg, const blkptr_t *bpp)
988{
989	ASSERT(dsl_pool_sync_context(dp));
990	zio_nowait(zio_free_sync(pio, dp->dp_spa, txg, bpp, BP_GET_PSIZE(bpp),
991	    pio->io_flags));
992}
993
994static int
995scan_ds_queue_compare(const void *a, const void *b)
996{
997	const scan_ds_t *sds_a = a, *sds_b = b;
998
999	if (sds_a->sds_dsobj < sds_b->sds_dsobj)
1000		return (-1);
1001	if (sds_a->sds_dsobj == sds_b->sds_dsobj)
1002		return (0);
1003	return (1);
1004}
1005
1006static void
1007scan_ds_queue_clear(dsl_scan_t *scn)
1008{
1009	void *cookie = NULL;
1010	scan_ds_t *sds;
1011	while ((sds = avl_destroy_nodes(&scn->scn_queue, &cookie)) != NULL) {
1012		kmem_free(sds, sizeof (*sds));
1013	}
1014}
1015
1016static boolean_t
1017scan_ds_queue_contains(dsl_scan_t *scn, uint64_t dsobj, uint64_t *txg)
1018{
1019	scan_ds_t srch, *sds;
1020
1021	srch.sds_dsobj = dsobj;
1022	sds = avl_find(&scn->scn_queue, &srch, NULL);
1023	if (sds != NULL && txg != NULL)
1024		*txg = sds->sds_txg;
1025	return (sds != NULL);
1026}
1027
1028static void
1029scan_ds_queue_insert(dsl_scan_t *scn, uint64_t dsobj, uint64_t txg)
1030{
1031	scan_ds_t *sds;
1032	avl_index_t where;
1033
1034	sds = kmem_zalloc(sizeof (*sds), KM_SLEEP);
1035	sds->sds_dsobj = dsobj;
1036	sds->sds_txg = txg;
1037
1038	VERIFY3P(avl_find(&scn->scn_queue, sds, &where), ==, NULL);
1039	avl_insert(&scn->scn_queue, sds, where);
1040}
1041
1042static void
1043scan_ds_queue_remove(dsl_scan_t *scn, uint64_t dsobj)
1044{
1045	scan_ds_t srch, *sds;
1046
1047	srch.sds_dsobj = dsobj;
1048
1049	sds = avl_find(&scn->scn_queue, &srch, NULL);
1050	VERIFY(sds != NULL);
1051	avl_remove(&scn->scn_queue, sds);
1052	kmem_free(sds, sizeof (*sds));
1053}
1054
1055static void
1056scan_ds_queue_sync(dsl_scan_t *scn, dmu_tx_t *tx)
1057{
1058	dsl_pool_t *dp = scn->scn_dp;
1059	spa_t *spa = dp->dp_spa;
1060	dmu_object_type_t ot = (spa_version(spa) >= SPA_VERSION_DSL_SCRUB) ?
1061	    DMU_OT_SCAN_QUEUE : DMU_OT_ZAP_OTHER;
1062
1063	ASSERT0(scn->scn_bytes_pending);
1064	ASSERT(scn->scn_phys.scn_queue_obj != 0);
1065
1066	VERIFY0(dmu_object_free(dp->dp_meta_objset,
1067	    scn->scn_phys.scn_queue_obj, tx));
1068	scn->scn_phys.scn_queue_obj = zap_create(dp->dp_meta_objset, ot,
1069	    DMU_OT_NONE, 0, tx);
1070	for (scan_ds_t *sds = avl_first(&scn->scn_queue);
1071	    sds != NULL; sds = AVL_NEXT(&scn->scn_queue, sds)) {
1072		VERIFY0(zap_add_int_key(dp->dp_meta_objset,
1073		    scn->scn_phys.scn_queue_obj, sds->sds_dsobj,
1074		    sds->sds_txg, tx));
1075	}
1076}
1077
1078/*
1079 * Computes the memory limit state that we're currently in. A sorted scan
1080 * needs quite a bit of memory to hold the sorting queue, so we need to
1081 * reasonably constrain the size so it doesn't impact overall system
1082 * performance. We compute two limits:
1083 * 1) Hard memory limit: if the amount of memory used by the sorting
1084 *	queues on a pool gets above this value, we stop the metadata
1085 *	scanning portion and start issuing the queued up and sorted
1086 *	I/Os to reduce memory usage.
1087 *	This limit is calculated as a fraction of physmem (by default 5%).
1088 *	We constrain the lower bound of the hard limit to an absolute
1089 *	minimum of zfs_scan_mem_lim_min (default: 16 MiB). We also constrain
1090 *	the upper bound to 5% of the total pool size - no chance we'll
1091 *	ever need that much memory, but just to keep the value in check.
1092 * 2) Soft memory limit: once we hit the hard memory limit, we start
1093 *	issuing I/O to reduce queue memory usage, but we don't want to
1094 *	completely empty out the queues, since we might be able to find I/Os
1095 *	that will fill in the gaps of our non-sequential IOs at some point
1096 *	in the future. So we stop the issuing of I/Os once the amount of
1097 *	memory used drops below the soft limit (at which point we stop issuing
1098 *	I/O and start scanning metadata again).
1099 *
1100 *	This limit is calculated by subtracting a fraction of the hard
1101 *	limit from the hard limit. By default this fraction is 5%, so
1102 *	the soft limit is 95% of the hard limit. We cap the size of the
1103 *	difference between the hard and soft limits at an absolute
1104 *	maximum of zfs_scan_mem_lim_soft_max (default: 128 MiB) - this is
1105 *	sufficient to not cause too frequent switching between the
1106 *	metadata scan and I/O issue (even at 2k recordsize, 128 MiB's
1107 *	worth of queues is about 1.2 GiB of on-pool data, so scanning
1108 *	that should take at least a decent fraction of a second).
1109 */
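/*
 * Illustrative example with the default tunables on a machine with 16 GiB
 * of physical memory (and a pool large enough that the alloc/20 cap does
 * not apply): the hard limit is 16 GiB / 20 = ~819 MiB, and the soft limit
 * is 819 MiB - MIN(819 MiB / 20, 128 MiB) = ~778 MiB.
 */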
1110static boolean_t
1111dsl_scan_should_clear(dsl_scan_t *scn)
1112{
1113	vdev_t *rvd = scn->scn_dp->dp_spa->spa_root_vdev;
1114	uint64_t mlim_hard, mlim_soft, mused;
1115	uint64_t alloc = metaslab_class_get_alloc(spa_normal_class(
1116	    scn->scn_dp->dp_spa));
1117
1118	mlim_hard = MAX((physmem / zfs_scan_mem_lim_fact) * PAGESIZE,
1119	    zfs_scan_mem_lim_min);
1120	mlim_hard = MIN(mlim_hard, alloc / 20);
1121	mlim_soft = mlim_hard - MIN(mlim_hard / zfs_scan_mem_lim_soft_fact,
1122	    zfs_scan_mem_lim_soft_max);
1123	mused = 0;
1124	for (uint64_t i = 0; i < rvd->vdev_children; i++) {
1125		vdev_t *tvd = rvd->vdev_child[i];
1126		dsl_scan_io_queue_t *queue;
1127
1128		mutex_enter(&tvd->vdev_scan_io_queue_lock);
1129		queue = tvd->vdev_scan_io_queue;
1130		if (queue != NULL) {
1131			/* #extents in exts_by_size = # in exts_by_addr */
1132			mused += avl_numnodes(&queue->q_exts_by_size) *
1133			    sizeof (range_seg_t) +
1134			    avl_numnodes(&queue->q_sios_by_addr) *
1135			    sizeof (scan_io_t);
1136		}
1137		mutex_exit(&tvd->vdev_scan_io_queue_lock);
1138	}
1139
1140	dprintf("current scan memory usage: %llu bytes\n", (longlong_t)mused);
1141
1142	if (mused == 0)
1143		ASSERT0(scn->scn_bytes_pending);
1144
1145	/*
1146	 * If we are above our hard limit, we need to clear out memory.
1147	 * If we are below our soft limit, we need to accumulate sequential IOs.
1148	 * Otherwise, we should keep doing whatever we are currently doing.
1149	 */
1150	if (mused >= mlim_hard)
1151		return (B_TRUE);
1152	else if (mused < mlim_soft)
1153		return (B_FALSE);
1154	else
1155		return (scn->scn_clearing);
1156}
1157
1158static boolean_t
1159dsl_scan_check_suspend(dsl_scan_t *scn, const zbookmark_phys_t *zb)
1160{
1161	/* we never skip user/group accounting objects */
1162	if (zb && (int64_t)zb->zb_object < 0)
1163		return (B_FALSE);
1164
1165	if (scn->scn_suspending)
1166		return (B_TRUE); /* we're already suspending */
1167
1168	if (!ZB_IS_ZERO(&scn->scn_phys.scn_bookmark))
1169		return (B_FALSE); /* we're resuming */
1170
1171	/* We only know how to resume from level-0 blocks. */
1172	if (zb && zb->zb_level != 0)
1173		return (B_FALSE);
1174
1175	/*
1176	 * We suspend if:
1177	 *  - we have scanned for at least the minimum time (default 1 sec
1178	 *    for scrub, 3 sec for resilver), and either we have sufficient
1179	 *    dirty data that we are starting to write more quickly
1180	 *    (default 30%), or someone is explicitly waiting for this txg
1181	 *    to complete.
1182	 *  or
1183	 *  - the spa is shutting down because this pool is being exported
1184	 *    or the machine is rebooting.
1185	 *  or
1186	 *  - the scan queue has reached its memory use limit
1187	 */
1188	uint64_t elapsed_nanosecs = gethrtime();
1189	uint64_t curr_time_ns = gethrtime();
1190	uint64_t scan_time_ns = curr_time_ns - scn->scn_sync_start_time;
1191	uint64_t sync_time_ns = curr_time_ns -
1192	    scn->scn_dp->dp_spa->spa_sync_starttime;
1193
1194	int dirty_pct = scn->scn_dp->dp_dirty_total * 100 / zfs_dirty_data_max;
1195	int mintime = (scn->scn_phys.scn_func == POOL_SCAN_RESILVER) ?
1196	    zfs_resilver_min_time_ms : zfs_scrub_min_time_ms;
1197
1198	if ((NSEC2MSEC(scan_time_ns) > mintime &&
1199	    (dirty_pct >= zfs_vdev_async_write_active_min_dirty_percent ||
1200	    txg_sync_waiting(scn->scn_dp) ||
1201	    NSEC2SEC(sync_time_ns) >= zfs_txg_timeout)) ||
1202	    spa_shutting_down(scn->scn_dp->dp_spa) ||
1203	    (zfs_scan_strict_mem_lim && dsl_scan_should_clear(scn))) {
1204		if (zb) {
1205			dprintf("suspending at bookmark %llx/%llx/%llx/%llx\n",
1206			    (longlong_t)zb->zb_objset,
1207			    (longlong_t)zb->zb_object,
1208			    (longlong_t)zb->zb_level,
1209			    (longlong_t)zb->zb_blkid);
1210			scn->scn_phys.scn_bookmark = *zb;
1211		} else {
1212			dsl_scan_phys_t *scnp = &scn->scn_phys;
1213
1214			dprintf("suspending at DDT bookmark "
1215			    "%llx/%llx/%llx/%llx\n",
1216			    (longlong_t)scnp->scn_ddt_bookmark.ddb_class,
1217			    (longlong_t)scnp->scn_ddt_bookmark.ddb_type,
1218			    (longlong_t)scnp->scn_ddt_bookmark.ddb_checksum,
1219			    (longlong_t)scnp->scn_ddt_bookmark.ddb_cursor);
1220		}
1221		scn->scn_suspending = B_TRUE;
1222		return (B_TRUE);
1223	}
1224	return (B_FALSE);
1225}
1226
1227typedef struct zil_scan_arg {
1228	dsl_pool_t	*zsa_dp;
1229	zil_header_t	*zsa_zh;
1230} zil_scan_arg_t;
1231
1232/* ARGSUSED */
1233static int
1234dsl_scan_zil_block(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
1235{
1236	zil_scan_arg_t *zsa = arg;
1237	dsl_pool_t *dp = zsa->zsa_dp;
1238	dsl_scan_t *scn = dp->dp_scan;
1239	zil_header_t *zh = zsa->zsa_zh;
1240	zbookmark_phys_t zb;
1241
1242	if (BP_IS_HOLE(bp) || bp->blk_birth <= scn->scn_phys.scn_cur_min_txg)
1243		return (0);
1244
1245	/*
1246	 * One block (the "stubby") may have been allocated a long time
1247	 * ago; we want to visit it because it has been allocated
1248	 * (on-disk) even if it hasn't been claimed (even though for
1249	 * scrub there's nothing to do to it).
1250	 */
1251	if (claim_txg == 0 && bp->blk_birth >= spa_min_claim_txg(dp->dp_spa))
1252		return (0);
1253
1254	SET_BOOKMARK(&zb, zh->zh_log.blk_cksum.zc_word[ZIL_ZC_OBJSET],
1255	    ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
1256
1257	VERIFY(0 == scan_funcs[scn->scn_phys.scn_func](dp, bp, &zb));
1258	return (0);
1259}
1260
1261/* ARGSUSED */
1262static int
1263dsl_scan_zil_record(zilog_t *zilog, lr_t *lrc, void *arg, uint64_t claim_txg)
1264{
1265	if (lrc->lrc_txtype == TX_WRITE) {
1266		zil_scan_arg_t *zsa = arg;
1267		dsl_pool_t *dp = zsa->zsa_dp;
1268		dsl_scan_t *scn = dp->dp_scan;
1269		zil_header_t *zh = zsa->zsa_zh;
1270		lr_write_t *lr = (lr_write_t *)lrc;
1271		blkptr_t *bp = &lr->lr_blkptr;
1272		zbookmark_phys_t zb;
1273
1274		if (BP_IS_HOLE(bp) ||
1275		    bp->blk_birth <= scn->scn_phys.scn_cur_min_txg)
1276			return (0);
1277
1278		/*
1279		 * birth can be < claim_txg if this record's txg is
1280		 * already txg sync'ed (but this log block contains
1281		 * other records that are not synced)
1282		 */
1283		if (claim_txg == 0 || bp->blk_birth < claim_txg)
1284			return (0);
1285
1286		SET_BOOKMARK(&zb, zh->zh_log.blk_cksum.zc_word[ZIL_ZC_OBJSET],
1287		    lr->lr_foid, ZB_ZIL_LEVEL,
1288		    lr->lr_offset / BP_GET_LSIZE(bp));
1289
1290		VERIFY(0 == scan_funcs[scn->scn_phys.scn_func](dp, bp, &zb));
1291	}
1292	return (0);
1293}
1294
1295static void
1296dsl_scan_zil(dsl_pool_t *dp, zil_header_t *zh)
1297{
1298	uint64_t claim_txg = zh->zh_claim_txg;
1299	zil_scan_arg_t zsa = { dp, zh };
1300	zilog_t *zilog;
1301
1302	ASSERT(spa_writeable(dp->dp_spa));
1303
1304	/*
1305	 * We only want to visit blocks that have been claimed
1306	 * but not yet replayed.
1307	 */
1308	if (claim_txg == 0)
1309		return;
1310
1311	zilog = zil_alloc(dp->dp_meta_objset, zh);
1312
1313	(void) zil_parse(zilog, dsl_scan_zil_block, dsl_scan_zil_record, &zsa,
1314	    claim_txg);
1315
1316	zil_free(zilog);
1317}
1318
1319/*
1320 * We compare scan_prefetch_issue_ctx_t's based on their bookmarks. The idea
1321 * here is to sort the AVL tree by the order each block will be needed.
1322 */
1323static int
1324scan_prefetch_queue_compare(const void *a, const void *b)
1325{
1326	const scan_prefetch_issue_ctx_t *spic_a = a, *spic_b = b;
1327	const scan_prefetch_ctx_t *spc_a = spic_a->spic_spc;
1328	const scan_prefetch_ctx_t *spc_b = spic_b->spic_spc;
1329
1330	return (zbookmark_compare(spc_a->spc_datablkszsec,
1331	    spc_a->spc_indblkshift, spc_b->spc_datablkszsec,
1332	    spc_b->spc_indblkshift, &spic_a->spic_zb, &spic_b->spic_zb));
1333}
1334
1335static void
1336scan_prefetch_ctx_rele(scan_prefetch_ctx_t *spc, void *tag)
1337{
1338	if (refcount_remove(&spc->spc_refcnt, tag) == 0) {
1339		refcount_destroy(&spc->spc_refcnt);
1340		kmem_free(spc, sizeof (scan_prefetch_ctx_t));
1341	}
1342}
1343
1344static scan_prefetch_ctx_t *
1345scan_prefetch_ctx_create(dsl_scan_t *scn, dnode_phys_t *dnp, void *tag)
1346{
1347	scan_prefetch_ctx_t *spc;
1348
1349	spc = kmem_alloc(sizeof (scan_prefetch_ctx_t), KM_SLEEP);
1350	refcount_create(&spc->spc_refcnt);
1351	refcount_add(&spc->spc_refcnt, tag);
1352	spc->spc_scn = scn;
1353	if (dnp != NULL) {
1354		spc->spc_datablkszsec = dnp->dn_datablkszsec;
1355		spc->spc_indblkshift = dnp->dn_indblkshift;
1356		spc->spc_root = B_FALSE;
1357	} else {
1358		spc->spc_datablkszsec = 0;
1359		spc->spc_indblkshift = 0;
1360		spc->spc_root = B_TRUE;
1361	}
1362
1363	return (spc);
1364}
1365
1366static void
1367scan_prefetch_ctx_add_ref(scan_prefetch_ctx_t *spc, void *tag)
1368{
1369	refcount_add(&spc->spc_refcnt, tag);
1370}
1371
1372static boolean_t
1373dsl_scan_check_prefetch_resume(scan_prefetch_ctx_t *spc,
1374    const zbookmark_phys_t *zb)
1375{
1376	zbookmark_phys_t *last_zb = &spc->spc_scn->scn_prefetch_bookmark;
1377	dnode_phys_t tmp_dnp;
1378	dnode_phys_t *dnp = (spc->spc_root) ? NULL : &tmp_dnp;
1379
1380	if (zb->zb_objset != last_zb->zb_objset)
1381		return (B_TRUE);
1382	if ((int64_t)zb->zb_object < 0)
1383		return (B_FALSE);
1384
1385	tmp_dnp.dn_datablkszsec = spc->spc_datablkszsec;
1386	tmp_dnp.dn_indblkshift = spc->spc_indblkshift;
1387
1388	if (zbookmark_subtree_completed(dnp, zb, last_zb))
1389		return (B_TRUE);
1390
1391	return (B_FALSE);
1392}
1393
1394static void
1395dsl_scan_prefetch(scan_prefetch_ctx_t *spc, blkptr_t *bp, zbookmark_phys_t *zb)
1396{
1397	avl_index_t idx;
1398	dsl_scan_t *scn = spc->spc_scn;
1399	spa_t *spa = scn->scn_dp->dp_spa;
1400	scan_prefetch_issue_ctx_t *spic;
1401
1402	if (zfs_no_scrub_prefetch)
1403		return;
1404
1405	if (BP_IS_HOLE(bp) || bp->blk_birth <= scn->scn_phys.scn_cur_min_txg ||
1406	    (BP_GET_LEVEL(bp) == 0 && BP_GET_TYPE(bp) != DMU_OT_DNODE &&
1407	    BP_GET_TYPE(bp) != DMU_OT_OBJSET))
1408		return;
1409
1410	if (dsl_scan_check_prefetch_resume(spc, zb))
1411		return;
1412
1413	scan_prefetch_ctx_add_ref(spc, scn);
1414	spic = kmem_alloc(sizeof (scan_prefetch_issue_ctx_t), KM_SLEEP);
1415	spic->spic_spc = spc;
1416	spic->spic_bp = *bp;
1417	spic->spic_zb = *zb;
1418
1419	/*
1420	 * Add the IO to the queue of blocks to prefetch. This allows us to
1421	 * prioritize blocks that we will need first for the main traversal
1422	 * thread.
1423	 */
1424	mutex_enter(&spa->spa_scrub_lock);
1425	if (avl_find(&scn->scn_prefetch_queue, spic, &idx) != NULL) {
1426		/* this block is already queued for prefetch */
1427		kmem_free(spic, sizeof (scan_prefetch_issue_ctx_t));
1428		scan_prefetch_ctx_rele(spc, scn);
1429		mutex_exit(&spa->spa_scrub_lock);
1430		return;
1431	}
1432
1433	avl_insert(&scn->scn_prefetch_queue, spic, idx);
1434	cv_broadcast(&spa->spa_scrub_io_cv);
1435	mutex_exit(&spa->spa_scrub_lock);
1436}
1437
1438static void
1439dsl_scan_prefetch_dnode(dsl_scan_t *scn, dnode_phys_t *dnp,
1440    uint64_t objset, uint64_t object)
1441{
1442	int i;
1443	zbookmark_phys_t zb;
1444	scan_prefetch_ctx_t *spc;
1445
1446	if (dnp->dn_nblkptr == 0 && !(dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR))
1447		return;
1448
1449	SET_BOOKMARK(&zb, objset, object, 0, 0);
1450
1451	spc = scan_prefetch_ctx_create(scn, dnp, FTAG);
1452
1453	for (i = 0; i < dnp->dn_nblkptr; i++) {
1454		zb.zb_level = BP_GET_LEVEL(&dnp->dn_blkptr[i]);
1455		zb.zb_blkid = i;
1456		dsl_scan_prefetch(spc, &dnp->dn_blkptr[i], &zb);
1457	}
1458
1459	if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) {
1460		zb.zb_level = 0;
1461		zb.zb_blkid = DMU_SPILL_BLKID;
1462		dsl_scan_prefetch(spc, &dnp->dn_spill, &zb);
1463	}
1464
1465	scan_prefetch_ctx_rele(spc, FTAG);
1466}
1467
1468void
1469dsl_scan_prefetch_cb(zio_t *zio, const zbookmark_phys_t *zb, const blkptr_t *bp,
1470    arc_buf_t *buf, void *private)
1471{
1472	scan_prefetch_ctx_t *spc = private;
1473	dsl_scan_t *scn = spc->spc_scn;
1474	spa_t *spa = scn->scn_dp->dp_spa;
1475
1476	/* broadcast that the IO has completed for rate limiting purposes */
1477	mutex_enter(&spa->spa_scrub_lock);
1478	ASSERT3U(spa->spa_scrub_inflight, >=, BP_GET_PSIZE(bp));
1479	spa->spa_scrub_inflight -= BP_GET_PSIZE(bp);
1480	cv_broadcast(&spa->spa_scrub_io_cv);
1481	mutex_exit(&spa->spa_scrub_lock);
1482
1483	/* if there was an error or we are done prefetching, just cleanup */
1484	if (buf == NULL || scn->scn_suspending)
1485		goto out;
1486
1487	if (BP_GET_LEVEL(bp) > 0) {
1488		int i;
1489		blkptr_t *cbp;
1490		int epb = BP_GET_LSIZE(bp) >> SPA_BLKPTRSHIFT;
1491		zbookmark_phys_t czb;
1492
1493		for (i = 0, cbp = buf->b_data; i < epb; i++, cbp++) {
1494			SET_BOOKMARK(&czb, zb->zb_objset, zb->zb_object,
1495			    zb->zb_level - 1, zb->zb_blkid * epb + i);
1496			dsl_scan_prefetch(spc, cbp, &czb);
1497		}
1498	} else if (BP_GET_TYPE(bp) == DMU_OT_DNODE) {
1499		dnode_phys_t *cdnp = buf->b_data;
1500		int i;
1501		int epb = BP_GET_LSIZE(bp) >> DNODE_SHIFT;
1502
1503		for (i = 0, cdnp = buf->b_data; i < epb; i++, cdnp++) {
1504			dsl_scan_prefetch_dnode(scn, cdnp,
1505			    zb->zb_objset, zb->zb_blkid * epb + i);
1506		}
1507	} else if (BP_GET_TYPE(bp) == DMU_OT_OBJSET) {
1508		objset_phys_t *osp = buf->b_data;
1509
1510		dsl_scan_prefetch_dnode(scn, &osp->os_meta_dnode,
1511		    zb->zb_objset, DMU_META_DNODE_OBJECT);
1512
1513		if (OBJSET_BUF_HAS_USERUSED(buf)) {
1514			dsl_scan_prefetch_dnode(scn,
1515			    &osp->os_groupused_dnode, zb->zb_objset,
1516			    DMU_GROUPUSED_OBJECT);
1517			dsl_scan_prefetch_dnode(scn,
1518			    &osp->os_userused_dnode, zb->zb_objset,
1519			    DMU_USERUSED_OBJECT);
1520		}
1521	}
1522
1523out:
1524	if (buf != NULL)
1525		arc_buf_destroy(buf, private);
1526	scan_prefetch_ctx_rele(spc, scn);
1527}
1528
1529/* ARGSUSED */
1530static void
1531dsl_scan_prefetch_thread(void *arg)
1532{
1533	dsl_scan_t *scn = arg;
1534	spa_t *spa = scn->scn_dp->dp_spa;
1535	vdev_t *rvd = spa->spa_root_vdev;
1536	uint64_t maxinflight = rvd->vdev_children * zfs_top_maxinflight;
1537	scan_prefetch_issue_ctx_t *spic;
1538
1539	/* loop until we are told to stop */
1540	while (!scn->scn_prefetch_stop) {
1541		arc_flags_t flags = ARC_FLAG_NOWAIT |
1542		    ARC_FLAG_PRESCIENT_PREFETCH | ARC_FLAG_PREFETCH;
1543		int zio_flags = ZIO_FLAG_CANFAIL | ZIO_FLAG_SCAN_THREAD;
1544
1545		mutex_enter(&spa->spa_scrub_lock);
1546
1547		/*
1548		 * Wait until we have an IO to issue and are not above our
1549		 * maximum in flight limit.
1550		 */
1551		while (!scn->scn_prefetch_stop &&
1552		    (avl_numnodes(&scn->scn_prefetch_queue) == 0 ||
1553		    spa->spa_scrub_inflight >= scn->scn_maxinflight_bytes)) {
1554			cv_wait(&spa->spa_scrub_io_cv, &spa->spa_scrub_lock);
1555		}
1556
1557		/* recheck if we should stop since we waited for the cv */
1558		if (scn->scn_prefetch_stop) {
1559			mutex_exit(&spa->spa_scrub_lock);
1560			break;
1561		}
1562
1563		/* remove the prefetch IO from the tree */
1564		spic = avl_first(&scn->scn_prefetch_queue);
1565		spa->spa_scrub_inflight += BP_GET_PSIZE(&spic->spic_bp);
1566		avl_remove(&scn->scn_prefetch_queue, spic);
1567
1568		mutex_exit(&spa->spa_scrub_lock);
1569
1570		/* issue the prefetch asynchronously */
1571		(void) arc_read(scn->scn_zio_root, scn->scn_dp->dp_spa,
1572		    &spic->spic_bp, dsl_scan_prefetch_cb, spic->spic_spc,
1573		    ZIO_PRIORITY_SCRUB, zio_flags, &flags, &spic->spic_zb);
1574
1575		kmem_free(spic, sizeof (scan_prefetch_issue_ctx_t));
1576	}
1577
1578	ASSERT(scn->scn_prefetch_stop);
1579
1580	/* free any prefetches we didn't get to complete */
1581	mutex_enter(&spa->spa_scrub_lock);
1582	while ((spic = avl_first(&scn->scn_prefetch_queue)) != NULL) {
1583		avl_remove(&scn->scn_prefetch_queue, spic);
1584		scan_prefetch_ctx_rele(spic->spic_spc, scn);
1585		kmem_free(spic, sizeof (scan_prefetch_issue_ctx_t));
1586	}
1587	ASSERT0(avl_numnodes(&scn->scn_prefetch_queue));
1588	mutex_exit(&spa->spa_scrub_lock);
1589}
1590
1591static boolean_t
1592dsl_scan_check_resume(dsl_scan_t *scn, const dnode_phys_t *dnp,
1593    const zbookmark_phys_t *zb)
1594{
1595	/*
1596	 * We never skip over user/group accounting objects (obj<0)
1597	 */
1598	if (!ZB_IS_ZERO(&scn->scn_phys.scn_bookmark) &&
1599	    (int64_t)zb->zb_object >= 0) {
1600		/*
1601		 * If we already visited this bp & everything below (in
1602		 * a prior txg sync), don't bother doing it again.
1603		 */
1604		if (zbookmark_subtree_completed(dnp, zb,
1605		    &scn->scn_phys.scn_bookmark))
1606			return (B_TRUE);
1607
1608		/*
1609		 * If we found the block we're trying to resume from, or
1610		 * we went past it to a different object, zero it out to
1611		 * indicate that it's OK to start checking for suspending
1612		 * again.
1613		 */
1614		if (bcmp(zb, &scn->scn_phys.scn_bookmark, sizeof (*zb)) == 0 ||
1615		    zb->zb_object > scn->scn_phys.scn_bookmark.zb_object) {
1616			dprintf("resuming at %llx/%llx/%llx/%llx\n",
1617			    (longlong_t)zb->zb_objset,
1618			    (longlong_t)zb->zb_object,
1619			    (longlong_t)zb->zb_level,
1620			    (longlong_t)zb->zb_blkid);
1621			bzero(&scn->scn_phys.scn_bookmark, sizeof (*zb));
1622		}
1623	}
1624	return (B_FALSE);
1625}
1626
1627static void dsl_scan_visitbp(blkptr_t *bp, const zbookmark_phys_t *zb,
1628    dnode_phys_t *dnp, dsl_dataset_t *ds, dsl_scan_t *scn,
1629    dmu_objset_type_t ostype, dmu_tx_t *tx);
1630static void dsl_scan_visitdnode(
1631    dsl_scan_t *, dsl_dataset_t *ds, dmu_objset_type_t ostype,
1632    dnode_phys_t *dnp, uint64_t object, dmu_tx_t *tx);
1633
1634/*
1635 * Return nonzero on i/o error.
1636 * Any i/o errors encountered are counted in scn_phys.scn_errors.
1637 */
1638static int
1639dsl_scan_recurse(dsl_scan_t *scn, dsl_dataset_t *ds, dmu_objset_type_t ostype,
1640    dnode_phys_t *dnp, const blkptr_t *bp,
1641    const zbookmark_phys_t *zb, dmu_tx_t *tx)
1642{
1643	dsl_pool_t *dp = scn->scn_dp;
1644	int zio_flags = ZIO_FLAG_CANFAIL | ZIO_FLAG_SCAN_THREAD;
1645	int err;
1646
1647	if (BP_GET_LEVEL(bp) > 0) {
1648		arc_flags_t flags = ARC_FLAG_WAIT;
1649		int i;
1650		blkptr_t *cbp;
1651		int epb = BP_GET_LSIZE(bp) >> SPA_BLKPTRSHIFT;
1652		arc_buf_t *buf;
1653
1654		err = arc_read(NULL, dp->dp_spa, bp, arc_getbuf_func, &buf,
1655		    ZIO_PRIORITY_SCRUB, zio_flags, &flags, zb);
1656		if (err) {
1657			scn->scn_phys.scn_errors++;
1658			return (err);
1659		}
1660		for (i = 0, cbp = buf->b_data; i < epb; i++, cbp++) {
1661			zbookmark_phys_t czb;
1662
1663			SET_BOOKMARK(&czb, zb->zb_objset, zb->zb_object,
1664			    zb->zb_level - 1,
1665			    zb->zb_blkid * epb + i);
1666			dsl_scan_visitbp(cbp, &czb, dnp,
1667			    ds, scn, ostype, tx);
1668		}
1669		arc_buf_destroy(buf, &buf);
1670	} else if (BP_GET_TYPE(bp) == DMU_OT_DNODE) {
1671		arc_flags_t flags = ARC_FLAG_WAIT;
1672		dnode_phys_t *cdnp;
1673		int i;
1674		int epb = BP_GET_LSIZE(bp) >> DNODE_SHIFT;
1675		arc_buf_t *buf;
1676
1677		err = arc_read(NULL, dp->dp_spa, bp, arc_getbuf_func, &buf,
1678		    ZIO_PRIORITY_SCRUB, zio_flags, &flags, zb);
1679		if (err) {
1680			scn->scn_phys.scn_errors++;
1681			return (err);
1682		}
1683		for (i = 0, cdnp = buf->b_data; i < epb; i++, cdnp++) {
1684			dsl_scan_visitdnode(scn, ds, ostype,
1685			    cdnp, zb->zb_blkid * epb + i, tx);
1686		}
1687
1688		arc_buf_destroy(buf, &buf);
1689	} else if (BP_GET_TYPE(bp) == DMU_OT_OBJSET) {
1690		arc_flags_t flags = ARC_FLAG_WAIT;
1691		objset_phys_t *osp;
1692		arc_buf_t *buf;
1693
1694		err = arc_read(NULL, dp->dp_spa, bp, arc_getbuf_func, &buf,
1695		    ZIO_PRIORITY_SCRUB, zio_flags, &flags, zb);
1696		if (err) {
1697			scn->scn_phys.scn_errors++;
1698			return (err);
1699		}
1700
1701		osp = buf->b_data;
1702
1703		dsl_scan_visitdnode(scn, ds, osp->os_type,
1704		    &osp->os_meta_dnode, DMU_META_DNODE_OBJECT, tx);
1705
1706		if (OBJSET_BUF_HAS_USERUSED(buf)) {
1707			/*
1708			 * We also always visit user/group accounting
1709			 * objects, and never skip them, even if we are
1710			 * suspending.  This is necessary so that the space
1711			 * deltas from this txg get integrated.
1712			 */
1713			dsl_scan_visitdnode(scn, ds, osp->os_type,
1714			    &osp->os_groupused_dnode,
1715			    DMU_GROUPUSED_OBJECT, tx);
1716			dsl_scan_visitdnode(scn, ds, osp->os_type,
1717			    &osp->os_userused_dnode,
1718			    DMU_USERUSED_OBJECT, tx);
1719		}
1720		arc_buf_destroy(buf, &buf);
1721	}
1722
1723	return (0);
1724}
1725
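/*
 * Visit each of a dnode's block pointers, plus its spill block if one is
 * present.  This is how the traversal descends from object metadata into
 * the object's indirect and data blocks.
 */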
1726static void
1727dsl_scan_visitdnode(dsl_scan_t *scn, dsl_dataset_t *ds,
1728    dmu_objset_type_t ostype, dnode_phys_t *dnp,
1729    uint64_t object, dmu_tx_t *tx)
1730{
1731	int j;
1732
1733	for (j = 0; j < dnp->dn_nblkptr; j++) {
1734		zbookmark_phys_t czb;
1735
1736		SET_BOOKMARK(&czb, ds ? ds->ds_object : 0, object,
1737		    dnp->dn_nlevels - 1, j);
1738		dsl_scan_visitbp(&dnp->dn_blkptr[j],
1739		    &czb, dnp, ds, scn, ostype, tx);
1740	}
1741
1742	if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) {
1743		zbookmark_phys_t czb;
1744		SET_BOOKMARK(&czb, ds ? ds->ds_object : 0, object,
1745		    0, DMU_SPILL_BLKID);
1746		dsl_scan_visitbp(&dnp->dn_spill,
1747		    &czb, dnp, ds, scn, ostype, tx);
1748	}
1749}
1750
1751/*
1752 * The arguments are in this order because mdb can only print the
1753 * first 5; we want them to be useful.
1754 */
1755static void
1756dsl_scan_visitbp(blkptr_t *bp, const zbookmark_phys_t *zb,
1757    dnode_phys_t *dnp, dsl_dataset_t *ds, dsl_scan_t *scn,
1758    dmu_objset_type_t ostype, dmu_tx_t *tx)
1759{
1760	dsl_pool_t *dp = scn->scn_dp;
1761	blkptr_t *bp_toread = NULL;
1762
1763	if (dsl_scan_check_suspend(scn, zb))
1764		return;
1765
1766	if (dsl_scan_check_resume(scn, dnp, zb))
1767		return;
1768
1769	scn->scn_visited_this_txg++;
1770
1771	dprintf_bp(bp,
1772	    "visiting ds=%p/%llu zb=%llx/%llx/%llx/%llx bp=%p",
1773	    ds, ds ? ds->ds_object : 0,
1774	    zb->zb_objset, zb->zb_object, zb->zb_level, zb->zb_blkid,
1775	    bp);
1776
1777	if (BP_IS_HOLE(bp)) {
1778		scn->scn_holes_this_txg++;
1779		return;
1780	}
1781
1782	if (bp->blk_birth <= scn->scn_phys.scn_cur_min_txg) {
1783		scn->scn_lt_min_this_txg++;
1784		return;
1785	}
1786
1787	bp_toread = kmem_alloc(sizeof (blkptr_t), KM_SLEEP);
1788	*bp_toread = *bp;
1789
1790	if (dsl_scan_recurse(scn, ds, ostype, dnp, bp_toread, zb, tx) != 0)
1791		goto out;
1792
1793	/*
1794	 * If dsl_scan_ddt() has already visited this block, it will have
1795	 * already done any translations or scrubbing, so don't call the
1796	 * callback again.
1797	 */
1798	if (ddt_class_contains(dp->dp_spa,
1799	    scn->scn_phys.scn_ddt_class_max, bp)) {
1800		scn->scn_ddt_contained_this_txg++;
1801		goto out;
1802	}
1803
1804	/*
1805	 * If this block is from the future (after cur_max_txg), then we
1806	 * are doing this on behalf of a deleted snapshot, and we will
1807	 * revisit the future block on the next pass of this dataset.
1808	 * Don't scan it now unless we need to because something
1809	 * under it was modified.
1810	 */
1811	if (BP_PHYSICAL_BIRTH(bp) > scn->scn_phys.scn_cur_max_txg) {
1812		scn->scn_gt_max_this_txg++;
1813		goto out;
1814	}
1815
1816	scan_funcs[scn->scn_phys.scn_func](dp, bp, zb);
1817out:
1818	kmem_free(bp_toread, sizeof (blkptr_t));
1819}
1820
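/*
 * Start scanning an objset (or the MOS when ds is NULL) from its root
 * block pointer.  The prefetch bookmark is primed (either to the start of
 * this objset or to the saved resume point) so the prefetch thread can
 * stay ahead of the synchronous traversal done by dsl_scan_visitbp().
 */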
1821static void
1822dsl_scan_visit_rootbp(dsl_scan_t *scn, dsl_dataset_t *ds, blkptr_t *bp,
1823    dmu_tx_t *tx)
1824{
1825	zbookmark_phys_t zb;
1826	scan_prefetch_ctx_t *spc;
1827
1828	SET_BOOKMARK(&zb, ds ? ds->ds_object : DMU_META_OBJSET,
1829	    ZB_ROOT_OBJECT, ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
1830
1831	if (ZB_IS_ZERO(&scn->scn_phys.scn_bookmark)) {
1832		SET_BOOKMARK(&scn->scn_prefetch_bookmark,
1833		    zb.zb_objset, 0, 0, 0);
1834	} else {
1835		scn->scn_prefetch_bookmark = scn->scn_phys.scn_bookmark;
1836	}
1837
1838	scn->scn_objsets_visited_this_txg++;
1839
1840	spc = scan_prefetch_ctx_create(scn, NULL, FTAG);
1841	dsl_scan_prefetch(spc, bp, &zb);
1842	scan_prefetch_ctx_rele(spc, FTAG);
1843
1844	dsl_scan_visitbp(bp, &zb, NULL, ds, scn, DMU_OST_NONE, tx);
1845
1846	dprintf_ds(ds, "finished scan%s", "");
1847}
1848
1849static void
1850ds_destroyed_scn_phys(dsl_dataset_t *ds, dsl_scan_phys_t *scn_phys)
1851{
1852	if (scn_phys->scn_bookmark.zb_objset == ds->ds_object) {
1853		if (ds->ds_is_snapshot) {
1854			/*
1855			 * Note:
1856			 *  - scn_cur_{min,max}_txg stays the same.
1857			 *  - Setting the flag is not really necessary if
1858			 *    scn_cur_max_txg == scn_max_txg, because there
1859			 *    is nothing after this snapshot that we care
1860			 *    about.  However, we set it anyway and then
1861			 *    ignore it when we retraverse it in
1862			 *    dsl_scan_visitds().
1863			 */
1864			scn_phys->scn_bookmark.zb_objset =
1865			    dsl_dataset_phys(ds)->ds_next_snap_obj;
1866			zfs_dbgmsg("destroying ds %llu; currently traversing; "
1867			    "reset zb_objset to %llu",
1868			    (u_longlong_t)ds->ds_object,
1869			    (u_longlong_t)dsl_dataset_phys(ds)->
1870			    ds_next_snap_obj);
1871			scn_phys->scn_flags |= DSF_VISIT_DS_AGAIN;
1872		} else {
1873			SET_BOOKMARK(&scn_phys->scn_bookmark,
1874			    ZB_DESTROYED_OBJSET, 0, 0, 0);
1875			zfs_dbgmsg("destroying ds %llu; currently traversing; "
1876			    "reset bookmark to -1,0,0,0",
1877			    (u_longlong_t)ds->ds_object);
1878		}
1879	}
1880}
1881
1882/*
1883 * Invoked when a dataset is destroyed. We need to make sure that:
1884 *
1885 * 1) If it is the dataset currently being scanned, write out a new
1886 *	dsl_scan_phys_t and mark the objset reference in it as
1887 *	destroyed.
1888 * 2) Remove it from the work queue, if it was present.
1889 *
1890 * If the dataset was actually a snapshot, instead of marking the dataset
1891 * as destroyed, we substitute the next snapshot in line.
1892 */
1893void
1894dsl_scan_ds_destroyed(dsl_dataset_t *ds, dmu_tx_t *tx)
1895{
1896	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1897	dsl_scan_t *scn = dp->dp_scan;
1898	uint64_t mintxg;
1899
1900	if (!dsl_scan_is_running(scn))
1901		return;
1902
1903	ds_destroyed_scn_phys(ds, &scn->scn_phys);
1904	ds_destroyed_scn_phys(ds, &scn->scn_phys_cached);
1905
1906	if (scan_ds_queue_contains(scn, ds->ds_object, &mintxg)) {
1907		scan_ds_queue_remove(scn, ds->ds_object);
1908		if (ds->ds_is_snapshot)
1909			scan_ds_queue_insert(scn,
1910			    dsl_dataset_phys(ds)->ds_next_snap_obj, mintxg);
1911	}
1912
1913	if (zap_lookup_int_key(dp->dp_meta_objset, scn->scn_phys.scn_queue_obj,
1914	    ds->ds_object, &mintxg) == 0) {
1915		ASSERT3U(dsl_dataset_phys(ds)->ds_num_children, <=, 1);
1916		VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
1917		    scn->scn_phys.scn_queue_obj, ds->ds_object, tx));
1918		if (ds->ds_is_snapshot) {
1919			/*
1920			 * We keep the same mintxg; it could be >
1921			 * ds_creation_txg if the previous snapshot was
1922			 * deleted too.
1923			 */
1924			VERIFY(zap_add_int_key(dp->dp_meta_objset,
1925			    scn->scn_phys.scn_queue_obj,
1926			    dsl_dataset_phys(ds)->ds_next_snap_obj,
1927			    mintxg, tx) == 0);
1928			zfs_dbgmsg("destroying ds %llu; in queue; "
1929			    "replacing with %llu",
1930			    (u_longlong_t)ds->ds_object,
1931			    (u_longlong_t)dsl_dataset_phys(ds)->
1932			    ds_next_snap_obj);
1933		} else {
1934			zfs_dbgmsg("destroying ds %llu; in queue; removing",
1935			    (u_longlong_t)ds->ds_object);
1936		}
1937	}
1938
1939	/*
1940	 * dsl_scan_sync() should be called after this, and should sync
1941	 * out our changed state, but just to be safe, do it here.
1942	 */
1943	dsl_scan_sync_state(scn, tx, SYNC_CACHED);
1944}
1945
1946static void
1947ds_snapshotted_bookmark(dsl_dataset_t *ds, zbookmark_phys_t *scn_bookmark)
1948{
1949	if (scn_bookmark->zb_objset == ds->ds_object) {
1950		scn_bookmark->zb_objset =
1951		    dsl_dataset_phys(ds)->ds_prev_snap_obj;
1952		zfs_dbgmsg("snapshotting ds %llu; currently traversing; "
1953		    "reset zb_objset to %llu",
1954		    (u_longlong_t)ds->ds_object,
1955		    (u_longlong_t)dsl_dataset_phys(ds)->ds_prev_snap_obj);
1956	}
1957}
1958
1959/*
1960 * Called when a dataset is snapshotted. If we were currently traversing
1961 * this snapshot, we reset our bookmark to point at the newly created
1962 * snapshot. We also modify our work queue to remove the old snapshot and
1963 * replace with the new one.
1964 */
1965void
1966dsl_scan_ds_snapshotted(dsl_dataset_t *ds, dmu_tx_t *tx)
1967{
1968	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1969	dsl_scan_t *scn = dp->dp_scan;
1970	uint64_t mintxg;
1971
1972	if (!dsl_scan_is_running(scn))
1973		return;
1974
1975	ASSERT(dsl_dataset_phys(ds)->ds_prev_snap_obj != 0);
1976
1977	ds_snapshotted_bookmark(ds, &scn->scn_phys.scn_bookmark);
1978	ds_snapshotted_bookmark(ds, &scn->scn_phys_cached.scn_bookmark);
1979
1980	if (scan_ds_queue_contains(scn, ds->ds_object, &mintxg)) {
1981		scan_ds_queue_remove(scn, ds->ds_object);
1982		scan_ds_queue_insert(scn,
1983		    dsl_dataset_phys(ds)->ds_prev_snap_obj, mintxg);
1984	}
1985
1986	if (zap_lookup_int_key(dp->dp_meta_objset, scn->scn_phys.scn_queue_obj,
1987	    ds->ds_object, &mintxg) == 0) {
1988		VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
1989		    scn->scn_phys.scn_queue_obj, ds->ds_object, tx));
1990		VERIFY(zap_add_int_key(dp->dp_meta_objset,
1991		    scn->scn_phys.scn_queue_obj,
1992		    dsl_dataset_phys(ds)->ds_prev_snap_obj, mintxg, tx) == 0);
1993		zfs_dbgmsg("snapshotting ds %llu; in queue; "
1994		    "replacing with %llu",
1995		    (u_longlong_t)ds->ds_object,
1996		    (u_longlong_t)dsl_dataset_phys(ds)->ds_prev_snap_obj);
1997	}
1998
1999	dsl_scan_sync_state(scn, tx, SYNC_CACHED);
2000}
2001
2002static void
2003ds_clone_swapped_bookmark(dsl_dataset_t *ds1, dsl_dataset_t *ds2,
2004    zbookmark_phys_t *scn_bookmark)
2005{
2006	if (scn_bookmark->zb_objset == ds1->ds_object) {
2007		scn_bookmark->zb_objset = ds2->ds_object;
2008		zfs_dbgmsg("clone_swap ds %llu; currently traversing; "
2009		    "reset zb_objset to %llu",
2010		    (u_longlong_t)ds1->ds_object,
2011		    (u_longlong_t)ds2->ds_object);
2012	} else if (scn_bookmark->zb_objset == ds2->ds_object) {
2013		scn_bookmark->zb_objset = ds1->ds_object;
2014		zfs_dbgmsg("clone_swap ds %llu; currently traversing; "
2015		    "reset zb_objset to %llu",
2016		    (u_longlong_t)ds2->ds_object,
2017		    (u_longlong_t)ds1->ds_object);
2018	}
2019}
2020
2021/*
2022 * Called when an origin dataset and its clone are swapped.  If we were
2023 * currently traversing the dataset, we need to switch to traversing the
2024 * newly promoted clone.
2025 */
2026void
2027dsl_scan_ds_clone_swapped(dsl_dataset_t *ds1, dsl_dataset_t *ds2, dmu_tx_t *tx)
2028{
2029	dsl_pool_t *dp = ds1->ds_dir->dd_pool;
2030	dsl_scan_t *scn = dp->dp_scan;
2031	uint64_t mintxg1, mintxg2;
2032	boolean_t ds1_queued, ds2_queued;
2033
2034	if (!dsl_scan_is_running(scn))
2035		return;
2036
2037	ds_clone_swapped_bookmark(ds1, ds2, &scn->scn_phys.scn_bookmark);
2038	ds_clone_swapped_bookmark(ds1, ds2, &scn->scn_phys_cached.scn_bookmark);
2039
2040	/*
2041	 * Handle the in-memory scan queue.
2042	 */
2043	ds1_queued = scan_ds_queue_contains(scn, ds1->ds_object, &mintxg1);
2044	ds2_queued = scan_ds_queue_contains(scn, ds2->ds_object, &mintxg2);
2045
2046	/* Sanity checking. */
2047	if (ds1_queued) {
2048		ASSERT3U(mintxg1, ==, dsl_dataset_phys(ds1)->ds_prev_snap_txg);
2049		ASSERT3U(mintxg1, ==, dsl_dataset_phys(ds2)->ds_prev_snap_txg);
2050	}
2051	if (ds2_queued) {
2052		ASSERT3U(mintxg2, ==, dsl_dataset_phys(ds1)->ds_prev_snap_txg);
2053		ASSERT3U(mintxg2, ==, dsl_dataset_phys(ds2)->ds_prev_snap_txg);
2054	}
2055
2056	if (ds1_queued && ds2_queued) {
2057		/*
2058		 * If both are queued, we don't need to do anything.
2059		 * The swapping code below would not handle this case correctly,
2060		 * since we can't insert ds2 if it is already there. That's
2061		 * because scan_ds_queue_insert() prohibits a duplicate insert
2062		 * and panics.
2063		 */
2064	} else if (ds1_queued) {
2065		scan_ds_queue_remove(scn, ds1->ds_object);
2066		scan_ds_queue_insert(scn, ds2->ds_object, mintxg1);
2067	} else if (ds2_queued) {
2068		scan_ds_queue_remove(scn, ds2->ds_object);
2069		scan_ds_queue_insert(scn, ds1->ds_object, mintxg2);
2070	}
2071
2072	/*
2073	 * Handle the on-disk scan queue.
2074	 * The on-disk state is an out-of-date version of the in-memory state,
2075	 * so the in-memory and on-disk values for ds1_queued and ds2_queued may
2076	 * be different. Therefore we need to apply the swap logic to the
2077	 * on-disk state independently of the in-memory state.
2078	 */
2079	ds1_queued = zap_lookup_int_key(dp->dp_meta_objset,
2080	    scn->scn_phys.scn_queue_obj, ds1->ds_object, &mintxg1) == 0;
2081	ds2_queued = zap_lookup_int_key(dp->dp_meta_objset,
2082	    scn->scn_phys.scn_queue_obj, ds2->ds_object, &mintxg2) == 0;
2083
2084	/* Sanity checking. */
2085	if (ds1_queued) {
2086		ASSERT3U(mintxg1, ==, dsl_dataset_phys(ds1)->ds_prev_snap_txg);
2087		ASSERT3U(mintxg1, ==, dsl_dataset_phys(ds2)->ds_prev_snap_txg);
2088	}
2089	if (ds2_queued) {
2090		ASSERT3U(mintxg2, ==, dsl_dataset_phys(ds1)->ds_prev_snap_txg);
2091		ASSERT3U(mintxg2, ==, dsl_dataset_phys(ds2)->ds_prev_snap_txg);
2092	}
2093
2094	if (ds1_queued && ds2_queued) {
2095		/*
2096		 * If both are queued, we don't need to do anything.
2097		 * Alternatively, we could check for EEXIST from
2098		 * zap_add_int_key() and back out to the original state, but
2099		 * that would be more work than checking for this case upfront.
2100		 */
2101	} else if (ds1_queued) {
2102		VERIFY3S(0, ==, zap_remove_int(dp->dp_meta_objset,
2103		    scn->scn_phys.scn_queue_obj, ds1->ds_object, tx));
2104		VERIFY3S(0, ==, zap_add_int_key(dp->dp_meta_objset,
2105		    scn->scn_phys.scn_queue_obj, ds2->ds_object, mintxg1, tx));
2106		zfs_dbgmsg("clone_swap ds %llu; in queue; "
2107		    "replacing with %llu",
2108		    (u_longlong_t)ds1->ds_object,
2109		    (u_longlong_t)ds2->ds_object);
2110	} else if (ds2_queued) {
2111		VERIFY3S(0, ==, zap_remove_int(dp->dp_meta_objset,
2112		    scn->scn_phys.scn_queue_obj, ds2->ds_object, tx));
2113		VERIFY3S(0, ==, zap_add_int_key(dp->dp_meta_objset,
2114		    scn->scn_phys.scn_queue_obj, ds1->ds_object, mintxg2, tx));
2115		zfs_dbgmsg("clone_swap ds %llu; in queue; "
2116		    "replacing with %llu",
2117		    (u_longlong_t)ds2->ds_object,
2118		    (u_longlong_t)ds1->ds_object);
2119	}
2120
2121	dsl_scan_sync_state(scn, tx, SYNC_CACHED);
2122}
2123
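/*
 * dmu_objset_find_dp() callback that enqueues the clones of a given origin
 * snapshot.  For each dataset whose dd_origin_obj matches originobj, walk
 * back through its snapshot chain to the first dataset created after the
 * origin and add that one to the in-memory scan queue.
 */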
2124/* ARGSUSED */
2125static int
2126enqueue_clones_cb(dsl_pool_t *dp, dsl_dataset_t *hds, void *arg)
2127{
2128	uint64_t originobj = *(uint64_t *)arg;
2129	dsl_dataset_t *ds;
2130	int err;
2131	dsl_scan_t *scn = dp->dp_scan;
2132
2133	if (dsl_dir_phys(hds->ds_dir)->dd_origin_obj != originobj)
2134		return (0);
2135
2136	err = dsl_dataset_hold_obj(dp, hds->ds_object, FTAG, &ds);
2137	if (err)
2138		return (err);
2139
2140	while (dsl_dataset_phys(ds)->ds_prev_snap_obj != originobj) {
2141		dsl_dataset_t *prev;
2142		err = dsl_dataset_hold_obj(dp,
2143		    dsl_dataset_phys(ds)->ds_prev_snap_obj, FTAG, &prev);
2144
2145		dsl_dataset_rele(ds, FTAG);
2146		if (err)
2147			return (err);
2148		ds = prev;
2149	}
2150	scan_ds_queue_insert(scn, ds->ds_object,
2151	    dsl_dataset_phys(ds)->ds_prev_snap_txg);
2152	dsl_dataset_rele(ds, FTAG);
2153	return (0);
2154}
2155
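/*
 * Scan a single dataset.  After traversing its root block pointer (and,
 * for a head dataset during a scrub/resilver, its ZIL), queue up the next
 * snapshot and any clones so that the traversal continues in birth-time
 * order.  Datasets whose blocks fall entirely outside the current txg
 * range are skipped.
 */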
2156static void
2157dsl_scan_visitds(dsl_scan_t *scn, uint64_t dsobj, dmu_tx_t *tx)
2158{
2159	dsl_pool_t *dp = scn->scn_dp;
2160	dsl_dataset_t *ds;
2161
2162	VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
2163
2164	if (scn->scn_phys.scn_cur_min_txg >=
2165	    scn->scn_phys.scn_max_txg) {
2166		/*
2167		 * This can happen if this snapshot was created after the
2168		 * scan started, and we already completed a previous snapshot
2169		 * that was created after the scan started.  This snapshot
2170		 * only references blocks with:
2171		 *
2172		 *	birth < our ds_creation_txg
2173		 *	cur_min_txg is no less than ds_creation_txg.
2174		 *	We have already visited these blocks.
2175		 * or
2176		 *	birth > scn_max_txg
2177		 *	The scan requested not to visit these blocks.
2178		 *
2179		 * Subsequent snapshots (and clones) can reference our
2180		 * blocks, or blocks with even higher birth times.
2181		 * Therefore we do not need to visit them either,
2182		 * so we do not add them to the work queue.
2183		 *
2184		 * Note that checking for cur_min_txg >= cur_max_txg
2185		 * is not sufficient, because in that case we may need to
2186		 * visit subsequent snapshots.  This happens when min_txg > 0,
2187		 * which raises cur_min_txg.  In this case we will visit
2188		 * this dataset but skip all of its blocks, because the
2189		 * rootbp's birth time is < cur_min_txg.  Then we will
2190		 * add the next snapshots/clones to the work queue.
2191		 */
2192		char *dsname = kmem_alloc(MAXNAMELEN, KM_SLEEP);
2193		dsl_dataset_name(ds, dsname);
2194		zfs_dbgmsg("scanning dataset %llu (%s) is unnecessary because "
2195		    "cur_min_txg (%llu) >= max_txg (%llu)",
2196		    (longlong_t)dsobj, dsname,
2197		    (longlong_t)scn->scn_phys.scn_cur_min_txg,
2198		    (longlong_t)scn->scn_phys.scn_max_txg);
2199		kmem_free(dsname, MAXNAMELEN);
2200
2201		goto out;
2202	}
2203
2204	/*
2205	 * Only the ZIL in the head (non-snapshot) is valid. Even though
2206	 * snapshots can have ZIL block pointers (which may be the same
2207	 * BP as in the head), they must be ignored. In addition, $ORIGIN
2208	 * doesn't have an objset (i.e. its ds_bp is a hole) so we don't
2209	 * need to look for a ZIL in it either. So we traverse the ZIL here,
2210	 * rather than in dsl_scan_recurse(), because the regular snapshot
2211	 * block-sharing rules don't apply to it.
2212	 */
2213	if (DSL_SCAN_IS_SCRUB_RESILVER(scn) && !dsl_dataset_is_snapshot(ds) &&
2214	    (dp->dp_origin_snap == NULL ||
2215	    ds->ds_dir != dp->dp_origin_snap->ds_dir)) {
2216		objset_t *os;
2217		if (dmu_objset_from_ds(ds, &os) != 0) {
2218			goto out;
2219		}
2220		dsl_scan_zil(dp, &os->os_zil_header);
2221	}
2222
2223	/*
2224	 * Iterate over the bps in this ds.
2225	 */
2226	dmu_buf_will_dirty(ds->ds_dbuf, tx);
2227	rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
2228	dsl_scan_visit_rootbp(scn, ds, &dsl_dataset_phys(ds)->ds_bp, tx);
2229	rrw_exit(&ds->ds_bp_rwlock, FTAG);
2230
2231	char *dsname = kmem_alloc(ZFS_MAX_DATASET_NAME_LEN, KM_SLEEP);
2232	dsl_dataset_name(ds, dsname);
2233	zfs_dbgmsg("scanned dataset %llu (%s) with min=%llu max=%llu; "
2234	    "suspending=%u",
2235	    (longlong_t)dsobj, dsname,
2236	    (longlong_t)scn->scn_phys.scn_cur_min_txg,
2237	    (longlong_t)scn->scn_phys.scn_cur_max_txg,
2238	    (int)scn->scn_suspending);
2239	kmem_free(dsname, ZFS_MAX_DATASET_NAME_LEN);
2240
2241	if (scn->scn_suspending)
2242		goto out;
2243
2244	/*
2245	 * We've finished this pass over this dataset.
2246	 */
2247
2248	/*
2249	 * If we did not completely visit this dataset, do another pass.
2250	 */
2251	if (scn->scn_phys.scn_flags & DSF_VISIT_DS_AGAIN) {
2252		zfs_dbgmsg("incomplete pass; visiting again");
2253		scn->scn_phys.scn_flags &= ~DSF_VISIT_DS_AGAIN;
2254		scan_ds_queue_insert(scn, ds->ds_object,
2255		    scn->scn_phys.scn_cur_max_txg);
2256		goto out;
2257	}
2258
2259	/*
2260	 * Add descendent datasets to work queue.
2261	 */
2262	if (dsl_dataset_phys(ds)->ds_next_snap_obj != 0) {
2263		scan_ds_queue_insert(scn,
2264		    dsl_dataset_phys(ds)->ds_next_snap_obj,
2265		    dsl_dataset_phys(ds)->ds_creation_txg);
2266	}
2267	if (dsl_dataset_phys(ds)->ds_num_children > 1) {
2268		boolean_t usenext = B_FALSE;
2269		if (dsl_dataset_phys(ds)->ds_next_clones_obj != 0) {
2270			uint64_t count;
2271			/*
2272			 * A bug in a previous version of the code could
2273			 * cause upgrade_clones_cb() to not set
2274			 * ds_next_snap_obj when it should, leading to a
2275			 * missing entry.  Therefore we can only use the
2276			 * next_clones_obj when its count is correct.
2277			 */
2278			int err = zap_count(dp->dp_meta_objset,
2279			    dsl_dataset_phys(ds)->ds_next_clones_obj, &count);
2280			if (err == 0 &&
2281			    count == dsl_dataset_phys(ds)->ds_num_children - 1)
2282				usenext = B_TRUE;
2283		}
2284
2285		if (usenext) {
2286			zap_cursor_t zc;
2287			zap_attribute_t za;
2288			for (zap_cursor_init(&zc, dp->dp_meta_objset,
2289			    dsl_dataset_phys(ds)->ds_next_clones_obj);
2290			    zap_cursor_retrieve(&zc, &za) == 0;
2291			    (void) zap_cursor_advance(&zc)) {
2292				scan_ds_queue_insert(scn,
2293				    zfs_strtonum(za.za_name, NULL),
2294				    dsl_dataset_phys(ds)->ds_creation_txg);
2295			}
2296			zap_cursor_fini(&zc);
2297		} else {
2298			VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj,
2299			    enqueue_clones_cb, &ds->ds_object,
2300			    DS_FIND_CHILDREN));
2301		}
2302	}
2303
2304out:
2305	dsl_dataset_rele(ds, FTAG);
2306}
2307
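/*
 * dmu_objset_find_dp() callback that enqueues each filesystem's oldest
 * ancestor snapshot.  Clones are skipped here; they are reached later
 * through their origin snapshot.
 */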
2308/* ARGSUSED */
2309static int
2310enqueue_cb(dsl_pool_t *dp, dsl_dataset_t *hds, void *arg)
2311{
2312	dsl_dataset_t *ds;
2313	int err;
2314	dsl_scan_t *scn = dp->dp_scan;
2315
2316	err = dsl_dataset_hold_obj(dp, hds->ds_object, FTAG, &ds);
2317	if (err)
2318		return (err);
2319
2320	while (dsl_dataset_phys(ds)->ds_prev_snap_obj != 0) {
2321		dsl_dataset_t *prev;
2322		err = dsl_dataset_hold_obj(dp,
2323		    dsl_dataset_phys(ds)->ds_prev_snap_obj, FTAG, &prev);
2324		if (err) {
2325			dsl_dataset_rele(ds, FTAG);
2326			return (err);
2327		}
2328
2329		/*
2330		 * If this is a clone, we don't need to worry about it for now.
2331		 */
2332		if (dsl_dataset_phys(prev)->ds_next_snap_obj != ds->ds_object) {
2333			dsl_dataset_rele(ds, FTAG);
2334			dsl_dataset_rele(prev, FTAG);
2335			return (0);
2336		}
2337		dsl_dataset_rele(ds, FTAG);
2338		ds = prev;
2339	}
2340
2341	scan_ds_queue_insert(scn, ds->ds_object,
2342	    dsl_dataset_phys(ds)->ds_prev_snap_txg);
2343	dsl_dataset_rele(ds, FTAG);
2344	return (0);
2345}
2346
2347/* ARGSUSED */
2348void
2349dsl_scan_ddt_entry(dsl_scan_t *scn, enum zio_checksum checksum,
2350    ddt_entry_t *dde, dmu_tx_t *tx)
2351{
2352	const ddt_key_t *ddk = &dde->dde_key;
2353	ddt_phys_t *ddp = dde->dde_phys;
2354	blkptr_t bp;
2355	zbookmark_phys_t zb = { 0 };
2356	int p;
2357
2358	if (!dsl_scan_is_running(scn))
2359		return;
2360
2361	for (p = 0; p < DDT_PHYS_TYPES; p++, ddp++) {
2362		if (ddp->ddp_phys_birth == 0 ||
2363		    ddp->ddp_phys_birth > scn->scn_phys.scn_max_txg)
2364			continue;
2365		ddt_bp_create(checksum, ddk, ddp, &bp);
2366
2367		scn->scn_visited_this_txg++;
2368		scan_funcs[scn->scn_phys.scn_func](scn->scn_dp, &bp, &zb);
2369	}
2370}
2371
2372/*
2373 * Scrub/dedup interaction.
2374 *
2375 * If there are N references to a deduped block, we don't want to scrub it
2376 * N times -- ideally, we should scrub it exactly once.
2377 *
2378 * We leverage the fact that the dde's replication class (enum ddt_class)
2379 * is ordered from highest replication class (DDT_CLASS_DITTO) to lowest
2380 * (DDT_CLASS_UNIQUE) so that we may walk the DDT in that order.
2381 *
2382 * To prevent excess scrubbing, the scrub begins by walking the DDT
2383 * to find all blocks with refcnt > 1, and scrubs each of these once.
2384 * Since there are two replication classes which contain blocks with
2385 * refcnt > 1, we scrub the highest replication class (DDT_CLASS_DITTO) first.
2386 * Finally the top-down scrub begins, only visiting blocks with refcnt == 1.
2387 *
2388 * There would be nothing more to say if a block's refcnt couldn't change
2389 * during a scrub, but of course it can so we must account for changes
2390 * in a block's replication class.
2391 *
2392 * Here's an example of what can occur:
2393 *
2394 * If a block has refcnt > 1 during the DDT scrub phase, but has refcnt == 1
2395 * when visited during the top-down scrub phase, it will be scrubbed twice.
2396 * This negates our scrub optimization, but is otherwise harmless.
2397 *
2398 * If a block has refcnt == 1 during the DDT scrub phase, but has refcnt > 1
2399 * on each visit during the top-down scrub phase, it will never be scrubbed.
2400 * To catch this, ddt_sync_entry() notifies the scrub code whenever a block's
2401 * reference class transitions to a higher level (i.e. DDT_CLASS_UNIQUE to
2402 * DDT_CLASS_DUPLICATE); if it transitions from refcnt == 1 to refcnt > 1
2403 * while a scrub is in progress, it scrubs the block right then.
2404 */
2405static void
2406dsl_scan_ddt(dsl_scan_t *scn, dmu_tx_t *tx)
2407{
2408	ddt_bookmark_t *ddb = &scn->scn_phys.scn_ddt_bookmark;
2409	ddt_entry_t dde = { 0 };
2410	int error;
2411	uint64_t n = 0;
2412
2413	while ((error = ddt_walk(scn->scn_dp->dp_spa, ddb, &dde)) == 0) {
2414		ddt_t *ddt;
2415
2416		if (ddb->ddb_class > scn->scn_phys.scn_ddt_class_max)
2417			break;
2418		dprintf("visiting ddb=%llu/%llu/%llu/%llx\n",
2419		    (longlong_t)ddb->ddb_class,
2420		    (longlong_t)ddb->ddb_type,
2421		    (longlong_t)ddb->ddb_checksum,
2422		    (longlong_t)ddb->ddb_cursor);
2423
2424		/* There should be no pending changes to the dedup table */
2425		ddt = scn->scn_dp->dp_spa->spa_ddt[ddb->ddb_checksum];
2426		ASSERT(avl_first(&ddt->ddt_tree) == NULL);
2427
2428		dsl_scan_ddt_entry(scn, ddb->ddb_checksum, &dde, tx);
2429		n++;
2430
2431		if (dsl_scan_check_suspend(scn, NULL))
2432			break;
2433	}
2434
2435	zfs_dbgmsg("scanned %llu ddt entries with class_max = %u; "
2436	    "suspending=%u", (longlong_t)n,
2437	    (int)scn->scn_phys.scn_ddt_class_max, (int)scn->scn_suspending);
2438
2439	ASSERT(error == 0 || error == ENOENT);
2440	ASSERT(error != ENOENT ||
2441	    ddb->ddb_class > scn->scn_phys.scn_ddt_class_max);
2442}
2443
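/*
 * Upper bound on the txgs to scan for this dataset: never beyond the
 * scan's overall max_txg and, for snapshots, never beyond the snapshot's
 * creation txg (younger blocks belong to later datasets and will be
 * visited there).
 */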
2444static uint64_t
2445dsl_scan_ds_maxtxg(dsl_dataset_t *ds)
2446{
2447	uint64_t smt = ds->ds_dir->dd_pool->dp_scan->scn_phys.scn_max_txg;
2448	if (ds->ds_is_snapshot)
2449		return (MIN(smt, dsl_dataset_phys(ds)->ds_creation_txg));
2450	return (smt);
2451}
2452
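/*
 * One pass of metadata traversal for this sync: finish any remaining DDT
 * classes first, then the MOS and $ORIGIN, then work through the
 * in-memory dataset queue until we either suspend or run out of work.
 */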
2453static void
2454dsl_scan_visit(dsl_scan_t *scn, dmu_tx_t *tx)
2455{
2456	scan_ds_t *sds;
2457	dsl_pool_t *dp = scn->scn_dp;
2458
2459	if (scn->scn_phys.scn_ddt_bookmark.ddb_class <=
2460	    scn->scn_phys.scn_ddt_class_max) {
2461		scn->scn_phys.scn_cur_min_txg = scn->scn_phys.scn_min_txg;
2462		scn->scn_phys.scn_cur_max_txg = scn->scn_phys.scn_max_txg;
2463		dsl_scan_ddt(scn, tx);
2464		if (scn->scn_suspending)
2465			return;
2466	}
2467
2468	if (scn->scn_phys.scn_bookmark.zb_objset == DMU_META_OBJSET) {
2469		/* First do the MOS & ORIGIN */
2470
2471		scn->scn_phys.scn_cur_min_txg = scn->scn_phys.scn_min_txg;
2472		scn->scn_phys.scn_cur_max_txg = scn->scn_phys.scn_max_txg;
2473		dsl_scan_visit_rootbp(scn, NULL,
2474		    &dp->dp_meta_rootbp, tx);
2475		spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
2476		if (scn->scn_suspending)
2477			return;
2478
2479		if (spa_version(dp->dp_spa) < SPA_VERSION_DSL_SCRUB) {
2480			VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj,
2481			    enqueue_cb, NULL, DS_FIND_CHILDREN));
2482		} else {
2483			dsl_scan_visitds(scn,
2484			    dp->dp_origin_snap->ds_object, tx);
2485		}
2486		ASSERT(!scn->scn_suspending);
2487	} else if (scn->scn_phys.scn_bookmark.zb_objset !=
2488	    ZB_DESTROYED_OBJSET) {
2489		uint64_t dsobj = scn->scn_phys.scn_bookmark.zb_objset;
2490		/*
2491		 * If we were suspended, continue from here. Note if the
2492		 * ds we were suspended on was deleted, the zb_objset may
2493		 * be -1, so we will skip this and find a new objset
2494		 * below.
2495		 */
2496		dsl_scan_visitds(scn, dsobj, tx);
2497		if (scn->scn_suspending)
2498			return;
2499	}
2500
2501	/*
2502	 * In case we suspended right at the end of the ds, zero the
2503	 * bookmark so we don't think that we're still trying to resume.
2504	 */
2505	bzero(&scn->scn_phys.scn_bookmark, sizeof (zbookmark_phys_t));
2506
2507	/*
2508	 * Keep pulling things out of the dataset avl queue. Updates to the
2509	 * persistent zap-object-as-queue happen only at checkpoints.
2510	 */
2511	while ((sds = avl_first(&scn->scn_queue)) != NULL) {
2512		dsl_dataset_t *ds;
2513		uint64_t dsobj = sds->sds_dsobj;
2514		uint64_t txg = sds->sds_txg;
2515
2516		/* dequeue and free the ds from the queue */
2517		scan_ds_queue_remove(scn, dsobj);
2518		sds = NULL;	/* must not be touched after removal */
2519
2520		/* Set up min / max txg */
2521		VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
2522		if (txg != 0) {
2523			scn->scn_phys.scn_cur_min_txg =
2524			    MAX(scn->scn_phys.scn_min_txg, txg);
2525		} else {
2526			scn->scn_phys.scn_cur_min_txg =
2527			    MAX(scn->scn_phys.scn_min_txg,
2528			    dsl_dataset_phys(ds)->ds_prev_snap_txg);
2529		}
2530		scn->scn_phys.scn_cur_max_txg = dsl_scan_ds_maxtxg(ds);
2531		dsl_dataset_rele(ds, FTAG);
2532
2533		dsl_scan_visitds(scn, dsobj, tx);
2534		if (scn->scn_suspending)
2535			return;
2536	}
2537	/* No more objsets to fetch, we're done */
2538	scn->scn_phys.scn_bookmark.zb_objset = ZB_DESTROYED_OBJSET;
2539	ASSERT0(scn->scn_suspending);
2540}
2541
2542static uint64_t
2543dsl_scan_count_leaves(vdev_t *vd)
2544{
2545	uint64_t i, leaves = 0;
2546
2547	/* we only count leaves that belong to the main pool and are readable */
2548	if (vd->vdev_islog || vd->vdev_isspare ||
2549	    vd->vdev_isl2cache || !vdev_readable(vd))
2550		return (0);
2551
2552	if (vd->vdev_ops->vdev_op_leaf)
2553		return (1);
2554
2555	for (i = 0; i < vd->vdev_children; i++) {
2556		leaves += dsl_scan_count_leaves(vd->vdev_child[i]);
2557	}
2558
2559	return (leaves);
2560}
2561
2562
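/*
 * Account one issued block against the per-queue zio statistics for this
 * txg: the total allocated size across all of its DVAs and the zio count.
 */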
2563static void
2564scan_io_queues_update_zio_stats(dsl_scan_io_queue_t *q, const blkptr_t *bp)
2565{
2566	int i;
2567	uint64_t cur_size = 0;
2568
2569	for (i = 0; i < BP_GET_NDVAS(bp); i++) {
2570		cur_size += DVA_GET_ASIZE(&bp->blk_dva[i]);
2571	}
2572
2573	q->q_total_zio_size_this_txg += cur_size;
2574	q->q_zios_this_txg++;
2575}
2576
2577static void
2578scan_io_queues_update_seg_stats(dsl_scan_io_queue_t *q, uint64_t start,
2579    uint64_t end)
2580{
2581	q->q_total_seg_size_this_txg += end - start;
2582	q->q_segs_this_txg++;
2583}
2584
2585static boolean_t
2586scan_io_queue_check_suspend(dsl_scan_t *scn)
2587{
2588	/* See comment in dsl_scan_check_suspend() */
2589	uint64_t curr_time_ns = gethrtime();
2590	uint64_t scan_time_ns = curr_time_ns - scn->scn_sync_start_time;
2591	uint64_t sync_time_ns = curr_time_ns -
2592	    scn->scn_dp->dp_spa->spa_sync_starttime;
2593	int dirty_pct = scn->scn_dp->dp_dirty_total * 100 / zfs_dirty_data_max;
2594	int mintime = (scn->scn_phys.scn_func == POOL_SCAN_RESILVER) ?
2595	    zfs_resilver_min_time_ms : zfs_scrub_min_time_ms;
2596
2597	return ((NSEC2MSEC(scan_time_ns) > mintime &&
2598	    (dirty_pct >= zfs_vdev_async_write_active_min_dirty_percent ||
2599	    txg_sync_waiting(scn->scn_dp) ||
2600	    NSEC2SEC(sync_time_ns) >= zfs_txg_timeout)) ||
2601	    spa_shutting_down(scn->scn_dp->dp_spa));
2602}
2603
2604/*
2605 * Given a list of scan_io_t's in io_list, this issues the io's out to
2606 * disk. This consumes the io_list and frees the scan_io_t's. This is
2607 * called when emptying queues, either when we're up against the memory
2608 * limit or when we have finished scanning. Returns B_TRUE if we stopped
2609 * processing the list before we finished. Any zios that were not issued
2610 * will remain in the io_list.
2611 */
2612static boolean_t
2613scan_io_queue_issue(dsl_scan_io_queue_t *queue, list_t *io_list)
2614{
2615	dsl_scan_t *scn = queue->q_scn;
2616	scan_io_t *sio;
2617	int64_t bytes_issued = 0;
2618	boolean_t suspended = B_FALSE;
2619
2620	while ((sio = list_head(io_list)) != NULL) {
2621		blkptr_t bp;
2622
2623		if (scan_io_queue_check_suspend(scn)) {
2624			suspended = B_TRUE;
2625			break;
2626		}
2627
2628		sio2bp(sio, &bp, queue->q_vd->vdev_id);
2629		bytes_issued += sio->sio_asize;
2630		scan_exec_io(scn->scn_dp, &bp, sio->sio_flags,
2631		    &sio->sio_zb, queue);
2632		(void) list_remove_head(io_list);
2633		scan_io_queues_update_zio_stats(queue, &bp);
2634		kmem_free(sio, sizeof (*sio));
2635	}
2636
2637	atomic_add_64(&scn->scn_bytes_pending, -bytes_issued);
2638
2639	return (suspended);
2640}
2641
2642/*
2643 * Given a range_seg_t (extent) and a list, this function passes over a
2644 * scan queue and gathers up the appropriate ios which fit into that
2645 * scan seg (starting from lowest LBA). At the end, we remove the segment
2646 * from the q_exts_by_addr range tree.
2647 */
2648static boolean_t
2649scan_io_queue_gather(dsl_scan_io_queue_t *queue, range_seg_t *rs, list_t *list)
2650{
2651	scan_io_t srch_sio, *sio, *next_sio;
2652	avl_index_t idx;
2653	uint_t num_sios = 0;
2654	int64_t bytes_issued = 0;
2655
2656	ASSERT(rs != NULL);
2657	ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
2658
2659	srch_sio.sio_offset = rs->rs_start;
2660
2661	/*
2662	 * The exact start of the extent might not contain any matching zios,
2663	 * so if that's the case, examine the next one in the tree.
2664	 */
2665	sio = avl_find(&queue->q_sios_by_addr, &srch_sio, &idx);
2666	if (sio == NULL)
2667		sio = avl_nearest(&queue->q_sios_by_addr, idx, AVL_AFTER);
2668
2669	while (sio != NULL && sio->sio_offset < rs->rs_end && num_sios <= 32) {
2670		ASSERT3U(sio->sio_offset, >=, rs->rs_start);
2671		ASSERT3U(sio->sio_offset + sio->sio_asize, <=, rs->rs_end);
2672
2673		next_sio = AVL_NEXT(&queue->q_sios_by_addr, sio);
2674		avl_remove(&queue->q_sios_by_addr, sio);
2675
2676		bytes_issued += sio->sio_asize;
2677		num_sios++;
2678		list_insert_tail(list, sio);
2679		sio = next_sio;
2680	}
2681
2682	/*
2683	 * We limit the number of sios we process at once to 32 to avoid
2684	 * biting off more than we can chew. If we didn't take everything
2685	 * in the segment we update it to reflect the work we were able to
2686	 * complete. Otherwise, we remove it from the range tree entirely.
2687	 */
2688	if (sio != NULL && sio->sio_offset < rs->rs_end) {
2689		range_tree_adjust_fill(queue->q_exts_by_addr, rs,
2690		    -bytes_issued);
2691		range_tree_resize_segment(queue->q_exts_by_addr, rs,
2692		    sio->sio_offset, rs->rs_end - sio->sio_offset);
2693
2694		return (B_TRUE);
2695	} else {
2696		range_tree_remove(queue->q_exts_by_addr, rs->rs_start,
2697		    rs->rs_end - rs->rs_start);
2698		return (B_FALSE);
2699	}
2700}
2701
2702
2703/*
2704 * This is called from the queue emptying thread and selects the next
2705 * extent from which we are to issue io's. The behavior of this function
2706 * depends on the state of the scan, the current memory consumption and
2707 * whether or not we are performing a scan shutdown.
2708 * 1) We select extents in an elevator algorithm (LBA-order) if the scan
2709 * 	needs to perform a checkpoint.
2710 * 2) We select the largest available extent if we are up against the
2711 * 	memory limit.
2712 * 3) Otherwise we don't select any extents.
2713 */
2714static const range_seg_t *
2715scan_io_queue_fetch_ext(dsl_scan_io_queue_t *queue)
2716{
2717	dsl_scan_t *scn = queue->q_scn;
2718
2719	ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
2720	ASSERT(scn->scn_is_sorted);
2721
2722	/* handle tunable overrides */
2723	if (scn->scn_checkpointing || scn->scn_clearing) {
2724		if (zfs_scan_issue_strategy == 1) {
2725			return (range_tree_first(queue->q_exts_by_addr));
2726		} else if (zfs_scan_issue_strategy == 2) {
2727			return (avl_first(&queue->q_exts_by_size));
2728		}
2729	}
2730
2731	/*
2732	 * During normal clearing, we want to issue our largest segments
2733	 * first, keeping IO as sequential as possible, and leaving the
2734	 * smaller extents for later with the hope that they might eventually
2735	 * grow to larger sequential segments. However, when the scan is
2736	 * checkpointing, no new extents will be added to the sorting queue,
2737	 * so the way we are sorted now is as good as it will ever get.
2738	 * In this case, we instead switch to issuing extents in LBA order.
2739	 */
2740	if (scn->scn_checkpointing) {
2741		return (range_tree_first(queue->q_exts_by_addr));
2742	} else if (scn->scn_clearing) {
2743		return (avl_first(&queue->q_exts_by_size));
2744	} else {
2745		return (NULL);
2746	}
2747}
2748
2749static void
2750scan_io_queues_run_one(void *arg)
2751{
2752	dsl_scan_io_queue_t *queue = arg;
2753	kmutex_t *q_lock = &queue->q_vd->vdev_scan_io_queue_lock;
2754	boolean_t suspended = B_FALSE;
2755	range_seg_t *rs = NULL;
2756	scan_io_t *sio = NULL;
2757	list_t sio_list;
2758	uint64_t bytes_per_leaf = zfs_scan_vdev_limit;
2759	uint64_t nr_leaves = dsl_scan_count_leaves(queue->q_vd);
2760
2761	ASSERT(queue->q_scn->scn_is_sorted);
2762
2763	list_create(&sio_list, sizeof (scan_io_t),
2764	    offsetof(scan_io_t, sio_nodes.sio_list_node));
2765	mutex_enter(q_lock);
2766
2767	/* calculate maximum in-flight bytes for this txg (min 1MB) */
2768	queue->q_maxinflight_bytes =
2769	    MAX(nr_leaves * bytes_per_leaf, 1ULL << 20);
2770
2771	/* reset per-queue scan statistics for this txg */
2772	queue->q_total_seg_size_this_txg = 0;
2773	queue->q_segs_this_txg = 0;
2774	queue->q_total_zio_size_this_txg = 0;
2775	queue->q_zios_this_txg = 0;
2776
2777	/* loop until we have run out of time or sios */
2778	while ((rs = (range_seg_t*)scan_io_queue_fetch_ext(queue)) != NULL) {
2779		uint64_t seg_start = 0, seg_end = 0;
2780		boolean_t more_left = B_TRUE;
2781
2782		ASSERT(list_is_empty(&sio_list));
2783
2784		/* loop while we still have sios left to process in this rs */
2785		while (more_left) {
2786			scan_io_t *first_sio, *last_sio;
2787
2788			/*
2789			 * We have selected which extent needs to be
2790			 * processed next. Gather up the corresponding sios.
2791			 */
2792			more_left = scan_io_queue_gather(queue, rs, &sio_list);
2793			ASSERT(!list_is_empty(&sio_list));
2794			first_sio = list_head(&sio_list);
2795			last_sio = list_tail(&sio_list);
2796
2797			seg_end = last_sio->sio_offset + last_sio->sio_asize;
2798			if (seg_start == 0)
2799				seg_start = first_sio->sio_offset;
2800
2801			/*
2802			 * Issuing sios can take a long time so drop the
2803			 * queue lock. The sio queue won't be updated by
2804			 * other threads since we're in syncing context so
2805			 * we can be sure that our trees will remain exactly
2806			 * as we left them.
2807			 */
2808			mutex_exit(q_lock);
2809			suspended = scan_io_queue_issue(queue, &sio_list);
2810			mutex_enter(q_lock);
2811
2812			if (suspended)
2813				break;
2814		}
2815		/* update statistics for debugging purposes */
2816		scan_io_queues_update_seg_stats(queue, seg_start, seg_end);
2817
2818		if (suspended)
2819			break;
2820	}
2821
2822	/*
2823	 * If we were suspended in the middle of processing,
2824	 * requeue any unfinished sios and exit.
2825	 */
2826	while ((sio = list_head(&sio_list)) != NULL) {
2827		list_remove(&sio_list, sio);
2828		scan_io_queue_insert_impl(queue, sio);
2829	}
2830
2831	mutex_exit(q_lock);
2832	list_destroy(&sio_list);
2833}
2834
2835/*
2836 * Performs an emptying run on all scan queues in the pool. This just
2837 * punches out one thread per top-level vdev, each of which processes
2838 * only that vdev's scan queue. We can parallelize the I/O here because
2839 * we know that each queue's io's only affect its own top-level vdev.
2840 *
2841 * This function waits for the queue runs to complete, and must be
2842 * called from dsl_scan_sync (or in general, syncing context).
2843 */
2844static void
2845scan_io_queues_run(dsl_scan_t *scn)
2846{
2847	spa_t *spa = scn->scn_dp->dp_spa;
2848
2849	ASSERT(scn->scn_is_sorted);
2850	ASSERT(spa_config_held(spa, SCL_CONFIG, RW_READER));
2851
2852	if (scn->scn_bytes_pending == 0)
2853		return;
2854
2855	if (scn->scn_taskq == NULL) {
2856		char *tq_name = kmem_zalloc(ZFS_MAX_DATASET_NAME_LEN + 16,
2857		    KM_SLEEP);
2858		int nthreads = spa->spa_root_vdev->vdev_children;
2859
2860		/*
2861		 * We need to make this taskq *always* execute as many
2862		 * threads in parallel as we have top-level vdevs and no
2863		 * less, otherwise strange serialization of the calls to
2864		 * scan_io_queues_run_one can occur during spa_sync runs
2865		 * and that significantly impacts performance.
2866		 */
2867		(void) snprintf(tq_name, ZFS_MAX_DATASET_NAME_LEN + 16,
2868		    "dsl_scan_tq_%s", spa->spa_name);
2869		scn->scn_taskq = taskq_create(tq_name, nthreads, minclsyspri,
2870		    nthreads, nthreads, TASKQ_PREPOPULATE);
2871		kmem_free(tq_name, ZFS_MAX_DATASET_NAME_LEN + 16);
2872	}
2873
2874	for (uint64_t i = 0; i < spa->spa_root_vdev->vdev_children; i++) {
2875		vdev_t *vd = spa->spa_root_vdev->vdev_child[i];
2876
2877		mutex_enter(&vd->vdev_scan_io_queue_lock);
2878		if (vd->vdev_scan_io_queue != NULL) {
2879			VERIFY(taskq_dispatch(scn->scn_taskq,
2880			    scan_io_queues_run_one, vd->vdev_scan_io_queue,
2881			    TQ_SLEEP) != TASKQID_INVALID);
2882		}
2883		mutex_exit(&vd->vdev_scan_io_queue_lock);
2884	}
2885
2886	/*
2887	 * Wait for the queues to finish issuing their IOs for this run
2888	 * before we return. There may still be IOs in flight at this
2889	 * point.
2890	 */
2891	taskq_wait(scn->scn_taskq);
2892}
2893
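/*
 * Decide whether the async block freeing callbacks should stop for this
 * txg: pause once zfs_async_block_max_blocks have been processed, once we
 * exceed the txg timeout, once we exceed the per-pass minimum time with a
 * txg sync waiting, or when the pool is shutting down.  Setting
 * zfs_recover disables pausing entirely.
 */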
2894static boolean_t
2895dsl_scan_async_block_should_pause(dsl_scan_t *scn)
2896{
2897	uint64_t elapsed_nanosecs;
2898
2899	if (zfs_recover)
2900		return (B_FALSE);
2901
2902	if (scn->scn_visited_this_txg >= zfs_async_block_max_blocks)
2903		return (B_TRUE);
2904
2905	elapsed_nanosecs = gethrtime() - scn->scn_sync_start_time;
2906	return (elapsed_nanosecs / NANOSEC > zfs_txg_timeout ||
2907	    (NSEC2MSEC(elapsed_nanosecs) > scn->scn_async_block_min_time_ms &&
2908	    txg_sync_waiting(scn->scn_dp)) ||
2909	    spa_shutting_down(scn->scn_dp->dp_spa));
2910}
2911
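/*
 * bpobj/bptree iteration callback that frees one block pointer: issue the
 * free zio, credit the space back against the $FREE dir, and return
 * ERESTART when it is time to pause for this txg.
 */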
2912static int
2913dsl_scan_free_block_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
2914{
2915	dsl_scan_t *scn = arg;
2916
2917	if (!scn->scn_is_bptree ||
2918	    (BP_GET_LEVEL(bp) == 0 && BP_GET_TYPE(bp) != DMU_OT_OBJSET)) {
2919		if (dsl_scan_async_block_should_pause(scn))
2920			return (SET_ERROR(ERESTART));
2921	}
2922
2923	zio_nowait(zio_free_sync(scn->scn_zio_root, scn->scn_dp->dp_spa,
2924	    dmu_tx_get_txg(tx), bp, BP_GET_PSIZE(bp), 0));
2925	dsl_dir_diduse_space(tx->tx_pool->dp_free_dir, DD_USED_HEAD,
2926	    -bp_get_dsize_sync(scn->scn_dp->dp_spa, bp),
2927	    -BP_GET_PSIZE(bp), -BP_GET_UCSIZE(bp), tx);
2928	scn->scn_visited_this_txg++;
2929	return (0);
2930}
2931
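/*
 * Roll the per-queue segment and zio statistics from every top-level vdev
 * up into the pool-wide per-txg averages kept in the dsl_scan_t.
 */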
2932static void
2933dsl_scan_update_stats(dsl_scan_t *scn)
2934{
2935	spa_t *spa = scn->scn_dp->dp_spa;
2936	uint64_t i;
2937	uint64_t seg_size_total = 0, zio_size_total = 0;
2938	uint64_t seg_count_total = 0, zio_count_total = 0;
2939
2940	for (i = 0; i < spa->spa_root_vdev->vdev_children; i++) {
2941		vdev_t *vd = spa->spa_root_vdev->vdev_child[i];
2942		dsl_scan_io_queue_t *queue = vd->vdev_scan_io_queue;
2943
2944		if (queue == NULL)
2945			continue;
2946
2947		seg_size_total += queue->q_total_seg_size_this_txg;
2948		zio_size_total += queue->q_total_zio_size_this_txg;
2949		seg_count_total += queue->q_segs_this_txg;
2950		zio_count_total += queue->q_zios_this_txg;
2951	}
2952
2953	if (seg_count_total == 0 || zio_count_total == 0) {
2954		scn->scn_avg_seg_size_this_txg = 0;
2955		scn->scn_avg_zio_size_this_txg = 0;
2956		scn->scn_segs_this_txg = 0;
2957		scn->scn_zios_this_txg = 0;
2958		return;
2959	}
2960
2961	scn->scn_avg_seg_size_this_txg = seg_size_total / seg_count_total;
2962	scn->scn_avg_zio_size_this_txg = zio_size_total / zio_count_total;
2963	scn->scn_segs_this_txg = seg_count_total;
2964	scn->scn_zios_this_txg = zio_count_total;
2965}
2966
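/*
 * Callback for iterating the obsolete bpobj: mark the range referenced by
 * each block pointer's first DVA as obsolete, pausing via ERESTART when
 * needed.  Used only when the OBSOLETE_COUNTS feature is active.
 */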
2967static int
2968dsl_scan_obsolete_block_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
2969{
2970	dsl_scan_t *scn = arg;
2971	const dva_t *dva = &bp->blk_dva[0];
2972
2973	if (dsl_scan_async_block_should_pause(scn))
2974		return (SET_ERROR(ERESTART));
2975
2976	spa_vdev_indirect_mark_obsolete(scn->scn_dp->dp_spa,
2977	    DVA_GET_VDEV(dva), DVA_GET_OFFSET(dva),
2978	    DVA_GET_ASIZE(dva), tx);
2979	scn->scn_visited_this_txg++;
2980	return (0);
2981}
2982
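/*
 * Report whether any scan-related work remains: a running, unpaused scan,
 * an async destroy that has not stalled, or (on pools with deadlists) a
 * non-empty free bpobj.  Always false while the pool is loading or
 * shutting down.
 */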
2983boolean_t
2984dsl_scan_active(dsl_scan_t *scn)
2985{
2986	spa_t *spa = scn->scn_dp->dp_spa;
2987	uint64_t used = 0, comp, uncomp;
2988
2989	if (spa->spa_load_state != SPA_LOAD_NONE)
2990		return (B_FALSE);
2991	if (spa_shutting_down(spa))
2992		return (B_FALSE);
2993	if ((dsl_scan_is_running(scn) && !dsl_scan_is_paused_scrub(scn)) ||
2994	    (scn->scn_async_destroying && !scn->scn_async_stalled))
2995		return (B_TRUE);
2996
2997	if (spa_version(scn->scn_dp->dp_spa) >= SPA_VERSION_DEADLISTS) {
2998		(void) bpobj_space(&scn->scn_dp->dp_free_bpobj,
2999		    &used, &comp, &uncomp);
3000	}
3001	return (used != 0);
3002}
3003
3004static boolean_t
3005dsl_scan_need_resilver(spa_t *spa, const dva_t *dva, size_t psize,
3006    uint64_t phys_birth)
3007{
3008	vdev_t *vd;
3009
3010	vd = vdev_lookup_top(spa, DVA_GET_VDEV(dva));
3011
3012	if (vd->vdev_ops == &vdev_indirect_ops) {
3013		/*
3014		 * The indirect vdev can point to multiple
3015		 * vdevs.  For simplicity, always create
3016		 * the resilver zio_t. zio_vdev_io_start()
3017		 * will bypass the child resilver i/o's if
3018		 * they are on vdevs that don't have DTL's.
3019		 */
3020		return (B_TRUE);
3021	}
3022
3023	if (DVA_GET_GANG(dva)) {
3024		/*
3025		 * Gang members may be spread across multiple
3026		 * vdevs, so the best estimate we have is the
3027		 * scrub range, which has already been checked.
3028		 * XXX -- it would be better to change our
3029		 * allocation policy to ensure that all
3030		 * gang members reside on the same vdev.
3031		 */
3032		return (B_TRUE);
3033	}
3034
3035	/*
3036	 * Check if the txg falls within the range which must be
3037	 * resilvered.  DVAs outside this range can always be skipped.
3038	 */
3039	if (!vdev_dtl_contains(vd, DTL_PARTIAL, phys_birth, 1))
3040		return (B_FALSE);
3041
3042	/*
3043	 * Check if the top-level vdev must resilver this offset.
3044	 * When the offset does not intersect with a dirty leaf DTL
3045	 * then it may be possible to skip the resilver IO.  The psize
3046	 * is provided instead of asize to simplify the check for RAIDZ.
3047	 */
3048	if (!vdev_dtl_need_resilver(vd, DVA_GET_OFFSET(dva), psize))
3049		return (B_FALSE);
3050
3051	return (B_TRUE);
3052}
3053
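/*
 * Process the backlog of asynchronous frees (the free bpobj, the
 * async-destroy bptree and the obsolete bpobj) before any scrub or
 * resilver work is done for this txg, pausing via ERESTART if we run
 * long.
 */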
3054static int
3055dsl_process_async_destroys(dsl_pool_t *dp, dmu_tx_t *tx)
3056{
3057	int err = 0;
3058	dsl_scan_t *scn = dp->dp_scan;
3059	spa_t *spa = dp->dp_spa;
3060
3061	if (spa_suspend_async_destroy(spa))
3062		return (0);
3063
3064	if (zfs_free_bpobj_enabled &&
3065	    spa_version(spa) >= SPA_VERSION_DEADLISTS) {
3066		scn->scn_is_bptree = B_FALSE;
3067		scn->scn_async_block_min_time_ms = zfs_free_min_time_ms;
3068		scn->scn_zio_root = zio_root(spa, NULL,
3069		    NULL, ZIO_FLAG_MUSTSUCCEED);
3070		err = bpobj_iterate(&dp->dp_free_bpobj,
3071		    dsl_scan_free_block_cb, scn, tx);
3072		VERIFY0(zio_wait(scn->scn_zio_root));
3073		scn->scn_zio_root = NULL;
3074
3075		if (err != 0 && err != ERESTART)
3076			zfs_panic_recover("error %u from bpobj_iterate()", err);
3077	}
3078
3079	if (err == 0 && spa_feature_is_active(spa, SPA_FEATURE_ASYNC_DESTROY)) {
3080		ASSERT(scn->scn_async_destroying);
3081		scn->scn_is_bptree = B_TRUE;
3082		scn->scn_zio_root = zio_root(spa, NULL,
3083		    NULL, ZIO_FLAG_MUSTSUCCEED);
3084		err = bptree_iterate(dp->dp_meta_objset,
3085		    dp->dp_bptree_obj, B_TRUE, dsl_scan_free_block_cb, scn, tx);
3086		VERIFY0(zio_wait(scn->scn_zio_root));
3087		scn->scn_zio_root = NULL;
3088
3089		if (err == EIO || err == ECKSUM) {
3090			err = 0;
3091		} else if (err != 0 && err != ERESTART) {
3092			zfs_panic_recover("error %u from "
3093			    "traverse_dataset_destroyed()", err);
3094		}
3095
3096		if (bptree_is_empty(dp->dp_meta_objset, dp->dp_bptree_obj)) {
3097			/* finished; deactivate async destroy feature */
3098			spa_feature_decr(spa, SPA_FEATURE_ASYNC_DESTROY, tx);
3099			ASSERT(!spa_feature_is_active(spa,
3100			    SPA_FEATURE_ASYNC_DESTROY));
3101			VERIFY0(zap_remove(dp->dp_meta_objset,
3102			    DMU_POOL_DIRECTORY_OBJECT,
3103			    DMU_POOL_BPTREE_OBJ, tx));
3104			VERIFY0(bptree_free(dp->dp_meta_objset,
3105			    dp->dp_bptree_obj, tx));
3106			dp->dp_bptree_obj = 0;
3107			scn->scn_async_destroying = B_FALSE;
3108			scn->scn_async_stalled = B_FALSE;
3109		} else {
3110			/*
3111			 * If we didn't make progress, mark the async
3112			 * destroy as stalled, so that we will not initiate
3113			 * a spa_sync() on its behalf.  Note that we only
3114			 * check this if we are not finished, because if the
3115			 * bptree had no blocks for us to visit, we can
3116			 * finish without "making progress".
3117			 */
3118			scn->scn_async_stalled =
3119			    (scn->scn_visited_this_txg == 0);
3120		}
3121	}
3122	if (scn->scn_visited_this_txg) {
3123		zfs_dbgmsg("freed %llu blocks in %llums from "
3124		    "free_bpobj/bptree txg %llu; err=%d",
3125		    (longlong_t)scn->scn_visited_this_txg,
3126		    (longlong_t)
3127		    NSEC2MSEC(gethrtime() - scn->scn_sync_start_time),
3128		    (longlong_t)tx->tx_txg, err);
3129		scn->scn_visited_this_txg = 0;
3130
3131		/*
3132		 * Write out changes to the DDT that may be required as a
3133		 * result of the blocks freed.  This ensures that the DDT
3134		 * is clean when a scrub/resilver runs.
3135		 */
3136		ddt_sync(spa, tx->tx_txg);
3137	}
3138	if (err != 0)
3139		return (err);
3140	if (dp->dp_free_dir != NULL && !scn->scn_async_destroying &&
3141	    zfs_free_leak_on_eio &&
3142	    (dsl_dir_phys(dp->dp_free_dir)->dd_used_bytes != 0 ||
3143	    dsl_dir_phys(dp->dp_free_dir)->dd_compressed_bytes != 0 ||
3144	    dsl_dir_phys(dp->dp_free_dir)->dd_uncompressed_bytes != 0)) {
3145		/*
3146		 * We have finished background destroying, but there is still
3147		 * some space left in the dp_free_dir. Transfer this leaked
3148		 * space to the dp_leak_dir.
3149		 */
3150		if (dp->dp_leak_dir == NULL) {
3151			rrw_enter(&dp->dp_config_rwlock, RW_WRITER, FTAG);
3152			(void) dsl_dir_create_sync(dp, dp->dp_root_dir,
3153			    LEAK_DIR_NAME, tx);
3154			VERIFY0(dsl_pool_open_special_dir(dp,
3155			    LEAK_DIR_NAME, &dp->dp_leak_dir));
3156			rrw_exit(&dp->dp_config_rwlock, FTAG);
3157		}
3158		dsl_dir_diduse_space(dp->dp_leak_dir, DD_USED_HEAD,
3159		    dsl_dir_phys(dp->dp_free_dir)->dd_used_bytes,
3160		    dsl_dir_phys(dp->dp_free_dir)->dd_compressed_bytes,
3161		    dsl_dir_phys(dp->dp_free_dir)->dd_uncompressed_bytes, tx);
3162		dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
3163		    -dsl_dir_phys(dp->dp_free_dir)->dd_used_bytes,
3164		    -dsl_dir_phys(dp->dp_free_dir)->dd_compressed_bytes,
3165		    -dsl_dir_phys(dp->dp_free_dir)->dd_uncompressed_bytes, tx);
3166	}
3167
3168	if (dp->dp_free_dir != NULL && !scn->scn_async_destroying) {
3169		/* finished; verify that space accounting went to zero */
3170		ASSERT0(dsl_dir_phys(dp->dp_free_dir)->dd_used_bytes);
3171		ASSERT0(dsl_dir_phys(dp->dp_free_dir)->dd_compressed_bytes);
3172		ASSERT0(dsl_dir_phys(dp->dp_free_dir)->dd_uncompressed_bytes);
3173	}
3174
3175	EQUIV(bpobj_is_open(&dp->dp_obsolete_bpobj),
3176	    0 == zap_contains(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
3177	    DMU_POOL_OBSOLETE_BPOBJ));
3178	if (err == 0 && bpobj_is_open(&dp->dp_obsolete_bpobj)) {
3179		ASSERT(spa_feature_is_active(dp->dp_spa,
3180		    SPA_FEATURE_OBSOLETE_COUNTS));
3181
3182		scn->scn_is_bptree = B_FALSE;
3183		scn->scn_async_block_min_time_ms = zfs_obsolete_min_time_ms;
3184		err = bpobj_iterate(&dp->dp_obsolete_bpobj,
3185		    dsl_scan_obsolete_block_cb, scn, tx);
3186		if (err != 0 && err != ERESTART)
3187			zfs_panic_recover("error %u from bpobj_iterate()", err);
3188
3189		if (bpobj_is_empty(&dp->dp_obsolete_bpobj))
3190			dsl_pool_destroy_obsolete_bpobj(dp, tx);
3191	}
3192
3193	return (0);
3194}
3195
3196/*
3197 * This is the primary entry point for scans, and is called from syncing
3198 * context. Scans must happen entirely during syncing context so that we
3199 * can guarantee that blocks we are currently scanning will not change out
3200 * from under us. While a scan is active, this function controls how quickly
3201 * transaction groups proceed, instead of the normal handling provided by
3202 * txg_sync_thread().
3203 */
3204void
3205dsl_scan_sync(dsl_pool_t *dp, dmu_tx_t *tx)
3206{
3207	dsl_scan_t *scn = dp->dp_scan;
3208	spa_t *spa = dp->dp_spa;
3209	int err = 0;
3210	state_sync_type_t sync_type = SYNC_OPTIONAL;
3211
3212	/*
3213	 * Check for scn_restart_txg before checking spa_load_state, so
3214	 * that we can restart an old-style scan while the pool is being
3215	 * imported (see dsl_scan_init).
3216	 */
3217	if (dsl_scan_restarting(scn, tx)) {
3218		pool_scan_func_t func = POOL_SCAN_SCRUB;
3219		dsl_scan_done(scn, B_FALSE, tx);
3220		if (vdev_resilver_needed(spa->spa_root_vdev, NULL, NULL))
3221			func = POOL_SCAN_RESILVER;
3222		zfs_dbgmsg("restarting scan func=%u txg=%llu",
3223		    func, (longlong_t)tx->tx_txg);
3224		dsl_scan_setup_sync(&func, tx);
3225	}
3226
3227	/*
3228	 * Only process scans in sync pass 1.
3229	 */
3230	if (spa_sync_pass(dp->dp_spa) > 1)
3231		return;
3232
3233	/*
3234	 * If the spa is shutting down, then stop scanning. This will
3235	 * ensure that the scan does not dirty any new data during the
3236	 * shutdown phase.
3237	 */
3238	if (spa_shutting_down(spa))
3239		return;
3240
3241	/*
3242	 * If the scan is inactive due to a stalled async destroy, try again.
3243	 */
3244	if (!scn->scn_async_stalled && !dsl_scan_active(scn))
3245		return;
3246
3247	/* reset scan statistics */
3248	scn->scn_visited_this_txg = 0;
3249	scn->scn_holes_this_txg = 0;
3250	scn->scn_lt_min_this_txg = 0;
3251	scn->scn_gt_max_this_txg = 0;
3252	scn->scn_ddt_contained_this_txg = 0;
3253	scn->scn_objsets_visited_this_txg = 0;
3254	scn->scn_avg_seg_size_this_txg = 0;
3255	scn->scn_segs_this_txg = 0;
3256	scn->scn_avg_zio_size_this_txg = 0;
3257	scn->scn_zios_this_txg = 0;
3258	scn->scn_suspending = B_FALSE;
3259	scn->scn_sync_start_time = gethrtime();
3260	spa->spa_scrub_active = B_TRUE;
3261
3262	/*
3263	 * First process the async destroys.  If we pause, don't do
3264	 * any scrubbing or resilvering.  This ensures that there are no
3265	 * async destroys while we are scanning, so the scan code doesn't
3266	 * have to worry about traversing it.  It is also faster to free the
3267	 * blocks than to scrub them.
3268	 */
3269	err = dsl_process_async_destroys(dp, tx);
3270	if (err != 0)
3271		return;
3272
3273	if (!dsl_scan_is_running(scn) || dsl_scan_is_paused_scrub(scn))
3274		return;
3275
3276	/*
3277	 * Wait a few txgs after importing to begin scanning so that
3278	 * we can get the pool imported quickly.
3279	 */
3280	if (spa->spa_syncing_txg < spa->spa_first_txg + SCAN_IMPORT_WAIT_TXGS)
3281		return;
3282
3283	/*
3284	 * It is possible to switch from unsorted to sorted at any time,
3285	 * but afterwards the scan will remain sorted unless reloaded from
3286	 * a checkpoint after a reboot.
3287	 */
3288	if (!zfs_scan_legacy) {
3289		scn->scn_is_sorted = B_TRUE;
3290		if (scn->scn_last_checkpoint == 0)
3291			scn->scn_last_checkpoint = ddi_get_lbolt();
3292	}
3293
3294	/*
3295	 * For sorted scans, determine what kind of work we will be doing
3296	 * this txg based on our memory limitations and whether or not we
3297	 * need to perform a checkpoint.
3298	 */
3299	if (scn->scn_is_sorted) {
3300		/*
3301		 * If we are over our checkpoint interval, set scn_clearing
3302		 * so that we can begin checkpointing immediately. The
3303		 * checkpoint allows us to save a consistent bookmark
3304		 * representing how much data we have scrubbed so far.
3305		 * Otherwise, use the memory limit to determine if we should
3306		 * scan for metadata or start issuing scrub IOs. We accumulate
3307		 * metadata until we hit our hard memory limit at which point
3308		 * we issue scrub IOs until we are at our soft memory limit.
3309		 */
3310		if (scn->scn_checkpointing ||
3311		    ddi_get_lbolt() - scn->scn_last_checkpoint >
3312		    SEC_TO_TICK(zfs_scan_checkpoint_intval)) {
3313			if (!scn->scn_checkpointing)
3314				zfs_dbgmsg("begin scan checkpoint");
3315
3316			scn->scn_checkpointing = B_TRUE;
3317			scn->scn_clearing = B_TRUE;
3318		} else {
3319			boolean_t should_clear = dsl_scan_should_clear(scn);
3320			if (should_clear && !scn->scn_clearing) {
3321				zfs_dbgmsg("begin scan clearing");
3322				scn->scn_clearing = B_TRUE;
3323			} else if (!should_clear && scn->scn_clearing) {
3324				zfs_dbgmsg("finish scan clearing");
3325				scn->scn_clearing = B_FALSE;
3326			}
3327		}
3328	} else {
3329		ASSERT0(scn->scn_checkpointing);
3330		ASSERT0(scn->scn_clearing);
3331	}
3332
3333	if (!scn->scn_clearing && scn->scn_done_txg == 0) {
3334		/* Need to scan metadata for more blocks to scrub */
3335		dsl_scan_phys_t *scnp = &scn->scn_phys;
3336		taskqid_t prefetch_tqid;
3337		uint64_t bytes_per_leaf = zfs_scan_vdev_limit;
3338		uint64_t nr_leaves = dsl_scan_count_leaves(spa->spa_root_vdev);
3339
3340		/*
3341		 * Recalculate the max number of in-flight bytes for pool-wide
3342		 * scanning operations (minimum 1MB). Limits for the issuing
3343		 * phase are done per top-level vdev and are handled separately.
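		 * For example, with zfs_scan_vdev_limit = 4 MiB and 8 leaf
		 * vdevs this works out to 32 MiB of in-flight scan reads
		 * pool-wide, while a tiny pool is still allowed at least 1 MiB.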
3344		 */
3345		scn->scn_maxinflight_bytes =
3346		    MAX(nr_leaves * bytes_per_leaf, 1ULL << 20);
3347
3348		if (scnp->scn_ddt_bookmark.ddb_class <=
3349		    scnp->scn_ddt_class_max) {
3350			ASSERT(ZB_IS_ZERO(&scnp->scn_bookmark));
3351			zfs_dbgmsg("doing scan sync txg %llu; "
3352			    "ddt bm=%llu/%llu/%llu/%llx",
3353			    (longlong_t)tx->tx_txg,
3354			    (longlong_t)scnp->scn_ddt_bookmark.ddb_class,
3355			    (longlong_t)scnp->scn_ddt_bookmark.ddb_type,
3356			    (longlong_t)scnp->scn_ddt_bookmark.ddb_checksum,
3357			    (longlong_t)scnp->scn_ddt_bookmark.ddb_cursor);
3358		} else {
3359			zfs_dbgmsg("doing scan sync txg %llu; "
3360			    "bm=%llu/%llu/%llu/%llu",
3361			    (longlong_t)tx->tx_txg,
3362			    (longlong_t)scnp->scn_bookmark.zb_objset,
3363			    (longlong_t)scnp->scn_bookmark.zb_object,
3364			    (longlong_t)scnp->scn_bookmark.zb_level,
3365			    (longlong_t)scnp->scn_bookmark.zb_blkid);
3366		}
3367
3368		scn->scn_zio_root = zio_root(dp->dp_spa, NULL,
3369		    NULL, ZIO_FLAG_CANFAIL);
3370
3371		scn->scn_prefetch_stop = B_FALSE;
3372		prefetch_tqid = taskq_dispatch(dp->dp_sync_taskq,
3373		    dsl_scan_prefetch_thread, scn, TQ_SLEEP);
3374		ASSERT(prefetch_tqid != TASKQID_INVALID);
3375
3376		dsl_pool_config_enter(dp, FTAG);
3377		dsl_scan_visit(scn, tx);
3378		dsl_pool_config_exit(dp, FTAG);
3379
3380		mutex_enter(&dp->dp_spa->spa_scrub_lock);
3381		scn->scn_prefetch_stop = B_TRUE;
3382		cv_broadcast(&spa->spa_scrub_io_cv);
3383		mutex_exit(&dp->dp_spa->spa_scrub_lock);
3384
3385		taskq_wait_id(dp->dp_sync_taskq, prefetch_tqid);
3386		(void) zio_wait(scn->scn_zio_root);
3387		scn->scn_zio_root = NULL;
3388
3389		zfs_dbgmsg("scan visited %llu blocks in %llums "
3390		    "(%llu os's, %llu holes, %llu < mintxg, "
3391		    "%llu in ddt, %llu > maxtxg)",
3392		    (longlong_t)scn->scn_visited_this_txg,
3393		    (longlong_t)NSEC2MSEC(gethrtime() -
3394		    scn->scn_sync_start_time),
3395		    (longlong_t)scn->scn_objsets_visited_this_txg,
3396		    (longlong_t)scn->scn_holes_this_txg,
3397		    (longlong_t)scn->scn_lt_min_this_txg,
3398		    (longlong_t)scn->scn_ddt_contained_this_txg,
3399		    (longlong_t)scn->scn_gt_max_this_txg);
3400
3401		if (!scn->scn_suspending) {
3402			ASSERT0(avl_numnodes(&scn->scn_queue));
3403			scn->scn_done_txg = tx->tx_txg + 1;
3404			if (scn->scn_is_sorted) {
3405				scn->scn_checkpointing = B_TRUE;
3406				scn->scn_clearing = B_TRUE;
3407			}
3408			zfs_dbgmsg("scan complete txg %llu",
3409			    (longlong_t)tx->tx_txg);
3410		}
3411	} else if (scn->scn_is_sorted && scn->scn_bytes_pending != 0) {
3412		/* need to issue scrubbing IOs from per-vdev queues */
3413		scn->scn_zio_root = zio_root(dp->dp_spa, NULL,
3414		    NULL, ZIO_FLAG_CANFAIL);
3415		scan_io_queues_run(scn);
3416		(void) zio_wait(scn->scn_zio_root);
3417		scn->scn_zio_root = NULL;
3418
3419		/* calculate and dprintf the current memory usage */
3420		(void) dsl_scan_should_clear(scn);
3421		dsl_scan_update_stats(scn);
3422
3423		zfs_dbgmsg("scrubbed %llu blocks (%llu segs) in %llums "
3424		    "(avg_block_size = %llu, avg_seg_size = %llu)",
3425		    (longlong_t)scn->scn_zios_this_txg,
3426		    (longlong_t)scn->scn_segs_this_txg,
3427		    (longlong_t)NSEC2MSEC(gethrtime() -
3428		    scn->scn_sync_start_time),
3429		    (longlong_t)scn->scn_avg_zio_size_this_txg,
3430		    (longlong_t)scn->scn_avg_seg_size_this_txg);
3431	} else if (scn->scn_done_txg != 0 && scn->scn_done_txg <= tx->tx_txg) {
3432		/* Finished with everything. Mark the scrub as complete */
3433		zfs_dbgmsg("scan issuing complete txg %llu",
3434		    (longlong_t)tx->tx_txg);
3435		ASSERT3U(scn->scn_done_txg, !=, 0);
3436		ASSERT0(spa->spa_scrub_inflight);
3437		ASSERT0(scn->scn_bytes_pending);
3438		dsl_scan_done(scn, B_TRUE, tx);
3439		sync_type = SYNC_MANDATORY;
3440	}
3441
3442	dsl_scan_sync_state(scn, tx, sync_type);
3443}
3444
3445static void
3446count_block(dsl_scan_t *scn, zfs_all_blkstats_t *zab, const blkptr_t *bp)
3447{
3448	int i;
3449
3450	/* update the spa's stats on how many bytes we have issued */
3451	for (i = 0; i < BP_GET_NDVAS(bp); i++) {
3452		atomic_add_64(&scn->scn_dp->dp_spa->spa_scan_pass_issued,
3453		    DVA_GET_ASIZE(&bp->blk_dva[i]));
3454	}
3455
3456	/*
3457	 * If we resume after a reboot, zab will be NULL; don't record
3458	 * incomplete stats in that case.
3459	 */
3460	if (zab == NULL)
3461		return;
3462
3463	mutex_enter(&zab->zab_lock);
3464
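	/*
	 * Each block is charged to four rows of the stats table: its exact
	 * (level, type) slot, the per-level total (DMU_OT_TOTAL), the
	 * per-type total (DN_MAX_LEVELS), and the grand total.
	 */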
3465	for (i = 0; i < 4; i++) {
3466		int l = (i < 2) ? BP_GET_LEVEL(bp) : DN_MAX_LEVELS;
3467		int t = (i & 1) ? BP_GET_TYPE(bp) : DMU_OT_TOTAL;
3468		if (t & DMU_OT_NEWTYPE)
3469			t = DMU_OT_OTHER;
3470		zfs_blkstat_t *zb = &zab->zab_type[l][t];
3471		int equal;
3472
3473		zb->zb_count++;
3474		zb->zb_asize += BP_GET_ASIZE(bp);
3475		zb->zb_lsize += BP_GET_LSIZE(bp);
3476		zb->zb_psize += BP_GET_PSIZE(bp);
3477		zb->zb_gangs += BP_COUNT_GANG(bp);
3478
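		/*
		 * Count ditto copies that ended up on the same top-level
		 * vdev, since they provide less protection than copies
		 * spread across different vdevs.
		 */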
3479		switch (BP_GET_NDVAS(bp)) {
3480		case 2:
3481			if (DVA_GET_VDEV(&bp->blk_dva[0]) ==
3482			    DVA_GET_VDEV(&bp->blk_dva[1]))
3483				zb->zb_ditto_2_of_2_samevdev++;
3484			break;
3485		case 3:
3486			equal = (DVA_GET_VDEV(&bp->blk_dva[0]) ==
3487			    DVA_GET_VDEV(&bp->blk_dva[1])) +
3488			    (DVA_GET_VDEV(&bp->blk_dva[0]) ==
3489			    DVA_GET_VDEV(&bp->blk_dva[2])) +
3490			    (DVA_GET_VDEV(&bp->blk_dva[1]) ==
3491			    DVA_GET_VDEV(&bp->blk_dva[2]));
3492			if (equal == 1)
3493				zb->zb_ditto_2_of_3_samevdev++;
3494			else if (equal == 3)
3495				zb->zb_ditto_3_of_3_samevdev++;
3496			break;
3497		}
3498	}
3499
3500	mutex_exit(&zab->zab_lock);
3501}
3502
3503static void
3504scan_io_queue_insert_impl(dsl_scan_io_queue_t *queue, scan_io_t *sio)
3505{
3506	avl_index_t idx;
3507	int64_t asize = sio->sio_asize;
3508	dsl_scan_t *scn = queue->q_scn;
3509
3510	ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
3511
3512	if (avl_find(&queue->q_sios_by_addr, sio, &idx) != NULL) {
3513		/* block is already scheduled for reading */
3514		atomic_add_64(&scn->scn_bytes_pending, -asize);
3515		kmem_free(sio, sizeof (*sio));
3516		return;
3517	}
3518	avl_insert(&queue->q_sios_by_addr, sio, idx);
3519	range_tree_add(queue->q_exts_by_addr, sio->sio_offset, asize);
3520}
3521
3522/*
3523 * Given all the info we got from our metadata scanning process, we
3524 * construct a scan_io_t and insert it into the scan sorting queue. The
3525 * I/O must already be suitable for us to process. This is controlled
3526 * by dsl_scan_enqueue().
3527 */
3528static void
3529scan_io_queue_insert(dsl_scan_io_queue_t *queue, const blkptr_t *bp, int dva_i,
3530    int zio_flags, const zbookmark_phys_t *zb)
3531{
3532	dsl_scan_t *scn = queue->q_scn;
3533	scan_io_t *sio = kmem_zalloc(sizeof (*sio), KM_SLEEP);
3534
3535	ASSERT0(BP_IS_GANG(bp));
3536	ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
3537
3538	bp2sio(bp, sio, dva_i);
3539	sio->sio_flags = zio_flags;
3540	sio->sio_zb = *zb;
3541
3542	/*
3543	 * Increment the bytes pending counter now so that we can't
3544	 * get an integer underflow in case the worker processes the
3545	 * zio before we get to incrementing this counter.
3546	 */
3547	atomic_add_64(&scn->scn_bytes_pending, sio->sio_asize);
3548
3549	scan_io_queue_insert_impl(queue, sio);
3550}
3551
3552/*
3553 * Given a set of I/O parameters as discovered by the metadata traversal
3554 * process, attempts to place the I/O into the sorted queues (if allowed),
3555 * or immediately executes the I/O.
3556 */
3557static void
3558dsl_scan_enqueue(dsl_pool_t *dp, const blkptr_t *bp, int zio_flags,
3559    const zbookmark_phys_t *zb)
3560{
3561	spa_t *spa = dp->dp_spa;
3562
3563	ASSERT(!BP_IS_EMBEDDED(bp));
3564
3565	/*
3566	 * Gang blocks are hard to issue sequentially, so we just issue them
3567	 * here immediately instead of queuing them.
3568	 */
3569	if (!dp->dp_scan->scn_is_sorted || BP_IS_GANG(bp)) {
3570		scan_exec_io(dp, bp, zio_flags, zb, NULL);
3571		return;
3572	}
3573	for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
3574		dva_t dva;
3575		vdev_t *vdev;
3576
3577		dva = bp->blk_dva[i];
3578		vdev = vdev_lookup_top(spa, DVA_GET_VDEV(&dva));
3579		ASSERT(vdev != NULL);
3580
3581		mutex_enter(&vdev->vdev_scan_io_queue_lock);
3582		if (vdev->vdev_scan_io_queue == NULL)
3583			vdev->vdev_scan_io_queue = scan_io_queue_create(vdev);
3584		ASSERT(dp->dp_scan != NULL);
3585		scan_io_queue_insert(vdev->vdev_scan_io_queue, bp,
3586		    i, zio_flags, zb);
3587		mutex_exit(&vdev->vdev_scan_io_queue_lock);
3588	}
3589}
3590
3591static int
3592dsl_scan_scrub_cb(dsl_pool_t *dp,
3593    const blkptr_t *bp, const zbookmark_phys_t *zb)
3594{
3595	dsl_scan_t *scn = dp->dp_scan;
3596	spa_t *spa = dp->dp_spa;
3597	uint64_t phys_birth = BP_PHYSICAL_BIRTH(bp);
3598	size_t psize = BP_GET_PSIZE(bp);
3599	boolean_t needs_io;
3600	int zio_flags = ZIO_FLAG_SCAN_THREAD | ZIO_FLAG_RAW | ZIO_FLAG_CANFAIL;
3601	int d;
3602
3603	if (phys_birth <= scn->scn_phys.scn_min_txg ||
3604	    phys_birth >= scn->scn_phys.scn_max_txg) {
3605		count_block(scn, dp->dp_blkstats, bp);
3606		return (0);
3607	}
3608
3609	/* Embedded BP's have phys_birth==0, so we reject them above. */
3610	ASSERT(!BP_IS_EMBEDDED(bp));
3611
3612	ASSERT(DSL_SCAN_IS_SCRUB_RESILVER(scn));
3613	if (scn->scn_phys.scn_func == POOL_SCAN_SCRUB) {
3614		zio_flags |= ZIO_FLAG_SCRUB;
3615		needs_io = B_TRUE;
3616	} else {
3617		ASSERT3U(scn->scn_phys.scn_func, ==, POOL_SCAN_RESILVER);
3618		zio_flags |= ZIO_FLAG_RESILVER;
3619		needs_io = B_FALSE;
3620	}
3621
3622	/* If it's an intent log block, failure is expected. */
3623	if (zb->zb_level == ZB_ZIL_LEVEL)
3624		zio_flags |= ZIO_FLAG_SPECULATIVE;
3625
3626	for (d = 0; d < BP_GET_NDVAS(bp); d++) {
3627		const dva_t *dva = &bp->blk_dva[d];
3628
3629		/*
3630		 * Keep track of how much data we've examined so that
3631		 * zpool(1M) status can make useful progress reports.
3632		 */
3633		scn->scn_phys.scn_examined += DVA_GET_ASIZE(dva);
3634		spa->spa_scan_pass_exam += DVA_GET_ASIZE(dva);
3635
3636		/* if it's a resilver, this may not be in the target range */
3637		if (!needs_io)
3638			needs_io = dsl_scan_need_resilver(spa, dva, psize,
3639			    phys_birth);
3640	}
3641
3642	if (needs_io && !zfs_no_scrub_io) {
3643		dsl_scan_enqueue(dp, bp, zio_flags, zb);
3644	} else {
3645		count_block(scn, dp->dp_blkstats, bp);
3646	}
3647
3648	/* do not relocate this block */
3649	return (0);
3650}
3651
3652static void
3653dsl_scan_scrub_done(zio_t *zio)
3654{
3655	spa_t *spa = zio->io_spa;
3656	blkptr_t *bp = zio->io_bp;
3657	dsl_scan_io_queue_t *queue = zio->io_private;
3658
3659	abd_free(zio->io_abd);
3660
3661	if (queue == NULL) {
3662		mutex_enter(&spa->spa_scrub_lock);
3663		ASSERT3U(spa->spa_scrub_inflight, >=, BP_GET_PSIZE(bp));
3664		spa->spa_scrub_inflight -= BP_GET_PSIZE(bp);
3665		cv_broadcast(&spa->spa_scrub_io_cv);
3666		mutex_exit(&spa->spa_scrub_lock);
3667	} else {
3668		mutex_enter(&queue->q_vd->vdev_scan_io_queue_lock);
3669		ASSERT3U(queue->q_inflight_bytes, >=, BP_GET_PSIZE(bp));
3670		queue->q_inflight_bytes -= BP_GET_PSIZE(bp);
3671		cv_broadcast(&queue->q_zio_cv);
3672		mutex_exit(&queue->q_vd->vdev_scan_io_queue_lock);
3673	}
3674
3675	if (zio->io_error && (zio->io_error != ECKSUM ||
3676	    !(zio->io_flags & ZIO_FLAG_SPECULATIVE))) {
3677		atomic_inc_64(&spa->spa_dsl_pool->dp_scan->scn_phys.scn_errors);
3678	}
3679}
3680
3681/*
3682 * Given a scanning zio's information, executes the zio. The zio need
3683 * not necessarily be sortable; this function simply executes the
3684 * zio, no matter what it is. The optional queue argument allows the
3685 * caller to specify that they want per top level vdev IO rate limiting
3686 * instead of the legacy global limiting.
3687 */
3688static void
3689scan_exec_io(dsl_pool_t *dp, const blkptr_t *bp, int zio_flags,
3690    const zbookmark_phys_t *zb, dsl_scan_io_queue_t *queue)
3691{
3692	spa_t *spa = dp->dp_spa;
3693	dsl_scan_t *scn = dp->dp_scan;
3694	size_t size = BP_GET_PSIZE(bp);
3695	abd_t *data = abd_alloc_for_io(size, B_FALSE);
3696	unsigned int scan_delay = 0;
3697
3698	ASSERT3U(scn->scn_maxinflight_bytes, >, 0);
3699
3700	if (queue == NULL) {
3701		mutex_enter(&spa->spa_scrub_lock);
3702		while (spa->spa_scrub_inflight >= scn->scn_maxinflight_bytes)
3703			cv_wait(&spa->spa_scrub_io_cv, &spa->spa_scrub_lock);
3704		spa->spa_scrub_inflight += BP_GET_PSIZE(bp);
3705		mutex_exit(&spa->spa_scrub_lock);
3706	} else {
3707		kmutex_t *q_lock = &queue->q_vd->vdev_scan_io_queue_lock;
3708
3709		mutex_enter(q_lock);
3710		while (queue->q_inflight_bytes >= queue->q_maxinflight_bytes)
3711			cv_wait(&queue->q_zio_cv, q_lock);
3712		queue->q_inflight_bytes += BP_GET_PSIZE(bp);
3713		mutex_exit(q_lock);
3714	}
3715
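	/*
	 * Legacy throttle: if the pool has seen I/O within the last
	 * zfs_scan_idle ticks (per spa_last_io), back off by
	 * zfs_resilver_delay or zfs_scrub_delay ticks so that scan reads
	 * yield to other pool traffic.
	 */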
3716	if (zio_flags & ZIO_FLAG_RESILVER)
3717		scan_delay = zfs_resilver_delay;
3718	else {
3719		ASSERT(zio_flags & ZIO_FLAG_SCRUB);
3720		scan_delay = zfs_scrub_delay;
3721	}
3722
3723	if (scan_delay && (ddi_get_lbolt64() - spa->spa_last_io <= zfs_scan_idle))
3724		delay(MAX((int)scan_delay, 0));
3725
3726	count_block(dp->dp_scan, dp->dp_blkstats, bp);
3727	zio_nowait(zio_read(dp->dp_scan->scn_zio_root, spa, bp, data, size,
3728	    dsl_scan_scrub_done, queue, ZIO_PRIORITY_SCRUB, zio_flags, zb));
3729}
3730
3731/*
3732 * This is the primary extent sorting algorithm. We balance two parameters:
3733 * 1) how many bytes of I/O are in an extent
3734 * 2) how well the extent is filled with I/O (as a fraction of its total size)
3735 * Since we allow extents to have gaps between their constituent I/Os, it's
3736 * possible to have a fairly large extent that contains the same amount of
3737 * I/O bytes as a much smaller extent, which just packs the I/O more tightly.
3738 * The algorithm sorts based on a score calculated from the extent's size,
3739 * the relative fill volume (in %) and a "fill weight" parameter that controls
3740 * the split between whether we prefer larger extents or more well populated
3741 * extents:
3742 *
3743 * SCORE = FILL_IN_BYTES + (FILL_IN_PERCENT * FILL_IN_BYTES * FILL_WEIGHT)
3744 *
3745 * Example:
3746 * 1) assume extsz = 64 MiB
3747 * 2) assume fill = 32 MiB (extent is half full)
3748 * 3) assume fill_weight = 3
3749 * 4)	SCORE = 32M + (((32M * 100) / 64M) * 3 * 32M) / 100
3750 *	SCORE = 32M + (50 * 3 * 32M) / 100
3751 *	SCORE = 32M + (4800M / 100)
3752 *	SCORE = 32M + 48M
3753 *	         ^     ^
3754 *	         |     +--- final total relative fill-based score
3755 *	         +--------- final total fill-based score
3756 *	SCORE = 80M
3757 *
3758 * As can be seen, at fill_weight=3, the algorithm is slightly biased towards
3759 * extents that are more completely filled (in a 3:2 ratio) vs just larger.
3760 * Note that as an optimization, we replace multiplication and division by
3761 * 100 with bitshifting by 7 (which effectively multiplies and divides by 128).
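 * With the bitshift optimization the worked example above becomes:
 *	(32M << 7) / 64M = 64
 *	SCORE = 32M + ((64 * 3 * 32M) >> 7) = 32M + 48M = 80M
 * i.e. the same result as the percentage-based formulation.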
3762 */
3763static int
3764ext_size_compare(const void *x, const void *y)
3765{
3766	const range_seg_t *rsa = x, *rsb = y;
3767	uint64_t sa = rsa->rs_end - rsa->rs_start,
3768	    sb = rsb->rs_end - rsb->rs_start;
3769	uint64_t score_a, score_b;
3770
3771	score_a = rsa->rs_fill + ((((rsa->rs_fill << 7) / sa) *
3772	    fill_weight * rsa->rs_fill) >> 7);
3773	score_b = rsb->rs_fill + ((((rsb->rs_fill << 7) / sb) *
3774	    fill_weight * rsb->rs_fill) >> 7);
3775
3776	if (score_a > score_b)
3777		return (-1);
3778	if (score_a == score_b) {
3779		if (rsa->rs_start < rsb->rs_start)
3780			return (-1);
3781		if (rsa->rs_start == rsb->rs_start)
3782			return (0);
3783		return (1);
3784	}
3785	return (1);
3786}
3787
3788/*
3789 * Comparator for the q_sios_by_addr tree. Sorting is simply performed
3790 * based on LBA-order (from lowest to highest).
3791 */
3792static int
3793io_addr_compare(const void *x, const void *y)
3794{
3795	const scan_io_t *a = x, *b = y;
3796
3797	if (a->sio_offset < b->sio_offset)
3798		return (-1);
3799	if (a->sio_offset == b->sio_offset)
3800		return (0);
3801	return (1);
3802}
3803
3804/* IO queues are created on demand when they are needed. */
3805static dsl_scan_io_queue_t *
3806scan_io_queue_create(vdev_t *vd)
3807{
3808	dsl_scan_t *scn = vd->vdev_spa->spa_dsl_pool->dp_scan;
3809	dsl_scan_io_queue_t *q = kmem_zalloc(sizeof (*q), KM_SLEEP);
3810
3811	q->q_scn = scn;
3812	q->q_vd = vd;
3813	cv_init(&q->q_zio_cv, NULL, CV_DEFAULT, NULL);
3814	q->q_exts_by_addr = range_tree_create_impl(&rt_avl_ops,
3815	    &q->q_exts_by_size, ext_size_compare, zfs_scan_max_ext_gap);
3816	avl_create(&q->q_sios_by_addr, io_addr_compare,
3817	    sizeof (scan_io_t), offsetof(scan_io_t, sio_nodes.sio_addr_node));
3818
3819	return (q);
3820}
3821
3822/*
3823 * Destroys a scan queue and all segments and scan_io_t's contained in it.
3824 * No further execution of I/O occurs; anything pending in the queue is
3825 * simply freed without being executed.
3826 */
3827void
3828dsl_scan_io_queue_destroy(dsl_scan_io_queue_t *queue)
3829{
3830	dsl_scan_t *scn = queue->q_scn;
3831	scan_io_t *sio;
3832	void *cookie = NULL;
3833	int64_t bytes_dequeued = 0;
3834
3835	ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
3836
3837	while ((sio = avl_destroy_nodes(&queue->q_sios_by_addr, &cookie)) !=
3838	    NULL) {
3839		ASSERT(range_tree_contains(queue->q_exts_by_addr,
3840		    sio->sio_offset, sio->sio_asize));
3841		bytes_dequeued += sio->sio_asize;
3842		kmem_free(sio, sizeof (*sio));
3843	}
3844
3845	atomic_add_64(&scn->scn_bytes_pending, -bytes_dequeued);
3846	range_tree_vacate(queue->q_exts_by_addr, NULL, queue);
3847	range_tree_destroy(queue->q_exts_by_addr);
3848	avl_destroy(&queue->q_sios_by_addr);
3849	cv_destroy(&queue->q_zio_cv);
3850
3851	kmem_free(queue, sizeof (*queue));
3852}
3853
3854/*
3855 * Properly transfers a dsl_scan_io_queue_t from `svd' to `tvd'. This is
3856 * called on behalf of vdev_top_transfer when creating or destroying
3857 * a mirror vdev due to zpool attach/detach.
3858 */
3859void
3860dsl_scan_io_queue_vdev_xfer(vdev_t *svd, vdev_t *tvd)
3861{
3862	mutex_enter(&svd->vdev_scan_io_queue_lock);
3863	mutex_enter(&tvd->vdev_scan_io_queue_lock);
3864
3865	VERIFY3P(tvd->vdev_scan_io_queue, ==, NULL);
3866	tvd->vdev_scan_io_queue = svd->vdev_scan_io_queue;
3867	svd->vdev_scan_io_queue = NULL;
3868	if (tvd->vdev_scan_io_queue != NULL)
3869		tvd->vdev_scan_io_queue->q_vd = tvd;
3870
3871	mutex_exit(&tvd->vdev_scan_io_queue_lock);
3872	mutex_exit(&svd->vdev_scan_io_queue_lock);
3873}
3874
3875static void
3876scan_io_queues_destroy(dsl_scan_t *scn)
3877{
3878	vdev_t *rvd = scn->scn_dp->dp_spa->spa_root_vdev;
3879
3880	for (uint64_t i = 0; i < rvd->vdev_children; i++) {
3881		vdev_t *tvd = rvd->vdev_child[i];
3882
3883		mutex_enter(&tvd->vdev_scan_io_queue_lock);
3884		if (tvd->vdev_scan_io_queue != NULL)
3885			dsl_scan_io_queue_destroy(tvd->vdev_scan_io_queue);
3886		tvd->vdev_scan_io_queue = NULL;
3887		mutex_exit(&tvd->vdev_scan_io_queue_lock);
3888	}
3889}
3890
3891static void
3892dsl_scan_freed_dva(spa_t *spa, const blkptr_t *bp, int dva_i)
3893{
3894	dsl_pool_t *dp = spa->spa_dsl_pool;
3895	dsl_scan_t *scn = dp->dp_scan;
3896	vdev_t *vdev;
3897	kmutex_t *q_lock;
3898	dsl_scan_io_queue_t *queue;
3899	scan_io_t srch, *sio;
3900	avl_index_t idx;
3901	uint64_t start, size;
3902
3903	vdev = vdev_lookup_top(spa, DVA_GET_VDEV(&bp->blk_dva[dva_i]));
3904	ASSERT(vdev != NULL);
3905	q_lock = &vdev->vdev_scan_io_queue_lock;
3906	queue = vdev->vdev_scan_io_queue;
3907
3908	mutex_enter(q_lock);
3909	if (queue == NULL) {
3910		mutex_exit(q_lock);
3911		return;
3912	}
3913
3914	bp2sio(bp, &srch, dva_i);
3915	start = srch.sio_offset;
3916	size = srch.sio_asize;
3917
3918	/*
3919	 * We can find the zio in two states:
3920	 * 1) Cold, just sitting in the queue of zio's to be issued at
3921	 *	some point in the future. In this case, all we do is
3922	 *	remove the zio from the q_sios_by_addr tree, decrement
3923	 *	its data volume from the containing range_seg_t and
3924	 *	resort the q_exts_by_size tree to reflect that the
3925	 *	range_seg_t has lost some of its 'fill'. We don't shorten
3926	 *	the range_seg_t - this is usually rare enough not to be
3927	 *	worth the extra hassle of trying to keep track of precise
3928	 *	extent boundaries.
3929	 * 2) Hot, where the zio is currently in-flight in
3930	 *	dsl_scan_issue_ios. In this case, we can't simply
3931	 *	reach in and stop the in-flight zio's, so we instead
3932	 *	block the caller. Eventually, dsl_scan_issue_ios will
3933	 *	be done with issuing the zio's it gathered and will
3934	 *	signal us.
3935	 */
3936	sio = avl_find(&queue->q_sios_by_addr, &srch, &idx);
3937	if (sio != NULL) {
3938		int64_t asize = sio->sio_asize;
3939		blkptr_t tmpbp;
3940
3941		/* Got it while it was cold in the queue */
3942		ASSERT3U(start, ==, sio->sio_offset);
3943		ASSERT3U(size, ==, asize);
3944		avl_remove(&queue->q_sios_by_addr, sio);
3945
3946		ASSERT(range_tree_contains(queue->q_exts_by_addr, start, size));
3947		range_tree_remove_fill(queue->q_exts_by_addr, start, size);
3948
3949		/*
3950		 * We only update scn_bytes_pending in the cold path,
3951		 * otherwise it will already have been accounted for as
3952		 * part of the zio's execution.
3953		 */
3954		atomic_add_64(&scn->scn_bytes_pending, -asize);
3955
3956		/* count the block as though we issued it */
3957		sio2bp(sio, &tmpbp, dva_i);
3958		count_block(scn, dp->dp_blkstats, &tmpbp);
3959
3960		kmem_free(sio, sizeof (*sio));
3961	}
3962	mutex_exit(q_lock);
3963}
3964
3965/*
3966 * Callback invoked when a zio_free() zio is executing. This needs to be
3967 * intercepted to prevent the zio from deallocating a particular portion
3968 * of disk space that then gets reallocated and written to, while we
3969 * still have it queued up for processing.
3970 */
3971void
3972dsl_scan_freed(spa_t *spa, const blkptr_t *bp)
3973{
3974	dsl_pool_t *dp = spa->spa_dsl_pool;
3975	dsl_scan_t *scn = dp->dp_scan;
3976
3977	ASSERT(!BP_IS_EMBEDDED(bp));
3978	ASSERT(scn != NULL);
3979	if (!dsl_scan_is_running(scn))
3980		return;
3981
3982	for (int i = 0; i < BP_GET_NDVAS(bp); i++)
3983		dsl_scan_freed_dva(spa, bp, i);
3984}
3985