zio.c revision 329486
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
24 * Copyright (c) 2011 Nexenta Systems, Inc. All rights reserved.
25 * Copyright (c) 2014 Integros [integros.com]
26 */
27
28#include <sys/sysmacros.h>
29#include <sys/zfs_context.h>
30#include <sys/fm/fs/zfs.h>
31#include <sys/spa.h>
32#include <sys/txg.h>
33#include <sys/spa_impl.h>
34#include <sys/vdev_impl.h>
35#include <sys/zio_impl.h>
36#include <sys/zio_compress.h>
37#include <sys/zio_checksum.h>
38#include <sys/dmu_objset.h>
39#include <sys/arc.h>
40#include <sys/ddt.h>
41#include <sys/trim_map.h>
42#include <sys/blkptr.h>
43#include <sys/zfeature.h>
44#include <sys/metaslab_impl.h>
45#include <sys/abd.h>
46
47SYSCTL_DECL(_vfs_zfs);
48SYSCTL_NODE(_vfs_zfs, OID_AUTO, zio, CTLFLAG_RW, 0, "ZFS ZIO");
49#if defined(__amd64__)
50static int zio_use_uma = 1;
51#else
52static int zio_use_uma = 0;
53#endif
54SYSCTL_INT(_vfs_zfs_zio, OID_AUTO, use_uma, CTLFLAG_RDTUN, &zio_use_uma, 0,
55    "Use uma(9) for ZIO allocations");
56static int zio_exclude_metadata = 0;
57SYSCTL_INT(_vfs_zfs_zio, OID_AUTO, exclude_metadata, CTLFLAG_RDTUN, &zio_exclude_metadata, 0,
58    "Exclude metadata buffers from dumps as well");
59
60zio_trim_stats_t zio_trim_stats = {
61	{ "bytes",		KSTAT_DATA_UINT64,
62	  "Number of bytes successfully TRIMmed" },
63	{ "success",		KSTAT_DATA_UINT64,
64	  "Number of successful TRIM requests" },
65	{ "unsupported",	KSTAT_DATA_UINT64,
66	  "Number of TRIM requests that failed because TRIM is not supported" },
67	{ "failed",		KSTAT_DATA_UINT64,
68	  "Number of TRIM requests that failed for reasons other than not supported" },
69};
70
71static kstat_t *zio_trim_ksp;
72
73/*
74 * ==========================================================================
75 * I/O type descriptions
76 * ==========================================================================
77 */
78const char *zio_type_name[ZIO_TYPES] = {
79	"zio_null", "zio_read", "zio_write", "zio_free", "zio_claim",
80	"zio_ioctl"
81};
82
83boolean_t zio_dva_throttle_enabled = B_TRUE;
84SYSCTL_INT(_vfs_zfs_zio, OID_AUTO, dva_throttle_enabled, CTLFLAG_RDTUN,
85    &zio_dva_throttle_enabled, 0, "");
86
87/*
88 * ==========================================================================
89 * I/O kmem caches
90 * ==========================================================================
91 */
92kmem_cache_t *zio_cache;
93kmem_cache_t *zio_link_cache;
94kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
95kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
96
97#ifdef _KERNEL
98extern vmem_t *zio_alloc_arena;
99#endif
100
101#define	ZIO_PIPELINE_CONTINUE		0x100
102#define	ZIO_PIPELINE_STOP		0x101
103
104#define	BP_SPANB(indblkshift, level) \
105	(((uint64_t)1) << ((level) * ((indblkshift) - SPA_BLKPTRSHIFT)))
106#define	COMPARE_META_LEVEL	0x80000000ul
107/*
108 * The following actions directly affect the spa's sync-to-convergence logic.
109 * The values below define the sync pass when we start performing the action.
110 * Care should be taken when changing these values as they directly impact
111 * spa_sync() performance. Tuning these values may introduce subtle performance
112 * pathologies and should only be done in the context of performance analysis.
113 * These tunables will eventually be removed and replaced with #defines once
114 * enough analysis has been done to determine optimal values.
115 *
116 * The 'zfs_sync_pass_deferred_free' pass must be greater than 1 to ensure that
117 * regular blocks are not deferred.
118 */
119int zfs_sync_pass_deferred_free = 2; /* defer frees starting in this pass */
120SYSCTL_INT(_vfs_zfs, OID_AUTO, sync_pass_deferred_free, CTLFLAG_RDTUN,
121    &zfs_sync_pass_deferred_free, 0, "defer frees starting in this pass");
122int zfs_sync_pass_dont_compress = 5; /* don't compress starting in this pass */
123SYSCTL_INT(_vfs_zfs, OID_AUTO, sync_pass_dont_compress, CTLFLAG_RDTUN,
124    &zfs_sync_pass_dont_compress, 0, "don't compress starting in this pass");
125int zfs_sync_pass_rewrite = 2; /* rewrite new bps starting in this pass */
126SYSCTL_INT(_vfs_zfs, OID_AUTO, sync_pass_rewrite, CTLFLAG_RDTUN,
127    &zfs_sync_pass_rewrite, 0, "rewrite new bps starting in this pass");
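/*
 * Illustration (not authoritative): with the defaults above, frees are
 * deferred and same-size rewrites of existing bps are allowed starting in
 * sync pass 2, and blocks rewritten within the same txg stop being
 * compressed starting in pass 5, so that block sizes stop changing and
 * spa_sync() can converge.  On FreeBSD these appear as read-only sysctls /
 * loader tunables, e.g. vfs.zfs.sync_pass_dont_compress.
 */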
128
129/*
130 * An allocating zio is one that either currently has the DVA allocate
131 * stage set or will have it later in its lifetime.
132 */
133#define	IO_IS_ALLOCATING(zio) ((zio)->io_orig_pipeline & ZIO_STAGE_DVA_ALLOCATE)
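/*
 * For example (illustrative): a logical write set up by zio_write() carries
 * ZIO_STAGE_DVA_ALLOCATE in its original pipeline, so IO_IS_ALLOCATING() is
 * true even before that stage actually runs; a rewrite of an existing bp or
 * a vdev-level child I/O never allocates a new DVA, so the macro is false
 * for those.
 */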
134
135boolean_t	zio_requeue_io_start_cut_in_line = B_TRUE;
136
137#ifdef illumos
138#ifdef ZFS_DEBUG
139int zio_buf_debug_limit = 16384;
140#else
141int zio_buf_debug_limit = 0;
142#endif
143#endif
144
145static void zio_taskq_dispatch(zio_t *, zio_taskq_type_t, boolean_t);
146
147void
148zio_init(void)
149{
150	size_t c;
151	zio_cache = kmem_cache_create("zio_cache",
152	    sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
153	zio_link_cache = kmem_cache_create("zio_link_cache",
154	    sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
155	if (!zio_use_uma)
156		goto out;
157
158	/*
159	 * For small buffers, we want a cache for each multiple of
160	 * SPA_MINBLOCKSIZE.  For larger buffers, we want a cache
161	 * for each quarter-power of 2.
162	 */
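	/*
	 * Worked example (assumes SPA_MINBLOCKSIZE == 512 and a 4KB
	 * PAGESIZE): a 2KB buffer gets a cache aligned to 512 bytes; a 12KB
	 * buffer rounds p2 down to 8KB and gets 2KB alignment; a 4.5KB
	 * buffer is not a multiple of p2/4 (1KB), so no dedicated cache is
	 * created and requests of that size fall back to the next larger
	 * cache via the fix-up loop below.
	 */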
163	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
164		size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
165		size_t p2 = size;
166		size_t align = 0;
167		int cflags = zio_exclude_metadata ? KMC_NODEBUG : 0;
168
169		while (!ISP2(p2))
170			p2 &= p2 - 1;
171
172#ifdef illumos
173#ifndef _KERNEL
174		/*
175		 * If we are using watchpoints, put each buffer on its own page,
176		 * to eliminate the performance overhead of trapping to the
177		 * kernel when modifying a non-watched buffer that shares the
178		 * page with a watched buffer.
179		 */
180		if (arc_watch && !IS_P2ALIGNED(size, PAGESIZE))
181			continue;
182#endif
183#endif /* illumos */
184		if (size <= 4 * SPA_MINBLOCKSIZE) {
185			align = SPA_MINBLOCKSIZE;
186		} else if (IS_P2ALIGNED(size, p2 >> 2)) {
187			align = MIN(p2 >> 2, PAGESIZE);
188		}
189
190		if (align != 0) {
191			char name[36];
192			(void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
193			zio_buf_cache[c] = kmem_cache_create(name, size,
194			    align, NULL, NULL, NULL, NULL, NULL, cflags);
195
196			/*
197			 * Since zio_data bufs do not appear in crash dumps, we
198			 * pass KMC_NOTOUCH so that no allocator metadata is
199			 * stored with the buffers.
200			 */
201			(void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
202			zio_data_buf_cache[c] = kmem_cache_create(name, size,
203			    align, NULL, NULL, NULL, NULL, NULL,
204			    cflags | KMC_NOTOUCH | KMC_NODEBUG);
205		}
206	}
207
208	while (--c != 0) {
209		ASSERT(zio_buf_cache[c] != NULL);
210		if (zio_buf_cache[c - 1] == NULL)
211			zio_buf_cache[c - 1] = zio_buf_cache[c];
212
213		ASSERT(zio_data_buf_cache[c] != NULL);
214		if (zio_data_buf_cache[c - 1] == NULL)
215			zio_data_buf_cache[c - 1] = zio_data_buf_cache[c];
216	}
217out:
218
219	zio_inject_init();
220
221	zio_trim_ksp = kstat_create("zfs", 0, "zio_trim", "misc",
222	    KSTAT_TYPE_NAMED,
223	    sizeof(zio_trim_stats) / sizeof(kstat_named_t),
224	    KSTAT_FLAG_VIRTUAL);
225
226	if (zio_trim_ksp != NULL) {
227		zio_trim_ksp->ks_data = &zio_trim_stats;
228		kstat_install(zio_trim_ksp);
229	}
230}
231
232void
233zio_fini(void)
234{
235	size_t c;
236	kmem_cache_t *last_cache = NULL;
237	kmem_cache_t *last_data_cache = NULL;
238
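	/*
	 * Several zio_buf_cache[] slots may alias the same kmem cache (see
	 * the fix-up loop in zio_init()), so remember the last cache seen
	 * and destroy each underlying cache only once.
	 */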
239	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
240		if (zio_buf_cache[c] != last_cache) {
241			last_cache = zio_buf_cache[c];
242			kmem_cache_destroy(zio_buf_cache[c]);
243		}
244		zio_buf_cache[c] = NULL;
245
246		if (zio_data_buf_cache[c] != last_data_cache) {
247			last_data_cache = zio_data_buf_cache[c];
248			kmem_cache_destroy(zio_data_buf_cache[c]);
249		}
250		zio_data_buf_cache[c] = NULL;
251	}
252
253	kmem_cache_destroy(zio_link_cache);
254	kmem_cache_destroy(zio_cache);
255
256	zio_inject_fini();
257
258	if (zio_trim_ksp != NULL) {
259		kstat_delete(zio_trim_ksp);
260		zio_trim_ksp = NULL;
261	}
262}
263
264/*
265 * ==========================================================================
266 * Allocate and free I/O buffers
267 * ==========================================================================
268 */
269
270/*
271 * Use zio_buf_alloc to allocate ZFS metadata.  This data will appear in a
272 * crashdump if the kernel panics, so use it judiciously.  Obviously, it's
273 * useful to inspect ZFS metadata, but if possible, we should avoid keeping
274 * excess / transient data in-core during a crashdump.
275 */
276void *
277zio_buf_alloc(size_t size)
278{
279	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
280	int flags = zio_exclude_metadata ? KM_NODEBUG : 0;
281
282	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
283
284	if (zio_use_uma)
285		return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE));
286	else
287		return (kmem_alloc(size, KM_SLEEP|flags));
288}
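/*
 * Example (illustrative): zio_buf_alloc(4096) maps to cache index
 * c = (4096 - 1) >> SPA_MINBLOCKSHIFT == 7, i.e. the "zio_buf_4096" cache
 * when uma(9) is in use.  The buffer must later be released with
 * zio_buf_free(buf, 4096), since the size is what selects the cache again
 * on free.
 */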
289
290/*
291 * Use zio_data_buf_alloc to allocate data.  The data will not appear in a
292 * crashdump if the kernel panics.  This exists so that we limit the amount
293 * of ZFS data that shows up in a kernel crashdump, thus reducing the amount
294 * of kernel heap dumped to disk when the kernel panics.
295 */
296void *
297zio_data_buf_alloc(size_t size)
298{
299	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
300
301	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
302
303	if (zio_use_uma)
304		return (kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE));
305	else
306		return (kmem_alloc(size, KM_SLEEP | KM_NODEBUG));
307}
308
309void
310zio_buf_free(void *buf, size_t size)
311{
312	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
313
314	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
315
316	if (zio_use_uma)
317		kmem_cache_free(zio_buf_cache[c], buf);
318	else
319		kmem_free(buf, size);
320}
321
322void
323zio_data_buf_free(void *buf, size_t size)
324{
325	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
326
327	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
328
329	if (zio_use_uma)
330		kmem_cache_free(zio_data_buf_cache[c], buf);
331	else
332		kmem_free(buf, size);
333}
334
335/*
336 * ==========================================================================
337 * Push and pop I/O transform buffers
338 * ==========================================================================
339 */
340void
341zio_push_transform(zio_t *zio, abd_t *data, uint64_t size, uint64_t bufsize,
342    zio_transform_func_t *transform)
343{
344	zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_SLEEP);
345
346	/*
347	 * Ensure that anyone expecting this zio to contain a linear ABD isn't
348	 * going to get a nasty surprise when they try to access the data.
349	 */
350#ifdef illumos
351	IMPLY(abd_is_linear(zio->io_abd), abd_is_linear(data));
352#else
353	IMPLY(zio->io_abd != NULL && abd_is_linear(zio->io_abd),
354	    abd_is_linear(data));
355#endif
356
357	zt->zt_orig_abd = zio->io_abd;
358	zt->zt_orig_size = zio->io_size;
359	zt->zt_bufsize = bufsize;
360	zt->zt_transform = transform;
361
362	zt->zt_next = zio->io_transform_stack;
363	zio->io_transform_stack = zt;
364
365	zio->io_abd = data;
366	zio->io_size = size;
367}
368
369void
370zio_pop_transforms(zio_t *zio)
371{
372	zio_transform_t *zt;
373
374	while ((zt = zio->io_transform_stack) != NULL) {
375		if (zt->zt_transform != NULL)
376			zt->zt_transform(zio,
377			    zt->zt_orig_abd, zt->zt_orig_size);
378
379		if (zt->zt_bufsize != 0)
380			abd_free(zio->io_abd);
381
382		zio->io_abd = zt->zt_orig_abd;
383		zio->io_size = zt->zt_orig_size;
384		zio->io_transform_stack = zt->zt_next;
385
386		kmem_free(zt, sizeof (zio_transform_t));
387	}
388}
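/*
 * Example of the transform stack (illustrative): when zio_read_bp_init()
 * below sees a compressed, non-raw logical read, it pushes a psize-sized
 * ABD with zio_decompress as the transform.  The device read fills that
 * (compressed) buffer, and zio_pop_transforms() then runs zio_decompress()
 * to inflate the data back into the caller's original ABD before the zio
 * is handed back up the stack.
 */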
389
390/*
391 * ==========================================================================
392 * I/O transform callbacks for subblocks and decompression
393 * ==========================================================================
394 */
395static void
396zio_subblock(zio_t *zio, abd_t *data, uint64_t size)
397{
398	ASSERT(zio->io_size > size);
399
400	if (zio->io_type == ZIO_TYPE_READ)
401		abd_copy(data, zio->io_abd, size);
402}
403
404static void
405zio_decompress(zio_t *zio, abd_t *data, uint64_t size)
406{
407	if (zio->io_error == 0) {
408		void *tmp = abd_borrow_buf(data, size);
409		int ret = zio_decompress_data(BP_GET_COMPRESS(zio->io_bp),
410		    zio->io_abd, tmp, zio->io_size, size);
411		abd_return_buf_copy(data, tmp, size);
412
413		if (ret != 0)
414			zio->io_error = SET_ERROR(EIO);
415	}
416}
417
418/*
419 * ==========================================================================
420 * I/O parent/child relationships and pipeline interlocks
421 * ==========================================================================
422 */
423zio_t *
424zio_walk_parents(zio_t *cio, zio_link_t **zl)
425{
426	list_t *pl = &cio->io_parent_list;
427
428	*zl = (*zl == NULL) ? list_head(pl) : list_next(pl, *zl);
429	if (*zl == NULL)
430		return (NULL);
431
432	ASSERT((*zl)->zl_child == cio);
433	return ((*zl)->zl_parent);
434}
435
436zio_t *
437zio_walk_children(zio_t *pio, zio_link_t **zl)
438{
439	list_t *cl = &pio->io_child_list;
440
441	*zl = (*zl == NULL) ? list_head(cl) : list_next(cl, *zl);
442	if (*zl == NULL)
443		return (NULL);
444
445	ASSERT((*zl)->zl_parent == pio);
446	return ((*zl)->zl_child);
447}
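/*
 * Typical walker usage (sketch; do_something() is hypothetical): the
 * zio_link_t cursor must start out NULL and is advanced by each call:
 *
 *	zio_link_t *zl = NULL;
 *	zio_t *cio;
 *	while ((cio = zio_walk_children(pio, &zl)) != NULL)
 *		do_something(cio);
 *
 * zio_reexecute() later in this file iterates children the same way.
 */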
448
449zio_t *
450zio_unique_parent(zio_t *cio)
451{
452	zio_link_t *zl = NULL;
453	zio_t *pio = zio_walk_parents(cio, &zl);
454
455	VERIFY3P(zio_walk_parents(cio, &zl), ==, NULL);
456	return (pio);
457}
458
459void
460zio_add_child(zio_t *pio, zio_t *cio)
461{
462	zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
463
464	/*
465	 * Logical I/Os can have logical, gang, or vdev children.
466	 * Gang I/Os can have gang or vdev children.
467	 * Vdev I/Os can only have vdev children.
468	 * The following ASSERT captures all of these constraints.
469	 */
470	ASSERT3S(cio->io_child_type, <=, pio->io_child_type);
471
472	zl->zl_parent = pio;
473	zl->zl_child = cio;
474
475	mutex_enter(&cio->io_lock);
476	mutex_enter(&pio->io_lock);
477
478	ASSERT(pio->io_state[ZIO_WAIT_DONE] == 0);
479
480	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
481		pio->io_children[cio->io_child_type][w] += !cio->io_state[w];
482
483	list_insert_head(&pio->io_child_list, zl);
484	list_insert_head(&cio->io_parent_list, zl);
485
486	pio->io_child_count++;
487	cio->io_parent_count++;
488
489	mutex_exit(&pio->io_lock);
490	mutex_exit(&cio->io_lock);
491}
492
493static void
494zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
495{
496	ASSERT(zl->zl_parent == pio);
497	ASSERT(zl->zl_child == cio);
498
499	mutex_enter(&cio->io_lock);
500	mutex_enter(&pio->io_lock);
501
502	list_remove(&pio->io_child_list, zl);
503	list_remove(&cio->io_parent_list, zl);
504
505	pio->io_child_count--;
506	cio->io_parent_count--;
507
508	mutex_exit(&pio->io_lock);
509	mutex_exit(&cio->io_lock);
510
511	kmem_cache_free(zio_link_cache, zl);
512}
513
514static boolean_t
515zio_wait_for_children(zio_t *zio, enum zio_child child, enum zio_wait_type wait)
516{
517	uint64_t *countp = &zio->io_children[child][wait];
518	boolean_t waiting = B_FALSE;
519
520	mutex_enter(&zio->io_lock);
521	ASSERT(zio->io_stall == NULL);
522	if (*countp != 0) {
523		zio->io_stage >>= 1;
524		ASSERT3U(zio->io_stage, !=, ZIO_STAGE_OPEN);
525		zio->io_stall = countp;
526		waiting = B_TRUE;
527	}
528	mutex_exit(&zio->io_lock);
529
530	return (waiting);
531}
532
533static void
534zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
535{
536	uint64_t *countp = &pio->io_children[zio->io_child_type][wait];
537	int *errorp = &pio->io_child_error[zio->io_child_type];
538
539	mutex_enter(&pio->io_lock);
540	if (zio->io_error && !(zio->io_flags & ZIO_FLAG_DONT_PROPAGATE))
541		*errorp = zio_worst_error(*errorp, zio->io_error);
542	pio->io_reexecute |= zio->io_reexecute;
543	ASSERT3U(*countp, >, 0);
544
545	(*countp)--;
546
547	if (*countp == 0 && pio->io_stall == countp) {
548		zio_taskq_type_t type =
549		    pio->io_stage < ZIO_STAGE_VDEV_IO_START ? ZIO_TASKQ_ISSUE :
550		    ZIO_TASKQ_INTERRUPT;
551		pio->io_stall = NULL;
552		mutex_exit(&pio->io_lock);
553		/*
554		 * Dispatch the parent zio in its own taskq so that
555		 * the child can continue to make progress. This also
556		 * prevents overflowing the stack when we have deeply nested
557		 * parent-child relationships.
558		 */
559		zio_taskq_dispatch(pio, type, B_FALSE);
560	} else {
561		mutex_exit(&pio->io_lock);
562	}
563}
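/*
 * Sketch of the interlock above: zio_wait_for_children() backs io_stage up
 * by one bit position and records the outstanding-child counter in
 * io_stall; the last child to call zio_notify_parent() sees the counter
 * reach zero, clears io_stall, and re-dispatches the parent so that the
 * stalled stage is retried.
 */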
564
565static void
566zio_inherit_child_errors(zio_t *zio, enum zio_child c)
567{
568	if (zio->io_child_error[c] != 0 && zio->io_error == 0)
569		zio->io_error = zio->io_child_error[c];
570}
571
572int
573zio_bookmark_compare(const void *x1, const void *x2)
574{
575	const zio_t *z1 = x1;
576	const zio_t *z2 = x2;
577
578	if (z1->io_bookmark.zb_objset < z2->io_bookmark.zb_objset)
579		return (-1);
580	if (z1->io_bookmark.zb_objset > z2->io_bookmark.zb_objset)
581		return (1);
582
583	if (z1->io_bookmark.zb_object < z2->io_bookmark.zb_object)
584		return (-1);
585	if (z1->io_bookmark.zb_object > z2->io_bookmark.zb_object)
586		return (1);
587
588	if (z1->io_bookmark.zb_level < z2->io_bookmark.zb_level)
589		return (-1);
590	if (z1->io_bookmark.zb_level > z2->io_bookmark.zb_level)
591		return (1);
592
593	if (z1->io_bookmark.zb_blkid < z2->io_bookmark.zb_blkid)
594		return (-1);
595	if (z1->io_bookmark.zb_blkid > z2->io_bookmark.zb_blkid)
596		return (1);
597
598	if (z1 < z2)
599		return (-1);
600	if (z1 > z2)
601		return (1);
602
603	return (0);
604}
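/*
 * Note (illustrative): the final pointer comparison breaks ties between
 * zios that share a bookmark, so comparator-based trees (such as the
 * allocation throttle's per-spa queue) see a strict total order.
 */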
605
606/*
607 * ==========================================================================
608 * Create the various types of I/O (read, write, free, etc)
609 * ==========================================================================
610 */
611static zio_t *
612zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
613    abd_t *data, uint64_t lsize, uint64_t psize, zio_done_func_t *done,
614    void *private, zio_type_t type, zio_priority_t priority,
615    enum zio_flag flags, vdev_t *vd, uint64_t offset,
616    const zbookmark_phys_t *zb, enum zio_stage stage, enum zio_stage pipeline)
617{
618	zio_t *zio;
619
620	ASSERT3U(type == ZIO_TYPE_FREE || psize, <=, SPA_MAXBLOCKSIZE);
621	ASSERT(P2PHASE(psize, SPA_MINBLOCKSIZE) == 0);
622	ASSERT(P2PHASE(offset, SPA_MINBLOCKSIZE) == 0);
623
624	ASSERT(!vd || spa_config_held(spa, SCL_STATE_ALL, RW_READER));
625	ASSERT(!bp || !(flags & ZIO_FLAG_CONFIG_WRITER));
626	ASSERT(vd || stage == ZIO_STAGE_OPEN);
627
628	IMPLY(lsize != psize, (flags & ZIO_FLAG_RAW) != 0);
629
630	zio = kmem_cache_alloc(zio_cache, KM_SLEEP);
631	bzero(zio, sizeof (zio_t));
632
633	mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
634	cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
635
636	list_create(&zio->io_parent_list, sizeof (zio_link_t),
637	    offsetof(zio_link_t, zl_parent_node));
638	list_create(&zio->io_child_list, sizeof (zio_link_t),
639	    offsetof(zio_link_t, zl_child_node));
640	metaslab_trace_init(&zio->io_alloc_list);
641
642	if (vd != NULL)
643		zio->io_child_type = ZIO_CHILD_VDEV;
644	else if (flags & ZIO_FLAG_GANG_CHILD)
645		zio->io_child_type = ZIO_CHILD_GANG;
646	else if (flags & ZIO_FLAG_DDT_CHILD)
647		zio->io_child_type = ZIO_CHILD_DDT;
648	else
649		zio->io_child_type = ZIO_CHILD_LOGICAL;
650
651	if (bp != NULL) {
652		zio->io_bp = (blkptr_t *)bp;
653		zio->io_bp_copy = *bp;
654		zio->io_bp_orig = *bp;
655		if (type != ZIO_TYPE_WRITE ||
656		    zio->io_child_type == ZIO_CHILD_DDT)
657			zio->io_bp = &zio->io_bp_copy;	/* so caller can free */
658		if (zio->io_child_type == ZIO_CHILD_LOGICAL)
659			zio->io_logical = zio;
660		if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
661			pipeline |= ZIO_GANG_STAGES;
662	}
663
664	zio->io_spa = spa;
665	zio->io_txg = txg;
666	zio->io_done = done;
667	zio->io_private = private;
668	zio->io_type = type;
669	zio->io_priority = priority;
670	zio->io_vd = vd;
671	zio->io_offset = offset;
672	zio->io_orig_abd = zio->io_abd = data;
673	zio->io_orig_size = zio->io_size = psize;
674	zio->io_lsize = lsize;
675	zio->io_orig_flags = zio->io_flags = flags;
676	zio->io_orig_stage = zio->io_stage = stage;
677	zio->io_orig_pipeline = zio->io_pipeline = pipeline;
678	zio->io_pipeline_trace = ZIO_STAGE_OPEN;
679
680	zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
681	zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
682
683	if (zb != NULL)
684		zio->io_bookmark = *zb;
685
686	if (pio != NULL) {
687		if (zio->io_logical == NULL)
688			zio->io_logical = pio->io_logical;
689		if (zio->io_child_type == ZIO_CHILD_GANG)
690			zio->io_gang_leader = pio->io_gang_leader;
691		zio_add_child(pio, zio);
692	}
693
694	return (zio);
695}
696
697static void
698zio_destroy(zio_t *zio)
699{
700	metaslab_trace_fini(&zio->io_alloc_list);
701	list_destroy(&zio->io_parent_list);
702	list_destroy(&zio->io_child_list);
703	mutex_destroy(&zio->io_lock);
704	cv_destroy(&zio->io_cv);
705	kmem_cache_free(zio_cache, zio);
706}
707
708zio_t *
709zio_null(zio_t *pio, spa_t *spa, vdev_t *vd, zio_done_func_t *done,
710    void *private, enum zio_flag flags)
711{
712	zio_t *zio;
713
714	zio = zio_create(pio, spa, 0, NULL, NULL, 0, 0, done, private,
715	    ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
716	    ZIO_STAGE_OPEN, ZIO_INTERLOCK_PIPELINE);
717
718	return (zio);
719}
720
721zio_t *
722zio_root(spa_t *spa, zio_done_func_t *done, void *private, enum zio_flag flags)
723{
724	return (zio_null(NULL, spa, NULL, done, private, flags));
725}
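/*
 * Typical usage (illustrative sketch; bps, abds, size, zb and n are
 * hypothetical caller state): a root zio collects the errors of several
 * child I/Os so the caller can wait for all of them at once:
 *
 *	zio_t *pio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
 *	for (int i = 0; i < n; i++)
 *		zio_nowait(zio_read(pio, spa, &bps[i], abds[i], size,
 *		    NULL, NULL, ZIO_PRIORITY_SYNC_READ, ZIO_FLAG_CANFAIL,
 *		    &zb));
 *	error = zio_wait(pio);
 */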
726
727void
728zfs_blkptr_verify(spa_t *spa, const blkptr_t *bp)
729{
730	if (!DMU_OT_IS_VALID(BP_GET_TYPE(bp))) {
731		zfs_panic_recover("blkptr at %p has invalid TYPE %llu",
732		    bp, (longlong_t)BP_GET_TYPE(bp));
733	}
734	if (BP_GET_CHECKSUM(bp) >= ZIO_CHECKSUM_FUNCTIONS ||
735	    BP_GET_CHECKSUM(bp) <= ZIO_CHECKSUM_ON) {
736		zfs_panic_recover("blkptr at %p has invalid CHECKSUM %llu",
737		    bp, (longlong_t)BP_GET_CHECKSUM(bp));
738	}
739	if (BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_FUNCTIONS ||
740	    BP_GET_COMPRESS(bp) <= ZIO_COMPRESS_ON) {
741		zfs_panic_recover("blkptr at %p has invalid COMPRESS %llu",
742		    bp, (longlong_t)BP_GET_COMPRESS(bp));
743	}
744	if (BP_GET_LSIZE(bp) > SPA_MAXBLOCKSIZE) {
745		zfs_panic_recover("blkptr at %p has invalid LSIZE %llu",
746		    bp, (longlong_t)BP_GET_LSIZE(bp));
747	}
748	if (BP_GET_PSIZE(bp) > SPA_MAXBLOCKSIZE) {
749		zfs_panic_recover("blkptr at %p has invalid PSIZE %llu",
750		    bp, (longlong_t)BP_GET_PSIZE(bp));
751	}
752
753	if (BP_IS_EMBEDDED(bp)) {
754		if (BPE_GET_ETYPE(bp) > NUM_BP_EMBEDDED_TYPES) {
755			zfs_panic_recover("blkptr at %p has invalid ETYPE %llu",
756			    bp, (longlong_t)BPE_GET_ETYPE(bp));
757		}
758	}
759
760	/*
761	 * Pool-specific checks.
762	 *
763	 * Note: it would be nice to verify that the blk_birth and
764	 * BP_PHYSICAL_BIRTH() are not too large.  However, spa_freeze()
765	 * allows the birth time of log blocks (and dmu_sync()-ed blocks
766	 * that are in the log) to be arbitrarily large.
767	 */
768	for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
769		uint64_t vdevid = DVA_GET_VDEV(&bp->blk_dva[i]);
770		if (vdevid >= spa->spa_root_vdev->vdev_children) {
771			zfs_panic_recover("blkptr at %p DVA %u has invalid "
772			    "VDEV %llu",
773			    bp, i, (longlong_t)vdevid);
774			continue;
775		}
776		vdev_t *vd = spa->spa_root_vdev->vdev_child[vdevid];
777		if (vd == NULL) {
778			zfs_panic_recover("blkptr at %p DVA %u has invalid "
779			    "VDEV %llu",
780			    bp, i, (longlong_t)vdevid);
781			continue;
782		}
783		if (vd->vdev_ops == &vdev_hole_ops) {
784			zfs_panic_recover("blkptr at %p DVA %u has hole "
785			    "VDEV %llu",
786			    bp, i, (longlong_t)vdevid);
787			continue;
788		}
789		if (vd->vdev_ops == &vdev_missing_ops) {
790			/*
791			 * "missing" vdevs are valid during import, but we
792			 * don't have their detailed info (e.g. asize), so
793			 * we can't perform any more checks on them.
794			 */
795			continue;
796		}
797		uint64_t offset = DVA_GET_OFFSET(&bp->blk_dva[i]);
798		uint64_t asize = DVA_GET_ASIZE(&bp->blk_dva[i]);
799		if (BP_IS_GANG(bp))
800			asize = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
801		if (offset + asize > vd->vdev_asize) {
802			zfs_panic_recover("blkptr at %p DVA %u has invalid "
803			    "OFFSET %llu",
804			    bp, i, (longlong_t)offset);
805		}
806	}
807}
808
809zio_t *
810zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
811    abd_t *data, uint64_t size, zio_done_func_t *done, void *private,
812    zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
813{
814	zio_t *zio;
815
816	zfs_blkptr_verify(spa, bp);
817
818	zio = zio_create(pio, spa, BP_PHYSICAL_BIRTH(bp), bp,
819	    data, size, size, done, private,
820	    ZIO_TYPE_READ, priority, flags, NULL, 0, zb,
821	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
822	    ZIO_DDT_CHILD_READ_PIPELINE : ZIO_READ_PIPELINE);
823
824	return (zio);
825}
826
827zio_t *
828zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
829    abd_t *data, uint64_t lsize, uint64_t psize, const zio_prop_t *zp,
830    zio_done_func_t *ready, zio_done_func_t *children_ready,
831    zio_done_func_t *physdone, zio_done_func_t *done,
832    void *private, zio_priority_t priority, enum zio_flag flags,
833    const zbookmark_phys_t *zb)
834{
835	zio_t *zio;
836
837	ASSERT(zp->zp_checksum >= ZIO_CHECKSUM_OFF &&
838	    zp->zp_checksum < ZIO_CHECKSUM_FUNCTIONS &&
839	    zp->zp_compress >= ZIO_COMPRESS_OFF &&
840	    zp->zp_compress < ZIO_COMPRESS_FUNCTIONS &&
841	    DMU_OT_IS_VALID(zp->zp_type) &&
842	    zp->zp_level < 32 &&
843	    zp->zp_copies > 0 &&
844	    zp->zp_copies <= spa_max_replication(spa));
845
846	zio = zio_create(pio, spa, txg, bp, data, lsize, psize, done, private,
847	    ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
848	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
849	    ZIO_DDT_CHILD_WRITE_PIPELINE : ZIO_WRITE_PIPELINE);
850
851	zio->io_ready = ready;
852	zio->io_children_ready = children_ready;
853	zio->io_physdone = physdone;
854	zio->io_prop = *zp;
855
856	/*
857	 * Data can be NULL if we are going to call zio_write_override() to
858	 * provide the already-allocated BP.  But we may need the data to
859	 * verify a dedup hit (if requested).  In this case, don't try to
860	 * dedup (just take the already-allocated BP verbatim).
861	 */
862	if (data == NULL && zio->io_prop.zp_dedup_verify) {
863		zio->io_prop.zp_dedup = zio->io_prop.zp_dedup_verify = B_FALSE;
864	}
865
866	return (zio);
867}
868
869zio_t *
870zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, abd_t *data,
871    uint64_t size, zio_done_func_t *done, void *private,
872    zio_priority_t priority, enum zio_flag flags, zbookmark_phys_t *zb)
873{
874	zio_t *zio;
875
876	zio = zio_create(pio, spa, txg, bp, data, size, size, done, private,
877	    ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_IO_REWRITE, NULL, 0, zb,
878	    ZIO_STAGE_OPEN, ZIO_REWRITE_PIPELINE);
879
880	return (zio);
881}
882
883void
884zio_write_override(zio_t *zio, blkptr_t *bp, int copies, boolean_t nopwrite)
885{
886	ASSERT(zio->io_type == ZIO_TYPE_WRITE);
887	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
888	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
889	ASSERT(zio->io_txg == spa_syncing_txg(zio->io_spa));
890
891	/*
892	 * We must reset the io_prop to match the values that existed
893	 * when the bp was first written by dmu_sync(), keeping in mind
894	 * that nopwrite and dedup are mutually exclusive.
895	 */
896	zio->io_prop.zp_dedup = nopwrite ? B_FALSE : zio->io_prop.zp_dedup;
897	zio->io_prop.zp_nopwrite = nopwrite;
898	zio->io_prop.zp_copies = copies;
899	zio->io_bp_override = bp;
900}
901
902void
903zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
904{
905
906	/*
907	 * The check for EMBEDDED is a performance optimization.  We
908	 * process the free here (by ignoring it) rather than
909	 * putting it on the list and then processing it in zio_free_sync().
910	 */
911	if (BP_IS_EMBEDDED(bp))
912		return;
913	metaslab_check_free(spa, bp);
914
915	/*
916	 * Frees that are for the currently-syncing txg, are not going to be
917	 * deferred, and which will not need to do a read (i.e. not GANG or
918	 * DEDUP), can be processed immediately.  Otherwise, put them on the
919	 * in-memory list for later processing.
920	 */
921	if (zfs_trim_enabled || BP_IS_GANG(bp) || BP_GET_DEDUP(bp) ||
922	    txg != spa->spa_syncing_txg ||
923	    spa_sync_pass(spa) >= zfs_sync_pass_deferred_free) {
924		bplist_append(&spa->spa_free_bplist[txg & TXG_MASK], bp);
925	} else {
926		VERIFY0(zio_wait(zio_free_sync(NULL, spa, txg, bp,
927		    BP_GET_PSIZE(bp), 0)));
928	}
929}
930
931zio_t *
932zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
933    uint64_t size, enum zio_flag flags)
934{
935	zio_t *zio;
936	enum zio_stage stage = ZIO_FREE_PIPELINE;
937
938	ASSERT(!BP_IS_HOLE(bp));
939	ASSERT(spa_syncing_txg(spa) == txg);
940	ASSERT(spa_sync_pass(spa) < zfs_sync_pass_deferred_free);
941
942	if (BP_IS_EMBEDDED(bp))
943		return (zio_null(pio, spa, NULL, NULL, NULL, 0));
944
945	metaslab_check_free(spa, bp);
946	arc_freed(spa, bp);
947
948	if (zfs_trim_enabled)
949		stage |= ZIO_STAGE_ISSUE_ASYNC | ZIO_STAGE_VDEV_IO_START |
950		    ZIO_STAGE_VDEV_IO_ASSESS;
951	/*
952	 * GANG and DEDUP blocks can induce a read (for the gang block header,
953	 * or the DDT), so issue them asynchronously so that this thread is
954	 * not tied up.
955	 */
956	else if (BP_IS_GANG(bp) || BP_GET_DEDUP(bp))
957		stage |= ZIO_STAGE_ISSUE_ASYNC;
958
959	flags |= ZIO_FLAG_DONT_QUEUE;
960
961	zio = zio_create(pio, spa, txg, bp, NULL, size,
962	    size, NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_NOW,
963	    flags, NULL, 0, NULL, ZIO_STAGE_OPEN, stage);
964
965	return (zio);
966}
967
968zio_t *
969zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
970    zio_done_func_t *done, void *private, enum zio_flag flags)
971{
972	zio_t *zio;
973
974	dprintf_bp(bp, "claiming in txg %llu", txg);
975
976	if (BP_IS_EMBEDDED(bp))
977		return (zio_null(pio, spa, NULL, NULL, NULL, 0));
978
979	/*
980	 * A claim is an allocation of a specific block.  Claims are needed
981	 * to support immediate writes in the intent log.  The issue is that
982	 * immediate writes contain committed data, but in a txg that was
983	 * *not* committed.  Upon opening the pool after an unclean shutdown,
984	 * the intent log claims all blocks that contain immediate write data
985	 * so that the SPA knows they're in use.
986	 *
987	 * All claims *must* be resolved in the first txg -- before the SPA
988	 * starts allocating blocks -- so that nothing is allocated twice.
989	 * If txg == 0 we just verify that the block is claimable.
990	 */
991	ASSERT3U(spa->spa_uberblock.ub_rootbp.blk_birth, <, spa_first_txg(spa));
992	ASSERT(txg == spa_first_txg(spa) || txg == 0);
993	ASSERT(!BP_GET_DEDUP(bp) || !spa_writeable(spa));	/* zdb(1M) */
994
995	zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
996	    BP_GET_PSIZE(bp), done, private, ZIO_TYPE_CLAIM, ZIO_PRIORITY_NOW,
997	    flags, NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_CLAIM_PIPELINE);
998	ASSERT0(zio->io_queued_timestamp);
999
1000	return (zio);
1001}
1002
1003zio_t *
1004zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd, uint64_t offset,
1005    uint64_t size, zio_done_func_t *done, void *private,
1006    zio_priority_t priority, enum zio_flag flags)
1007{
1008	zio_t *zio;
1009	int c;
1010
1011	if (vd->vdev_children == 0) {
1012		zio = zio_create(pio, spa, 0, NULL, NULL, 0, 0, done, private,
1013		    ZIO_TYPE_IOCTL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
1014		    ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
1015
1016		zio->io_cmd = cmd;
1017	} else {
1018		zio = zio_null(pio, spa, NULL, NULL, NULL, flags);
1019
1020		for (c = 0; c < vd->vdev_children; c++)
1021			zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
1022			    offset, size, done, private, priority, flags));
1023	}
1024
1025	return (zio);
1026}
1027
1028zio_t *
1029zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
1030    abd_t *data, int checksum, zio_done_func_t *done, void *private,
1031    zio_priority_t priority, enum zio_flag flags, boolean_t labels)
1032{
1033	zio_t *zio;
1034
1035	ASSERT(vd->vdev_children == 0);
1036	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
1037	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
1038	ASSERT3U(offset + size, <=, vd->vdev_psize);
1039
1040	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, size, done,
1041	    private, ZIO_TYPE_READ, priority, flags | ZIO_FLAG_PHYSICAL, vd,
1042	    offset, NULL, ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
1043
1044	zio->io_prop.zp_checksum = checksum;
1045
1046	return (zio);
1047}
1048
1049zio_t *
1050zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
1051    abd_t *data, int checksum, zio_done_func_t *done, void *private,
1052    zio_priority_t priority, enum zio_flag flags, boolean_t labels)
1053{
1054	zio_t *zio;
1055
1056	ASSERT(vd->vdev_children == 0);
1057	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
1058	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
1059	ASSERT3U(offset + size, <=, vd->vdev_psize);
1060
1061	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, size, done,
1062	    private, ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_PHYSICAL, vd,
1063	    offset, NULL, ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
1064
1065	zio->io_prop.zp_checksum = checksum;
1066
1067	if (zio_checksum_table[checksum].ci_flags & ZCHECKSUM_FLAG_EMBEDDED) {
1068		/*
1069		 * zec checksums are necessarily destructive -- they modify
1070		 * the end of the write buffer to hold the verifier/checksum.
1071		 * Therefore, we must make a local copy in case the data is
1072		 * being written to multiple places in parallel.
1073		 */
1074		abd_t *wbuf = abd_alloc_sametype(data, size);
1075		abd_copy(wbuf, data, size);
1076
1077		zio_push_transform(zio, wbuf, size, size, NULL);
1078	}
1079
1080	return (zio);
1081}
1082
1083/*
1084 * Create a child I/O to do some work for us.
1085 */
1086zio_t *
1087zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
1088    abd_t *data, uint64_t size, int type, zio_priority_t priority,
1089    enum zio_flag flags, zio_done_func_t *done, void *private)
1090{
1091	enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
1092	zio_t *zio;
1093
1094	ASSERT(vd->vdev_parent ==
1095	    (pio->io_vd ? pio->io_vd : pio->io_spa->spa_root_vdev));
1096
1097	if (type == ZIO_TYPE_READ && bp != NULL) {
1098		/*
1099		 * If we have the bp, then the child should perform the
1100		 * checksum and the parent need not.  This pushes error
1101		 * detection as close to the leaves as possible and
1102		 * eliminates redundant checksums in the interior nodes.
1103		 */
1104		pipeline |= ZIO_STAGE_CHECKSUM_VERIFY;
1105		pio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
1106	}
1107
1108	/* Not all I/O types require the vdev io done stage, e.g. free */
1109	if (!(pio->io_pipeline & ZIO_STAGE_VDEV_IO_DONE))
1110		pipeline &= ~ZIO_STAGE_VDEV_IO_DONE;
1111
1112	if (vd->vdev_children == 0)
1113		offset += VDEV_LABEL_START_SIZE;
1114
1115	flags |= ZIO_VDEV_CHILD_FLAGS(pio) | ZIO_FLAG_DONT_PROPAGATE;
1116
1117	/*
1118	 * If we've decided to do a repair, the write is not speculative --
1119	 * even if the original read was.
1120	 */
1121	if (flags & ZIO_FLAG_IO_REPAIR)
1122		flags &= ~ZIO_FLAG_SPECULATIVE;
1123
1124	/*
1125	 * If we're creating a child I/O that is not associated with a
1126	 * top-level vdev, then the child zio is not an allocating I/O.
1127	 * If this is a retried I/O then we ignore it since we will
1128	 * have already processed the original allocating I/O.
1129	 */
1130	if (flags & ZIO_FLAG_IO_ALLOCATING &&
1131	    (vd != vd->vdev_top || (flags & ZIO_FLAG_IO_RETRY))) {
1132		metaslab_class_t *mc = spa_normal_class(pio->io_spa);
1133
1134		ASSERT(mc->mc_alloc_throttle_enabled);
1135		ASSERT(type == ZIO_TYPE_WRITE);
1136		ASSERT(priority == ZIO_PRIORITY_ASYNC_WRITE);
1137		ASSERT(!(flags & ZIO_FLAG_IO_REPAIR));
1138		ASSERT(!(pio->io_flags & ZIO_FLAG_IO_REWRITE) ||
1139		    pio->io_child_type == ZIO_CHILD_GANG);
1140
1141		flags &= ~ZIO_FLAG_IO_ALLOCATING;
1142	}
1143
1144	zio = zio_create(pio, pio->io_spa, pio->io_txg, bp, data, size, size,
1145	    done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
1146	    ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
1147	ASSERT3U(zio->io_child_type, ==, ZIO_CHILD_VDEV);
1148
1149	zio->io_physdone = pio->io_physdone;
1150	if (vd->vdev_ops->vdev_op_leaf && zio->io_logical != NULL)
1151		zio->io_logical->io_phys_children++;
1152
1153	return (zio);
1154}
1155
1156zio_t *
1157zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, abd_t *data, uint64_t size,
1158    int type, zio_priority_t priority, enum zio_flag flags,
1159    zio_done_func_t *done, void *private)
1160{
1161	zio_t *zio;
1162
1163	ASSERT(vd->vdev_ops->vdev_op_leaf);
1164
1165	zio = zio_create(NULL, vd->vdev_spa, 0, NULL,
1166	    data, size, size, done, private, type, priority,
1167	    flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY | ZIO_FLAG_DELEGATED,
1168	    vd, offset, NULL,
1169	    ZIO_STAGE_VDEV_IO_START >> 1, ZIO_VDEV_CHILD_PIPELINE);
1170
1171	return (zio);
1172}
1173
1174void
1175zio_flush(zio_t *zio, vdev_t *vd)
1176{
1177	zio_nowait(zio_ioctl(zio, zio->io_spa, vd, DKIOCFLUSHWRITECACHE, 0, 0,
1178	    NULL, NULL, ZIO_PRIORITY_NOW,
1179	    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
1180}
1181
1182zio_t *
1183zio_trim(zio_t *zio, spa_t *spa, vdev_t *vd, uint64_t offset, uint64_t size)
1184{
1185
1186	ASSERT(vd->vdev_ops->vdev_op_leaf);
1187
1188	return (zio_create(zio, spa, 0, NULL, NULL, size, size, NULL, NULL,
1189	    ZIO_TYPE_FREE, ZIO_PRIORITY_TRIM, ZIO_FLAG_DONT_AGGREGATE |
1190	    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY,
1191	    vd, offset, NULL, ZIO_STAGE_OPEN, ZIO_FREE_PHYS_PIPELINE));
1192}
1193
1194void
1195zio_shrink(zio_t *zio, uint64_t size)
1196{
1197	ASSERT3P(zio->io_executor, ==, NULL);
1198	ASSERT3P(zio->io_orig_size, ==, zio->io_size);
1199	ASSERT3U(size, <=, zio->io_size);
1200
1201	/*
1202	 * We don't shrink for raidz because of problems with the
1203	 * reconstruction when reading back less than the block size.
1204	 * Note, BP_IS_RAIDZ() assumes no compression.
1205	 */
1206	ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
1207	if (!BP_IS_RAIDZ(zio->io_bp)) {
1208		/* we are not doing a raw write */
1209		ASSERT3U(zio->io_size, ==, zio->io_lsize);
1210		zio->io_orig_size = zio->io_size = zio->io_lsize = size;
1211	}
1212}
1213
1214/*
1215 * ==========================================================================
1216 * Prepare to read and write logical blocks
1217 * ==========================================================================
1218 */
1219
1220static int
1221zio_read_bp_init(zio_t *zio)
1222{
1223	blkptr_t *bp = zio->io_bp;
1224
1225	if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
1226	    zio->io_child_type == ZIO_CHILD_LOGICAL &&
1227	    !(zio->io_flags & ZIO_FLAG_RAW)) {
1228		uint64_t psize =
1229		    BP_IS_EMBEDDED(bp) ? BPE_GET_PSIZE(bp) : BP_GET_PSIZE(bp);
1230		zio_push_transform(zio, abd_alloc_sametype(zio->io_abd, psize),
1231		    psize, psize, zio_decompress);
1232	}
1233
1234	if (BP_IS_EMBEDDED(bp) && BPE_GET_ETYPE(bp) == BP_EMBEDDED_TYPE_DATA) {
1235		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1236
1237		int psize = BPE_GET_PSIZE(bp);
1238		void *data = abd_borrow_buf(zio->io_abd, psize);
1239		decode_embedded_bp_compressed(bp, data);
1240		abd_return_buf_copy(zio->io_abd, data, psize);
1241	} else {
1242		ASSERT(!BP_IS_EMBEDDED(bp));
1243	}
1244
1245	if (!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)) && BP_GET_LEVEL(bp) == 0)
1246		zio->io_flags |= ZIO_FLAG_DONT_CACHE;
1247
1248	if (BP_GET_TYPE(bp) == DMU_OT_DDT_ZAP)
1249		zio->io_flags |= ZIO_FLAG_DONT_CACHE;
1250
1251	if (BP_GET_DEDUP(bp) && zio->io_child_type == ZIO_CHILD_LOGICAL)
1252		zio->io_pipeline = ZIO_DDT_READ_PIPELINE;
1253
1254	return (ZIO_PIPELINE_CONTINUE);
1255}
1256
1257static int
1258zio_write_bp_init(zio_t *zio)
1259{
1260	if (!IO_IS_ALLOCATING(zio))
1261		return (ZIO_PIPELINE_CONTINUE);
1262
1263	ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
1264
1265	if (zio->io_bp_override) {
1266		blkptr_t *bp = zio->io_bp;
1267		zio_prop_t *zp = &zio->io_prop;
1268
1269		ASSERT(bp->blk_birth != zio->io_txg);
1270		ASSERT(BP_GET_DEDUP(zio->io_bp_override) == 0);
1271
1272		*bp = *zio->io_bp_override;
1273		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1274
1275		if (BP_IS_EMBEDDED(bp))
1276			return (ZIO_PIPELINE_CONTINUE);
1277
1278		/*
1279		 * If we've been overridden and nopwrite is set then
1280		 * set the flag accordingly to indicate that a nopwrite
1281		 * has already occurred.
1282		 */
1283		if (!BP_IS_HOLE(bp) && zp->zp_nopwrite) {
1284			ASSERT(!zp->zp_dedup);
1285			ASSERT3U(BP_GET_CHECKSUM(bp), ==, zp->zp_checksum);
1286			zio->io_flags |= ZIO_FLAG_NOPWRITE;
1287			return (ZIO_PIPELINE_CONTINUE);
1288		}
1289
1290		ASSERT(!zp->zp_nopwrite);
1291
1292		if (BP_IS_HOLE(bp) || !zp->zp_dedup)
1293			return (ZIO_PIPELINE_CONTINUE);
1294
1295		ASSERT((zio_checksum_table[zp->zp_checksum].ci_flags &
1296		    ZCHECKSUM_FLAG_DEDUP) || zp->zp_dedup_verify);
1297
1298		if (BP_GET_CHECKSUM(bp) == zp->zp_checksum) {
1299			BP_SET_DEDUP(bp, 1);
1300			zio->io_pipeline |= ZIO_STAGE_DDT_WRITE;
1301			return (ZIO_PIPELINE_CONTINUE);
1302		}
1303
1304		/*
1305		 * We were unable to handle this as an override bp, treat
1306		 * it as a regular write I/O.
1307		 */
1308		zio->io_bp_override = NULL;
1309		*bp = zio->io_bp_orig;
1310		zio->io_pipeline = zio->io_orig_pipeline;
1311	}
1312
1313	return (ZIO_PIPELINE_CONTINUE);
1314}
1315
1316static int
1317zio_write_compress(zio_t *zio)
1318{
1319	spa_t *spa = zio->io_spa;
1320	zio_prop_t *zp = &zio->io_prop;
1321	enum zio_compress compress = zp->zp_compress;
1322	blkptr_t *bp = zio->io_bp;
1323	uint64_t lsize = zio->io_lsize;
1324	uint64_t psize = zio->io_size;
1325	int pass = 1;
1326
1327	EQUIV(lsize != psize, (zio->io_flags & ZIO_FLAG_RAW) != 0);
1328
1329	/*
1330	 * If our children haven't all reached the ready stage,
1331	 * wait for them and then repeat this pipeline stage.
1332	 */
1333	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
1334	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY))
1335		return (ZIO_PIPELINE_STOP);
1336
1337	if (!IO_IS_ALLOCATING(zio))
1338		return (ZIO_PIPELINE_CONTINUE);
1339
1340	if (zio->io_children_ready != NULL) {
1341		/*
1342		 * Now that all our children are ready, run the callback
1343		 * associated with this zio in case it wants to modify the
1344		 * data to be written.
1345		 */
1346		ASSERT3U(zp->zp_level, >, 0);
1347		zio->io_children_ready(zio);
1348	}
1349
1350	ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
1351	ASSERT(zio->io_bp_override == NULL);
1352
1353	if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg) {
1354		/*
1355		 * We're rewriting an existing block, which means we're
1356		 * working on behalf of spa_sync().  For spa_sync() to
1357		 * converge, it must eventually be the case that we don't
1358		 * have to allocate new blocks.  But compression changes
1359		 * the blocksize, which forces a reallocate, and makes
1360		 * convergence take longer.  Therefore, after the first
1361		 * few passes, stop compressing to ensure convergence.
1362		 */
1363		pass = spa_sync_pass(spa);
1364
1365		ASSERT(zio->io_txg == spa_syncing_txg(spa));
1366		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1367		ASSERT(!BP_GET_DEDUP(bp));
1368
1369		if (pass >= zfs_sync_pass_dont_compress)
1370			compress = ZIO_COMPRESS_OFF;
1371
1372		/* Make sure someone doesn't change their mind on overwrites */
1373		ASSERT(BP_IS_EMBEDDED(bp) || MIN(zp->zp_copies + BP_IS_GANG(bp),
1374		    spa_max_replication(spa)) == BP_GET_NDVAS(bp));
1375	}
1376
1377	/* If it's a compressed write that is not raw, compress the buffer. */
1378	if (compress != ZIO_COMPRESS_OFF && psize == lsize) {
1379		void *cbuf = zio_buf_alloc(lsize);
1380		psize = zio_compress_data(compress, zio->io_abd, cbuf, lsize);
1381		if (psize == 0 || psize == lsize) {
1382			compress = ZIO_COMPRESS_OFF;
1383			zio_buf_free(cbuf, lsize);
1384		} else if (!zp->zp_dedup && psize <= BPE_PAYLOAD_SIZE &&
1385		    zp->zp_level == 0 && !DMU_OT_HAS_FILL(zp->zp_type) &&
1386		    spa_feature_is_enabled(spa, SPA_FEATURE_EMBEDDED_DATA)) {
1387			encode_embedded_bp_compressed(bp,
1388			    cbuf, compress, lsize, psize);
1389			BPE_SET_ETYPE(bp, BP_EMBEDDED_TYPE_DATA);
1390			BP_SET_TYPE(bp, zio->io_prop.zp_type);
1391			BP_SET_LEVEL(bp, zio->io_prop.zp_level);
1392			zio_buf_free(cbuf, lsize);
1393			bp->blk_birth = zio->io_txg;
1394			zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1395			ASSERT(spa_feature_is_active(spa,
1396			    SPA_FEATURE_EMBEDDED_DATA));
1397			return (ZIO_PIPELINE_CONTINUE);
1398		} else {
1399			/*
1400			 * Round the compressed size up to the ashift
1401			 * of the smallest-ashift device, and zero the tail.
1402			 * This ensures that the compressed size of the BP
1403			 * (and thus the compressratio property) is correct,
1404			 * in that we charge for the padding used to fill out
1405			 * the last sector.
1406			 */
1407			ASSERT3U(spa->spa_min_ashift, >=, SPA_MINBLOCKSHIFT);
1408			size_t rounded = (size_t)P2ROUNDUP(psize,
1409			    1ULL << spa->spa_min_ashift);
1410			if (rounded >= lsize) {
1411				compress = ZIO_COMPRESS_OFF;
1412				zio_buf_free(cbuf, lsize);
1413				psize = lsize;
1414			} else {
1415				abd_t *cdata = abd_get_from_buf(cbuf, lsize);
1416				abd_take_ownership_of_buf(cdata, B_TRUE);
1417				abd_zero_off(cdata, psize, rounded - psize);
1418				psize = rounded;
1419				zio_push_transform(zio, cdata,
1420				    psize, lsize, NULL);
1421			}
1422		}
1423
1424		/*
1425		 * We were unable to handle this as an override bp, treat
1426		 * it as a regular write I/O.
1427		 */
1428		zio->io_bp_override = NULL;
1429		*bp = zio->io_bp_orig;
1430		zio->io_pipeline = zio->io_orig_pipeline;
1431	} else {
1432		ASSERT3U(psize, !=, 0);
1433	}
1434
1435	/*
1436	 * The final pass of spa_sync() must be all rewrites, but the first
1437	 * few passes offer a trade-off: allocating blocks defers convergence,
1438	 * but newly allocated blocks are sequential, so they can be written
1439	 * to disk faster.  Therefore, we allow the first few passes of
1440	 * spa_sync() to allocate new blocks, but force rewrites after that.
1441	 * There should only be a handful of blocks after pass 1 in any case.
1442	 */
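	/*
	 * For example, with the default zfs_sync_pass_rewrite == 2: a block
	 * that was already written earlier in this txg and is rewritten in
	 * pass 2 or later with an unchanged physical size keeps its existing
	 * DVAs via ZIO_REWRITE_PIPELINE; otherwise the bp is zeroed and a
	 * fresh allocation goes through ZIO_WRITE_PIPELINE.
	 */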
1443	if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg &&
1444	    BP_GET_PSIZE(bp) == psize &&
1445	    pass >= zfs_sync_pass_rewrite) {
1446		ASSERT(psize != 0);
1447		enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
1448		zio->io_pipeline = ZIO_REWRITE_PIPELINE | gang_stages;
1449		zio->io_flags |= ZIO_FLAG_IO_REWRITE;
1450	} else {
1451		BP_ZERO(bp);
1452		zio->io_pipeline = ZIO_WRITE_PIPELINE;
1453	}
1454
1455	if (psize == 0) {
1456		if (zio->io_bp_orig.blk_birth != 0 &&
1457		    spa_feature_is_active(spa, SPA_FEATURE_HOLE_BIRTH)) {
1458			BP_SET_LSIZE(bp, lsize);
1459			BP_SET_TYPE(bp, zp->zp_type);
1460			BP_SET_LEVEL(bp, zp->zp_level);
1461			BP_SET_BIRTH(bp, zio->io_txg, 0);
1462		}
1463		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1464	} else {
1465		ASSERT(zp->zp_checksum != ZIO_CHECKSUM_GANG_HEADER);
1466		BP_SET_LSIZE(bp, lsize);
1467		BP_SET_TYPE(bp, zp->zp_type);
1468		BP_SET_LEVEL(bp, zp->zp_level);
1469		BP_SET_PSIZE(bp, psize);
1470		BP_SET_COMPRESS(bp, compress);
1471		BP_SET_CHECKSUM(bp, zp->zp_checksum);
1472		BP_SET_DEDUP(bp, zp->zp_dedup);
1473		BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
1474		if (zp->zp_dedup) {
1475			ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1476			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
1477			zio->io_pipeline = ZIO_DDT_WRITE_PIPELINE;
1478		}
1479		if (zp->zp_nopwrite) {
1480			ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1481			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
1482			zio->io_pipeline |= ZIO_STAGE_NOP_WRITE;
1483		}
1484	}
1485	return (ZIO_PIPELINE_CONTINUE);
1486}
1487
1488static int
1489zio_free_bp_init(zio_t *zio)
1490{
1491	blkptr_t *bp = zio->io_bp;
1492
1493	if (zio->io_child_type == ZIO_CHILD_LOGICAL) {
1494		if (BP_GET_DEDUP(bp))
1495			zio->io_pipeline = ZIO_DDT_FREE_PIPELINE;
1496	}
1497
1498	return (ZIO_PIPELINE_CONTINUE);
1499}
1500
1501/*
1502 * ==========================================================================
1503 * Execute the I/O pipeline
1504 * ==========================================================================
1505 */
1506
1507static void
1508zio_taskq_dispatch(zio_t *zio, zio_taskq_type_t q, boolean_t cutinline)
1509{
1510	spa_t *spa = zio->io_spa;
1511	zio_type_t t = zio->io_type;
1512	int flags = (cutinline ? TQ_FRONT : 0);
1513
1514	ASSERT(q == ZIO_TASKQ_ISSUE || q == ZIO_TASKQ_INTERRUPT);
1515
1516	/*
1517	 * If we're a config writer or a probe, the normal issue and
1518	 * interrupt threads may all be blocked waiting for the config lock.
1519	 * In this case, select the otherwise-unused taskq for ZIO_TYPE_NULL.
1520	 */
1521	if (zio->io_flags & (ZIO_FLAG_CONFIG_WRITER | ZIO_FLAG_PROBE))
1522		t = ZIO_TYPE_NULL;
1523
1524	/*
1525	 * A similar issue exists for the L2ARC write thread until L2ARC 2.0.
1526	 */
1527	if (t == ZIO_TYPE_WRITE && zio->io_vd && zio->io_vd->vdev_aux)
1528		t = ZIO_TYPE_NULL;
1529
1530	/*
1531	 * If this is a high priority I/O, then use the high priority taskq if
1532	 * available.
1533	 */
1534	if (zio->io_priority == ZIO_PRIORITY_NOW &&
1535	    spa->spa_zio_taskq[t][q + 1].stqs_count != 0)
1536		q++;
1537
1538	ASSERT3U(q, <, ZIO_TASKQ_TYPES);
1539
1540	/*
1541	 * NB: We are assuming that the zio can only be dispatched
1542	 * to a single taskq at a time.  It would be a grievous error
1543	 * to dispatch the zio to another taskq at the same time.
1544	 */
1545#if defined(illumos) || !defined(_KERNEL)
1546	ASSERT(zio->io_tqent.tqent_next == NULL);
1547#else
1548	ASSERT(zio->io_tqent.tqent_task.ta_pending == 0);
1549#endif
1550	spa_taskq_dispatch_ent(spa, t, q, (task_func_t *)zio_execute, zio,
1551	    flags, &zio->io_tqent);
1552}
1553
1554static boolean_t
1555zio_taskq_member(zio_t *zio, zio_taskq_type_t q)
1556{
1557	kthread_t *executor = zio->io_executor;
1558	spa_t *spa = zio->io_spa;
1559
1560	for (zio_type_t t = 0; t < ZIO_TYPES; t++) {
1561		spa_taskqs_t *tqs = &spa->spa_zio_taskq[t][q];
1562		uint_t i;
1563		for (i = 0; i < tqs->stqs_count; i++) {
1564			if (taskq_member(tqs->stqs_taskq[i], executor))
1565				return (B_TRUE);
1566		}
1567	}
1568
1569	return (B_FALSE);
1570}
1571
1572static int
1573zio_issue_async(zio_t *zio)
1574{
1575	zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
1576
1577	return (ZIO_PIPELINE_STOP);
1578}
1579
1580void
1581zio_interrupt(zio_t *zio)
1582{
1583	zio_taskq_dispatch(zio, ZIO_TASKQ_INTERRUPT, B_FALSE);
1584}
1585
1586void
1587zio_delay_interrupt(zio_t *zio)
1588{
1589	/*
1590	 * The timeout_generic() function isn't defined in userspace, so
1591	 * rather than trying to implement the function, the zio delay
1592	 * functionality has been disabled for userspace builds.
1593	 */
1594
1595#ifdef _KERNEL
1596	/*
1597	 * If io_target_timestamp is zero, then no delay has been registered
1598	 * for this IO, so jump to the end of this function and "skip" the
1599	 * delay, issuing it directly to the zio layer.
1600	 */
1601	if (zio->io_target_timestamp != 0) {
1602		hrtime_t now = gethrtime();
1603
1604		if (now >= zio->io_target_timestamp) {
1605			/*
1606			 * This IO has already taken longer than the target
1607			 * delay to complete, so we don't want to delay it
1608			 * any longer; we "miss" the delay and issue it
1609			 * directly to the zio layer. This is likely due to
1610			 * the target latency being set to a value less than
1611			 * the underlying hardware can satisfy (e.g. delay
1612			 * set to 1ms, but the disks take 10ms to complete an
1613			 * IO request).
1614			 */
1615
1616			DTRACE_PROBE2(zio__delay__miss, zio_t *, zio,
1617			    hrtime_t, now);
1618
1619			zio_interrupt(zio);
1620		} else {
1621			hrtime_t diff = zio->io_target_timestamp - now;
1622
1623			DTRACE_PROBE3(zio__delay__hit, zio_t *, zio,
1624			    hrtime_t, now, hrtime_t, diff);
1625
1626			(void) timeout_generic(CALLOUT_NORMAL,
1627			    (void (*)(void *))zio_interrupt, zio, diff, 1, 0);
1628		}
1629
1630		return;
1631	}
1632#endif
1633
1634	DTRACE_PROBE1(zio__delay__skip, zio_t *, zio);
1635	zio_interrupt(zio);
1636}
1637
1638/*
1639 * Execute the I/O pipeline until one of the following occurs:
1640 *
1641 *	(1) the I/O completes
1642 *	(2) the pipeline stalls waiting for dependent child I/Os
1643 *	(3) the I/O issues, so we're waiting for an I/O completion interrupt
1644 *	(4) the I/O is delegated by vdev-level caching or aggregation
1645 *	(5) the I/O is deferred due to vdev-level queueing
1646 *	(6) the I/O is handed off to another thread.
1647 *
1648 * In all cases, the pipeline stops whenever there's no CPU work; it never
1649 * burns a thread in cv_wait().
1650 *
1651 * There's no locking on io_stage because there's no legitimate way
1652 * for multiple threads to be attempting to process the same I/O.
1653 */
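/*
 * Illustration: io_stage and io_pipeline are bitmasks, so the
 * "stage <<= 1 until (stage & pipeline)" loop in zio_execute() below simply
 * advances to the next stage bit that this particular zio actually uses,
 * skipping stages that are not part of its pipeline.
 */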
1654static zio_pipe_stage_t *zio_pipeline[];
1655
1656void
1657zio_execute(zio_t *zio)
1658{
1659	zio->io_executor = curthread;
1660
1661	ASSERT3U(zio->io_queued_timestamp, >, 0);
1662
1663	while (zio->io_stage < ZIO_STAGE_DONE) {
1664		enum zio_stage pipeline = zio->io_pipeline;
1665		enum zio_stage stage = zio->io_stage;
1666		int rv;
1667
1668		ASSERT(!MUTEX_HELD(&zio->io_lock));
1669		ASSERT(ISP2(stage));
1670		ASSERT(zio->io_stall == NULL);
1671
1672		do {
1673			stage <<= 1;
1674		} while ((stage & pipeline) == 0);
1675
1676		ASSERT(stage <= ZIO_STAGE_DONE);
1677
1678		/*
1679		 * If we are in interrupt context and this pipeline stage
1680		 * will grab a config lock that is held across I/O,
1681		 * or may wait for an I/O that needs an interrupt thread
1682		 * to complete, issue async to avoid deadlock.
1683		 *
1684		 * For VDEV_IO_START, we cut in line so that the io will
1685		 * be sent to disk promptly.
1686		 */
1687		if ((stage & ZIO_BLOCKING_STAGES) && zio->io_vd == NULL &&
1688		    zio_taskq_member(zio, ZIO_TASKQ_INTERRUPT)) {
1689			boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
1690			    zio_requeue_io_start_cut_in_line : B_FALSE;
1691			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
1692			return;
1693		}
1694
1695		zio->io_stage = stage;
1696		zio->io_pipeline_trace |= zio->io_stage;
1697		rv = zio_pipeline[highbit64(stage) - 1](zio);
1698
1699		if (rv == ZIO_PIPELINE_STOP)
1700			return;
1701
1702		ASSERT(rv == ZIO_PIPELINE_CONTINUE);
1703	}
1704}
1705
1706/*
1707 * ==========================================================================
1708 * Initiate I/O, either sync or async
1709 * ==========================================================================
1710 */
1711int
1712zio_wait(zio_t *zio)
1713{
1714	int error;
1715
1716	ASSERT3P(zio->io_stage, ==, ZIO_STAGE_OPEN);
1717	ASSERT3P(zio->io_executor, ==, NULL);
1718
1719	zio->io_waiter = curthread;
1720	ASSERT0(zio->io_queued_timestamp);
1721	zio->io_queued_timestamp = gethrtime();
1722
1723	zio_execute(zio);
1724
1725	mutex_enter(&zio->io_lock);
1726	while (zio->io_executor != NULL)
1727		cv_wait(&zio->io_cv, &zio->io_lock);
1728	mutex_exit(&zio->io_lock);
1729
1730	error = zio->io_error;
1731	zio_destroy(zio);
1732
1733	return (error);
1734}
1735
1736void
1737zio_nowait(zio_t *zio)
1738{
1739	ASSERT3P(zio->io_executor, ==, NULL);
1740
1741	if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
1742	    zio_unique_parent(zio) == NULL) {
1743		/*
1744		 * This is a logical async I/O with no parent waiting for it.
1745		 * We add it to the spa_async_zio_root "Godfather" I/O, which
1746		 * will ensure it completes prior to unloading the pool.
1747		 */
1748		spa_t *spa = zio->io_spa;
1749
1750		zio_add_child(spa->spa_async_zio_root[CPU_SEQID], zio);
1751	}
1752
1753	ASSERT0(zio->io_queued_timestamp);
1754	zio->io_queued_timestamp = gethrtime();
1755	zio_execute(zio);
1756}
1757
1758/*
1759 * ==========================================================================
1760 * Reexecute, cancel, or suspend/resume failed I/O
1761 * ==========================================================================
1762 */
1763
1764static void
1765zio_reexecute(zio_t *pio)
1766{
1767	zio_t *cio, *cio_next;
1768
1769	ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
1770	ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
1771	ASSERT(pio->io_gang_leader == NULL);
1772	ASSERT(pio->io_gang_tree == NULL);
1773
1774	pio->io_flags = pio->io_orig_flags;
1775	pio->io_stage = pio->io_orig_stage;
1776	pio->io_pipeline = pio->io_orig_pipeline;
1777	pio->io_reexecute = 0;
1778	pio->io_flags |= ZIO_FLAG_REEXECUTED;
1779	pio->io_pipeline_trace = 0;
1780	pio->io_error = 0;
1781	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
1782		pio->io_state[w] = 0;
1783	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
1784		pio->io_child_error[c] = 0;
1785
1786	if (IO_IS_ALLOCATING(pio))
1787		BP_ZERO(pio->io_bp);
1788
1789	/*
1790	 * As we reexecute pio's children, new children could be created.
1791	 * However, new children go to the head of pio's io_child_list,
1792	 * so we will (correctly) not reexecute them.  The key is that
1793	 * the remainder of pio's io_child_list, from 'cio_next' onward,
1794	 * cannot be affected by any side effects of reexecuting 'cio'.
1795	 */
1796	zio_link_t *zl = NULL;
1797	for (cio = zio_walk_children(pio, &zl); cio != NULL; cio = cio_next) {
1798		cio_next = zio_walk_children(pio, &zl);
1799		mutex_enter(&pio->io_lock);
1800		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
1801			pio->io_children[cio->io_child_type][w]++;
1802		mutex_exit(&pio->io_lock);
1803		zio_reexecute(cio);
1804	}
1805
1806	/*
1807	 * Now that all children have been reexecuted, execute the parent.
1808	 * We don't reexecute "The Godfather" I/O here as it's the
1809	 * responsibility of the caller to wait on him.
1810	 */
1811	if (!(pio->io_flags & ZIO_FLAG_GODFATHER)) {
1812		pio->io_queued_timestamp = gethrtime();
1813		zio_execute(pio);
1814	}
1815}
1816
1817void
1818zio_suspend(spa_t *spa, zio_t *zio)
1819{
1820	if (spa_get_failmode(spa) == ZIO_FAILURE_MODE_PANIC)
1821		fm_panic("Pool '%s' has encountered an uncorrectable I/O "
1822		    "failure and the failure mode property for this pool "
1823		    "is set to panic.", spa_name(spa));
1824
1825	zfs_ereport_post(FM_EREPORT_ZFS_IO_FAILURE, spa, NULL, NULL, 0, 0);
1826
1827	mutex_enter(&spa->spa_suspend_lock);
1828
1829	if (spa->spa_suspend_zio_root == NULL)
1830		spa->spa_suspend_zio_root = zio_root(spa, NULL, NULL,
1831		    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE |
1832		    ZIO_FLAG_GODFATHER);
1833
1834	spa->spa_suspended = B_TRUE;
1835
1836	if (zio != NULL) {
1837		ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
1838		ASSERT(zio != spa->spa_suspend_zio_root);
1839		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1840		ASSERT(zio_unique_parent(zio) == NULL);
1841		ASSERT(zio->io_stage == ZIO_STAGE_DONE);
1842		zio_add_child(spa->spa_suspend_zio_root, zio);
1843	}
1844
1845	mutex_exit(&spa->spa_suspend_lock);
1846}
1847
1848int
1849zio_resume(spa_t *spa)
1850{
1851	zio_t *pio;
1852
1853	/*
1854	 * Reexecute all previously suspended i/o.
1855	 */
1856	mutex_enter(&spa->spa_suspend_lock);
1857	spa->spa_suspended = B_FALSE;
1858	cv_broadcast(&spa->spa_suspend_cv);
1859	pio = spa->spa_suspend_zio_root;
1860	spa->spa_suspend_zio_root = NULL;
1861	mutex_exit(&spa->spa_suspend_lock);
1862
1863	if (pio == NULL)
1864		return (0);
1865
1866	zio_reexecute(pio);
1867	return (zio_wait(pio));
1868}
1869
1870void
1871zio_resume_wait(spa_t *spa)
1872{
1873	mutex_enter(&spa->spa_suspend_lock);
1874	while (spa_suspended(spa))
1875		cv_wait(&spa->spa_suspend_cv, &spa->spa_suspend_lock);
1876	mutex_exit(&spa->spa_suspend_lock);
1877}
1878
1879/*
1880 * ==========================================================================
1881 * Gang blocks.
1882 *
1883 * A gang block is a collection of small blocks that looks to the DMU
1884 * like one large block.  When zio_dva_allocate() cannot find a block
1885 * of the requested size, due to either severe fragmentation or the pool
1886 * being nearly full, it calls zio_write_gang_block() to construct the
1887 * block from smaller fragments.
1888 *
1889 * A gang block consists of a gang header (zio_gbh_phys_t) and up to
1890 * three (SPA_GBH_NBLKPTRS) gang members.  The gang header is just like
1891 * an indirect block: it's an array of block pointers.  It consumes
1892 * only one sector and hence is allocatable regardless of fragmentation.
1893 * The gang header's bps point to its gang members, which hold the data.
1894 *
1895 * Gang blocks are self-checksumming, using the bp's <vdev, offset, txg>
1896 * as the verifier to ensure uniqueness of the SHA256 checksum.
1897 * Critically, the gang block bp's blk_cksum is the checksum of the data,
1898 * not the gang header.  This ensures that data block signatures (needed for
1899 * deduplication) are independent of how the block is physically stored.
1900 *
1901 * Gang blocks can be nested: a gang member may itself be a gang block.
1902 * Thus every gang block is a tree in which root and all interior nodes are
1903 * gang headers, and the leaves are normal blocks that contain user data.
1904 * The root of the gang tree is called the gang leader.
1905 *
1906 * To perform any operation (read, rewrite, free, claim) on a gang block,
1907 * zio_gang_assemble() first assembles the gang tree (minus data leaves)
1908 * in the io_gang_tree field of the original logical i/o by recursively
1909 * reading the gang leader and all gang headers below it.  This yields
1910 * an in-core tree containing the contents of every gang header and the
1911 * bps for every constituent of the gang block.
1912 *
1913 * With the gang tree now assembled, zio_gang_issue() just walks the gang tree
1914 * and invokes a callback on each bp.  To free a gang block, zio_gang_issue()
1915 * calls zio_free_gang() -- a trivial wrapper around zio_free() -- for each bp.
1916 * zio_claim_gang() provides a similarly trivial wrapper for zio_claim().
1917 * zio_read_gang() is a wrapper around zio_read() that omits reading gang
1918 * headers, since we already have those in io_gang_tree.  zio_rewrite_gang()
1919 * performs a zio_rewrite() of the data or, for gang headers, a zio_rewrite()
1920 * of the gang header plus zio_checksum_compute() of the data to update the
1921 * gang header's blk_cksum as described above.
1922 *
1923 * The two-phase assemble/issue model solves the problem of partial failure --
1924 * what if you'd freed part of a gang block but then couldn't read the
1925 * gang header for another part?  Assembling the entire gang tree first
1926 * ensures that all the necessary gang header I/O has succeeded before
1927 * starting the actual work of free, claim, or write.  Once the gang tree
1928 * is assembled, free and claim are in-memory operations that cannot fail.
1929 *
1930 * In the event that a gang write fails, zio_dva_unallocate() walks the
1931 * gang tree to immediately free (i.e. insert back into the space map)
1932 * everything we've allocated.  This ensures that we don't get ENOSPC
1933 * errors during repeated suspend/resume cycles due to a flaky device.
1934 *
1935 * Gang rewrites only happen during sync-to-convergence.  If we can't assemble
1936 * the gang tree, we won't modify the block, so we can safely defer the free
1937 * (knowing that the block is still intact).  If we *can* assemble the gang
1938 * tree, then even if some of the rewrites fail, zio_dva_unallocate() will free
1939 * each constituent bp and we can allocate a new block on the next sync pass.
1940 *
1941 * In all cases, the gang tree allows complete recovery from partial failure.
1942 * ==========================================================================
1943 */
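
/*
 * Illustrative sketch (editorial addition; gang_tree_walk() is hypothetical
 * and not a function in this file): once assembled, the gang tree is simply
 * a recursive structure of zio_gang_node_t, one node per gang header, each
 * with up to SPA_GBH_NBLKPTRS children.  A generic walk over every non-hole
 * bp in the tree -- the shape of what zio_gang_tree_issue() does -- looks
 * like this:
 *
 *	static void
 *	gang_tree_walk(zio_gang_node_t *gn, void (*func)(blkptr_t *))
 *	{
 *		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
 *			blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
 *
 *			if (BP_IS_HOLE(gbp))
 *				continue;
 *			func(gbp);
 *			if (gn->gn_child[g] != NULL)
 *				gang_tree_walk(gn->gn_child[g], func);
 *		}
 *	}
 */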
1944
1945static void
1946zio_gang_issue_func_done(zio_t *zio)
1947{
1948	abd_put(zio->io_abd);
1949}
1950
1951static zio_t *
1952zio_read_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
1953    uint64_t offset)
1954{
1955	if (gn != NULL)
1956		return (pio);
1957
1958	return (zio_read(pio, pio->io_spa, bp, abd_get_offset(data, offset),
1959	    BP_GET_PSIZE(bp), zio_gang_issue_func_done,
1960	    NULL, pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
1961	    &pio->io_bookmark));
1962}
1963
1964static zio_t *
1965zio_rewrite_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
1966    uint64_t offset)
1967{
1968	zio_t *zio;
1969
1970	if (gn != NULL) {
1971		abd_t *gbh_abd =
1972		    abd_get_from_buf(gn->gn_gbh, SPA_GANGBLOCKSIZE);
1973		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
1974		    gbh_abd, SPA_GANGBLOCKSIZE, zio_gang_issue_func_done, NULL,
1975		    pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
1976		    &pio->io_bookmark);
1977		/*
1978		 * As we rewrite each gang header, the pipeline will compute
1979		 * a new gang block header checksum for it; but no one will
1980		 * compute a new data checksum, so we do that here.  The one
1981		 * exception is the gang leader: the pipeline already computed
1982		 * its data checksum because that stage precedes gang assembly.
1983		 * (Presently, nothing actually uses interior data checksums;
1984		 * this is just good hygiene.)
1985		 */
1986		if (gn != pio->io_gang_leader->io_gang_tree) {
1987			abd_t *buf = abd_get_offset(data, offset);
1988
1989			zio_checksum_compute(zio, BP_GET_CHECKSUM(bp),
1990			    buf, BP_GET_PSIZE(bp));
1991
1992			abd_put(buf);
1993		}
1994		/*
1995		 * If we are here to damage data for testing purposes,
1996		 * leave the GBH alone so that we can detect the damage.
1997		 */
1998		if (pio->io_gang_leader->io_flags & ZIO_FLAG_INDUCE_DAMAGE)
1999			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
2000	} else {
2001		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
2002		    abd_get_offset(data, offset), BP_GET_PSIZE(bp),
2003		    zio_gang_issue_func_done, NULL, pio->io_priority,
2004		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
2005	}
2006
2007	return (zio);
2008}
2009
2010/* ARGSUSED */
2011static zio_t *
2012zio_free_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
2013    uint64_t offset)
2014{
2015	return (zio_free_sync(pio, pio->io_spa, pio->io_txg, bp,
2016	    BP_IS_GANG(bp) ? SPA_GANGBLOCKSIZE : BP_GET_PSIZE(bp),
2017	    ZIO_GANG_CHILD_FLAGS(pio)));
2018}
2019
2020/* ARGSUSED */
2021static zio_t *
2022zio_claim_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
2023    uint64_t offset)
2024{
2025	return (zio_claim(pio, pio->io_spa, pio->io_txg, bp,
2026	    NULL, NULL, ZIO_GANG_CHILD_FLAGS(pio)));
2027}
2028
2029static zio_gang_issue_func_t *zio_gang_issue_func[ZIO_TYPES] = {
2030	NULL,
2031	zio_read_gang,
2032	zio_rewrite_gang,
2033	zio_free_gang,
2034	zio_claim_gang,
2035	NULL
2036};
2037
2038static void zio_gang_tree_assemble_done(zio_t *zio);
2039
2040static zio_gang_node_t *
2041zio_gang_node_alloc(zio_gang_node_t **gnpp)
2042{
2043	zio_gang_node_t *gn;
2044
2045	ASSERT(*gnpp == NULL);
2046
2047	gn = kmem_zalloc(sizeof (*gn), KM_SLEEP);
2048	gn->gn_gbh = zio_buf_alloc(SPA_GANGBLOCKSIZE);
2049	*gnpp = gn;
2050
2051	return (gn);
2052}
2053
2054static void
2055zio_gang_node_free(zio_gang_node_t **gnpp)
2056{
2057	zio_gang_node_t *gn = *gnpp;
2058
2059	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
2060		ASSERT(gn->gn_child[g] == NULL);
2061
2062	zio_buf_free(gn->gn_gbh, SPA_GANGBLOCKSIZE);
2063	kmem_free(gn, sizeof (*gn));
2064	*gnpp = NULL;
2065}
2066
2067static void
2068zio_gang_tree_free(zio_gang_node_t **gnpp)
2069{
2070	zio_gang_node_t *gn = *gnpp;
2071
2072	if (gn == NULL)
2073		return;
2074
2075	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
2076		zio_gang_tree_free(&gn->gn_child[g]);
2077
2078	zio_gang_node_free(gnpp);
2079}
2080
2081static void
2082zio_gang_tree_assemble(zio_t *gio, blkptr_t *bp, zio_gang_node_t **gnpp)
2083{
2084	zio_gang_node_t *gn = zio_gang_node_alloc(gnpp);
2085	abd_t *gbh_abd = abd_get_from_buf(gn->gn_gbh, SPA_GANGBLOCKSIZE);
2086
2087	ASSERT(gio->io_gang_leader == gio);
2088	ASSERT(BP_IS_GANG(bp));
2089
2090	zio_nowait(zio_read(gio, gio->io_spa, bp, gbh_abd, SPA_GANGBLOCKSIZE,
2091	    zio_gang_tree_assemble_done, gn, gio->io_priority,
2092	    ZIO_GANG_CHILD_FLAGS(gio), &gio->io_bookmark));
2093}
2094
2095static void
2096zio_gang_tree_assemble_done(zio_t *zio)
2097{
2098	zio_t *gio = zio->io_gang_leader;
2099	zio_gang_node_t *gn = zio->io_private;
2100	blkptr_t *bp = zio->io_bp;
2101
2102	ASSERT(gio == zio_unique_parent(zio));
2103	ASSERT(zio->io_child_count == 0);
2104
2105	if (zio->io_error)
2106		return;
2107
2108	/* this ABD was created from a linear buf in zio_gang_tree_assemble */
2109	if (BP_SHOULD_BYTESWAP(bp))
2110		byteswap_uint64_array(abd_to_buf(zio->io_abd), zio->io_size);
2111
2112	ASSERT3P(abd_to_buf(zio->io_abd), ==, gn->gn_gbh);
2113	ASSERT(zio->io_size == SPA_GANGBLOCKSIZE);
2114	ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
2115
2116	abd_put(zio->io_abd);
2117
2118	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
2119		blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
2120		if (!BP_IS_GANG(gbp))
2121			continue;
2122		zio_gang_tree_assemble(gio, gbp, &gn->gn_child[g]);
2123	}
2124}
2125
2126static void
2127zio_gang_tree_issue(zio_t *pio, zio_gang_node_t *gn, blkptr_t *bp, abd_t *data,
2128    uint64_t offset)
2129{
2130	zio_t *gio = pio->io_gang_leader;
2131	zio_t *zio;
2132
2133	ASSERT(BP_IS_GANG(bp) == !!gn);
2134	ASSERT(BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(gio->io_bp));
2135	ASSERT(BP_GET_LSIZE(bp) == BP_GET_PSIZE(bp) || gn == gio->io_gang_tree);
2136
2137	/*
2138	 * If you're a gang header, your data is in gn->gn_gbh.
2139	 * If you're a gang member, your data is in 'data' and gn == NULL.
2140	 */
2141	zio = zio_gang_issue_func[gio->io_type](pio, bp, gn, data, offset);
2142
2143	if (gn != NULL) {
2144		ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
2145
2146		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
2147			blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
2148			if (BP_IS_HOLE(gbp))
2149				continue;
2150			zio_gang_tree_issue(zio, gn->gn_child[g], gbp, data,
2151			    offset);
2152			offset += BP_GET_PSIZE(gbp);
2153		}
2154	}
2155
2156	if (gn == gio->io_gang_tree && gio->io_abd != NULL)
2157		ASSERT3U(gio->io_size, ==, offset);
2158
2159	if (zio != pio)
2160		zio_nowait(zio);
2161}
2162
2163static int
2164zio_gang_assemble(zio_t *zio)
2165{
2166	blkptr_t *bp = zio->io_bp;
2167
2168	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == NULL);
2169	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2170
2171	zio->io_gang_leader = zio;
2172
2173	zio_gang_tree_assemble(zio, bp, &zio->io_gang_tree);
2174
2175	return (ZIO_PIPELINE_CONTINUE);
2176}
2177
2178static int
2179zio_gang_issue(zio_t *zio)
2180{
2181	blkptr_t *bp = zio->io_bp;
2182
2183	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE))
2184		return (ZIO_PIPELINE_STOP);
2185
2186	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == zio);
2187	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2188
2189	if (zio->io_child_error[ZIO_CHILD_GANG] == 0)
2190		zio_gang_tree_issue(zio, zio->io_gang_tree, bp, zio->io_abd,
2191		    0);
2192	else
2193		zio_gang_tree_free(&zio->io_gang_tree);
2194
2195	zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2196
2197	return (ZIO_PIPELINE_CONTINUE);
2198}
2199
2200static void
2201zio_write_gang_member_ready(zio_t *zio)
2202{
2203	zio_t *pio = zio_unique_parent(zio);
2204	zio_t *gio = zio->io_gang_leader;
2205	dva_t *cdva = zio->io_bp->blk_dva;
2206	dva_t *pdva = pio->io_bp->blk_dva;
2207	uint64_t asize;
2208
2209	if (BP_IS_HOLE(zio->io_bp))
2210		return;
2211
2212	ASSERT(BP_IS_HOLE(&zio->io_bp_orig));
2213
2214	ASSERT(zio->io_child_type == ZIO_CHILD_GANG);
2215	ASSERT3U(zio->io_prop.zp_copies, ==, gio->io_prop.zp_copies);
2216	ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
2217	ASSERT3U(pio->io_prop.zp_copies, <=, BP_GET_NDVAS(pio->io_bp));
2218	ASSERT3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
2219
2220	mutex_enter(&pio->io_lock);
2221	for (int d = 0; d < BP_GET_NDVAS(zio->io_bp); d++) {
2222		ASSERT(DVA_GET_GANG(&pdva[d]));
2223		asize = DVA_GET_ASIZE(&pdva[d]);
2224		asize += DVA_GET_ASIZE(&cdva[d]);
2225		DVA_SET_ASIZE(&pdva[d], asize);
2226	}
2227	mutex_exit(&pio->io_lock);
2228}
2229
2230static void
2231zio_write_gang_done(zio_t *zio)
2232{
2233	abd_put(zio->io_abd);
2234}
2235
2236static int
2237zio_write_gang_block(zio_t *pio)
2238{
2239	spa_t *spa = pio->io_spa;
2240	metaslab_class_t *mc = spa_normal_class(spa);
2241	blkptr_t *bp = pio->io_bp;
2242	zio_t *gio = pio->io_gang_leader;
2243	zio_t *zio;
2244	zio_gang_node_t *gn, **gnpp;
2245	zio_gbh_phys_t *gbh;
2246	abd_t *gbh_abd;
2247	uint64_t txg = pio->io_txg;
2248	uint64_t resid = pio->io_size;
2249	uint64_t lsize;
2250	int copies = gio->io_prop.zp_copies;
2251	int gbh_copies = MIN(copies + 1, spa_max_replication(spa));
2252	zio_prop_t zp;
2253	int error;
2254
2255	int flags = METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER;
2256	if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
2257		ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
2258		ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
2259
2260		flags |= METASLAB_ASYNC_ALLOC;
2261		VERIFY(refcount_held(&mc->mc_alloc_slots, pio));
2262
2263		/*
2264		 * The logical zio has already placed a reservation for
2265		 * 'copies' allocation slots but gang blocks may require
2266		 * additional copies. These additional copies
2267		 * (i.e. gbh_copies - copies) are guaranteed to succeed
2268		 * since metaslab_class_throttle_reserve() always allows
2269		 * additional reservations for gang blocks.
2270		 */
2271		VERIFY(metaslab_class_throttle_reserve(mc, gbh_copies - copies,
2272		    pio, flags));
2273	}
2274
2275	error = metaslab_alloc(spa, mc, SPA_GANGBLOCKSIZE,
2276	    bp, gbh_copies, txg, pio == gio ? NULL : gio->io_bp, flags,
2277	    &pio->io_alloc_list, pio);
2278	if (error) {
2279		if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
2280			ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
2281			ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
2282
2283			/*
2284			 * If we failed to allocate the gang block header then
2285			 * we remove any additional allocation reservations that
2286			 * we placed here. The original reservation will
2287			 * be removed when the logical I/O goes to the ready
2288			 * stage.
2289			 */
2290			metaslab_class_throttle_unreserve(mc,
2291			    gbh_copies - copies, pio);
2292		}
2293		pio->io_error = error;
2294		return (ZIO_PIPELINE_CONTINUE);
2295	}
2296
2297	if (pio == gio) {
2298		gnpp = &gio->io_gang_tree;
2299	} else {
2300		gnpp = pio->io_private;
2301		ASSERT(pio->io_ready == zio_write_gang_member_ready);
2302	}
2303
2304	gn = zio_gang_node_alloc(gnpp);
2305	gbh = gn->gn_gbh;
2306	bzero(gbh, SPA_GANGBLOCKSIZE);
2307	gbh_abd = abd_get_from_buf(gbh, SPA_GANGBLOCKSIZE);
2308
2309	/*
2310	 * Create the gang header.
2311	 */
2312	zio = zio_rewrite(pio, spa, txg, bp, gbh_abd, SPA_GANGBLOCKSIZE,
2313	    zio_write_gang_done, NULL, pio->io_priority,
2314	    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
2315
2316	/*
2317	 * Create and nowait the gang children.
2318	 */
2319	for (int g = 0; resid != 0; resid -= lsize, g++) {
2320		lsize = P2ROUNDUP(resid / (SPA_GBH_NBLKPTRS - g),
2321		    SPA_MINBLOCKSIZE);
2322		ASSERT(lsize >= SPA_MINBLOCKSIZE && lsize <= resid);
2323
2324		zp.zp_checksum = gio->io_prop.zp_checksum;
2325		zp.zp_compress = ZIO_COMPRESS_OFF;
2326		zp.zp_type = DMU_OT_NONE;
2327		zp.zp_level = 0;
2328		zp.zp_copies = gio->io_prop.zp_copies;
2329		zp.zp_dedup = B_FALSE;
2330		zp.zp_dedup_verify = B_FALSE;
2331		zp.zp_nopwrite = B_FALSE;
2332
2333		zio_t *cio = zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
2334		    abd_get_offset(pio->io_abd, pio->io_size - resid), lsize,
2335		    lsize, &zp, zio_write_gang_member_ready, NULL, NULL,
2336		    zio_write_gang_done, &gn->gn_child[g], pio->io_priority,
2337		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
2338
2339		if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
2340			ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
2341			ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
2342
2343			/*
2344			 * Gang children won't throttle but we should
2345			 * account for their work, so reserve an allocation
2346			 * slot for them here.
2347			 */
2348			VERIFY(metaslab_class_throttle_reserve(mc,
2349			    zp.zp_copies, cio, flags));
2350		}
2351		zio_nowait(cio);
2352	}
2353
2354	/*
2355	 * Set pio's pipeline to just wait for zio to finish.
2356	 */
2357	pio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2358
2359	zio_nowait(zio);
2360
2361	return (ZIO_PIPELINE_CONTINUE);
2362}
2363
2364/*
2365 * The zio_nop_write stage in the pipeline determines if allocating a
2366 * new bp is necessary.  The nopwrite feature can handle writes in
2367 * either syncing or open context (i.e. zil writes) and as a result is
2368 * mutually exclusive with dedup.
2369 *
2370 * By leveraging a cryptographically secure checksum, such as SHA256, we
2371 * can compare the checksums of the new data and the old to determine if
2372 * allocating a new block is required.  Note that our requirements for
2373 * cryptographic strength are fairly weak: there can't be any accidental
2374 * hash collisions, but we don't need to be secure against intentional
2375 * (malicious) collisions.  To trigger a nopwrite, you have to be able
2376 * to write the file to begin with, and triggering an incorrect (hash
2377 * collision) nopwrite is no worse than simply writing to the file.
2378 * That said, there are no known attacks against the checksum algorithms
2379 * used for nopwrite, assuming that the salt and the checksums
2380 * themselves remain secret.
2381 */
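/*
 * Illustrative sketch (editorial addition): conceptually, zio_nop_write()
 * below reuses the old bp only when the old and new blocks were produced
 * the same way and their strong checksums agree, in which case no new
 * allocation or device I/O is needed:
 *
 *	if ((zio_checksum_table[BP_GET_CHECKSUM(bp)].ci_flags &
 *	    ZCHECKSUM_FLAG_NOPWRITE) &&
 *	    BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(bp_orig) &&
 *	    ZIO_CHECKSUM_EQUAL(bp->blk_cksum, bp_orig->blk_cksum))
 *		*bp = *bp_orig;
 */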
2382static int
2383zio_nop_write(zio_t *zio)
2384{
2385	blkptr_t *bp = zio->io_bp;
2386	blkptr_t *bp_orig = &zio->io_bp_orig;
2387	zio_prop_t *zp = &zio->io_prop;
2388
2389	ASSERT(BP_GET_LEVEL(bp) == 0);
2390	ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
2391	ASSERT(zp->zp_nopwrite);
2392	ASSERT(!zp->zp_dedup);
2393	ASSERT(zio->io_bp_override == NULL);
2394	ASSERT(IO_IS_ALLOCATING(zio));
2395
2396	/*
2397	 * Check to see if the original bp and the new bp have matching
2398	 * characteristics (i.e. same checksum, compression algorithms, etc).
2399	 * If they don't then just continue with the pipeline which will
2400	 * allocate a new bp.
2401	 */
2402	if (BP_IS_HOLE(bp_orig) ||
2403	    !(zio_checksum_table[BP_GET_CHECKSUM(bp)].ci_flags &
2404	    ZCHECKSUM_FLAG_NOPWRITE) ||
2405	    BP_GET_CHECKSUM(bp) != BP_GET_CHECKSUM(bp_orig) ||
2406	    BP_GET_COMPRESS(bp) != BP_GET_COMPRESS(bp_orig) ||
2407	    BP_GET_DEDUP(bp) != BP_GET_DEDUP(bp_orig) ||
2408	    zp->zp_copies != BP_GET_NDVAS(bp_orig))
2409		return (ZIO_PIPELINE_CONTINUE);
2410
2411	/*
2412	 * If the checksums match then reset the pipeline so that we
2413	 * avoid allocating a new bp and issuing any I/O.
2414	 */
2415	if (ZIO_CHECKSUM_EQUAL(bp->blk_cksum, bp_orig->blk_cksum)) {
2416		ASSERT(zio_checksum_table[zp->zp_checksum].ci_flags &
2417		    ZCHECKSUM_FLAG_NOPWRITE);
2418		ASSERT3U(BP_GET_PSIZE(bp), ==, BP_GET_PSIZE(bp_orig));
2419		ASSERT3U(BP_GET_LSIZE(bp), ==, BP_GET_LSIZE(bp_orig));
2420		ASSERT(zp->zp_compress != ZIO_COMPRESS_OFF);
2421		ASSERT(bcmp(&bp->blk_prop, &bp_orig->blk_prop,
2422		    sizeof (uint64_t)) == 0);
2423
2424		*bp = *bp_orig;
2425		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2426		zio->io_flags |= ZIO_FLAG_NOPWRITE;
2427	}
2428
2429	return (ZIO_PIPELINE_CONTINUE);
2430}
2431
2432/*
2433 * ==========================================================================
2434 * Dedup
2435 * ==========================================================================
2436 */
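/*
 * Illustrative sketch (editorial addition): the dedup stages below share a
 * common locking pattern around the dedup table entry for a bp -- select
 * the table keyed by the bp's checksum algorithm, take the table lock,
 * look up (or create) the entry, then examine or update it:
 *
 *	ddt_t *ddt = ddt_select(spa, bp);
 *	ddt_enter(ddt);
 *	ddt_entry_t *dde = ddt_lookup(ddt, bp, B_TRUE);
 *	// examine or update dde->dde_phys[] and dde->dde_lead_zio[]
 *	ddt_exit(ddt);
 */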
2437static void
2438zio_ddt_child_read_done(zio_t *zio)
2439{
2440	blkptr_t *bp = zio->io_bp;
2441	ddt_entry_t *dde = zio->io_private;
2442	ddt_phys_t *ddp;
2443	zio_t *pio = zio_unique_parent(zio);
2444
2445	mutex_enter(&pio->io_lock);
2446	ddp = ddt_phys_select(dde, bp);
2447	if (zio->io_error == 0)
2448		ddt_phys_clear(ddp);	/* this ddp doesn't need repair */
2449
2450	if (zio->io_error == 0 && dde->dde_repair_abd == NULL)
2451		dde->dde_repair_abd = zio->io_abd;
2452	else
2453		abd_free(zio->io_abd);
2454	mutex_exit(&pio->io_lock);
2455}
2456
2457static int
2458zio_ddt_read_start(zio_t *zio)
2459{
2460	blkptr_t *bp = zio->io_bp;
2461
2462	ASSERT(BP_GET_DEDUP(bp));
2463	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
2464	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2465
2466	if (zio->io_child_error[ZIO_CHILD_DDT]) {
2467		ddt_t *ddt = ddt_select(zio->io_spa, bp);
2468		ddt_entry_t *dde = ddt_repair_start(ddt, bp);
2469		ddt_phys_t *ddp = dde->dde_phys;
2470		ddt_phys_t *ddp_self = ddt_phys_select(dde, bp);
2471		blkptr_t blk;
2472
2473		ASSERT(zio->io_vsd == NULL);
2474		zio->io_vsd = dde;
2475
2476		if (ddp_self == NULL)
2477			return (ZIO_PIPELINE_CONTINUE);
2478
2479		for (int p = 0; p < DDT_PHYS_TYPES; p++, ddp++) {
2480			if (ddp->ddp_phys_birth == 0 || ddp == ddp_self)
2481				continue;
2482			ddt_bp_create(ddt->ddt_checksum, &dde->dde_key, ddp,
2483			    &blk);
2484			zio_nowait(zio_read(zio, zio->io_spa, &blk,
2485			    abd_alloc_for_io(zio->io_size, B_TRUE),
2486			    zio->io_size, zio_ddt_child_read_done, dde,
2487			    zio->io_priority, ZIO_DDT_CHILD_FLAGS(zio) |
2488			    ZIO_FLAG_DONT_PROPAGATE, &zio->io_bookmark));
2489		}
2490		return (ZIO_PIPELINE_CONTINUE);
2491	}
2492
2493	zio_nowait(zio_read(zio, zio->io_spa, bp,
2494	    zio->io_abd, zio->io_size, NULL, NULL, zio->io_priority,
2495	    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark));
2496
2497	return (ZIO_PIPELINE_CONTINUE);
2498}
2499
2500static int
2501zio_ddt_read_done(zio_t *zio)
2502{
2503	blkptr_t *bp = zio->io_bp;
2504
2505	if (zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE))
2506		return (ZIO_PIPELINE_STOP);
2507
2508	ASSERT(BP_GET_DEDUP(bp));
2509	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
2510	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2511
2512	if (zio->io_child_error[ZIO_CHILD_DDT]) {
2513		ddt_t *ddt = ddt_select(zio->io_spa, bp);
2514		ddt_entry_t *dde = zio->io_vsd;
2515		if (ddt == NULL) {
2516			ASSERT(spa_load_state(zio->io_spa) != SPA_LOAD_NONE);
2517			return (ZIO_PIPELINE_CONTINUE);
2518		}
2519		if (dde == NULL) {
2520			zio->io_stage = ZIO_STAGE_DDT_READ_START >> 1;
2521			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
2522			return (ZIO_PIPELINE_STOP);
2523		}
2524		if (dde->dde_repair_abd != NULL) {
2525			abd_copy(zio->io_abd, dde->dde_repair_abd,
2526			    zio->io_size);
2527			zio->io_child_error[ZIO_CHILD_DDT] = 0;
2528		}
2529		ddt_repair_done(ddt, dde);
2530		zio->io_vsd = NULL;
2531	}
2532
2533	ASSERT(zio->io_vsd == NULL);
2534
2535	return (ZIO_PIPELINE_CONTINUE);
2536}
2537
2538static boolean_t
2539zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
2540{
2541	spa_t *spa = zio->io_spa;
2542	boolean_t do_raw = (zio->io_flags & ZIO_FLAG_RAW);
2543
2544	/* We should never get a raw, override zio */
2545	ASSERT(!(zio->io_bp_override && do_raw));
2546
2547	/*
2548	 * Note: we compare the original data, not the transformed data,
2549	 * because when zio->io_bp is an override bp, we will not have
2550	 * pushed the I/O transforms.  That's an important optimization
2551	 * because otherwise we'd compress/encrypt all dmu_sync() data twice.
2552	 */
2553	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
2554		zio_t *lio = dde->dde_lead_zio[p];
2555
2556		if (lio != NULL) {
2557			return (lio->io_orig_size != zio->io_orig_size ||
2558			    abd_cmp(zio->io_orig_abd, lio->io_orig_abd,
2559			    zio->io_orig_size) != 0);
2560		}
2561	}
2562
2563	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
2564		ddt_phys_t *ddp = &dde->dde_phys[p];
2565
2566		if (ddp->ddp_phys_birth != 0) {
2567			arc_buf_t *abuf = NULL;
2568			arc_flags_t aflags = ARC_FLAG_WAIT;
2569			int zio_flags = ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE;
2570			blkptr_t blk = *zio->io_bp;
2571			int error;
2572
2573			ddt_bp_fill(ddp, &blk, ddp->ddp_phys_birth);
2574
2575			ddt_exit(ddt);
2576
2577			/*
2578			 * Intuitively, it would make more sense to compare
2579			 * io_abd than io_orig_abd in the raw case since you
2580			 * don't want to look at any transformations that have
2581			 * happened to the data. However, for raw I/Os the
2582			 * data will actually be the same in io_abd and
2583			 * io_orig_abd, so all we have to do is issue this as
2584			 * a raw ARC read.
2585			 */
2586			if (do_raw) {
2587				zio_flags |= ZIO_FLAG_RAW;
2588				ASSERT3U(zio->io_size, ==, zio->io_orig_size);
2589				ASSERT0(abd_cmp(zio->io_abd, zio->io_orig_abd,
2590				    zio->io_size));
2591				ASSERT3P(zio->io_transform_stack, ==, NULL);
2592			}
2593
2594			error = arc_read(NULL, spa, &blk,
2595			    arc_getbuf_func, &abuf, ZIO_PRIORITY_SYNC_READ,
2596			    zio_flags, &aflags, &zio->io_bookmark);
2597
2598			if (error == 0) {
2599				if (arc_buf_size(abuf) != zio->io_orig_size ||
2600				    abd_cmp_buf(zio->io_orig_abd, abuf->b_data,
2601				    zio->io_orig_size) != 0)
2602					error = SET_ERROR(EEXIST);
2603				arc_buf_destroy(abuf, &abuf);
2604			}
2605
2606			ddt_enter(ddt);
2607			return (error != 0);
2608		}
2609	}
2610
2611	return (B_FALSE);
2612}
2613
2614static void
2615zio_ddt_child_write_ready(zio_t *zio)
2616{
2617	int p = zio->io_prop.zp_copies;
2618	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
2619	ddt_entry_t *dde = zio->io_private;
2620	ddt_phys_t *ddp = &dde->dde_phys[p];
2621	zio_t *pio;
2622
2623	if (zio->io_error)
2624		return;
2625
2626	ddt_enter(ddt);
2627
2628	ASSERT(dde->dde_lead_zio[p] == zio);
2629
2630	ddt_phys_fill(ddp, zio->io_bp);
2631
2632	zio_link_t *zl = NULL;
2633	while ((pio = zio_walk_parents(zio, &zl)) != NULL)
2634		ddt_bp_fill(ddp, pio->io_bp, zio->io_txg);
2635
2636	ddt_exit(ddt);
2637}
2638
2639static void
2640zio_ddt_child_write_done(zio_t *zio)
2641{
2642	int p = zio->io_prop.zp_copies;
2643	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
2644	ddt_entry_t *dde = zio->io_private;
2645	ddt_phys_t *ddp = &dde->dde_phys[p];
2646
2647	ddt_enter(ddt);
2648
2649	ASSERT(ddp->ddp_refcnt == 0);
2650	ASSERT(dde->dde_lead_zio[p] == zio);
2651	dde->dde_lead_zio[p] = NULL;
2652
2653	if (zio->io_error == 0) {
2654		zio_link_t *zl = NULL;
2655		while (zio_walk_parents(zio, &zl) != NULL)
2656			ddt_phys_addref(ddp);
2657	} else {
2658		ddt_phys_clear(ddp);
2659	}
2660
2661	ddt_exit(ddt);
2662}
2663
2664static void
2665zio_ddt_ditto_write_done(zio_t *zio)
2666{
2667	int p = DDT_PHYS_DITTO;
2668	zio_prop_t *zp = &zio->io_prop;
2669	blkptr_t *bp = zio->io_bp;
2670	ddt_t *ddt = ddt_select(zio->io_spa, bp);
2671	ddt_entry_t *dde = zio->io_private;
2672	ddt_phys_t *ddp = &dde->dde_phys[p];
2673	ddt_key_t *ddk = &dde->dde_key;
2674
2675	ddt_enter(ddt);
2676
2677	ASSERT(ddp->ddp_refcnt == 0);
2678	ASSERT(dde->dde_lead_zio[p] == zio);
2679	dde->dde_lead_zio[p] = NULL;
2680
2681	if (zio->io_error == 0) {
2682		ASSERT(ZIO_CHECKSUM_EQUAL(bp->blk_cksum, ddk->ddk_cksum));
2683		ASSERT(zp->zp_copies < SPA_DVAS_PER_BP);
2684		ASSERT(zp->zp_copies == BP_GET_NDVAS(bp) - BP_IS_GANG(bp));
2685		if (ddp->ddp_phys_birth != 0)
2686			ddt_phys_free(ddt, ddk, ddp, zio->io_txg);
2687		ddt_phys_fill(ddp, bp);
2688	}
2689
2690	ddt_exit(ddt);
2691}
2692
2693static int
2694zio_ddt_write(zio_t *zio)
2695{
2696	spa_t *spa = zio->io_spa;
2697	blkptr_t *bp = zio->io_bp;
2698	uint64_t txg = zio->io_txg;
2699	zio_prop_t *zp = &zio->io_prop;
2700	int p = zp->zp_copies;
2701	int ditto_copies;
2702	zio_t *cio = NULL;
2703	zio_t *dio = NULL;
2704	ddt_t *ddt = ddt_select(spa, bp);
2705	ddt_entry_t *dde;
2706	ddt_phys_t *ddp;
2707
2708	ASSERT(BP_GET_DEDUP(bp));
2709	ASSERT(BP_GET_CHECKSUM(bp) == zp->zp_checksum);
2710	ASSERT(BP_IS_HOLE(bp) || zio->io_bp_override);
2711	ASSERT(!(zio->io_bp_override && (zio->io_flags & ZIO_FLAG_RAW)));
2712
2713	ddt_enter(ddt);
2714	dde = ddt_lookup(ddt, bp, B_TRUE);
2715	ddp = &dde->dde_phys[p];
2716
2717	if (zp->zp_dedup_verify && zio_ddt_collision(zio, ddt, dde)) {
2718		/*
2719		 * If we're using a weak checksum, upgrade to a strong checksum
2720		 * and try again.  If we're already using a strong checksum,
2721		 * we can't resolve it, so just convert to an ordinary write.
2722		 * (And automatically e-mail a paper to Nature?)
2723		 */
2724		if (!(zio_checksum_table[zp->zp_checksum].ci_flags &
2725		    ZCHECKSUM_FLAG_DEDUP)) {
2726			zp->zp_checksum = spa_dedup_checksum(spa);
2727			zio_pop_transforms(zio);
2728			zio->io_stage = ZIO_STAGE_OPEN;
2729			BP_ZERO(bp);
2730		} else {
2731			zp->zp_dedup = B_FALSE;
2732			BP_SET_DEDUP(bp, B_FALSE);
2733		}
2734		ASSERT(!BP_GET_DEDUP(bp));
2735		zio->io_pipeline = ZIO_WRITE_PIPELINE;
2736		ddt_exit(ddt);
2737		return (ZIO_PIPELINE_CONTINUE);
2738	}
2739
2740	ditto_copies = ddt_ditto_copies_needed(ddt, dde, ddp);
2741	ASSERT(ditto_copies < SPA_DVAS_PER_BP);
2742
2743	if (ditto_copies > ddt_ditto_copies_present(dde) &&
2744	    dde->dde_lead_zio[DDT_PHYS_DITTO] == NULL) {
2745		zio_prop_t czp = *zp;
2746
2747		czp.zp_copies = ditto_copies;
2748
2749		/*
2750		 * If we arrived here with an override bp, we won't have run
2751		 * the transform stack, so we won't have the data we need to
2752		 * generate a child i/o.  So, toss the override bp and restart.
2753		 * This is safe, because using the override bp is just an
2754		 * optimization; and it's rare, so the cost doesn't matter.
2755		 */
2756		if (zio->io_bp_override) {
2757			zio_pop_transforms(zio);
2758			zio->io_stage = ZIO_STAGE_OPEN;
2759			zio->io_pipeline = ZIO_WRITE_PIPELINE;
2760			zio->io_bp_override = NULL;
2761			BP_ZERO(bp);
2762			ddt_exit(ddt);
2763			return (ZIO_PIPELINE_CONTINUE);
2764		}
2765
2766		dio = zio_write(zio, spa, txg, bp, zio->io_orig_abd,
2767		    zio->io_orig_size, zio->io_orig_size, &czp, NULL, NULL,
2768		    NULL, zio_ddt_ditto_write_done, dde, zio->io_priority,
2769		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2770
2771		zio_push_transform(dio, zio->io_abd, zio->io_size, 0, NULL);
2772		dde->dde_lead_zio[DDT_PHYS_DITTO] = dio;
2773	}
2774
2775	if (ddp->ddp_phys_birth != 0 || dde->dde_lead_zio[p] != NULL) {
2776		if (ddp->ddp_phys_birth != 0)
2777			ddt_bp_fill(ddp, bp, txg);
2778		if (dde->dde_lead_zio[p] != NULL)
2779			zio_add_child(zio, dde->dde_lead_zio[p]);
2780		else
2781			ddt_phys_addref(ddp);
2782	} else if (zio->io_bp_override) {
2783		ASSERT(bp->blk_birth == txg);
2784		ASSERT(BP_EQUAL(bp, zio->io_bp_override));
2785		ddt_phys_fill(ddp, bp);
2786		ddt_phys_addref(ddp);
2787	} else {
2788		cio = zio_write(zio, spa, txg, bp, zio->io_orig_abd,
2789		    zio->io_orig_size, zio->io_orig_size, zp,
2790		    zio_ddt_child_write_ready, NULL, NULL,
2791		    zio_ddt_child_write_done, dde, zio->io_priority,
2792		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2793
2794		zio_push_transform(cio, zio->io_abd, zio->io_size, 0, NULL);
2795		dde->dde_lead_zio[p] = cio;
2796	}
2797
2798	ddt_exit(ddt);
2799
2800	if (cio)
2801		zio_nowait(cio);
2802	if (dio)
2803		zio_nowait(dio);
2804
2805	return (ZIO_PIPELINE_CONTINUE);
2806}
2807
2808ddt_entry_t *freedde; /* for debugging */
2809
2810static int
2811zio_ddt_free(zio_t *zio)
2812{
2813	spa_t *spa = zio->io_spa;
2814	blkptr_t *bp = zio->io_bp;
2815	ddt_t *ddt = ddt_select(spa, bp);
2816	ddt_entry_t *dde;
2817	ddt_phys_t *ddp;
2818
2819	ASSERT(BP_GET_DEDUP(bp));
2820	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2821
2822	ddt_enter(ddt);
2823	freedde = dde = ddt_lookup(ddt, bp, B_TRUE);
2824	ddp = ddt_phys_select(dde, bp);
2825	ddt_phys_decref(ddp);
2826	ddt_exit(ddt);
2827
2828	return (ZIO_PIPELINE_CONTINUE);
2829}
2830
2831/*
2832 * ==========================================================================
2833 * Allocate and free blocks
2834 * ==========================================================================
2835 */
2836
2837static zio_t *
2838zio_io_to_allocate(spa_t *spa)
2839{
2840	zio_t *zio;
2841
2842	ASSERT(MUTEX_HELD(&spa->spa_alloc_lock));
2843
2844	zio = avl_first(&spa->spa_alloc_tree);
2845	if (zio == NULL)
2846		return (NULL);
2847
2848	ASSERT(IO_IS_ALLOCATING(zio));
2849
2850	/*
2851	 * Try to place a reservation for this zio. If we're unable to
2852	 * reserve then we throttle.
2853	 */
2854	if (!metaslab_class_throttle_reserve(spa_normal_class(spa),
2855	    zio->io_prop.zp_copies, zio, 0)) {
2856		return (NULL);
2857	}
2858
2859	avl_remove(&spa->spa_alloc_tree, zio);
2860	ASSERT3U(zio->io_stage, <, ZIO_STAGE_DVA_ALLOCATE);
2861
2862	return (zio);
2863}
2864
2865static int
2866zio_dva_throttle(zio_t *zio)
2867{
2868	spa_t *spa = zio->io_spa;
2869	zio_t *nio;
2870
2871	if (zio->io_priority == ZIO_PRIORITY_SYNC_WRITE ||
2872	    !spa_normal_class(zio->io_spa)->mc_alloc_throttle_enabled ||
2873	    zio->io_child_type == ZIO_CHILD_GANG ||
2874	    zio->io_flags & ZIO_FLAG_NODATA) {
2875		return (ZIO_PIPELINE_CONTINUE);
2876	}
2877
2878	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2879
2880	ASSERT3U(zio->io_queued_timestamp, >, 0);
2881	ASSERT(zio->io_stage == ZIO_STAGE_DVA_THROTTLE);
2882
2883	mutex_enter(&spa->spa_alloc_lock);
2884
2885	ASSERT(zio->io_type == ZIO_TYPE_WRITE);
2886	avl_add(&spa->spa_alloc_tree, zio);
2887
2888	nio = zio_io_to_allocate(zio->io_spa);
2889	mutex_exit(&spa->spa_alloc_lock);
2890
2891	if (nio == zio)
2892		return (ZIO_PIPELINE_CONTINUE);
2893
2894	if (nio != NULL) {
2895		ASSERT(nio->io_stage == ZIO_STAGE_DVA_THROTTLE);
2896		/*
2897		 * We are passing control to a new zio so make sure that
2898		 * it is processed by a different thread. We do this to
2899		 * avoid stack overflows that can occur when parents are
2900		 * throttled and children are making progress. We allow
2901		 * it to go to the head of the taskq since it's already
2902		 * been waiting.
2903		 */
2904		zio_taskq_dispatch(nio, ZIO_TASKQ_ISSUE, B_TRUE);
2905	}
2906	return (ZIO_PIPELINE_STOP);
2907}
2908
2909void
2910zio_allocate_dispatch(spa_t *spa)
2911{
2912	zio_t *zio;
2913
2914	mutex_enter(&spa->spa_alloc_lock);
2915	zio = zio_io_to_allocate(spa);
2916	mutex_exit(&spa->spa_alloc_lock);
2917	if (zio == NULL)
2918		return;
2919
2920	ASSERT3U(zio->io_stage, ==, ZIO_STAGE_DVA_THROTTLE);
2921	ASSERT0(zio->io_error);
2922	zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_TRUE);
2923}
2924
2925static int
2926zio_dva_allocate(zio_t *zio)
2927{
2928	spa_t *spa = zio->io_spa;
2929	metaslab_class_t *mc = spa_normal_class(spa);
2930	blkptr_t *bp = zio->io_bp;
2931	int error;
2932	int flags = 0;
2933
2934	if (zio->io_gang_leader == NULL) {
2935		ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2936		zio->io_gang_leader = zio;
2937	}
2938
2939	ASSERT(BP_IS_HOLE(bp));
2940	ASSERT0(BP_GET_NDVAS(bp));
2941	ASSERT3U(zio->io_prop.zp_copies, >, 0);
2942	ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
2943	ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
2944
2945	if (zio->io_flags & ZIO_FLAG_NODATA) {
2946		flags |= METASLAB_DONT_THROTTLE;
2947	}
2948	if (zio->io_flags & ZIO_FLAG_GANG_CHILD) {
2949		flags |= METASLAB_GANG_CHILD;
2950	}
2951	if (zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE) {
2952		flags |= METASLAB_ASYNC_ALLOC;
2953	}
2954
2955	error = metaslab_alloc(spa, mc, zio->io_size, bp,
2956	    zio->io_prop.zp_copies, zio->io_txg, NULL, flags,
2957	    &zio->io_alloc_list, zio);
2958
2959	if (error != 0) {
2960		spa_dbgmsg(spa, "%s: metaslab allocation failure: zio %p, "
2961		    "size %llu, error %d", spa_name(spa), zio, zio->io_size,
2962		    error);
2963		if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE)
2964			return (zio_write_gang_block(zio));
2965		zio->io_error = error;
2966	}
2967
2968	return (ZIO_PIPELINE_CONTINUE);
2969}
2970
2971static int
2972zio_dva_free(zio_t *zio)
2973{
2974	metaslab_free(zio->io_spa, zio->io_bp, zio->io_txg, B_FALSE);
2975
2976	return (ZIO_PIPELINE_CONTINUE);
2977}
2978
2979static int
2980zio_dva_claim(zio_t *zio)
2981{
2982	int error;
2983
2984	error = metaslab_claim(zio->io_spa, zio->io_bp, zio->io_txg);
2985	if (error)
2986		zio->io_error = error;
2987
2988	return (ZIO_PIPELINE_CONTINUE);
2989}
2990
2991/*
2992 * Undo an allocation.  This is used by zio_done() when an I/O fails
2993 * and we want to give back the block we just allocated.
2994 * This handles both normal blocks and gang blocks.
2995 */
2996static void
2997zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
2998{
2999	ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
3000	ASSERT(zio->io_bp_override == NULL);
3001
3002	if (!BP_IS_HOLE(bp))
3003		metaslab_free(zio->io_spa, bp, bp->blk_birth, B_TRUE);
3004
3005	if (gn != NULL) {
3006		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
3007			zio_dva_unallocate(zio, gn->gn_child[g],
3008			    &gn->gn_gbh->zg_blkptr[g]);
3009		}
3010	}
3011}
3012
3013/*
3014 * Try to allocate an intent log block.  Return 0 on success, errno on failure.
3015 */
3016int
3017zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, blkptr_t *old_bp,
3018    uint64_t size, boolean_t *slog)
3019{
3020	int error = 1;
3021	zio_alloc_list_t io_alloc_list;
3022
3023	ASSERT(txg > spa_syncing_txg(spa));
3024
3025	metaslab_trace_init(&io_alloc_list);
3026	error = metaslab_alloc(spa, spa_log_class(spa), size, new_bp, 1,
3027	    txg, old_bp, METASLAB_HINTBP_AVOID, &io_alloc_list, NULL);
3028	if (error == 0) {
3029		*slog = TRUE;
3030	} else {
3031		error = metaslab_alloc(spa, spa_normal_class(spa), size,
3032		    new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID,
3033		    &io_alloc_list, NULL);
3034		if (error == 0)
3035			*slog = FALSE;
3036	}
3037	metaslab_trace_fini(&io_alloc_list);
3038
3039	if (error == 0) {
3040		BP_SET_LSIZE(new_bp, size);
3041		BP_SET_PSIZE(new_bp, size);
3042		BP_SET_COMPRESS(new_bp, ZIO_COMPRESS_OFF);
3043		BP_SET_CHECKSUM(new_bp,
3044		    spa_version(spa) >= SPA_VERSION_SLIM_ZIL
3045		    ? ZIO_CHECKSUM_ZILOG2 : ZIO_CHECKSUM_ZILOG);
3046		BP_SET_TYPE(new_bp, DMU_OT_INTENT_LOG);
3047		BP_SET_LEVEL(new_bp, 0);
3048		BP_SET_DEDUP(new_bp, 0);
3049		BP_SET_BYTEORDER(new_bp, ZFS_HOST_BYTEORDER);
3050	} else {
3051		zfs_dbgmsg("%s: zil block allocation failure: "
3052		    "size %llu, error %d", spa_name(spa), size, error);
3053	}
3054
3055	return (error);
3056}
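
/*
 * Illustrative usage of zio_alloc_zil() (editorial sketch, not taken from
 * the ZIL code itself): the caller passes the previous log block as an
 * allocation hint and learns whether the new block landed on a slog device:
 *
 *	blkptr_t new_bp;
 *	boolean_t slog;
 *
 *	BP_ZERO(&new_bp);
 *	if (zio_alloc_zil(spa, txg, &new_bp, old_bp, size, &slog) == 0) {
 *		// new_bp describes a freshly allocated intent log block,
 *		// preferentially on a slog device (slog == B_TRUE)
 *	}
 */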
3057
3058/*
3059 * Free an intent log block.
3060 */
3061void
3062zio_free_zil(spa_t *spa, uint64_t txg, blkptr_t *bp)
3063{
3064	ASSERT(BP_GET_TYPE(bp) == DMU_OT_INTENT_LOG);
3065	ASSERT(!BP_IS_GANG(bp));
3066
3067	zio_free(spa, txg, bp);
3068}
3069
3070/*
3071 * ==========================================================================
3072 * Read, write and delete to physical devices
3073 * ==========================================================================
3074 */
3075
3076
3077/*
3078 * Issue an I/O to the underlying vdev. Typically the issue pipeline
3079 * stops after this stage and will resume upon I/O completion.
3080 * However, there are instances where the vdev layer may need to
3081 * continue the pipeline when an I/O was not issued. Since the I/O
3082 * that was sent to the vdev layer might be different than the one
3083 * currently active in the pipeline (see vdev_queue_io()), we explicitly
3084 * force the underlying vdev layers to call either zio_execute() or
3085 * zio_interrupt() to ensure that the pipeline continues with the correct I/O.
3086 */
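/*
 * Illustrative sketch (editorial addition; the example_* names are
 * hypothetical): a leaf vdev's io_start routine typically hands the request
 * to the device and resumes the pipeline later, from its completion
 * callback, via zio_interrupt():
 *
 *	static void
 *	example_vdev_op_io_start(zio_t *zio)
 *	{
 *		// submit zio->io_abd at zio->io_offset for zio->io_size
 *		// bytes; completion is reported asynchronously
 *	}
 *
 *	static void
 *	example_vdev_completion_cb(void *arg)
 *	{
 *		zio_t *zio = arg;
 *
 *		zio->io_error = 0;	// or an errno from the device
 *		zio_interrupt(zio);	// resume the zio pipeline
 *	}
 */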
3087static int
3088zio_vdev_io_start(zio_t *zio)
3089{
3090	vdev_t *vd = zio->io_vd;
3091	uint64_t align;
3092	spa_t *spa = zio->io_spa;
3093	int ret;
3094
3095	ASSERT(zio->io_error == 0);
3096	ASSERT(zio->io_child_error[ZIO_CHILD_VDEV] == 0);
3097
3098	if (vd == NULL) {
3099		if (!(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
3100			spa_config_enter(spa, SCL_ZIO, zio, RW_READER);
3101
3102		/*
3103		 * The mirror_ops handle multiple DVAs in a single BP.
3104		 */
3105		vdev_mirror_ops.vdev_op_io_start(zio);
3106		return (ZIO_PIPELINE_STOP);
3107	}
3108
3109	if (vd->vdev_ops->vdev_op_leaf && zio->io_type == ZIO_TYPE_FREE &&
3110	    zio->io_priority == ZIO_PRIORITY_NOW) {
3111		trim_map_free(vd, zio->io_offset, zio->io_size, zio->io_txg);
3112		return (ZIO_PIPELINE_CONTINUE);
3113	}
3114
3115	ASSERT3P(zio->io_logical, !=, zio);
3116
3117	/*
3118	 * We keep track of time-sensitive I/Os so that the scan thread
3119	 * can quickly react to certain workloads.  In particular, we care
3120	 * about non-scrubbing, top-level reads and writes with the following
3121	 * characteristics:
3122	 *	- synchronous writes of user data to non-slog devices
3123	 *	- any reads of user data
3124	 * When these conditions are met, adjust the timestamp of spa_last_io
3125	 * which allows the scan thread to adjust its workload accordingly.
3126	 */
3127	if (!(zio->io_flags & ZIO_FLAG_SCAN_THREAD) && zio->io_bp != NULL &&
3128	    vd == vd->vdev_top && !vd->vdev_islog &&
3129	    zio->io_bookmark.zb_objset != DMU_META_OBJSET &&
3130	    zio->io_txg != spa_syncing_txg(spa)) {
3131		uint64_t old = spa->spa_last_io;
3132		uint64_t new = ddi_get_lbolt64();
3133		if (old != new)
3134			(void) atomic_cas_64(&spa->spa_last_io, old, new);
3135	}
3136
3137	align = 1ULL << vd->vdev_top->vdev_ashift;
3138
3139	if (!(zio->io_flags & ZIO_FLAG_PHYSICAL) &&
3140	    P2PHASE(zio->io_size, align) != 0) {
3141		/* Transform logical writes to be a full physical block size. */
3142		uint64_t asize = P2ROUNDUP(zio->io_size, align);
3143		abd_t *abuf = NULL;
3144		if (zio->io_type == ZIO_TYPE_READ ||
3145		    zio->io_type == ZIO_TYPE_WRITE)
3146			abuf = abd_alloc_sametype(zio->io_abd, asize);
3147		ASSERT(vd == vd->vdev_top);
3148		if (zio->io_type == ZIO_TYPE_WRITE) {
3149			abd_copy(abuf, zio->io_abd, zio->io_size);
3150			abd_zero_off(abuf, zio->io_size, asize - zio->io_size);
3151		}
3152		zio_push_transform(zio, abuf, asize, abuf ? asize : 0,
3153		    zio_subblock);
3154	}
3155
3156	/*
3157	 * If this is not a physical io, make sure that it is properly aligned
3158	 * before proceeding.
3159	 */
3160	if (!(zio->io_flags & ZIO_FLAG_PHYSICAL)) {
3161		ASSERT0(P2PHASE(zio->io_offset, align));
3162		ASSERT0(P2PHASE(zio->io_size, align));
3163	} else {
3164		/*
3165		 * For physical I/Os we only require alignment to
3166		 * the vdev's logical block size.
3167		 */
3168		uint64_t log_align =
3169		    1ULL << vd->vdev_top->vdev_logical_ashift;
3170		ASSERT0(P2PHASE(zio->io_offset, log_align));
3171		ASSERT0(P2PHASE(zio->io_size, log_align));
3172	}
3173
3174	VERIFY(zio->io_type == ZIO_TYPE_READ || spa_writeable(spa));
3175
3176	/*
3177	 * If this is a repair I/O, and there's no self-healing involved --
3178	 * that is, we're just resilvering what we expect to resilver --
3179	 * then don't do the I/O unless zio's txg is actually in vd's DTL.
3180	 * This prevents spurious resilvering with nested replication.
3181	 * For example, given a mirror of mirrors, (A+B)+(C+D), if only
3182	 * A is out of date, we'll read from C+D, then use the data to
3183	 * resilver A+B -- but we don't actually want to resilver B, just A.
3184	 * The top-level mirror has no way to know this, so instead we just
3185	 * discard unnecessary repairs as we work our way down the vdev tree.
3186	 * The same logic applies to any form of nested replication:
3187	 * ditto + mirror, RAID-Z + replacing, etc.  This covers them all.
3188	 */
3189	if ((zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
3190	    !(zio->io_flags & ZIO_FLAG_SELF_HEAL) &&
3191	    zio->io_txg != 0 &&	/* not a delegated i/o */
3192	    !vdev_dtl_contains(vd, DTL_PARTIAL, zio->io_txg, 1)) {
3193		ASSERT(zio->io_type == ZIO_TYPE_WRITE);
3194		zio_vdev_io_bypass(zio);
3195		return (ZIO_PIPELINE_CONTINUE);
3196	}
3197
3198	if (vd->vdev_ops->vdev_op_leaf) {
3199		switch (zio->io_type) {
3200		case ZIO_TYPE_READ:
3201			if (vdev_cache_read(zio))
3202				return (ZIO_PIPELINE_CONTINUE);
3203			/* FALLTHROUGH */
3204		case ZIO_TYPE_WRITE:
3205		case ZIO_TYPE_FREE:
3206			if ((zio = vdev_queue_io(zio)) == NULL)
3207				return (ZIO_PIPELINE_STOP);
3208
3209			if (!vdev_accessible(vd, zio)) {
3210				zio->io_error = SET_ERROR(ENXIO);
3211				zio_interrupt(zio);
3212				return (ZIO_PIPELINE_STOP);
3213			}
3214			break;
3215		}
3216		/*
3217		 * Note that we ignore repair writes for TRIM because they can
3218		 * conflict with normal writes. This isn't an issue because, by
3219		 * definition, we only repair blocks that aren't freed.
3220		 */
3221		if (zio->io_type == ZIO_TYPE_WRITE &&
3222		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
3223		    !trim_map_write_start(zio))
3224			return (ZIO_PIPELINE_STOP);
3225	}
3226
3227	vd->vdev_ops->vdev_op_io_start(zio);
3228	return (ZIO_PIPELINE_STOP);
3229}
3230
3231static int
3232zio_vdev_io_done(zio_t *zio)
3233{
3234	vdev_t *vd = zio->io_vd;
3235	vdev_ops_t *ops = vd ? vd->vdev_ops : &vdev_mirror_ops;
3236	boolean_t unexpected_error = B_FALSE;
3237
3238	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
3239		return (ZIO_PIPELINE_STOP);
3240
3241	ASSERT(zio->io_type == ZIO_TYPE_READ ||
3242	    zio->io_type == ZIO_TYPE_WRITE || zio->io_type == ZIO_TYPE_FREE);
3243
3244	if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
3245	    (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE ||
3246	    zio->io_type == ZIO_TYPE_FREE)) {
3247
3248		if (zio->io_type == ZIO_TYPE_WRITE &&
3249		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR))
3250			trim_map_write_done(zio);
3251
3252		vdev_queue_io_done(zio);
3253
3254		if (zio->io_type == ZIO_TYPE_WRITE)
3255			vdev_cache_write(zio);
3256
3257		if (zio_injection_enabled && zio->io_error == 0)
3258			zio->io_error = zio_handle_device_injection(vd,
3259			    zio, EIO);
3260
3261		if (zio_injection_enabled && zio->io_error == 0)
3262			zio->io_error = zio_handle_label_injection(zio, EIO);
3263
3264		if (zio->io_error) {
3265			if (zio->io_error == ENOTSUP &&
3266			    zio->io_type == ZIO_TYPE_FREE) {
3267				/* Not all devices support TRIM. */
3268			} else if (!vdev_accessible(vd, zio)) {
3269				zio->io_error = SET_ERROR(ENXIO);
3270			} else {
3271				unexpected_error = B_TRUE;
3272			}
3273		}
3274	}
3275
3276	ops->vdev_op_io_done(zio);
3277
3278	if (unexpected_error)
3279		VERIFY(vdev_probe(vd, zio) == NULL);
3280
3281	return (ZIO_PIPELINE_CONTINUE);
3282}
3283
3284/*
3285 * For non-raidz ZIOs, we can just copy aside the bad data read from the
3286 * disk, and use that to finish the checksum ereport later.
3287 */
3288static void
3289zio_vsd_default_cksum_finish(zio_cksum_report_t *zcr,
3290    const void *good_buf)
3291{
3292	/* no processing needed */
3293	zfs_ereport_finish_checksum(zcr, good_buf, zcr->zcr_cbdata, B_FALSE);
3294}
3295
3296/*ARGSUSED*/
3297void
3298zio_vsd_default_cksum_report(zio_t *zio, zio_cksum_report_t *zcr, void *ignored)
3299{
3300	void *buf = zio_buf_alloc(zio->io_size);
3301
3302	abd_copy_to_buf(buf, zio->io_abd, zio->io_size);
3303
3304	zcr->zcr_cbinfo = zio->io_size;
3305	zcr->zcr_cbdata = buf;
3306	zcr->zcr_finish = zio_vsd_default_cksum_finish;
3307	zcr->zcr_free = zio_buf_free;
3308}
3309
3310static int
3311zio_vdev_io_assess(zio_t *zio)
3312{
3313	vdev_t *vd = zio->io_vd;
3314
3315	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
3316		return (ZIO_PIPELINE_STOP);
3317
3318	if (vd == NULL && !(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
3319		spa_config_exit(zio->io_spa, SCL_ZIO, zio);
3320
3321	if (zio->io_vsd != NULL) {
3322		zio->io_vsd_ops->vsd_free(zio);
3323		zio->io_vsd = NULL;
3324	}
3325
3326	if (zio_injection_enabled && zio->io_error == 0)
3327		zio->io_error = zio_handle_fault_injection(zio, EIO);
3328
3329	if (zio->io_type == ZIO_TYPE_FREE &&
3330	    zio->io_priority != ZIO_PRIORITY_NOW) {
3331		switch (zio->io_error) {
3332		case 0:
3333			ZIO_TRIM_STAT_INCR(bytes, zio->io_size);
3334			ZIO_TRIM_STAT_BUMP(success);
3335			break;
3336		case EOPNOTSUPP:
3337			ZIO_TRIM_STAT_BUMP(unsupported);
3338			break;
3339		default:
3340			ZIO_TRIM_STAT_BUMP(failed);
3341			break;
3342		}
3343	}
3344
3345	/*
3346	 * If the I/O failed, determine whether we should attempt to retry it.
3347	 *
3348	 * On retry, we cut in line in the issue queue, since we don't want
3349	 * compression/checksumming/etc. work to delay our (cheap) I/O reissue.
3350	 */
3351	if (zio->io_error && vd == NULL &&
3352	    !(zio->io_flags & (ZIO_FLAG_DONT_RETRY | ZIO_FLAG_IO_RETRY))) {
3353		ASSERT(!(zio->io_flags & ZIO_FLAG_DONT_QUEUE));	/* not a leaf */
3354		ASSERT(!(zio->io_flags & ZIO_FLAG_IO_BYPASS));	/* not a leaf */
3355		zio->io_error = 0;
3356		zio->io_flags |= ZIO_FLAG_IO_RETRY |
3357		    ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_AGGREGATE;
3358		zio->io_stage = ZIO_STAGE_VDEV_IO_START >> 1;
3359		zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE,
3360		    zio_requeue_io_start_cut_in_line);
3361		return (ZIO_PIPELINE_STOP);
3362	}
3363
3364	/*
3365	 * If we got an error on a leaf device, convert it to ENXIO
3366	 * if the device is not accessible at all.
3367	 */
3368	if (zio->io_error && vd != NULL && vd->vdev_ops->vdev_op_leaf &&
3369	    !vdev_accessible(vd, zio))
3370		zio->io_error = SET_ERROR(ENXIO);
3371
3372	/*
3373	 * If we can't write to an interior vdev (mirror or RAID-Z),
3374	 * set vdev_cant_write so that we stop trying to allocate from it.
3375	 */
3376	if (zio->io_error == ENXIO && zio->io_type == ZIO_TYPE_WRITE &&
3377	    vd != NULL && !vd->vdev_ops->vdev_op_leaf) {
3378		vd->vdev_cant_write = B_TRUE;
3379	}
3380
3381	/*
3382	 * If a cache flush returns ENOTSUP or ENOTTY, we know that no future
3383	 * attempts will ever succeed. In this case we set a persistent bit so
3384	 * that we don't bother with it in the future.
3385	 */
3386	if ((zio->io_error == ENOTSUP || zio->io_error == ENOTTY) &&
3387	    zio->io_type == ZIO_TYPE_IOCTL &&
3388	    zio->io_cmd == DKIOCFLUSHWRITECACHE && vd != NULL)
3389		vd->vdev_nowritecache = B_TRUE;
3390
3391	if (zio->io_error)
3392		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
3393
3394	if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
3395	    zio->io_physdone != NULL) {
3396		ASSERT(!(zio->io_flags & ZIO_FLAG_DELEGATED));
3397		ASSERT(zio->io_child_type == ZIO_CHILD_VDEV);
3398		zio->io_physdone(zio->io_logical);
3399	}
3400
3401	return (ZIO_PIPELINE_CONTINUE);
3402}
3403
3404void
3405zio_vdev_io_reissue(zio_t *zio)
3406{
3407	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
3408	ASSERT(zio->io_error == 0);
3409
3410	zio->io_stage >>= 1;
3411}
3412
3413void
3414zio_vdev_io_redone(zio_t *zio)
3415{
3416	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_DONE);
3417
3418	zio->io_stage >>= 1;
3419}
3420
3421void
3422zio_vdev_io_bypass(zio_t *zio)
3423{
3424	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
3425	ASSERT(zio->io_error == 0);
3426
3427	zio->io_flags |= ZIO_FLAG_IO_BYPASS;
3428	zio->io_stage = ZIO_STAGE_VDEV_IO_ASSESS >> 1;
3429}
3430
3431/*
3432 * ==========================================================================
3433 * Generate and verify checksums
3434 * ==========================================================================
3435 */
3436static int
3437zio_checksum_generate(zio_t *zio)
3438{
3439	blkptr_t *bp = zio->io_bp;
3440	enum zio_checksum checksum;
3441
3442	if (bp == NULL) {
3443		/*
3444		 * This is zio_write_phys().
3445		 * We're either generating a label checksum, or none at all.
3446		 */
3447		checksum = zio->io_prop.zp_checksum;
3448
3449		if (checksum == ZIO_CHECKSUM_OFF)
3450			return (ZIO_PIPELINE_CONTINUE);
3451
3452		ASSERT(checksum == ZIO_CHECKSUM_LABEL);
3453	} else {
3454		if (BP_IS_GANG(bp) && zio->io_child_type == ZIO_CHILD_GANG) {
3455			ASSERT(!IO_IS_ALLOCATING(zio));
3456			checksum = ZIO_CHECKSUM_GANG_HEADER;
3457		} else {
3458			checksum = BP_GET_CHECKSUM(bp);
3459		}
3460	}
3461
3462	zio_checksum_compute(zio, checksum, zio->io_abd, zio->io_size);
3463
3464	return (ZIO_PIPELINE_CONTINUE);
3465}
3466
3467static int
3468zio_checksum_verify(zio_t *zio)
3469{
3470	zio_bad_cksum_t info;
3471	blkptr_t *bp = zio->io_bp;
3472	int error;
3473
3474	ASSERT(zio->io_vd != NULL);
3475
3476	if (bp == NULL) {
3477		/*
3478		 * This is zio_read_phys().
3479		 * We're either verifying a label checksum, or nothing at all.
3480		 */
3481		if (zio->io_prop.zp_checksum == ZIO_CHECKSUM_OFF)
3482			return (ZIO_PIPELINE_CONTINUE);
3483
3484		ASSERT(zio->io_prop.zp_checksum == ZIO_CHECKSUM_LABEL);
3485	}
3486
3487	if ((error = zio_checksum_error(zio, &info)) != 0) {
3488		zio->io_error = error;
3489		if (error == ECKSUM &&
3490		    !(zio->io_flags & ZIO_FLAG_SPECULATIVE)) {
3491			zfs_ereport_start_checksum(zio->io_spa,
3492			    zio->io_vd, zio, zio->io_offset,
3493			    zio->io_size, NULL, &info);
3494		}
3495	}
3496
3497	return (ZIO_PIPELINE_CONTINUE);
3498}
3499
3500/*
3501 * Called by RAID-Z to ensure we don't compute the checksum twice.
3502 */
3503void
3504zio_checksum_verified(zio_t *zio)
3505{
3506	zio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
3507}
3508
3509/*
3510 * ==========================================================================
3511 * Error rank.  Errors are ranked in the order 0, ENXIO, ECKSUM, EIO, other.
3512 * An error of 0 indicates success.  ENXIO indicates whole-device failure,
3513 * which may be transient (e.g. unplugged) or permanent.  ECKSUM and EIO
3514 * indicate errors that are specific to one I/O, and most likely permanent.
3515 * Any other error is presumed to be worse because we weren't expecting it.
3516 * ==========================================================================
3517 */
3518int
3519zio_worst_error(int e1, int e2)
3520{
3521	static int zio_error_rank[] = { 0, ENXIO, ECKSUM, EIO };
3522	int r1, r2;
3523
3524	for (r1 = 0; r1 < sizeof (zio_error_rank) / sizeof (int); r1++)
3525		if (e1 == zio_error_rank[r1])
3526			break;
3527
3528	for (r2 = 0; r2 < sizeof (zio_error_rank) / sizeof (int); r2++)
3529		if (e2 == zio_error_rank[r2])
3530			break;
3531
3532	return (r1 > r2 ? e1 : e2);
3533}
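
/*
 * For illustration, the ranking above implies (values follow directly from
 * zio_error_rank[]):
 *
 *	zio_worst_error(0, ENXIO) == ENXIO
 *	zio_worst_error(ENXIO, ECKSUM) == ECKSUM
 *	zio_worst_error(ECKSUM, EIO) == EIO
 *	zio_worst_error(EIO, EINVAL) == EINVAL	(unranked, so presumed worst)
 */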
3534
3535/*
3536 * ==========================================================================
3537 * I/O completion
3538 * ==========================================================================
3539 */
3540static int
3541zio_ready(zio_t *zio)
3542{
3543	blkptr_t *bp = zio->io_bp;
3544	zio_t *pio, *pio_next;
3545	zio_link_t *zl = NULL;
3546
3547	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
3548	    zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_READY))
3549		return (ZIO_PIPELINE_STOP);
3550
3551	if (zio->io_ready) {
3552		ASSERT(IO_IS_ALLOCATING(zio));
3553		ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp) ||
3554		    (zio->io_flags & ZIO_FLAG_NOPWRITE));
3555		ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
3556
3557		zio->io_ready(zio);
3558	}
3559
3560	if (bp != NULL && bp != &zio->io_bp_copy)
3561		zio->io_bp_copy = *bp;
3562
3563	if (zio->io_error != 0) {
3564		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
3565
3566		if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
3567			ASSERT(IO_IS_ALLOCATING(zio));
3568			ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
3569			/*
3570			 * We were unable to allocate anything, unreserve and
3571			 * issue the next I/O to allocate.
3572			 */
3573			metaslab_class_throttle_unreserve(
3574			    spa_normal_class(zio->io_spa),
3575			    zio->io_prop.zp_copies, zio);
3576			zio_allocate_dispatch(zio->io_spa);
3577		}
3578	}
3579
3580	mutex_enter(&zio->io_lock);
3581	zio->io_state[ZIO_WAIT_READY] = 1;
3582	pio = zio_walk_parents(zio, &zl);
3583	mutex_exit(&zio->io_lock);
3584
3585	/*
3586	 * As we notify zio's parents, new parents could be added.
3587	 * New parents go to the head of zio's io_parent_list, however,
3588	 * so we will (correctly) not notify them.  The remainder of zio's
3589	 * io_parent_list, from 'pio_next' onward, cannot change because
3590	 * all parents must wait for us to be done before they can be done.
3591	 */
3592	for (; pio != NULL; pio = pio_next) {
3593		pio_next = zio_walk_parents(zio, &zl);
3594		zio_notify_parent(pio, zio, ZIO_WAIT_READY);
3595	}
3596
3597	if (zio->io_flags & ZIO_FLAG_NODATA) {
3598		if (BP_IS_GANG(bp)) {
3599			zio->io_flags &= ~ZIO_FLAG_NODATA;
3600		} else {
3601			ASSERT((uintptr_t)zio->io_abd < SPA_MAXBLOCKSIZE);
3602			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
3603		}
3604	}
3605
3606	if (zio_injection_enabled &&
3607	    zio->io_spa->spa_syncing_txg == zio->io_txg)
3608		zio_handle_ignored_writes(zio);
3609
3610	return (ZIO_PIPELINE_CONTINUE);
3611}
3612
3613/*
3614 * Update the allocation throttle accounting.
3615 */
3616static void
3617zio_dva_throttle_done(zio_t *zio)
3618{
3619	zio_t *lio = zio->io_logical;
3620	zio_t *pio = zio_unique_parent(zio);
3621	vdev_t *vd = zio->io_vd;
3622	int flags = METASLAB_ASYNC_ALLOC;
3623
3624	ASSERT3P(zio->io_bp, !=, NULL);
3625	ASSERT3U(zio->io_type, ==, ZIO_TYPE_WRITE);
3626	ASSERT3U(zio->io_priority, ==, ZIO_PRIORITY_ASYNC_WRITE);
3627	ASSERT3U(zio->io_child_type, ==, ZIO_CHILD_VDEV);
3628	ASSERT(vd != NULL);
3629	ASSERT3P(vd, ==, vd->vdev_top);
3630	ASSERT(!(zio->io_flags & (ZIO_FLAG_IO_REPAIR | ZIO_FLAG_IO_RETRY)));
3631	ASSERT(zio->io_flags & ZIO_FLAG_IO_ALLOCATING);
3632	ASSERT(!(lio->io_flags & ZIO_FLAG_IO_REWRITE));
3633	ASSERT(!(lio->io_orig_flags & ZIO_FLAG_NODATA));
3634
3635	/*
3636	 * Parents of gang children can have two flavors -- ones that
3637	 * allocated the gang header (will have ZIO_FLAG_IO_REWRITE set)
3638	 * and ones that allocated the constituent blocks. The allocation
3639	 * throttle needs to know the allocating parent zio so we must find
3640	 * it here.
3641	 */
3642	if (pio->io_child_type == ZIO_CHILD_GANG) {
3643		/*
3644		 * If our parent is a rewrite gang child then our grandparent
3645		 * would have been the one that performed the allocation.
3646		 */
3647		if (pio->io_flags & ZIO_FLAG_IO_REWRITE)
3648			pio = zio_unique_parent(pio);
3649		flags |= METASLAB_GANG_CHILD;
3650	}
3651
3652	ASSERT(IO_IS_ALLOCATING(pio));
3653	ASSERT3P(zio, !=, zio->io_logical);
3654	ASSERT(zio->io_logical != NULL);
3655	ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REPAIR));
3656	ASSERT0(zio->io_flags & ZIO_FLAG_NOPWRITE);
3657
3658	mutex_enter(&pio->io_lock);
3659	metaslab_group_alloc_decrement(zio->io_spa, vd->vdev_id, pio, flags);
3660	mutex_exit(&pio->io_lock);
3661
3662	metaslab_class_throttle_unreserve(spa_normal_class(zio->io_spa),
3663	    1, pio);
3664
3665	/*
3666	 * Call into the pipeline to see if there is more work that
3667	 * needs to be done. If there is work to be done it will be
3668	 * dispatched to another taskq thread.
3669	 */
3670	zio_allocate_dispatch(zio->io_spa);
3671}
3672
3673static int
3674zio_done(zio_t *zio)
3675{
3676	spa_t *spa = zio->io_spa;
3677	zio_t *lio = zio->io_logical;
3678	blkptr_t *bp = zio->io_bp;
3679	vdev_t *vd = zio->io_vd;
3680	uint64_t psize = zio->io_size;
3681	zio_t *pio, *pio_next;
3682	metaslab_class_t *mc = spa_normal_class(spa);
3683	zio_link_t *zl = NULL;
3684
3685	/*
3686	 * If our children haven't all completed,
3687	 * wait for them and then repeat this pipeline stage.
3688	 */
3689	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE) ||
3690	    zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE) ||
3691	    zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE) ||
3692	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_DONE))
3693		return (ZIO_PIPELINE_STOP);
3694
3695	/*
3696	 * If the allocation throttle is enabled, then update the accounting.
3697	 * We only track child I/Os that are part of an allocating async
3698	 * write. We must do this since the allocation is performed
3699	 * by the logical I/O but the actual write is done by child I/Os.
3700	 */
3701	if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING &&
3702	    zio->io_child_type == ZIO_CHILD_VDEV) {
3703		ASSERT(mc->mc_alloc_throttle_enabled);
3704		zio_dva_throttle_done(zio);
3705	}
3706
3707	/*
3708	 * If the allocation throttle is enabled, verify that
3709	 * we have decremented the refcounts for every I/O that was throttled.
3710	 */
3711	if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
3712		ASSERT(zio->io_type == ZIO_TYPE_WRITE);
3713		ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
3714		ASSERT(bp != NULL);
3715		metaslab_group_alloc_verify(spa, zio->io_bp, zio);
3716		VERIFY(refcount_not_held(&mc->mc_alloc_slots, zio));
3717	}
3718
3719	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
3720		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
3721			ASSERT(zio->io_children[c][w] == 0);
3722
3723	if (bp != NULL && !BP_IS_EMBEDDED(bp)) {
3724		ASSERT(bp->blk_pad[0] == 0);
3725		ASSERT(bp->blk_pad[1] == 0);
3726		ASSERT(bcmp(bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
3727		    (bp == zio_unique_parent(zio)->io_bp));
3728		if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(bp) &&
3729		    zio->io_bp_override == NULL &&
3730		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
3731			ASSERT(!BP_SHOULD_BYTESWAP(bp));
3732			ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(bp));
3733			ASSERT(BP_COUNT_GANG(bp) == 0 ||
3734			    (BP_COUNT_GANG(bp) == BP_GET_NDVAS(bp)));
3735		}
3736		if (zio->io_flags & ZIO_FLAG_NOPWRITE)
3737			VERIFY(BP_EQUAL(bp, &zio->io_bp_orig));
3738	}
3739
3740	/*
3741	 * If there were child vdev/gang/ddt errors, they apply to us now.
3742	 */
3743	zio_inherit_child_errors(zio, ZIO_CHILD_VDEV);
3744	zio_inherit_child_errors(zio, ZIO_CHILD_GANG);
3745	zio_inherit_child_errors(zio, ZIO_CHILD_DDT);
3746
3747	/*
3748	 * If the I/O on the transformed data was successful, generate any
3749	 * checksum reports now while we still have the transformed data.
3750	 */
3751	if (zio->io_error == 0) {
3752		while (zio->io_cksum_report != NULL) {
3753			zio_cksum_report_t *zcr = zio->io_cksum_report;
3754			uint64_t align = zcr->zcr_align;
3755			uint64_t asize = P2ROUNDUP(psize, align);
3756			char *abuf = NULL;
3757			abd_t *adata = zio->io_abd;
3758
3759			if (asize != psize) {
3760				adata = abd_alloc_linear(asize, B_TRUE);
3761				abd_copy(adata, zio->io_abd, psize);
3762				abd_zero_off(adata, psize, asize - psize);
3763			}
3764
3765			if (adata != NULL)
3766				abuf = abd_borrow_buf_copy(adata, asize);
3767
3768			zio->io_cksum_report = zcr->zcr_next;
3769			zcr->zcr_next = NULL;
3770			zcr->zcr_finish(zcr, abuf);
3771			zfs_ereport_free_checksum(zcr);
3772
3773			if (adata != NULL)
3774				abd_return_buf(adata, abuf, asize);
3775
3776			if (asize != psize)
3777				abd_free(adata);
3778		}
3779	}
3780
3781	zio_pop_transforms(zio);	/* note: may set zio->io_error */
3782
3783	vdev_stat_update(zio, psize);
3784
3785	if (zio->io_error) {
3786		/*
3787		 * If this I/O is attached to a particular vdev,
3788		 * generate an error message describing the I/O failure
3789		 * at the block level.  We ignore these errors if the
3790		 * device is currently unavailable.
3791		 */
3792		if (zio->io_error != ECKSUM && vd != NULL && !vdev_is_dead(vd))
3793			zfs_ereport_post(FM_EREPORT_ZFS_IO, spa, vd, zio, 0, 0);
3794
3795		if ((zio->io_error == EIO || !(zio->io_flags &
3796		    (ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_PROPAGATE))) &&
3797		    zio == lio) {
3798			/*
3799			 * For logical I/O requests, tell the SPA to log the
3800			 * error and generate a logical data ereport.
3801			 */
3802			spa_log_error(spa, zio);
3803			zfs_ereport_post(FM_EREPORT_ZFS_DATA, spa, NULL, zio,
3804			    0, 0);
3805		}
3806	}
3807
3808	if (zio->io_error && zio == lio) {
3809		/*
3810		 * Determine whether zio should be reexecuted.  This will
3811		 * propagate all the way to the root via zio_notify_parent().
3812		 */
3813		ASSERT(vd == NULL && bp != NULL);
3814		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
3815
3816		if (IO_IS_ALLOCATING(zio) &&
3817		    !(zio->io_flags & ZIO_FLAG_CANFAIL)) {
3818			if (zio->io_error != ENOSPC)
3819				zio->io_reexecute |= ZIO_REEXECUTE_NOW;
3820			else
3821				zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
3822		}
3823
3824		if ((zio->io_type == ZIO_TYPE_READ ||
3825		    zio->io_type == ZIO_TYPE_FREE) &&
3826		    !(zio->io_flags & ZIO_FLAG_SCAN_THREAD) &&
3827		    zio->io_error == ENXIO &&
3828		    spa_load_state(spa) == SPA_LOAD_NONE &&
3829		    spa_get_failmode(spa) != ZIO_FAILURE_MODE_CONTINUE)
3830			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
3831
3832		if (!(zio->io_flags & ZIO_FLAG_CANFAIL) && !zio->io_reexecute)
3833			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
3834
3835		/*
3836		 * Here is a possibly good place to attempt to do
3837		 * either combinatorial reconstruction or error correction
3838		 * based on checksums.  It also might be a good place
3839		 * to send out preliminary ereports before we suspend
3840		 * processing.
3841		 */
3842	}
3843
3844	/*
3845	 * If there were logical child errors, they apply to us now.
3846	 * We defer this until now to avoid conflating logical child
3847	 * errors with errors that happened to the zio itself when
3848	 * updating vdev stats and reporting FMA events above.
3849	 */
3850	zio_inherit_child_errors(zio, ZIO_CHILD_LOGICAL);
3851
3852	if ((zio->io_error || zio->io_reexecute) &&
3853	    IO_IS_ALLOCATING(zio) && zio->io_gang_leader == zio &&
3854	    !(zio->io_flags & (ZIO_FLAG_IO_REWRITE | ZIO_FLAG_NOPWRITE)))
3855		zio_dva_unallocate(zio, zio->io_gang_tree, bp);
3856
3857	zio_gang_tree_free(&zio->io_gang_tree);
3858
3859	/*
3860	 * Godfather I/Os should never suspend.
3861	 */
3862	if ((zio->io_flags & ZIO_FLAG_GODFATHER) &&
3863	    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND))
3864		zio->io_reexecute = 0;
3865
3866	if (zio->io_reexecute) {
3867		/*
3868		 * This is a logical I/O that wants to reexecute.
3869		 *
3870		 * Reexecute is top-down.  When an i/o fails, if it's not
3871		 * the root, it simply notifies its parent and sticks around.
3872		 * The parent, seeing that it still has children in zio_done(),
3873		 * does the same.  This percolates all the way up to the root.
3874		 * The root i/o will reexecute or suspend the entire tree.
3875		 *
3876		 * This approach ensures that zio_reexecute() honors
3877		 * all the original i/o dependency relationships, e.g.
3878		 * parents not executing until children are ready.
3879		 */
3880		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
3881
3882		zio->io_gang_leader = NULL;
3883
3884		mutex_enter(&zio->io_lock);
3885		zio->io_state[ZIO_WAIT_DONE] = 1;
3886		mutex_exit(&zio->io_lock);
3887
3888		/*
3889		 * "The Godfather" I/O monitors its children but is
3890		 * not a true parent to them. It will track them through
3891		 * the pipeline but severs its ties whenever they get into
3892		 * trouble (e.g. suspended). This allows "The Godfather"
3893		 * I/O to return status without blocking.
3894		 */
3895		zl = NULL;
3896		for (pio = zio_walk_parents(zio, &zl); pio != NULL;
3897		    pio = pio_next) {
3898			zio_link_t *remove_zl = zl;
3899			pio_next = zio_walk_parents(zio, &zl);
3900
3901			if ((pio->io_flags & ZIO_FLAG_GODFATHER) &&
3902			    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND)) {
3903				zio_remove_child(pio, zio, remove_zl);
3904				zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
3905			}
3906		}
3907
3908		if ((pio = zio_unique_parent(zio)) != NULL) {
3909			/*
3910			 * We're not a root i/o, so there's nothing to do
3911			 * but notify our parent.  Don't propagate errors
3912			 * upward since we haven't permanently failed yet.
3913			 */
3914			ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
3915			zio->io_flags |= ZIO_FLAG_DONT_PROPAGATE;
3916			zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
3917		} else if (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND) {
3918			/*
3919			 * We'd fail again if we reexecuted now, so suspend
3920			 * until conditions improve (e.g. device comes online).
3921			 */
3922			zio_suspend(spa, zio);
3923		} else {
3924			/*
3925			 * Reexecution is potentially a huge amount of work.
3926			 * Hand it off to the otherwise-unused claim taskq.
3927			 */
3928#if defined(illumos) || !defined(_KERNEL)
3929			ASSERT(zio->io_tqent.tqent_next == NULL);
3930#else
3931			ASSERT(zio->io_tqent.tqent_task.ta_pending == 0);
3932#endif
3933			spa_taskq_dispatch_ent(spa, ZIO_TYPE_CLAIM,
3934			    ZIO_TASKQ_ISSUE, (task_func_t *)zio_reexecute, zio,
3935			    0, &zio->io_tqent);
3936		}
3937		return (ZIO_PIPELINE_STOP);
3938	}
3939
3940	ASSERT(zio->io_child_count == 0);
3941	ASSERT(zio->io_reexecute == 0);
3942	ASSERT(zio->io_error == 0 || (zio->io_flags & ZIO_FLAG_CANFAIL));
3943
3944	/*
3945	 * Report any checksum errors, since the I/O is complete.
3946	 */
3947	while (zio->io_cksum_report != NULL) {
3948		zio_cksum_report_t *zcr = zio->io_cksum_report;
3949		zio->io_cksum_report = zcr->zcr_next;
3950		zcr->zcr_next = NULL;
3951		zcr->zcr_finish(zcr, NULL);
3952		zfs_ereport_free_checksum(zcr);
3953	}
3954
3955	/*
3956	 * It is the responsibility of the done callback to ensure that this
3957	 * particular zio is no longer discoverable for adoption, and as
3958	 * such, cannot acquire any new parents.
3959	 */
3960	if (zio->io_done)
3961		zio->io_done(zio);
3962
3963	mutex_enter(&zio->io_lock);
3964	zio->io_state[ZIO_WAIT_DONE] = 1;
3965	mutex_exit(&zio->io_lock);
3966
3967	zl = NULL;
3968	for (pio = zio_walk_parents(zio, &zl); pio != NULL; pio = pio_next) {
3969		zio_link_t *remove_zl = zl;
3970		pio_next = zio_walk_parents(zio, &zl);
3971		zio_remove_child(pio, zio, remove_zl);
3972		zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
3973	}
3974
3975	if (zio->io_waiter != NULL) {
3976		mutex_enter(&zio->io_lock);
3977		zio->io_executor = NULL;
3978		cv_broadcast(&zio->io_cv);
3979		mutex_exit(&zio->io_lock);
3980	} else {
3981		zio_destroy(zio);
3982	}
3983
3984	return (ZIO_PIPELINE_STOP);
3985}
3986
3987/*
3988 * ==========================================================================
3989 * I/O pipeline definition
3990 * ==========================================================================
3991 */
3992static zio_pipe_stage_t *zio_pipeline[] = {
3993	NULL,
3994	zio_read_bp_init,
3995	zio_write_bp_init,
3996	zio_free_bp_init,
3997	zio_issue_async,
3998	zio_write_compress,
3999	zio_checksum_generate,
4000	zio_nop_write,
4001	zio_ddt_read_start,
4002	zio_ddt_read_done,
4003	zio_ddt_write,
4004	zio_ddt_free,
4005	zio_gang_assemble,
4006	zio_gang_issue,
4007	zio_dva_throttle,
4008	zio_dva_allocate,
4009	zio_dva_free,
4010	zio_dva_claim,
4011	zio_ready,
4012	zio_vdev_io_start,
4013	zio_vdev_io_done,
4014	zio_vdev_io_assess,
4015	zio_checksum_verify,
4016	zio_done
4017};
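
/*
 * The order of the table above is significant: zio_execute() is expected to
 * index it by the bit position of the zio's current stage, so its entries
 * should line up one-to-one with the ZIO_STAGE_* definitions in zio_impl.h.
 */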
4018
4019
4020
4021
4022/*
4023 * Compare two zbookmark_phys_t's to see which we would reach first in a
4024 * pre-order traversal of the object tree.
4025 *
4026 * This is simple in every case aside from the meta-dnode object. For all other
4027 * objects, we traverse them in order (object 1 before object 2, and so on).
4028 * However, all of these objects are traversed while traversing object 0, since
4029 * the data it points to is the list of objects.  Thus, we need to convert to a
4030 * canonical representation so we can compare meta-dnode bookmarks to
4031 * non-meta-dnode bookmarks.
4032 *
4033 * We do this by calculating "equivalents" for each field of the zbookmark.
4034 * zbookmarks outside of the meta-dnode use their own object and level, and
4035 * calculate the level 0 equivalent (the first L0 blkid that is contained in the
4036 * blocks this bookmark refers to) by multiplying their blkid by their span
4037 * (the number of L0 blocks contained within one block at their level).
4038 * zbookmarks inside the meta-dnode calculate their object equivalent
4039 * (which is L0equiv * dnodes per data block), use 0 for their L0equiv, and use
4040 * level + 1<<31 (any value larger than a level could ever be) for their level.
4041 * This causes them to always compare before a bookmark in their object
4042 * equivalent, compare appropriately to bookmarks in other objects, and to
4043 * compare appropriately to other bookmarks in the meta-dnode.
4044 */
4045int
4046zbookmark_compare(uint16_t dbss1, uint8_t ibs1, uint16_t dbss2, uint8_t ibs2,
4047    const zbookmark_phys_t *zb1, const zbookmark_phys_t *zb2)
4048{
4049	/*
4050	 * These variables represent the "equivalent" values for the zbookmark,
4051	 * after converting zbookmarks inside the meta dnode to their
4052	 * normal-object equivalents.
4053	 */
4054	uint64_t zb1obj, zb2obj;
4055	uint64_t zb1L0, zb2L0;
4056	uint64_t zb1level, zb2level;
4057
4058	if (zb1->zb_object == zb2->zb_object &&
4059	    zb1->zb_level == zb2->zb_level &&
4060	    zb1->zb_blkid == zb2->zb_blkid)
4061		return (0);
4062
4063	/*
4064	 * BP_SPANB calculates the span in blocks.
4065	 */
4066	zb1L0 = (zb1->zb_blkid) * BP_SPANB(ibs1, zb1->zb_level);
4067	zb2L0 = (zb2->zb_blkid) * BP_SPANB(ibs2, zb2->zb_level);
4068
4069	if (zb1->zb_object == DMU_META_DNODE_OBJECT) {
4070		zb1obj = zb1L0 * (dbss1 << (SPA_MINBLOCKSHIFT - DNODE_SHIFT));
4071		zb1L0 = 0;
4072		zb1level = zb1->zb_level + COMPARE_META_LEVEL;
4073	} else {
4074		zb1obj = zb1->zb_object;
4075		zb1level = zb1->zb_level;
4076	}
4077
4078	if (zb2->zb_object == DMU_META_DNODE_OBJECT) {
4079		zb2obj = zb2L0 * (dbss2 << (SPA_MINBLOCKSHIFT - DNODE_SHIFT));
4080		zb2L0 = 0;
4081		zb2level = zb2->zb_level + COMPARE_META_LEVEL;
4082	} else {
4083		zb2obj = zb2->zb_object;
4084		zb2level = zb2->zb_level;
4085	}
4086
4087	/* Now that we have a canonical representation, do the comparison. */
4088	if (zb1obj != zb2obj)
4089		return (zb1obj < zb2obj ? -1 : 1);
4090	else if (zb1L0 != zb2L0)
4091		return (zb1L0 < zb2L0 ? -1 : 1);
4092	else if (zb1level != zb2level)
4093		return (zb1level > zb2level ? -1 : 1);
4094	/*
4095	 * This can (theoretically) happen if the bookmarks have the same object
4096	 * and level but different blkids, which is only possible if the two block
4097	 * sizes differ.  There is presently no way to change the indirect block sizes.
4098	 */
4099	return (0);
4100}
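
/*
 * Worked example (illustrative; assumes the standard 16K meta-dnode data
 * block, i.e. dbss = 32 512-byte sectors, giving 32 dnodes per L0 block):
 *
 *	zb1 = { DMU_META_DNODE_OBJECT, level 0, blkid 1 }
 *		-> zb1obj = 1 * 32 = 32, zb1L0 = 0, zb1level = 0 + 1<<31
 *	zb2 = { object 40, level 0, blkid 5 }
 *		-> zb2obj = 40, zb2L0 = 5, zb2level = 0
 *
 * Since 32 < 40, the meta-dnode block holding dnodes 32-63 compares before
 * every bookmark in object 40, and the level bias makes it compare before
 * object 32's own bookmarks as well, matching the traversal order described
 * above.
 */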
4101
4102/*
4103 *  This function checks the following: given that last_block is the place
4104 *  where our traversal stopped last time, does that guarantee that we've
4105 *  visited every node under subtree_root?  We can't answer that from the raw
4106 *  output of zbookmark_compare alone.  Instead, we pass in a modified version
4107 *  of subtree_root: by incrementing its block id and then checking whether
4108 *  that modified bookmark compares at or before last_block, we can tell
4109 *  whether or not having visited last_block implies that all of subtree_root's
4110 *  children have been visited.
4111 */
4112boolean_t
4113zbookmark_subtree_completed(const dnode_phys_t *dnp,
4114    const zbookmark_phys_t *subtree_root, const zbookmark_phys_t *last_block)
4115{
4116	zbookmark_phys_t mod_zb = *subtree_root;
4117	mod_zb.zb_blkid++;
4118	ASSERT(last_block->zb_level == 0);
4119
4120	/* The objset_phys_t isn't before anything. */
4121	if (dnp == NULL)
4122		return (B_FALSE);
4123
4124	/*
4125	 * We pass in 1ULL << (DNODE_BLOCK_SHIFT - SPA_MINBLOCKSHIFT) for the
4126	 * data block size in sectors, because that variable is only used if
4127	 * the bookmark refers to a block in the meta-dnode.  Since we don't
4128	 * know without examining it what object it refers to, and there's no
4129	 * harm in passing in this value in other cases, we always pass it in.
4130	 *
4131	 * We pass in 0 for the indirect block size shift because zb2 must be
4132	 * level 0.  The indirect block size is only used to calculate the span
4133	 * of the bookmark, but since the bookmark must be level 0, the span is
4134	 * always 1, so the math works out.
4135	 *
4136	 * If you make changes to how the zbookmark_compare code works, be sure
4137	 * to verify that this code still works afterwards.
4138	 */
4139	return (zbookmark_compare(dnp->dn_datablkszsec, dnp->dn_indblkshift,
4140	    1ULL << (DNODE_BLOCK_SHIFT - SPA_MINBLOCKSHIFT), 0, &mod_zb,
4141	    last_block) <= 0);
4142}
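
/*
 * Usage sketch (the caller and variable names here are hypothetical): a
 * traversal that resumes from a saved level-0 bookmark can prune an entire
 * subtree that was already covered on the previous pass:
 *
 *	if (zbookmark_subtree_completed(dnp, zb, resume_zb))
 *		return (0);	(everything under zb was already visited)
 *
 * where 'zb' is the bookmark of the block about to be descended into and
 * 'resume_zb' is the level-0 bookmark recorded when the previous traversal
 * stopped.
 */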
4143