// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "repair.h"

#include <linux/min_heap.h>
#include <linux/minmax.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "block-map.h"
#include "completion.h"
#include "constants.h"
#include "encodings.h"
#include "int-map.h"
#include "io-submitter.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "types.h"
#include "vdo.h"
#include "wait-queue.h"

/*
 * An explicitly numbered block mapping. Numbering the mappings allows them to be sorted by logical
 * block number during repair while still preserving the relative order of journal entries with
 * the same logical block number.
 */
struct numbered_block_mapping {
	struct block_map_slot block_map_slot;
	struct block_map_entry block_map_entry;
	/* A serial number to use during replay */
	u32 number;
} __packed;

/*
 * The absolute position of an entry in the recovery journal, including the sector number and the
 * entry number within the sector.
 */
struct recovery_point {
	/* Block sequence number */
	sequence_number_t sequence_number;
	/* Sector number */
	u8 sector_count;
	/* Entry number */
	journal_entry_count_t entry_count;
	/* Whether or not the increment portion of the current entry has been applied */
	bool increment_applied;
};

struct repair_completion {
	/* The completion header */
	struct vdo_completion completion;

	/* A buffer to hold the data read off disk */
	char *journal_data;

	/* For loading the journal */
	data_vio_count_t vio_count;
	data_vio_count_t vios_complete;
	struct vio *vios;

	/* The number of entries to be applied to the block map */
	size_t block_map_entry_count;
	/* The sequence number of the first valid block for block map recovery */
	sequence_number_t block_map_head;
	/* The sequence number of the first valid block for slab journal replay */
	sequence_number_t slab_journal_head;
	/* The sequence number of the last valid block of the journal (if known) */
	sequence_number_t tail;
	/*
	 * The highest sequence number of the journal. During recovery (vs read-only rebuild), not
	 * the same as the tail, since the tail ignores blocks after the first hole.
	 */
	sequence_number_t highest_tail;

	/* The number of logical blocks currently known to be in use */
	block_count_t logical_blocks_used;
	/* The number of block map data blocks known to be allocated */
	block_count_t block_map_data_blocks;

	/* These fields are for playing the journal into the block map */
	/* The entry data for the block map recovery */
	struct numbered_block_mapping *entries;
	/* The number of entries in the entry array */
	size_t entry_count;
	/* Number of pending (non-ready) requests */
	page_count_t outstanding;
	/* Number of page completions */
	page_count_t page_count;
	/* Whether the initial set of page fetches is still being launched */
	bool launching;
	/*
	 * A heap wrapping journal_entries. It re-orders and sorts journal entries in ascending LBN
	 * order, then original journal order. This permits efficient iteration over the journal
	 * entries in order.
	 */
	struct min_heap replay_heap;
	/* Fields tracking progress through the journal entries. */
	struct numbered_block_mapping *current_entry;
	struct numbered_block_mapping *current_unfetched_entry;
	/* Current requested page's PBN */
	physical_block_number_t pbn;

	/* These fields are only used during recovery. */
	/* A location just beyond the last valid entry of the journal */
	struct recovery_point tail_recovery_point;
	/* The location of the next recovery journal entry to apply */
	struct recovery_point next_recovery_point;
	/* The journal point to give to the next synthesized decref */
	struct journal_point next_journal_point;
	/* The number of entries played into slab journals */
	size_t entries_added_to_slab_journals;

	/* These fields are only used during read-only rebuild */
	page_count_t page_to_fetch;
	/* the number of leaf pages in the block map */
	page_count_t leaf_pages;
	/* the last slot of the block map */
	struct block_map_slot last_slot;

	/*
	 * The page completions used for playing the journal into the block map, and, during
	 * read-only rebuild, for rebuilding the reference counts from the block map.
	 */
	struct vdo_page_completion page_completions[];
};

/*
 * This is a min_heap callback function that orders numbered_block_mappings using the
 * 'block_map_slot' field as the primary key and the mapping 'number' field as the secondary key.
 * Using the mapping number preserves the journal order of entries for the same slot, allowing us
 * to sort by slot while still ensuring we replay all entries with the same slot in the exact order
 * as they appeared in the journal.
 */
static bool mapping_is_less_than(const void *item1, const void *item2)
{
	const struct numbered_block_mapping *mapping1 =
		(const struct numbered_block_mapping *) item1;
	const struct numbered_block_mapping *mapping2 =
		(const struct numbered_block_mapping *) item2;

	if (mapping1->block_map_slot.pbn != mapping2->block_map_slot.pbn)
		return mapping1->block_map_slot.pbn < mapping2->block_map_slot.pbn;

	if (mapping1->block_map_slot.slot != mapping2->block_map_slot.slot)
		return mapping1->block_map_slot.slot < mapping2->block_map_slot.slot;

	if (mapping1->number != mapping2->number)
		return mapping1->number < mapping2->number;

	return false;
}

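/* Implements the 'swp' callback for the repair min_heap: exchange two mappings in place. */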
static void swap_mappings(void *item1, void *item2)
{
	struct numbered_block_mapping *mapping1 = item1;
	struct numbered_block_mapping *mapping2 = item2;

	swap(*mapping1, *mapping2);
}

static const struct min_heap_callbacks repair_min_heap = {
	.elem_size = sizeof(struct numbered_block_mapping),
	.less = mapping_is_less_than,
	.swp = swap_mappings,
};

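/**
 * sort_next_heap_element() - Remove the next entry from the replay heap.
 * @repair: The repair completion.
 *
 * Return: A pointer to the entry which sorts next, or NULL if the heap is empty.
 */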
static struct numbered_block_mapping *sort_next_heap_element(struct repair_completion *repair)
{
	struct min_heap *heap = &repair->replay_heap;
	struct numbered_block_mapping *last;

	if (heap->nr == 0)
		return NULL;

	/*
	 * Swap the next heap element with the last one on the heap, popping it off the heap,
	 * restore the heap invariant, and return a pointer to the popped element.
	 */
	last = &repair->entries[--heap->nr];
	swap_mappings(heap->data, last);
	min_heapify(heap, 0, &repair_min_heap);
	return last;
}

/**
 * as_repair_completion() - Convert a generic completion to a repair_completion.
 * @completion: The completion to convert.
 *
 * Return: The repair_completion.
 */
static inline struct repair_completion * __must_check
as_repair_completion(struct vdo_completion *completion)
{
	vdo_assert_completion_type(completion, VDO_REPAIR_COMPLETION);
	return container_of(completion, struct repair_completion, completion);
}

static void prepare_repair_completion(struct repair_completion *repair,
				      vdo_action_fn callback, enum vdo_zone_type zone_type)
{
	struct vdo_completion *completion = &repair->completion;
	const struct thread_config *thread_config = &completion->vdo->thread_config;
	thread_id_t thread_id;

	/* All block map access is done on a single thread, so use logical zone 0. */
	thread_id = ((zone_type == VDO_ZONE_TYPE_LOGICAL) ?
		     thread_config->logical_threads[0] :
		     thread_config->admin_thread);
	vdo_reset_completion(completion);
	vdo_set_completion_callback(completion, callback, thread_id);
}

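/* Prepare the repair completion to run the given callback, then launch it immediately. */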
static void launch_repair_completion(struct repair_completion *repair,
				     vdo_action_fn callback, enum vdo_zone_type zone_type)
{
	prepare_repair_completion(repair, callback, zone_type);
	vdo_launch_completion(&repair->completion);
}

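/* Clean up the vios used to read the recovery journal, then free the vio array. */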
static void uninitialize_vios(struct repair_completion *repair)
{
	while (repair->vio_count > 0)
		free_vio_components(&repair->vios[--repair->vio_count]);

	vdo_free(vdo_forget(repair->vios));
}

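/* Free a repair completion and all of the resources it holds. */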
static void free_repair_completion(struct repair_completion *repair)
{
	if (repair == NULL)
		return;

	/*
	 * We do this here because this function is the only common bottleneck for all clean up
	 * paths.
	 */
	repair->completion.vdo->block_map->zones[0].page_cache.rebuilding = false;

	uninitialize_vios(repair);
	vdo_free(vdo_forget(repair->journal_data));
	vdo_free(vdo_forget(repair->entries));
	vdo_free(repair);
}

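/**
 * finish_repair() - Finish repairing the vdo and notify the parent completion.
 * @completion: The repair completion.
 *
 * This callback is registered in drain_slab_depot().
 */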
static void finish_repair(struct vdo_completion *completion)
{
	struct vdo_completion *parent = completion->parent;
	struct vdo *vdo = completion->vdo;
	struct repair_completion *repair = as_repair_completion(completion);

	vdo_assert_on_admin_thread(vdo, __func__);

	if (vdo->load_state != VDO_REBUILD_FOR_UPGRADE)
		vdo->states.vdo.complete_recoveries++;

	vdo_initialize_recovery_journal_post_repair(vdo->recovery_journal,
						    vdo->states.vdo.complete_recoveries,
						    repair->highest_tail,
						    repair->logical_blocks_used,
						    repair->block_map_data_blocks);
	free_repair_completion(vdo_forget(repair));

	if (vdo_state_requires_read_only_rebuild(vdo->load_state)) {
		vdo_log_info("Read-only rebuild complete");
		vdo_launch_completion(parent);
		return;
	}

	/* FIXME: shouldn't this say either "recovery" or "repair"? */
	vdo_log_info("Rebuild complete");

	/*
	 * Now that we've freed the repair completion and its vast array of journal entries, we
	 * can allocate refcounts.
	 */
	vdo_continue_completion(parent, vdo_allocate_reference_counters(vdo->depot));
}

/**
 * abort_repair() - Handle a repair error.
 * @completion: The repair completion.
 */
static void abort_repair(struct vdo_completion *completion)
{
	struct vdo_completion *parent = completion->parent;
	int result = completion->result;
	struct repair_completion *repair = as_repair_completion(completion);

	if (vdo_state_requires_read_only_rebuild(completion->vdo->load_state))
		vdo_log_info("Read-only rebuild aborted");
	else
		vdo_log_warning("Recovery aborted");

	free_repair_completion(vdo_forget(repair));
	vdo_continue_completion(parent, result);
}

/**
 * abort_on_error() - Abort a repair if there is an error.
 * @result: The result to check.
 * @repair: The repair completion.
 *
 * Return: true if the result was an error.
 */
static bool __must_check abort_on_error(int result, struct repair_completion *repair)
{
	if (result == VDO_SUCCESS)
		return false;

	vdo_fail_completion(&repair->completion, result);
	return true;
}

/**
 * drain_slab_depot() - Flush out all dirty refcount blocks now that they have been rebuilt or
 *                      recovered.
 * @completion: The repair completion.
 */
static void drain_slab_depot(struct vdo_completion *completion)
{
	struct vdo *vdo = completion->vdo;
	struct repair_completion *repair = as_repair_completion(completion);
	const struct admin_state_code *operation;

	vdo_assert_on_admin_thread(vdo, __func__);

	prepare_repair_completion(repair, finish_repair, VDO_ZONE_TYPE_ADMIN);
	if (vdo_state_requires_read_only_rebuild(vdo->load_state)) {
		vdo_log_info("Saving rebuilt state");
		operation = VDO_ADMIN_STATE_REBUILDING;
	} else {
		vdo_log_info("Replayed %zu journal entries into slab journals",
			     repair->entries_added_to_slab_journals);
		operation = VDO_ADMIN_STATE_RECOVERING;
	}

	vdo_drain_slab_depot(vdo->depot, operation, completion);
}

/**
 * flush_block_map_updates() - Flush the block map now that all the reference counts are rebuilt.
 * @completion: The repair completion.
 *
 * This callback is registered in fetch_page().
 */
static void flush_block_map_updates(struct vdo_completion *completion)
{
	vdo_assert_on_admin_thread(completion->vdo, __func__);

	vdo_log_info("Flushing block map changes");
	prepare_repair_completion(as_repair_completion(completion), drain_slab_depot,
				  VDO_ZONE_TYPE_ADMIN);
	vdo_drain_block_map(completion->vdo->block_map, VDO_ADMIN_STATE_RECOVERING,
			    completion);
}

static bool fetch_page(struct repair_completion *repair,
		       struct vdo_completion *completion);

/**
 * handle_page_load_error() - Handle an error loading a page.
 * @completion: The vdo_page_completion.
 */
static void handle_page_load_error(struct vdo_completion *completion)
{
	struct repair_completion *repair = completion->parent;

	repair->outstanding--;
	vdo_set_completion_result(&repair->completion, completion->result);
	vdo_release_page_completion(completion);
	fetch_page(repair, completion);
}

/**
 * unmap_entry() - Unmap an invalid entry and indicate that its page must be written out.
 * @page: The page containing the entries
 * @completion: The page_completion for writing the page
 * @slot: The slot to unmap
 */
static void unmap_entry(struct block_map_page *page, struct vdo_completion *completion,
			slot_number_t slot)
{
	page->entries[slot] = UNMAPPED_BLOCK_MAP_ENTRY;
	vdo_request_page_write(completion);
}

/**
 * remove_out_of_bounds_entries() - Unmap entries which are outside the logical space.
 * @page: The page containing the entries
 * @completion: The page_completion for writing the page
 * @start: The first slot to check
 */
static void remove_out_of_bounds_entries(struct block_map_page *page,
					 struct vdo_completion *completion,
					 slot_number_t start)
{
	slot_number_t slot;

	for (slot = start; slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; slot++) {
		struct data_location mapping = vdo_unpack_block_map_entry(&page->entries[slot]);

		if (vdo_is_mapped_location(&mapping))
			unmap_entry(page, completion, slot);
	}
}

/**
 * process_slot() - Update the reference counts for a single entry.
 * @page: The page containing the entries
 * @completion: The page_completion for writing the page
 * @slot: The slot to check
 *
 * Return: true if the entry was a valid mapping
 */
static bool process_slot(struct block_map_page *page, struct vdo_completion *completion,
			 slot_number_t slot)
{
	struct slab_depot *depot = completion->vdo->depot;
	int result;
	struct data_location mapping = vdo_unpack_block_map_entry(&page->entries[slot]);

	if (!vdo_is_valid_location(&mapping)) {
		/* This entry is invalid, so remove it from the page. */
		unmap_entry(page, completion, slot);
		return false;
	}

	if (!vdo_is_mapped_location(&mapping))
		return false;

	if (mapping.pbn == VDO_ZERO_BLOCK)
		return true;

	if (!vdo_is_physical_data_block(depot, mapping.pbn)) {
		/*
		 * This is a nonsense mapping. Remove it from the map so we're at least consistent
		 * and mark the page dirty.
		 */
		unmap_entry(page, completion, slot);
		return false;
	}

	result = vdo_adjust_reference_count_for_rebuild(depot, mapping.pbn,
							VDO_JOURNAL_DATA_REMAPPING);
	if (result == VDO_SUCCESS)
		return true;

	vdo_log_error_strerror(result,
			       "Could not adjust reference count for PBN %llu, slot %u mapped to PBN %llu",
			       (unsigned long long) vdo_get_block_map_page_pbn(page),
			       slot, (unsigned long long) mapping.pbn);
	unmap_entry(page, completion, slot);
	return false;
}

/**
 * rebuild_reference_counts_from_page() - Rebuild reference counts from a block map page.
 * @repair: The repair completion.
 * @completion: The page completion holding the page.
 */
static void rebuild_reference_counts_from_page(struct repair_completion *repair,
					       struct vdo_completion *completion)
{
	slot_number_t slot, last_slot;
	struct block_map_page *page;
	int result;

	result = vdo_get_cached_page(completion, &page);
	if (result != VDO_SUCCESS) {
		vdo_set_completion_result(&repair->completion, result);
		return;
	}

	if (!page->header.initialized)
		return;

	/* Remove any bogus entries which exist beyond the end of the logical space. */
	if (vdo_get_block_map_page_pbn(page) == repair->last_slot.pbn) {
		last_slot = repair->last_slot.slot;
		remove_out_of_bounds_entries(page, completion, last_slot);
	} else {
		last_slot = VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
	}

	/* Inform the slab depot of all entries on this page. */
	for (slot = 0; slot < last_slot; slot++) {
		if (process_slot(page, completion, slot))
			repair->logical_blocks_used++;
	}
}

/**
 * page_loaded() - Process a page which has just been loaded.
 * @completion: The vdo_page_completion for the fetched page.
 *
 * This callback is registered by fetch_page().
 */
static void page_loaded(struct vdo_completion *completion)
{
	struct repair_completion *repair = completion->parent;

	repair->outstanding--;
	rebuild_reference_counts_from_page(repair, completion);
	vdo_release_page_completion(completion);

	/* Advance progress to the next page, and fetch the next page we haven't yet requested. */
	fetch_page(repair, completion);
}

static physical_block_number_t get_pbn_to_fetch(struct repair_completion *repair,
						struct block_map *block_map)
{
	physical_block_number_t pbn = VDO_ZERO_BLOCK;

	if (repair->completion.result != VDO_SUCCESS)
		return VDO_ZERO_BLOCK;

	while ((pbn == VDO_ZERO_BLOCK) && (repair->page_to_fetch < repair->leaf_pages))
		pbn = vdo_find_block_map_page_pbn(block_map, repair->page_to_fetch++);

	if (vdo_is_physical_data_block(repair->completion.vdo->depot, pbn))
		return pbn;

	vdo_set_completion_result(&repair->completion, VDO_BAD_MAPPING);
	return VDO_ZERO_BLOCK;
}

/**
 * fetch_page() - Fetch a page from the block map.
 * @repair: The repair_completion.
 * @completion: The page completion to use.
 *
 * Return: true if the rebuild is complete.
 */
static bool fetch_page(struct repair_completion *repair,
		       struct vdo_completion *completion)
{
	struct vdo_page_completion *page_completion = (struct vdo_page_completion *) completion;
	struct block_map *block_map = repair->completion.vdo->block_map;
	physical_block_number_t pbn = get_pbn_to_fetch(repair, block_map);

	if (pbn != VDO_ZERO_BLOCK) {
		repair->outstanding++;
		/*
		 * We must set the requeue flag here to ensure that we don't blow the stack if all
		 * the requested pages are already in the cache or get load errors.
		 */
		vdo_get_page(page_completion, &block_map->zones[0], pbn, true, repair,
			     page_loaded, handle_page_load_error, true);
	}

	if (repair->outstanding > 0)
		return false;

	launch_repair_completion(repair, flush_block_map_updates, VDO_ZONE_TYPE_ADMIN);
	return true;
}

/**
 * rebuild_from_leaves() - Rebuild reference counts from the leaf block map pages.
 * @completion: The repair completion.
 *
 * Rebuilds reference counts from the leaf block map pages now that reference counts have been
 * rebuilt from the interior tree pages (which have been loaded in the process). This callback is
 * registered in rebuild_reference_counts().
 */
static void rebuild_from_leaves(struct vdo_completion *completion)
{
	page_count_t i;
	struct repair_completion *repair = as_repair_completion(completion);
	struct block_map *map = completion->vdo->block_map;

	repair->logical_blocks_used = 0;

	/*
	 * The PBN calculation doesn't work until the tree pages have been loaded, so we can't set
	 * this value at the start of repair.
	 */
	repair->leaf_pages = vdo_compute_block_map_page_count(map->entry_count);
	repair->last_slot = (struct block_map_slot) {
		.slot = map->entry_count % VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
		.pbn = vdo_find_block_map_page_pbn(map, repair->leaf_pages - 1),
	};
	if (repair->last_slot.slot == 0)
		repair->last_slot.slot = VDO_BLOCK_MAP_ENTRIES_PER_PAGE;

	for (i = 0; i < repair->page_count; i++) {
		if (fetch_page(repair, &repair->page_completions[i].completion)) {
			/*
			 * The rebuild has already moved on, so it isn't safe nor is there a need
			 * to launch any more fetches.
			 */
			return;
		}
	}
}

/**
 * process_entry() - Process a single entry from the block map tree.
 * @pbn: A pbn which holds a block map tree page.
 * @completion: The parent completion of the traversal.
 *
 * Implements vdo_entry_callback_fn.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int process_entry(physical_block_number_t pbn, struct vdo_completion *completion)
{
	struct repair_completion *repair = as_repair_completion(completion);
	struct slab_depot *depot = completion->vdo->depot;
	int result;

	if ((pbn == VDO_ZERO_BLOCK) || !vdo_is_physical_data_block(depot, pbn)) {
		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
					      "PBN %llu out of range",
					      (unsigned long long) pbn);
	}

	result = vdo_adjust_reference_count_for_rebuild(depot, pbn,
							VDO_JOURNAL_BLOCK_MAP_REMAPPING);
	if (result != VDO_SUCCESS) {
		return vdo_log_error_strerror(result,
					      "Could not adjust reference count for block map tree PBN %llu",
					      (unsigned long long) pbn);
	}

	repair->block_map_data_blocks++;
	return VDO_SUCCESS;
}

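/**
 * rebuild_reference_counts() - Rebuild the reference counts from the block map.
 * @completion: The repair completion.
 *
 * This callback is registered in load_slab_depot().
 */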
static void rebuild_reference_counts(struct vdo_completion *completion)
{
	struct repair_completion *repair = as_repair_completion(completion);
	struct vdo *vdo = completion->vdo;
	struct vdo_page_cache *cache = &vdo->block_map->zones[0].page_cache;

	/* We must allocate ref_counts before we can rebuild them. */
	if (abort_on_error(vdo_allocate_reference_counters(vdo->depot), repair))
		return;

	/*
	 * Completion chaining from page cache hits can lead to stack overflow during the rebuild,
	 * so clear out the cache before this rebuild phase.
	 */
	if (abort_on_error(vdo_invalidate_page_cache(cache), repair))
		return;

	prepare_repair_completion(repair, rebuild_from_leaves, VDO_ZONE_TYPE_LOGICAL);
	vdo_traverse_forest(vdo->block_map, process_entry, completion);
}

/**
 * increment_recovery_point() - Move the given recovery point forward by one entry.
 * @point: The recovery point to advance.
 */
static void increment_recovery_point(struct recovery_point *point)
{
	if (++point->entry_count < RECOVERY_JOURNAL_ENTRIES_PER_SECTOR)
		return;

	point->entry_count = 0;
	if (point->sector_count < (VDO_SECTORS_PER_BLOCK - 1)) {
		point->sector_count++;
		return;
	}

	point->sequence_number++;
	point->sector_count = 1;
}

/**
 * advance_points() - Advance the current recovery and journal points.
 * @repair: The repair_completion whose points are to be advanced.
 * @entries_per_block: The number of entries in a recovery journal block.
 */
static void advance_points(struct repair_completion *repair,
			   journal_entry_count_t entries_per_block)
{
	if (!repair->next_recovery_point.increment_applied) {
		repair->next_recovery_point.increment_applied = true;
		return;
	}

	increment_recovery_point(&repair->next_recovery_point);
	vdo_advance_journal_point(&repair->next_journal_point, entries_per_block);
	repair->next_recovery_point.increment_applied = false;
}

/**
 * before_recovery_point() - Check whether the first point precedes the second point.
 * @first: The first recovery point.
 * @second: The second recovery point.
 *
 * Return: true if the first point precedes the second point.
 */
static bool __must_check before_recovery_point(const struct recovery_point *first,
					       const struct recovery_point *second)
{
	if (first->sequence_number < second->sequence_number)
		return true;

	if (first->sequence_number > second->sequence_number)
		return false;

	if (first->sector_count < second->sector_count)
		return true;

	return ((first->sector_count == second->sector_count) &&
		(first->entry_count < second->entry_count));
}

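/* Get a pointer to the packed sector with the given number within the in-memory journal data. */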
static struct packed_journal_sector * __must_check get_sector(struct recovery_journal *journal,
							      char *journal_data,
							      sequence_number_t sequence,
							      u8 sector_number)
{
	off_t offset;

	offset = ((vdo_get_recovery_journal_block_number(journal, sequence) * VDO_BLOCK_SIZE) +
		  (VDO_SECTOR_SIZE * sector_number));
	return (struct packed_journal_sector *) (journal_data + offset);
}

/**
 * get_entry() - Unpack the recovery journal entry associated with the given recovery point.
 * @repair: The repair completion.
 * @point: The recovery point.
 *
 * Return: The unpacked contents of the matching recovery journal entry.
 */
static struct recovery_journal_entry get_entry(const struct repair_completion *repair,
					       const struct recovery_point *point)
{
	struct packed_journal_sector *sector;

	sector = get_sector(repair->completion.vdo->recovery_journal,
			    repair->journal_data, point->sequence_number,
			    point->sector_count);
	return vdo_unpack_recovery_journal_entry(&sector->entries[point->entry_count]);
}

/**
 * validate_recovery_journal_entry() - Validate a recovery journal entry.
 * @vdo: The vdo.
 * @entry: The entry to validate.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int validate_recovery_journal_entry(const struct vdo *vdo,
					   const struct recovery_journal_entry *entry)
{
	if ((entry->slot.pbn >= vdo->states.vdo.config.physical_blocks) ||
	    (entry->slot.slot >= VDO_BLOCK_MAP_ENTRIES_PER_PAGE) ||
	    !vdo_is_valid_location(&entry->mapping) ||
	    !vdo_is_valid_location(&entry->unmapping) ||
	    !vdo_is_physical_data_block(vdo->depot, entry->mapping.pbn) ||
	    !vdo_is_physical_data_block(vdo->depot, entry->unmapping.pbn)) {
		return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
					      "Invalid entry: %s (%llu, %u) from %llu to %llu is not within bounds",
					      vdo_get_journal_operation_name(entry->operation),
					      (unsigned long long) entry->slot.pbn,
					      entry->slot.slot,
					      (unsigned long long) entry->unmapping.pbn,
					      (unsigned long long) entry->mapping.pbn);
	}

	if ((entry->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) &&
	    (vdo_is_state_compressed(entry->mapping.state) ||
	     (entry->mapping.pbn == VDO_ZERO_BLOCK) ||
	     (entry->unmapping.state != VDO_MAPPING_STATE_UNMAPPED) ||
	     (entry->unmapping.pbn != VDO_ZERO_BLOCK))) {
		return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
					      "Invalid entry: %s (%llu, %u) from %llu to %llu is not a valid tree mapping",
					      vdo_get_journal_operation_name(entry->operation),
					      (unsigned long long) entry->slot.pbn,
					      entry->slot.slot,
					      (unsigned long long) entry->unmapping.pbn,
					      (unsigned long long) entry->mapping.pbn);
	}

	return VDO_SUCCESS;
}

/**
 * add_slab_journal_entries() - Replay recovery journal entries into the slab journals of the
 *                              allocator currently being recovered.
 * @completion: The allocator completion.
 *
 * Waits for slab journal tailblock space when necessary. This method is its own callback.
 */
static void add_slab_journal_entries(struct vdo_completion *completion)
{
	struct recovery_point *recovery_point;
	struct repair_completion *repair = completion->parent;
	struct vdo *vdo = completion->vdo;
	struct recovery_journal *journal = vdo->recovery_journal;
	struct block_allocator *allocator = vdo_as_block_allocator(completion);

	/* Get ready in case we need to enqueue again. */
	vdo_prepare_completion(completion, add_slab_journal_entries,
			       vdo_notify_slab_journals_are_recovered,
			       completion->callback_thread_id, repair);
	for (recovery_point = &repair->next_recovery_point;
	     before_recovery_point(recovery_point, &repair->tail_recovery_point);
	     advance_points(repair, journal->entries_per_block)) {
		int result;
		physical_block_number_t pbn;
		struct vdo_slab *slab;
		struct recovery_journal_entry entry = get_entry(repair, recovery_point);
		bool increment = !repair->next_recovery_point.increment_applied;

		if (increment) {
			result = validate_recovery_journal_entry(vdo, &entry);
			if (result != VDO_SUCCESS) {
				vdo_enter_read_only_mode(vdo, result);
				vdo_fail_completion(completion, result);
				return;
			}

			pbn = entry.mapping.pbn;
		} else {
			pbn = entry.unmapping.pbn;
		}

		if (pbn == VDO_ZERO_BLOCK)
			continue;

		slab = vdo_get_slab(vdo->depot, pbn);
		if (slab->allocator != allocator)
			continue;

		if (!vdo_attempt_replay_into_slab(slab, pbn, entry.operation, increment,
						  &repair->next_journal_point,
						  completion))
			return;

		repair->entries_added_to_slab_journals++;
	}

	vdo_notify_slab_journals_are_recovered(completion);
}

/**
 * vdo_replay_into_slab_journals() - Replay recovery journal entries in the slab journals of slabs
 *                                   owned by a given block_allocator.
 * @allocator: The allocator whose slab journals are to be recovered.
 * @context: The slab depot load context supplied by a recovery when it loads the depot.
 */
void vdo_replay_into_slab_journals(struct block_allocator *allocator, void *context)
{
	struct vdo_completion *completion = &allocator->completion;
	struct repair_completion *repair = context;
	struct vdo *vdo = completion->vdo;

	vdo_assert_on_physical_zone_thread(vdo, allocator->zone_number, __func__);
	if (repair->entry_count == 0) {
		/* There's nothing to replay. */
		repair->logical_blocks_used = vdo->recovery_journal->logical_blocks_used;
		repair->block_map_data_blocks = vdo->recovery_journal->block_map_data_blocks;
		vdo_notify_slab_journals_are_recovered(completion);
		return;
	}

	repair->next_recovery_point = (struct recovery_point) {
		.sequence_number = repair->slab_journal_head,
		.sector_count = 1,
		.entry_count = 0,
	};

	repair->next_journal_point = (struct journal_point) {
		.sequence_number = repair->slab_journal_head,
		.entry_count = 0,
	};

	vdo_log_info("Replaying entries into slab journals for zone %u",
		     allocator->zone_number);
	completion->parent = repair;
	add_slab_journal_entries(completion);
}

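/* Load the slab depot, preparing the next repair phase based on whether this is a rebuild or a recovery. */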
static void load_slab_depot(struct vdo_completion *completion)
{
	struct repair_completion *repair = as_repair_completion(completion);
	const struct admin_state_code *operation;

	vdo_assert_on_admin_thread(completion->vdo, __func__);

	if (vdo_state_requires_read_only_rebuild(completion->vdo->load_state)) {
		prepare_repair_completion(repair, rebuild_reference_counts,
					  VDO_ZONE_TYPE_LOGICAL);
		operation = VDO_ADMIN_STATE_LOADING_FOR_REBUILD;
	} else {
		prepare_repair_completion(repair, drain_slab_depot, VDO_ZONE_TYPE_ADMIN);
		operation = VDO_ADMIN_STATE_LOADING_FOR_RECOVERY;
	}

	vdo_load_slab_depot(completion->vdo->depot, operation, completion, repair);
}

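/* Flush the block map once all recovery entries have been applied to it, then load the slab depot. */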
static void flush_block_map(struct vdo_completion *completion)
{
	struct repair_completion *repair = as_repair_completion(completion);
	const struct admin_state_code *operation;

	vdo_assert_on_admin_thread(completion->vdo, __func__);

	vdo_log_info("Flushing block map changes");
	prepare_repair_completion(repair, load_slab_depot, VDO_ZONE_TYPE_ADMIN);
	operation = (vdo_state_requires_read_only_rebuild(completion->vdo->load_state) ?
		     VDO_ADMIN_STATE_REBUILDING :
		     VDO_ADMIN_STATE_RECOVERING);
	vdo_drain_block_map(completion->vdo->block_map, operation, completion);
}

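/*
 * Check whether the block map replay is finished. If it is, either launch the next phase or, on
 * error, release any ready page completions and finish with the recorded result.
 *
 * Return: true if the replay is complete (successfully or due to an error).
 */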
static bool finish_if_done(struct repair_completion *repair)
{
	/* Pages are still being launched or there is still work to do. */
	if (repair->launching || (repair->outstanding > 0))
		return false;

	if (repair->completion.result != VDO_SUCCESS) {
		page_count_t i;

		for (i = 0; i < repair->page_count; i++) {
			struct vdo_page_completion *page_completion =
				&repair->page_completions[i];

			if (page_completion->ready)
				vdo_release_page_completion(&page_completion->completion);
		}

		vdo_launch_completion(&repair->completion);
		return true;
	}

	if (repair->current_entry >= repair->entries)
		return false;

	launch_repair_completion(repair, flush_block_map, VDO_ZONE_TYPE_ADMIN);
	return true;
}

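/* Record a block map recovery error and finish up if no more work is outstanding. */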
static void abort_block_map_recovery(struct repair_completion *repair, int result)
{
	vdo_set_completion_result(&repair->completion, result);
	finish_if_done(repair);
}

/**
 * find_entry_starting_next_page() - Find the first journal entry after a given entry which is not
 *                                   on the same block map page.
 * @repair: The repair completion.
 * @current_entry: The entry to search from.
 * @needs_sort: Whether sorting is needed to proceed.
 *
 * Return: Pointer to the first later journal entry on a different block map page, or a pointer to
 *         just before the journal entries if no subsequent entry is on a different block map page.
 */
static struct numbered_block_mapping *
find_entry_starting_next_page(struct repair_completion *repair,
			      struct numbered_block_mapping *current_entry, bool needs_sort)
{
	size_t current_page;

	/* If current_entry is invalid, return immediately. */
	if (current_entry < repair->entries)
		return current_entry;

	current_page = current_entry->block_map_slot.pbn;

	/* Decrement current_entry until it's out of bounds or on a different page. */
	while ((current_entry >= repair->entries) &&
	       (current_entry->block_map_slot.pbn == current_page)) {
		if (needs_sort) {
			struct numbered_block_mapping *just_sorted_entry =
				sort_next_heap_element(repair);
			VDO_ASSERT_LOG_ONLY(just_sorted_entry < current_entry,
					    "heap is returning elements in an unexpected order");
		}

		current_entry--;
	}

	return current_entry;
}

/*
 * Apply a range of journal entries [starting_entry, ending_entry) to a block map page.
 */
static void apply_journal_entries_to_page(struct block_map_page *page,
					  struct numbered_block_mapping *starting_entry,
					  struct numbered_block_mapping *ending_entry)
{
	struct numbered_block_mapping *current_entry = starting_entry;

	while (current_entry != ending_entry) {
		page->entries[current_entry->block_map_slot.slot] = current_entry->block_map_entry;
		current_entry--;
	}
}

static void recover_ready_pages(struct repair_completion *repair,
				struct vdo_completion *completion);

static void block_map_page_loaded(struct vdo_completion *completion)
{
	struct repair_completion *repair = as_repair_completion(completion->parent);

	repair->outstanding--;
	if (!repair->launching)
		recover_ready_pages(repair, completion);
}

static void handle_block_map_page_load_error(struct vdo_completion *completion)
{
	struct repair_completion *repair = as_repair_completion(completion->parent);

	repair->outstanding--;
	abort_block_map_recovery(repair, completion->result);
}

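/* Fetch the next block map page for which there are still unapplied journal entries. */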
static void fetch_block_map_page(struct repair_completion *repair,
				 struct vdo_completion *completion)
{
	physical_block_number_t pbn;

	if (repair->current_unfetched_entry < repair->entries)
		/* Nothing left to fetch. */
		return;

	/* Fetch the next page we haven't yet requested. */
	pbn = repair->current_unfetched_entry->block_map_slot.pbn;
	repair->current_unfetched_entry =
		find_entry_starting_next_page(repair, repair->current_unfetched_entry,
					      true);
	repair->outstanding++;
	vdo_get_page(((struct vdo_page_completion *) completion),
		     &repair->completion.vdo->block_map->zones[0], pbn, true,
		     &repair->completion, block_map_page_loaded,
		     handle_block_map_page_load_error, false);
}

static struct vdo_page_completion *get_next_page_completion(struct repair_completion *repair,
							    struct vdo_page_completion *completion)
{
	completion++;
	if (completion == (&repair->page_completions[repair->page_count]))
		completion = &repair->page_completions[0];
	return completion;
}

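/*
 * Apply journal entries to each block map page as its completion becomes ready, writing out each
 * updated page and fetching the next needed page before moving on.
 */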
static void recover_ready_pages(struct repair_completion *repair,
				struct vdo_completion *completion)
{
	struct vdo_page_completion *page_completion = (struct vdo_page_completion *) completion;

	if (finish_if_done(repair))
		return;

	if (repair->pbn != page_completion->pbn)
		return;

	while (page_completion->ready) {
		struct numbered_block_mapping *start_of_next_page;
		struct block_map_page *page;
		int result;

		result = vdo_get_cached_page(completion, &page);
		if (result != VDO_SUCCESS) {
			abort_block_map_recovery(repair, result);
			return;
		}

		start_of_next_page =
			find_entry_starting_next_page(repair, repair->current_entry,
						      false);
		apply_journal_entries_to_page(page, repair->current_entry,
					      start_of_next_page);
		repair->current_entry = start_of_next_page;
		vdo_request_page_write(completion);
		vdo_release_page_completion(completion);

		if (finish_if_done(repair))
			return;

		repair->pbn = repair->current_entry->block_map_slot.pbn;
		fetch_block_map_page(repair, completion);
		page_completion = get_next_page_completion(repair, page_completion);
		completion = &page_completion->completion;
	}
}

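/**
 * recover_block_map() - Replay the recovery journal entries into the block map.
 * @completion: The repair completion.
 */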
static void recover_block_map(struct vdo_completion *completion)
{
	struct repair_completion *repair = as_repair_completion(completion);
	struct vdo *vdo = completion->vdo;
	struct numbered_block_mapping *first_sorted_entry;
	page_count_t i;

	vdo_assert_on_logical_zone_thread(vdo, 0, __func__);

	/* Suppress block map errors. */
	vdo->block_map->zones[0].page_cache.rebuilding =
		vdo_state_requires_read_only_rebuild(vdo->load_state);

	if (repair->block_map_entry_count == 0) {
		vdo_log_info("Replaying 0 recovery entries into block map");
		vdo_free(vdo_forget(repair->journal_data));
		launch_repair_completion(repair, load_slab_depot, VDO_ZONE_TYPE_ADMIN);
		return;
	}

	/*
	 * Organize the journal entries into a binary heap so we can iterate over them in sorted
	 * order incrementally, avoiding an expensive sort call.
	 */
	repair->replay_heap = (struct min_heap) {
		.data = repair->entries,
		.nr = repair->block_map_entry_count,
		.size = repair->block_map_entry_count,
	};
	min_heapify_all(&repair->replay_heap, &repair_min_heap);

	vdo_log_info("Replaying %zu recovery entries into block map",
		     repair->block_map_entry_count);

	repair->current_entry = &repair->entries[repair->block_map_entry_count - 1];
	first_sorted_entry = sort_next_heap_element(repair);
	VDO_ASSERT_LOG_ONLY(first_sorted_entry == repair->current_entry,
			    "heap is returning elements in an unexpected order");

	/* Prevent any page from being processed until all pages have been launched. */
	repair->launching = true;
	repair->pbn = repair->current_entry->block_map_slot.pbn;
	repair->current_unfetched_entry = repair->current_entry;
	for (i = 0; i < repair->page_count; i++) {
		if (repair->current_unfetched_entry < repair->entries)
			break;

		fetch_block_map_page(repair, &repair->page_completions[i].completion);
	}
	repair->launching = false;

	/* Process any ready pages. */
	recover_ready_pages(repair, &repair->page_completions[0].completion);
}

/**
 * get_recovery_journal_block_header() - Get the block header for a block at a position in the
 *                                       journal data and unpack it.
 * @journal: The recovery journal.
 * @data: The recovery journal data.
 * @sequence: The sequence number.
 *
 * Return: The unpacked header.
 */
static struct recovery_block_header __must_check
get_recovery_journal_block_header(struct recovery_journal *journal, char *data,
				  sequence_number_t sequence)
{
	physical_block_number_t pbn =
		vdo_get_recovery_journal_block_number(journal, sequence);
	char *header = &data[pbn * VDO_BLOCK_SIZE];

	return vdo_unpack_recovery_block_header((struct packed_journal_header *) header);
}

/**
 * is_valid_recovery_journal_block() - Determine whether the given header describes a valid block
 *                                     for the given journal.
 * @journal: The journal to use.
 * @header: The unpacked block header to check.
 * @old_ok: Whether an old format header is valid.
 *
 * A block is not valid if it is unformatted, or if it is older than the last successful recovery
 * or reformat.
 *
 * Return: True if the header is valid.
 */
static bool __must_check is_valid_recovery_journal_block(const struct recovery_journal *journal,
							 const struct recovery_block_header *header,
							 bool old_ok)
{
	if ((header->nonce != journal->nonce) ||
	    (header->recovery_count != journal->recovery_count))
		return false;

	if (header->metadata_type == VDO_METADATA_RECOVERY_JOURNAL_2)
		return (header->entry_count <= journal->entries_per_block);

	return (old_ok &&
		(header->metadata_type == VDO_METADATA_RECOVERY_JOURNAL) &&
		(header->entry_count <= RECOVERY_JOURNAL_1_ENTRIES_PER_BLOCK));
}

/**
 * is_exact_recovery_journal_block() - Determine whether the given header describes the exact block
 *                                     indicated.
 * @journal: The journal to use.
 * @header: The unpacked block header to check.
 * @sequence: The expected sequence number.
 * @type: The expected metadata type.
 *
 * Return: True if the block matches.
 */
static bool __must_check is_exact_recovery_journal_block(const struct recovery_journal *journal,
							 const struct recovery_block_header *header,
							 sequence_number_t sequence,
							 enum vdo_metadata_type type)
{
	return ((header->metadata_type == type) &&
		(header->sequence_number == sequence) &&
		(is_valid_recovery_journal_block(journal, header, true)));
}

/**
 * find_recovery_journal_head_and_tail() - Find the tail and head of the journal.
 * @repair: The repair completion.
 *
 * Return: True if there were valid journal blocks.
 */
static bool find_recovery_journal_head_and_tail(struct repair_completion *repair)
{
	struct recovery_journal *journal = repair->completion.vdo->recovery_journal;
	bool found_entries = false;
	physical_block_number_t i;

	/*
	 * Ensure that we don't replay old entries since we know the tail recorded in the super
	 * block must be a lower bound. Not doing so can result in extra data loss by setting the
	 * tail too early.
	 */
	repair->highest_tail = journal->tail;
	for (i = 0; i < journal->size; i++) {
		struct recovery_block_header header =
			get_recovery_journal_block_header(journal, repair->journal_data, i);

		if (!is_valid_recovery_journal_block(journal, &header, true)) {
			/* This block is old or incorrectly formatted */
			continue;
		}

		if (vdo_get_recovery_journal_block_number(journal, header.sequence_number) != i) {
			/* This block is in the wrong location */
			continue;
		}

		if (header.sequence_number >= repair->highest_tail) {
			found_entries = true;
			repair->highest_tail = header.sequence_number;
		}

		if (!found_entries)
			continue;

		if (header.block_map_head > repair->block_map_head)
			repair->block_map_head = header.block_map_head;

		if (header.slab_journal_head > repair->slab_journal_head)
			repair->slab_journal_head = header.slab_journal_head;
	}

	return found_entries;
}

/**
 * unpack_entry() - Unpack a recovery journal entry in either format.
 * @vdo: The vdo.
 * @packed: The entry to unpack.
 * @format: The expected format of the entry.
 * @entry: The unpacked entry.
 *
 * Return: true if the entry should be applied.
 */
static bool unpack_entry(struct vdo *vdo, char *packed, enum vdo_metadata_type format,
			 struct recovery_journal_entry *entry)
{
	if (format == VDO_METADATA_RECOVERY_JOURNAL_2) {
		struct packed_recovery_journal_entry *packed_entry =
			(struct packed_recovery_journal_entry *) packed;

		*entry = vdo_unpack_recovery_journal_entry(packed_entry);
	} else {
		physical_block_number_t low32, high4;

		struct packed_recovery_journal_entry_1 *packed_entry =
			(struct packed_recovery_journal_entry_1 *) packed;

		if (packed_entry->operation == VDO_JOURNAL_DATA_INCREMENT)
			entry->operation = VDO_JOURNAL_DATA_REMAPPING;
		else if (packed_entry->operation == VDO_JOURNAL_BLOCK_MAP_INCREMENT)
			entry->operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING;
		else
			return false;

		low32 = __le32_to_cpu(packed_entry->pbn_low_word);
		high4 = packed_entry->pbn_high_nibble;
		entry->slot = (struct block_map_slot) {
			.pbn = ((high4 << 32) | low32),
			.slot = (packed_entry->slot_low | (packed_entry->slot_high << 6)),
		};
		entry->mapping = vdo_unpack_block_map_entry(&packed_entry->block_map_entry);
		entry->unmapping = (struct data_location) {
			.pbn = VDO_ZERO_BLOCK,
			.state = VDO_MAPPING_STATE_UNMAPPED,
		};
	}

	return (validate_recovery_journal_entry(vdo, entry) == VDO_SUCCESS);
}

1314/**
1315 * append_sector_entries() - Append an array of recovery journal entries from a journal block
1316 *                           sector to the array of numbered mappings in the repair completion,
1317 *                           numbering each entry in the order they are appended.
1318 * @repair: The repair completion.
1319 * @entries: The entries in the sector.
1320 * @format: The format of the sector.
1321 * @entry_count: The number of entries to append.
1322 */
1323static void append_sector_entries(struct repair_completion *repair, char *entries,
1324				  enum vdo_metadata_type format,
1325				  journal_entry_count_t entry_count)
1326{
1327	journal_entry_count_t i;
1328	struct vdo *vdo = repair->completion.vdo;
1329	off_t increment = ((format == VDO_METADATA_RECOVERY_JOURNAL_2)
1330			   ? sizeof(struct packed_recovery_journal_entry)
1331			   : sizeof(struct packed_recovery_journal_entry_1));
1332
1333	for (i = 0; i < entry_count; i++, entries += increment) {
1334		struct recovery_journal_entry entry;
1335
1336		if (!unpack_entry(vdo, entries, format, &entry))
1337			/* When recovering from read-only mode, ignore damaged entries. */
1338			continue;
1339
1340		repair->entries[repair->block_map_entry_count] =
1341			(struct numbered_block_mapping) {
1342			.block_map_slot = entry.slot,
1343			.block_map_entry = vdo_pack_block_map_entry(entry.mapping.pbn,
1344								    entry.mapping.state),
1345			.number = repair->block_map_entry_count,
1346		};
1347		repair->block_map_entry_count++;
1348	}
1349}
1350
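
/**
 * entries_per_sector() - Determine how many entries fit in a sector of a journal block.
 * @format: The metadata type of the journal block.
 * @sector_number: The sector within the block.
 *
 * Return: The sector's entry capacity; in the old journal format, the last sector of a block
 *         holds fewer entries than the others.
 */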
static journal_entry_count_t entries_per_sector(enum vdo_metadata_type format,
						u8 sector_number)
{
	if (format == VDO_METADATA_RECOVERY_JOURNAL_2)
		return RECOVERY_JOURNAL_ENTRIES_PER_SECTOR;

	return ((sector_number == (VDO_SECTORS_PER_BLOCK - 1))
		? RECOVERY_JOURNAL_1_ENTRIES_IN_LAST_SECTOR
		: RECOVERY_JOURNAL_1_ENTRIES_PER_SECTOR);
}
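
/**
 * extract_entries_from_block() - Extract the numbered block mappings from one journal block.
 * @repair: The repair completion.
 * @journal: The recovery journal.
 * @sequence: The sequence number of the block to examine.
 * @format: The expected format of the block.
 * @entries: The maximum number of entries to extract.
 *
 * Blocks which are invalid or out of place are skipped entirely; within a valid block, only
 * sectors which pass validation contribute entries, and no more entries are taken than the
 * block header claims.
 */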
static void extract_entries_from_block(struct repair_completion *repair,
				       struct recovery_journal *journal,
				       sequence_number_t sequence,
				       enum vdo_metadata_type format,
				       journal_entry_count_t entries)
{
	sector_count_t i;
	struct recovery_block_header header =
		get_recovery_journal_block_header(journal, repair->journal_data,
						  sequence);

	if (!is_exact_recovery_journal_block(journal, &header, sequence, format)) {
		/* This block is invalid, so skip it. */
		return;
	}

	entries = min(entries, header.entry_count);
	for (i = 1; i < VDO_SECTORS_PER_BLOCK; i++) {
		struct packed_journal_sector *sector =
			get_sector(journal, repair->journal_data, sequence, i);
		journal_entry_count_t sector_entries =
			min(entries, entries_per_sector(format, i));

		if (vdo_is_valid_recovery_journal_sector(&header, sector, i)) {
			/* Only extract as many as the block header calls for. */
			append_sector_entries(repair, (char *) sector->entries, format,
					      min_t(journal_entry_count_t,
						    sector->entry_count,
						    sector_entries));
		}

		/*
		 * Even if the sector wasn't full, count it as full when counting up to the
		 * entry count the block header claims.
		 */
		entries -= sector_entries;
	}
}
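
/**
 * parse_journal_for_rebuild() - Extract block map mappings from the journal for a read-only
 *                               rebuild.
 * @repair: The repair completion.
 *
 * Allocates an array sized for the worst case and extracts entries from every block between the
 * block map head and the highest tail, handling either journal format.
 *
 * Return: VDO_SUCCESS or an error code.
 */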
static int parse_journal_for_rebuild(struct repair_completion *repair)
{
	int result;
	sequence_number_t i;
	block_count_t count;
	enum vdo_metadata_type format;
	struct vdo *vdo = repair->completion.vdo;
	struct recovery_journal *journal = vdo->recovery_journal;
	journal_entry_count_t entries_per_block = journal->entries_per_block;

	format = get_recovery_journal_block_header(journal, repair->journal_data,
						   repair->highest_tail).metadata_type;
	if (format == VDO_METADATA_RECOVERY_JOURNAL)
		entries_per_block = RECOVERY_JOURNAL_1_ENTRIES_PER_BLOCK;

	/*
	 * Allocate an array of numbered_block_mapping structures large enough to transcribe every
	 * packed_recovery_journal_entry from every valid journal block.
	 */
	count = ((repair->highest_tail - repair->block_map_head + 1) * entries_per_block);
	result = vdo_allocate(count, struct numbered_block_mapping, __func__,
			      &repair->entries);
	if (result != VDO_SUCCESS)
		return result;

	for (i = repair->block_map_head; i <= repair->highest_tail; i++)
		extract_entries_from_block(repair, journal, i, format, entries_per_block);

	return VDO_SUCCESS;
}
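
/**
 * validate_heads() - Check that both reap heads are no later than the journal tail.
 * @repair: The repair completion.
 *
 * Return: VDO_SUCCESS if the heads are valid, or VDO_CORRUPT_JOURNAL if the tail precedes
 *         either head.
 */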
static int validate_heads(struct repair_completion *repair)
{
	/* Both reap heads must be behind the tail. */
	if ((repair->block_map_head <= repair->tail) &&
	    (repair->slab_journal_head <= repair->tail))
		return VDO_SUCCESS;

	return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
				      "Journal tail too early. block map head: %llu, slab journal head: %llu, tail: %llu",
				      (unsigned long long) repair->block_map_head,
				      (unsigned long long) repair->slab_journal_head,
				      (unsigned long long) repair->tail);
}

/**
 * extract_new_mappings() - Find all valid new mappings to be applied to the block map.
 *
 * The mappings are extracted from the journal and stored in a sortable array so that all of the
 * mappings to be applied to a given block map page can be done in a single page fetch.
 */
static int extract_new_mappings(struct repair_completion *repair)
{
	int result;
	struct vdo *vdo = repair->completion.vdo;
	struct recovery_point recovery_point = {
		.sequence_number = repair->block_map_head,
		.sector_count = 1,
		.entry_count = 0,
	};

	/*
	 * Allocate an array of numbered_block_mapping structs just large enough to transcribe
	 * every packed_recovery_journal_entry from every valid journal block.
	 */
	result = vdo_allocate(repair->entry_count, struct numbered_block_mapping,
			      __func__, &repair->entries);
	if (result != VDO_SUCCESS)
		return result;

	for (; before_recovery_point(&recovery_point, &repair->tail_recovery_point);
	     increment_recovery_point(&recovery_point)) {
		struct recovery_journal_entry entry = get_entry(repair, &recovery_point);

		result = validate_recovery_journal_entry(vdo, &entry);
		if (result != VDO_SUCCESS) {
			vdo_enter_read_only_mode(vdo, result);
			return result;
		}

		repair->entries[repair->block_map_entry_count] =
			(struct numbered_block_mapping) {
			.block_map_slot = entry.slot,
			.block_map_entry = vdo_pack_block_map_entry(entry.mapping.pbn,
								    entry.mapping.state),
			.number = repair->block_map_entry_count,
		};
		repair->block_map_entry_count++;
	}

	result = VDO_ASSERT((repair->block_map_entry_count <= repair->entry_count),
			    "approximate entry count is an upper bound");
	if (result != VDO_SUCCESS)
		vdo_enter_read_only_mode(vdo, result);

	return result;
}

/**
 * compute_usages() - Compute the logical blocks in use and block map data block counts from the
 *                    tail of the journal.
 */
static noinline int compute_usages(struct repair_completion *repair)
{
	/*
	 * This function is declared noinline to avoid a spurious valgrind error regarding the
	 * following structure being uninitialized.
	 */
	struct recovery_point recovery_point = {
		.sequence_number = repair->tail,
		.sector_count = 1,
		.entry_count = 0,
	};

	struct vdo *vdo = repair->completion.vdo;
	struct recovery_journal *journal = vdo->recovery_journal;
	struct recovery_block_header header =
		get_recovery_journal_block_header(journal, repair->journal_data,
						  repair->tail);

	repair->logical_blocks_used = header.logical_blocks_used;
	repair->block_map_data_blocks = header.block_map_data_blocks;

	for (; before_recovery_point(&recovery_point, &repair->tail_recovery_point);
	     increment_recovery_point(&recovery_point)) {
		struct recovery_journal_entry entry = get_entry(repair, &recovery_point);
		int result;

		result = validate_recovery_journal_entry(vdo, &entry);
		if (result != VDO_SUCCESS) {
			vdo_enter_read_only_mode(vdo, result);
			return result;
		}

		if (entry.operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
			repair->block_map_data_blocks++;
			continue;
		}

		if (vdo_is_mapped_location(&entry.mapping))
			repair->logical_blocks_used++;

		if (vdo_is_mapped_location(&entry.unmapping))
			repair->logical_blocks_used--;
	}

	return VDO_SUCCESS;
}
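
/**
 * parse_journal_for_recovery() - Determine the limits of the valid journal and extract its
 *                                entries for a normal recovery.
 * @repair: The repair completion.
 *
 * Scans forward from the older of the two heads, stopping at the first block which is missing,
 * torn, or not full, in order to establish the tail and tail recovery point. An old format
 * journal forces the vdo into read-only mode, since upgrading requires a read-only rebuild.
 *
 * Return: VDO_SUCCESS or an error code.
 */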
static int parse_journal_for_recovery(struct repair_completion *repair)
{
	int result;
	sequence_number_t i, head;
	bool found_entries = false;
	struct recovery_journal *journal = repair->completion.vdo->recovery_journal;

	head = min(repair->block_map_head, repair->slab_journal_head);
	for (i = head; i <= repair->highest_tail; i++) {
		struct recovery_block_header header;
		journal_entry_count_t block_entries;
		u8 j;

		repair->tail = i;
		repair->tail_recovery_point = (struct recovery_point) {
			.sequence_number = i,
			.sector_count = 0,
			.entry_count = 0,
		};

		header = get_recovery_journal_block_header(journal, repair->journal_data, i);
		if (header.metadata_type == VDO_METADATA_RECOVERY_JOURNAL) {
			/* This is an old format block, so we need to upgrade */
			vdo_log_error_strerror(VDO_UNSUPPORTED_VERSION,
					       "Recovery journal is in the old format, a read-only rebuild is required.");
			vdo_enter_read_only_mode(repair->completion.vdo,
						 VDO_UNSUPPORTED_VERSION);
			return VDO_UNSUPPORTED_VERSION;
		}

		if (!is_exact_recovery_journal_block(journal, &header, i,
						     VDO_METADATA_RECOVERY_JOURNAL_2)) {
			/* A bad block header was found so this must be the end of the journal. */
			break;
		}

		block_entries = header.entry_count;

		/* Examine each sector in turn to determine the last valid sector. */
		for (j = 1; j < VDO_SECTORS_PER_BLOCK; j++) {
			struct packed_journal_sector *sector =
				get_sector(journal, repair->journal_data, i, j);
			journal_entry_count_t sector_entries =
				min_t(journal_entry_count_t, sector->entry_count,
				      block_entries);

			/* A bad sector means that this block was torn. */
			if (!vdo_is_valid_recovery_journal_sector(&header, sector, j))
				break;

			if (sector_entries > 0) {
				found_entries = true;
				repair->tail_recovery_point.sector_count++;
				repair->tail_recovery_point.entry_count = sector_entries;
				block_entries -= sector_entries;
				repair->entry_count += sector_entries;
			}

			/* If this sector is short, the later sectors can't matter. */
			if ((sector_entries < RECOVERY_JOURNAL_ENTRIES_PER_SECTOR) ||
			    (block_entries == 0))
				break;
		}

		/* If this block was not filled, or if it tore, no later block can matter. */
		if ((header.entry_count != journal->entries_per_block) || (block_entries > 0))
			break;
	}

	if (!found_entries)
		return validate_heads(repair);

	/* Set the tail to the last valid tail block, if there is one. */
	if (repair->tail_recovery_point.sector_count == 0)
		repair->tail--;

	result = validate_heads(repair);
	if (result != VDO_SUCCESS)
		return result;

	vdo_log_info("Highest-numbered recovery journal block has sequence number %llu, and the highest-numbered usable block is %llu",
		     (unsigned long long) repair->highest_tail,
		     (unsigned long long) repair->tail);

	result = extract_new_mappings(repair);
	if (result != VDO_SUCCESS)
		return result;

	return compute_usages(repair);
}
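
/**
 * parse_journal() - Parse the loaded journal data, extracting the entries needed for repair.
 * @repair: The repair completion.
 *
 * Dispatches to the rebuild or recovery parser depending on whether the load state requires a
 * read-only rebuild.
 *
 * Return: VDO_SUCCESS or an error code.
 */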
static int parse_journal(struct repair_completion *repair)
{
	if (!find_recovery_journal_head_and_tail(repair))
		return VDO_SUCCESS;

	return (vdo_state_requires_read_only_rebuild(repair->completion.vdo->load_state) ?
		parse_journal_for_rebuild(repair) :
		parse_journal_for_recovery(repair));
}
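
/**
 * finish_journal_load() - Handle the completion of a journal read.
 * @completion: The vio which has finished its read.
 *
 * Once all of the journal reads are complete, parses the journal and continues on to block map
 * recovery.
 */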
static void finish_journal_load(struct vdo_completion *completion)
{
	struct repair_completion *repair = completion->parent;

	if (++repair->vios_complete != repair->vio_count)
		return;

	vdo_log_info("Finished reading recovery journal");
	uninitialize_vios(repair);
	prepare_repair_completion(repair, recover_block_map, VDO_ZONE_TYPE_LOGICAL);
	vdo_continue_completion(&repair->completion, parse_journal(repair));
}
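
/**
 * handle_journal_load_error() - Handle an error while reading the journal.
 * @completion: The vio which encountered the error.
 *
 * The error is preserved on the repair completion, and the vio's callback is still invoked so
 * the completed read is counted.
 */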
static void handle_journal_load_error(struct vdo_completion *completion)
{
	struct repair_completion *repair = completion->parent;

	/* Preserve the error */
	vdo_set_completion_result(&repair->completion, completion->result);
	vio_record_metadata_io_error(as_vio(completion));
	completion->callback(completion);
}
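
/**
 * read_journal_endio() - The bio end_io callback for a journal read.
 * @bio: The completed bio.
 *
 * Continues processing the vio on the vdo's admin thread.
 */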
static void read_journal_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo *vdo = vio->completion.vdo;

	continue_vio_after_io(vio, finish_journal_load, vdo->thread_config.admin_thread);
}

/**
 * vdo_repair() - Load the recovery journal and then recover or rebuild a vdo.
 * @parent: The completion to notify when the operation is complete.
 */
void vdo_repair(struct vdo_completion *parent)
{
	int result;
	char *ptr;
	struct repair_completion *repair;
	struct vdo *vdo = parent->vdo;
	struct recovery_journal *journal = vdo->recovery_journal;
	physical_block_number_t pbn = journal->origin;
	block_count_t remaining = journal->size;
	block_count_t vio_count = DIV_ROUND_UP(remaining, MAX_BLOCKS_PER_VIO);
	page_count_t page_count = min_t(page_count_t,
					vdo->device_config->cache_size >> 1,
					MAXIMUM_SIMULTANEOUS_VDO_BLOCK_MAP_RESTORATION_READS);

	vdo_assert_on_admin_thread(vdo, __func__);

	if (vdo->load_state == VDO_FORCE_REBUILD) {
		vdo_log_warning("Rebuilding reference counts to clear read-only mode");
		vdo->states.vdo.read_only_recoveries++;
	} else if (vdo->load_state == VDO_REBUILD_FOR_UPGRADE) {
		vdo_log_warning("Rebuilding reference counts for upgrade");
	} else {
		vdo_log_warning("Device was dirty, rebuilding reference counts");
	}

	result = vdo_allocate_extended(struct repair_completion, page_count,
				       struct vdo_page_completion, __func__,
				       &repair);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(parent, result);
		return;
	}

	vdo_initialize_completion(&repair->completion, vdo, VDO_REPAIR_COMPLETION);
	repair->completion.error_handler = abort_repair;
	repair->completion.parent = parent;
	prepare_repair_completion(repair, finish_repair, VDO_ZONE_TYPE_ADMIN);
	repair->page_count = page_count;

	result = vdo_allocate(remaining * VDO_BLOCK_SIZE, char, __func__,
			      &repair->journal_data);
	if (abort_on_error(result, repair))
		return;

	result = vdo_allocate(vio_count, struct vio, __func__, &repair->vios);
	if (abort_on_error(result, repair))
		return;
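
	/*
	 * Carve the journal data buffer into consecutive per-vio regions, each covering up to
	 * MAX_BLOCKS_PER_VIO blocks.
	 */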
	ptr = repair->journal_data;
	for (repair->vio_count = 0; repair->vio_count < vio_count; repair->vio_count++) {
		block_count_t blocks = min_t(block_count_t, remaining,
					     MAX_BLOCKS_PER_VIO);

		result = allocate_vio_components(vdo, VIO_TYPE_RECOVERY_JOURNAL,
						 VIO_PRIORITY_METADATA,
						 repair, blocks, ptr,
						 &repair->vios[repair->vio_count]);
		if (abort_on_error(result, repair))
			return;

		ptr += (blocks * VDO_BLOCK_SIZE);
		remaining -= blocks;
	}
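
	/* Issue all of the journal reads; each vio reads a contiguous run of journal blocks. */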
	for (vio_count = 0; vio_count < repair->vio_count;
	     vio_count++, pbn += MAX_BLOCKS_PER_VIO) {
		vdo_submit_metadata_vio(&repair->vios[vio_count], pbn, read_journal_endio,
					handle_journal_load_error, REQ_OP_READ);
	}
}