// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

/*
 * This file contains the main entry points for normal operations on a vdo as well as functions for
 * constructing and destroying vdo instances (in memory).
 */

/**
 * DOC:
 *
 * A read_only_notifier has a single completion which is used to perform read-only notifications,
 * however, vdo_enter_read_only_mode() may be called from any thread. A pair of fields, protected
 * by a spinlock, are used to control the read-only mode entry process. The first field holds the
 * read-only error. The second is the state field, which may hold any of the four special values
 * enumerated here.
 *
 * When vdo_enter_read_only_mode() is called from some vdo thread, if the read_only_error field
 * already contains an error (i.e. its value is not VDO_SUCCESS), then some other error has already
 * initiated the read-only process, and nothing more is done. Otherwise, the new error is stored in
 * the read_only_error field, and the state field is consulted. If the state is MAY_NOTIFY, it is
 * set to NOTIFYING, and the notification process begins. If the state is MAY_NOT_NOTIFY, then
 * notifications are currently disallowed, generally due to the vdo being suspended. In this case,
 * nothing more will be done until the vdo is resumed, at which point the notification will be
 * performed. In any other case, the vdo is already read-only, and there is nothing more to do.
 */
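
/*
 * A rough summary of the state transitions driven by the functions below:
 *
 *   (initial) -> MAY_NOT_NOTIFY    vdo_enable_read_only_entry() (or NOTIFIED if already read-only)
 *   MAY_NOT_NOTIFY -> MAY_NOTIFY   vdo_allow_read_only_mode_entry() with no recorded error
 *   MAY_NOT_NOTIFY -> NOTIFYING    vdo_allow_read_only_mode_entry() with a deferred error pending
 *   MAY_NOTIFY -> NOTIFYING        vdo_enter_read_only_mode() records a new error
 *   MAY_NOTIFY -> MAY_NOT_NOTIFY   vdo_wait_until_not_entering_read_only_mode()
 *   NOTIFYING -> NOTIFIED          finish_entering_read_only_mode()
 */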

#include "vdo.h"

#include <linux/completion.h>
#include <linux/device-mapper.h>
#include <linux/kernel.h>
#include <linux/lz4.h>
#include <linux/module.h>
#include <linux/mutex.h>
#include <linux/spinlock.h>
#include <linux/types.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"
#include "string-utils.h"

#include "block-map.h"
#include "completion.h"
#include "data-vio.h"
#include "dedupe.h"
#include "encodings.h"
#include "funnel-workqueue.h"
#include "io-submitter.h"
#include "logical-zone.h"
#include "packer.h"
#include "physical-zone.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "statistics.h"
#include "status-codes.h"
#include "vio.h"

#define PARANOID_THREAD_CONSISTENCY_CHECKS 0

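/*
 * Pairs a vdo_completion with a kernel completion so that a caller can block until an action
 * launched on a vdo thread has finished; see perform_synchronous_action() below.
 */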
struct sync_completion {
	struct vdo_completion vdo_completion;
	struct completion completion;
};

/* A linked list is adequate for the small number of entries we expect. */
struct device_registry {
	struct list_head links;
	/* TODO: Convert to rcu per kernel recommendation. */
	rwlock_t lock;
};

static struct device_registry registry;

/**
 * vdo_initialize_device_registry_once() - Initialize the necessary structures for the device
 *                                         registry.
 */
void vdo_initialize_device_registry_once(void)
{
	INIT_LIST_HEAD(&registry.links);
	rwlock_init(&registry.lock);
}

/** vdo_is_equal() - Implements vdo_filter_fn. */
static bool vdo_is_equal(struct vdo *vdo, const void *context)
{
	return (vdo == context);
}

/**
 * filter_vdos_locked() - Find a vdo in the registry if it exists there.
 * @filter: The filter function to apply to devices.
 * @context: A bit of context to provide the filter.
 *
 * Context: Must be called holding the lock.
 *
 * Return: the vdo object found, if any.
 */
static struct vdo * __must_check filter_vdos_locked(vdo_filter_fn filter,
						    const void *context)
{
	struct vdo *vdo;

	list_for_each_entry(vdo, &registry.links, registration) {
		if (filter(vdo, context))
			return vdo;
	}

	return NULL;
}

/**
 * vdo_find_matching() - Find and return the first (if any) vdo matching a given filter function.
 * @filter: The filter function to apply to vdos.
 * @context: A bit of context to provide the filter.
 */
struct vdo *vdo_find_matching(vdo_filter_fn filter, const void *context)
{
	struct vdo *vdo;

	read_lock(&registry.lock);
	vdo = filter_vdos_locked(filter, context);
	read_unlock(&registry.lock);

	return vdo;
}

static void start_vdo_request_queue(void *ptr)
{
	struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());

	vdo_register_allocating_thread(&thread->allocating_thread,
				       &thread->vdo->allocations_allowed);
}

static void finish_vdo_request_queue(void *ptr)
{
	vdo_unregister_allocating_thread();
}

#ifdef MODULE
#define MODULE_NAME THIS_MODULE->name
#else
#define MODULE_NAME "dm-vdo"
#endif  /* MODULE */

static const struct vdo_work_queue_type default_queue_type = {
	.start = start_vdo_request_queue,
	.finish = finish_vdo_request_queue,
	.max_priority = VDO_DEFAULT_Q_MAX_PRIORITY,
	.default_priority = VDO_DEFAULT_Q_COMPLETION_PRIORITY,
};

static const struct vdo_work_queue_type bio_ack_q_type = {
	.start = NULL,
	.finish = NULL,
	.max_priority = BIO_ACK_Q_MAX_PRIORITY,
	.default_priority = BIO_ACK_Q_ACK_PRIORITY,
};

static const struct vdo_work_queue_type cpu_q_type = {
	.start = NULL,
	.finish = NULL,
	.max_priority = CPU_Q_MAX_PRIORITY,
	.default_priority = CPU_Q_MAX_PRIORITY,
};

static void uninitialize_thread_config(struct thread_config *config)
{
	vdo_free(vdo_forget(config->logical_threads));
	vdo_free(vdo_forget(config->physical_threads));
	vdo_free(vdo_forget(config->hash_zone_threads));
	vdo_free(vdo_forget(config->bio_threads));
	memset(config, 0, sizeof(struct thread_config));
}

static void assign_thread_ids(struct thread_config *config,
			      thread_id_t thread_ids[], zone_count_t count)
{
	zone_count_t zone;

	for (zone = 0; zone < count; zone++)
		thread_ids[zone] = config->thread_count++;
}

/**
 * initialize_thread_config() - Initialize the thread mapping
 *
 * If the logical, physical, and hash zone counts are all 0, a single thread will be shared by all
 * three plus the packer and recovery journal. Otherwise, there must be at least one of each type,
 * and each will have its own thread, as will the packer and recovery journal.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int __must_check initialize_thread_config(struct thread_count_config counts,
						 struct thread_config *config)
{
	int result;
	bool single = ((counts.logical_zones + counts.physical_zones + counts.hash_zones) == 0);

	config->bio_thread_count = counts.bio_threads;
	if (single) {
		config->logical_zone_count = 1;
		config->physical_zone_count = 1;
		config->hash_zone_count = 1;
	} else {
		config->logical_zone_count = counts.logical_zones;
		config->physical_zone_count = counts.physical_zones;
		config->hash_zone_count = counts.hash_zones;
	}

	result = vdo_allocate(config->logical_zone_count, thread_id_t,
			      "logical thread array", &config->logical_threads);
	if (result != VDO_SUCCESS) {
		uninitialize_thread_config(config);
		return result;
	}

	result = vdo_allocate(config->physical_zone_count, thread_id_t,
			      "physical thread array", &config->physical_threads);
	if (result != VDO_SUCCESS) {
		uninitialize_thread_config(config);
		return result;
	}

	result = vdo_allocate(config->hash_zone_count, thread_id_t,
			      "hash thread array", &config->hash_zone_threads);
	if (result != VDO_SUCCESS) {
		uninitialize_thread_config(config);
		return result;
	}

	result = vdo_allocate(config->bio_thread_count, thread_id_t,
			      "bio thread array", &config->bio_threads);
	if (result != VDO_SUCCESS) {
		uninitialize_thread_config(config);
		return result;
	}

	if (single) {
		config->logical_threads[0] = config->thread_count;
		config->physical_threads[0] = config->thread_count;
		config->hash_zone_threads[0] = config->thread_count++;
	} else {
		config->admin_thread = config->thread_count;
		config->journal_thread = config->thread_count++;
		config->packer_thread = config->thread_count++;
		assign_thread_ids(config, config->logical_threads, counts.logical_zones);
		assign_thread_ids(config, config->physical_threads, counts.physical_zones);
		assign_thread_ids(config, config->hash_zone_threads, counts.hash_zones);
	}

	config->dedupe_thread = config->thread_count++;
	config->bio_ack_thread =
		((counts.bio_ack_threads > 0) ? config->thread_count++ : VDO_INVALID_THREAD_ID);
	config->cpu_thread = config->thread_count++;
	assign_thread_ids(config, config->bio_threads, counts.bio_threads);
	return VDO_SUCCESS;
}
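
/*
 * A hypothetical example of the assignment above: with counts of 1 logical, 1 physical, 1 hash,
 * 1 bio ack, and 2 bio threads, the non-"single" branch yields admin = journal = 0, packer = 1,
 * logical = 2, physical = 3, hash = 4, dedupe = 5, bio ack = 6, cpu = 7, and bio = 8 and 9, for
 * a total thread_count of 10.
 */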

/**
 * read_geometry_block() - Synchronously read the geometry block from a vdo's underlying block
 *                         device.
 * @vdo: The vdo whose geometry is to be read.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check read_geometry_block(struct vdo *vdo)
{
	struct vio *vio;
	char *block;
	int result;

	result = vdo_allocate(VDO_BLOCK_SIZE, u8, __func__, &block);
	if (result != VDO_SUCCESS)
		return result;

	result = create_metadata_vio(vdo, VIO_TYPE_GEOMETRY, VIO_PRIORITY_HIGH, NULL,
				     block, &vio);
	if (result != VDO_SUCCESS) {
		vdo_free(block);
		return result;
	}

	/*
	 * This is only safe because, having not already loaded the geometry, the vdo's geometry's
	 * bio_offset field is 0, so the fact that vio_reset_bio() will subtract that offset from
	 * the supplied pbn is not a problem.
	 */
	result = vio_reset_bio(vio, block, NULL, REQ_OP_READ,
			       VDO_GEOMETRY_BLOCK_LOCATION);
	if (result != VDO_SUCCESS) {
		free_vio(vdo_forget(vio));
		vdo_free(block);
		return result;
	}

	bio_set_dev(vio->bio, vdo_get_backing_device(vdo));
	submit_bio_wait(vio->bio);
	result = blk_status_to_errno(vio->bio->bi_status);
	free_vio(vdo_forget(vio));
	if (result != 0) {
		vdo_log_error_strerror(result, "synchronous read failed");
		vdo_free(block);
		return -EIO;
	}

	result = vdo_parse_geometry_block((u8 *) block, &vdo->geometry);
	vdo_free(block);
	return result;
}

static bool get_zone_thread_name(const thread_id_t thread_ids[], zone_count_t count,
				 thread_id_t id, const char *prefix,
				 char *buffer, size_t buffer_length)
{
	if (id >= thread_ids[0]) {
		thread_id_t index = id - thread_ids[0];

		if (index < count) {
			snprintf(buffer, buffer_length, "%s%d", prefix, index);
			return true;
		}
	}

	return false;
}

/**
 * get_thread_name() - Format the name of the worker thread desired to support a given work queue.
 * @thread_config: The thread configuration.
 * @thread_id: The thread id.
 * @buffer: Where to put the formatted name.
 * @buffer_length: Size of the output buffer.
 *
 * The physical layer may add a prefix identifying the product; the output from this function
 * should just identify the thread.
 */
static void get_thread_name(const struct thread_config *thread_config,
			    thread_id_t thread_id, char *buffer, size_t buffer_length)
{
	if (thread_id == thread_config->journal_thread) {
		if (thread_config->packer_thread == thread_id) {
			/*
			 * This is the "single thread" config where one thread is used for the
			 * journal, packer, logical, physical, and hash zones. In that case, it is
			 * known as the "request queue."
			 */
			snprintf(buffer, buffer_length, "reqQ");
			return;
		}

		snprintf(buffer, buffer_length, "journalQ");
		return;
	} else if (thread_id == thread_config->admin_thread) {
		/* Theoretically this could be different from the journal thread. */
		snprintf(buffer, buffer_length, "adminQ");
		return;
	} else if (thread_id == thread_config->packer_thread) {
		snprintf(buffer, buffer_length, "packerQ");
		return;
	} else if (thread_id == thread_config->dedupe_thread) {
		snprintf(buffer, buffer_length, "dedupeQ");
		return;
	} else if (thread_id == thread_config->bio_ack_thread) {
		snprintf(buffer, buffer_length, "ackQ");
		return;
	} else if (thread_id == thread_config->cpu_thread) {
		snprintf(buffer, buffer_length, "cpuQ");
		return;
	}

	if (get_zone_thread_name(thread_config->logical_threads,
				 thread_config->logical_zone_count,
				 thread_id, "logQ", buffer, buffer_length))
		return;

	if (get_zone_thread_name(thread_config->physical_threads,
				 thread_config->physical_zone_count,
				 thread_id, "physQ", buffer, buffer_length))
		return;

	if (get_zone_thread_name(thread_config->hash_zone_threads,
				 thread_config->hash_zone_count,
				 thread_id, "hashQ", buffer, buffer_length))
		return;

	if (get_zone_thread_name(thread_config->bio_threads,
				 thread_config->bio_thread_count,
				 thread_id, "bioQ", buffer, buffer_length))
		return;

	/* Some sort of misconfiguration? */
	snprintf(buffer, buffer_length, "reqQ%d", thread_id);
}

/**
 * vdo_make_thread() - Construct a single vdo work_queue and its associated thread (or threads for
 *                     round-robin queues).
 * @vdo: The vdo which owns the thread.
 * @thread_id: The id of the thread to create (as determined by the thread_config).
 * @type: The description of the work queue for this thread.
 * @queue_count: The number of actual threads/queues contained in the "thread".
 * @contexts: An array of queue_count contexts, one for each individual queue; may be NULL.
 *
 * Each "thread" constructed by this method is represented by a unique thread id in the thread
 * config, and completions can be enqueued to the queue and run on the threads comprising this
 * entity.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_make_thread(struct vdo *vdo, thread_id_t thread_id,
		    const struct vdo_work_queue_type *type,
		    unsigned int queue_count, void *contexts[])
{
	struct vdo_thread *thread = &vdo->threads[thread_id];
	char queue_name[MAX_VDO_WORK_QUEUE_NAME_LEN];

	if (type == NULL)
		type = &default_queue_type;

	if (thread->queue != NULL) {
		return VDO_ASSERT(vdo_work_queue_type_is(thread->queue, type),
				  "already constructed vdo thread %u is of the correct type",
				  thread_id);
	}

	thread->vdo = vdo;
	thread->thread_id = thread_id;
	get_thread_name(&vdo->thread_config, thread_id, queue_name, sizeof(queue_name));
	return vdo_make_work_queue(vdo->thread_name_prefix, queue_name, thread,
				   type, queue_count, contexts, &thread->queue);
}

/**
 * register_vdo() - Register a VDO; it must not already be registered.
 * @vdo: The vdo to register.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int register_vdo(struct vdo *vdo)
{
	int result;

	write_lock(&registry.lock);
	result = VDO_ASSERT(filter_vdos_locked(vdo_is_equal, vdo) == NULL,
			    "VDO not already registered");
	if (result == VDO_SUCCESS) {
		INIT_LIST_HEAD(&vdo->registration);
		list_add_tail(&vdo->registration, &registry.links);
	}
	write_unlock(&registry.lock);

	return result;
}

/**
 * initialize_vdo() - Do the portion of initializing a vdo which will clean up after itself on
 *                    error.
 * @vdo: The vdo being initialized
 * @config: The configuration of the vdo
 * @instance: The instance number of the vdo
 * @reason: The buffer to hold the failure reason on error
 */
static int initialize_vdo(struct vdo *vdo, struct device_config *config,
			  unsigned int instance, char **reason)
{
	int result;
	zone_count_t i;

	vdo->device_config = config;
	vdo->starting_sector_offset = config->owning_target->begin;
	vdo->instance = instance;
	vdo->allocations_allowed = true;
	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_NEW);
	INIT_LIST_HEAD(&vdo->device_config_list);
	vdo_initialize_completion(&vdo->admin.completion, vdo, VDO_ADMIN_COMPLETION);
	init_completion(&vdo->admin.callback_sync);
	mutex_init(&vdo->stats_mutex);
	result = read_geometry_block(vdo);
	if (result != VDO_SUCCESS) {
		*reason = "Could not load geometry block";
		return result;
	}

	result = initialize_thread_config(config->thread_counts, &vdo->thread_config);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot create thread configuration";
		return result;
	}

	vdo_log_info("zones: %d logical, %d physical, %d hash; total threads: %d",
		     config->thread_counts.logical_zones,
		     config->thread_counts.physical_zones,
		     config->thread_counts.hash_zones, vdo->thread_config.thread_count);

	/* Compression context storage */
	result = vdo_allocate(config->thread_counts.cpu_threads, char *, "LZ4 context",
			      &vdo->compression_context);
	if (result != VDO_SUCCESS) {
		*reason = "cannot allocate LZ4 context";
		return result;
	}

	for (i = 0; i < config->thread_counts.cpu_threads; i++) {
		result = vdo_allocate(LZ4_MEM_COMPRESS, char, "LZ4 context",
				      &vdo->compression_context[i]);
		if (result != VDO_SUCCESS) {
			*reason = "cannot allocate LZ4 context";
			return result;
		}
	}

	result = register_vdo(vdo);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot add VDO to device registry";
		return result;
	}

	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_INITIALIZED);
	return result;
}

/**
 * vdo_make() - Allocate and initialize a vdo.
 * @instance: Device instantiation counter.
 * @config: The device configuration.
 * @reason: The reason for any failure during this call.
 * @vdo_ptr: A pointer to hold the created vdo.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_make(unsigned int instance, struct device_config *config, char **reason,
	     struct vdo **vdo_ptr)
{
	int result;
	struct vdo *vdo;

	/* Initialize with a generic failure reason to prevent returning garbage. */
	*reason = "Unspecified error";

	result = vdo_allocate(1, struct vdo, __func__, &vdo);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot allocate VDO";
		return result;
	}

	result = initialize_vdo(vdo, config, instance, reason);
	if (result != VDO_SUCCESS) {
		vdo_destroy(vdo);
		return result;
	}

	/* From here on, the caller will clean up if there is an error. */
	*vdo_ptr = vdo;

	snprintf(vdo->thread_name_prefix, sizeof(vdo->thread_name_prefix),
		 "%s%u", MODULE_NAME, instance);
	BUG_ON(vdo->thread_name_prefix[0] == '\0');
	result = vdo_allocate(vdo->thread_config.thread_count,
			      struct vdo_thread, __func__, &vdo->threads);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot allocate thread structures";
		return result;
	}

	result = vdo_make_thread(vdo, vdo->thread_config.admin_thread,
				 &default_queue_type, 1, NULL);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot make admin thread";
		return result;
	}

	result = vdo_make_flusher(vdo);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot make flusher zones";
		return result;
	}

	result = vdo_make_packer(vdo, DEFAULT_PACKER_BINS, &vdo->packer);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot make packer zones";
		return result;
	}

	BUG_ON(vdo->device_config->logical_block_size <= 0);
	BUG_ON(vdo->device_config->owned_device == NULL);
	result = make_data_vio_pool(vdo, MAXIMUM_VDO_USER_VIOS,
				    MAXIMUM_VDO_USER_VIOS * 3 / 4,
				    &vdo->data_vio_pool);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot allocate data_vio pool";
		return result;
	}

	result = vdo_make_io_submitter(config->thread_counts.bio_threads,
				       config->thread_counts.bio_rotation_interval,
				       get_data_vio_pool_request_limit(vdo->data_vio_pool),
				       vdo, &vdo->io_submitter);
	if (result != VDO_SUCCESS) {
		*reason = "bio submission initialization failed";
		return result;
	}

	if (vdo_uses_bio_ack_queue(vdo)) {
		result = vdo_make_thread(vdo, vdo->thread_config.bio_ack_thread,
					 &bio_ack_q_type,
					 config->thread_counts.bio_ack_threads, NULL);
		if (result != VDO_SUCCESS) {
			*reason = "bio ack queue initialization failed";
			return result;
		}
	}

	result = vdo_make_thread(vdo, vdo->thread_config.cpu_thread, &cpu_q_type,
				 config->thread_counts.cpu_threads,
				 (void **) vdo->compression_context);
	if (result != VDO_SUCCESS) {
		*reason = "CPU queue initialization failed";
		return result;
	}

	return VDO_SUCCESS;
}

static void finish_vdo(struct vdo *vdo)
{
	int i;

	if (vdo->threads == NULL)
		return;

	vdo_cleanup_io_submitter(vdo->io_submitter);
	vdo_finish_dedupe_index(vdo->hash_zones);

	for (i = 0; i < vdo->thread_config.thread_count; i++)
		vdo_finish_work_queue(vdo->threads[i].queue);
}

/**
 * free_listeners() - Free the list of read-only listeners associated with a thread.
 * @thread: The thread holding the list to free.
 */
static void free_listeners(struct vdo_thread *thread)
{
	struct read_only_listener *listener, *next;

	for (listener = vdo_forget(thread->listeners); listener != NULL; listener = next) {
		next = vdo_forget(listener->next);
		vdo_free(listener);
	}
}

static void uninitialize_super_block(struct vdo_super_block *super_block)
{
	free_vio_components(&super_block->vio);
	vdo_free(super_block->buffer);
}

/**
 * unregister_vdo() - Remove a vdo from the device registry.
 * @vdo: The vdo to remove.
 */
static void unregister_vdo(struct vdo *vdo)
{
	write_lock(&registry.lock);
	if (filter_vdos_locked(vdo_is_equal, vdo) == vdo)
		list_del_init(&vdo->registration);

	write_unlock(&registry.lock);
}

/**
 * vdo_destroy() - Destroy a vdo instance.
 * @vdo: The vdo to destroy (may be NULL).
 */
void vdo_destroy(struct vdo *vdo)
{
	unsigned int i;

	if (vdo == NULL)
		return;

	/* A running VDO should never be destroyed without suspending first. */
	BUG_ON(vdo_get_admin_state(vdo)->normal);

	vdo->allocations_allowed = true;

	finish_vdo(vdo);
	unregister_vdo(vdo);
	free_data_vio_pool(vdo->data_vio_pool);
	vdo_free_io_submitter(vdo_forget(vdo->io_submitter));
	vdo_free_flusher(vdo_forget(vdo->flusher));
	vdo_free_packer(vdo_forget(vdo->packer));
	vdo_free_recovery_journal(vdo_forget(vdo->recovery_journal));
	vdo_free_slab_depot(vdo_forget(vdo->depot));
	vdo_uninitialize_layout(&vdo->layout);
	vdo_uninitialize_layout(&vdo->next_layout);
	if (vdo->partition_copier)
		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
	uninitialize_super_block(&vdo->super_block);
	vdo_free_block_map(vdo_forget(vdo->block_map));
	vdo_free_hash_zones(vdo_forget(vdo->hash_zones));
	vdo_free_physical_zones(vdo_forget(vdo->physical_zones));
	vdo_free_logical_zones(vdo_forget(vdo->logical_zones));

	if (vdo->threads != NULL) {
		for (i = 0; i < vdo->thread_config.thread_count; i++) {
			free_listeners(&vdo->threads[i]);
			vdo_free_work_queue(vdo_forget(vdo->threads[i].queue));
		}
		vdo_free(vdo_forget(vdo->threads));
	}

	uninitialize_thread_config(&vdo->thread_config);

	if (vdo->compression_context != NULL) {
		for (i = 0; i < vdo->device_config->thread_counts.cpu_threads; i++)
			vdo_free(vdo_forget(vdo->compression_context[i]));

		vdo_free(vdo_forget(vdo->compression_context));
	}
	vdo_free(vdo);
}

static int initialize_super_block(struct vdo *vdo, struct vdo_super_block *super_block)
{
	int result;

	result = vdo_allocate(VDO_BLOCK_SIZE, char, "encoded super block",
			      (char **) &vdo->super_block.buffer);
	if (result != VDO_SUCCESS)
		return result;

	return allocate_vio_components(vdo, VIO_TYPE_SUPER_BLOCK,
				       VIO_PRIORITY_METADATA, NULL, 1,
				       (char *) super_block->buffer,
				       &vdo->super_block.vio);
}

/**
 * finish_reading_super_block() - Continue after loading the super block.
 * @completion: The super block vio.
 *
 * This callback is registered in vdo_load_super_block().
 */
static void finish_reading_super_block(struct vdo_completion *completion)
{
	struct vdo_super_block *super_block =
		container_of(as_vio(completion), struct vdo_super_block, vio);

	vdo_continue_completion(vdo_forget(completion->parent),
				vdo_decode_super_block(super_block->buffer));
}

/**
 * handle_super_block_read_error() - Handle an error reading the super block.
 * @completion: The super block vio.
 *
 * This error handler is registered in vdo_load_super_block().
 */
static void handle_super_block_read_error(struct vdo_completion *completion)
{
	vio_record_metadata_io_error(as_vio(completion));
	finish_reading_super_block(completion);
}

static void read_super_block_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo_completion *parent = vio->completion.parent;

	continue_vio_after_io(vio, finish_reading_super_block,
			      parent->callback_thread_id);
}

/**
 * vdo_load_super_block() - Allocate a super block and read its contents from storage.
 * @vdo: The vdo containing the super block on disk.
 * @parent: The completion to notify after loading the super block.
 */
void vdo_load_super_block(struct vdo *vdo, struct vdo_completion *parent)
{
	int result;

	result = initialize_super_block(vdo, &vdo->super_block);
	if (result != VDO_SUCCESS) {
		vdo_continue_completion(parent, result);
		return;
	}

	vdo->super_block.vio.completion.parent = parent;
	vdo_submit_metadata_vio(&vdo->super_block.vio,
				vdo_get_data_region_start(vdo->geometry),
				read_super_block_endio,
				handle_super_block_read_error,
				REQ_OP_READ);
}

/**
 * vdo_get_backing_device() - Get the block device object underlying a vdo.
 * @vdo: The vdo.
 *
 * Return: The vdo's current block device.
 */
struct block_device *vdo_get_backing_device(const struct vdo *vdo)
{
	return vdo->device_config->owned_device->bdev;
}

/**
 * vdo_get_device_name() - Get the device name associated with the vdo target.
 * @target: The target device interface.
 *
 * Return: The block device name.
 */
const char *vdo_get_device_name(const struct dm_target *target)
{
	return dm_device_name(dm_table_get_md(target->table));
}

/**
 * vdo_synchronous_flush() - Issue a flush request and wait for it to complete.
 * @vdo: The vdo.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_synchronous_flush(struct vdo *vdo)
{
	int result;
	struct bio bio;

	bio_init(&bio, vdo_get_backing_device(vdo), NULL, 0,
		 REQ_OP_WRITE | REQ_PREFLUSH);
	submit_bio_wait(&bio);
	result = blk_status_to_errno(bio.bi_status);

	atomic64_inc(&vdo->stats.flush_out);
	if (result != 0) {
		vdo_log_error_strerror(result, "synchronous flush failed");
		result = -EIO;
	}

	bio_uninit(&bio);
	return result;
}

/**
 * vdo_get_state() - Get the current state of the vdo.
 * @vdo: The vdo.
 *
 * Context: This method may be called from any thread.
 *
 * Return: The current state of the vdo.
 */
enum vdo_state vdo_get_state(const struct vdo *vdo)
{
	enum vdo_state state = atomic_read(&vdo->state);

	/* pairs with barriers where state field is changed */
	smp_rmb();
	return state;
}

/**
 * vdo_set_state() - Set the current state of the vdo.
 * @vdo: The vdo whose state is to be set.
 * @state: The new state of the vdo.
 *
 * Context: This method may be called from any thread.
 */
void vdo_set_state(struct vdo *vdo, enum vdo_state state)
{
	/* pairs with barrier in vdo_get_state */
	smp_wmb();
	atomic_set(&vdo->state, state);
}

/**
 * vdo_get_admin_state() - Get the admin state of the vdo.
 * @vdo: The vdo.
 *
 * Return: The code for the vdo's current admin state.
 */
const struct admin_state_code *vdo_get_admin_state(const struct vdo *vdo)
{
	return vdo_get_admin_state_code(&vdo->admin.state);
}

/**
 * record_vdo() - Record the state of the VDO for encoding in the super block.
 */
static void record_vdo(struct vdo *vdo)
{
	/* This is for backwards compatibility. */
	vdo->states.unused = vdo->geometry.unused;
	vdo->states.vdo.state = vdo_get_state(vdo);
	vdo->states.block_map = vdo_record_block_map(vdo->block_map);
	vdo->states.recovery_journal = vdo_record_recovery_journal(vdo->recovery_journal);
	vdo->states.slab_depot = vdo_record_slab_depot(vdo->depot);
	vdo->states.layout = vdo->layout;
}

/**
 * continue_super_block_parent() - Continue the parent of a super block save operation.
 * @completion: The super block vio.
 *
 * This callback is registered in vdo_save_components().
 */
static void continue_super_block_parent(struct vdo_completion *completion)
{
	vdo_continue_completion(vdo_forget(completion->parent), completion->result);
}

/**
 * handle_save_error() - Log a super block save error.
 * @completion: The super block vio.
 *
 * This error handler is registered in vdo_save_components().
 */
static void handle_save_error(struct vdo_completion *completion)
{
	struct vdo_super_block *super_block =
		container_of(as_vio(completion), struct vdo_super_block, vio);

	vio_record_metadata_io_error(&super_block->vio);
	vdo_log_error_strerror(completion->result, "super block save failed");
	/*
	 * Mark the super block as unwritable so that we won't attempt to write it again. This
	 * avoids the case where a growth attempt fails writing the super block with the new size,
	 * but the subsequent attempt to write out the read-only state succeeds. In this case,
	 * writes which happened just before the suspend would not be visible if the VDO is
	 * restarted without rebuilding, but, after a read-only rebuild, the effects of those
	 * writes would reappear.
	 */
	super_block->unwritable = true;
	completion->callback(completion);
}

static void super_block_write_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo_completion *parent = vio->completion.parent;

	continue_vio_after_io(vio, continue_super_block_parent,
			      parent->callback_thread_id);
}

/**
 * vdo_save_components() - Encode the vdo and save the super block asynchronously.
 * @vdo: The vdo whose state is being saved.
 * @parent: The completion to notify when the save is complete.
 */
void vdo_save_components(struct vdo *vdo, struct vdo_completion *parent)
{
	struct vdo_super_block *super_block = &vdo->super_block;

	if (super_block->unwritable) {
		vdo_continue_completion(parent, VDO_READ_ONLY);
		return;
	}

	if (super_block->vio.completion.parent != NULL) {
		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
		return;
	}

	record_vdo(vdo);

	vdo_encode_super_block(super_block->buffer, &vdo->states);
	super_block->vio.completion.parent = parent;
	super_block->vio.completion.callback_thread_id = parent->callback_thread_id;
	vdo_submit_metadata_vio(&super_block->vio,
				vdo_get_data_region_start(vdo->geometry),
				super_block_write_endio, handle_save_error,
				REQ_OP_WRITE | REQ_PREFLUSH | REQ_FUA);
}

/**
 * vdo_register_read_only_listener() - Register a listener to be notified when the VDO goes
 *                                     read-only.
 * @vdo: The vdo to register with.
 * @listener: The object to notify.
 * @notification: The function to call to send the notification.
 * @thread_id: The id of the thread on which to send the notification.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_register_read_only_listener(struct vdo *vdo, void *listener,
				    vdo_read_only_notification_fn notification,
				    thread_id_t thread_id)
{
	struct vdo_thread *thread = &vdo->threads[thread_id];
	struct read_only_listener *read_only_listener;
	int result;

	result = VDO_ASSERT(thread_id != vdo->thread_config.dedupe_thread,
			    "read only listener not registered on dedupe thread");
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(1, struct read_only_listener, __func__,
			      &read_only_listener);
	if (result != VDO_SUCCESS)
		return result;

	*read_only_listener = (struct read_only_listener) {
		.listener = listener,
		.notify = notification,
		.next = thread->listeners,
	};

	thread->listeners = read_only_listener;
	return VDO_SUCCESS;
}

/**
 * notify_vdo_of_read_only_mode() - Notify a vdo that it is going read-only.
 * @listener: The vdo.
 * @parent: The completion to notify in order to acknowledge the notification.
 *
 * This will save the read-only state to the super block.
 *
 * Implements vdo_read_only_notification_fn.
 */
static void notify_vdo_of_read_only_mode(void *listener, struct vdo_completion *parent)
{
	struct vdo *vdo = listener;

	if (vdo_in_read_only_mode(vdo))
		vdo_finish_completion(parent);

	vdo_set_state(vdo, VDO_READ_ONLY_MODE);
	vdo_save_components(vdo, parent);
}

/**
 * vdo_enable_read_only_entry() - Enable a vdo to enter read-only mode on errors.
 * @vdo: The vdo to enable.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_enable_read_only_entry(struct vdo *vdo)
{
	thread_id_t id;
	bool is_read_only = vdo_in_read_only_mode(vdo);
	struct read_only_notifier *notifier = &vdo->read_only_notifier;

	if (is_read_only) {
		notifier->read_only_error = VDO_READ_ONLY;
		notifier->state = NOTIFIED;
	} else {
		notifier->state = MAY_NOT_NOTIFY;
	}

	spin_lock_init(&notifier->lock);
	vdo_initialize_completion(&notifier->completion, vdo,
				  VDO_READ_ONLY_MODE_COMPLETION);

	for (id = 0; id < vdo->thread_config.thread_count; id++)
		vdo->threads[id].is_read_only = is_read_only;

	return vdo_register_read_only_listener(vdo, vdo, notify_vdo_of_read_only_mode,
					       vdo->thread_config.admin_thread);
}

/**
 * vdo_wait_until_not_entering_read_only_mode() - Wait until no read-only notifications are in
 *                                                progress and prevent any subsequent
 *                                                notifications.
 * @parent: The completion to notify when no threads are entering read-only mode.
 *
 * Notifications may be re-enabled by calling vdo_allow_read_only_mode_entry().
 */
void vdo_wait_until_not_entering_read_only_mode(struct vdo_completion *parent)
{
	struct vdo *vdo = parent->vdo;
	struct read_only_notifier *notifier = &vdo->read_only_notifier;

	vdo_assert_on_admin_thread(vdo, __func__);

	if (notifier->waiter != NULL) {
		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
		return;
	}

	spin_lock(&notifier->lock);
	if (notifier->state == NOTIFYING)
		notifier->waiter = parent;
	else if (notifier->state == MAY_NOTIFY)
		notifier->state = MAY_NOT_NOTIFY;
	spin_unlock(&notifier->lock);

	if (notifier->waiter == NULL) {
		/*
		 * A notification was not in progress, and now they are
		 * disallowed.
		 */
		vdo_launch_completion(parent);
		return;
	}
}

/**
 * as_notifier() - Convert a generic vdo_completion to a read_only_notifier.
 * @completion: The completion to convert.
 *
 * Return: The completion as a read_only_notifier.
 */
static inline struct read_only_notifier *as_notifier(struct vdo_completion *completion)
{
	vdo_assert_completion_type(completion, VDO_READ_ONLY_MODE_COMPLETION);
	return container_of(completion, struct read_only_notifier, completion);
}

/**
 * finish_entering_read_only_mode() - Complete the process of entering read only mode.
 * @completion: The read-only mode completion.
 */
static void finish_entering_read_only_mode(struct vdo_completion *completion)
{
	struct read_only_notifier *notifier = as_notifier(completion);

	vdo_assert_on_admin_thread(completion->vdo, __func__);

	spin_lock(&notifier->lock);
	notifier->state = NOTIFIED;
	spin_unlock(&notifier->lock);

	if (notifier->waiter != NULL)
		vdo_continue_completion(vdo_forget(notifier->waiter),
					completion->result);
}

/**
 * make_thread_read_only() - Inform each thread that the VDO is in read-only mode.
 * @completion: The read-only mode completion.
 */
static void make_thread_read_only(struct vdo_completion *completion)
{
	struct vdo *vdo = completion->vdo;
	thread_id_t thread_id = completion->callback_thread_id;
	struct read_only_notifier *notifier = as_notifier(completion);
	struct read_only_listener *listener = completion->parent;

	if (listener == NULL) {
		/* This is the first call on this thread */
		struct vdo_thread *thread = &vdo->threads[thread_id];

		thread->is_read_only = true;
		listener = thread->listeners;
		if (thread_id == 0)
			vdo_log_error_strerror(READ_ONCE(notifier->read_only_error),
					       "Unrecoverable error, entering read-only mode");
	} else {
		/* We've just finished notifying a listener */
		listener = listener->next;
	}

	if (listener != NULL) {
		/* We have a listener to notify */
		vdo_prepare_completion(completion, make_thread_read_only,
				       make_thread_read_only, thread_id,
				       listener);
		listener->notify(listener->listener, completion);
		return;
	}

	/* We're done with this thread */
	if (++thread_id == vdo->thread_config.dedupe_thread) {
		/*
		 * We don't want to notify the dedupe thread since it may be
		 * blocked rebuilding the index.
		 */
		thread_id++;
	}

	if (thread_id >= vdo->thread_config.thread_count) {
		/* There are no more threads */
		vdo_prepare_completion(completion, finish_entering_read_only_mode,
				       finish_entering_read_only_mode,
				       vdo->thread_config.admin_thread, NULL);
	} else {
		vdo_prepare_completion(completion, make_thread_read_only,
				       make_thread_read_only, thread_id, NULL);
	}

	vdo_launch_completion(completion);
}

/**
 * vdo_allow_read_only_mode_entry() - Allow the notifier to put the VDO into read-only mode,
 *                                    reversing the effects of
 *                                    vdo_wait_until_not_entering_read_only_mode().
 * @parent: The object to notify once the operation is complete.
 *
 * If some thread tried to put the vdo into read-only mode while notifications were disallowed, it
 * will be done when this method is called. If that happens, the parent will not be notified until
 * the vdo has actually entered read-only mode and attempted to save the super block.
 *
 * Context: This method may only be called from the admin thread.
 */
void vdo_allow_read_only_mode_entry(struct vdo_completion *parent)
{
	struct vdo *vdo = parent->vdo;
	struct read_only_notifier *notifier = &vdo->read_only_notifier;

	vdo_assert_on_admin_thread(vdo, __func__);

	if (notifier->waiter != NULL) {
		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
		return;
	}

	spin_lock(&notifier->lock);
	if (notifier->state == MAY_NOT_NOTIFY) {
		if (notifier->read_only_error == VDO_SUCCESS) {
			notifier->state = MAY_NOTIFY;
		} else {
			notifier->state = NOTIFYING;
			notifier->waiter = parent;
		}
	}
	spin_unlock(&notifier->lock);

	if (notifier->waiter == NULL) {
		/* We're done */
		vdo_launch_completion(parent);
		return;
	}

	/* Do the pending notification. */
	make_thread_read_only(&notifier->completion);
}

/**
 * vdo_enter_read_only_mode() - Put a VDO into read-only mode and save the read-only state in the
 *                              super block.
 * @vdo: The vdo.
 * @error_code: The error which caused the VDO to enter read-only mode.
 *
 * This method is a no-op if the VDO is already read-only.
 */
void vdo_enter_read_only_mode(struct vdo *vdo, int error_code)
{
	bool notify = false;
	thread_id_t thread_id = vdo_get_callback_thread_id();
	struct read_only_notifier *notifier = &vdo->read_only_notifier;
	struct vdo_thread *thread;

	if (thread_id != VDO_INVALID_THREAD_ID) {
		thread = &vdo->threads[thread_id];
		if (thread->is_read_only) {
			/* This thread has already gone read-only. */
			return;
		}

		/* Record for this thread that the VDO is read-only. */
		thread->is_read_only = true;
	}

	spin_lock(&notifier->lock);
	if (notifier->read_only_error == VDO_SUCCESS) {
		WRITE_ONCE(notifier->read_only_error, error_code);
		if (notifier->state == MAY_NOTIFY) {
			notifier->state = NOTIFYING;
			notify = true;
		}
	}
	spin_unlock(&notifier->lock);

	if (!notify) {
		/* The notifier is already aware of a read-only error */
		return;
	}

	/* Initiate a notification starting on the lowest numbered thread. */
	vdo_launch_completion_callback(&notifier->completion, make_thread_read_only, 0);
}

/**
 * vdo_is_read_only() - Check whether the VDO is read-only.
 * @vdo: The vdo.
 *
 * Return: true if the vdo is read-only.
 *
 * This method may be called from any thread, as opposed to examining the VDO's state field which
 * is only safe to check from the admin thread.
 */
bool vdo_is_read_only(struct vdo *vdo)
{
	return vdo->threads[vdo_get_callback_thread_id()].is_read_only;
}

/**
 * vdo_in_read_only_mode() - Check whether a vdo is in read-only mode.
 * @vdo: The vdo to query.
 *
 * Return: true if the vdo is in read-only mode.
 */
bool vdo_in_read_only_mode(const struct vdo *vdo)
{
	return (vdo_get_state(vdo) == VDO_READ_ONLY_MODE);
}

/**
 * vdo_in_recovery_mode() - Check whether the vdo is in recovery mode.
 * @vdo: The vdo to query.
 *
 * Return: true if the vdo is in recovery mode.
 */
bool vdo_in_recovery_mode(const struct vdo *vdo)
{
	return (vdo_get_state(vdo) == VDO_RECOVERING);
}

/**
 * vdo_enter_recovery_mode() - Put the vdo into recovery mode.
 * @vdo: The vdo.
 */
void vdo_enter_recovery_mode(struct vdo *vdo)
{
	vdo_assert_on_admin_thread(vdo, __func__);

	if (vdo_in_read_only_mode(vdo))
		return;

	vdo_log_info("Entering recovery mode");
	vdo_set_state(vdo, VDO_RECOVERING);
}

/**
 * complete_synchronous_action() - Signal the waiting thread that a synchronous action is complete.
 * @completion: The sync completion.
 */
static void complete_synchronous_action(struct vdo_completion *completion)
{
	vdo_assert_completion_type(completion, VDO_SYNC_COMPLETION);
	complete(&(container_of(completion, struct sync_completion,
				vdo_completion)->completion));
}

/**
 * perform_synchronous_action() - Launch an action on a VDO thread and wait for it to complete.
 * @vdo: The vdo.
 * @action: The callback to launch.
 * @thread_id: The thread on which to run the action.
 * @parent: The parent of the sync completion (may be NULL).
 */
static int perform_synchronous_action(struct vdo *vdo, vdo_action_fn action,
				      thread_id_t thread_id, void *parent)
{
	struct sync_completion sync;

	vdo_initialize_completion(&sync.vdo_completion, vdo, VDO_SYNC_COMPLETION);
	init_completion(&sync.completion);
	sync.vdo_completion.parent = parent;
	vdo_launch_completion_callback(&sync.vdo_completion, action, thread_id);
	wait_for_completion(&sync.completion);
	return sync.vdo_completion.result;
}

/**
 * set_compression_callback() - Callback to turn compression on or off.
 * @completion: The completion.
 */
static void set_compression_callback(struct vdo_completion *completion)
{
	struct vdo *vdo = completion->vdo;
	bool *enable = completion->parent;
	bool was_enabled = vdo_get_compressing(vdo);

	if (*enable != was_enabled) {
		WRITE_ONCE(vdo->compressing, *enable);
		if (was_enabled) {
			/* Signal the packer to flush since compression has been disabled. */
			vdo_flush_packer(vdo->packer);
		}
	}

	vdo_log_info("compression is %s", (*enable ? "enabled" : "disabled"));
	*enable = was_enabled;
	complete_synchronous_action(completion);
}

/**
 * vdo_set_compressing() - Turn compression on or off.
 * @vdo: The vdo.
 * @enable: Whether to enable or disable compression.
 *
 * Return: Whether compression was previously on or off.
 */
bool vdo_set_compressing(struct vdo *vdo, bool enable)
{
	perform_synchronous_action(vdo, set_compression_callback,
				   vdo->thread_config.packer_thread,
				   &enable);
	return enable;
}

/**
 * vdo_get_compressing() - Get whether compression is enabled in a vdo.
 * @vdo: The vdo.
 *
 * Return: State of compression.
 */
bool vdo_get_compressing(struct vdo *vdo)
{
	return READ_ONCE(vdo->compressing);
}

static size_t get_block_map_cache_size(const struct vdo *vdo)
{
	return ((size_t) vdo->device_config->cache_size) * VDO_BLOCK_SIZE;
}

static struct error_statistics __must_check get_vdo_error_statistics(const struct vdo *vdo)
{
	/*
	 * The error counts can be incremented from arbitrary threads and so must be incremented
	 * atomically, but they are just statistics with no semantics that could rely on memory
	 * order, so unfenced reads are sufficient.
	 */
	const struct atomic_statistics *atoms = &vdo->stats;

	return (struct error_statistics) {
		.invalid_advice_pbn_count = atomic64_read(&atoms->invalid_advice_pbn_count),
		.no_space_error_count = atomic64_read(&atoms->no_space_error_count),
		.read_only_error_count = atomic64_read(&atoms->read_only_error_count),
	};
}

static void copy_bio_stat(struct bio_stats *b, const struct atomic_bio_stats *a)
{
	b->read = atomic64_read(&a->read);
	b->write = atomic64_read(&a->write);
	b->discard = atomic64_read(&a->discard);
	b->flush = atomic64_read(&a->flush);
	b->empty_flush = atomic64_read(&a->empty_flush);
	b->fua = atomic64_read(&a->fua);
}

static struct bio_stats subtract_bio_stats(struct bio_stats minuend,
					   struct bio_stats subtrahend)
{
	return (struct bio_stats) {
		.read = minuend.read - subtrahend.read,
		.write = minuend.write - subtrahend.write,
		.discard = minuend.discard - subtrahend.discard,
		.flush = minuend.flush - subtrahend.flush,
		.empty_flush = minuend.empty_flush - subtrahend.empty_flush,
		.fua = minuend.fua - subtrahend.fua,
	};
}

/**
 * vdo_get_physical_blocks_allocated() - Get the number of physical blocks in use by user data.
 * @vdo: The vdo.
 *
 * Return: The number of blocks allocated for user data.
 */
static block_count_t __must_check vdo_get_physical_blocks_allocated(const struct vdo *vdo)
{
	return (vdo_get_slab_depot_allocated_blocks(vdo->depot) -
		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
}

/**
 * vdo_get_physical_blocks_overhead() - Get the number of physical blocks used by vdo metadata.
 * @vdo: The vdo.
 *
 * Return: The number of overhead blocks.
 */
static block_count_t __must_check vdo_get_physical_blocks_overhead(const struct vdo *vdo)
{
	/*
	 * config.physical_blocks is mutated during resize and is in a packed structure,
	 * but resize runs on admin thread.
	 * TODO: Verify that this is always safe.
	 */
	return (vdo->states.vdo.config.physical_blocks -
		vdo_get_slab_depot_data_blocks(vdo->depot) +
		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
}

static const char *vdo_describe_state(enum vdo_state state)
{
	/* These strings should all fit in the 15 chars of VDOStatistics.mode. */
	switch (state) {
	case VDO_RECOVERING:
		return "recovering";

	case VDO_READ_ONLY_MODE:
		return "read-only";

	default:
		return "normal";
	}
}

/**
 * get_vdo_statistics() - Populate a vdo_statistics structure on the admin thread.
 * @vdo: The vdo.
 * @stats: The statistics structure to populate.
 */
static void get_vdo_statistics(const struct vdo *vdo, struct vdo_statistics *stats)
{
	struct recovery_journal *journal = vdo->recovery_journal;
	enum vdo_state state = vdo_get_state(vdo);

	vdo_assert_on_admin_thread(vdo, __func__);

	/* start with a clean slate */
	memset(stats, 0, sizeof(struct vdo_statistics));

	/*
	 * These are immutable properties of the vdo object, so it is safe to query them from any
	 * thread.
	 */
	stats->version = STATISTICS_VERSION;
	stats->logical_blocks = vdo->states.vdo.config.logical_blocks;
	/*
	 * config.physical_blocks is mutated during resize and is in a packed structure, but resize
	 * runs on the admin thread.
	 * TODO: verify that this is always safe
	 */
	stats->physical_blocks = vdo->states.vdo.config.physical_blocks;
	stats->block_size = VDO_BLOCK_SIZE;
	stats->complete_recoveries = vdo->states.vdo.complete_recoveries;
	stats->read_only_recoveries = vdo->states.vdo.read_only_recoveries;
	stats->block_map_cache_size = get_block_map_cache_size(vdo);

	/* The callees are responsible for thread-safety. */
	stats->data_blocks_used = vdo_get_physical_blocks_allocated(vdo);
	stats->overhead_blocks_used = vdo_get_physical_blocks_overhead(vdo);
	stats->logical_blocks_used = vdo_get_recovery_journal_logical_blocks_used(journal);
	vdo_get_slab_depot_statistics(vdo->depot, stats);
	stats->journal = vdo_get_recovery_journal_statistics(journal);
	stats->packer = vdo_get_packer_statistics(vdo->packer);
	stats->block_map = vdo_get_block_map_statistics(vdo->block_map);
	vdo_get_dedupe_statistics(vdo->hash_zones, stats);
	stats->errors = get_vdo_error_statistics(vdo);
	stats->in_recovery_mode = (state == VDO_RECOVERING);
	snprintf(stats->mode, sizeof(stats->mode), "%s", vdo_describe_state(state));

	stats->instance = vdo->instance;
	stats->current_vios_in_progress = get_data_vio_pool_active_requests(vdo->data_vio_pool);
	stats->max_vios = get_data_vio_pool_maximum_requests(vdo->data_vio_pool);

	stats->flush_out = atomic64_read(&vdo->stats.flush_out);
	stats->logical_block_size = vdo->device_config->logical_block_size;
	copy_bio_stat(&stats->bios_in, &vdo->stats.bios_in);
	copy_bio_stat(&stats->bios_in_partial, &vdo->stats.bios_in_partial);
	copy_bio_stat(&stats->bios_out, &vdo->stats.bios_out);
	copy_bio_stat(&stats->bios_meta, &vdo->stats.bios_meta);
	copy_bio_stat(&stats->bios_journal, &vdo->stats.bios_journal);
	copy_bio_stat(&stats->bios_page_cache, &vdo->stats.bios_page_cache);
	copy_bio_stat(&stats->bios_out_completed, &vdo->stats.bios_out_completed);
	copy_bio_stat(&stats->bios_meta_completed, &vdo->stats.bios_meta_completed);
	copy_bio_stat(&stats->bios_journal_completed,
		      &vdo->stats.bios_journal_completed);
	copy_bio_stat(&stats->bios_page_cache_completed,
		      &vdo->stats.bios_page_cache_completed);
	copy_bio_stat(&stats->bios_acknowledged, &vdo->stats.bios_acknowledged);
	copy_bio_stat(&stats->bios_acknowledged_partial, &vdo->stats.bios_acknowledged_partial);
	stats->bios_in_progress =
		subtract_bio_stats(stats->bios_in, stats->bios_acknowledged);
	vdo_get_memory_stats(&stats->memory_usage.bytes_used,
			     &stats->memory_usage.peak_bytes_used);
}

/**
 * vdo_fetch_statistics_callback() - Action to populate a vdo_statistics
 *                                   structure on the admin thread.
 * @completion: The completion.
 *
 * This callback is registered in vdo_fetch_statistics().
 */
static void vdo_fetch_statistics_callback(struct vdo_completion *completion)
{
	get_vdo_statistics(completion->vdo, completion->parent);
	complete_synchronous_action(completion);
}

/**
 * vdo_fetch_statistics() - Fetch statistics on the correct thread.
 * @vdo: The vdo.
 * @stats: The vdo statistics are returned here.
 */
void vdo_fetch_statistics(struct vdo *vdo, struct vdo_statistics *stats)
{
	perform_synchronous_action(vdo, vdo_fetch_statistics_callback,
				   vdo->thread_config.admin_thread, stats);
}

/**
 * vdo_get_callback_thread_id() - Get the id of the callback thread on which a completion is
 *                                currently running.
 *
 * Return: The current thread ID, or -1 if no such thread.
 */
thread_id_t vdo_get_callback_thread_id(void)
{
	struct vdo_work_queue *queue = vdo_get_current_work_queue();
	struct vdo_thread *thread;
	thread_id_t thread_id;

	if (queue == NULL)
		return VDO_INVALID_THREAD_ID;

	thread = vdo_get_work_queue_owner(queue);
	thread_id = thread->thread_id;

	if (PARANOID_THREAD_CONSISTENCY_CHECKS) {
		BUG_ON(thread_id >= thread->vdo->thread_config.thread_count);
		BUG_ON(thread != &thread->vdo->threads[thread_id]);
	}

	return thread_id;
}

/**
 * vdo_dump_status() - Dump status information about a vdo to the log for debugging.
 * @vdo: The vdo to dump.
 */
void vdo_dump_status(const struct vdo *vdo)
{
	zone_count_t zone;

	vdo_dump_flusher(vdo->flusher);
	vdo_dump_recovery_journal_statistics(vdo->recovery_journal);
	vdo_dump_packer(vdo->packer);
	vdo_dump_slab_depot(vdo->depot);

	for (zone = 0; zone < vdo->thread_config.logical_zone_count; zone++)
		vdo_dump_logical_zone(&vdo->logical_zones->zones[zone]);

	for (zone = 0; zone < vdo->thread_config.physical_zone_count; zone++)
		vdo_dump_physical_zone(&vdo->physical_zones->zones[zone]);

	vdo_dump_hash_zones(vdo->hash_zones);
}

/**
 * vdo_assert_on_admin_thread() - Assert that we are running on the admin thread.
 * @vdo: The vdo.
 * @name: The name of the function which should be running on the admin thread (for logging).
 */
void vdo_assert_on_admin_thread(const struct vdo *vdo, const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.admin_thread),
			    "%s called on admin thread", name);
}

/**
 * vdo_assert_on_logical_zone_thread() - Assert that this function was called on the specified
 *                                       logical zone thread.
 * @vdo: The vdo.
 * @logical_zone: The number of the logical zone.
 * @name: The name of the calling function.
 */
void vdo_assert_on_logical_zone_thread(const struct vdo *vdo, zone_count_t logical_zone,
				       const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
			     vdo->thread_config.logical_threads[logical_zone]),
			    "%s called on logical thread", name);
}

/**
 * vdo_assert_on_physical_zone_thread() - Assert that this function was called on the specified
 *                                        physical zone thread.
 * @vdo: The vdo.
 * @physical_zone: The number of the physical zone.
 * @name: The name of the calling function.
 */
void vdo_assert_on_physical_zone_thread(const struct vdo *vdo,
					zone_count_t physical_zone, const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
			     vdo->thread_config.physical_threads[physical_zone]),
			    "%s called on physical thread", name);
}

/**
 * vdo_get_physical_zone() - Get the physical zone responsible for a given physical block number.
 * @vdo: The vdo containing the physical zones.
 * @pbn: The PBN of the data block.
 * @zone_ptr: A pointer to return the physical zone.
 *
 * Gets the physical zone responsible for a given physical block number of a data block in this vdo
 * instance, or of the zero block (for which a NULL zone is returned). For any other block number
 * that is not in the range of valid data block numbers in any slab, an error will be returned.
 * This function is safe to call on invalid block numbers; it will not put the vdo into read-only
 * mode.
 *
 * Return: VDO_SUCCESS or VDO_OUT_OF_RANGE if the block number is invalid or an error code for any
 *         other failure.
 */
int vdo_get_physical_zone(const struct vdo *vdo, physical_block_number_t pbn,
			  struct physical_zone **zone_ptr)
{
	struct vdo_slab *slab;
	int result;

	if (pbn == VDO_ZERO_BLOCK) {
		*zone_ptr = NULL;
		return VDO_SUCCESS;
	}

	/*
	 * Used because it does a more restrictive bounds check than vdo_get_slab(), and done first
	 * because it won't trigger read-only mode on an invalid PBN.
	 */
	if (!vdo_is_physical_data_block(vdo->depot, pbn))
		return VDO_OUT_OF_RANGE;

	/* With the PBN already checked, we should always succeed in finding a slab. */
	slab = vdo_get_slab(vdo->depot, pbn);
	result = VDO_ASSERT(slab != NULL, "vdo_get_slab must succeed on all valid PBNs");
	if (result != VDO_SUCCESS)
		return result;

	*zone_ptr = &vdo->physical_zones->zones[slab->allocator->zone_number];
	return VDO_SUCCESS;
}