1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or https://opensource.org/licenses/CDDL-1.0.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22/*
23 * Copyright 2022 Axcient.  All rights reserved.
24 * Use is subject to license terms.
25 */
26
27/*
28 * Copyright (c) 2022 by Delphix. All rights reserved.
29 */
30
31#include <err.h>
32#include <stdio.h>
33#include <stdlib.h>
34#include <unistd.h>
35#include <sys/zfs_ioctl.h>
36#include <sys/zio_checksum.h>
37#include <sys/zstd/zstd.h>
38#include "zfs_fletcher.h"
39#include "zstream.h"
40
41static int
42dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,
43    zio_cksum_t *zc, int outfd)
44{
45	assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)
46	    == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
47	fletcher_4_incremental_native(drr,
48	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);
49	if (drr->drr_type != DRR_BEGIN) {
50		assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.
51		    drr_checksum.drr_checksum));
52		drr->drr_u.drr_checksum.drr_checksum = *zc;
53	}
54	fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
55	    sizeof (zio_cksum_t), zc);
56	if (write(outfd, drr, sizeof (*drr)) == -1)
57		return (errno);
58	if (payload_len != 0) {
59		fletcher_4_incremental_native(payload, payload_len, zc);
60		if (write(outfd, payload, payload_len) == -1)
61			return (errno);
62	}
63	return (0);
64}
65
66int
67zstream_do_recompress(int argc, char *argv[])
68{
69	int bufsz = SPA_MAXBLOCKSIZE;
70	char *buf = safe_malloc(bufsz);
71	dmu_replay_record_t thedrr;
72	dmu_replay_record_t *drr = &thedrr;
73	zio_cksum_t stream_cksum;
74	int c;
75	int level = -1;
76
77	while ((c = getopt(argc, argv, "l:")) != -1) {
78		switch (c) {
79		case 'l':
80			if (sscanf(optarg, "%d", &level) != 1) {
81				fprintf(stderr,
82				    "failed to parse level '%s'\n",
83				    optarg);
84				zstream_usage();
85			}
86			break;
87		case '?':
88			(void) fprintf(stderr, "invalid option '%c'\n",
89			    optopt);
90			zstream_usage();
91			break;
92		}
93	}
94
95	argc -= optind;
96	argv += optind;
97
98	if (argc != 1)
99		zstream_usage();
100	int type = 0;
101	zio_compress_info_t *cinfo = NULL;
102	if (0 == strcmp(argv[0], "off")) {
103		type = ZIO_COMPRESS_OFF;
104		cinfo = &zio_compress_table[type];
105	} else if (0 == strcmp(argv[0], "inherit") ||
106	    0 == strcmp(argv[0], "empty") ||
107	    0 == strcmp(argv[0], "on")) {
108		// Fall through to invalid compression type case
109	} else {
110		for (int i = 0; i < ZIO_COMPRESS_FUNCTIONS; i++) {
111			if (0 == strcmp(zio_compress_table[i].ci_name,
112			    argv[0])) {
113				cinfo = &zio_compress_table[i];
114				type = i;
115				break;
116			}
117		}
118	}
119	if (cinfo == NULL) {
120		fprintf(stderr, "Invalid compression type %s.\n",
121		    argv[0]);
122		exit(2);
123	}
124
125	if (cinfo->ci_compress == NULL) {
126		type = 0;
127		cinfo = &zio_compress_table[0];
128	}
129
130	if (isatty(STDIN_FILENO)) {
131		(void) fprintf(stderr,
132		    "Error: The send stream is a binary format "
133		    "and can not be read from a\n"
134		    "terminal.  Standard input must be redirected.\n");
135		exit(1);
136	}
137
138	fletcher_4_init();
139	zio_init();
140	zstd_init();
141	int begin = 0;
142	boolean_t seen = B_FALSE;
143	while (sfread(drr, sizeof (*drr), stdin) != 0) {
144		struct drr_write *drrw;
145		uint64_t payload_size = 0;
146
147		/*
148		 * We need to regenerate the checksum.
149		 */
150		if (drr->drr_type != DRR_BEGIN) {
151			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
152			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
153		}
154
155
156		switch (drr->drr_type) {
157		case DRR_BEGIN:
158		{
159			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
160			VERIFY0(begin++);
161			seen = B_TRUE;
162
163			uint32_t sz = drr->drr_payloadlen;
164
165			VERIFY3U(sz, <=, 1U << 28);
166
167			if (sz != 0) {
168				if (sz > bufsz) {
169					buf = realloc(buf, sz);
170					if (buf == NULL)
171						err(1, "realloc");
172					bufsz = sz;
173				}
174				(void) sfread(buf, sz, stdin);
175			}
176			payload_size = sz;
177			break;
178		}
179		case DRR_END:
180		{
181			struct drr_end *drre = &drr->drr_u.drr_end;
182			/*
183			 * We would prefer to just check --begin == 0, but
184			 * replication streams have an end of stream END
185			 * record, so we must avoid tripping it.
186			 */
187			VERIFY3B(seen, ==, B_TRUE);
188			begin--;
189			/*
190			 * Use the recalculated checksum, unless this is
191			 * the END record of a stream package, which has
192			 * no checksum.
193			 */
194			if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))
195				drre->drr_checksum = stream_cksum;
196			break;
197		}
198
199		case DRR_OBJECT:
200		{
201			struct drr_object *drro = &drr->drr_u.drr_object;
202			VERIFY3S(begin, ==, 1);
203
204			if (drro->drr_bonuslen > 0) {
205				payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);
206				(void) sfread(buf, payload_size, stdin);
207			}
208			break;
209		}
210
211		case DRR_SPILL:
212		{
213			struct drr_spill *drrs = &drr->drr_u.drr_spill;
214			VERIFY3S(begin, ==, 1);
215			payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);
216			(void) sfread(buf, payload_size, stdin);
217			break;
218		}
219
220		case DRR_WRITE_BYREF:
221			VERIFY3S(begin, ==, 1);
222			fprintf(stderr,
223			    "Deduplicated streams are not supported\n");
224			exit(1);
225			break;
226
227		case DRR_WRITE:
228		{
229			VERIFY3S(begin, ==, 1);
230			drrw = &thedrr.drr_u.drr_write;
231			payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
232			/*
233			 * In order to recompress an encrypted block, you have
234			 * to decrypt, decompress, recompress, and
235			 * re-encrypt. That can be a future enhancement (along
236			 * with decryption or re-encryption), but for now we
237			 * skip encrypted blocks.
238			 */
239			boolean_t encrypted = B_FALSE;
240			for (int i = 0; i < ZIO_DATA_SALT_LEN; i++) {
241				if (drrw->drr_salt[i] != 0) {
242					encrypted = B_TRUE;
243					break;
244				}
245			}
246			if (encrypted) {
247				(void) sfread(buf, payload_size, stdin);
248				break;
249			}
250			if (drrw->drr_compressiontype >=
251			    ZIO_COMPRESS_FUNCTIONS) {
252				fprintf(stderr, "Invalid compression type in "
253				    "stream: %d\n", drrw->drr_compressiontype);
254				exit(3);
255			}
256			zio_compress_info_t *dinfo =
257			    &zio_compress_table[drrw->drr_compressiontype];
258
259			/* Set up buffers to minimize memcpys */
260			char *cbuf, *dbuf;
261			if (cinfo->ci_compress == NULL)
262				dbuf = buf;
263			else
264				dbuf = safe_calloc(bufsz);
265
266			if (dinfo->ci_decompress == NULL)
267				cbuf = dbuf;
268			else
269				cbuf = safe_calloc(payload_size);
270
271			/* Read and decompress the payload */
272			(void) sfread(cbuf, payload_size, stdin);
273			if (dinfo->ci_decompress != NULL) {
274				if (0 != dinfo->ci_decompress(cbuf, dbuf,
275				    payload_size, MIN(bufsz,
276				    drrw->drr_logical_size), dinfo->ci_level)) {
277					warnx("decompression type %d failed "
278					    "for ino %llu offset %llu",
279					    type,
280					    (u_longlong_t)drrw->drr_object,
281					    (u_longlong_t)drrw->drr_offset);
282					exit(4);
283				}
284				payload_size = drrw->drr_logical_size;
285				free(cbuf);
286			}
287
288			/* Recompress the payload */
289			if (cinfo->ci_compress != NULL) {
290				payload_size = P2ROUNDUP(cinfo->ci_compress(
291				    dbuf, buf, drrw->drr_logical_size,
292				    MIN(payload_size, bufsz), (level == -1 ?
293				    cinfo->ci_level : level)),
294				    SPA_MINBLOCKSIZE);
295				if (payload_size != drrw->drr_logical_size) {
296					drrw->drr_compressiontype = type;
297					drrw->drr_compressed_size =
298					    payload_size;
299				} else {
300					memcpy(buf, dbuf, payload_size);
301					drrw->drr_compressiontype = 0;
302					drrw->drr_compressed_size = 0;
303				}
304				free(dbuf);
305			} else {
306				drrw->drr_compressiontype = type;
307				drrw->drr_compressed_size = 0;
308			}
309			break;
310		}
311
312		case DRR_WRITE_EMBEDDED:
313		{
314			struct drr_write_embedded *drrwe =
315			    &drr->drr_u.drr_write_embedded;
316			VERIFY3S(begin, ==, 1);
317			payload_size =
318			    P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);
319			(void) sfread(buf, payload_size, stdin);
320			break;
321		}
322
323		case DRR_FREEOBJECTS:
324		case DRR_FREE:
325		case DRR_OBJECT_RANGE:
326			VERIFY3S(begin, ==, 1);
327			break;
328
329		default:
330			(void) fprintf(stderr, "INVALID record type 0x%x\n",
331			    drr->drr_type);
332			/* should never happen, so assert */
333			assert(B_FALSE);
334		}
335
336		if (feof(stdout)) {
337			fprintf(stderr, "Error: unexpected end-of-file\n");
338			exit(1);
339		}
340		if (ferror(stdout)) {
341			fprintf(stderr, "Error while reading file: %s\n",
342			    strerror(errno));
343			exit(1);
344		}
345
346		/*
347		 * We need to recalculate the checksum, and it needs to be
348		 * initially zero to do that.  BEGIN records don't have
349		 * a checksum.
350		 */
351		if (drr->drr_type != DRR_BEGIN) {
352			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
353			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
354		}
355		if (dump_record(drr, buf, payload_size,
356		    &stream_cksum, STDOUT_FILENO) != 0)
357			break;
358		if (drr->drr_type == DRR_END) {
359			/*
360			 * Typically the END record is either the last
361			 * thing in the stream, or it is followed
362			 * by a BEGIN record (which also zeros the checksum).
363			 * However, a stream package ends with two END
364			 * records.  The last END record's checksum starts
365			 * from zero.
366			 */
367			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
368		}
369	}
370	free(buf);
371	fletcher_4_fini();
372	zio_fini();
373	zstd_fini();
374
375	return (0);
376}
377