zio.c revision 276081
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright (c) 2011, 2014 by Delphix. All rights reserved.
24 * Copyright (c) 2011 Nexenta Systems, Inc. All rights reserved.
25 */
26
27#include <sys/zfs_context.h>
28#include <sys/fm/fs/zfs.h>
29#include <sys/spa.h>
30#include <sys/txg.h>
31#include <sys/spa_impl.h>
32#include <sys/vdev_impl.h>
33#include <sys/zio_impl.h>
34#include <sys/zio_compress.h>
35#include <sys/zio_checksum.h>
36#include <sys/dmu_objset.h>
37#include <sys/arc.h>
38#include <sys/ddt.h>
39#include <sys/trim_map.h>
40#include <sys/blkptr.h>
41#include <sys/zfeature.h>
42
43SYSCTL_DECL(_vfs_zfs);
44SYSCTL_NODE(_vfs_zfs, OID_AUTO, zio, CTLFLAG_RW, 0, "ZFS ZIO");
45#if defined(__amd64__)
46static int zio_use_uma = 1;
47#else
48static int zio_use_uma = 0;
49#endif
50TUNABLE_INT("vfs.zfs.zio.use_uma", &zio_use_uma);
51SYSCTL_INT(_vfs_zfs_zio, OID_AUTO, use_uma, CTLFLAG_RDTUN, &zio_use_uma, 0,
52    "Use uma(9) for ZIO allocations");
53static int zio_exclude_metadata = 0;
54TUNABLE_INT("vfs.zfs.zio.exclude_metadata", &zio_exclude_metadata);
55SYSCTL_INT(_vfs_zfs_zio, OID_AUTO, exclude_metadata, CTLFLAG_RDTUN, &zio_exclude_metadata, 0,
56    "Exclude metadata buffers from dumps as well");
57
58zio_trim_stats_t zio_trim_stats = {
59	{ "bytes",		KSTAT_DATA_UINT64,
60	  "Number of bytes successfully TRIMmed" },
61	{ "success",		KSTAT_DATA_UINT64,
62	  "Number of successful TRIM requests" },
63	{ "unsupported",	KSTAT_DATA_UINT64,
64	  "Number of TRIM requests that failed because TRIM is not supported" },
65	{ "failed",		KSTAT_DATA_UINT64,
66	  "Number of TRIM requests that failed for reasons other than not supported" },
67};
68
69static kstat_t *zio_trim_ksp;
70
71/*
72 * ==========================================================================
73 * I/O type descriptions
74 * ==========================================================================
75 */
76const char *zio_type_name[ZIO_TYPES] = {
77	"zio_null", "zio_read", "zio_write", "zio_free", "zio_claim",
78	"zio_ioctl"
79};
80
81/*
82 * ==========================================================================
83 * I/O kmem caches
84 * ==========================================================================
85 */
86kmem_cache_t *zio_cache;
87kmem_cache_t *zio_link_cache;
88kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
89kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
90
91#ifdef _KERNEL
92extern vmem_t *zio_alloc_arena;
93#endif
94
95/*
96 * The following actions directly affect the spa's sync-to-convergence logic.
97 * The values below define the sync pass when we start performing the action.
98 * Care should be taken when changing these values as they directly impact
99 * spa_sync() performance. Tuning these values may introduce subtle performance
100 * pathologies and should only be done in the context of performance analysis.
101 * These tunables will eventually be removed and replaced with #defines once
102 * enough analysis has been done to determine optimal values.
103 *
104 * The 'zfs_sync_pass_deferred_free' pass must be greater than 1 to ensure that
105 * regular blocks are not deferred.
106 */
107int zfs_sync_pass_deferred_free = 2; /* defer frees starting in this pass */
108TUNABLE_INT("vfs.zfs.sync_pass_deferred_free", &zfs_sync_pass_deferred_free);
109SYSCTL_INT(_vfs_zfs, OID_AUTO, sync_pass_deferred_free, CTLFLAG_RDTUN,
110    &zfs_sync_pass_deferred_free, 0, "defer frees starting in this pass");
111int zfs_sync_pass_dont_compress = 5; /* don't compress starting in this pass */
112TUNABLE_INT("vfs.zfs.sync_pass_dont_compress", &zfs_sync_pass_dont_compress);
113SYSCTL_INT(_vfs_zfs, OID_AUTO, sync_pass_dont_compress, CTLFLAG_RDTUN,
114    &zfs_sync_pass_dont_compress, 0, "don't compress starting in this pass");
115int zfs_sync_pass_rewrite = 2; /* rewrite new bps starting in this pass */
116TUNABLE_INT("vfs.zfs.sync_pass_rewrite", &zfs_sync_pass_rewrite);
117SYSCTL_INT(_vfs_zfs, OID_AUTO, sync_pass_rewrite, CTLFLAG_RDTUN,
118    &zfs_sync_pass_rewrite, 0, "rewrite new bps starting in this pass");
119
120/*
121 * An allocating zio is one that either currently has the DVA allocate
122 * stage set or will have it later in its lifetime.
123 */
124#define	IO_IS_ALLOCATING(zio) ((zio)->io_orig_pipeline & ZIO_STAGE_DVA_ALLOCATE)
125
126boolean_t	zio_requeue_io_start_cut_in_line = B_TRUE;
127
128#ifdef ZFS_DEBUG
129int zio_buf_debug_limit = 16384;
130#else
131int zio_buf_debug_limit = 0;
132#endif
133
134void
135zio_init(void)
136{
137	size_t c;
138	zio_cache = kmem_cache_create("zio_cache",
139	    sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
140	zio_link_cache = kmem_cache_create("zio_link_cache",
141	    sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
142	if (!zio_use_uma)
143		goto out;
144
145	/*
146	 * For small buffers, we want a cache for each multiple of
147	 * SPA_MINBLOCKSIZE.  For larger buffers, we want a cache
148	 * for each quarter-power of 2.
149	 */
150	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
151		size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
152		size_t p2 = size;
153		size_t align = 0;
154		size_t cflags = (size > zio_buf_debug_limit) ? KMC_NODEBUG : 0;
155
156		while (p2 & (p2 - 1))
157			p2 &= p2 - 1;
158
159#ifdef illumos
160#ifndef _KERNEL
161		/*
162		 * If we are using watchpoints, put each buffer on its own page,
163		 * to eliminate the performance overhead of trapping to the
164		 * kernel when modifying a non-watched buffer that shares the
165		 * page with a watched buffer.
166		 */
167		if (arc_watch && !IS_P2ALIGNED(size, PAGESIZE))
168			continue;
169#endif
170#endif /* illumos */
171		if (size <= 4 * SPA_MINBLOCKSIZE) {
172			align = SPA_MINBLOCKSIZE;
173		} else if (IS_P2ALIGNED(size, p2 >> 2)) {
174			align = MIN(p2 >> 2, PAGESIZE);
175		}
176
177		if (align != 0) {
178			char name[36];
179			(void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
180			zio_buf_cache[c] = kmem_cache_create(name, size,
181			    align, NULL, NULL, NULL, NULL, NULL, cflags);
182
183			/*
184			 * Since zio_data bufs do not appear in crash dumps, we
185			 * pass KMC_NOTOUCH so that no allocator metadata is
186			 * stored with the buffers.
187			 */
188			(void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
189			zio_data_buf_cache[c] = kmem_cache_create(name, size,
190			    align, NULL, NULL, NULL, NULL, NULL,
191			    cflags | KMC_NOTOUCH | KMC_NODEBUG);
192		}
193	}
194
195	while (--c != 0) {
196		ASSERT(zio_buf_cache[c] != NULL);
197		if (zio_buf_cache[c - 1] == NULL)
198			zio_buf_cache[c - 1] = zio_buf_cache[c];
199
200		ASSERT(zio_data_buf_cache[c] != NULL);
201		if (zio_data_buf_cache[c - 1] == NULL)
202			zio_data_buf_cache[c - 1] = zio_data_buf_cache[c];
203	}
204out:
205
206	zio_inject_init();
207
208	zio_trim_ksp = kstat_create("zfs", 0, "zio_trim", "misc",
209	    KSTAT_TYPE_NAMED,
210	    sizeof (zio_trim_stats) / sizeof (kstat_named_t),
211	    KSTAT_FLAG_VIRTUAL);
212
213	if (zio_trim_ksp != NULL) {
214		zio_trim_ksp->ks_data = &zio_trim_stats;
215		kstat_install(zio_trim_ksp);
216	}
217}
218
219void
220zio_fini(void)
221{
222	size_t c;
223	kmem_cache_t *last_cache = NULL;
224	kmem_cache_t *last_data_cache = NULL;
225
226	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
227		if (zio_buf_cache[c] != last_cache) {
228			last_cache = zio_buf_cache[c];
229			kmem_cache_destroy(zio_buf_cache[c]);
230		}
231		zio_buf_cache[c] = NULL;
232
233		if (zio_data_buf_cache[c] != last_data_cache) {
234			last_data_cache = zio_data_buf_cache[c];
235			kmem_cache_destroy(zio_data_buf_cache[c]);
236		}
237		zio_data_buf_cache[c] = NULL;
238	}
239
240	kmem_cache_destroy(zio_link_cache);
241	kmem_cache_destroy(zio_cache);
242
243	zio_inject_fini();
244
245	if (zio_trim_ksp != NULL) {
246		kstat_delete(zio_trim_ksp);
247		zio_trim_ksp = NULL;
248	}
249}
250
251/*
252 * ==========================================================================
253 * Allocate and free I/O buffers
254 * ==========================================================================
255 */
256
257/*
258 * Use zio_buf_alloc to allocate ZFS metadata.  This data will appear in a
259 * crashdump if the kernel panics, so use it judiciously.  Obviously, it's
260 * useful to inspect ZFS metadata, but if possible, we should avoid keeping
261 * excess / transient data in-core during a crashdump.
262 */
263void *
264zio_buf_alloc(size_t size)
265{
266	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
267	int flags = zio_exclude_metadata ? KM_NODEBUG : 0;
268
269	ASSERT3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
270
271	if (zio_use_uma)
272		return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE));
273	else
274		return (kmem_alloc(size, KM_SLEEP|flags));
275}
276
277/*
278 * Use zio_data_buf_alloc to allocate data.  The data will not appear in a
279 * crashdump if the kernel panics.  This exists to limit the amount of ZFS
280 * data that shows up in a kernel crashdump, thus reducing the amount of
281 * kernel heap dumped to disk when the kernel panics.
282 */
283void *
284zio_data_buf_alloc(size_t size)
285{
286	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
287
288	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
289
290	if (zio_use_uma)
291		return (kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE));
292	else
293		return (kmem_alloc(size, KM_SLEEP | KM_NODEBUG));
294}
295
296void
297zio_buf_free(void *buf, size_t size)
298{
299	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
300
301	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
302
303	if (zio_use_uma)
304		kmem_cache_free(zio_buf_cache[c], buf);
305	else
306		kmem_free(buf, size);
307}
308
309void
310zio_data_buf_free(void *buf, size_t size)
311{
312	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
313
314	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
315
316	if (zio_use_uma)
317		kmem_cache_free(zio_data_buf_cache[c], buf);
318	else
319		kmem_free(buf, size);
320}
321
322/*
323 * ==========================================================================
324 * Push and pop I/O transform buffers
325 * ==========================================================================
326 */
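/*
 * Each transform records the original io_data/io_size before a stage
 * substitutes a new buffer (e.g. for compression or sub-block reads).
 * zio_pop_transforms() unwinds the stack: it invokes the optional
 * callback, frees the temporary buffer (when zt_bufsize != 0) and
 * restores the original data pointer and size.
 */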
327static void
328zio_push_transform(zio_t *zio, void *data, uint64_t size, uint64_t bufsize,
329	zio_transform_func_t *transform)
330{
331	zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_SLEEP);
332
333	zt->zt_orig_data = zio->io_data;
334	zt->zt_orig_size = zio->io_size;
335	zt->zt_bufsize = bufsize;
336	zt->zt_transform = transform;
337
338	zt->zt_next = zio->io_transform_stack;
339	zio->io_transform_stack = zt;
340
341	zio->io_data = data;
342	zio->io_size = size;
343}
344
345static void
346zio_pop_transforms(zio_t *zio)
347{
348	zio_transform_t *zt;
349
350	while ((zt = zio->io_transform_stack) != NULL) {
351		if (zt->zt_transform != NULL)
352			zt->zt_transform(zio,
353			    zt->zt_orig_data, zt->zt_orig_size);
354
355		if (zt->zt_bufsize != 0)
356			zio_buf_free(zio->io_data, zt->zt_bufsize);
357
358		zio->io_data = zt->zt_orig_data;
359		zio->io_size = zt->zt_orig_size;
360		zio->io_transform_stack = zt->zt_next;
361
362		kmem_free(zt, sizeof (zio_transform_t));
363	}
364}
365
366/*
367 * ==========================================================================
368 * I/O transform callbacks for subblocks and decompression
369 * ==========================================================================
370 */
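/*
 * zio_subblock() copies the leading portion of an inflated read back
 * into the caller's smaller buffer; zio_decompress() decompresses
 * io_data into the original buffer using the bp's compression function,
 * flagging EIO on failure.
 */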
371static void
372zio_subblock(zio_t *zio, void *data, uint64_t size)
373{
374	ASSERT(zio->io_size > size);
375
376	if (zio->io_type == ZIO_TYPE_READ)
377		bcopy(zio->io_data, data, size);
378}
379
380static void
381zio_decompress(zio_t *zio, void *data, uint64_t size)
382{
383	if (zio->io_error == 0 &&
384	    zio_decompress_data(BP_GET_COMPRESS(zio->io_bp),
385	    zio->io_data, data, zio->io_size, size) != 0)
386		zio->io_error = SET_ERROR(EIO);
387}
388
389/*
390 * ==========================================================================
391 * I/O parent/child relationships and pipeline interlocks
392 * ==========================================================================
393 */
394/*
395 * NOTE - Callers to zio_walk_parents() and zio_walk_children() must
396 *        continue calling these functions until they return NULL.
397 *        Otherwise, the next caller will pick up the list walk in
398 *        some indeterminate state.  (The alternative would be for
399 *        every caller to pass in a cookie to track the state kept in
400 *        io_walk_link, which gets annoying.)
401 */
402zio_t *
403zio_walk_parents(zio_t *cio)
404{
405	zio_link_t *zl = cio->io_walk_link;
406	list_t *pl = &cio->io_parent_list;
407
408	zl = (zl == NULL) ? list_head(pl) : list_next(pl, zl);
409	cio->io_walk_link = zl;
410
411	if (zl == NULL)
412		return (NULL);
413
414	ASSERT(zl->zl_child == cio);
415	return (zl->zl_parent);
416}
417
418zio_t *
419zio_walk_children(zio_t *pio)
420{
421	zio_link_t *zl = pio->io_walk_link;
422	list_t *cl = &pio->io_child_list;
423
424	zl = (zl == NULL) ? list_head(cl) : list_next(cl, zl);
425	pio->io_walk_link = zl;
426
427	if (zl == NULL)
428		return (NULL);
429
430	ASSERT(zl->zl_parent == pio);
431	return (zl->zl_child);
432}
433
434zio_t *
435zio_unique_parent(zio_t *cio)
436{
437	zio_t *pio = zio_walk_parents(cio);
438
439	VERIFY(zio_walk_parents(cio) == NULL);
440	return (pio);
441}
442
443void
444zio_add_child(zio_t *pio, zio_t *cio)
445{
446	zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
447
448	/*
449	 * Logical I/Os can have logical, gang, or vdev children.
450	 * Gang I/Os can have gang or vdev children.
451	 * Vdev I/Os can only have vdev children.
452	 * The following ASSERT captures all of these constraints.
453	 */
454	ASSERT(cio->io_child_type <= pio->io_child_type);
455
456	zl->zl_parent = pio;
457	zl->zl_child = cio;
458
459	mutex_enter(&cio->io_lock);
460	mutex_enter(&pio->io_lock);
461
462	ASSERT(pio->io_state[ZIO_WAIT_DONE] == 0);
463
464	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
465		pio->io_children[cio->io_child_type][w] += !cio->io_state[w];
466
467	list_insert_head(&pio->io_child_list, zl);
468	list_insert_head(&cio->io_parent_list, zl);
469
470	pio->io_child_count++;
471	cio->io_parent_count++;
472
473	mutex_exit(&pio->io_lock);
474	mutex_exit(&cio->io_lock);
475}
476
477static void
478zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
479{
480	ASSERT(zl->zl_parent == pio);
481	ASSERT(zl->zl_child == cio);
482
483	mutex_enter(&cio->io_lock);
484	mutex_enter(&pio->io_lock);
485
486	list_remove(&pio->io_child_list, zl);
487	list_remove(&cio->io_parent_list, zl);
488
489	pio->io_child_count--;
490	cio->io_parent_count--;
491
492	mutex_exit(&pio->io_lock);
493	mutex_exit(&cio->io_lock);
494
495	kmem_cache_free(zio_link_cache, zl);
496}
497
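/*
 * If this zio still has outstanding children of class 'child' for the
 * given wait type, record the stall (backing io_stage up one stage so
 * that it is retried once the last child checks in) and return B_TRUE
 * so the caller stops the pipeline.
 */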
498static boolean_t
499zio_wait_for_children(zio_t *zio, enum zio_child child, enum zio_wait_type wait)
500{
501	uint64_t *countp = &zio->io_children[child][wait];
502	boolean_t waiting = B_FALSE;
503
504	mutex_enter(&zio->io_lock);
505	ASSERT(zio->io_stall == NULL);
506	if (*countp != 0) {
507		zio->io_stage >>= 1;
508		zio->io_stall = countp;
509		waiting = B_TRUE;
510	}
511	mutex_exit(&zio->io_lock);
512
513	return (waiting);
514}
515
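/*
 * A child has completed the given wait type: fold its error and
 * reexecute state into the parent, drop the parent's outstanding-child
 * count, and restart the parent's pipeline if it was stalled on
 * exactly this counter.
 */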
516static void
517zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
518{
519	uint64_t *countp = &pio->io_children[zio->io_child_type][wait];
520	int *errorp = &pio->io_child_error[zio->io_child_type];
521
522	mutex_enter(&pio->io_lock);
523	if (zio->io_error && !(zio->io_flags & ZIO_FLAG_DONT_PROPAGATE))
524		*errorp = zio_worst_error(*errorp, zio->io_error);
525	pio->io_reexecute |= zio->io_reexecute;
526	ASSERT3U(*countp, >, 0);
527
528	(*countp)--;
529
530	if (*countp == 0 && pio->io_stall == countp) {
531		pio->io_stall = NULL;
532		mutex_exit(&pio->io_lock);
533		zio_execute(pio);
534	} else {
535		mutex_exit(&pio->io_lock);
536	}
537}
538
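/*
 * If the zio has no error of its own, adopt the error recorded for
 * children of class 'c'.
 */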
539static void
540zio_inherit_child_errors(zio_t *zio, enum zio_child c)
541{
542	if (zio->io_child_error[c] != 0 && zio->io_error == 0)
543		zio->io_error = zio->io_child_error[c];
544}
545
546/*
547 * ==========================================================================
548 * Create the various types of I/O (read, write, free, etc)
549 * ==========================================================================
550 */
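/*
 * zio_create() is the common constructor used by all of the functions
 * below: it allocates and zeroes a zio, derives the child type from the
 * vdev and flags, snapshots the block pointer, and links the new zio
 * under its parent (if any).
 */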
551static zio_t *
552zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
553    void *data, uint64_t size, zio_done_func_t *done, void *private,
554    zio_type_t type, zio_priority_t priority, enum zio_flag flags,
555    vdev_t *vd, uint64_t offset, const zbookmark_phys_t *zb,
556    enum zio_stage stage, enum zio_stage pipeline)
557{
558	zio_t *zio;
559
560	ASSERT3U(type == ZIO_TYPE_FREE || size, <=, SPA_MAXBLOCKSIZE);
561	ASSERT(P2PHASE(size, SPA_MINBLOCKSIZE) == 0);
562	ASSERT(P2PHASE(offset, SPA_MINBLOCKSIZE) == 0);
563
564	ASSERT(!vd || spa_config_held(spa, SCL_STATE_ALL, RW_READER));
565	ASSERT(!bp || !(flags & ZIO_FLAG_CONFIG_WRITER));
566	ASSERT(vd || stage == ZIO_STAGE_OPEN);
567
568	zio = kmem_cache_alloc(zio_cache, KM_SLEEP);
569	bzero(zio, sizeof (zio_t));
570
571	mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
572	cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
573
574	list_create(&zio->io_parent_list, sizeof (zio_link_t),
575	    offsetof(zio_link_t, zl_parent_node));
576	list_create(&zio->io_child_list, sizeof (zio_link_t),
577	    offsetof(zio_link_t, zl_child_node));
578
579	if (vd != NULL)
580		zio->io_child_type = ZIO_CHILD_VDEV;
581	else if (flags & ZIO_FLAG_GANG_CHILD)
582		zio->io_child_type = ZIO_CHILD_GANG;
583	else if (flags & ZIO_FLAG_DDT_CHILD)
584		zio->io_child_type = ZIO_CHILD_DDT;
585	else
586		zio->io_child_type = ZIO_CHILD_LOGICAL;
587
588	if (bp != NULL) {
589		zio->io_bp = (blkptr_t *)bp;
590		zio->io_bp_copy = *bp;
591		zio->io_bp_orig = *bp;
592		if (type != ZIO_TYPE_WRITE ||
593		    zio->io_child_type == ZIO_CHILD_DDT)
594			zio->io_bp = &zio->io_bp_copy;	/* so caller can free */
595		if (zio->io_child_type == ZIO_CHILD_LOGICAL)
596			zio->io_logical = zio;
597		if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
598			pipeline |= ZIO_GANG_STAGES;
599	}
600
601	zio->io_spa = spa;
602	zio->io_txg = txg;
603	zio->io_done = done;
604	zio->io_private = private;
605	zio->io_type = type;
606	zio->io_priority = priority;
607	zio->io_vd = vd;
608	zio->io_offset = offset;
609	zio->io_orig_data = zio->io_data = data;
610	zio->io_orig_size = zio->io_size = size;
611	zio->io_orig_flags = zio->io_flags = flags;
612	zio->io_orig_stage = zio->io_stage = stage;
613	zio->io_orig_pipeline = zio->io_pipeline = pipeline;
614
615	zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
616	zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
617
618	if (zb != NULL)
619		zio->io_bookmark = *zb;
620
621	if (pio != NULL) {
622		if (zio->io_logical == NULL)
623			zio->io_logical = pio->io_logical;
624		if (zio->io_child_type == ZIO_CHILD_GANG)
625			zio->io_gang_leader = pio->io_gang_leader;
626		zio_add_child(pio, zio);
627	}
628
629	return (zio);
630}
631
632static void
633zio_destroy(zio_t *zio)
634{
635	list_destroy(&zio->io_parent_list);
636	list_destroy(&zio->io_child_list);
637	mutex_destroy(&zio->io_lock);
638	cv_destroy(&zio->io_cv);
639	kmem_cache_free(zio_cache, zio);
640}
641
642zio_t *
643zio_null(zio_t *pio, spa_t *spa, vdev_t *vd, zio_done_func_t *done,
644    void *private, enum zio_flag flags)
645{
646	zio_t *zio;
647
648	zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
649	    ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
650	    ZIO_STAGE_OPEN, ZIO_INTERLOCK_PIPELINE);
651
652	return (zio);
653}
654
655zio_t *
656zio_root(spa_t *spa, zio_done_func_t *done, void *private, enum zio_flag flags)
657{
658	return (zio_null(NULL, spa, NULL, done, private, flags));
659}
660
661zio_t *
662zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
663    void *data, uint64_t size, zio_done_func_t *done, void *private,
664    zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
665{
666	zio_t *zio;
667
668	zio = zio_create(pio, spa, BP_PHYSICAL_BIRTH(bp), bp,
669	    data, size, done, private,
670	    ZIO_TYPE_READ, priority, flags, NULL, 0, zb,
671	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
672	    ZIO_DDT_CHILD_READ_PIPELINE : ZIO_READ_PIPELINE);
673
674	return (zio);
675}
676
677zio_t *
678zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
679    void *data, uint64_t size, const zio_prop_t *zp,
680    zio_done_func_t *ready, zio_done_func_t *physdone, zio_done_func_t *done,
681    void *private,
682    zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
683{
684	zio_t *zio;
685
686	ASSERT(zp->zp_checksum >= ZIO_CHECKSUM_OFF &&
687	    zp->zp_checksum < ZIO_CHECKSUM_FUNCTIONS &&
688	    zp->zp_compress >= ZIO_COMPRESS_OFF &&
689	    zp->zp_compress < ZIO_COMPRESS_FUNCTIONS &&
690	    DMU_OT_IS_VALID(zp->zp_type) &&
691	    zp->zp_level < 32 &&
692	    zp->zp_copies > 0 &&
693	    zp->zp_copies <= spa_max_replication(spa));
694
695	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
696	    ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
697	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
698	    ZIO_DDT_CHILD_WRITE_PIPELINE : ZIO_WRITE_PIPELINE);
699
700	zio->io_ready = ready;
701	zio->io_physdone = physdone;
702	zio->io_prop = *zp;
703
704	/*
705	 * Data can be NULL if we are going to call zio_write_override() to
706	 * provide the already-allocated BP.  But we may need the data to
707	 * verify a dedup hit (if requested).  In this case, don't try to
708	 * dedup (just take the already-allocated BP verbatim).
709	 */
710	if (data == NULL && zio->io_prop.zp_dedup_verify) {
711		zio->io_prop.zp_dedup = zio->io_prop.zp_dedup_verify = B_FALSE;
712	}
713
714	return (zio);
715}
716
717zio_t *
718zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, void *data,
719    uint64_t size, zio_done_func_t *done, void *private,
720    zio_priority_t priority, enum zio_flag flags, zbookmark_phys_t *zb)
721{
722	zio_t *zio;
723
724	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
725	    ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
726	    ZIO_STAGE_OPEN, ZIO_REWRITE_PIPELINE);
727
728	return (zio);
729}
730
731void
732zio_write_override(zio_t *zio, blkptr_t *bp, int copies, boolean_t nopwrite)
733{
734	ASSERT(zio->io_type == ZIO_TYPE_WRITE);
735	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
736	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
737	ASSERT(zio->io_txg == spa_syncing_txg(zio->io_spa));
738
739	/*
740	 * We must reset the io_prop to match the values that existed
741	 * when the bp was first written by dmu_sync(), keeping in mind
742	 * that nopwrite and dedup are mutually exclusive.
743	 */
744	zio->io_prop.zp_dedup = nopwrite ? B_FALSE : zio->io_prop.zp_dedup;
745	zio->io_prop.zp_nopwrite = nopwrite;
746	zio->io_prop.zp_copies = copies;
747	zio->io_bp_override = bp;
748}
749
750void
751zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
752{
753
754	/*
755	 * The check for EMBEDDED is a performance optimization.  We
756	 * process the free here (by ignoring it) rather than
757	 * putting it on the list and then processing it in zio_free_sync().
758	 */
759	if (BP_IS_EMBEDDED(bp))
760		return;
761	metaslab_check_free(spa, bp);
762
763	/*
764	 * Frees that are for the currently-syncing txg, that are not going to
765	 * be deferred, and that will not need to do a read (i.e. not GANG or
766	 * DEDUP) can be processed immediately.  Otherwise, put them on the
767	 * in-memory list for later processing.
768	 */
769	if (zfs_trim_enabled || BP_IS_GANG(bp) || BP_GET_DEDUP(bp) ||
770	    txg != spa->spa_syncing_txg ||
771	    spa_sync_pass(spa) >= zfs_sync_pass_deferred_free) {
772		bplist_append(&spa->spa_free_bplist[txg & TXG_MASK], bp);
773	} else {
774		VERIFY0(zio_wait(zio_free_sync(NULL, spa, txg, bp,
775		    BP_GET_PSIZE(bp), 0)));
776	}
777}
778
779zio_t *
780zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
781    uint64_t size, enum zio_flag flags)
782{
783	zio_t *zio;
784	enum zio_stage stage = ZIO_FREE_PIPELINE;
785
786	ASSERT(!BP_IS_HOLE(bp));
787	ASSERT(spa_syncing_txg(spa) == txg);
788	ASSERT(spa_sync_pass(spa) < zfs_sync_pass_deferred_free);
789
790	if (BP_IS_EMBEDDED(bp))
791		return (zio_null(pio, spa, NULL, NULL, NULL, 0));
792
793	metaslab_check_free(spa, bp);
794	arc_freed(spa, bp);
795
796	if (zfs_trim_enabled)
797		stage |= ZIO_STAGE_ISSUE_ASYNC | ZIO_STAGE_VDEV_IO_START |
798		    ZIO_STAGE_VDEV_IO_ASSESS;
799	/*
800	 * GANG and DEDUP blocks can induce a read (for the gang block header,
801	 * or the DDT), so issue them asynchronously so that this thread is
802	 * not tied up.
803	 */
804	else if (BP_IS_GANG(bp) || BP_GET_DEDUP(bp))
805		stage |= ZIO_STAGE_ISSUE_ASYNC;
806
807	flags |= ZIO_FLAG_DONT_QUEUE;
808
809	zio = zio_create(pio, spa, txg, bp, NULL, size,
810	    NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_NOW, flags,
811	    NULL, 0, NULL, ZIO_STAGE_OPEN, stage);
812
813	return (zio);
814}
815
816zio_t *
817zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
818    zio_done_func_t *done, void *private, enum zio_flag flags)
819{
820	zio_t *zio;
821
822	dprintf_bp(bp, "claiming in txg %llu", txg);
823
824	if (BP_IS_EMBEDDED(bp))
825		return (zio_null(pio, spa, NULL, NULL, NULL, 0));
826
827	/*
828	 * A claim is an allocation of a specific block.  Claims are needed
829	 * to support immediate writes in the intent log.  The issue is that
830	 * immediate writes contain committed data, but in a txg that was
831	 * *not* committed.  Upon opening the pool after an unclean shutdown,
832	 * the intent log claims all blocks that contain immediate write data
833	 * so that the SPA knows they're in use.
834	 *
835	 * All claims *must* be resolved in the first txg -- before the SPA
836	 * starts allocating blocks -- so that nothing is allocated twice.
837	 * If txg == 0 we just verify that the block is claimable.
838	 */
839	ASSERT3U(spa->spa_uberblock.ub_rootbp.blk_birth, <, spa_first_txg(spa));
840	ASSERT(txg == spa_first_txg(spa) || txg == 0);
841	ASSERT(!BP_GET_DEDUP(bp) || !spa_writeable(spa));	/* zdb(1M) */
842
843	zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
844	    done, private, ZIO_TYPE_CLAIM, ZIO_PRIORITY_NOW, flags,
845	    NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_CLAIM_PIPELINE);
846
847	return (zio);
848}
849
850zio_t *
851zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd, uint64_t offset,
852    uint64_t size, zio_done_func_t *done, void *private,
853    zio_priority_t priority, enum zio_flag flags)
854{
855	zio_t *zio;
856	int c;
857
858	if (vd->vdev_children == 0) {
859		zio = zio_create(pio, spa, 0, NULL, NULL, size, done, private,
860		    ZIO_TYPE_IOCTL, priority, flags, vd, offset, NULL,
861		    ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
862
863		zio->io_cmd = cmd;
864	} else {
865		zio = zio_null(pio, spa, NULL, NULL, NULL, flags);
866
867		for (c = 0; c < vd->vdev_children; c++)
868			zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
869			    offset, size, done, private, priority, flags));
870	}
871
872	return (zio);
873}
874
875zio_t *
876zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
877    void *data, int checksum, zio_done_func_t *done, void *private,
878    zio_priority_t priority, enum zio_flag flags, boolean_t labels)
879{
880	zio_t *zio;
881
882	ASSERT(vd->vdev_children == 0);
883	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
884	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
885	ASSERT3U(offset + size, <=, vd->vdev_psize);
886
887	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
888	    ZIO_TYPE_READ, priority, flags | ZIO_FLAG_PHYSICAL, vd, offset,
889	    NULL, ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
890
891	zio->io_prop.zp_checksum = checksum;
892
893	return (zio);
894}
895
896zio_t *
897zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
898    void *data, int checksum, zio_done_func_t *done, void *private,
899    zio_priority_t priority, enum zio_flag flags, boolean_t labels)
900{
901	zio_t *zio;
902
903	ASSERT(vd->vdev_children == 0);
904	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
905	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
906	ASSERT3U(offset + size, <=, vd->vdev_psize);
907
908	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
909	    ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_PHYSICAL, vd, offset,
910	    NULL, ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
911
912	zio->io_prop.zp_checksum = checksum;
913
914	if (zio_checksum_table[checksum].ci_eck) {
915		/*
916		 * zec checksums are necessarily destructive -- they modify
917		 * the end of the write buffer to hold the verifier/checksum.
918		 * Therefore, we must make a local copy in case the data is
919		 * being written to multiple places in parallel.
920		 */
921		void *wbuf = zio_buf_alloc(size);
922		bcopy(data, wbuf, size);
923		zio_push_transform(zio, wbuf, size, size, NULL);
924	}
925
926	return (zio);
927}
928
929/*
930 * Create a child I/O to do some work for us.
931 */
932zio_t *
933zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
934	void *data, uint64_t size, int type, zio_priority_t priority,
935	enum zio_flag flags, zio_done_func_t *done, void *private)
936{
937	enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
938	zio_t *zio;
939
940	ASSERT(vd->vdev_parent ==
941	    (pio->io_vd ? pio->io_vd : pio->io_spa->spa_root_vdev));
942
943	if (type == ZIO_TYPE_READ && bp != NULL) {
944		/*
945		 * If we have the bp, then the child should perform the
946		 * checksum and the parent need not.  This pushes error
947		 * detection as close to the leaves as possible and
948		 * eliminates redundant checksums in the interior nodes.
949		 */
950		pipeline |= ZIO_STAGE_CHECKSUM_VERIFY;
951		pio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
952	}
953
954	/* Not all I/O types require the vdev I/O done stage, e.g. free */
955	if (!(pio->io_pipeline & ZIO_STAGE_VDEV_IO_DONE))
956		pipeline &= ~ZIO_STAGE_VDEV_IO_DONE;
957
958	if (vd->vdev_children == 0)
959		offset += VDEV_LABEL_START_SIZE;
960
961	flags |= ZIO_VDEV_CHILD_FLAGS(pio) | ZIO_FLAG_DONT_PROPAGATE;
962
963	/*
964	 * If we've decided to do a repair, the write is not speculative --
965	 * even if the original read was.
966	 */
967	if (flags & ZIO_FLAG_IO_REPAIR)
968		flags &= ~ZIO_FLAG_SPECULATIVE;
969
970	zio = zio_create(pio, pio->io_spa, pio->io_txg, bp, data, size,
971	    done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
972	    ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
973
974	zio->io_physdone = pio->io_physdone;
975	if (vd->vdev_ops->vdev_op_leaf && zio->io_logical != NULL)
976		zio->io_logical->io_phys_children++;
977
978	return (zio);
979}
980
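/*
 * Create a standalone leaf-vdev I/O with no parent, used when an I/O is
 * delegated by vdev-level caching or aggregation (hence
 * ZIO_FLAG_DELEGATED).
 */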
981zio_t *
982zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, void *data, uint64_t size,
983	int type, zio_priority_t priority, enum zio_flag flags,
984	zio_done_func_t *done, void *private)
985{
986	zio_t *zio;
987
988	ASSERT(vd->vdev_ops->vdev_op_leaf);
989
990	zio = zio_create(NULL, vd->vdev_spa, 0, NULL,
991	    data, size, done, private, type, priority,
992	    flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY | ZIO_FLAG_DELEGATED,
993	    vd, offset, NULL,
994	    ZIO_STAGE_VDEV_IO_START >> 1, ZIO_VDEV_CHILD_PIPELINE);
995
996	return (zio);
997}
998
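/*
 * Asynchronously flush the write cache of every leaf vdev under 'vd'
 * via DKIOCFLUSHWRITECACHE; failures are tolerated and not propagated.
 */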
999void
1000zio_flush(zio_t *zio, vdev_t *vd)
1001{
1002	zio_nowait(zio_ioctl(zio, zio->io_spa, vd, DKIOCFLUSHWRITECACHE, 0, 0,
1003	    NULL, NULL, ZIO_PRIORITY_NOW,
1004	    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
1005}
1006
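/*
 * Create a TRIM request: a ZIO_TYPE_FREE against a physical extent of a
 * leaf vdev, issued through the ZIO_FREE_PHYS_PIPELINE.
 */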
1007zio_t *
1008zio_trim(zio_t *zio, spa_t *spa, vdev_t *vd, uint64_t offset, uint64_t size)
1009{
1010
1011	ASSERT(vd->vdev_ops->vdev_op_leaf);
1012
1013	return (zio_create(zio, spa, 0, NULL, NULL, size, NULL, NULL,
1014	    ZIO_TYPE_FREE, ZIO_PRIORITY_TRIM, ZIO_FLAG_DONT_AGGREGATE |
1015	    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY,
1016	    vd, offset, NULL, ZIO_STAGE_OPEN, ZIO_FREE_PHYS_PIPELINE));
1017}
1018
1019void
1020zio_shrink(zio_t *zio, uint64_t size)
1021{
1022	ASSERT(zio->io_executor == NULL);
1023	ASSERT(zio->io_orig_size == zio->io_size);
1024	ASSERT(size <= zio->io_size);
1025
1026	/*
1027	 * We don't shrink for raidz because of problems with the
1028	 * reconstruction when reading back less than the block size.
1029	 * Note, BP_IS_RAIDZ() assumes no compression.
1030	 */
1031	ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
1032	if (!BP_IS_RAIDZ(zio->io_bp))
1033		zio->io_orig_size = zio->io_size = size;
1034}
1035
1036/*
1037 * ==========================================================================
1038 * Prepare to read and write logical blocks
1039 * ==========================================================================
1040 */
1041
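/*
 * Read-side bp setup: push a decompress transform for compressed
 * logical reads, decode embedded-data bps in place, and reroute
 * dedup'd logical reads through the DDT read pipeline.
 */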
1042static int
1043zio_read_bp_init(zio_t *zio)
1044{
1045	blkptr_t *bp = zio->io_bp;
1046
1047	if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
1048	    zio->io_child_type == ZIO_CHILD_LOGICAL &&
1049	    !(zio->io_flags & ZIO_FLAG_RAW)) {
1050		uint64_t psize =
1051		    BP_IS_EMBEDDED(bp) ? BPE_GET_PSIZE(bp) : BP_GET_PSIZE(bp);
1052		void *cbuf = zio_buf_alloc(psize);
1053
1054		zio_push_transform(zio, cbuf, psize, psize, zio_decompress);
1055	}
1056
1057	if (BP_IS_EMBEDDED(bp) && BPE_GET_ETYPE(bp) == BP_EMBEDDED_TYPE_DATA) {
1058		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1059		decode_embedded_bp_compressed(bp, zio->io_data);
1060	} else {
1061		ASSERT(!BP_IS_EMBEDDED(bp));
1062	}
1063
1064	if (!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)) && BP_GET_LEVEL(bp) == 0)
1065		zio->io_flags |= ZIO_FLAG_DONT_CACHE;
1066
1067	if (BP_GET_TYPE(bp) == DMU_OT_DDT_ZAP)
1068		zio->io_flags |= ZIO_FLAG_DONT_CACHE;
1069
1070	if (BP_GET_DEDUP(bp) && zio->io_child_type == ZIO_CHILD_LOGICAL)
1071		zio->io_pipeline = ZIO_DDT_READ_PIPELINE;
1072
1073	return (ZIO_PIPELINE_CONTINUE);
1074}
1075
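/*
 * Write-side bp setup: honor a bp override from zio_write_override()
 * (including the dedup and nopwrite shortcuts), compress or embed the
 * data when profitable, and choose between rewriting the existing bp
 * and allocating a fresh one based on the current sync pass.
 */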
1076static int
1077zio_write_bp_init(zio_t *zio)
1078{
1079	spa_t *spa = zio->io_spa;
1080	zio_prop_t *zp = &zio->io_prop;
1081	enum zio_compress compress = zp->zp_compress;
1082	blkptr_t *bp = zio->io_bp;
1083	uint64_t lsize = zio->io_size;
1084	uint64_t psize = lsize;
1085	int pass = 1;
1086
1087	/*
1088	 * If our children haven't all reached the ready stage,
1089	 * wait for them and then repeat this pipeline stage.
1090	 */
1091	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
1092	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY))
1093		return (ZIO_PIPELINE_STOP);
1094
1095	if (!IO_IS_ALLOCATING(zio))
1096		return (ZIO_PIPELINE_CONTINUE);
1097
1098	ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
1099
1100	if (zio->io_bp_override) {
1101		ASSERT(bp->blk_birth != zio->io_txg);
1102		ASSERT(BP_GET_DEDUP(zio->io_bp_override) == 0);
1103
1104		*bp = *zio->io_bp_override;
1105		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1106
1107		if (BP_IS_EMBEDDED(bp))
1108			return (ZIO_PIPELINE_CONTINUE);
1109
1110		/*
1111		 * If we've been overridden and nopwrite is set then
1112		 * set the flag accordingly to indicate that a nopwrite
1113		 * has already occurred.
1114		 */
1115		if (!BP_IS_HOLE(bp) && zp->zp_nopwrite) {
1116			ASSERT(!zp->zp_dedup);
1117			zio->io_flags |= ZIO_FLAG_NOPWRITE;
1118			return (ZIO_PIPELINE_CONTINUE);
1119		}
1120
1121		ASSERT(!zp->zp_nopwrite);
1122
1123		if (BP_IS_HOLE(bp) || !zp->zp_dedup)
1124			return (ZIO_PIPELINE_CONTINUE);
1125
1126		ASSERT(zio_checksum_table[zp->zp_checksum].ci_dedup ||
1127		    zp->zp_dedup_verify);
1128
1129		if (BP_GET_CHECKSUM(bp) == zp->zp_checksum) {
1130			BP_SET_DEDUP(bp, 1);
1131			zio->io_pipeline |= ZIO_STAGE_DDT_WRITE;
1132			return (ZIO_PIPELINE_CONTINUE);
1133		}
1134		zio->io_bp_override = NULL;
1135		BP_ZERO(bp);
1136	}
1137
1138	if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg) {
1139		/*
1140		 * We're rewriting an existing block, which means we're
1141		 * working on behalf of spa_sync().  For spa_sync() to
1142		 * converge, it must eventually be the case that we don't
1143		 * have to allocate new blocks.  But compression changes
1144		 * the blocksize, which forces a reallocate, and makes
1145		 * convergence take longer.  Therefore, after the first
1146		 * few passes, stop compressing to ensure convergence.
1147		 */
1148		pass = spa_sync_pass(spa);
1149
1150		ASSERT(zio->io_txg == spa_syncing_txg(spa));
1151		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1152		ASSERT(!BP_GET_DEDUP(bp));
1153
1154		if (pass >= zfs_sync_pass_dont_compress)
1155			compress = ZIO_COMPRESS_OFF;
1156
1157		/* Make sure someone doesn't change their mind on overwrites */
1158		ASSERT(BP_IS_EMBEDDED(bp) || MIN(zp->zp_copies + BP_IS_GANG(bp),
1159		    spa_max_replication(spa)) == BP_GET_NDVAS(bp));
1160	}
1161
1162	if (compress != ZIO_COMPRESS_OFF) {
1163		void *cbuf = zio_buf_alloc(lsize);
1164		psize = zio_compress_data(compress, zio->io_data, cbuf, lsize);
1165		if (psize == 0 || psize == lsize) {
1166			compress = ZIO_COMPRESS_OFF;
1167			zio_buf_free(cbuf, lsize);
1168		} else if (!zp->zp_dedup && psize <= BPE_PAYLOAD_SIZE &&
1169		    zp->zp_level == 0 && !DMU_OT_HAS_FILL(zp->zp_type) &&
1170		    spa_feature_is_enabled(spa, SPA_FEATURE_EMBEDDED_DATA)) {
1171			encode_embedded_bp_compressed(bp,
1172			    cbuf, compress, lsize, psize);
1173			BPE_SET_ETYPE(bp, BP_EMBEDDED_TYPE_DATA);
1174			BP_SET_TYPE(bp, zio->io_prop.zp_type);
1175			BP_SET_LEVEL(bp, zio->io_prop.zp_level);
1176			zio_buf_free(cbuf, lsize);
1177			bp->blk_birth = zio->io_txg;
1178			zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1179			ASSERT(spa_feature_is_active(spa,
1180			    SPA_FEATURE_EMBEDDED_DATA));
1181			return (ZIO_PIPELINE_CONTINUE);
1182		} else {
1183			/*
1184			 * Round up compressed size to MINBLOCKSIZE and
1185			 * zero the tail.
1186			 */
1187			size_t rounded =
1188			    P2ROUNDUP(psize, (size_t)SPA_MINBLOCKSIZE);
1189			if (rounded > psize) {
1190				bzero((char *)cbuf + psize, rounded - psize);
1191				psize = rounded;
1192			}
1193			if (psize == lsize) {
1194				compress = ZIO_COMPRESS_OFF;
1195				zio_buf_free(cbuf, lsize);
1196			} else {
1197				zio_push_transform(zio, cbuf,
1198				    psize, lsize, NULL);
1199			}
1200		}
1201	}
1202
1203	/*
1204	 * The final pass of spa_sync() must be all rewrites, but the first
1205	 * few passes offer a trade-off: allocating blocks defers convergence,
1206	 * but newly allocated blocks are sequential, so they can be written
1207	 * to disk faster.  Therefore, we allow the first few passes of
1208	 * spa_sync() to allocate new blocks, but force rewrites after that.
1209	 * There should only be a handful of blocks after pass 1 in any case.
1210	 */
1211	if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg &&
1212	    BP_GET_PSIZE(bp) == psize &&
1213	    pass >= zfs_sync_pass_rewrite) {
1214		ASSERT(psize != 0);
1215		enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
1216		zio->io_pipeline = ZIO_REWRITE_PIPELINE | gang_stages;
1217		zio->io_flags |= ZIO_FLAG_IO_REWRITE;
1218	} else {
1219		BP_ZERO(bp);
1220		zio->io_pipeline = ZIO_WRITE_PIPELINE;
1221	}
1222
1223	if (psize == 0) {
1224		if (zio->io_bp_orig.blk_birth != 0 &&
1225		    spa_feature_is_active(spa, SPA_FEATURE_HOLE_BIRTH)) {
1226			BP_SET_LSIZE(bp, lsize);
1227			BP_SET_TYPE(bp, zp->zp_type);
1228			BP_SET_LEVEL(bp, zp->zp_level);
1229			BP_SET_BIRTH(bp, zio->io_txg, 0);
1230		}
1231		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1232	} else {
1233		ASSERT(zp->zp_checksum != ZIO_CHECKSUM_GANG_HEADER);
1234		BP_SET_LSIZE(bp, lsize);
1235		BP_SET_TYPE(bp, zp->zp_type);
1236		BP_SET_LEVEL(bp, zp->zp_level);
1237		BP_SET_PSIZE(bp, psize);
1238		BP_SET_COMPRESS(bp, compress);
1239		BP_SET_CHECKSUM(bp, zp->zp_checksum);
1240		BP_SET_DEDUP(bp, zp->zp_dedup);
1241		BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
1242		if (zp->zp_dedup) {
1243			ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1244			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
1245			zio->io_pipeline = ZIO_DDT_WRITE_PIPELINE;
1246		}
1247		if (zp->zp_nopwrite) {
1248			ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1249			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
1250			zio->io_pipeline |= ZIO_STAGE_NOP_WRITE;
1251		}
1252	}
1253
1254	return (ZIO_PIPELINE_CONTINUE);
1255}
1256
1257static int
1258zio_free_bp_init(zio_t *zio)
1259{
1260	blkptr_t *bp = zio->io_bp;
1261
1262	if (zio->io_child_type == ZIO_CHILD_LOGICAL) {
1263		if (BP_GET_DEDUP(bp))
1264			zio->io_pipeline = ZIO_DDT_FREE_PIPELINE;
1265	}
1266
1267	return (ZIO_PIPELINE_CONTINUE);
1268}
1269
1270/*
1271 * ==========================================================================
1272 * Execute the I/O pipeline
1273 * ==========================================================================
1274 */
1275
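/*
 * Hand the zio off to one of the spa's taskqs, selected by I/O type and
 * by queue (issue vs. interrupt); 'cutinline' places it at the front of
 * the queue.
 */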
1276static void
1277zio_taskq_dispatch(zio_t *zio, zio_taskq_type_t q, boolean_t cutinline)
1278{
1279	spa_t *spa = zio->io_spa;
1280	zio_type_t t = zio->io_type;
1281	int flags = (cutinline ? TQ_FRONT : 0);
1282
1283	ASSERT(q == ZIO_TASKQ_ISSUE || q == ZIO_TASKQ_INTERRUPT);
1284
1285	/*
1286	 * If we're a config writer or a probe, the normal issue and
1287	 * interrupt threads may all be blocked waiting for the config lock.
1288	 * In this case, select the otherwise-unused taskq for ZIO_TYPE_NULL.
1289	 */
1290	if (zio->io_flags & (ZIO_FLAG_CONFIG_WRITER | ZIO_FLAG_PROBE))
1291		t = ZIO_TYPE_NULL;
1292
1293	/*
1294	 * A similar issue exists for the L2ARC write thread until L2ARC 2.0.
1295	 */
1296	if (t == ZIO_TYPE_WRITE && zio->io_vd && zio->io_vd->vdev_aux)
1297		t = ZIO_TYPE_NULL;
1298
1299	/*
1300	 * If this is a high priority I/O, then use the high priority taskq if
1301	 * available.
1302	 */
1303	if (zio->io_priority == ZIO_PRIORITY_NOW &&
1304	    spa->spa_zio_taskq[t][q + 1].stqs_count != 0)
1305		q++;
1306
1307	ASSERT3U(q, <, ZIO_TASKQ_TYPES);
1308
1309	/*
1310	 * NB: We are assuming that the zio can only be dispatched
1311	 * to a single taskq at a time.  It would be a grievous error
1312	 * to dispatch the zio to another taskq at the same time.
1313	 */
1314#if defined(illumos) || !defined(_KERNEL)
1315	ASSERT(zio->io_tqent.tqent_next == NULL);
1316#else
1317	ASSERT(zio->io_tqent.tqent_task.ta_pending == 0);
1318#endif
1319	spa_taskq_dispatch_ent(spa, t, q, (task_func_t *)zio_execute, zio,
1320	    flags, &zio->io_tqent);
1321}
1322
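/*
 * Return B_TRUE if the thread executing this zio belongs to the given
 * taskq queue for any I/O type.
 */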
1323static boolean_t
1324zio_taskq_member(zio_t *zio, zio_taskq_type_t q)
1325{
1326	kthread_t *executor = zio->io_executor;
1327	spa_t *spa = zio->io_spa;
1328
1329	for (zio_type_t t = 0; t < ZIO_TYPES; t++) {
1330		spa_taskqs_t *tqs = &spa->spa_zio_taskq[t][q];
1331		uint_t i;
1332		for (i = 0; i < tqs->stqs_count; i++) {
1333			if (taskq_member(tqs->stqs_taskq[i], executor))
1334				return (B_TRUE);
1335		}
1336	}
1337
1338	return (B_FALSE);
1339}
1340
1341static int
1342zio_issue_async(zio_t *zio)
1343{
1344	zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
1345
1346	return (ZIO_PIPELINE_STOP);
1347}
1348
1349void
1350zio_interrupt(zio_t *zio)
1351{
1352	zio_taskq_dispatch(zio, ZIO_TASKQ_INTERRUPT, B_FALSE);
1353}
1354
1355/*
1356 * Execute the I/O pipeline until one of the following occurs:
1357 *
1358 *	(1) the I/O completes
1359 *	(2) the pipeline stalls waiting for dependent child I/Os
1360 *	(3) the I/O issues, so we're waiting for an I/O completion interrupt
1361 *	(4) the I/O is delegated by vdev-level caching or aggregation
1362 *	(5) the I/O is deferred due to vdev-level queueing
1363 *	(6) the I/O is handed off to another thread.
1364 *
1365 * In all cases, the pipeline stops whenever there's no CPU work; it never
1366 * burns a thread in cv_wait().
1367 *
1368 * There's no locking on io_stage because there's no legitimate way
1369 * for multiple threads to be attempting to process the same I/O.
1370 */
1371static zio_pipe_stage_t *zio_pipeline[];
1372
1373void
1374zio_execute(zio_t *zio)
1375{
1376	zio->io_executor = curthread;
1377
1378	while (zio->io_stage < ZIO_STAGE_DONE) {
1379		enum zio_stage pipeline = zio->io_pipeline;
1380		enum zio_stage stage = zio->io_stage;
1381		int rv;
1382
1383		ASSERT(!MUTEX_HELD(&zio->io_lock));
1384		ASSERT(ISP2(stage));
1385		ASSERT(zio->io_stall == NULL);
1386
1387		do {
1388			stage <<= 1;
1389		} while ((stage & pipeline) == 0);
1390
1391		ASSERT(stage <= ZIO_STAGE_DONE);
1392
1393		/*
1394		 * If we are in interrupt context and this pipeline stage
1395		 * will grab a config lock that is held across I/O,
1396		 * or may wait for an I/O that needs an interrupt thread
1397		 * to complete, issue async to avoid deadlock.
1398		 *
1399		 * For VDEV_IO_START, we cut in line so that the io will
1400		 * be sent to disk promptly.
1401		 */
1402		if ((stage & ZIO_BLOCKING_STAGES) && zio->io_vd == NULL &&
1403		    zio_taskq_member(zio, ZIO_TASKQ_INTERRUPT)) {
1404			boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
1405			    zio_requeue_io_start_cut_in_line : B_FALSE;
1406			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
1407			return;
1408		}
1409
1410		zio->io_stage = stage;
1411		rv = zio_pipeline[highbit64(stage) - 1](zio);
1412
1413		if (rv == ZIO_PIPELINE_STOP)
1414			return;
1415
1416		ASSERT(rv == ZIO_PIPELINE_CONTINUE);
1417	}
1418}
1419
1420/*
1421 * ==========================================================================
1422 * Initiate I/O, either sync or async
1423 * ==========================================================================
1424 */
1425int
1426zio_wait(zio_t *zio)
1427{
1428	int error;
1429
1430	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
1431	ASSERT(zio->io_executor == NULL);
1432
1433	zio->io_waiter = curthread;
1434
1435	zio_execute(zio);
1436
1437	mutex_enter(&zio->io_lock);
1438	while (zio->io_executor != NULL)
1439		cv_wait(&zio->io_cv, &zio->io_lock);
1440	mutex_exit(&zio->io_lock);
1441
1442	error = zio->io_error;
1443	zio_destroy(zio);
1444
1445	return (error);
1446}
1447
1448void
1449zio_nowait(zio_t *zio)
1450{
1451	ASSERT(zio->io_executor == NULL);
1452
1453	if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
1454	    zio_unique_parent(zio) == NULL) {
1455		/*
1456		 * This is a logical async I/O with no parent to wait for it.
1457		 * We add it to the spa_async_zio_root "Godfather" I/O, which
1458		 * will ensure it completes prior to unloading the pool.
1459		 */
1460		spa_t *spa = zio->io_spa;
1461
1462		zio_add_child(spa->spa_async_zio_root[CPU_SEQID], zio);
1463	}
1464
1465	zio_execute(zio);
1466}
1467
1468/*
1469 * ==========================================================================
1470 * Reexecute or suspend/resume failed I/O
1471 * ==========================================================================
1472 */
1473
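/*
 * Reset a failed logical I/O to its original flags, stage and pipeline,
 * reexecute each of its children, and then rerun the parent itself
 * (unless it is the godfather, which the caller waits on).
 */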
1474static void
1475zio_reexecute(zio_t *pio)
1476{
1477	zio_t *cio, *cio_next;
1478
1479	ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
1480	ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
1481	ASSERT(pio->io_gang_leader == NULL);
1482	ASSERT(pio->io_gang_tree == NULL);
1483
1484	pio->io_flags = pio->io_orig_flags;
1485	pio->io_stage = pio->io_orig_stage;
1486	pio->io_pipeline = pio->io_orig_pipeline;
1487	pio->io_reexecute = 0;
1488	pio->io_flags |= ZIO_FLAG_REEXECUTED;
1489	pio->io_error = 0;
1490	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
1491		pio->io_state[w] = 0;
1492	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
1493		pio->io_child_error[c] = 0;
1494
1495	if (IO_IS_ALLOCATING(pio))
1496		BP_ZERO(pio->io_bp);
1497
1498	/*
1499	 * As we reexecute pio's children, new children could be created.
1500	 * New children go to the head of pio's io_child_list, however,
1501	 * so we will (correctly) not reexecute them.  The key is that
1502	 * the remainder of pio's io_child_list, from 'cio_next' onward,
1503	 * cannot be affected by any side effects of reexecuting 'cio'.
1504	 */
1505	for (cio = zio_walk_children(pio); cio != NULL; cio = cio_next) {
1506		cio_next = zio_walk_children(pio);
1507		mutex_enter(&pio->io_lock);
1508		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
1509			pio->io_children[cio->io_child_type][w]++;
1510		mutex_exit(&pio->io_lock);
1511		zio_reexecute(cio);
1512	}
1513
1514	/*
1515	 * Now that all children have been reexecuted, execute the parent.
1516	 * We don't reexecute "The Godfather" I/O here as it's the
1517	 * responsibility of the caller to wait on him.
1518	 */
1519	if (!(pio->io_flags & ZIO_FLAG_GODFATHER))
1520		zio_execute(pio);
1521}
1522
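/*
 * Suspend I/O on the pool: post an ereport (or panic, per the pool's
 * failmode property), mark the pool suspended, and park the failed
 * logical I/O under the suspend root zio until zio_resume() reexecutes
 * it.
 */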
1523void
1524zio_suspend(spa_t *spa, zio_t *zio)
1525{
1526	if (spa_get_failmode(spa) == ZIO_FAILURE_MODE_PANIC)
1527		fm_panic("Pool '%s' has encountered an uncorrectable I/O "
1528		    "failure and the failure mode property for this pool "
1529		    "is set to panic.", spa_name(spa));
1530
1531	zfs_ereport_post(FM_EREPORT_ZFS_IO_FAILURE, spa, NULL, NULL, 0, 0);
1532
1533	mutex_enter(&spa->spa_suspend_lock);
1534
1535	if (spa->spa_suspend_zio_root == NULL)
1536		spa->spa_suspend_zio_root = zio_root(spa, NULL, NULL,
1537		    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE |
1538		    ZIO_FLAG_GODFATHER);
1539
1540	spa->spa_suspended = B_TRUE;
1541
1542	if (zio != NULL) {
1543		ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
1544		ASSERT(zio != spa->spa_suspend_zio_root);
1545		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1546		ASSERT(zio_unique_parent(zio) == NULL);
1547		ASSERT(zio->io_stage == ZIO_STAGE_DONE);
1548		zio_add_child(spa->spa_suspend_zio_root, zio);
1549	}
1550
1551	mutex_exit(&spa->spa_suspend_lock);
1552}
1553
1554int
1555zio_resume(spa_t *spa)
1556{
1557	zio_t *pio;
1558
1559	/*
1560	 * Reexecute all previously suspended i/o.
1561	 */
1562	mutex_enter(&spa->spa_suspend_lock);
1563	spa->spa_suspended = B_FALSE;
1564	cv_broadcast(&spa->spa_suspend_cv);
1565	pio = spa->spa_suspend_zio_root;
1566	spa->spa_suspend_zio_root = NULL;
1567	mutex_exit(&spa->spa_suspend_lock);
1568
1569	if (pio == NULL)
1570		return (0);
1571
1572	zio_reexecute(pio);
1573	return (zio_wait(pio));
1574}
1575
1576void
1577zio_resume_wait(spa_t *spa)
1578{
1579	mutex_enter(&spa->spa_suspend_lock);
1580	while (spa_suspended(spa))
1581		cv_wait(&spa->spa_suspend_cv, &spa->spa_suspend_lock);
1582	mutex_exit(&spa->spa_suspend_lock);
1583}
1584
1585/*
1586 * ==========================================================================
1587 * Gang blocks.
1588 *
1589 * A gang block is a collection of small blocks that looks to the DMU
1590 * like one large block.  When zio_dva_allocate() cannot find a block
1591 * of the requested size, due to either severe fragmentation or the pool
1592 * being nearly full, it calls zio_write_gang_block() to construct the
1593 * block from smaller fragments.
1594 *
1595 * A gang block consists of a gang header (zio_gbh_phys_t) and up to
1596 * three (SPA_GBH_NBLKPTRS) gang members.  The gang header is just like
1597 * an indirect block: it's an array of block pointers.  It consumes
1598 * only one sector and hence is allocatable regardless of fragmentation.
1599 * The gang header's bps point to its gang members, which hold the data.
1600 *
1601 * Gang blocks are self-checksumming, using the bp's <vdev, offset, txg>
1602 * as the verifier to ensure uniqueness of the SHA256 checksum.
1603 * Critically, the gang block bp's blk_cksum is the checksum of the data,
1604 * not the gang header.  This ensures that data block signatures (needed for
1605 * deduplication) are independent of how the block is physically stored.
1606 *
1607 * Gang blocks can be nested: a gang member may itself be a gang block.
1608 * Thus every gang block is a tree in which root and all interior nodes are
1609 * gang headers, and the leaves are normal blocks that contain user data.
1610 * The root of the gang tree is called the gang leader.
1611 *
1612 * To perform any operation (read, rewrite, free, claim) on a gang block,
1613 * zio_gang_assemble() first assembles the gang tree (minus data leaves)
1614 * in the io_gang_tree field of the original logical i/o by recursively
1615 * reading the gang leader and all gang headers below it.  This yields
1616 * an in-core tree containing the contents of every gang header and the
1617 * bps for every constituent of the gang block.
1618 *
1619 * With the gang tree now assembled, zio_gang_issue() just walks the gang tree
1620 * and invokes a callback on each bp.  To free a gang block, zio_gang_issue()
1621 * calls zio_free_gang() -- a trivial wrapper around zio_free() -- for each bp.
1622 * zio_claim_gang() provides a similarly trivial wrapper for zio_claim().
1623 * zio_read_gang() is a wrapper around zio_read() that omits reading gang
1624 * headers, since we already have those in io_gang_tree.  zio_rewrite_gang()
1625 * performs a zio_rewrite() of the data or, for gang headers, a zio_rewrite()
1626 * of the gang header plus zio_checksum_compute() of the data to update the
1627 * gang header's blk_cksum as described above.
1628 *
1629 * The two-phase assemble/issue model solves the problem of partial failure --
1630 * what if you'd freed part of a gang block but then couldn't read the
1631 * gang header for another part?  Assembling the entire gang tree first
1632 * ensures that all the necessary gang header I/O has succeeded before
1633 * starting the actual work of free, claim, or write.  Once the gang tree
1634 * is assembled, free and claim are in-memory operations that cannot fail.
1635 *
1636 * In the event that a gang write fails, zio_dva_unallocate() walks the
1637 * gang tree to immediately free (i.e. insert back into the space map)
1638 * everything we've allocated.  This ensures that we don't get ENOSPC
1639 * errors during repeated suspend/resume cycles due to a flaky device.
1640 *
1641 * Gang rewrites only happen during sync-to-convergence.  If we can't assemble
1642 * the gang tree, we won't modify the block, so we can safely defer the free
1643 * (knowing that the block is still intact).  If we *can* assemble the gang
1644 * tree, then even if some of the rewrites fail, zio_dva_unallocate() will free
1645 * each constituent bp and we can allocate a new block on the next sync pass.
1646 *
1647 * In all cases, the gang tree allows complete recovery from partial failure.
1648 * ==========================================================================
1649 */
1650
1651static zio_t *
1652zio_read_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1653{
1654	if (gn != NULL)
1655		return (pio);
1656
1657	return (zio_read(pio, pio->io_spa, bp, data, BP_GET_PSIZE(bp),
1658	    NULL, NULL, pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
1659	    &pio->io_bookmark));
1660}
1661
1662zio_t *
1663zio_rewrite_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1664{
1665	zio_t *zio;
1666
1667	if (gn != NULL) {
1668		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
1669		    gn->gn_gbh, SPA_GANGBLOCKSIZE, NULL, NULL, pio->io_priority,
1670		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
1671		/*
1672		 * As we rewrite each gang header, the pipeline will compute
1673		 * a new gang block header checksum for it; but no one will
1674		 * compute a new data checksum, so we do that here.  The one
1675		 * exception is the gang leader: the pipeline already computed
1676		 * its data checksum because that stage precedes gang assembly.
1677		 * (Presently, nothing actually uses interior data checksums;
1678		 * this is just good hygiene.)
1679		 */
1680		if (gn != pio->io_gang_leader->io_gang_tree) {
1681			zio_checksum_compute(zio, BP_GET_CHECKSUM(bp),
1682			    data, BP_GET_PSIZE(bp));
1683		}
1684		/*
1685		 * If we are here to damage data for testing purposes,
1686		 * leave the GBH alone so that we can detect the damage.
1687		 */
1688		if (pio->io_gang_leader->io_flags & ZIO_FLAG_INDUCE_DAMAGE)
1689			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
1690	} else {
1691		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
1692		    data, BP_GET_PSIZE(bp), NULL, NULL, pio->io_priority,
1693		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
1694	}
1695
1696	return (zio);
1697}
1698
1699/* ARGSUSED */
1700zio_t *
1701zio_free_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1702{
1703	return (zio_free_sync(pio, pio->io_spa, pio->io_txg, bp,
1704	    BP_IS_GANG(bp) ? SPA_GANGBLOCKSIZE : BP_GET_PSIZE(bp),
1705	    ZIO_GANG_CHILD_FLAGS(pio)));
1706}
1707
1708/* ARGSUSED */
1709zio_t *
1710zio_claim_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1711{
1712	return (zio_claim(pio, pio->io_spa, pio->io_txg, bp,
1713	    NULL, NULL, ZIO_GANG_CHILD_FLAGS(pio)));
1714}
1715
1716static zio_gang_issue_func_t *zio_gang_issue_func[ZIO_TYPES] = {
1717	NULL,
1718	zio_read_gang,
1719	zio_rewrite_gang,
1720	zio_free_gang,
1721	zio_claim_gang,
1722	NULL
1723};
1724
1725static void zio_gang_tree_assemble_done(zio_t *zio);
1726
1727static zio_gang_node_t *
1728zio_gang_node_alloc(zio_gang_node_t **gnpp)
1729{
1730	zio_gang_node_t *gn;
1731
1732	ASSERT(*gnpp == NULL);
1733
1734	gn = kmem_zalloc(sizeof (*gn), KM_SLEEP);
1735	gn->gn_gbh = zio_buf_alloc(SPA_GANGBLOCKSIZE);
1736	*gnpp = gn;
1737
1738	return (gn);
1739}
1740
1741static void
1742zio_gang_node_free(zio_gang_node_t **gnpp)
1743{
1744	zio_gang_node_t *gn = *gnpp;
1745
1746	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
1747		ASSERT(gn->gn_child[g] == NULL);
1748
1749	zio_buf_free(gn->gn_gbh, SPA_GANGBLOCKSIZE);
1750	kmem_free(gn, sizeof (*gn));
1751	*gnpp = NULL;
1752}
1753
1754static void
1755zio_gang_tree_free(zio_gang_node_t **gnpp)
1756{
1757	zio_gang_node_t *gn = *gnpp;
1758
1759	if (gn == NULL)
1760		return;
1761
1762	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
1763		zio_gang_tree_free(&gn->gn_child[g]);
1764
1765	zio_gang_node_free(gnpp);
1766}
1767
1768static void
1769zio_gang_tree_assemble(zio_t *gio, blkptr_t *bp, zio_gang_node_t **gnpp)
1770{
1771	zio_gang_node_t *gn = zio_gang_node_alloc(gnpp);
1772
1773	ASSERT(gio->io_gang_leader == gio);
1774	ASSERT(BP_IS_GANG(bp));
1775
1776	zio_nowait(zio_read(gio, gio->io_spa, bp, gn->gn_gbh,
1777	    SPA_GANGBLOCKSIZE, zio_gang_tree_assemble_done, gn,
1778	    gio->io_priority, ZIO_GANG_CHILD_FLAGS(gio), &gio->io_bookmark));
1779}
1780
1781static void
1782zio_gang_tree_assemble_done(zio_t *zio)
1783{
1784	zio_t *gio = zio->io_gang_leader;
1785	zio_gang_node_t *gn = zio->io_private;
1786	blkptr_t *bp = zio->io_bp;
1787
1788	ASSERT(gio == zio_unique_parent(zio));
1789	ASSERT(zio->io_child_count == 0);
1790
1791	if (zio->io_error)
1792		return;
1793
1794	if (BP_SHOULD_BYTESWAP(bp))
1795		byteswap_uint64_array(zio->io_data, zio->io_size);
1796
1797	ASSERT(zio->io_data == gn->gn_gbh);
1798	ASSERT(zio->io_size == SPA_GANGBLOCKSIZE);
1799	ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
1800
1801	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
1802		blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
1803		if (!BP_IS_GANG(gbp))
1804			continue;
1805		zio_gang_tree_assemble(gio, gbp, &gn->gn_child[g]);
1806	}
1807}
1808
1809static void
1810zio_gang_tree_issue(zio_t *pio, zio_gang_node_t *gn, blkptr_t *bp, void *data)
1811{
1812	zio_t *gio = pio->io_gang_leader;
1813	zio_t *zio;
1814
1815	ASSERT(BP_IS_GANG(bp) == !!gn);
1816	ASSERT(BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(gio->io_bp));
1817	ASSERT(BP_GET_LSIZE(bp) == BP_GET_PSIZE(bp) || gn == gio->io_gang_tree);
1818
1819	/*
1820	 * If you're a gang header, your data is in gn->gn_gbh.
1821	 * If you're a gang member, your data is in 'data' and gn == NULL.
1822	 */
1823	zio = zio_gang_issue_func[gio->io_type](pio, bp, gn, data);
1824
1825	if (gn != NULL) {
1826		ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
1827
1828		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
1829			blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
1830			if (BP_IS_HOLE(gbp))
1831				continue;
1832			zio_gang_tree_issue(zio, gn->gn_child[g], gbp, data);
1833			data = (char *)data + BP_GET_PSIZE(gbp);
1834		}
1835	}
1836
1837	if (gn == gio->io_gang_tree && gio->io_data != NULL)
1838		ASSERT3P((char *)gio->io_data + gio->io_size, ==, data);
1839
1840	if (zio != pio)
1841		zio_nowait(zio);
1842}
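/*
 * A small worked example of the data walk above: for a single-level gang
 * block whose header holds three leaf bps of 32K, 32K and 16K,
 * zio_gang_tree_issue() hands 'data' to the first leaf, 'data' + 32K to
 * the second and 'data' + 64K to the third, so the gang leader's flat 80K
 * buffer maps one-to-one onto the leaves; the trailing assertion verifies
 * exactly that.
 */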
1843
1844static int
1845zio_gang_assemble(zio_t *zio)
1846{
1847	blkptr_t *bp = zio->io_bp;
1848
1849	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == NULL);
1850	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
1851
1852	zio->io_gang_leader = zio;
1853
1854	zio_gang_tree_assemble(zio, bp, &zio->io_gang_tree);
1855
1856	return (ZIO_PIPELINE_CONTINUE);
1857}
1858
1859static int
1860zio_gang_issue(zio_t *zio)
1861{
1862	blkptr_t *bp = zio->io_bp;
1863
1864	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE))
1865		return (ZIO_PIPELINE_STOP);
1866
1867	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == zio);
1868	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
1869
1870	if (zio->io_child_error[ZIO_CHILD_GANG] == 0)
1871		zio_gang_tree_issue(zio, zio->io_gang_tree, bp, zio->io_data);
1872	else
1873		zio_gang_tree_free(&zio->io_gang_tree);
1874
1875	zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1876
1877	return (ZIO_PIPELINE_CONTINUE);
1878}
1879
1880static void
1881zio_write_gang_member_ready(zio_t *zio)
1882{
1883	zio_t *pio = zio_unique_parent(zio);
1884	zio_t *gio = zio->io_gang_leader;
1885	dva_t *cdva = zio->io_bp->blk_dva;
1886	dva_t *pdva = pio->io_bp->blk_dva;
1887	uint64_t asize;
1888
1889	if (BP_IS_HOLE(zio->io_bp))
1890		return;
1891
1892	ASSERT(BP_IS_HOLE(&zio->io_bp_orig));
1893
1894	ASSERT(zio->io_child_type == ZIO_CHILD_GANG);
1895	ASSERT3U(zio->io_prop.zp_copies, ==, gio->io_prop.zp_copies);
1896	ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
1897	ASSERT3U(pio->io_prop.zp_copies, <=, BP_GET_NDVAS(pio->io_bp));
1898	ASSERT3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
1899
1900	mutex_enter(&pio->io_lock);
1901	for (int d = 0; d < BP_GET_NDVAS(zio->io_bp); d++) {
1902		ASSERT(DVA_GET_GANG(&pdva[d]));
1903		asize = DVA_GET_ASIZE(&pdva[d]);
1904		asize += DVA_GET_ASIZE(&cdva[d]);
1905		DVA_SET_ASIZE(&pdva[d], asize);
1906	}
1907	mutex_exit(&pio->io_lock);
1908}
1909
1910static int
1911zio_write_gang_block(zio_t *pio)
1912{
1913	spa_t *spa = pio->io_spa;
1914	blkptr_t *bp = pio->io_bp;
1915	zio_t *gio = pio->io_gang_leader;
1916	zio_t *zio;
1917	zio_gang_node_t *gn, **gnpp;
1918	zio_gbh_phys_t *gbh;
1919	uint64_t txg = pio->io_txg;
1920	uint64_t resid = pio->io_size;
1921	uint64_t lsize;
1922	int copies = gio->io_prop.zp_copies;
1923	int gbh_copies = MIN(copies + 1, spa_max_replication(spa));
1924	zio_prop_t zp;
1925	int error;
1926
1927	error = metaslab_alloc(spa, spa_normal_class(spa), SPA_GANGBLOCKSIZE,
1928	    bp, gbh_copies, txg, pio == gio ? NULL : gio->io_bp,
1929	    METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER);
1930	if (error) {
1931		pio->io_error = error;
1932		return (ZIO_PIPELINE_CONTINUE);
1933	}
1934
1935	if (pio == gio) {
1936		gnpp = &gio->io_gang_tree;
1937	} else {
1938		gnpp = pio->io_private;
1939		ASSERT(pio->io_ready == zio_write_gang_member_ready);
1940	}
1941
1942	gn = zio_gang_node_alloc(gnpp);
1943	gbh = gn->gn_gbh;
1944	bzero(gbh, SPA_GANGBLOCKSIZE);
1945
1946	/*
1947	 * Create the gang header.
1948	 */
1949	zio = zio_rewrite(pio, spa, txg, bp, gbh, SPA_GANGBLOCKSIZE, NULL, NULL,
1950	    pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
1951
1952	/*
1953	 * Create and nowait the gang children.
1954	 */
1955	for (int g = 0; resid != 0; resid -= lsize, g++) {
1956		lsize = P2ROUNDUP(resid / (SPA_GBH_NBLKPTRS - g),
1957		    SPA_MINBLOCKSIZE);
1958		ASSERT(lsize >= SPA_MINBLOCKSIZE && lsize <= resid);
1959
1960		zp.zp_checksum = gio->io_prop.zp_checksum;
1961		zp.zp_compress = ZIO_COMPRESS_OFF;
1962		zp.zp_type = DMU_OT_NONE;
1963		zp.zp_level = 0;
1964		zp.zp_copies = gio->io_prop.zp_copies;
1965		zp.zp_dedup = B_FALSE;
1966		zp.zp_dedup_verify = B_FALSE;
1967		zp.zp_nopwrite = B_FALSE;
1968
1969		zio_nowait(zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
1970		    (char *)pio->io_data + (pio->io_size - resid), lsize, &zp,
1971		    zio_write_gang_member_ready, NULL, NULL, &gn->gn_child[g],
1972		    pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
1973		    &pio->io_bookmark));
1974	}
1975
1976	/*
1977	 * Set pio's pipeline to just wait for zio to finish.
1978	 */
1979	pio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1980
1981	zio_nowait(zio);
1982
1983	return (ZIO_PIPELINE_CONTINUE);
1984}
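/*
 * A worked example of the sizing loop above, assuming a 512-byte gang
 * header with three blkptr slots (SPA_GBH_NBLKPTRS == 3) and a 100K
 * (102400-byte) write that could not be allocated contiguously:
 *
 *	g = 0: lsize = P2ROUNDUP(102400 / 3, 512) = 34304, resid -> 68096
 *	g = 1: lsize = P2ROUNDUP(68096 / 2, 512)  = 34304, resid -> 33792
 *	g = 2: lsize = P2ROUNDUP(33792 / 1, 512)  = 33792, resid -> 0
 *
 * Each child is an ordinary (possibly itself gang) write with compression
 * disabled, so the children sum exactly to the original logical size.
 */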
1985
1986/*
1987 * The zio_nop_write stage in the pipeline determines if allocating
1988 * a new bp is necessary.  By leveraging a cryptographically secure checksum,
1989 * such as SHA256, we can compare the checksums of the new data and the old
1990 * to determine if allocating a new block is required.  The nopwrite
1991 * feature can handle writes in either syncing or open context (i.e. zil
1992 * writes) and as a result is mutually exclusive with dedup.
1993 */
1994static int
1995zio_nop_write(zio_t *zio)
1996{
1997	blkptr_t *bp = zio->io_bp;
1998	blkptr_t *bp_orig = &zio->io_bp_orig;
1999	zio_prop_t *zp = &zio->io_prop;
2000
2001	ASSERT(BP_GET_LEVEL(bp) == 0);
2002	ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
2003	ASSERT(zp->zp_nopwrite);
2004	ASSERT(!zp->zp_dedup);
2005	ASSERT(zio->io_bp_override == NULL);
2006	ASSERT(IO_IS_ALLOCATING(zio));
2007
2008	/*
2009	 * Check to see if the original bp and the new bp have matching
2010	 * characteristics (i.e. same checksum, compression algorithms, etc).
2011	 * If they don't, then just continue with the pipeline, which will
2012	 * allocate a new bp.
2013	 */
2014	if (BP_IS_HOLE(bp_orig) ||
2015	    !zio_checksum_table[BP_GET_CHECKSUM(bp)].ci_dedup ||
2016	    BP_GET_CHECKSUM(bp) != BP_GET_CHECKSUM(bp_orig) ||
2017	    BP_GET_COMPRESS(bp) != BP_GET_COMPRESS(bp_orig) ||
2018	    BP_GET_DEDUP(bp) != BP_GET_DEDUP(bp_orig) ||
2019	    zp->zp_copies != BP_GET_NDVAS(bp_orig))
2020		return (ZIO_PIPELINE_CONTINUE);
2021
2022	/*
2023	 * If the checksums match then reset the pipeline so that we
2024	 * avoid allocating a new bp and issuing any I/O.
2025	 */
2026	if (ZIO_CHECKSUM_EQUAL(bp->blk_cksum, bp_orig->blk_cksum)) {
2027		ASSERT(zio_checksum_table[zp->zp_checksum].ci_dedup);
2028		ASSERT3U(BP_GET_PSIZE(bp), ==, BP_GET_PSIZE(bp_orig));
2029		ASSERT3U(BP_GET_LSIZE(bp), ==, BP_GET_LSIZE(bp_orig));
2030		ASSERT(zp->zp_compress != ZIO_COMPRESS_OFF);
2031		ASSERT(bcmp(&bp->blk_prop, &bp_orig->blk_prop,
2032		    sizeof (uint64_t)) == 0);
2033
2034		*bp = *bp_orig;
2035		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2036		zio->io_flags |= ZIO_FLAG_NOPWRITE;
2037	}
2038
2039	return (ZIO_PIPELINE_CONTINUE);
2040}
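/*
 * From the administrator's side, nopwrite is only armed when, among other
 * conditions, the dataset uses a dedup-capable (cryptographically strong)
 * checksum together with compression; an illustrative configuration (not
 * taken from this file):
 *
 *	zfs set checksum=sha256 tank/ds
 *	zfs set compression=lz4 tank/ds
 *
 * A later rewrite of byte-identical data then matches the old bp's
 * checksum and properties in the test above, the old bp is kept, and no
 * new allocation or device write is issued.
 */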
2041
2042/*
2043 * ==========================================================================
2044 * Dedup
2045 * ==========================================================================
2046 */
2047static void
2048zio_ddt_child_read_done(zio_t *zio)
2049{
2050	blkptr_t *bp = zio->io_bp;
2051	ddt_entry_t *dde = zio->io_private;
2052	ddt_phys_t *ddp;
2053	zio_t *pio = zio_unique_parent(zio);
2054
2055	mutex_enter(&pio->io_lock);
2056	ddp = ddt_phys_select(dde, bp);
2057	if (zio->io_error == 0)
2058		ddt_phys_clear(ddp);	/* this ddp doesn't need repair */
2059	if (zio->io_error == 0 && dde->dde_repair_data == NULL)
2060		dde->dde_repair_data = zio->io_data;
2061	else
2062		zio_buf_free(zio->io_data, zio->io_size);
2063	mutex_exit(&pio->io_lock);
2064}
2065
2066static int
2067zio_ddt_read_start(zio_t *zio)
2068{
2069	blkptr_t *bp = zio->io_bp;
2070
2071	ASSERT(BP_GET_DEDUP(bp));
2072	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
2073	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2074
2075	if (zio->io_child_error[ZIO_CHILD_DDT]) {
2076		ddt_t *ddt = ddt_select(zio->io_spa, bp);
2077		ddt_entry_t *dde = ddt_repair_start(ddt, bp);
2078		ddt_phys_t *ddp = dde->dde_phys;
2079		ddt_phys_t *ddp_self = ddt_phys_select(dde, bp);
2080		blkptr_t blk;
2081
2082		ASSERT(zio->io_vsd == NULL);
2083		zio->io_vsd = dde;
2084
2085		if (ddp_self == NULL)
2086			return (ZIO_PIPELINE_CONTINUE);
2087
2088		for (int p = 0; p < DDT_PHYS_TYPES; p++, ddp++) {
2089			if (ddp->ddp_phys_birth == 0 || ddp == ddp_self)
2090				continue;
2091			ddt_bp_create(ddt->ddt_checksum, &dde->dde_key, ddp,
2092			    &blk);
2093			zio_nowait(zio_read(zio, zio->io_spa, &blk,
2094			    zio_buf_alloc(zio->io_size), zio->io_size,
2095			    zio_ddt_child_read_done, dde, zio->io_priority,
2096			    ZIO_DDT_CHILD_FLAGS(zio) | ZIO_FLAG_DONT_PROPAGATE,
2097			    &zio->io_bookmark));
2098		}
2099		return (ZIO_PIPELINE_CONTINUE);
2100	}
2101
2102	zio_nowait(zio_read(zio, zio->io_spa, bp,
2103	    zio->io_data, zio->io_size, NULL, NULL, zio->io_priority,
2104	    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark));
2105
2106	return (ZIO_PIPELINE_CONTINUE);
2107}
2108
2109static int
2110zio_ddt_read_done(zio_t *zio)
2111{
2112	blkptr_t *bp = zio->io_bp;
2113
2114	if (zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE))
2115		return (ZIO_PIPELINE_STOP);
2116
2117	ASSERT(BP_GET_DEDUP(bp));
2118	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
2119	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2120
2121	if (zio->io_child_error[ZIO_CHILD_DDT]) {
2122		ddt_t *ddt = ddt_select(zio->io_spa, bp);
2123		ddt_entry_t *dde = zio->io_vsd;
2124		if (ddt == NULL) {
2125			ASSERT(spa_load_state(zio->io_spa) != SPA_LOAD_NONE);
2126			return (ZIO_PIPELINE_CONTINUE);
2127		}
2128		if (dde == NULL) {
2129			zio->io_stage = ZIO_STAGE_DDT_READ_START >> 1;
2130			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
2131			return (ZIO_PIPELINE_STOP);
2132		}
2133		if (dde->dde_repair_data != NULL) {
2134			bcopy(dde->dde_repair_data, zio->io_data, zio->io_size);
2135			zio->io_child_error[ZIO_CHILD_DDT] = 0;
2136		}
2137		ddt_repair_done(ddt, dde);
2138		zio->io_vsd = NULL;
2139	}
2140
2141	ASSERT(zio->io_vsd == NULL);
2142
2143	return (ZIO_PIPELINE_CONTINUE);
2144}
2145
2146static boolean_t
2147zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
2148{
2149	spa_t *spa = zio->io_spa;
2150
2151	/*
2152	 * Note: we compare the original data, not the transformed data,
2153	 * because when zio->io_bp is an override bp, we will not have
2154	 * pushed the I/O transforms.  That's an important optimization
2155	 * because otherwise we'd compress/encrypt all dmu_sync() data twice.
2156	 */
2157	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
2158		zio_t *lio = dde->dde_lead_zio[p];
2159
2160		if (lio != NULL) {
2161			return (lio->io_orig_size != zio->io_orig_size ||
2162			    bcmp(zio->io_orig_data, lio->io_orig_data,
2163			    zio->io_orig_size) != 0);
2164		}
2165	}
2166
2167	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
2168		ddt_phys_t *ddp = &dde->dde_phys[p];
2169
2170		if (ddp->ddp_phys_birth != 0) {
2171			arc_buf_t *abuf = NULL;
2172			uint32_t aflags = ARC_WAIT;
2173			blkptr_t blk = *zio->io_bp;
2174			int error;
2175
2176			ddt_bp_fill(ddp, &blk, ddp->ddp_phys_birth);
2177
2178			ddt_exit(ddt);
2179
2180			error = arc_read(NULL, spa, &blk,
2181			    arc_getbuf_func, &abuf, ZIO_PRIORITY_SYNC_READ,
2182			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
2183			    &aflags, &zio->io_bookmark);
2184
2185			if (error == 0) {
2186				if (arc_buf_size(abuf) != zio->io_orig_size ||
2187				    bcmp(abuf->b_data, zio->io_orig_data,
2188				    zio->io_orig_size) != 0)
2189					error = SET_ERROR(EEXIST);
2190				VERIFY(arc_buf_remove_ref(abuf, &abuf));
2191			}
2192
2193			ddt_enter(ddt);
2194			return (error != 0);
2195		}
2196	}
2197
2198	return (B_FALSE);
2199}
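/*
 * This verification path is what backs the 'verify' variant of the dedup
 * property; an illustrative setting (not taken from this file):
 *
 *	zfs set dedup=sha256,verify tank/ds
 *
 * With plain dedup (no verify), zp_dedup_verify is not set and a checksum
 * match alone is trusted; with verify, every apparent match is confirmed
 * byte for byte, either against a pending write of the same block or
 * against the on-disk copy read back above.
 */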
2200
2201static void
2202zio_ddt_child_write_ready(zio_t *zio)
2203{
2204	int p = zio->io_prop.zp_copies;
2205	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
2206	ddt_entry_t *dde = zio->io_private;
2207	ddt_phys_t *ddp = &dde->dde_phys[p];
2208	zio_t *pio;
2209
2210	if (zio->io_error)
2211		return;
2212
2213	ddt_enter(ddt);
2214
2215	ASSERT(dde->dde_lead_zio[p] == zio);
2216
2217	ddt_phys_fill(ddp, zio->io_bp);
2218
2219	while ((pio = zio_walk_parents(zio)) != NULL)
2220		ddt_bp_fill(ddp, pio->io_bp, zio->io_txg);
2221
2222	ddt_exit(ddt);
2223}
2224
2225static void
2226zio_ddt_child_write_done(zio_t *zio)
2227{
2228	int p = zio->io_prop.zp_copies;
2229	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
2230	ddt_entry_t *dde = zio->io_private;
2231	ddt_phys_t *ddp = &dde->dde_phys[p];
2232
2233	ddt_enter(ddt);
2234
2235	ASSERT(ddp->ddp_refcnt == 0);
2236	ASSERT(dde->dde_lead_zio[p] == zio);
2237	dde->dde_lead_zio[p] = NULL;
2238
2239	if (zio->io_error == 0) {
2240		while (zio_walk_parents(zio) != NULL)
2241			ddt_phys_addref(ddp);
2242	} else {
2243		ddt_phys_clear(ddp);
2244	}
2245
2246	ddt_exit(ddt);
2247}
2248
2249static void
2250zio_ddt_ditto_write_done(zio_t *zio)
2251{
2252	int p = DDT_PHYS_DITTO;
2253	zio_prop_t *zp = &zio->io_prop;
2254	blkptr_t *bp = zio->io_bp;
2255	ddt_t *ddt = ddt_select(zio->io_spa, bp);
2256	ddt_entry_t *dde = zio->io_private;
2257	ddt_phys_t *ddp = &dde->dde_phys[p];
2258	ddt_key_t *ddk = &dde->dde_key;
2259
2260	ddt_enter(ddt);
2261
2262	ASSERT(ddp->ddp_refcnt == 0);
2263	ASSERT(dde->dde_lead_zio[p] == zio);
2264	dde->dde_lead_zio[p] = NULL;
2265
2266	if (zio->io_error == 0) {
2267		ASSERT(ZIO_CHECKSUM_EQUAL(bp->blk_cksum, ddk->ddk_cksum));
2268		ASSERT(zp->zp_copies < SPA_DVAS_PER_BP);
2269		ASSERT(zp->zp_copies == BP_GET_NDVAS(bp) - BP_IS_GANG(bp));
2270		if (ddp->ddp_phys_birth != 0)
2271			ddt_phys_free(ddt, ddk, ddp, zio->io_txg);
2272		ddt_phys_fill(ddp, bp);
2273	}
2274
2275	ddt_exit(ddt);
2276}
2277
2278static int
2279zio_ddt_write(zio_t *zio)
2280{
2281	spa_t *spa = zio->io_spa;
2282	blkptr_t *bp = zio->io_bp;
2283	uint64_t txg = zio->io_txg;
2284	zio_prop_t *zp = &zio->io_prop;
2285	int p = zp->zp_copies;
2286	int ditto_copies;
2287	zio_t *cio = NULL;
2288	zio_t *dio = NULL;
2289	ddt_t *ddt = ddt_select(spa, bp);
2290	ddt_entry_t *dde;
2291	ddt_phys_t *ddp;
2292
2293	ASSERT(BP_GET_DEDUP(bp));
2294	ASSERT(BP_GET_CHECKSUM(bp) == zp->zp_checksum);
2295	ASSERT(BP_IS_HOLE(bp) || zio->io_bp_override);
2296
2297	ddt_enter(ddt);
2298	dde = ddt_lookup(ddt, bp, B_TRUE);
2299	ddp = &dde->dde_phys[p];
2300
2301	if (zp->zp_dedup_verify && zio_ddt_collision(zio, ddt, dde)) {
2302		/*
2303		 * If we're using a weak checksum, upgrade to a strong checksum
2304		 * and try again.  If we're already using a strong checksum,
2305		 * we can't resolve it, so just convert to an ordinary write.
2306		 * (And automatically e-mail a paper to Nature?)
2307		 */
2308		if (!zio_checksum_table[zp->zp_checksum].ci_dedup) {
2309			zp->zp_checksum = spa_dedup_checksum(spa);
2310			zio_pop_transforms(zio);
2311			zio->io_stage = ZIO_STAGE_OPEN;
2312			BP_ZERO(bp);
2313		} else {
2314			zp->zp_dedup = B_FALSE;
2315		}
2316		zio->io_pipeline = ZIO_WRITE_PIPELINE;
2317		ddt_exit(ddt);
2318		return (ZIO_PIPELINE_CONTINUE);
2319	}
2320
2321	ditto_copies = ddt_ditto_copies_needed(ddt, dde, ddp);
2322	ASSERT(ditto_copies < SPA_DVAS_PER_BP);
2323
2324	if (ditto_copies > ddt_ditto_copies_present(dde) &&
2325	    dde->dde_lead_zio[DDT_PHYS_DITTO] == NULL) {
2326		zio_prop_t czp = *zp;
2327
2328		czp.zp_copies = ditto_copies;
2329
2330		/*
2331		 * If we arrived here with an override bp, we won't have run
2332		 * the transform stack, so we won't have the data we need to
2333		 * generate a child i/o.  So, toss the override bp and restart.
2334		 * This is safe, because using the override bp is just an
2335		 * optimization; and it's rare, so the cost doesn't matter.
2336		 */
2337		if (zio->io_bp_override) {
2338			zio_pop_transforms(zio);
2339			zio->io_stage = ZIO_STAGE_OPEN;
2340			zio->io_pipeline = ZIO_WRITE_PIPELINE;
2341			zio->io_bp_override = NULL;
2342			BP_ZERO(bp);
2343			ddt_exit(ddt);
2344			return (ZIO_PIPELINE_CONTINUE);
2345		}
2346
2347		dio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
2348		    zio->io_orig_size, &czp, NULL, NULL,
2349		    zio_ddt_ditto_write_done, dde, zio->io_priority,
2350		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2351
2352		zio_push_transform(dio, zio->io_data, zio->io_size, 0, NULL);
2353		dde->dde_lead_zio[DDT_PHYS_DITTO] = dio;
2354	}
2355
2356	if (ddp->ddp_phys_birth != 0 || dde->dde_lead_zio[p] != NULL) {
2357		if (ddp->ddp_phys_birth != 0)
2358			ddt_bp_fill(ddp, bp, txg);
2359		if (dde->dde_lead_zio[p] != NULL)
2360			zio_add_child(zio, dde->dde_lead_zio[p]);
2361		else
2362			ddt_phys_addref(ddp);
2363	} else if (zio->io_bp_override) {
2364		ASSERT(bp->blk_birth == txg);
2365		ASSERT(BP_EQUAL(bp, zio->io_bp_override));
2366		ddt_phys_fill(ddp, bp);
2367		ddt_phys_addref(ddp);
2368	} else {
2369		cio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
2370		    zio->io_orig_size, zp, zio_ddt_child_write_ready, NULL,
2371		    zio_ddt_child_write_done, dde, zio->io_priority,
2372		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2373
2374		zio_push_transform(cio, zio->io_data, zio->io_size, 0, NULL);
2375		dde->dde_lead_zio[p] = cio;
2376	}
2377
2378	ddt_exit(ddt);
2379
2380	if (cio)
2381		zio_nowait(cio);
2382	if (dio)
2383		zio_nowait(dio);
2384
2385	return (ZIO_PIPELINE_CONTINUE);
2386}
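/*
 * To summarize the paths above: when the table entry already has an
 * on-disk copy or a write of the same data is already in flight, this zio
 * piggybacks on it (filling its bp from the entry, taking a reference, or
 * registering as a child of the in-flight write, as appropriate); with a
 * valid override bp the entry is simply filled from that bp and a
 * reference taken; otherwise a child write is issued and recorded as the
 * lead zio so that later identical writes can find it.  The optional
 * 'dio' is an extra ditto write used to raise the physical copy count of
 * a heavily referenced entry.
 */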
2387
2388ddt_entry_t *freedde; /* for debugging */
2389
2390static int
2391zio_ddt_free(zio_t *zio)
2392{
2393	spa_t *spa = zio->io_spa;
2394	blkptr_t *bp = zio->io_bp;
2395	ddt_t *ddt = ddt_select(spa, bp);
2396	ddt_entry_t *dde;
2397	ddt_phys_t *ddp;
2398
2399	ASSERT(BP_GET_DEDUP(bp));
2400	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2401
2402	ddt_enter(ddt);
2403	freedde = dde = ddt_lookup(ddt, bp, B_TRUE);
2404	ddp = ddt_phys_select(dde, bp);
2405	ddt_phys_decref(ddp);
2406	ddt_exit(ddt);
2407
2408	return (ZIO_PIPELINE_CONTINUE);
2409}
2410
2411/*
2412 * ==========================================================================
2413 * Allocate and free blocks
2414 * ==========================================================================
2415 */
2416static int
2417zio_dva_allocate(zio_t *zio)
2418{
2419	spa_t *spa = zio->io_spa;
2420	metaslab_class_t *mc = spa_normal_class(spa);
2421	blkptr_t *bp = zio->io_bp;
2422	int error;
2423	int flags = 0;
2424
2425	if (zio->io_gang_leader == NULL) {
2426		ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2427		zio->io_gang_leader = zio;
2428	}
2429
2430	ASSERT(BP_IS_HOLE(bp));
2431	ASSERT0(BP_GET_NDVAS(bp));
2432	ASSERT3U(zio->io_prop.zp_copies, >, 0);
2433	ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
2434	ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
2435
2436	/*
2437	 * The dump device does not support gang blocks so allocation on
2438	 * behalf of the dump device (i.e. ZIO_FLAG_NODATA) must avoid
2439	 * the "fast" gang feature.
2440	 */
2441	flags |= (zio->io_flags & ZIO_FLAG_NODATA) ? METASLAB_GANG_AVOID : 0;
2442	flags |= (zio->io_flags & ZIO_FLAG_GANG_CHILD) ?
2443	    METASLAB_GANG_CHILD : 0;
2444	error = metaslab_alloc(spa, mc, zio->io_size, bp,
2445	    zio->io_prop.zp_copies, zio->io_txg, NULL, flags);
2446
2447	if (error) {
2448		spa_dbgmsg(spa, "%s: metaslab allocation failure: zio %p, "
2449		    "size %llu, error %d", spa_name(spa), zio, zio->io_size,
2450		    error);
2451		if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE)
2452			return (zio_write_gang_block(zio));
2453		zio->io_error = error;
2454	}
2455
2456	return (ZIO_PIPELINE_CONTINUE);
2457}
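/*
 * Note that the ENOSPC fallback above is what creates gang blocks in the
 * first place: when the normal class cannot provide zio->io_size of
 * contiguous space, the write is retried as a gang write, which needs
 * only SPA_GANGBLOCKSIZE for the header plus smaller runs for the
 * children.  A request already at SPA_MINBLOCKSIZE cannot be split any
 * further, so it simply fails.
 */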
2458
2459static int
2460zio_dva_free(zio_t *zio)
2461{
2462	metaslab_free(zio->io_spa, zio->io_bp, zio->io_txg, B_FALSE);
2463
2464	return (ZIO_PIPELINE_CONTINUE);
2465}
2466
2467static int
2468zio_dva_claim(zio_t *zio)
2469{
2470	int error;
2471
2472	error = metaslab_claim(zio->io_spa, zio->io_bp, zio->io_txg);
2473	if (error)
2474		zio->io_error = error;
2475
2476	return (ZIO_PIPELINE_CONTINUE);
2477}
2478
2479/*
2480 * Undo an allocation.  This is used by zio_done() when an I/O fails
2481 * and we want to give back the block we just allocated.
2482 * This handles both normal blocks and gang blocks.
2483 */
2484static void
2485zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
2486{
2487	ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
2488	ASSERT(zio->io_bp_override == NULL);
2489
2490	if (!BP_IS_HOLE(bp))
2491		metaslab_free(zio->io_spa, bp, bp->blk_birth, B_TRUE);
2492
2493	if (gn != NULL) {
2494		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
2495			zio_dva_unallocate(zio, gn->gn_child[g],
2496			    &gn->gn_gbh->zg_blkptr[g]);
2497		}
2498	}
2499}
2500
2501/*
2502 * Try to allocate an intent log block.  Return 0 on success, errno on failure.
2503 */
2504int
2505zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, blkptr_t *old_bp,
2506    uint64_t size, boolean_t use_slog)
2507{
2508	int error = 1;
2509
2510	ASSERT(txg > spa_syncing_txg(spa));
2511
2512	/*
2513	 * ZIL blocks are always contiguous (i.e. not gang blocks) so we
2514	 * set the METASLAB_GANG_AVOID flag so that they don't "fast gang"
2515	 * when allocating them.
2516	 */
2517	if (use_slog) {
2518		error = metaslab_alloc(spa, spa_log_class(spa), size,
2519		    new_bp, 1, txg, old_bp,
2520		    METASLAB_HINTBP_AVOID | METASLAB_GANG_AVOID);
2521	}
2522
2523	if (error) {
2524		error = metaslab_alloc(spa, spa_normal_class(spa), size,
2525		    new_bp, 1, txg, old_bp,
2526		    METASLAB_HINTBP_AVOID);
2527	}
2528
2529	if (error == 0) {
2530		BP_SET_LSIZE(new_bp, size);
2531		BP_SET_PSIZE(new_bp, size);
2532		BP_SET_COMPRESS(new_bp, ZIO_COMPRESS_OFF);
2533		BP_SET_CHECKSUM(new_bp,
2534		    spa_version(spa) >= SPA_VERSION_SLIM_ZIL
2535		    ? ZIO_CHECKSUM_ZILOG2 : ZIO_CHECKSUM_ZILOG);
2536		BP_SET_TYPE(new_bp, DMU_OT_INTENT_LOG);
2537		BP_SET_LEVEL(new_bp, 0);
2538		BP_SET_DEDUP(new_bp, 0);
2539		BP_SET_BYTEORDER(new_bp, ZFS_HOST_BYTEORDER);
2540	}
2541
2542	return (error);
2543}
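/*
 * A minimal caller sketch (hypothetical variable names, modeled on the
 * ZIL writer rather than copied from it):
 *
 *	blkptr_t blk;
 *
 *	BP_ZERO(&blk);
 *	error = zio_alloc_zil(spa, txg, &blk, &prev_blk, blksz, use_slog);
 *
 * Passing the previous log block as 'old_bp' gives metaslab_alloc() a
 * hint; with METASLAB_HINTBP_AVOID the allocator is steered away from
 * that block's vdev, so consecutive log blocks tend to rotate across
 * top-level devices.
 */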
2544
2545/*
2546 * Free an intent log block.
2547 */
2548void
2549zio_free_zil(spa_t *spa, uint64_t txg, blkptr_t *bp)
2550{
2551	ASSERT(BP_GET_TYPE(bp) == DMU_OT_INTENT_LOG);
2552	ASSERT(!BP_IS_GANG(bp));
2553
2554	zio_free(spa, txg, bp);
2555}
2556
2557/*
2558 * ==========================================================================
2559 * Read, write and delete to physical devices
2560 * ==========================================================================
2561 */
2562static int
2563zio_vdev_io_start(zio_t *zio)
2564{
2565	vdev_t *vd = zio->io_vd;
2566	uint64_t align;
2567	spa_t *spa = zio->io_spa;
2568	int ret;
2569
2570	ASSERT(zio->io_error == 0);
2571	ASSERT(zio->io_child_error[ZIO_CHILD_VDEV] == 0);
2572
2573	if (vd == NULL) {
2574		if (!(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
2575			spa_config_enter(spa, SCL_ZIO, zio, RW_READER);
2576
2577		/*
2578		 * The mirror_ops handle multiple DVAs in a single BP.
2579		 */
2580		return (vdev_mirror_ops.vdev_op_io_start(zio));
2581	}
2582
2583	if (vd->vdev_ops->vdev_op_leaf && zio->io_type == ZIO_TYPE_FREE &&
2584	    zio->io_priority == ZIO_PRIORITY_NOW) {
2585		trim_map_free(vd, zio->io_offset, zio->io_size, zio->io_txg);
2586		return (ZIO_PIPELINE_CONTINUE);
2587	}
2588
2589	/*
2590	 * We keep track of time-sensitive I/Os so that the scan thread
2591	 * can quickly react to certain workloads.  In particular, we care
2592	 * about non-scrubbing, top-level reads and writes with the following
2593	 * characteristics:
2594	 *	- synchronous writes of user data to non-slog devices
2595	 *	- any reads of user data
2596	 * When these conditions are met, adjust the timestamp of spa_last_io
2597	 * which allows the scan thread to adjust its workload accordingly.
2598	 */
2599	if (!(zio->io_flags & ZIO_FLAG_SCAN_THREAD) && zio->io_bp != NULL &&
2600	    vd == vd->vdev_top && !vd->vdev_islog &&
2601	    zio->io_bookmark.zb_objset != DMU_META_OBJSET &&
2602	    zio->io_txg != spa_syncing_txg(spa)) {
2603		uint64_t old = spa->spa_last_io;
2604		uint64_t new = ddi_get_lbolt64();
2605		if (old != new)
2606			(void) atomic_cas_64(&spa->spa_last_io, old, new);
2607	}
2608
2609	align = 1ULL << vd->vdev_top->vdev_ashift;
2610
2611	if ((!(zio->io_flags & ZIO_FLAG_PHYSICAL) ||
2612	    (vd->vdev_top->vdev_physical_ashift > SPA_MINBLOCKSHIFT)) &&
2613	    P2PHASE(zio->io_size, align) != 0) {
2614		/* Transform logical writes to be a full physical block size. */
2615		uint64_t asize = P2ROUNDUP(zio->io_size, align);
2616		char *abuf = NULL;
2617		if (zio->io_type == ZIO_TYPE_READ ||
2618		    zio->io_type == ZIO_TYPE_WRITE)
2619			abuf = zio_buf_alloc(asize);
2620		ASSERT(vd == vd->vdev_top);
2621		if (zio->io_type == ZIO_TYPE_WRITE) {
2622			bcopy(zio->io_data, abuf, zio->io_size);
2623			bzero(abuf + zio->io_size, asize - zio->io_size);
2624		}
2625		zio_push_transform(zio, abuf, asize, abuf ? asize : 0,
2626		    zio_subblock);
2627	}
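	/*
	 * For example (illustrative numbers): on a top-level vdev with
	 * ashift 12, align is 4096, so a 1536-byte logical I/O is widened
	 * to a 4096-byte buffer; a write copies the payload in and
	 * zero-fills the tail, while a read relies on zio_subblock() to
	 * copy the first 1536 bytes back into the original buffer when
	 * the transform is popped.
	 */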
2628
2629	/*
2630	 * If this is not a physical io, make sure that it is properly aligned
2631	 * before proceeding.
2632	 */
2633	if (!(zio->io_flags & ZIO_FLAG_PHYSICAL)) {
2634		ASSERT0(P2PHASE(zio->io_offset, align));
2635		ASSERT0(P2PHASE(zio->io_size, align));
2636	} else {
2637		/*
2638		 * For physical writes, we allow 512b aligned writes and assume
2639		 * the device will perform a read-modify-write as necessary.
2640		 */
2641		ASSERT0(P2PHASE(zio->io_offset, SPA_MINBLOCKSIZE));
2642		ASSERT0(P2PHASE(zio->io_size, SPA_MINBLOCKSIZE));
2643	}
2644
2645	VERIFY(zio->io_type == ZIO_TYPE_READ || spa_writeable(spa));
2646
2647	/*
2648	 * If this is a repair I/O, and there's no self-healing involved --
2649	 * that is, we're just resilvering what we expect to resilver --
2650	 * then don't do the I/O unless zio's txg is actually in vd's DTL.
2651	 * This prevents spurious resilvering with nested replication.
2652	 * For example, given a mirror of mirrors, (A+B)+(C+D), if only
2653	 * A is out of date, we'll read from C+D, then use the data to
2654	 * resilver A+B -- but we don't actually want to resilver B, just A.
2655	 * The top-level mirror has no way to know this, so instead we just
2656	 * discard unnecessary repairs as we work our way down the vdev tree.
2657	 * The same logic applies to any form of nested replication:
2658	 * ditto + mirror, RAID-Z + replacing, etc.  This covers them all.
2659	 */
2660	if ((zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
2661	    !(zio->io_flags & ZIO_FLAG_SELF_HEAL) &&
2662	    zio->io_txg != 0 &&	/* not a delegated i/o */
2663	    !vdev_dtl_contains(vd, DTL_PARTIAL, zio->io_txg, 1)) {
2664		ASSERT(zio->io_type == ZIO_TYPE_WRITE);
2665		zio_vdev_io_bypass(zio);
2666		return (ZIO_PIPELINE_CONTINUE);
2667	}
2668
2669	if (vd->vdev_ops->vdev_op_leaf) {
2670		switch (zio->io_type) {
2671		case ZIO_TYPE_READ:
2672			if (vdev_cache_read(zio))
2673				return (ZIO_PIPELINE_CONTINUE);
2674			/* FALLTHROUGH */
2675		case ZIO_TYPE_WRITE:
2676		case ZIO_TYPE_FREE:
2677			if ((zio = vdev_queue_io(zio)) == NULL)
2678				return (ZIO_PIPELINE_STOP);
2679
2680			if (!vdev_accessible(vd, zio)) {
2681				zio->io_error = SET_ERROR(ENXIO);
2682				zio_interrupt(zio);
2683				return (ZIO_PIPELINE_STOP);
2684			}
2685			break;
2686		}
2687		/*
2688		 * Note that we ignore repair writes for TRIM because they can
2689		 * conflict with normal writes. This isn't an issue because, by
2690		 * definition, we only repair blocks that aren't freed.
2691		 */
2692		if (zio->io_type == ZIO_TYPE_WRITE &&
2693		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
2694		    !trim_map_write_start(zio))
2695			return (ZIO_PIPELINE_STOP);
2696	}
2697
2698	ret = vd->vdev_ops->vdev_op_io_start(zio);
2699	ASSERT(ret == ZIO_PIPELINE_STOP);
2700
2701	return (ret);
2702}
2703
2704static int
2705zio_vdev_io_done(zio_t *zio)
2706{
2707	vdev_t *vd = zio->io_vd;
2708	vdev_ops_t *ops = vd ? vd->vdev_ops : &vdev_mirror_ops;
2709	boolean_t unexpected_error = B_FALSE;
2710
2711	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
2712		return (ZIO_PIPELINE_STOP);
2713
2714	ASSERT(zio->io_type == ZIO_TYPE_READ ||
2715	    zio->io_type == ZIO_TYPE_WRITE || zio->io_type == ZIO_TYPE_FREE);
2716
2717	if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
2718	    (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE ||
2719	    zio->io_type == ZIO_TYPE_FREE)) {
2720
2721		if (zio->io_type == ZIO_TYPE_WRITE &&
2722		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR))
2723			trim_map_write_done(zio);
2724
2725		vdev_queue_io_done(zio);
2726
2727		if (zio->io_type == ZIO_TYPE_WRITE)
2728			vdev_cache_write(zio);
2729
2730		if (zio_injection_enabled && zio->io_error == 0)
2731			zio->io_error = zio_handle_device_injection(vd,
2732			    zio, EIO);
2733
2734		if (zio_injection_enabled && zio->io_error == 0)
2735			zio->io_error = zio_handle_label_injection(zio, EIO);
2736
2737		if (zio->io_error) {
2738			if (zio->io_error == ENOTSUP &&
2739			    zio->io_type == ZIO_TYPE_FREE) {
2740				/* Not all devices support TRIM. */
2741			} else if (!vdev_accessible(vd, zio)) {
2742				zio->io_error = SET_ERROR(ENXIO);
2743			} else {
2744				unexpected_error = B_TRUE;
2745			}
2746		}
2747	}
2748
2749	ops->vdev_op_io_done(zio);
2750
2751	if (unexpected_error)
2752		VERIFY(vdev_probe(vd, zio) == NULL);
2753
2754	return (ZIO_PIPELINE_CONTINUE);
2755}
2756
2757/*
2758 * For non-raidz ZIOs, we can just copy aside the bad data read from the
2759 * disk, and use that to finish the checksum ereport later.
2760 */
2761static void
2762zio_vsd_default_cksum_finish(zio_cksum_report_t *zcr,
2763    const void *good_buf)
2764{
2765	/* no processing needed */
2766	zfs_ereport_finish_checksum(zcr, good_buf, zcr->zcr_cbdata, B_FALSE);
2767}
2768
2769/*ARGSUSED*/
2770void
2771zio_vsd_default_cksum_report(zio_t *zio, zio_cksum_report_t *zcr, void *ignored)
2772{
2773	void *buf = zio_buf_alloc(zio->io_size);
2774
2775	bcopy(zio->io_data, buf, zio->io_size);
2776
2777	zcr->zcr_cbinfo = zio->io_size;
2778	zcr->zcr_cbdata = buf;
2779	zcr->zcr_finish = zio_vsd_default_cksum_finish;
2780	zcr->zcr_free = zio_buf_free;
2781}
2782
2783static int
2784zio_vdev_io_assess(zio_t *zio)
2785{
2786	vdev_t *vd = zio->io_vd;
2787
2788	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
2789		return (ZIO_PIPELINE_STOP);
2790
2791	if (vd == NULL && !(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
2792		spa_config_exit(zio->io_spa, SCL_ZIO, zio);
2793
2794	if (zio->io_vsd != NULL) {
2795		zio->io_vsd_ops->vsd_free(zio);
2796		zio->io_vsd = NULL;
2797	}
2798
2799	if (zio_injection_enabled && zio->io_error == 0)
2800		zio->io_error = zio_handle_fault_injection(zio, EIO);
2801
2802	if (zio->io_type == ZIO_TYPE_FREE &&
2803	    zio->io_priority != ZIO_PRIORITY_NOW) {
2804		switch (zio->io_error) {
2805		case 0:
2806			ZIO_TRIM_STAT_INCR(bytes, zio->io_size);
2807			ZIO_TRIM_STAT_BUMP(success);
2808			break;
2809		case EOPNOTSUPP:
2810			ZIO_TRIM_STAT_BUMP(unsupported);
2811			break;
2812		default:
2813			ZIO_TRIM_STAT_BUMP(failed);
2814			break;
2815		}
2816	}
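	/*
	 * These counters feed the zio_trim kstat declared at the top of
	 * this file; on FreeBSD they are typically visible as sysctl
	 * nodes, e.g. "sysctl kstat.zfs.misc.zio_trim.bytes"
	 * (illustrative; the exact node names depend on the kstat glue).
	 */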
2817
2818	/*
2819	 * If the I/O failed, determine whether we should attempt to retry it.
2820	 *
2821	 * On retry, we cut in line in the issue queue, since we don't want
2822	 * compression/checksumming/etc. work to prevent our (cheap) IO reissue.
2823	 */
2824	if (zio->io_error && vd == NULL &&
2825	    !(zio->io_flags & (ZIO_FLAG_DONT_RETRY | ZIO_FLAG_IO_RETRY))) {
2826		ASSERT(!(zio->io_flags & ZIO_FLAG_DONT_QUEUE));	/* not a leaf */
2827		ASSERT(!(zio->io_flags & ZIO_FLAG_IO_BYPASS));	/* not a leaf */
2828		zio->io_error = 0;
2829		zio->io_flags |= ZIO_FLAG_IO_RETRY |
2830		    ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_AGGREGATE;
2831		zio->io_stage = ZIO_STAGE_VDEV_IO_START >> 1;
2832		zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE,
2833		    zio_requeue_io_start_cut_in_line);
2834		return (ZIO_PIPELINE_STOP);
2835	}
2836
2837	/*
2838	 * If we got an error on a leaf device, convert it to ENXIO
2839	 * if the device is not accessible at all.
2840	 */
2841	if (zio->io_error && vd != NULL && vd->vdev_ops->vdev_op_leaf &&
2842	    !vdev_accessible(vd, zio))
2843		zio->io_error = SET_ERROR(ENXIO);
2844
2845	/*
2846	 * If we can't write to an interior vdev (mirror or RAID-Z),
2847	 * set vdev_cant_write so that we stop trying to allocate from it.
2848	 */
2849	if (zio->io_error == ENXIO && zio->io_type == ZIO_TYPE_WRITE &&
2850	    vd != NULL && !vd->vdev_ops->vdev_op_leaf) {
2851		vd->vdev_cant_write = B_TRUE;
2852	}
2853
2854	if (zio->io_error)
2855		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2856
2857	if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
2858	    zio->io_physdone != NULL) {
2859		ASSERT(!(zio->io_flags & ZIO_FLAG_DELEGATED));
2860		ASSERT(zio->io_child_type == ZIO_CHILD_VDEV);
2861		zio->io_physdone(zio->io_logical);
2862	}
2863
2864	return (ZIO_PIPELINE_CONTINUE);
2865}
2866
2867void
2868zio_vdev_io_reissue(zio_t *zio)
2869{
2870	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
2871	ASSERT(zio->io_error == 0);
2872
2873	zio->io_stage >>= 1;
2874}
2875
2876void
2877zio_vdev_io_redone(zio_t *zio)
2878{
2879	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_DONE);
2880
2881	zio->io_stage >>= 1;
2882}
2883
2884void
2885zio_vdev_io_bypass(zio_t *zio)
2886{
2887	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
2888	ASSERT(zio->io_error == 0);
2889
2890	zio->io_flags |= ZIO_FLAG_IO_BYPASS;
2891	zio->io_stage = ZIO_STAGE_VDEV_IO_ASSESS >> 1;
2892}
2893
2894/*
2895 * ==========================================================================
2896 * Generate and verify checksums
2897 * ==========================================================================
2898 */
2899static int
2900zio_checksum_generate(zio_t *zio)
2901{
2902	blkptr_t *bp = zio->io_bp;
2903	enum zio_checksum checksum;
2904
2905	if (bp == NULL) {
2906		/*
2907		 * This is zio_write_phys().
2908		 * We're either generating a label checksum, or none at all.
2909		 */
2910		checksum = zio->io_prop.zp_checksum;
2911
2912		if (checksum == ZIO_CHECKSUM_OFF)
2913			return (ZIO_PIPELINE_CONTINUE);
2914
2915		ASSERT(checksum == ZIO_CHECKSUM_LABEL);
2916	} else {
2917		if (BP_IS_GANG(bp) && zio->io_child_type == ZIO_CHILD_GANG) {
2918			ASSERT(!IO_IS_ALLOCATING(zio));
2919			checksum = ZIO_CHECKSUM_GANG_HEADER;
2920		} else {
2921			checksum = BP_GET_CHECKSUM(bp);
2922		}
2923	}
2924
2925	zio_checksum_compute(zio, checksum, zio->io_data, zio->io_size);
2926
2927	return (ZIO_PIPELINE_CONTINUE);
2928}
2929
2930static int
2931zio_checksum_verify(zio_t *zio)
2932{
2933	zio_bad_cksum_t info;
2934	blkptr_t *bp = zio->io_bp;
2935	int error;
2936
2937	ASSERT(zio->io_vd != NULL);
2938
2939	if (bp == NULL) {
2940		/*
2941		 * This is zio_read_phys().
2942		 * We're either verifying a label checksum, or nothing at all.
2943		 */
2944		if (zio->io_prop.zp_checksum == ZIO_CHECKSUM_OFF)
2945			return (ZIO_PIPELINE_CONTINUE);
2946
2947		ASSERT(zio->io_prop.zp_checksum == ZIO_CHECKSUM_LABEL);
2948	}
2949
2950	if ((error = zio_checksum_error(zio, &info)) != 0) {
2951		zio->io_error = error;
2952		if (!(zio->io_flags & ZIO_FLAG_SPECULATIVE)) {
2953			zfs_ereport_start_checksum(zio->io_spa,
2954			    zio->io_vd, zio, zio->io_offset,
2955			    zio->io_size, NULL, &info);
2956		}
2957	}
2958
2959	return (ZIO_PIPELINE_CONTINUE);
2960}
2961
2962/*
2963 * Called by RAID-Z to ensure we don't compute the checksum twice.
2964 */
2965void
2966zio_checksum_verified(zio_t *zio)
2967{
2968	zio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
2969}
2970
2971/*
2972 * ==========================================================================
2973 * Error rank.  Errors are ranked in the order 0, ENXIO, ECKSUM, EIO, other.
2974 * An error of 0 indicates success.  ENXIO indicates whole-device failure,
2975 * which may be transient (e.g. unplugged) or permanent.  ECKSUM and EIO
2976 * indicate errors that are specific to one I/O, and most likely permanent.
2977 * Any other error is presumed to be worse because we weren't expecting it.
2978 * ==========================================================================
2979 */
2980int
2981zio_worst_error(int e1, int e2)
2982{
2983	static int zio_error_rank[] = { 0, ENXIO, ECKSUM, EIO };
2984	int r1, r2;
2985
2986	for (r1 = 0; r1 < sizeof (zio_error_rank) / sizeof (int); r1++)
2987		if (e1 == zio_error_rank[r1])
2988			break;
2989
2990	for (r2 = 0; r2 < sizeof (zio_error_rank) / sizeof (int); r2++)
2991		if (e2 == zio_error_rank[r2])
2992			break;
2993
2994	return (r1 > r2 ? e1 : e2);
2995}
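/*
 * For example, zio_worst_error(ENXIO, ECKSUM) returns ECKSUM, and
 * zio_worst_error(EIO, EINVAL) returns EINVAL: an error that is not in
 * the rank table at all falls off the end of the loop and therefore
 * ranks as the worst.
 */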
2996
2997/*
2998 * ==========================================================================
2999 * I/O completion
3000 * ==========================================================================
3001 */
3002static int
3003zio_ready(zio_t *zio)
3004{
3005	blkptr_t *bp = zio->io_bp;
3006	zio_t *pio, *pio_next;
3007
3008	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
3009	    zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_READY))
3010		return (ZIO_PIPELINE_STOP);
3011
3012	if (zio->io_ready) {
3013		ASSERT(IO_IS_ALLOCATING(zio));
3014		ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp) ||
3015		    (zio->io_flags & ZIO_FLAG_NOPWRITE));
3016		ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
3017
3018		zio->io_ready(zio);
3019	}
3020
3021	if (bp != NULL && bp != &zio->io_bp_copy)
3022		zio->io_bp_copy = *bp;
3023
3024	if (zio->io_error)
3025		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
3026
3027	mutex_enter(&zio->io_lock);
3028	zio->io_state[ZIO_WAIT_READY] = 1;
3029	pio = zio_walk_parents(zio);
3030	mutex_exit(&zio->io_lock);
3031
3032	/*
3033	 * As we notify zio's parents, new parents could be added.
3034	 * New parents go to the head of zio's io_parent_list, however,
3035	 * so we will (correctly) not notify them.  The remainder of zio's
3036	 * io_parent_list, from 'pio_next' onward, cannot change because
3037	 * all parents must wait for us to be done before they can be done.
3038	 */
3039	for (; pio != NULL; pio = pio_next) {
3040		pio_next = zio_walk_parents(zio);
3041		zio_notify_parent(pio, zio, ZIO_WAIT_READY);
3042	}
3043
3044	if (zio->io_flags & ZIO_FLAG_NODATA) {
3045		if (BP_IS_GANG(bp)) {
3046			zio->io_flags &= ~ZIO_FLAG_NODATA;
3047		} else {
3048			ASSERT((uintptr_t)zio->io_data < SPA_MAXBLOCKSIZE);
3049			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
3050		}
3051	}
3052
3053	if (zio_injection_enabled &&
3054	    zio->io_spa->spa_syncing_txg == zio->io_txg)
3055		zio_handle_ignored_writes(zio);
3056
3057	return (ZIO_PIPELINE_CONTINUE);
3058}
3059
3060static int
3061zio_done(zio_t *zio)
3062{
3063	spa_t *spa = zio->io_spa;
3064	zio_t *lio = zio->io_logical;
3065	blkptr_t *bp = zio->io_bp;
3066	vdev_t *vd = zio->io_vd;
3067	uint64_t psize = zio->io_size;
3068	zio_t *pio, *pio_next;
3069
3070	/*
3071	 * If our children haven't all completed,
3072	 * wait for them and then repeat this pipeline stage.
3073	 */
3074	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE) ||
3075	    zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE) ||
3076	    zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE) ||
3077	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_DONE))
3078		return (ZIO_PIPELINE_STOP);
3079
3080	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
3081		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
3082			ASSERT(zio->io_children[c][w] == 0);
3083
3084	if (bp != NULL && !BP_IS_EMBEDDED(bp)) {
3085		ASSERT(bp->blk_pad[0] == 0);
3086		ASSERT(bp->blk_pad[1] == 0);
3087		ASSERT(bcmp(bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
3088		    (bp == zio_unique_parent(zio)->io_bp));
3089		if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(bp) &&
3090		    zio->io_bp_override == NULL &&
3091		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
3092			ASSERT(!BP_SHOULD_BYTESWAP(bp));
3093			ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(bp));
3094			ASSERT(BP_COUNT_GANG(bp) == 0 ||
3095			    (BP_COUNT_GANG(bp) == BP_GET_NDVAS(bp)));
3096		}
3097		if (zio->io_flags & ZIO_FLAG_NOPWRITE)
3098			VERIFY(BP_EQUAL(bp, &zio->io_bp_orig));
3099	}
3100
3101	/*
3102	 * If there were child vdev/gang/ddt errors, they apply to us now.
3103	 */
3104	zio_inherit_child_errors(zio, ZIO_CHILD_VDEV);
3105	zio_inherit_child_errors(zio, ZIO_CHILD_GANG);
3106	zio_inherit_child_errors(zio, ZIO_CHILD_DDT);
3107
3108	/*
3109	 * If the I/O on the transformed data was successful, generate any
3110	 * checksum reports now while we still have the transformed data.
3111	 */
3112	if (zio->io_error == 0) {
3113		while (zio->io_cksum_report != NULL) {
3114			zio_cksum_report_t *zcr = zio->io_cksum_report;
3115			uint64_t align = zcr->zcr_align;
3116			uint64_t asize = P2ROUNDUP(psize, align);
3117			char *abuf = zio->io_data;
3118
3119			if (asize != psize) {
3120				abuf = zio_buf_alloc(asize);
3121				bcopy(zio->io_data, abuf, psize);
3122				bzero(abuf + psize, asize - psize);
3123			}
3124
3125			zio->io_cksum_report = zcr->zcr_next;
3126			zcr->zcr_next = NULL;
3127			zcr->zcr_finish(zcr, abuf);
3128			zfs_ereport_free_checksum(zcr);
3129
3130			if (asize != psize)
3131				zio_buf_free(abuf, asize);
3132		}
3133	}
3134
3135	zio_pop_transforms(zio);	/* note: may set zio->io_error */
3136
3137	vdev_stat_update(zio, psize);
3138
3139	if (zio->io_error) {
3140		/*
3141		 * If this I/O is attached to a particular vdev,
3142		 * generate an error message describing the I/O failure
3143		 * at the block level.  We ignore these errors if the
3144		 * device is currently unavailable.
3145		 */
3146		if (zio->io_error != ECKSUM && vd != NULL && !vdev_is_dead(vd))
3147			zfs_ereport_post(FM_EREPORT_ZFS_IO, spa, vd, zio, 0, 0);
3148
3149		if ((zio->io_error == EIO || !(zio->io_flags &
3150		    (ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_PROPAGATE))) &&
3151		    zio == lio) {
3152			/*
3153			 * For logical I/O requests, tell the SPA to log the
3154			 * error and generate a logical data ereport.
3155			 */
3156			spa_log_error(spa, zio);
3157			zfs_ereport_post(FM_EREPORT_ZFS_DATA, spa, NULL, zio,
3158			    0, 0);
3159		}
3160	}
3161
3162	if (zio->io_error && zio == lio) {
3163		/*
3164		 * Determine whether zio should be reexecuted.  This will
3165		 * propagate all the way to the root via zio_notify_parent().
3166		 */
3167		ASSERT(vd == NULL && bp != NULL);
3168		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
3169
3170		if (IO_IS_ALLOCATING(zio) &&
3171		    !(zio->io_flags & ZIO_FLAG_CANFAIL)) {
3172			if (zio->io_error != ENOSPC)
3173				zio->io_reexecute |= ZIO_REEXECUTE_NOW;
3174			else
3175				zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
3176		}
3177
3178		if ((zio->io_type == ZIO_TYPE_READ ||
3179		    zio->io_type == ZIO_TYPE_FREE) &&
3180		    !(zio->io_flags & ZIO_FLAG_SCAN_THREAD) &&
3181		    zio->io_error == ENXIO &&
3182		    spa_load_state(spa) == SPA_LOAD_NONE &&
3183		    spa_get_failmode(spa) != ZIO_FAILURE_MODE_CONTINUE)
3184			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
3185
3186		if (!(zio->io_flags & ZIO_FLAG_CANFAIL) && !zio->io_reexecute)
3187			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
3188
3189		/*
3190		 * Here is a possibly good place to attempt to do
3191		 * either combinatorial reconstruction or error correction
3192		 * based on checksums.  It also might be a good place
3193		 * to send out preliminary ereports before we suspend
3194		 * processing.
3195		 */
3196	}
3197
3198	/*
3199	 * If there were logical child errors, they apply to us now.
3200	 * We defer this until now to avoid conflating logical child
3201	 * errors with errors that happened to the zio itself when
3202	 * updating vdev stats and reporting FMA events above.
3203	 */
3204	zio_inherit_child_errors(zio, ZIO_CHILD_LOGICAL);
3205
3206	if ((zio->io_error || zio->io_reexecute) &&
3207	    IO_IS_ALLOCATING(zio) && zio->io_gang_leader == zio &&
3208	    !(zio->io_flags & (ZIO_FLAG_IO_REWRITE | ZIO_FLAG_NOPWRITE)))
3209		zio_dva_unallocate(zio, zio->io_gang_tree, bp);
3210
3211	zio_gang_tree_free(&zio->io_gang_tree);
3212
3213	/*
3214	 * Godfather I/Os should never suspend.
3215	 */
3216	if ((zio->io_flags & ZIO_FLAG_GODFATHER) &&
3217	    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND))
3218		zio->io_reexecute = 0;
3219
3220	if (zio->io_reexecute) {
3221		/*
3222		 * This is a logical I/O that wants to reexecute.
3223		 *
3224		 * Reexecute is top-down.  When an i/o fails, if it's not
3225		 * the root, it simply notifies its parent and sticks around.
3226		 * The parent, seeing that it still has children in zio_done(),
3227		 * does the same.  This percolates all the way up to the root.
3228		 * The root i/o will reexecute or suspend the entire tree.
3229		 *
3230		 * This approach ensures that zio_reexecute() honors
3231		 * all the original i/o dependency relationships, e.g.
3232		 * parents not executing until children are ready.
3233		 */
3234		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
3235
3236		zio->io_gang_leader = NULL;
3237
3238		mutex_enter(&zio->io_lock);
3239		zio->io_state[ZIO_WAIT_DONE] = 1;
3240		mutex_exit(&zio->io_lock);
3241
3242		/*
3243		 * "The Godfather" I/O monitors its children but is
3244		 * not a true parent to them. It will track them through
3245		 * the pipeline but severs its ties whenever they get into
3246		 * trouble (e.g. suspended). This allows "The Godfather"
3247		 * I/O to return status without blocking.
3248		 */
3249		for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
3250			zio_link_t *zl = zio->io_walk_link;
3251			pio_next = zio_walk_parents(zio);
3252
3253			if ((pio->io_flags & ZIO_FLAG_GODFATHER) &&
3254			    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND)) {
3255				zio_remove_child(pio, zio, zl);
3256				zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
3257			}
3258		}
3259
3260		if ((pio = zio_unique_parent(zio)) != NULL) {
3261			/*
3262			 * We're not a root i/o, so there's nothing to do
3263			 * but notify our parent.  Don't propagate errors
3264			 * upward since we haven't permanently failed yet.
3265			 */
3266			ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
3267			zio->io_flags |= ZIO_FLAG_DONT_PROPAGATE;
3268			zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
3269		} else if (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND) {
3270			/*
3271			 * We'd fail again if we reexecuted now, so suspend
3272			 * until conditions improve (e.g. device comes online).
3273			 */
3274			zio_suspend(spa, zio);
3275		} else {
3276			/*
3277			 * Reexecution is potentially a huge amount of work.
3278			 * Hand it off to the otherwise-unused claim taskq.
3279			 */
3280#if defined(illumos) || !defined(_KERNEL)
3281			ASSERT(zio->io_tqent.tqent_next == NULL);
3282#else
3283			ASSERT(zio->io_tqent.tqent_task.ta_pending == 0);
3284#endif
3285			spa_taskq_dispatch_ent(spa, ZIO_TYPE_CLAIM,
3286			    ZIO_TASKQ_ISSUE, (task_func_t *)zio_reexecute, zio,
3287			    0, &zio->io_tqent);
3288		}
3289		return (ZIO_PIPELINE_STOP);
3290	}
3291
3292	ASSERT(zio->io_child_count == 0);
3293	ASSERT(zio->io_reexecute == 0);
3294	ASSERT(zio->io_error == 0 || (zio->io_flags & ZIO_FLAG_CANFAIL));
3295
3296	/*
3297	 * Report any checksum errors, since the I/O is complete.
3298	 */
3299	while (zio->io_cksum_report != NULL) {
3300		zio_cksum_report_t *zcr = zio->io_cksum_report;
3301		zio->io_cksum_report = zcr->zcr_next;
3302		zcr->zcr_next = NULL;
3303		zcr->zcr_finish(zcr, NULL);
3304		zfs_ereport_free_checksum(zcr);
3305	}
3306
3307	/*
3308	 * It is the responsibility of the done callback to ensure that this
3309	 * particular zio is no longer discoverable for adoption, and as
3310	 * such, cannot acquire any new parents.
3311	 */
3312	if (zio->io_done)
3313		zio->io_done(zio);
3314
3315	mutex_enter(&zio->io_lock);
3316	zio->io_state[ZIO_WAIT_DONE] = 1;
3317	mutex_exit(&zio->io_lock);
3318
3319	for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
3320		zio_link_t *zl = zio->io_walk_link;
3321		pio_next = zio_walk_parents(zio);
3322		zio_remove_child(pio, zio, zl);
3323		zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
3324	}
3325
3326	if (zio->io_waiter != NULL) {
3327		mutex_enter(&zio->io_lock);
3328		zio->io_executor = NULL;
3329		cv_broadcast(&zio->io_cv);
3330		mutex_exit(&zio->io_lock);
3331	} else {
3332		zio_destroy(zio);
3333	}
3334
3335	return (ZIO_PIPELINE_STOP);
3336}
3337
3338/*
3339 * ==========================================================================
3340 * I/O pipeline definition
3341 * ==========================================================================
3342 */
3343static zio_pipe_stage_t *zio_pipeline[] = {
3344	NULL,
3345	zio_read_bp_init,
3346	zio_free_bp_init,
3347	zio_issue_async,
3348	zio_write_bp_init,
3349	zio_checksum_generate,
3350	zio_nop_write,
3351	zio_ddt_read_start,
3352	zio_ddt_read_done,
3353	zio_ddt_write,
3354	zio_ddt_free,
3355	zio_gang_assemble,
3356	zio_gang_issue,
3357	zio_dva_allocate,
3358	zio_dva_free,
3359	zio_dva_claim,
3360	zio_ready,
3361	zio_vdev_io_start,
3362	zio_vdev_io_done,
3363	zio_vdev_io_assess,
3364	zio_checksum_verify,
3365	zio_done
3366};
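/*
 * The order of this table is significant: each entry corresponds to one
 * bit of enum zio_stage, and the pipeline engine indexes it by the
 * position of the single bit set in zio->io_stage.  Adding or reordering
 * a stage therefore requires updating zio_impl.h and this table in
 * lockstep.
 */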
3367
3368/* dnp is the dnode for zb1->zb_object */
3369boolean_t
3370zbookmark_is_before(const dnode_phys_t *dnp, const zbookmark_phys_t *zb1,
3371    const zbookmark_phys_t *zb2)
3372{
3373	uint64_t zb1nextL0, zb2thisobj;
3374
3375	ASSERT(zb1->zb_objset == zb2->zb_objset);
3376	ASSERT(zb2->zb_level == 0);
3377
3378	/* The objset_phys_t isn't before anything. */
3379	if (dnp == NULL)
3380		return (B_FALSE);
3381
3382	zb1nextL0 = (zb1->zb_blkid + 1) <<
3383	    ((zb1->zb_level) * (dnp->dn_indblkshift - SPA_BLKPTRSHIFT));
3384
3385	zb2thisobj = zb2->zb_object ? zb2->zb_object :
3386	    zb2->zb_blkid << (DNODE_BLOCK_SHIFT - DNODE_SHIFT);
3387
3388	if (zb1->zb_object == DMU_META_DNODE_OBJECT) {
3389		uint64_t nextobj = zb1nextL0 *
3390		    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT) >> DNODE_SHIFT;
3391		return (nextobj <= zb2thisobj);
3392	}
3393
3394	if (zb1->zb_object < zb2thisobj)
3395		return (B_TRUE);
3396	if (zb1->zb_object > zb2thisobj)
3397		return (B_FALSE);
3398	if (zb2->zb_object == DMU_META_DNODE_OBJECT)
3399		return (B_FALSE);
3400	return (zb1nextL0 <= zb2->zb_blkid);
3401}
3402
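/*
 * A worked example of the shift above, with illustrative values: for 128K
 * indirect blocks (dn_indblkshift == 17) each indirect blkptr spans
 * 2^(17 - SPA_BLKPTRSHIFT) == 1024 blocks of the level below, so for zb1
 * at level 1, blkid 3, zb1nextL0 is (3 + 1) << 10 == 4096, the first
 * level-0 blkid not covered by that indirect block.
 */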