1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5
6#include <linux/atomic.h>
7#include <linux/bitops.h>
8#include <linux/completion.h>
9#include <linux/delay.h>
10#include <linux/device-mapper.h>
11#include <linux/err.h>
12#include <linux/module.h>
13#include <linux/mutex.h>
14#include <linux/spinlock.h>
15
16#include "admin-state.h"
17#include "block-map.h"
18#include "completion.h"
19#include "constants.h"
20#include "data-vio.h"
21#include "dedupe.h"
22#include "dump.h"
23#include "encodings.h"
24#include "errors.h"
25#include "flush.h"
26#include "io-submitter.h"
27#include "logger.h"
28#include "memory-alloc.h"
29#include "message-stats.h"
30#include "recovery-journal.h"
31#include "repair.h"
32#include "slab-depot.h"
33#include "status-codes.h"
34#include "string-utils.h"
35#include "thread-device.h"
36#include "thread-registry.h"
37#include "thread-utils.h"
38#include "types.h"
39#include "vdo.h"
40#include "vio.h"
41
42enum admin_phases {
43	GROW_LOGICAL_PHASE_START,
44	GROW_LOGICAL_PHASE_GROW_BLOCK_MAP,
45	GROW_LOGICAL_PHASE_END,
46	GROW_LOGICAL_PHASE_ERROR,
47	GROW_PHYSICAL_PHASE_START,
48	GROW_PHYSICAL_PHASE_COPY_SUMMARY,
49	GROW_PHYSICAL_PHASE_UPDATE_COMPONENTS,
50	GROW_PHYSICAL_PHASE_USE_NEW_SLABS,
51	GROW_PHYSICAL_PHASE_END,
52	GROW_PHYSICAL_PHASE_ERROR,
53	LOAD_PHASE_START,
54	LOAD_PHASE_LOAD_DEPOT,
55	LOAD_PHASE_MAKE_DIRTY,
56	LOAD_PHASE_PREPARE_TO_ALLOCATE,
57	LOAD_PHASE_SCRUB_SLABS,
58	LOAD_PHASE_DATA_REDUCTION,
59	LOAD_PHASE_FINISHED,
60	LOAD_PHASE_DRAIN_JOURNAL,
61	LOAD_PHASE_WAIT_FOR_READ_ONLY,
62	PRE_LOAD_PHASE_START,
63	PRE_LOAD_PHASE_LOAD_COMPONENTS,
64	PRE_LOAD_PHASE_END,
65	PREPARE_GROW_PHYSICAL_PHASE_START,
66	RESUME_PHASE_START,
67	RESUME_PHASE_ALLOW_READ_ONLY_MODE,
68	RESUME_PHASE_DEDUPE,
69	RESUME_PHASE_DEPOT,
70	RESUME_PHASE_JOURNAL,
71	RESUME_PHASE_BLOCK_MAP,
72	RESUME_PHASE_LOGICAL_ZONES,
73	RESUME_PHASE_PACKER,
74	RESUME_PHASE_FLUSHER,
75	RESUME_PHASE_DATA_VIOS,
76	RESUME_PHASE_END,
77	SUSPEND_PHASE_START,
78	SUSPEND_PHASE_PACKER,
79	SUSPEND_PHASE_DATA_VIOS,
80	SUSPEND_PHASE_DEDUPE,
81	SUSPEND_PHASE_FLUSHES,
82	SUSPEND_PHASE_LOGICAL_ZONES,
83	SUSPEND_PHASE_BLOCK_MAP,
84	SUSPEND_PHASE_JOURNAL,
85	SUSPEND_PHASE_DEPOT,
86	SUSPEND_PHASE_READ_ONLY_WAIT,
87	SUSPEND_PHASE_WRITE_SUPER_BLOCK,
88	SUSPEND_PHASE_END,
89};
90
91static const char * const ADMIN_PHASE_NAMES[] = {
92	"GROW_LOGICAL_PHASE_START",
93	"GROW_LOGICAL_PHASE_GROW_BLOCK_MAP",
94	"GROW_LOGICAL_PHASE_END",
95	"GROW_LOGICAL_PHASE_ERROR",
96	"GROW_PHYSICAL_PHASE_START",
97	"GROW_PHYSICAL_PHASE_COPY_SUMMARY",
98	"GROW_PHYSICAL_PHASE_UPDATE_COMPONENTS",
99	"GROW_PHYSICAL_PHASE_USE_NEW_SLABS",
100	"GROW_PHYSICAL_PHASE_END",
101	"GROW_PHYSICAL_PHASE_ERROR",
102	"LOAD_PHASE_START",
103	"LOAD_PHASE_LOAD_DEPOT",
104	"LOAD_PHASE_MAKE_DIRTY",
105	"LOAD_PHASE_PREPARE_TO_ALLOCATE",
106	"LOAD_PHASE_SCRUB_SLABS",
107	"LOAD_PHASE_DATA_REDUCTION",
108	"LOAD_PHASE_FINISHED",
109	"LOAD_PHASE_DRAIN_JOURNAL",
110	"LOAD_PHASE_WAIT_FOR_READ_ONLY",
111	"PRE_LOAD_PHASE_START",
112	"PRE_LOAD_PHASE_LOAD_COMPONENTS",
113	"PRE_LOAD_PHASE_END",
114	"PREPARE_GROW_PHYSICAL_PHASE_START",
115	"RESUME_PHASE_START",
116	"RESUME_PHASE_ALLOW_READ_ONLY_MODE",
117	"RESUME_PHASE_DEDUPE",
118	"RESUME_PHASE_DEPOT",
119	"RESUME_PHASE_JOURNAL",
120	"RESUME_PHASE_BLOCK_MAP",
121	"RESUME_PHASE_LOGICAL_ZONES",
122	"RESUME_PHASE_PACKER",
123	"RESUME_PHASE_FLUSHER",
124	"RESUME_PHASE_DATA_VIOS",
125	"RESUME_PHASE_END",
126	"SUSPEND_PHASE_START",
127	"SUSPEND_PHASE_PACKER",
128	"SUSPEND_PHASE_DATA_VIOS",
129	"SUSPEND_PHASE_DEDUPE",
130	"SUSPEND_PHASE_FLUSHES",
131	"SUSPEND_PHASE_LOGICAL_ZONES",
132	"SUSPEND_PHASE_BLOCK_MAP",
133	"SUSPEND_PHASE_JOURNAL",
134	"SUSPEND_PHASE_DEPOT",
135	"SUSPEND_PHASE_READ_ONLY_WAIT",
136	"SUSPEND_PHASE_WRITE_SUPER_BLOCK",
137	"SUSPEND_PHASE_END",
138};
139
140/* If we bump this, update the arrays below */
141#define TABLE_VERSION 4
142
143/* arrays for handling different table versions */
144static const u8 REQUIRED_ARGC[] = { 10, 12, 9, 7, 6 };
145/* pool name no longer used. only here for verification of older versions */
146static const u8 POOL_NAME_ARG_INDEX[] = { 8, 10, 8 };
147
148/*
149 * Track in-use instance numbers using a flat bit array.
150 *
151 * O(n) run time isn't ideal, but if we have 1000 VDO devices in use simultaneously we still only
152 * need to scan 16 words, so it's not likely to be a big deal compared to other resource usage.
153 */
154
155/*
156 * This minimum size for the bit array creates a numbering space of 0-999, which allows
157 * successive starts of the same volume to have different instance numbers in any
158 * reasonably-sized test. Changing instances on restart allows vdoMonReport to detect that
159 * the ephemeral stats have reset to zero.
160 */
161#define BIT_COUNT_MINIMUM 1000
162/* Grow the bit array by this many bits when needed */
163#define BIT_COUNT_INCREMENT 100
164
165struct instance_tracker {
166	unsigned int bit_count;
167	unsigned long *words;
168	unsigned int count;
169	unsigned int next;
170};
171
172static DEFINE_MUTEX(instances_lock);
173static struct instance_tracker instances;
174
175/**
176 * free_device_config() - Free a device config created by parse_device_config().
177 * @config: The config to free.
178 */
179static void free_device_config(struct device_config *config)
180{
181	if (config == NULL)
182		return;
183
184	if (config->owned_device != NULL)
185		dm_put_device(config->owning_target, config->owned_device);
186
187	vdo_free(config->parent_device_name);
188	vdo_free(config->original_string);
189
190	/* Reduce the chance a use-after-free (as in BZ 1669960) happens to work. */
191	memset(config, 0, sizeof(*config));
192	vdo_free(config);
193}
194
195/**
196 * get_version_number() - Decide the version number from argv.
197 *
198 * @argc: The number of table values.
199 * @argv: The array of table values.
200 * @error_ptr: A pointer to return a error string in.
201 * @version_ptr: A pointer to return the version.
202 *
203 * Return: VDO_SUCCESS or an error code.
204 */
205static int get_version_number(int argc, char **argv, char **error_ptr,
206			      unsigned int *version_ptr)
207{
208	/* version, if it exists, is in a form of V<n> */
209	if (sscanf(argv[0], "V%u", version_ptr) == 1) {
210		if (*version_ptr < 1 || *version_ptr > TABLE_VERSION) {
211			*error_ptr = "Unknown version number detected";
212			return VDO_BAD_CONFIGURATION;
213		}
214	} else {
215		/* V0 actually has no version number in the table string */
216		*version_ptr = 0;
217	}
218
219	/*
220	 * V0 and V1 have no optional parameters. There will always be a parameter for thread
221	 * config, even if it's a "." to show it's an empty list.
222	 */
223	if (*version_ptr <= 1) {
224		if (argc != REQUIRED_ARGC[*version_ptr]) {
225			*error_ptr = "Incorrect number of arguments for version";
226			return VDO_BAD_CONFIGURATION;
227		}
228	} else if (argc < REQUIRED_ARGC[*version_ptr]) {
229		*error_ptr = "Incorrect number of arguments for version";
230		return VDO_BAD_CONFIGURATION;
231	}
232
233	if (*version_ptr != TABLE_VERSION) {
234		vdo_log_warning("Detected version mismatch between kernel module and tools kernel: %d, tool: %d",
235				TABLE_VERSION, *version_ptr);
236		vdo_log_warning("Please consider upgrading management tools to match kernel.");
237	}
238	return VDO_SUCCESS;
239}
240
241/* Free a list of non-NULL string pointers, and then the list itself. */
242static void free_string_array(char **string_array)
243{
244	unsigned int offset;
245
246	for (offset = 0; string_array[offset] != NULL; offset++)
247		vdo_free(string_array[offset]);
248	vdo_free(string_array);
249}
250
251/*
252 * Split the input string into substrings, separated at occurrences of the indicated character,
253 * returning a null-terminated list of string pointers.
254 *
255 * The string pointers and the pointer array itself should both be freed with vdo_free() when no
256 * longer needed. This can be done with vdo_free_string_array (below) if the pointers in the array
257 * are not changed. Since the array and copied strings are allocated by this function, it may only
258 * be used in contexts where allocation is permitted.
259 *
260 * Empty substrings are not ignored; that is, returned substrings may be empty strings if the
261 * separator occurs twice in a row.
262 */
263static int split_string(const char *string, char separator, char ***substring_array_ptr)
264{
265	unsigned int current_substring = 0, substring_count = 1;
266	const char *s;
267	char **substrings;
268	int result;
269	ptrdiff_t length;
270
271	for (s = string; *s != 0; s++) {
272		if (*s == separator)
273			substring_count++;
274	}
275
276	result = vdo_allocate(substring_count + 1, char *, "string-splitting array",
277			      &substrings);
278	if (result != VDO_SUCCESS)
279		return result;
280
281	for (s = string; *s != 0; s++) {
282		if (*s == separator) {
283			ptrdiff_t length = s - string;
284
285			result = vdo_allocate(length + 1, char, "split string",
286					      &substrings[current_substring]);
287			if (result != VDO_SUCCESS) {
288				free_string_array(substrings);
289				return result;
290			}
291			/*
292			 * Trailing NUL is already in place after allocation; deal with the zero or
293			 * more non-NUL bytes in the string.
294			 */
295			if (length > 0)
296				memcpy(substrings[current_substring], string, length);
297			string = s + 1;
298			current_substring++;
299			BUG_ON(current_substring >= substring_count);
300		}
301	}
302	/* Process final string, with no trailing separator. */
303	BUG_ON(current_substring != (substring_count - 1));
304	length = strlen(string);
305
306	result = vdo_allocate(length + 1, char, "split string",
307			      &substrings[current_substring]);
308	if (result != VDO_SUCCESS) {
309		free_string_array(substrings);
310		return result;
311	}
312	memcpy(substrings[current_substring], string, length);
313	current_substring++;
314	/* substrings[current_substring] is NULL already */
315	*substring_array_ptr = substrings;
316	return VDO_SUCCESS;
317}
318
319/*
320 * Join the input substrings into one string, joined with the indicated character, returning a
321 * string. array_length is a bound on the number of valid elements in substring_array, in case it
322 * is not NULL-terminated.
323 */
324static int join_strings(char **substring_array, size_t array_length, char separator,
325			char **string_ptr)
326{
327	size_t string_length = 0;
328	size_t i;
329	int result;
330	char *output, *current_position;
331
332	for (i = 0; (i < array_length) && (substring_array[i] != NULL); i++)
333		string_length += strlen(substring_array[i]) + 1;
334
335	result = vdo_allocate(string_length, char, __func__, &output);
336	if (result != VDO_SUCCESS)
337		return result;
338
339	current_position = &output[0];
340
341	for (i = 0; (i < array_length) && (substring_array[i] != NULL); i++) {
342		current_position = vdo_append_to_buffer(current_position,
343							output + string_length, "%s",
344							substring_array[i]);
345		*current_position = separator;
346		current_position++;
347	}
348
349	/* We output one too many separators; replace the last with a zero byte. */
350	if (current_position != output)
351		*(current_position - 1) = '\0';
352
353	*string_ptr = output;
354	return VDO_SUCCESS;
355}
356
357/**
358 * parse_bool() - Parse a two-valued option into a bool.
359 * @bool_str: The string value to convert to a bool.
360 * @true_str: The string value which should be converted to true.
361 * @false_str: The string value which should be converted to false.
362 * @bool_ptr: A pointer to return the bool value in.
363 *
364 * Return: VDO_SUCCESS or an error if bool_str is neither true_str nor false_str.
365 */
366static inline int __must_check parse_bool(const char *bool_str, const char *true_str,
367					  const char *false_str, bool *bool_ptr)
368{
369	bool value = false;
370
371	if (strcmp(bool_str, true_str) == 0)
372		value = true;
373	else if (strcmp(bool_str, false_str) == 0)
374		value = false;
375	else
376		return VDO_BAD_CONFIGURATION;
377
378	*bool_ptr = value;
379	return VDO_SUCCESS;
380}
381
382/**
383 * process_one_thread_config_spec() - Process one component of a thread parameter configuration
384 *				      string and update the configuration data structure.
385 * @thread_param_type: The type of thread specified.
386 * @count: The thread count requested.
387 * @config: The configuration data structure to update.
388 *
389 * If the thread count requested is invalid, a message is logged and -EINVAL returned. If the
390 * thread name is unknown, a message is logged but no error is returned.
391 *
392 * Return: VDO_SUCCESS or -EINVAL
393 */
394static int process_one_thread_config_spec(const char *thread_param_type,
395					  unsigned int count,
396					  struct thread_count_config *config)
397{
398	/* Handle limited thread parameters */
399	if (strcmp(thread_param_type, "bioRotationInterval") == 0) {
400		if (count == 0) {
401			vdo_log_error("thread config string error:  'bioRotationInterval' of at least 1 is required");
402			return -EINVAL;
403		} else if (count > VDO_BIO_ROTATION_INTERVAL_LIMIT) {
404			vdo_log_error("thread config string error: 'bioRotationInterval' cannot be higher than %d",
405				      VDO_BIO_ROTATION_INTERVAL_LIMIT);
406			return -EINVAL;
407		}
408		config->bio_rotation_interval = count;
409		return VDO_SUCCESS;
410	}
411	if (strcmp(thread_param_type, "logical") == 0) {
412		if (count > MAX_VDO_LOGICAL_ZONES) {
413			vdo_log_error("thread config string error: at most %d 'logical' threads are allowed",
414				      MAX_VDO_LOGICAL_ZONES);
415			return -EINVAL;
416		}
417		config->logical_zones = count;
418		return VDO_SUCCESS;
419	}
420	if (strcmp(thread_param_type, "physical") == 0) {
421		if (count > MAX_VDO_PHYSICAL_ZONES) {
422			vdo_log_error("thread config string error: at most %d 'physical' threads are allowed",
423				      MAX_VDO_PHYSICAL_ZONES);
424			return -EINVAL;
425		}
426		config->physical_zones = count;
427		return VDO_SUCCESS;
428	}
429	/* Handle other thread count parameters */
430	if (count > MAXIMUM_VDO_THREADS) {
431		vdo_log_error("thread config string error: at most %d '%s' threads are allowed",
432			      MAXIMUM_VDO_THREADS, thread_param_type);
433		return -EINVAL;
434	}
435	if (strcmp(thread_param_type, "hash") == 0) {
436		config->hash_zones = count;
437		return VDO_SUCCESS;
438	}
439	if (strcmp(thread_param_type, "cpu") == 0) {
440		if (count == 0) {
441			vdo_log_error("thread config string error: at least one 'cpu' thread required");
442			return -EINVAL;
443		}
444		config->cpu_threads = count;
445		return VDO_SUCCESS;
446	}
447	if (strcmp(thread_param_type, "ack") == 0) {
448		config->bio_ack_threads = count;
449		return VDO_SUCCESS;
450	}
451	if (strcmp(thread_param_type, "bio") == 0) {
452		if (count == 0) {
453			vdo_log_error("thread config string error: at least one 'bio' thread required");
454			return -EINVAL;
455		}
456		config->bio_threads = count;
457		return VDO_SUCCESS;
458	}
459
460	/*
461	 * Don't fail, just log. This will handle version mismatches between user mode tools and
462	 * kernel.
463	 */
464	vdo_log_info("unknown thread parameter type \"%s\"", thread_param_type);
465	return VDO_SUCCESS;
466}
467
468/**
469 * parse_one_thread_config_spec() - Parse one component of a thread parameter configuration string
470 *				    and update the configuration data structure.
471 * @spec: The thread parameter specification string.
472 * @config: The configuration data to be updated.
473 */
474static int parse_one_thread_config_spec(const char *spec,
475					struct thread_count_config *config)
476{
477	unsigned int count;
478	char **fields;
479	int result;
480
481	result = split_string(spec, '=', &fields);
482	if (result != VDO_SUCCESS)
483		return result;
484
485	if ((fields[0] == NULL) || (fields[1] == NULL) || (fields[2] != NULL)) {
486		vdo_log_error("thread config string error: expected thread parameter assignment, saw \"%s\"",
487			      spec);
488		free_string_array(fields);
489		return -EINVAL;
490	}
491
492	result = kstrtouint(fields[1], 10, &count);
493	if (result) {
494		vdo_log_error("thread config string error: integer value needed, found \"%s\"",
495			      fields[1]);
496		free_string_array(fields);
497		return result;
498	}
499
500	result = process_one_thread_config_spec(fields[0], count, config);
501	free_string_array(fields);
502	return result;
503}
504
505/**
506 * parse_thread_config_string() - Parse the configuration string passed and update the specified
507 *				  counts and other parameters of various types of threads to be
508 *				  created.
509 * @string: Thread parameter configuration string.
510 * @config: The thread configuration data to update.
511 *
512 * The configuration string should contain one or more comma-separated specs of the form
513 * "typename=number"; the supported type names are "cpu", "ack", "bio", "bioRotationInterval",
514 * "logical", "physical", and "hash".
515 *
516 * If an error occurs during parsing of a single key/value pair, we deem it serious enough to stop
517 * further parsing.
518 *
519 * This function can't set the "reason" value the caller wants to pass back, because we'd want to
520 * format it to say which field was invalid, and we can't allocate the "reason" strings
521 * dynamically. So if an error occurs, we'll log the details and pass back an error.
522 *
523 * Return: VDO_SUCCESS or -EINVAL or -ENOMEM
524 */
525static int parse_thread_config_string(const char *string,
526				      struct thread_count_config *config)
527{
528	int result = VDO_SUCCESS;
529	char **specs;
530
531	if (strcmp(".", string) != 0) {
532		unsigned int i;
533
534		result = split_string(string, ',', &specs);
535		if (result != VDO_SUCCESS)
536			return result;
537
538		for (i = 0; specs[i] != NULL; i++) {
539			result = parse_one_thread_config_spec(specs[i], config);
540			if (result != VDO_SUCCESS)
541				break;
542		}
543		free_string_array(specs);
544	}
545	return result;
546}
547
548/**
549 * process_one_key_value_pair() - Process one component of an optional parameter string and update
550 *				  the configuration data structure.
551 * @key: The optional parameter key name.
552 * @value: The optional parameter value.
553 * @config: The configuration data structure to update.
554 *
555 * If the value requested is invalid, a message is logged and -EINVAL returned. If the key is
556 * unknown, a message is logged but no error is returned.
557 *
558 * Return: VDO_SUCCESS or -EINVAL
559 */
560static int process_one_key_value_pair(const char *key, unsigned int value,
561				      struct device_config *config)
562{
563	/* Non thread optional parameters */
564	if (strcmp(key, "maxDiscard") == 0) {
565		if (value == 0) {
566			vdo_log_error("optional parameter error: at least one max discard block required");
567			return -EINVAL;
568		}
569		/* Max discard sectors in blkdev_issue_discard is UINT_MAX >> 9 */
570		if (value > (UINT_MAX / VDO_BLOCK_SIZE)) {
571			vdo_log_error("optional parameter error: at most %d max discard	 blocks are allowed",
572				      UINT_MAX / VDO_BLOCK_SIZE);
573			return -EINVAL;
574		}
575		config->max_discard_blocks = value;
576		return VDO_SUCCESS;
577	}
578	/* Handles unknown key names */
579	return process_one_thread_config_spec(key, value, &config->thread_counts);
580}
581
582/**
583 * parse_one_key_value_pair() - Parse one key/value pair and update the configuration data
584 *				structure.
585 * @key: The optional key name.
586 * @value: The optional value.
587 * @config: The configuration data to be updated.
588 *
589 * Return: VDO_SUCCESS or error.
590 */
591static int parse_one_key_value_pair(const char *key, const char *value,
592				    struct device_config *config)
593{
594	unsigned int count;
595	int result;
596
597	if (strcmp(key, "deduplication") == 0)
598		return parse_bool(value, "on", "off", &config->deduplication);
599
600	if (strcmp(key, "compression") == 0)
601		return parse_bool(value, "on", "off", &config->compression);
602
603	/* The remaining arguments must have integral values. */
604	result = kstrtouint(value, 10, &count);
605	if (result) {
606		vdo_log_error("optional config string error: integer value needed, found \"%s\"",
607			      value);
608		return result;
609	}
610	return process_one_key_value_pair(key, count, config);
611}
612
613/**
614 * parse_key_value_pairs() - Parse all key/value pairs from a list of arguments.
615 * @argc: The total number of arguments in list.
616 * @argv: The list of key/value pairs.
617 * @config: The device configuration data to update.
618 *
619 * If an error occurs during parsing of a single key/value pair, we deem it serious enough to stop
620 * further parsing.
621 *
622 * This function can't set the "reason" value the caller wants to pass back, because we'd want to
623 * format it to say which field was invalid, and we can't allocate the "reason" strings
624 * dynamically. So if an error occurs, we'll log the details and return the error.
625 *
626 * Return: VDO_SUCCESS or error
627 */
628static int parse_key_value_pairs(int argc, char **argv, struct device_config *config)
629{
630	int result = VDO_SUCCESS;
631
632	while (argc) {
633		result = parse_one_key_value_pair(argv[0], argv[1], config);
634		if (result != VDO_SUCCESS)
635			break;
636
637		argc -= 2;
638		argv += 2;
639	}
640
641	return result;
642}
643
644/**
645 * parse_optional_arguments() - Parse the configuration string passed in for optional arguments.
646 * @arg_set: The structure holding the arguments to parse.
647 * @error_ptr: Pointer to a buffer to hold the error string.
648 * @config: Pointer to device configuration data to update.
649 *
650 * For V0/V1 configurations, there will only be one optional parameter; the thread configuration.
651 * The configuration string should contain one or more comma-separated specs of the form
652 * "typename=number"; the supported type names are "cpu", "ack", "bio", "bioRotationInterval",
653 * "logical", "physical", and "hash".
654 *
655 * For V2 configurations and beyond, there could be any number of arguments. They should contain
656 * one or more key/value pairs separated by a space.
657 *
658 * Return: VDO_SUCCESS or error
659 */
660static int parse_optional_arguments(struct dm_arg_set *arg_set, char **error_ptr,
661				    struct device_config *config)
662{
663	int result = VDO_SUCCESS;
664
665	if (config->version == 0 || config->version == 1) {
666		result = parse_thread_config_string(arg_set->argv[0],
667						    &config->thread_counts);
668		if (result != VDO_SUCCESS) {
669			*error_ptr = "Invalid thread-count configuration";
670			return VDO_BAD_CONFIGURATION;
671		}
672	} else {
673		if ((arg_set->argc % 2) != 0) {
674			*error_ptr = "Odd number of optional arguments given but they should be <key> <value> pairs";
675			return VDO_BAD_CONFIGURATION;
676		}
677		result = parse_key_value_pairs(arg_set->argc, arg_set->argv, config);
678		if (result != VDO_SUCCESS) {
679			*error_ptr = "Invalid optional argument configuration";
680			return VDO_BAD_CONFIGURATION;
681		}
682	}
683	return result;
684}
685
686/**
687 * handle_parse_error() - Handle a parsing error.
688 * @config: The config to free.
689 * @error_ptr: A place to store a constant string about the error.
690 * @error_str: A constant string to store in error_ptr.
691 */
692static void handle_parse_error(struct device_config *config, char **error_ptr,
693			       char *error_str)
694{
695	free_device_config(config);
696	*error_ptr = error_str;
697}
698
699/**
700 * parse_device_config() - Convert the dmsetup table into a struct device_config.
701 * @argc: The number of table values.
702 * @argv: The array of table values.
703 * @ti: The target structure for this table.
704 * @config_ptr: A pointer to return the allocated config.
705 *
706 * Return: VDO_SUCCESS or an error code.
707 */
708static int parse_device_config(int argc, char **argv, struct dm_target *ti,
709			       struct device_config **config_ptr)
710{
711	bool enable_512e;
712	size_t logical_bytes = to_bytes(ti->len);
713	struct dm_arg_set arg_set;
714	char **error_ptr = &ti->error;
715	struct device_config *config = NULL;
716	int result;
717
718	if ((logical_bytes % VDO_BLOCK_SIZE) != 0) {
719		handle_parse_error(config, error_ptr,
720				   "Logical size must be a multiple of 4096");
721		return VDO_BAD_CONFIGURATION;
722	}
723
724	if (argc == 0) {
725		handle_parse_error(config, error_ptr, "Incorrect number of arguments");
726		return VDO_BAD_CONFIGURATION;
727	}
728
729	result = vdo_allocate(1, struct device_config, "device_config", &config);
730	if (result != VDO_SUCCESS) {
731		handle_parse_error(config, error_ptr,
732				   "Could not allocate config structure");
733		return VDO_BAD_CONFIGURATION;
734	}
735
736	config->owning_target = ti;
737	config->logical_blocks = logical_bytes / VDO_BLOCK_SIZE;
738	INIT_LIST_HEAD(&config->config_list);
739
740	/* Save the original string. */
741	result = join_strings(argv, argc, ' ', &config->original_string);
742	if (result != VDO_SUCCESS) {
743		handle_parse_error(config, error_ptr, "Could not populate string");
744		return VDO_BAD_CONFIGURATION;
745	}
746
747	vdo_log_info("table line: %s", config->original_string);
748
749	config->thread_counts = (struct thread_count_config) {
750		.bio_ack_threads = 1,
751		.bio_threads = DEFAULT_VDO_BIO_SUBMIT_QUEUE_COUNT,
752		.bio_rotation_interval = DEFAULT_VDO_BIO_SUBMIT_QUEUE_ROTATE_INTERVAL,
753		.cpu_threads = 1,
754		.logical_zones = 0,
755		.physical_zones = 0,
756		.hash_zones = 0,
757	};
758	config->max_discard_blocks = 1;
759	config->deduplication = true;
760	config->compression = false;
761
762	arg_set.argc = argc;
763	arg_set.argv = argv;
764
765	result = get_version_number(argc, argv, error_ptr, &config->version);
766	if (result != VDO_SUCCESS) {
767		/* get_version_number sets error_ptr itself. */
768		handle_parse_error(config, error_ptr, *error_ptr);
769		return result;
770	}
771	/* Move the arg pointer forward only if the argument was there. */
772	if (config->version >= 1)
773		dm_shift_arg(&arg_set);
774
775	result = vdo_duplicate_string(dm_shift_arg(&arg_set), "parent device name",
776				      &config->parent_device_name);
777	if (result != VDO_SUCCESS) {
778		handle_parse_error(config, error_ptr,
779				   "Could not copy parent device name");
780		return VDO_BAD_CONFIGURATION;
781	}
782
783	/* Get the physical blocks, if known. */
784	if (config->version >= 1) {
785		result = kstrtoull(dm_shift_arg(&arg_set), 10, &config->physical_blocks);
786		if (result != VDO_SUCCESS) {
787			handle_parse_error(config, error_ptr,
788					   "Invalid physical block count");
789			return VDO_BAD_CONFIGURATION;
790		}
791	}
792
793	/* Get the logical block size and validate */
794	result = parse_bool(dm_shift_arg(&arg_set), "512", "4096", &enable_512e);
795	if (result != VDO_SUCCESS) {
796		handle_parse_error(config, error_ptr, "Invalid logical block size");
797		return VDO_BAD_CONFIGURATION;
798	}
799	config->logical_block_size = (enable_512e ? 512 : 4096);
800
801	/* Skip past the two no longer used read cache options. */
802	if (config->version <= 1)
803		dm_consume_args(&arg_set, 2);
804
805	/* Get the page cache size. */
806	result = kstrtouint(dm_shift_arg(&arg_set), 10, &config->cache_size);
807	if (result != VDO_SUCCESS) {
808		handle_parse_error(config, error_ptr,
809				   "Invalid block map page cache size");
810		return VDO_BAD_CONFIGURATION;
811	}
812
813	/* Get the block map era length. */
814	result = kstrtouint(dm_shift_arg(&arg_set), 10, &config->block_map_maximum_age);
815	if (result != VDO_SUCCESS) {
816		handle_parse_error(config, error_ptr, "Invalid block map maximum age");
817		return VDO_BAD_CONFIGURATION;
818	}
819
820	/* Skip past the no longer used MD RAID5 optimization mode */
821	if (config->version <= 2)
822		dm_consume_args(&arg_set, 1);
823
824	/* Skip past the no longer used write policy setting */
825	if (config->version <= 3)
826		dm_consume_args(&arg_set, 1);
827
828	/* Skip past the no longer used pool name for older table lines */
829	if (config->version <= 2) {
830		/*
831		 * Make sure the enum to get the pool name from argv directly is still in sync with
832		 * the parsing of the table line.
833		 */
834		if (&arg_set.argv[0] != &argv[POOL_NAME_ARG_INDEX[config->version]]) {
835			handle_parse_error(config, error_ptr,
836					   "Pool name not in expected location");
837			return VDO_BAD_CONFIGURATION;
838		}
839		dm_shift_arg(&arg_set);
840	}
841
842	/* Get the optional arguments and validate. */
843	result = parse_optional_arguments(&arg_set, error_ptr, config);
844	if (result != VDO_SUCCESS) {
845		/* parse_optional_arguments sets error_ptr itself. */
846		handle_parse_error(config, error_ptr, *error_ptr);
847		return result;
848	}
849
850	/*
851	 * Logical, physical, and hash zone counts can all be zero; then we get one thread doing
852	 * everything, our older configuration. If any zone count is non-zero, the others must be
853	 * as well.
854	 */
855	if (((config->thread_counts.logical_zones == 0) !=
856	     (config->thread_counts.physical_zones == 0)) ||
857	    ((config->thread_counts.physical_zones == 0) !=
858	     (config->thread_counts.hash_zones == 0))) {
859		handle_parse_error(config, error_ptr,
860				   "Logical, physical, and hash zones counts must all be zero or all non-zero");
861		return VDO_BAD_CONFIGURATION;
862	}
863
864	if (config->cache_size <
865	    (2 * MAXIMUM_VDO_USER_VIOS * config->thread_counts.logical_zones)) {
866		handle_parse_error(config, error_ptr,
867				   "Insufficient block map cache for logical zones");
868		return VDO_BAD_CONFIGURATION;
869	}
870
871	result = dm_get_device(ti, config->parent_device_name,
872			       dm_table_get_mode(ti->table), &config->owned_device);
873	if (result != 0) {
874		vdo_log_error("couldn't open device \"%s\": error %d",
875			      config->parent_device_name, result);
876		handle_parse_error(config, error_ptr, "Unable to open storage device");
877		return VDO_BAD_CONFIGURATION;
878	}
879
880	if (config->version == 0) {
881		u64 device_size = i_size_read(config->owned_device->bdev->bd_inode);
882
883		config->physical_blocks = device_size / VDO_BLOCK_SIZE;
884	}
885
886	*config_ptr = config;
887	return result;
888}
889
890static struct vdo *get_vdo_for_target(struct dm_target *ti)
891{
892	return ((struct device_config *) ti->private)->vdo;
893}
894
895
896static int vdo_map_bio(struct dm_target *ti, struct bio *bio)
897{
898	struct vdo *vdo = get_vdo_for_target(ti);
899	struct vdo_work_queue *current_work_queue;
900	const struct admin_state_code *code = vdo_get_admin_state_code(&vdo->admin.state);
901
902	VDO_ASSERT_LOG_ONLY(code->normal, "vdo should not receive bios while in state %s",
903			    code->name);
904
905	/* Count all incoming bios. */
906	vdo_count_bios(&vdo->stats.bios_in, bio);
907
908
909	/* Handle empty bios.  Empty flush bios are not associated with a vio. */
910	if ((bio_op(bio) == REQ_OP_FLUSH) || ((bio->bi_opf & REQ_PREFLUSH) != 0)) {
911		vdo_launch_flush(vdo, bio);
912		return DM_MAPIO_SUBMITTED;
913	}
914
915	/* This could deadlock, */
916	current_work_queue = vdo_get_current_work_queue();
917	BUG_ON((current_work_queue != NULL) &&
918	       (vdo == vdo_get_work_queue_owner(current_work_queue)->vdo));
919	vdo_launch_bio(vdo->data_vio_pool, bio);
920	return DM_MAPIO_SUBMITTED;
921}
922
923static void vdo_io_hints(struct dm_target *ti, struct queue_limits *limits)
924{
925	struct vdo *vdo = get_vdo_for_target(ti);
926
927	limits->logical_block_size = vdo->device_config->logical_block_size;
928	limits->physical_block_size = VDO_BLOCK_SIZE;
929
930	/* The minimum io size for random io */
931	blk_limits_io_min(limits, VDO_BLOCK_SIZE);
932	/* The optimal io size for streamed/sequential io */
933	blk_limits_io_opt(limits, VDO_BLOCK_SIZE);
934
935	/*
936	 * Sets the maximum discard size that will be passed into VDO. This value comes from a
937	 * table line value passed in during dmsetup create.
938	 *
939	 * The value 1024 is the largest usable value on HD systems. A 2048 sector discard on a
940	 * busy HD system takes 31 seconds. We should use a value no higher than 1024, which takes
941	 * 15 to 16 seconds on a busy HD system. However, using large values results in 120 second
942	 * blocked task warnings in kernel logs. In order to avoid these warnings, we choose to
943	 * use the smallest reasonable value.
944	 *
945	 * The value is used by dm-thin to determine whether to pass down discards. The block layer
946	 * splits large discards on this boundary when this is set.
947	 */
948	limits->max_discard_sectors =
949		(vdo->device_config->max_discard_blocks * VDO_SECTORS_PER_BLOCK);
950
951	/*
952	 * Force discards to not begin or end with a partial block by stating the granularity is
953	 * 4k.
954	 */
955	limits->discard_granularity = VDO_BLOCK_SIZE;
956}
957
958static int vdo_iterate_devices(struct dm_target *ti, iterate_devices_callout_fn fn,
959			       void *data)
960{
961	struct device_config *config = get_vdo_for_target(ti)->device_config;
962
963	return fn(ti, config->owned_device, 0,
964		  config->physical_blocks * VDO_SECTORS_PER_BLOCK, data);
965}
966
967/*
968 * Status line is:
969 *    <device> <operating mode> <in recovery> <index state> <compression state>
970 *    <used physical blocks> <total physical blocks>
971 */
972
973static void vdo_status(struct dm_target *ti, status_type_t status_type,
974		       unsigned int status_flags, char *result, unsigned int maxlen)
975{
976	struct vdo *vdo = get_vdo_for_target(ti);
977	struct vdo_statistics *stats;
978	struct device_config *device_config;
979	/* N.B.: The DMEMIT macro uses the variables named "sz", "result", "maxlen". */
980	int sz = 0;
981
982	switch (status_type) {
983	case STATUSTYPE_INFO:
984		/* Report info for dmsetup status */
985		mutex_lock(&vdo->stats_mutex);
986		vdo_fetch_statistics(vdo, &vdo->stats_buffer);
987		stats = &vdo->stats_buffer;
988
989		DMEMIT("/dev/%pg %s %s %s %s %llu %llu",
990		       vdo_get_backing_device(vdo), stats->mode,
991		       stats->in_recovery_mode ? "recovering" : "-",
992		       vdo_get_dedupe_index_state_name(vdo->hash_zones),
993		       vdo_get_compressing(vdo) ? "online" : "offline",
994		       stats->data_blocks_used + stats->overhead_blocks_used,
995		       stats->physical_blocks);
996		mutex_unlock(&vdo->stats_mutex);
997		break;
998
999	case STATUSTYPE_TABLE:
1000		/* Report the string actually specified in the beginning. */
1001		device_config = (struct device_config *) ti->private;
1002		DMEMIT("%s", device_config->original_string);
1003		break;
1004
1005	case STATUSTYPE_IMA:
1006		/* FIXME: We ought to be more detailed here, but this is what thin does. */
1007		*result = '\0';
1008		break;
1009	}
1010}
1011
1012static block_count_t __must_check get_underlying_device_block_count(const struct vdo *vdo)
1013{
1014	return i_size_read(vdo_get_backing_device(vdo)->bd_inode) / VDO_BLOCK_SIZE;
1015}
1016
1017static int __must_check process_vdo_message_locked(struct vdo *vdo, unsigned int argc,
1018						   char **argv)
1019{
1020	if ((argc == 2) && (strcasecmp(argv[0], "compression") == 0)) {
1021		if (strcasecmp(argv[1], "on") == 0) {
1022			vdo_set_compressing(vdo, true);
1023			return 0;
1024		}
1025
1026		if (strcasecmp(argv[1], "off") == 0) {
1027			vdo_set_compressing(vdo, false);
1028			return 0;
1029		}
1030
1031		vdo_log_warning("invalid argument '%s' to dmsetup compression message",
1032				argv[1]);
1033		return -EINVAL;
1034	}
1035
1036	vdo_log_warning("unrecognized dmsetup message '%s' received", argv[0]);
1037	return -EINVAL;
1038}
1039
1040/*
1041 * If the message is a dump, just do it. Otherwise, check that no other message is being processed,
1042 * and only proceed if so.
1043 * Returns -EBUSY if another message is being processed
1044 */
1045static int __must_check process_vdo_message(struct vdo *vdo, unsigned int argc,
1046					    char **argv)
1047{
1048	int result;
1049
1050	/*
1051	 * All messages which may be processed in parallel with other messages should be handled
1052	 * here before the atomic check below. Messages which should be exclusive should be
1053	 * processed in process_vdo_message_locked().
1054	 */
1055
1056	/* Dump messages should always be processed */
1057	if (strcasecmp(argv[0], "dump") == 0)
1058		return vdo_dump(vdo, argc, argv, "dmsetup message");
1059
1060	if (argc == 1) {
1061		if (strcasecmp(argv[0], "dump-on-shutdown") == 0) {
1062			vdo->dump_on_shutdown = true;
1063			return 0;
1064		}
1065
1066		/* Index messages should always be processed */
1067		if ((strcasecmp(argv[0], "index-close") == 0) ||
1068		    (strcasecmp(argv[0], "index-create") == 0) ||
1069		    (strcasecmp(argv[0], "index-disable") == 0) ||
1070		    (strcasecmp(argv[0], "index-enable") == 0))
1071			return vdo_message_dedupe_index(vdo->hash_zones, argv[0]);
1072	}
1073
1074	if (atomic_cmpxchg(&vdo->processing_message, 0, 1) != 0)
1075		return -EBUSY;
1076
1077	result = process_vdo_message_locked(vdo, argc, argv);
1078
1079	/* Pairs with the implicit barrier in cmpxchg just above */
1080	smp_wmb();
1081	atomic_set(&vdo->processing_message, 0);
1082	return result;
1083}
1084
1085static int vdo_message(struct dm_target *ti, unsigned int argc, char **argv,
1086		       char *result_buffer, unsigned int maxlen)
1087{
1088	struct registered_thread allocating_thread, instance_thread;
1089	struct vdo *vdo;
1090	int result;
1091
1092	if (argc == 0) {
1093		vdo_log_warning("unspecified dmsetup message");
1094		return -EINVAL;
1095	}
1096
1097	vdo = get_vdo_for_target(ti);
1098	vdo_register_allocating_thread(&allocating_thread, NULL);
1099	vdo_register_thread_device_id(&instance_thread, &vdo->instance);
1100
1101	/*
1102	 * Must be done here so we don't map return codes. The code in dm-ioctl expects a 1 for a
1103	 * return code to look at the buffer and see if it is full or not.
1104	 */
1105	if ((argc == 1) && (strcasecmp(argv[0], "stats") == 0)) {
1106		vdo_write_stats(vdo, result_buffer, maxlen);
1107		result = 1;
1108	} else {
1109		result = vdo_status_to_errno(process_vdo_message(vdo, argc, argv));
1110	}
1111
1112	vdo_unregister_thread_device_id();
1113	vdo_unregister_allocating_thread();
1114	return result;
1115}
1116
1117static void configure_target_capabilities(struct dm_target *ti)
1118{
1119	ti->discards_supported = 1;
1120	ti->flush_supported = true;
1121	ti->num_discard_bios = 1;
1122	ti->num_flush_bios = 1;
1123
1124	/*
1125	 * If this value changes, please make sure to update the value for max_discard_sectors
1126	 * accordingly.
1127	 */
1128	BUG_ON(dm_set_target_max_io_len(ti, VDO_SECTORS_PER_BLOCK) != 0);
1129}
1130
1131/*
1132 * Implements vdo_filter_fn.
1133 */
1134static bool vdo_uses_device(struct vdo *vdo, const void *context)
1135{
1136	const struct device_config *config = context;
1137
1138	return vdo_get_backing_device(vdo)->bd_dev == config->owned_device->bdev->bd_dev;
1139}
1140
1141/**
1142 * get_thread_id_for_phase() - Get the thread id for the current phase of the admin operation in
1143 *                             progress.
1144 */
1145static thread_id_t __must_check get_thread_id_for_phase(struct vdo *vdo)
1146{
1147	switch (vdo->admin.phase) {
1148	case RESUME_PHASE_PACKER:
1149	case RESUME_PHASE_FLUSHER:
1150	case SUSPEND_PHASE_PACKER:
1151	case SUSPEND_PHASE_FLUSHES:
1152		return vdo->thread_config.packer_thread;
1153
1154	case RESUME_PHASE_DATA_VIOS:
1155	case SUSPEND_PHASE_DATA_VIOS:
1156		return vdo->thread_config.cpu_thread;
1157
1158	case LOAD_PHASE_DRAIN_JOURNAL:
1159	case RESUME_PHASE_JOURNAL:
1160	case SUSPEND_PHASE_JOURNAL:
1161		return vdo->thread_config.journal_thread;
1162
1163	default:
1164		return vdo->thread_config.admin_thread;
1165	}
1166}
1167
1168static struct vdo_completion *prepare_admin_completion(struct vdo *vdo,
1169						       vdo_action_fn callback,
1170						       vdo_action_fn error_handler)
1171{
1172	struct vdo_completion *completion = &vdo->admin.completion;
1173
1174	/*
1175	 * We can't use vdo_prepare_completion_for_requeue() here because we don't want to reset
1176	 * any error in the completion.
1177	 */
1178	completion->callback = callback;
1179	completion->error_handler = error_handler;
1180	completion->callback_thread_id = get_thread_id_for_phase(vdo);
1181	completion->requeue = true;
1182	return completion;
1183}
1184
1185/**
1186 * advance_phase() - Increment the phase of the current admin operation and prepare the admin
1187 *                   completion to run on the thread for the next phase.
1188 * @vdo: The on which an admin operation is being performed
1189 *
1190 * Return: The current phase
1191 */
1192static u32 advance_phase(struct vdo *vdo)
1193{
1194	u32 phase = vdo->admin.phase++;
1195
1196	vdo->admin.completion.callback_thread_id = get_thread_id_for_phase(vdo);
1197	vdo->admin.completion.requeue = true;
1198	return phase;
1199}
1200
1201/*
1202 * Perform an administrative operation (load, suspend, grow logical, or grow physical). This method
1203 * should not be called from vdo threads.
1204 */
1205static int perform_admin_operation(struct vdo *vdo, u32 starting_phase,
1206				   vdo_action_fn callback, vdo_action_fn error_handler,
1207				   const char *type)
1208{
1209	int result;
1210	struct vdo_administrator *admin = &vdo->admin;
1211
1212	if (atomic_cmpxchg(&admin->busy, 0, 1) != 0) {
1213		return vdo_log_error_strerror(VDO_COMPONENT_BUSY,
1214					      "Can't start %s operation, another operation is already in progress",
1215					      type);
1216	}
1217
1218	admin->phase = starting_phase;
1219	reinit_completion(&admin->callback_sync);
1220	vdo_reset_completion(&admin->completion);
1221	vdo_launch_completion(prepare_admin_completion(vdo, callback, error_handler));
1222
1223	/*
1224	 * Using the "interruptible" interface means that Linux will not log a message when we wait
1225	 * for more than 120 seconds.
1226	 */
1227	while (wait_for_completion_interruptible(&admin->callback_sync)) {
1228		/* However, if we get a signal in a user-mode process, we could spin... */
1229		fsleep(1000);
1230	}
1231
1232	result = admin->completion.result;
1233	/* pairs with implicit barrier in cmpxchg above */
1234	smp_wmb();
1235	atomic_set(&admin->busy, 0);
1236	return result;
1237}
1238
1239/* Assert that we are operating on the correct thread for the current phase. */
1240static void assert_admin_phase_thread(struct vdo *vdo, const char *what)
1241{
1242	VDO_ASSERT_LOG_ONLY(vdo_get_callback_thread_id() == get_thread_id_for_phase(vdo),
1243			    "%s on correct thread for %s", what,
1244			    ADMIN_PHASE_NAMES[vdo->admin.phase]);
1245}
1246
1247/**
1248 * finish_operation_callback() - Callback to finish an admin operation.
1249 * @completion: The admin_completion.
1250 */
1251static void finish_operation_callback(struct vdo_completion *completion)
1252{
1253	struct vdo_administrator *admin = &completion->vdo->admin;
1254
1255	vdo_finish_operation(&admin->state, completion->result);
1256	complete(&admin->callback_sync);
1257}
1258
1259/**
1260 * decode_from_super_block() - Decode the VDO state from the super block and validate that it is
1261 *                             correct.
1262 * @vdo: The vdo being loaded.
1263 *
1264 * On error from this method, the component states must be destroyed explicitly. If this method
1265 * returns successfully, the component states must not be destroyed.
1266 *
1267 * Return: VDO_SUCCESS or an error.
1268 */
1269static int __must_check decode_from_super_block(struct vdo *vdo)
1270{
1271	const struct device_config *config = vdo->device_config;
1272	int result;
1273
1274	result = vdo_decode_component_states(vdo->super_block.buffer, &vdo->geometry,
1275					     &vdo->states);
1276	if (result != VDO_SUCCESS)
1277		return result;
1278
1279	vdo_set_state(vdo, vdo->states.vdo.state);
1280	vdo->load_state = vdo->states.vdo.state;
1281
1282	/*
1283	 * If the device config specifies a larger logical size than was recorded in the super
1284	 * block, just accept it.
1285	 */
1286	if (vdo->states.vdo.config.logical_blocks < config->logical_blocks) {
1287		vdo_log_warning("Growing logical size: a logical size of %llu blocks was specified, but that differs from the %llu blocks configured in the vdo super block",
1288				(unsigned long long) config->logical_blocks,
1289				(unsigned long long) vdo->states.vdo.config.logical_blocks);
1290		vdo->states.vdo.config.logical_blocks = config->logical_blocks;
1291	}
1292
1293	result = vdo_validate_component_states(&vdo->states, vdo->geometry.nonce,
1294					       config->physical_blocks,
1295					       config->logical_blocks);
1296	if (result != VDO_SUCCESS)
1297		return result;
1298
1299	vdo->layout = vdo->states.layout;
1300	return VDO_SUCCESS;
1301}
1302
1303/**
1304 * decode_vdo() - Decode the component data portion of a super block and fill in the corresponding
1305 *                portions of the vdo being loaded.
1306 * @vdo: The vdo being loaded.
1307 *
1308 * This will also allocate the recovery journal and slab depot. If this method is called with an
1309 * asynchronous layer (i.e. a thread config which specifies at least one base thread), the block
1310 * map and packer will be constructed as well.
1311 *
1312 * Return: VDO_SUCCESS or an error.
1313 */
1314static int __must_check decode_vdo(struct vdo *vdo)
1315{
1316	block_count_t maximum_age, journal_length;
1317	struct partition *partition;
1318	int result;
1319
1320	result = decode_from_super_block(vdo);
1321	if (result != VDO_SUCCESS) {
1322		vdo_destroy_component_states(&vdo->states);
1323		return result;
1324	}
1325
1326	maximum_age = vdo_convert_maximum_age(vdo->device_config->block_map_maximum_age);
1327	journal_length =
1328		vdo_get_recovery_journal_length(vdo->states.vdo.config.recovery_journal_size);
1329	if (maximum_age > (journal_length / 2)) {
1330		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
1331					      "maximum age: %llu exceeds limit %llu",
1332					      (unsigned long long) maximum_age,
1333					      (unsigned long long) (journal_length / 2));
1334	}
1335
1336	if (maximum_age == 0) {
1337		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
1338					      "maximum age must be greater than 0");
1339	}
1340
1341	result = vdo_enable_read_only_entry(vdo);
1342	if (result != VDO_SUCCESS)
1343		return result;
1344
1345	partition = vdo_get_known_partition(&vdo->layout,
1346					    VDO_RECOVERY_JOURNAL_PARTITION);
1347	result = vdo_decode_recovery_journal(vdo->states.recovery_journal,
1348					     vdo->states.vdo.nonce, vdo, partition,
1349					     vdo->states.vdo.complete_recoveries,
1350					     vdo->states.vdo.config.recovery_journal_size,
1351					     &vdo->recovery_journal);
1352	if (result != VDO_SUCCESS)
1353		return result;
1354
1355	partition = vdo_get_known_partition(&vdo->layout, VDO_SLAB_SUMMARY_PARTITION);
1356	result = vdo_decode_slab_depot(vdo->states.slab_depot, vdo, partition,
1357				       &vdo->depot);
1358	if (result != VDO_SUCCESS)
1359		return result;
1360
1361	result = vdo_decode_block_map(vdo->states.block_map,
1362				      vdo->states.vdo.config.logical_blocks, vdo,
1363				      vdo->recovery_journal, vdo->states.vdo.nonce,
1364				      vdo->device_config->cache_size, maximum_age,
1365				      &vdo->block_map);
1366	if (result != VDO_SUCCESS)
1367		return result;
1368
1369	result = vdo_make_physical_zones(vdo, &vdo->physical_zones);
1370	if (result != VDO_SUCCESS)
1371		return result;
1372
1373	/* The logical zones depend on the physical zones already existing. */
1374	result = vdo_make_logical_zones(vdo, &vdo->logical_zones);
1375	if (result != VDO_SUCCESS)
1376		return result;
1377
1378	return vdo_make_hash_zones(vdo, &vdo->hash_zones);
1379}
1380
1381/**
1382 * pre_load_callback() - Callback to initiate a pre-load, registered in vdo_initialize().
1383 * @completion: The admin completion.
1384 */
1385static void pre_load_callback(struct vdo_completion *completion)
1386{
1387	struct vdo *vdo = completion->vdo;
1388	int result;
1389
1390	assert_admin_phase_thread(vdo, __func__);
1391
1392	switch (advance_phase(vdo)) {
1393	case PRE_LOAD_PHASE_START:
1394		result = vdo_start_operation(&vdo->admin.state,
1395					     VDO_ADMIN_STATE_PRE_LOADING);
1396		if (result != VDO_SUCCESS) {
1397			vdo_continue_completion(completion, result);
1398			return;
1399		}
1400
1401		vdo_load_super_block(vdo, completion);
1402		return;
1403
1404	case PRE_LOAD_PHASE_LOAD_COMPONENTS:
1405		vdo_continue_completion(completion, decode_vdo(vdo));
1406		return;
1407
1408	case PRE_LOAD_PHASE_END:
1409		break;
1410
1411	default:
1412		vdo_set_completion_result(completion, UDS_BAD_STATE);
1413	}
1414
1415	finish_operation_callback(completion);
1416}
1417
1418static void release_instance(unsigned int instance)
1419{
1420	mutex_lock(&instances_lock);
1421	if (instance >= instances.bit_count) {
1422		VDO_ASSERT_LOG_ONLY(false,
1423				    "instance number %u must be less than bit count %u",
1424				    instance, instances.bit_count);
1425	} else if (test_bit(instance, instances.words) == 0) {
1426		VDO_ASSERT_LOG_ONLY(false, "instance number %u must be allocated", instance);
1427	} else {
1428		__clear_bit(instance, instances.words);
1429		instances.count -= 1;
1430	}
1431	mutex_unlock(&instances_lock);
1432}
1433
1434static void set_device_config(struct dm_target *ti, struct vdo *vdo,
1435			      struct device_config *config)
1436{
1437	list_del_init(&config->config_list);
1438	list_add_tail(&config->config_list, &vdo->device_config_list);
1439	config->vdo = vdo;
1440	ti->private = config;
1441	configure_target_capabilities(ti);
1442}
1443
1444static int vdo_initialize(struct dm_target *ti, unsigned int instance,
1445			  struct device_config *config)
1446{
1447	struct vdo *vdo;
1448	int result;
1449	u64 block_size = VDO_BLOCK_SIZE;
1450	u64 logical_size = to_bytes(ti->len);
1451	block_count_t logical_blocks = logical_size / block_size;
1452
1453	vdo_log_info("loading device '%s'", vdo_get_device_name(ti));
1454	vdo_log_debug("Logical block size     = %llu", (u64) config->logical_block_size);
1455	vdo_log_debug("Logical blocks         = %llu", logical_blocks);
1456	vdo_log_debug("Physical block size    = %llu", (u64) block_size);
1457	vdo_log_debug("Physical blocks        = %llu", config->physical_blocks);
1458	vdo_log_debug("Block map cache blocks = %u", config->cache_size);
1459	vdo_log_debug("Block map maximum age  = %u", config->block_map_maximum_age);
1460	vdo_log_debug("Deduplication          = %s", (config->deduplication ? "on" : "off"));
1461	vdo_log_debug("Compression            = %s", (config->compression ? "on" : "off"));
1462
1463	vdo = vdo_find_matching(vdo_uses_device, config);
1464	if (vdo != NULL) {
1465		vdo_log_error("Existing vdo already uses device %s",
1466			      vdo->device_config->parent_device_name);
1467		ti->error = "Cannot share storage device with already-running VDO";
1468		return VDO_BAD_CONFIGURATION;
1469	}
1470
1471	result = vdo_make(instance, config, &ti->error, &vdo);
1472	if (result != VDO_SUCCESS) {
1473		vdo_log_error("Could not create VDO device. (VDO error %d, message %s)",
1474			      result, ti->error);
1475		vdo_destroy(vdo);
1476		return result;
1477	}
1478
1479	result = perform_admin_operation(vdo, PRE_LOAD_PHASE_START, pre_load_callback,
1480					 finish_operation_callback, "pre-load");
1481	if (result != VDO_SUCCESS) {
1482		ti->error = ((result == VDO_INVALID_ADMIN_STATE) ?
1483			     "Pre-load is only valid immediately after initialization" :
1484			     "Cannot load metadata from device");
1485		vdo_log_error("Could not start VDO device. (VDO error %d, message %s)",
1486			      result, ti->error);
1487		vdo_destroy(vdo);
1488		return result;
1489	}
1490
1491	set_device_config(ti, vdo, config);
1492	vdo->device_config = config;
1493	return VDO_SUCCESS;
1494}
1495
1496/* Implements vdo_filter_fn. */
1497static bool __must_check vdo_is_named(struct vdo *vdo, const void *context)
1498{
1499	struct dm_target *ti = vdo->device_config->owning_target;
1500	const char *device_name = vdo_get_device_name(ti);
1501
1502	return strcmp(device_name, context) == 0;
1503}
1504
1505/**
1506 * get_bit_array_size() - Return the number of bytes needed to store a bit array of the specified
1507 *                        capacity in an array of unsigned longs.
1508 * @bit_count: The number of bits the array must hold.
1509 *
1510 * Return: the number of bytes needed for the array representation.
1511 */
1512static size_t get_bit_array_size(unsigned int bit_count)
1513{
1514	/* Round up to a multiple of the word size and convert to a byte count. */
1515	return (BITS_TO_LONGS(bit_count) * sizeof(unsigned long));
1516}
1517
1518/**
1519 * grow_bit_array() - Re-allocate the bitmap word array so there will more instance numbers that
1520 *                    can be allocated.
1521 *
1522 * Since the array is initially NULL, this also initializes the array the first time we allocate an
1523 * instance number.
1524 *
1525 * Return: VDO_SUCCESS or an error code from the allocation
1526 */
1527static int grow_bit_array(void)
1528{
1529	unsigned int new_count = max(instances.bit_count + BIT_COUNT_INCREMENT,
1530				     (unsigned int) BIT_COUNT_MINIMUM);
1531	unsigned long *new_words;
1532	int result;
1533
1534	result = vdo_reallocate_memory(instances.words,
1535				       get_bit_array_size(instances.bit_count),
1536				       get_bit_array_size(new_count),
1537				       "instance number bit array", &new_words);
1538	if (result != VDO_SUCCESS)
1539		return result;
1540
1541	instances.bit_count = new_count;
1542	instances.words = new_words;
1543	return VDO_SUCCESS;
1544}
1545
1546/**
1547 * allocate_instance() - Allocate an instance number.
1548 * @instance_ptr: A point to hold the instance number
1549 *
1550 * Return: VDO_SUCCESS or an error code
1551 *
1552 * This function must be called while holding the instances lock.
1553 */
1554static int allocate_instance(unsigned int *instance_ptr)
1555{
1556	unsigned int instance;
1557	int result;
1558
1559	/* If there are no unallocated instances, grow the bit array. */
1560	if (instances.count >= instances.bit_count) {
1561		result = grow_bit_array();
1562		if (result != VDO_SUCCESS)
1563			return result;
1564	}
1565
1566	/*
1567	 * There must be a zero bit somewhere now. Find it, starting just after the last instance
1568	 * allocated.
1569	 */
1570	instance = find_next_zero_bit(instances.words, instances.bit_count,
1571				      instances.next);
1572	if (instance >= instances.bit_count) {
1573		/* Nothing free after next, so wrap around to instance zero. */
1574		instance = find_first_zero_bit(instances.words, instances.bit_count);
1575		result = VDO_ASSERT(instance < instances.bit_count,
1576				    "impossibly, no zero bit found");
1577		if (result != VDO_SUCCESS)
1578			return result;
1579	}
1580
1581	__set_bit(instance, instances.words);
1582	instances.count++;
1583	instances.next = instance + 1;
1584	*instance_ptr = instance;
1585	return VDO_SUCCESS;
1586}
1587
1588static int construct_new_vdo_registered(struct dm_target *ti, unsigned int argc,
1589					char **argv, unsigned int instance)
1590{
1591	int result;
1592	struct device_config *config;
1593
1594	result = parse_device_config(argc, argv, ti, &config);
1595	if (result != VDO_SUCCESS) {
1596		vdo_log_error_strerror(result, "parsing failed: %s", ti->error);
1597		release_instance(instance);
1598		return -EINVAL;
1599	}
1600
1601	/* Beyond this point, the instance number will be cleaned up for us if needed */
1602	result = vdo_initialize(ti, instance, config);
1603	if (result != VDO_SUCCESS) {
1604		release_instance(instance);
1605		free_device_config(config);
1606		return vdo_status_to_errno(result);
1607	}
1608
1609	return VDO_SUCCESS;
1610}
1611
1612static int construct_new_vdo(struct dm_target *ti, unsigned int argc, char **argv)
1613{
1614	int result;
1615	unsigned int instance;
1616	struct registered_thread instance_thread;
1617
1618	mutex_lock(&instances_lock);
1619	result = allocate_instance(&instance);
1620	mutex_unlock(&instances_lock);
1621	if (result != VDO_SUCCESS)
1622		return -ENOMEM;
1623
1624	vdo_register_thread_device_id(&instance_thread, &instance);
1625	result = construct_new_vdo_registered(ti, argc, argv, instance);
1626	vdo_unregister_thread_device_id();
1627	return result;
1628}
1629
1630/**
1631 * check_may_grow_physical() - Callback to check that we're not in recovery mode, used in
1632 *                             vdo_prepare_to_grow_physical().
1633 * @completion: The admin completion.
1634 */
1635static void check_may_grow_physical(struct vdo_completion *completion)
1636{
1637	struct vdo *vdo = completion->vdo;
1638
1639	assert_admin_phase_thread(vdo, __func__);
1640
1641	/* These checks can only be done from a vdo thread. */
1642	if (vdo_is_read_only(vdo))
1643		vdo_set_completion_result(completion, VDO_READ_ONLY);
1644
1645	if (vdo_in_recovery_mode(vdo))
1646		vdo_set_completion_result(completion, VDO_RETRY_AFTER_REBUILD);
1647
1648	finish_operation_callback(completion);
1649}
1650
1651static block_count_t get_partition_size(struct layout *layout, enum partition_id id)
1652{
1653	return vdo_get_known_partition(layout, id)->count;
1654}
1655
1656/**
1657 * grow_layout() - Make the layout for growing a vdo.
1658 * @vdo: The vdo preparing to grow.
1659 * @old_size: The current size of the vdo.
1660 * @new_size: The size to which the vdo will be grown.
1661 *
1662 * Return: VDO_SUCCESS or an error code.
1663 */
1664static int grow_layout(struct vdo *vdo, block_count_t old_size, block_count_t new_size)
1665{
1666	int result;
1667	block_count_t min_new_size;
1668
1669	if (vdo->next_layout.size == new_size) {
1670		/* We are already prepared to grow to the new size, so we're done. */
1671		return VDO_SUCCESS;
1672	}
1673
1674	/* Make a copy completion if there isn't one */
1675	if (vdo->partition_copier == NULL) {
1676		vdo->partition_copier = dm_kcopyd_client_create(NULL);
1677		if (IS_ERR(vdo->partition_copier)) {
1678			result = PTR_ERR(vdo->partition_copier);
1679			vdo->partition_copier = NULL;
1680			return result;
1681		}
1682	}
1683
1684	/* Free any unused preparation. */
1685	vdo_uninitialize_layout(&vdo->next_layout);
1686
1687	/*
1688	 * Make a new layout with the existing partition sizes for everything but the slab depot
1689	 * partition.
1690	 */
1691	result = vdo_initialize_layout(new_size, vdo->layout.start,
1692				       get_partition_size(&vdo->layout,
1693							  VDO_BLOCK_MAP_PARTITION),
1694				       get_partition_size(&vdo->layout,
1695							  VDO_RECOVERY_JOURNAL_PARTITION),
1696				       get_partition_size(&vdo->layout,
1697							  VDO_SLAB_SUMMARY_PARTITION),
1698				       &vdo->next_layout);
1699	if (result != VDO_SUCCESS) {
1700		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
1701		return result;
1702	}
1703
1704	/* Ensure the new journal and summary are entirely within the added blocks. */
1705	min_new_size = (old_size +
1706			get_partition_size(&vdo->next_layout,
1707					   VDO_SLAB_SUMMARY_PARTITION) +
1708			get_partition_size(&vdo->next_layout,
1709					   VDO_RECOVERY_JOURNAL_PARTITION));
1710	if (min_new_size > new_size) {
1711		/* Copying the journal and summary would destroy some old metadata. */
1712		vdo_uninitialize_layout(&vdo->next_layout);
1713		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
1714		return VDO_INCREMENT_TOO_SMALL;
1715	}
1716
1717	return VDO_SUCCESS;
1718}
1719
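/**
 * prepare_to_grow_physical() - Prepare the vdo to grow to a new physical size.
 * @vdo: The vdo to prepare.
 * @new_physical_blocks: The physical size to which the vdo will be grown.
 *
 * Checks that the vdo is in a state which permits growth, builds the next layout, and readies
 * the slab depot to use the new slab depot partition.
 *
 * Return: VDO_SUCCESS or an error.
 */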
1720static int prepare_to_grow_physical(struct vdo *vdo, block_count_t new_physical_blocks)
1721{
1722	int result;
1723	block_count_t current_physical_blocks = vdo->states.vdo.config.physical_blocks;
1724
1725	vdo_log_info("Preparing to resize physical to %llu",
1726		     (unsigned long long) new_physical_blocks);
1727	VDO_ASSERT_LOG_ONLY((new_physical_blocks > current_physical_blocks),
1728			    "New physical size is larger than current physical size");
1729	result = perform_admin_operation(vdo, PREPARE_GROW_PHYSICAL_PHASE_START,
1730					 check_may_grow_physical,
1731					 finish_operation_callback,
1732					 "prepare grow-physical");
1733	if (result != VDO_SUCCESS)
1734		return result;
1735
1736	result = grow_layout(vdo, current_physical_blocks, new_physical_blocks);
1737	if (result != VDO_SUCCESS)
1738		return result;
1739
1740	result = vdo_prepare_to_grow_slab_depot(vdo->depot,
1741						vdo_get_known_partition(&vdo->next_layout,
1742									VDO_SLAB_DEPOT_PARTITION));
1743	if (result != VDO_SUCCESS) {
1744		vdo_uninitialize_layout(&vdo->next_layout);
1745		return result;
1746	}
1747
1748	vdo_log_info("Done preparing to resize physical");
1749	return VDO_SUCCESS;
1750}
1751
1752/**
1753 * validate_new_device_config() - Check whether a new device config represents a valid modification
1754 *				  to an existing config.
1755 * @to_validate: The new config to validate.
1756 * @config: The existing config.
1757 * @may_grow: Set to true if growing the logical and physical size of the vdo is currently
1758 *	      permitted.
1759 * @error_ptr: A pointer to hold the reason for any error.
1760 *
1761 * Return: VDO_SUCCESS or an error.
1762 */
1763static int validate_new_device_config(struct device_config *to_validate,
1764				      struct device_config *config, bool may_grow,
1765				      char **error_ptr)
1766{
1767	if (to_validate->owning_target->begin != config->owning_target->begin) {
1768		*error_ptr = "Starting sector cannot change";
1769		return VDO_PARAMETER_MISMATCH;
1770	}
1771
1772	if (to_validate->logical_block_size != config->logical_block_size) {
1773		*error_ptr = "Logical block size cannot change";
1774		return VDO_PARAMETER_MISMATCH;
1775	}
1776
1777	if (to_validate->logical_blocks < config->logical_blocks) {
1778		*error_ptr = "Can't shrink VDO logical size";
1779		return VDO_PARAMETER_MISMATCH;
1780	}
1781
1782	if (to_validate->cache_size != config->cache_size) {
1783		*error_ptr = "Block map cache size cannot change";
1784		return VDO_PARAMETER_MISMATCH;
1785	}
1786
1787	if (to_validate->block_map_maximum_age != config->block_map_maximum_age) {
1788		*error_ptr = "Block map maximum age cannot change";
1789		return VDO_PARAMETER_MISMATCH;
1790	}
1791
1792	if (memcmp(&to_validate->thread_counts, &config->thread_counts,
1793		   sizeof(struct thread_count_config)) != 0) {
1794		*error_ptr = "Thread configuration cannot change";
1795		return VDO_PARAMETER_MISMATCH;
1796	}
1797
1798	if (to_validate->physical_blocks < config->physical_blocks) {
1799		*error_ptr = "Removing physical storage from a VDO is not supported";
1800		return VDO_NOT_IMPLEMENTED;
1801	}
1802
1803	if (!may_grow && (to_validate->physical_blocks > config->physical_blocks)) {
1804		*error_ptr = "VDO physical size may not grow in current state";
1805		return VDO_NOT_IMPLEMENTED;
1806	}
1807
1808	return VDO_SUCCESS;
1809}
1810
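/**
 * prepare_to_modify() - Validate a new table against the existing device and prepare any growth
 *                       it requests.
 * @ti: The device-mapper target being reloaded.
 * @config: The device config derived from the new table.
 * @vdo: The existing vdo.
 *
 * Return: VDO_SUCCESS or an error; ti->error is set on failure.
 */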
1811static int prepare_to_modify(struct dm_target *ti, struct device_config *config,
1812			     struct vdo *vdo)
1813{
1814	int result;
1815	bool may_grow = (vdo_get_admin_state(vdo) != VDO_ADMIN_STATE_PRE_LOADED);
1816
1817	result = validate_new_device_config(config, vdo->device_config, may_grow,
1818					    &ti->error);
1819	if (result != VDO_SUCCESS)
1820		return -EINVAL;
1821
1822	if (config->logical_blocks > vdo->device_config->logical_blocks) {
1823		block_count_t logical_blocks = vdo->states.vdo.config.logical_blocks;
1824
1825		vdo_log_info("Preparing to resize logical to %llu",
1826			     (unsigned long long) config->logical_blocks);
1827		VDO_ASSERT_LOG_ONLY((config->logical_blocks > logical_blocks),
1828				    "New logical size is larger than current size");
1829
1830		result = vdo_prepare_to_grow_block_map(vdo->block_map,
1831						       config->logical_blocks);
1832		if (result != VDO_SUCCESS) {
1833			ti->error = "Device vdo_prepare_to_grow_logical failed";
1834			return result;
1835		}
1836
1837		vdo_log_info("Done preparing to resize logical");
1838	}
1839
1840	if (config->physical_blocks > vdo->device_config->physical_blocks) {
1841		result = prepare_to_grow_physical(vdo, config->physical_blocks);
1842		if (result != VDO_SUCCESS) {
1843			if (result == VDO_PARAMETER_MISMATCH) {
1844				/*
1845				 * If we don't trap this case, vdo_status_to_errno() will remap
1846				 * it to -EIO, which is misleading and ahistorical.
1847				 */
1848				result = -EINVAL;
1849			}
1850
1851			if (result == VDO_TOO_MANY_SLABS)
1852				ti->error = "Device vdo_prepare_to_grow_physical failed (specified physical size too big based on formatted slab size)";
1853			else
1854				ti->error = "Device vdo_prepare_to_grow_physical failed";
1855
1856			return result;
1857		}
1858	}
1859
1860	if (strcmp(config->parent_device_name, vdo->device_config->parent_device_name) != 0) {
1861		const char *device_name = vdo_get_device_name(config->owning_target);
1862
1863		vdo_log_info("Updating backing device of %s from %s to %s", device_name,
1864			     vdo->device_config->parent_device_name,
1865			     config->parent_device_name);
1866	}
1867
1868	return VDO_SUCCESS;
1869}
1870
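/**
 * update_existing_vdo() - Parse a new table for an existing vdo, prepare for any modifications
 *                         it requests, and register the new config with the target.
 * @device_name: The name of the device being updated.
 * @ti: The device-mapper target.
 * @argc: The number of table arguments.
 * @argv: The table arguments.
 * @vdo: The existing vdo.
 *
 * Return: VDO_SUCCESS or a negative errno.
 */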
1871static int update_existing_vdo(const char *device_name, struct dm_target *ti,
1872			       unsigned int argc, char **argv, struct vdo *vdo)
1873{
1874	int result;
1875	struct device_config *config;
1876
1877	result = parse_device_config(argc, argv, ti, &config);
1878	if (result != VDO_SUCCESS)
1879		return -EINVAL;
1880
1881	vdo_log_info("preparing to modify device '%s'", device_name);
1882	result = prepare_to_modify(ti, config, vdo);
1883	if (result != VDO_SUCCESS) {
1884		free_device_config(config);
1885		return vdo_status_to_errno(result);
1886	}
1887
1888	set_device_config(ti, vdo, config);
1889	return VDO_SUCCESS;
1890}
1891
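/**
 * vdo_ctr() - The device-mapper constructor.
 * @ti: The target being constructed.
 * @argc: The number of table arguments.
 * @argv: The table arguments.
 *
 * Constructs a new vdo if no vdo with the target's name exists; otherwise treats the table as a
 * reload of the existing vdo.
 *
 * Return: 0 or a negative errno.
 */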
1892static int vdo_ctr(struct dm_target *ti, unsigned int argc, char **argv)
1893{
1894	int result;
1895	struct registered_thread allocating_thread, instance_thread;
1896	const char *device_name;
1897	struct vdo *vdo;
1898
1899	vdo_register_allocating_thread(&allocating_thread, NULL);
1900	device_name = vdo_get_device_name(ti);
1901	vdo = vdo_find_matching(vdo_is_named, device_name);
1902	if (vdo == NULL) {
1903		result = construct_new_vdo(ti, argc, argv);
1904	} else {
1905		vdo_register_thread_device_id(&instance_thread, &vdo->instance);
1906		result = update_existing_vdo(device_name, ti, argc, argv, vdo);
1907		vdo_unregister_thread_device_id();
1908	}
1909
1910	vdo_unregister_allocating_thread();
1911	return result;
1912}
1913
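/**
 * vdo_dtr() - The device-mapper destructor.
 * @ti: The target being destroyed.
 *
 * Frees the target's device config, and destroys the vdo itself once the last config
 * referencing it has been removed.
 */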
1914static void vdo_dtr(struct dm_target *ti)
1915{
1916	struct device_config *config = ti->private;
1917	struct vdo *vdo = vdo_forget(config->vdo);
1918
1919	list_del_init(&config->config_list);
1920	if (list_empty(&vdo->device_config_list)) {
1921		const char *device_name;
1922
1923		/* This was the last config referencing the VDO. Free it. */
1924		unsigned int instance = vdo->instance;
1925		struct registered_thread allocating_thread, instance_thread;
1926
1927		vdo_register_thread_device_id(&instance_thread, &instance);
1928		vdo_register_allocating_thread(&allocating_thread, NULL);
1929
1930		device_name = vdo_get_device_name(ti);
1931		vdo_log_info("stopping device '%s'", device_name);
1932		if (vdo->dump_on_shutdown)
1933			vdo_dump_all(vdo, "device shutdown");
1934
1935		vdo_destroy(vdo_forget(vdo));
1936		vdo_log_info("device '%s' stopped", device_name);
1937		vdo_unregister_thread_device_id();
1938		vdo_unregister_allocating_thread();
1939		release_instance(instance);
1940	} else if (config == vdo->device_config) {
1941		/*
1942		 * The VDO still references this config. Give it a reference to a config that isn't
1943		 * being destroyed.
1944		 */
1945		vdo->device_config = list_first_entry(&vdo->device_config_list,
1946						      struct device_config, config_list);
1947	}
1948
1949	free_device_config(config);
1950	ti->private = NULL;
1951}
1952
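/**
 * vdo_presuspend() - The device-mapper presuspend hook.
 * @ti: The target being suspended.
 *
 * Records the type of suspend to perform: a plain suspend for a noflush suspend, or a full save
 * otherwise.
 */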
1953static void vdo_presuspend(struct dm_target *ti)
1954{
1955	get_vdo_for_target(ti)->suspend_type =
1956		(dm_noflush_suspending(ti) ? VDO_ADMIN_STATE_SUSPENDING : VDO_ADMIN_STATE_SAVING);
1957}
1958
1959/**
1960 * write_super_block_for_suspend() - Update the VDO state and save the super block.
1961 * @completion: The admin completion
1962 */
1963static void write_super_block_for_suspend(struct vdo_completion *completion)
1964{
1965	struct vdo *vdo = completion->vdo;
1966
1967	switch (vdo_get_state(vdo)) {
1968	case VDO_DIRTY:
1969	case VDO_NEW:
1970		vdo_set_state(vdo, VDO_CLEAN);
1971		break;
1972
1973	case VDO_CLEAN:
1974	case VDO_READ_ONLY_MODE:
1975	case VDO_FORCE_REBUILD:
1976	case VDO_RECOVERING:
1977	case VDO_REBUILD_FOR_UPGRADE:
1978		break;
1979
1980	case VDO_REPLAYING:
1981	default:
1982		vdo_continue_completion(completion, UDS_BAD_STATE);
1983		return;
1984	}
1985
1986	vdo_save_components(vdo, completion);
1987}
1988
1989/**
1990 * suspend_callback() - Callback to initiate a suspend, registered in vdo_postsuspend().
1991 * @completion: The sub-task completion.
1992 */
1993static void suspend_callback(struct vdo_completion *completion)
1994{
1995	struct vdo *vdo = completion->vdo;
1996	struct admin_state *state = &vdo->admin.state;
1997	int result;
1998
1999	assert_admin_phase_thread(vdo, __func__);
2000
2001	switch (advance_phase(vdo)) {
2002	case SUSPEND_PHASE_START:
2003		if (vdo_get_admin_state_code(state)->quiescent) {
2004			/* Already suspended */
2005			break;
2006		}
2007
2008		vdo_continue_completion(completion,
2009					vdo_start_operation(state, vdo->suspend_type));
2010		return;
2011
2012	case SUSPEND_PHASE_PACKER:
2013		/*
2014		 * If the VDO was already resumed from a prior suspend while read-only, some of the
2015		 * components may not have been resumed. By setting a read-only error here, we
2016		 * guarantee that the result of this suspend will be VDO_READ_ONLY and not
2017		 * VDO_INVALID_ADMIN_STATE in that case.
2018		 */
2019		if (vdo_in_read_only_mode(vdo))
2020			vdo_set_completion_result(completion, VDO_READ_ONLY);
2021
2022		vdo_drain_packer(vdo->packer, completion);
2023		return;
2024
2025	case SUSPEND_PHASE_DATA_VIOS:
2026		drain_data_vio_pool(vdo->data_vio_pool, completion);
2027		return;
2028
2029	case SUSPEND_PHASE_DEDUPE:
2030		vdo_drain_hash_zones(vdo->hash_zones, completion);
2031		return;
2032
2033	case SUSPEND_PHASE_FLUSHES:
2034		vdo_drain_flusher(vdo->flusher, completion);
2035		return;
2036
2037	case SUSPEND_PHASE_LOGICAL_ZONES:
2038		/*
2039		 * Attempt to flush all I/O before completing post suspend work. We believe a
2040		 * suspended device is expected to have persisted all data written before the
2041		 * suspend, even if it hasn't been flushed yet.
2042		 */
2043		result = vdo_synchronous_flush(vdo);
2044		if (result != VDO_SUCCESS)
2045			vdo_enter_read_only_mode(vdo, result);
2046
2047		vdo_drain_logical_zones(vdo->logical_zones,
2048					vdo_get_admin_state_code(state), completion);
2049		return;
2050
2051	case SUSPEND_PHASE_BLOCK_MAP:
2052		vdo_drain_block_map(vdo->block_map, vdo_get_admin_state_code(state),
2053				    completion);
2054		return;
2055
2056	case SUSPEND_PHASE_JOURNAL:
2057		vdo_drain_recovery_journal(vdo->recovery_journal,
2058					   vdo_get_admin_state_code(state), completion);
2059		return;
2060
2061	case SUSPEND_PHASE_DEPOT:
2062		vdo_drain_slab_depot(vdo->depot, vdo_get_admin_state_code(state),
2063				     completion);
2064		return;
2065
2066	case SUSPEND_PHASE_READ_ONLY_WAIT:
2067		vdo_wait_until_not_entering_read_only_mode(completion);
2068		return;
2069
2070	case SUSPEND_PHASE_WRITE_SUPER_BLOCK:
2071		if (vdo_is_state_suspending(state) || (completion->result != VDO_SUCCESS)) {
2072			/* If we didn't save the VDO or there was an error, we're done. */
2073			break;
2074		}
2075
2076		write_super_block_for_suspend(completion);
2077		return;
2078
2079	case SUSPEND_PHASE_END:
2080		break;
2081
2082	default:
2083		vdo_set_completion_result(completion, UDS_BAD_STATE);
2084	}
2085
2086	finish_operation_callback(completion);
2087}
2088
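/**
 * vdo_postsuspend() - The device-mapper postsuspend hook.
 * @ti: The target which has been suspended.
 *
 * Drives the suspend phases on the vdo's admin thread and logs the outcome.
 */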
2089static void vdo_postsuspend(struct dm_target *ti)
2090{
2091	struct vdo *vdo = get_vdo_for_target(ti);
2092	struct registered_thread instance_thread;
2093	const char *device_name;
2094	int result;
2095
2096	vdo_register_thread_device_id(&instance_thread, &vdo->instance);
2097	device_name = vdo_get_device_name(vdo->device_config->owning_target);
2098	vdo_log_info("suspending device '%s'", device_name);
2099
2100	/*
2101	 * It's important to note any error here does not actually stop device-mapper from
2102	 * suspending the device. All this work is done post suspend.
2103	 */
2104	result = perform_admin_operation(vdo, SUSPEND_PHASE_START, suspend_callback,
2105					 suspend_callback, "suspend");
2106
2107	if ((result == VDO_SUCCESS) || (result == VDO_READ_ONLY)) {
2108		/*
2109		 * Treat VDO_READ_ONLY as a success since a read-only suspension still leaves the
2110		 * VDO suspended.
2111		 */
2112		vdo_log_info("device '%s' suspended", device_name);
2113	} else if (result == VDO_INVALID_ADMIN_STATE) {
2114		vdo_log_error("Suspend invoked while in unexpected state: %s",
2115			      vdo_get_admin_state(vdo)->name);
2116	} else {
2117		vdo_log_error_strerror(result, "Suspend of device '%s' failed",
2118				       device_name);
2119	}
2120
2121	vdo_unregister_thread_device_id();
2122}
2123
2124/**
2125 * was_new() - Check whether the vdo was new when it was loaded.
2126 * @vdo: The vdo to query.
2127 *
2128 * Return: true if the vdo was new.
2129 */
2130static bool was_new(const struct vdo *vdo)
2131{
2132	return (vdo->load_state == VDO_NEW);
2133}
2134
2135/**
2136 * requires_repair() - Check whether a vdo requires recovery or rebuild.
2137 * @vdo: The vdo to query.
2138 *
2139 * Return: true if the vdo must be repaired.
2140 */
2141static bool __must_check requires_repair(const struct vdo *vdo)
2142{
2143	switch (vdo_get_state(vdo)) {
2144	case VDO_DIRTY:
2145	case VDO_FORCE_REBUILD:
2146	case VDO_REPLAYING:
2147	case VDO_REBUILD_FOR_UPGRADE:
2148		return true;
2149
2150	default:
2151		return false;
2152	}
2153}
2154
2155/**
2156 * get_load_type() - Determine how the slab depot was loaded.
2157 * @vdo: The vdo.
2158 *
2159 * Return: How the depot was loaded.
2160 */
2161static enum slab_depot_load_type get_load_type(struct vdo *vdo)
2162{
2163	if (vdo_state_requires_read_only_rebuild(vdo->load_state))
2164		return VDO_SLAB_DEPOT_REBUILD_LOAD;
2165
2166	if (vdo_state_requires_recovery(vdo->load_state))
2167		return VDO_SLAB_DEPOT_RECOVERY_LOAD;
2168
2169	return VDO_SLAB_DEPOT_NORMAL_LOAD;
2170}
2171
2172/**
2173 * load_callback() - Callback to do the destructive parts of loading a VDO.
2174 * @completion: The sub-task completion.
2175 */
2176static void load_callback(struct vdo_completion *completion)
2177{
2178	struct vdo *vdo = completion->vdo;
2179	int result;
2180
2181	assert_admin_phase_thread(vdo, __func__);
2182
2183	switch (advance_phase(vdo)) {
2184	case LOAD_PHASE_START:
2185		result = vdo_start_operation(&vdo->admin.state, VDO_ADMIN_STATE_LOADING);
2186		if (result != VDO_SUCCESS) {
2187			vdo_continue_completion(completion, result);
2188			return;
2189		}
2190
2191		/* Prepare the recovery journal for new entries. */
2192		vdo_open_recovery_journal(vdo->recovery_journal, vdo->depot,
2193					  vdo->block_map);
2194		vdo_allow_read_only_mode_entry(completion);
2195		return;
2196
2197	case LOAD_PHASE_LOAD_DEPOT:
2198		vdo_set_dedupe_state_normal(vdo->hash_zones);
2199		if (vdo_is_read_only(vdo)) {
2200			/*
2201			 * In read-only mode we don't use the allocator and it may not even be
2202			 * readable, so don't bother trying to load it.
2203			 */
2204			vdo_set_completion_result(completion, VDO_READ_ONLY);
2205			break;
2206		}
2207
2208		if (requires_repair(vdo)) {
2209			vdo_repair(completion);
2210			return;
2211		}
2212
2213		vdo_load_slab_depot(vdo->depot,
2214				    (was_new(vdo) ? VDO_ADMIN_STATE_FORMATTING :
2215				     VDO_ADMIN_STATE_LOADING),
2216				    completion, NULL);
2217		return;
2218
2219	case LOAD_PHASE_MAKE_DIRTY:
2220		vdo_set_state(vdo, VDO_DIRTY);
2221		vdo_save_components(vdo, completion);
2222		return;
2223
2224	case LOAD_PHASE_PREPARE_TO_ALLOCATE:
2225		vdo_initialize_block_map_from_journal(vdo->block_map,
2226						      vdo->recovery_journal);
2227		vdo_prepare_slab_depot_to_allocate(vdo->depot, get_load_type(vdo),
2228						   completion);
2229		return;
2230
2231	case LOAD_PHASE_SCRUB_SLABS:
2232		if (vdo_state_requires_recovery(vdo->load_state))
2233			vdo_enter_recovery_mode(vdo);
2234
2235		vdo_scrub_all_unrecovered_slabs(vdo->depot, completion);
2236		return;
2237
2238	case LOAD_PHASE_DATA_REDUCTION:
2239		WRITE_ONCE(vdo->compressing, vdo->device_config->compression);
2240		if (vdo->device_config->deduplication) {
2241			/*
2242			 * Don't try to load or rebuild the index first (and log scary error
2243			 * messages) if this is known to be a newly-formatted volume.
2244			 */
2245			vdo_start_dedupe_index(vdo->hash_zones, was_new(vdo));
2246		}
2247
2248		vdo->allocations_allowed = false;
2249		fallthrough;
2250
2251	case LOAD_PHASE_FINISHED:
2252		break;
2253
2254	case LOAD_PHASE_DRAIN_JOURNAL:
2255		vdo_drain_recovery_journal(vdo->recovery_journal, VDO_ADMIN_STATE_SAVING,
2256					   completion);
2257		return;
2258
2259	case LOAD_PHASE_WAIT_FOR_READ_ONLY:
2260		/* Avoid an infinite loop */
2261		completion->error_handler = NULL;
2262		vdo->admin.phase = LOAD_PHASE_FINISHED;
2263		vdo_wait_until_not_entering_read_only_mode(completion);
2264		return;
2265
2266	default:
2267		vdo_set_completion_result(completion, UDS_BAD_STATE);
2268	}
2269
2270	finish_operation_callback(completion);
2271}
2272
2273/**
2274 * handle_load_error() - Handle an error during the load operation.
2275 * @completion: The admin completion.
2276 *
2277 * If at all possible, brings the vdo online in read-only mode. This handler is registered in
2278 * vdo_preresume_registered().
2279 */
2280static void handle_load_error(struct vdo_completion *completion)
2281{
2282	struct vdo *vdo = completion->vdo;
2283
2284	if (vdo_requeue_completion_if_needed(completion,
2285					     vdo->thread_config.admin_thread))
2286		return;
2287
2288	if (vdo_state_requires_read_only_rebuild(vdo->load_state) &&
2289	    (vdo->admin.phase == LOAD_PHASE_MAKE_DIRTY)) {
2290		vdo_log_error_strerror(completion->result, "aborting load");
2291		vdo->admin.phase = LOAD_PHASE_DRAIN_JOURNAL;
2292		load_callback(vdo_forget(completion));
2293		return;
2294	}
2295
2296	vdo_log_error_strerror(completion->result,
2297			       "Entering read-only mode due to load error");
2298	vdo->admin.phase = LOAD_PHASE_WAIT_FOR_READ_ONLY;
2299	vdo_enter_read_only_mode(vdo, completion->result);
2300	completion->result = VDO_READ_ONLY;
2301	load_callback(completion);
2302}
2303
2304/**
2305 * write_super_block_for_resume() - Update the VDO state and save the super block.
2306 * @completion: The admin completion
2307 */
2308static void write_super_block_for_resume(struct vdo_completion *completion)
2309{
2310	struct vdo *vdo = completion->vdo;
2311
2312	switch (vdo_get_state(vdo)) {
2313	case VDO_CLEAN:
2314	case VDO_NEW:
2315		vdo_set_state(vdo, VDO_DIRTY);
2316		vdo_save_components(vdo, completion);
2317		return;
2318
2319	case VDO_DIRTY:
2320	case VDO_READ_ONLY_MODE:
2321	case VDO_FORCE_REBUILD:
2322	case VDO_RECOVERING:
2323	case VDO_REBUILD_FOR_UPGRADE:
2324		/* No need to write the super block in these cases */
2325		vdo_launch_completion(completion);
2326		return;
2327
2328	case VDO_REPLAYING:
2329	default:
2330		vdo_continue_completion(completion, UDS_BAD_STATE);
2331	}
2332}
2333
2334/**
2335 * resume_callback() - Callback to resume a VDO.
2336 * @completion: The admin completion.
2337 */
2338static void resume_callback(struct vdo_completion *completion)
2339{
2340	struct vdo *vdo = completion->vdo;
2341	int result;
2342
2343	assert_admin_phase_thread(vdo, __func__);
2344
2345	switch (advance_phase(vdo)) {
2346	case RESUME_PHASE_START:
2347		result = vdo_start_operation(&vdo->admin.state,
2348					     VDO_ADMIN_STATE_RESUMING);
2349		if (result != VDO_SUCCESS) {
2350			vdo_continue_completion(completion, result);
2351			return;
2352		}
2353
2354		write_super_block_for_resume(completion);
2355		return;
2356
2357	case RESUME_PHASE_ALLOW_READ_ONLY_MODE:
2358		vdo_allow_read_only_mode_entry(completion);
2359		return;
2360
2361	case RESUME_PHASE_DEDUPE:
2362		vdo_resume_hash_zones(vdo->hash_zones, completion);
2363		return;
2364
2365	case RESUME_PHASE_DEPOT:
2366		vdo_resume_slab_depot(vdo->depot, completion);
2367		return;
2368
2369	case RESUME_PHASE_JOURNAL:
2370		vdo_resume_recovery_journal(vdo->recovery_journal, completion);
2371		return;
2372
2373	case RESUME_PHASE_BLOCK_MAP:
2374		vdo_resume_block_map(vdo->block_map, completion);
2375		return;
2376
2377	case RESUME_PHASE_LOGICAL_ZONES:
2378		vdo_resume_logical_zones(vdo->logical_zones, completion);
2379		return;
2380
2381	case RESUME_PHASE_PACKER:
2382	{
2383		bool was_enabled = vdo_get_compressing(vdo);
2384		bool enable = vdo->device_config->compression;
2385
2386		if (enable != was_enabled)
2387			WRITE_ONCE(vdo->compressing, enable);
2388		vdo_log_info("compression is %s", (enable ? "enabled" : "disabled"));
2389
2390		vdo_resume_packer(vdo->packer, completion);
2391		return;
2392	}
2393
2394	case RESUME_PHASE_FLUSHER:
2395		vdo_resume_flusher(vdo->flusher, completion);
2396		return;
2397
2398	case RESUME_PHASE_DATA_VIOS:
2399		resume_data_vio_pool(vdo->data_vio_pool, completion);
2400		return;
2401
2402	case RESUME_PHASE_END:
2403		break;
2404
2405	default:
2406		vdo_set_completion_result(completion, UDS_BAD_STATE);
2407	}
2408
2409	finish_operation_callback(completion);
2410}
2411
2412/**
2413 * grow_logical_callback() - Callback to initiate a grow logical.
2414 * @completion: The admin completion.
2415 *
2416 * Registered in perform_grow_logical().
2417 */
2418static void grow_logical_callback(struct vdo_completion *completion)
2419{
2420	struct vdo *vdo = completion->vdo;
2421	int result;
2422
2423	assert_admin_phase_thread(vdo, __func__);
2424
2425	switch (advance_phase(vdo)) {
2426	case GROW_LOGICAL_PHASE_START:
2427		if (vdo_is_read_only(vdo)) {
2428			vdo_log_error_strerror(VDO_READ_ONLY,
2429					       "Can't grow logical size of a read-only VDO");
2430			vdo_set_completion_result(completion, VDO_READ_ONLY);
2431			break;
2432		}
2433
2434		result = vdo_start_operation(&vdo->admin.state,
2435					     VDO_ADMIN_STATE_SUSPENDED_OPERATION);
2436		if (result != VDO_SUCCESS) {
2437			vdo_continue_completion(completion, result);
2438			return;
2439		}
2440
2441		vdo->states.vdo.config.logical_blocks = vdo->block_map->next_entry_count;
2442		vdo_save_components(vdo, completion);
2443		return;
2444
2445	case GROW_LOGICAL_PHASE_GROW_BLOCK_MAP:
2446		vdo_grow_block_map(vdo->block_map, completion);
2447		return;
2448
2449	case GROW_LOGICAL_PHASE_END:
2450		break;
2451
2452	case GROW_LOGICAL_PHASE_ERROR:
2453		vdo_enter_read_only_mode(vdo, completion->result);
2454		break;
2455
2456	default:
2457		vdo_set_completion_result(completion, UDS_BAD_STATE);
2458	}
2459
2460	finish_operation_callback(completion);
2461}
2462
2463/**
2464 * handle_logical_growth_error() - Handle an error during the grow logical process.
2465 * @completion: The admin completion.
2466 */
2467static void handle_logical_growth_error(struct vdo_completion *completion)
2468{
2469	struct vdo *vdo = completion->vdo;
2470
2471	if (vdo->admin.phase == GROW_LOGICAL_PHASE_GROW_BLOCK_MAP) {
2472		/*
2473		 * We've failed to write the new size in the super block, so set our in-memory
2474		 * config back to the old size.
2475		 */
2476		vdo->states.vdo.config.logical_blocks = vdo->block_map->entry_count;
2477		vdo_abandon_block_map_growth(vdo->block_map);
2478	}
2479
2480	vdo->admin.phase = GROW_LOGICAL_PHASE_ERROR;
2481	grow_logical_callback(completion);
2482}
2483
2484/**
2485 * perform_grow_logical() - Grow the logical size of the vdo.
2486 * @vdo: The vdo to grow.
2487 * @new_logical_blocks: The size to which the vdo should be grown.
2488 *
2489 * Context: This method may only be called when the vdo has been suspended and must not be called
2490 * from a base thread.
2491 *
2492 * Return: VDO_SUCCESS or an error.
2493 */
2494static int perform_grow_logical(struct vdo *vdo, block_count_t new_logical_blocks)
2495{
2496	int result;
2497
2498	if (vdo->device_config->logical_blocks == new_logical_blocks) {
2499		/*
2500		 * A table was loaded for which we prepared to grow, but a table without that
2501		 * growth was what we are resuming with.
2502		 */
2503		vdo_abandon_block_map_growth(vdo->block_map);
2504		return VDO_SUCCESS;
2505	}
2506
2507	vdo_log_info("Resizing logical to %llu",
2508		     (unsigned long long) new_logical_blocks);
2509	if (vdo->block_map->next_entry_count != new_logical_blocks)
2510		return VDO_PARAMETER_MISMATCH;
2511
2512	result = perform_admin_operation(vdo, GROW_LOGICAL_PHASE_START,
2513					 grow_logical_callback,
2514					 handle_logical_growth_error, "grow logical");
2515	if (result != VDO_SUCCESS)
2516		return result;
2517
2518	vdo_log_info("Logical blocks now %llu", (unsigned long long) new_logical_blocks);
2519	return VDO_SUCCESS;
2520}
2521
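/**
 * copy_callback() - Completion handler for a dm-kcopyd partition copy.
 * @read_err: Non-zero if there was a read error.
 * @write_err: Non-zero if there was a write error.
 * @context: The vdo completion waiting for the copy.
 *
 * Continues the waiting completion, with -EIO if either side of the copy failed.
 */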
2522static void copy_callback(int read_err, unsigned long write_err, void *context)
2523{
2524	struct vdo_completion *completion = context;
2525	int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO);
2526
2527	vdo_continue_completion(completion, result);
2528}
2529
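/**
 * partition_to_region() - Convert a partition to a dm_io_region on the vdo's backing device.
 * @partition: The partition to convert.
 * @vdo: The vdo to which the partition belongs.
 * @region: A pointer to hold the resulting region.
 */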
2530static void partition_to_region(struct partition *partition, struct vdo *vdo,
2531				struct dm_io_region *region)
2532{
2533	physical_block_number_t pbn = partition->offset - vdo->geometry.bio_offset;
2534
2535	*region = (struct dm_io_region) {
2536		.bdev = vdo_get_backing_device(vdo),
2537		.sector = pbn * VDO_SECTORS_PER_BLOCK,
2538		.count = partition->count * VDO_SECTORS_PER_BLOCK,
2539	};
2540}
2541
2542/**
2543 * copy_partition() - Copy a partition from the location specified in the current layout to that in
2544 *                    the next layout.
2545 * @vdo: The vdo preparing to grow.
2546 * @id: The ID of the partition to copy.
2547 * @parent: The completion to notify when the copy is complete.
2548 */
2549static void copy_partition(struct vdo *vdo, enum partition_id id,
2550			   struct vdo_completion *parent)
2551{
2552	struct dm_io_region read_region, write_regions[1];
2553	struct partition *from = vdo_get_known_partition(&vdo->layout, id);
2554	struct partition *to = vdo_get_known_partition(&vdo->next_layout, id);
2555
2556	partition_to_region(from, vdo, &read_region);
2557	partition_to_region(to, vdo, &write_regions[0]);
2558	dm_kcopyd_copy(vdo->partition_copier, &read_region, 1, write_regions, 0,
2559		       copy_callback, parent);
2560}
2561
2562/**
2563 * grow_physical_callback() - Callback to initiate a grow physical.
2564 * @completion: The admin completion.
2565 *
2566 * Registered in perform_grow_physical().
2567 */
2568static void grow_physical_callback(struct vdo_completion *completion)
2569{
2570	struct vdo *vdo = completion->vdo;
2571	int result;
2572
2573	assert_admin_phase_thread(vdo, __func__);
2574
2575	switch (advance_phase(vdo)) {
2576	case GROW_PHYSICAL_PHASE_START:
2577		if (vdo_is_read_only(vdo)) {
2578			vdo_log_error_strerror(VDO_READ_ONLY,
2579					       "Can't grow physical size of a read-only VDO");
2580			vdo_set_completion_result(completion, VDO_READ_ONLY);
2581			break;
2582		}
2583
2584		result = vdo_start_operation(&vdo->admin.state,
2585					     VDO_ADMIN_STATE_SUSPENDED_OPERATION);
2586		if (result != VDO_SUCCESS) {
2587			vdo_continue_completion(completion, result);
2588			return;
2589		}
2590
2591		/* Copy the journal into the new layout. */
2592		copy_partition(vdo, VDO_RECOVERY_JOURNAL_PARTITION, completion);
2593		return;
2594
2595	case GROW_PHYSICAL_PHASE_COPY_SUMMARY:
2596		copy_partition(vdo, VDO_SLAB_SUMMARY_PARTITION, completion);
2597		return;
2598
2599	case GROW_PHYSICAL_PHASE_UPDATE_COMPONENTS:
2600		vdo_uninitialize_layout(&vdo->layout);
2601		vdo->layout = vdo->next_layout;
2602		vdo_forget(vdo->next_layout.head);
2603		vdo->states.vdo.config.physical_blocks = vdo->layout.size;
2604		vdo_update_slab_depot_size(vdo->depot);
2605		vdo_save_components(vdo, completion);
2606		return;
2607
2608	case GROW_PHYSICAL_PHASE_USE_NEW_SLABS:
2609		vdo_use_new_slabs(vdo->depot, completion);
2610		return;
2611
2612	case GROW_PHYSICAL_PHASE_END:
2613		vdo->depot->summary_origin =
2614			vdo_get_known_partition(&vdo->layout,
2615						VDO_SLAB_SUMMARY_PARTITION)->offset;
2616		vdo->recovery_journal->origin =
2617			vdo_get_known_partition(&vdo->layout,
2618						VDO_RECOVERY_JOURNAL_PARTITION)->offset;
2619		break;
2620
2621	case GROW_PHYSICAL_PHASE_ERROR:
2622		vdo_enter_read_only_mode(vdo, completion->result);
2623		break;
2624
2625	default:
2626		vdo_set_completion_result(completion, UDS_BAD_STATE);
2627	}
2628
2629	vdo_uninitialize_layout(&vdo->next_layout);
2630	finish_operation_callback(completion);
2631}
2632
2633/**
2634 * handle_physical_growth_error() - Handle an error during the grow physical process.
2635 * @completion: The sub-task completion.
2636 */
2637static void handle_physical_growth_error(struct vdo_completion *completion)
2638{
2639	completion->vdo->admin.phase = GROW_PHYSICAL_PHASE_ERROR;
2640	grow_physical_callback(completion);
2641}
2642
2643/**
2644 * perform_grow_physical() - Grow the physical size of the vdo.
2645 * @vdo: The vdo to resize.
2646 * @new_physical_blocks: The new physical size in blocks.
2647 *
2648 * Context: This method may only be called when the vdo has been suspended and must not be called
2649 * from a base thread.
2650 *
2651 * Return: VDO_SUCCESS or an error.
2652 */
2653static int perform_grow_physical(struct vdo *vdo, block_count_t new_physical_blocks)
2654{
2655	int result;
2656	block_count_t new_depot_size, prepared_depot_size;
2657	block_count_t old_physical_blocks = vdo->states.vdo.config.physical_blocks;
2658
2659	/* Skip any noop grows. */
2660	if (old_physical_blocks == new_physical_blocks)
2661		return VDO_SUCCESS;
2662
2663	if (new_physical_blocks != vdo->next_layout.size) {
2664		/*
2665		 * Either the VDO isn't prepared to grow, or it was prepared to grow to a different
2666		 * size. Doing this check here relies on the fact that the call to this method is
2667		 * done under the dmsetup message lock.
2668		 */
2669		vdo_uninitialize_layout(&vdo->next_layout);
2670		vdo_abandon_new_slabs(vdo->depot);
2671		return VDO_PARAMETER_MISMATCH;
2672	}
2673
2674	/* Validate that we are prepared to grow appropriately. */
2675	new_depot_size =
2676		vdo_get_known_partition(&vdo->next_layout, VDO_SLAB_DEPOT_PARTITION)->count;
2677	prepared_depot_size = (vdo->depot->new_slabs == NULL) ? 0 : vdo->depot->new_size;
2678	if (prepared_depot_size != new_depot_size)
2679		return VDO_PARAMETER_MISMATCH;
2680
2681	result = perform_admin_operation(vdo, GROW_PHYSICAL_PHASE_START,
2682					 grow_physical_callback,
2683					 handle_physical_growth_error, "grow physical");
2684	if (result != VDO_SUCCESS)
2685		return result;
2686
2687	vdo_log_info("Physical block count was %llu, now %llu",
2688		     (unsigned long long) old_physical_blocks,
2689		     (unsigned long long) new_physical_blocks);
2690	return VDO_SUCCESS;
2691}
2692
2693/**
2694 * apply_new_vdo_configuration() - Attempt to make any configuration changes from the table being
2695 *                                 resumed.
2696 * @vdo: The vdo being resumed.
2697 * @config: The new device configuration derived from the table with which the vdo is being
2698 *          resumed.
2699 *
2700 * Return: VDO_SUCCESS or an error.
2701 */
2702static int __must_check apply_new_vdo_configuration(struct vdo *vdo,
2703						    struct device_config *config)
2704{
2705	int result;
2706
2707	result = perform_grow_logical(vdo, config->logical_blocks);
2708	if (result != VDO_SUCCESS) {
2709		vdo_log_error("grow logical operation failed, result = %d", result);
2710		return result;
2711	}
2712
2713	result = perform_grow_physical(vdo, config->physical_blocks);
2714	if (result != VDO_SUCCESS)
2715		vdo_log_error("resize operation failed, result = %d", result);
2716
2717	return result;
2718}
2719
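/**
 * vdo_preresume_registered() - Prepare the vdo to handle I/O before a resume.
 * @ti: The target being resumed.
 * @vdo: The vdo to prepare.
 *
 * Checks that the backing device is large enough, loads the vdo if this is the first resume
 * after the pre-load, applies any growth requested by the new table, and then resumes the vdo
 * unless it was only just started.
 *
 * Return: VDO_SUCCESS or an error.
 */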
2720static int vdo_preresume_registered(struct dm_target *ti, struct vdo *vdo)
2721{
2722	struct device_config *config = ti->private;
2723	const char *device_name = vdo_get_device_name(ti);
2724	block_count_t backing_blocks;
2725	int result;
2726
2727	backing_blocks = get_underlying_device_block_count(vdo);
2728	if (backing_blocks < config->physical_blocks) {
2729		/* FIXME: can this still happen? */
2730		vdo_log_error("resume of device '%s' failed: backing device has %llu blocks but VDO physical size is %llu blocks",
2731			      device_name, (unsigned long long) backing_blocks,
2732			      (unsigned long long) config->physical_blocks);
2733		return -EINVAL;
2734	}
2735
2736	if (vdo_get_admin_state(vdo) == VDO_ADMIN_STATE_PRE_LOADED) {
2737		vdo_log_info("starting device '%s'", device_name);
2738		result = perform_admin_operation(vdo, LOAD_PHASE_START, load_callback,
2739						 handle_load_error, "load");
2740		if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
2741			/*
2742			 * Something has gone very wrong. Make sure everything has drained and
2743			 * leave the device in an unresumable state.
2744			 */
2745			vdo_log_error_strerror(result,
2746					       "Start failed, could not load VDO metadata");
2747			vdo->suspend_type = VDO_ADMIN_STATE_STOPPING;
2748			perform_admin_operation(vdo, SUSPEND_PHASE_START,
2749						suspend_callback, suspend_callback,
2750						"suspend");
2751			return result;
2752		}
2753
2754		/* Even if the VDO is read-only, it is now able to handle read requests. */
2755		vdo_log_info("device '%s' started", device_name);
2756	}
2757
2758	vdo_log_info("resuming device '%s'", device_name);
2759
2760	/* If this fails, the VDO was not in a state to be resumed. This should never happen. */
2761	result = apply_new_vdo_configuration(vdo, config);
2762	BUG_ON(result == VDO_INVALID_ADMIN_STATE);
2763
2764	/*
2765	 * Now that we've tried to modify the vdo, the new config *is* the config, whether the
2766	 * modifications worked or not.
2767	 */
2768	vdo->device_config = config;
2769
2770	/*
2771	 * Any error here is highly unexpected and the state of the vdo is questionable, so we mark
2772	 * it read-only in memory. Because we are suspended, the read-only state will not be
2773	 * written to disk.
2774	 */
2775	if (result != VDO_SUCCESS) {
2776		vdo_log_error_strerror(result,
2777				       "Commit of modifications to device '%s' failed",
2778				       device_name);
2779		vdo_enter_read_only_mode(vdo, result);
2780		return result;
2781	}
2782
2783	if (vdo_get_admin_state(vdo)->normal) {
2784		/* The VDO was just started, so we don't need to resume it. */
2785		return VDO_SUCCESS;
2786	}
2787
2788	result = perform_admin_operation(vdo, RESUME_PHASE_START, resume_callback,
2789					 resume_callback, "resume");
2790	BUG_ON(result == VDO_INVALID_ADMIN_STATE);
2791	if (result == VDO_READ_ONLY) {
2792		/* Even if the vdo is read-only, it has still resumed. */
2793		result = VDO_SUCCESS;
2794	}
2795
2796	if (result != VDO_SUCCESS)
2797		vdo_log_error("resume of device '%s' failed with error: %d", device_name,
2798			      result);
2799
2800	return result;
2801}
2802
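/**
 * vdo_preresume() - The device-mapper preresume hook.
 * @ti: The target being resumed.
 *
 * Registers the instance's thread device ID around the real work and maps vdo status codes to
 * errnos.
 *
 * Return: 0 or a negative errno.
 */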
2803static int vdo_preresume(struct dm_target *ti)
2804{
2805	struct registered_thread instance_thread;
2806	struct vdo *vdo = get_vdo_for_target(ti);
2807	int result;
2808
2809	vdo_register_thread_device_id(&instance_thread, &vdo->instance);
2810	result = vdo_preresume_registered(ti, vdo);
2811	if ((result == VDO_PARAMETER_MISMATCH) || (result == VDO_INVALID_ADMIN_STATE))
2812		result = -EINVAL;
2813	vdo_unregister_thread_device_id();
2814	return vdo_status_to_errno(result);
2815}
2816
2817static void vdo_resume(struct dm_target *ti)
2818{
2819	struct registered_thread instance_thread;
2820
2821	vdo_register_thread_device_id(&instance_thread,
2822				      &get_vdo_for_target(ti)->instance);
2823	vdo_log_info("device '%s' resumed", vdo_get_device_name(ti));
2824	vdo_unregister_thread_device_id();
2825}
2826
2827/*
2828 * If anything changes that affects how user tools will interact with vdo, update the version
2829 * number and make sure documentation about the change is complete so tools can properly update
2830 * their management code.
2831 */
2832static struct target_type vdo_target_bio = {
2833	.features = DM_TARGET_SINGLETON,
2834	.name = "vdo",
2835	.version = { 9, 0, 0 },
2836	.module = THIS_MODULE,
2837	.ctr = vdo_ctr,
2838	.dtr = vdo_dtr,
2839	.io_hints = vdo_io_hints,
2840	.iterate_devices = vdo_iterate_devices,
2841	.map = vdo_map_bio,
2842	.message = vdo_message,
2843	.status = vdo_status,
2844	.presuspend = vdo_presuspend,
2845	.postsuspend = vdo_postsuspend,
2846	.preresume = vdo_preresume,
2847	.resume = vdo_resume,
2848};
2849
2850static bool dm_registered;
2851
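/**
 * vdo_module_destroy() - Common cleanup for module unload and failed initialization.
 *
 * Unregisters the device-mapper target if it was registered, and frees the instance tracker,
 * asserting that no instance numbers remain in use.
 */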
2852static void vdo_module_destroy(void)
2853{
2854	vdo_log_debug("unloading");
2855
2856	if (dm_registered)
2857		dm_unregister_target(&vdo_target_bio);
2858
2859	VDO_ASSERT_LOG_ONLY(instances.count == 0,
2860			    "should have no instance numbers still in use, but have %u",
2861			    instances.count);
2862	vdo_free(instances.words);
2863	memset(&instances, 0, sizeof(struct instance_tracker));
2864}
2865
2866static int __init vdo_init(void)
2867{
2868	int result = 0;
2869
2870	/* Memory tracking must be initialized first for accurate accounting. */
2871	vdo_memory_init();
2872	vdo_initialize_threads_mutex();
2873	vdo_initialize_thread_device_registry();
2874	vdo_initialize_device_registry_once();
2875
2876	/* Add VDO errors to the set of errors registered by the indexer. */
2877	result = vdo_register_status_codes();
2878	if (result != VDO_SUCCESS) {
2879		vdo_log_error("vdo_register_status_codes failed %d", result);
2880		vdo_module_destroy();
2881		return result;
2882	}
2883
2884	result = dm_register_target(&vdo_target_bio);
2885	if (result < 0) {
2886		vdo_log_error("dm_register_target failed %d", result);
2887		vdo_module_destroy();
2888		return result;
2889	}
2890	dm_registered = true;
2891
2892	return result;
2893}
2894
2895static void __exit vdo_exit(void)
2896{
2897	vdo_module_destroy();
2898	/* Memory tracking cleanup must be done last. */
2899	vdo_memory_exit();
2900}
2901
2902module_init(vdo_init);
2903module_exit(vdo_exit);
2904
2905module_param_named(log_level, vdo_log_level, uint, 0644);
2906MODULE_PARM_DESC(log_level, "Log level for log messages");
2907
2908MODULE_DESCRIPTION(DM_NAME " target for transparent deduplication");
2909MODULE_AUTHOR("Red Hat, Inc.");
2910MODULE_LICENSE("GPL");
2911