1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5
6#include "slab-depot.h"
7
8#include <linux/atomic.h>
9#include <linux/bio.h>
10#include <linux/err.h>
11#include <linux/log2.h>
12#include <linux/min_heap.h>
13#include <linux/minmax.h>
14
15#include "logger.h"
16#include "memory-alloc.h"
17#include "numeric.h"
18#include "permassert.h"
19#include "string-utils.h"
20
21#include "action-manager.h"
22#include "admin-state.h"
23#include "completion.h"
24#include "constants.h"
25#include "data-vio.h"
26#include "encodings.h"
27#include "io-submitter.h"
28#include "physical-zone.h"
29#include "priority-table.h"
30#include "recovery-journal.h"
31#include "repair.h"
32#include "status-codes.h"
33#include "types.h"
34#include "vdo.h"
35#include "vio.h"
36#include "wait-queue.h"
37
38static const u64 BYTES_PER_WORD = sizeof(u64);
39static const bool NORMAL_OPERATION = true;
40
41/**
42 * get_lock() - Get the lock object for a slab journal block by sequence number.
43 * @journal: vdo_slab journal to retrieve from.
44 * @sequence_number: Sequence number of the block.
45 *
46 * Return: The lock object for the given sequence number.
47 */
48static inline struct journal_lock * __must_check get_lock(struct slab_journal *journal,
49							  sequence_number_t sequence_number)
50{
51	return &journal->locks[sequence_number % journal->size];
52}
53
54static bool is_slab_open(struct vdo_slab *slab)
55{
56	return (!vdo_is_state_quiescing(&slab->state) &&
57		!vdo_is_state_quiescent(&slab->state));
58}
59
/**
 * must_make_entries_to_flush() - Check whether there are entry waiters which should delay a flush.
 * @journal: The journal to check.
 *
 * Return: true if there are entry waiters and the slab is not currently rebuilding.
 */
66static inline bool __must_check must_make_entries_to_flush(struct slab_journal *journal)
67{
68	return ((journal->slab->status != VDO_SLAB_REBUILDING) &&
69		vdo_waitq_has_waiters(&journal->entry_waiters));
70}
71
72/**
73 * is_reaping() - Check whether a reap is currently in progress.
74 * @journal: The journal which may be reaping.
75 *
76 * Return: true if the journal is reaping.
77 */
78static inline bool __must_check is_reaping(struct slab_journal *journal)
79{
80	return (journal->head != journal->unreapable);
81}
82
83/**
84 * initialize_tail_block() - Initialize tail block as a new block.
85 * @journal: The journal whose tail block is being initialized.
86 */
87static void initialize_tail_block(struct slab_journal *journal)
88{
89	struct slab_journal_block_header *header = &journal->tail_header;
90
91	header->sequence_number = journal->tail;
92	header->entry_count = 0;
93	header->has_block_map_increments = false;
94}
95
96/**
97 * initialize_journal_state() - Set all journal fields appropriately to start journaling.
 * @journal: The journal to be reset; its new state is based on its tail sequence number.
99 */
100static void initialize_journal_state(struct slab_journal *journal)
101{
102	journal->unreapable = journal->head;
103	journal->reap_lock = get_lock(journal, journal->unreapable);
104	journal->next_commit = journal->tail;
105	journal->summarized = journal->last_summarized = journal->tail;
106	initialize_tail_block(journal);
107}
108
109/**
110 * block_is_full() - Check whether a journal block is full.
111 * @journal: The slab journal for the block.
112 *
113 * Return: true if the tail block is full.
114 */
115static bool __must_check block_is_full(struct slab_journal *journal)
116{
117	journal_entry_count_t count = journal->tail_header.entry_count;
118
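	/*
	 * Blocks containing block map increments also carry a per-entry type bitmap (see
	 * encode_slab_journal_entry()), so they hold fewer entries than data-only blocks.
	 */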
119	return (journal->tail_header.has_block_map_increments ?
120		(journal->full_entries_per_block == count) :
121		(journal->entries_per_block == count));
122}
123
124static void add_entries(struct slab_journal *journal);
125static void update_tail_block_location(struct slab_journal *journal);
126static void release_journal_locks(struct vdo_waiter *waiter, void *context);
127
/**
 * is_slab_journal_blank() - Check whether a slab's journal is blank.
 * @slab: The slab to check.
 *
 * A slab journal is blank if it has never had any entries recorded in it.
 *
 * Return: true if the slab's journal has never been modified.
 */
135static bool is_slab_journal_blank(const struct vdo_slab *slab)
136{
137	return ((slab->journal.tail == 1) &&
138		(slab->journal.tail_header.entry_count == 0));
139}
140
141/**
142 * mark_slab_journal_dirty() - Put a slab journal on the dirty ring of its allocator in the correct
143 *                             order.
144 * @journal: The journal to be marked dirty.
145 * @lock: The recovery journal lock held by the slab journal.
146 */
147static void mark_slab_journal_dirty(struct slab_journal *journal, sequence_number_t lock)
148{
149	struct slab_journal *dirty_journal;
150	struct list_head *dirty_list = &journal->slab->allocator->dirty_slab_journals;
151
152	VDO_ASSERT_LOG_ONLY(journal->recovery_lock == 0, "slab journal was clean");
153
154	journal->recovery_lock = lock;
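	/*
	 * Keep the dirty list sorted by recovery lock, scanning backwards from the most recently
	 * dirtied journals to find the insertion point.
	 */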
155	list_for_each_entry_reverse(dirty_journal, dirty_list, dirty_entry) {
156		if (dirty_journal->recovery_lock <= journal->recovery_lock)
157			break;
158	}
159
160	list_move_tail(&journal->dirty_entry, dirty_journal->dirty_entry.next);
161}
162
163static void mark_slab_journal_clean(struct slab_journal *journal)
164{
165	journal->recovery_lock = 0;
166	list_del_init(&journal->dirty_entry);
167}
168
169static void check_if_slab_drained(struct vdo_slab *slab)
170{
171	bool read_only;
172	struct slab_journal *journal = &slab->journal;
173	const struct admin_state_code *code;
174
175	if (!vdo_is_state_draining(&slab->state) ||
176	    must_make_entries_to_flush(journal) ||
177	    is_reaping(journal) ||
178	    journal->waiting_to_commit ||
179	    !list_empty(&journal->uncommitted_blocks) ||
180	    journal->updating_slab_summary ||
181	    (slab->active_count > 0))
182		return;
183
184	/* When not suspending or recovering, the slab must be clean. */
185	code = vdo_get_admin_state_code(&slab->state);
186	read_only = vdo_is_read_only(slab->allocator->depot->vdo);
187	if (!read_only &&
188	    vdo_waitq_has_waiters(&slab->dirty_blocks) &&
189	    (code != VDO_ADMIN_STATE_SUSPENDING) &&
190	    (code != VDO_ADMIN_STATE_RECOVERING))
191		return;
192
193	vdo_finish_draining_with_result(&slab->state,
194					(read_only ? VDO_READ_ONLY : VDO_SUCCESS));
195}
196
197/* FULLNESS HINT COMPUTATION */
198
199/**
200 * compute_fullness_hint() - Translate a slab's free block count into a 'fullness hint' that can be
201 *                           stored in a slab_summary_entry's 7 bits that are dedicated to its free
202 *                           count.
 * @depot: The depot whose summary is being updated.
204 * @free_blocks: The number of free blocks.
205 *
206 * Note: the number of free blocks must be strictly less than 2^23 blocks, even though
207 * theoretically slabs could contain precisely 2^23 blocks; there is an assumption that at least
208 * one block is used by metadata. This assumption is necessary; otherwise, the fullness hint might
209 * overflow. The fullness hint formula is roughly (fullness >> 16) & 0x7f, but (2^23 >> 16) & 0x7f
210 * is 0, which would make it impossible to distinguish completely full from completely empty.
211 *
212 * Return: A fullness hint, which can be stored in 7 bits.
213 */
214static u8 __must_check compute_fullness_hint(struct slab_depot *depot,
215					     block_count_t free_blocks)
216{
217	block_count_t hint;
218
219	VDO_ASSERT_LOG_ONLY((free_blocks < (1 << 23)), "free blocks must be less than 2^23");
220
221	if (free_blocks == 0)
222		return 0;
223
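	/* For a maximal 2^23-block slab the shift is 16: 100,000 free blocks map to hint 1. */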
224	hint = free_blocks >> depot->hint_shift;
225	return ((hint == 0) ? 1 : hint);
226}
227
/**
 * check_summary_drain_complete() - Check whether an allocator's summary has finished draining.
 * @allocator: The allocator to check.
 */
231static void check_summary_drain_complete(struct block_allocator *allocator)
232{
233	if (!vdo_is_state_draining(&allocator->summary_state) ||
234	    (allocator->summary_write_count > 0))
235		return;
236
237	vdo_finish_operation(&allocator->summary_state,
238			     (vdo_is_read_only(allocator->depot->vdo) ?
239			      VDO_READ_ONLY : VDO_SUCCESS));
240}
241
242/**
243 * notify_summary_waiters() - Wake all the waiters in a given queue.
244 * @allocator: The block allocator summary which owns the queue.
245 * @queue: The queue to notify.
246 */
247static void notify_summary_waiters(struct block_allocator *allocator,
248				   struct vdo_wait_queue *queue)
249{
250	int result = (vdo_is_read_only(allocator->depot->vdo) ?
251		      VDO_READ_ONLY : VDO_SUCCESS);
252
253	vdo_waitq_notify_all_waiters(queue, NULL, &result);
254}
255
256static void launch_write(struct slab_summary_block *summary_block);
257
258/**
259 * finish_updating_slab_summary_block() - Finish processing a block which attempted to write,
260 *                                        whether or not the attempt succeeded.
261 * @block: The block.
262 */
263static void finish_updating_slab_summary_block(struct slab_summary_block *block)
264{
265	notify_summary_waiters(block->allocator, &block->current_update_waiters);
266	block->writing = false;
267	block->allocator->summary_write_count--;
268	if (vdo_waitq_has_waiters(&block->next_update_waiters))
269		launch_write(block);
270	else
271		check_summary_drain_complete(block->allocator);
272}
273
274/**
275 * finish_update() - This is the callback for a successful summary block write.
276 * @completion: The write vio.
277 */
278static void finish_update(struct vdo_completion *completion)
279{
280	struct slab_summary_block *block =
281		container_of(as_vio(completion), struct slab_summary_block, vio);
282
283	atomic64_inc(&block->allocator->depot->summary_statistics.blocks_written);
284	finish_updating_slab_summary_block(block);
285}
286
287/**
288 * handle_write_error() - Handle an error writing a slab summary block.
289 * @completion: The write VIO.
290 */
291static void handle_write_error(struct vdo_completion *completion)
292{
293	struct slab_summary_block *block =
294		container_of(as_vio(completion), struct slab_summary_block, vio);
295
296	vio_record_metadata_io_error(as_vio(completion));
297	vdo_enter_read_only_mode(completion->vdo, completion->result);
298	finish_updating_slab_summary_block(block);
299}
300
301static void write_slab_summary_endio(struct bio *bio)
302{
303	struct vio *vio = bio->bi_private;
304	struct slab_summary_block *block =
305		container_of(vio, struct slab_summary_block, vio);
306
307	continue_vio_after_io(vio, finish_update, block->allocator->thread_id);
308}
309
310/**
311 * launch_write() - Write a slab summary block unless it is currently out for writing.
312 * @block: The block that needs to be committed.
313 */
314static void launch_write(struct slab_summary_block *block)
315{
316	struct block_allocator *allocator = block->allocator;
317	struct slab_depot *depot = allocator->depot;
318	physical_block_number_t pbn;
319
320	if (block->writing)
321		return;
322
323	allocator->summary_write_count++;
324	vdo_waitq_transfer_all_waiters(&block->next_update_waiters,
325				       &block->current_update_waiters);
326	block->writing = true;
327
328	if (vdo_is_read_only(depot->vdo)) {
329		finish_updating_slab_summary_block(block);
330		return;
331	}
332
333	memcpy(block->outgoing_entries, block->entries, VDO_BLOCK_SIZE);
334
335	/*
336	 * Flush before writing to ensure that the slab journal tail blocks and reference updates
337	 * covered by this summary update are stable. Otherwise, a subsequent recovery could
338	 * encounter a slab summary update that refers to a slab journal tail block that has not
339	 * actually been written. In such cases, the slab journal referenced will be treated as
340	 * empty, causing any data within the slab which predates the existing recovery journal
341	 * entries to be lost.
342	 */
343	pbn = (depot->summary_origin +
344	       (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * allocator->zone_number) +
345	       block->index);
346	vdo_submit_metadata_vio(&block->vio, pbn, write_slab_summary_endio,
347				handle_write_error, REQ_OP_WRITE | REQ_PREFLUSH);
348}
349
350/**
351 * update_slab_summary_entry() - Update the entry for a slab.
 * @slab: The slab whose entry is to be updated.
353 * @waiter: The waiter that is updating the summary.
354 * @tail_block_offset: The offset of the slab journal's tail block.
355 * @load_ref_counts: Whether the reference counts must be loaded from disk on the vdo load.
356 * @is_clean: Whether the slab is clean.
357 * @free_blocks: The number of free blocks.
358 */
359static void update_slab_summary_entry(struct vdo_slab *slab, struct vdo_waiter *waiter,
360				      tail_block_offset_t tail_block_offset,
361				      bool load_ref_counts, bool is_clean,
362				      block_count_t free_blocks)
363{
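	/* Each slab_summary_block covers VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK consecutive slabs. */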
364	u8 index = slab->slab_number / VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK;
365	struct block_allocator *allocator = slab->allocator;
366	struct slab_summary_block *block = &allocator->summary_blocks[index];
367	int result;
368	struct slab_summary_entry *entry;
369
370	if (vdo_is_read_only(block->vio.completion.vdo)) {
371		result = VDO_READ_ONLY;
372		waiter->callback(waiter, &result);
373		return;
374	}
375
376	if (vdo_is_state_draining(&allocator->summary_state) ||
377	    vdo_is_state_quiescent(&allocator->summary_state)) {
378		result = VDO_INVALID_ADMIN_STATE;
379		waiter->callback(waiter, &result);
380		return;
381	}
382
383	entry = &allocator->summary_entries[slab->slab_number];
384	*entry = (struct slab_summary_entry) {
385		.tail_block_offset = tail_block_offset,
386		.load_ref_counts = (entry->load_ref_counts || load_ref_counts),
387		.is_dirty = !is_clean,
388		.fullness_hint = compute_fullness_hint(allocator->depot, free_blocks),
389	};
390	vdo_waitq_enqueue_waiter(&block->next_update_waiters, waiter);
391	launch_write(block);
392}
393
394/**
395 * finish_reaping() - Actually advance the head of the journal now that any necessary flushes are
396 *                    complete.
397 * @journal: The journal to be reaped.
398 */
399static void finish_reaping(struct slab_journal *journal)
400{
401	journal->head = journal->unreapable;
402	add_entries(journal);
403	check_if_slab_drained(journal->slab);
404}
405
406static void reap_slab_journal(struct slab_journal *journal);
407
408/**
409 * complete_reaping() - Finish reaping now that we have flushed the lower layer and then try
410 *                      reaping again in case we deferred reaping due to an outstanding vio.
411 * @completion: The flush vio.
412 */
413static void complete_reaping(struct vdo_completion *completion)
414{
415	struct slab_journal *journal = completion->parent;
416
417	return_vio_to_pool(journal->slab->allocator->vio_pool,
418			   vio_as_pooled_vio(as_vio(vdo_forget(completion))));
419	finish_reaping(journal);
420	reap_slab_journal(journal);
421}
422
423/**
424 * handle_flush_error() - Handle an error flushing the lower layer.
425 * @completion: The flush vio.
426 */
427static void handle_flush_error(struct vdo_completion *completion)
428{
429	vio_record_metadata_io_error(as_vio(completion));
430	vdo_enter_read_only_mode(completion->vdo, completion->result);
431	complete_reaping(completion);
432}
433
434static void flush_endio(struct bio *bio)
435{
436	struct vio *vio = bio->bi_private;
437	struct slab_journal *journal = vio->completion.parent;
438
439	continue_vio_after_io(vio, complete_reaping,
440			      journal->slab->allocator->thread_id);
441}
442
443/**
444 * flush_for_reaping() - A waiter callback for getting a vio with which to flush the lower layer
445 *                       prior to reaping.
446 * @waiter: The journal as a flush waiter.
447 * @context: The newly acquired flush vio.
448 */
449static void flush_for_reaping(struct vdo_waiter *waiter, void *context)
450{
451	struct slab_journal *journal =
452		container_of(waiter, struct slab_journal, flush_waiter);
453	struct pooled_vio *pooled = context;
454	struct vio *vio = &pooled->vio;
455
456	vio->completion.parent = journal;
457	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
458}
459
460/**
461 * reap_slab_journal() - Conduct a reap on a slab journal to reclaim unreferenced blocks.
462 * @journal: The slab journal.
463 */
464static void reap_slab_journal(struct slab_journal *journal)
465{
466	bool reaped = false;
467
468	if (is_reaping(journal)) {
469		/* We already have a reap in progress so wait for it to finish. */
470		return;
471	}
472
473	if ((journal->slab->status != VDO_SLAB_REBUILT) ||
474	    !vdo_is_state_normal(&journal->slab->state) ||
475	    vdo_is_read_only(journal->slab->allocator->depot->vdo)) {
476		/*
477		 * We must not reap in the first two cases, and there's no point in read-only mode.
478		 */
479		return;
480	}
481
482	/*
483	 * Start reclaiming blocks only when the journal head has no references. Then stop when a
484	 * block is referenced or reap reaches the most recently written block, referenced by the
485	 * slab summary, which has the sequence number just before the tail.
486	 */
487	while ((journal->unreapable < journal->tail) && (journal->reap_lock->count == 0)) {
488		reaped = true;
489		journal->unreapable++;
490		journal->reap_lock++;
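		/* The lock array is circular, so wrap back to the first lock after the last one. */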
491		if (journal->reap_lock == &journal->locks[journal->size])
492			journal->reap_lock = &journal->locks[0];
493	}
494
495	if (!reaped)
496		return;
497
498	/*
499	 * It is never safe to reap a slab journal block without first issuing a flush, regardless
500	 * of whether a user flush has been received or not. In the absence of the flush, the
501	 * reference block write which released the locks allowing the slab journal to reap may not
502	 * be persisted. Although slab summary writes will eventually issue flushes, multiple slab
503	 * journal block writes can be issued while previous slab summary updates have not yet been
504	 * made. Even though those slab journal block writes will be ignored if the slab summary
505	 * update is not persisted, they may still overwrite the to-be-reaped slab journal block
506	 * resulting in a loss of reference count updates.
507	 */
508	journal->flush_waiter.callback = flush_for_reaping;
509	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
510			      &journal->flush_waiter);
511}
512
513/**
514 * adjust_slab_journal_block_reference() - Adjust the reference count for a slab journal block.
515 * @journal: The slab journal.
516 * @sequence_number: The journal sequence number of the referenced block.
517 * @adjustment: Amount to adjust the reference counter.
518 *
519 * Note that when the adjustment is negative, the slab journal will be reaped.
520 */
521static void adjust_slab_journal_block_reference(struct slab_journal *journal,
522						sequence_number_t sequence_number,
523						int adjustment)
524{
525	struct journal_lock *lock;
526
527	if (sequence_number == 0)
528		return;
529
530	if (journal->slab->status == VDO_SLAB_REPLAYING) {
531		/* Locks should not be used during offline replay. */
532		return;
533	}
534
535	VDO_ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");
536	lock = get_lock(journal, sequence_number);
537	if (adjustment < 0) {
538		VDO_ASSERT_LOG_ONLY((-adjustment <= lock->count),
539				    "adjustment %d of lock count %u for slab journal block %llu must not underflow",
540				    adjustment, lock->count,
541				    (unsigned long long) sequence_number);
542	}
543
544	lock->count += adjustment;
545	if (lock->count == 0)
546		reap_slab_journal(journal);
547}
548
549/**
550 * release_journal_locks() - Callback invoked after a slab summary update completes.
551 * @waiter: The slab summary waiter that has just been notified.
552 * @context: The result code of the update.
553 *
554 * Registered in the constructor on behalf of update_tail_block_location().
555 *
556 * Implements waiter_callback_fn.
557 */
558static void release_journal_locks(struct vdo_waiter *waiter, void *context)
559{
560	sequence_number_t first, i;
561	struct slab_journal *journal =
562		container_of(waiter, struct slab_journal, slab_summary_waiter);
563	int result = *((int *) context);
564
565	if (result != VDO_SUCCESS) {
566		if (result != VDO_READ_ONLY) {
567			/*
568			 * Don't bother logging what might be lots of errors if we are already in
569			 * read-only mode.
570			 */
571			vdo_log_error_strerror(result, "failed slab summary update %llu",
572					       (unsigned long long) journal->summarized);
573		}
574
575		journal->updating_slab_summary = false;
576		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
577		check_if_slab_drained(journal->slab);
578		return;
579	}
580
581	if (journal->partial_write_in_progress && (journal->summarized == journal->tail)) {
582		journal->partial_write_in_progress = false;
583		add_entries(journal);
584	}
585
586	first = journal->last_summarized;
587	journal->last_summarized = journal->summarized;
588	for (i = journal->summarized - 1; i >= first; i--) {
589		/*
590		 * Release the lock the summarized block held on the recovery journal. (During
591		 * replay, recovery_start will always be 0.)
592		 */
593		if (journal->recovery_journal != NULL) {
594			zone_count_t zone_number = journal->slab->allocator->zone_number;
595			struct journal_lock *lock = get_lock(journal, i);
596
597			vdo_release_recovery_journal_block_reference(journal->recovery_journal,
598								     lock->recovery_start,
599								     VDO_ZONE_TYPE_PHYSICAL,
600								     zone_number);
601		}
602
603		/*
604		 * Release our own lock against reaping for blocks that are committed. (This
605		 * function will not change locks during replay.)
606		 */
607		adjust_slab_journal_block_reference(journal, i, -1);
608	}
609
610	journal->updating_slab_summary = false;
611
612	reap_slab_journal(journal);
613
614	/* Check if the slab summary needs to be updated again. */
615	update_tail_block_location(journal);
616}
617
618/**
619 * update_tail_block_location() - Update the tail block location in the slab summary, if necessary.
620 * @journal: The slab journal that is updating its tail block location.
621 */
622static void update_tail_block_location(struct slab_journal *journal)
623{
624	block_count_t free_block_count;
625	struct vdo_slab *slab = journal->slab;
626
627	if (journal->updating_slab_summary ||
628	    vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
629	    (journal->last_summarized >= journal->next_commit)) {
630		check_if_slab_drained(slab);
631		return;
632	}
633
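	/*
	 * Until the slab has been rebuilt, derive the free block count from the summary hint
	 * rather than from the in-memory count.
	 */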
634	if (slab->status != VDO_SLAB_REBUILT) {
635		u8 hint = slab->allocator->summary_entries[slab->slab_number].fullness_hint;
636
637		free_block_count = ((block_count_t) hint) << slab->allocator->depot->hint_shift;
638	} else {
639		free_block_count = slab->free_blocks;
640	}
641
642	journal->summarized = journal->next_commit;
643	journal->updating_slab_summary = true;
644
645	/*
646	 * Update slab summary as dirty.
647	 * vdo_slab journal can only reap past sequence number 1 when all the ref counts for this
648	 * slab have been written to the layer. Therefore, indicate that the ref counts must be
649	 * loaded when the journal head has reaped past sequence number 1.
650	 */
651	update_slab_summary_entry(slab, &journal->slab_summary_waiter,
652				  journal->summarized % journal->size,
653				  (journal->head > 1), false, free_block_count);
654}
655
/**
 * reopen_slab_journal() - Reopen a slab's journal by emptying it and then adding pending entries.
 * @slab: The slab whose journal should be reopened.
 */
659static void reopen_slab_journal(struct vdo_slab *slab)
660{
661	struct slab_journal *journal = &slab->journal;
662	sequence_number_t block;
663
664	VDO_ASSERT_LOG_ONLY(journal->tail_header.entry_count == 0,
665			    "vdo_slab journal's active block empty before reopening");
666	journal->head = journal->tail;
667	initialize_journal_state(journal);
668
669	/* Ensure no locks are spuriously held on an empty journal. */
670	for (block = 1; block <= journal->size; block++) {
671		VDO_ASSERT_LOG_ONLY((get_lock(journal, block)->count == 0),
672				    "Scrubbed journal's block %llu is not locked",
673				    (unsigned long long) block);
674	}
675
676	add_entries(journal);
677}
678
679static sequence_number_t get_committing_sequence_number(const struct pooled_vio *vio)
680{
681	const struct packed_slab_journal_block *block =
682		(const struct packed_slab_journal_block *) vio->vio.data;
683
684	return __le64_to_cpu(block->header.sequence_number);
685}
686
687/**
688 * complete_write() - Handle post-commit processing.
689 * @completion: The write vio as a completion.
690 *
691 * This is the callback registered by write_slab_journal_block().
692 */
693static void complete_write(struct vdo_completion *completion)
694{
695	int result = completion->result;
696	struct pooled_vio *pooled = vio_as_pooled_vio(as_vio(completion));
697	struct slab_journal *journal = completion->parent;
698	sequence_number_t committed = get_committing_sequence_number(pooled);
699
700	list_del_init(&pooled->list_entry);
701	return_vio_to_pool(journal->slab->allocator->vio_pool, vdo_forget(pooled));
702
703	if (result != VDO_SUCCESS) {
704		vio_record_metadata_io_error(as_vio(completion));
705		vdo_log_error_strerror(result, "cannot write slab journal block %llu",
706				       (unsigned long long) committed);
707		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
708		check_if_slab_drained(journal->slab);
709		return;
710	}
711
712	WRITE_ONCE(journal->events->blocks_written, journal->events->blocks_written + 1);
713
714	if (list_empty(&journal->uncommitted_blocks)) {
715		/* If no blocks are outstanding, then the commit point is at the tail. */
716		journal->next_commit = journal->tail;
717	} else {
718		/* The commit point is always the beginning of the oldest incomplete block. */
719		pooled = container_of(journal->uncommitted_blocks.next,
720				      struct pooled_vio, list_entry);
721		journal->next_commit = get_committing_sequence_number(pooled);
722	}
723
724	update_tail_block_location(journal);
725}
726
727static void write_slab_journal_endio(struct bio *bio)
728{
729	struct vio *vio = bio->bi_private;
730	struct slab_journal *journal = vio->completion.parent;
731
732	continue_vio_after_io(vio, complete_write, journal->slab->allocator->thread_id);
733}
734
735/**
736 * write_slab_journal_block() - Write a slab journal block.
737 * @waiter: The vio pool waiter which was just notified.
738 * @context: The vio pool entry for the write.
739 *
740 * Callback from acquire_vio_from_pool() registered in commit_tail().
741 */
742static void write_slab_journal_block(struct vdo_waiter *waiter, void *context)
743{
744	struct pooled_vio *pooled = context;
745	struct vio *vio = &pooled->vio;
746	struct slab_journal *journal =
747		container_of(waiter, struct slab_journal, resource_waiter);
748	struct slab_journal_block_header *header = &journal->tail_header;
749	int unused_entries = journal->entries_per_block - header->entry_count;
750	physical_block_number_t block_number;
751	const struct admin_state_code *operation;
752
753	header->head = journal->head;
754	list_add_tail(&pooled->list_entry, &journal->uncommitted_blocks);
755	vdo_pack_slab_journal_block_header(header, &journal->block->header);
756
757	/* Copy the tail block into the vio. */
758	memcpy(pooled->vio.data, journal->block, VDO_BLOCK_SIZE);
759
760	VDO_ASSERT_LOG_ONLY(unused_entries >= 0, "vdo_slab journal block is not overfull");
761	if (unused_entries > 0) {
762		/*
763		 * Release the per-entry locks for any unused entries in the block we are about to
764		 * write.
765		 */
766		adjust_slab_journal_block_reference(journal, header->sequence_number,
767						    -unused_entries);
768		journal->partial_write_in_progress = !block_is_full(journal);
769	}
770
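	/* The on-disk slab journal is circular: block location is sequence number mod journal size. */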
771	block_number = journal->slab->journal_origin +
772		(header->sequence_number % journal->size);
773	vio->completion.parent = journal;
774
775	/*
776	 * This block won't be read in recovery until the slab summary is updated to refer to it.
777	 * The slab summary update does a flush which is sufficient to protect us from corruption
778	 * due to out of order slab journal, reference block, or block map writes.
779	 */
780	vdo_submit_metadata_vio(vdo_forget(vio), block_number, write_slab_journal_endio,
781				complete_write, REQ_OP_WRITE);
782
783	/* Since the write is submitted, the tail block structure can be reused. */
784	journal->tail++;
785	initialize_tail_block(journal);
786	journal->waiting_to_commit = false;
787
788	operation = vdo_get_admin_state_code(&journal->slab->state);
789	if (operation == VDO_ADMIN_STATE_WAITING_FOR_RECOVERY) {
790		vdo_finish_operation(&journal->slab->state,
791				     (vdo_is_read_only(journal->slab->allocator->depot->vdo) ?
792				      VDO_READ_ONLY : VDO_SUCCESS));
793		return;
794	}
795
796	add_entries(journal);
797}
798
799/**
800 * commit_tail() - Commit the tail block of the slab journal.
801 * @journal: The journal whose tail block should be committed.
802 */
803static void commit_tail(struct slab_journal *journal)
804{
805	if ((journal->tail_header.entry_count == 0) && must_make_entries_to_flush(journal)) {
806		/*
807		 * There are no entries at the moment, but there are some waiters, so defer
808		 * initiating the flush until those entries are ready to write.
809		 */
810		return;
811	}
812
813	if (vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
814	    journal->waiting_to_commit ||
815	    (journal->tail_header.entry_count == 0)) {
816		/*
817		 * There is nothing to do since the tail block is empty, or writing, or the journal
818		 * is in read-only mode.
819		 */
820		return;
821	}
822
823	/*
824	 * Since we are about to commit the tail block, this journal no longer needs to be on the
825	 * ring of journals which the recovery journal might ask to commit.
826	 */
827	mark_slab_journal_clean(journal);
828
829	journal->waiting_to_commit = true;
830
831	journal->resource_waiter.callback = write_slab_journal_block;
832	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
833			      &journal->resource_waiter);
834}
835
836/**
837 * encode_slab_journal_entry() - Encode a slab journal entry.
838 * @tail_header: The unpacked header for the block.
839 * @payload: The journal block payload to hold the entry.
840 * @sbn: The slab block number of the entry to encode.
841 * @operation: The type of the entry.
842 * @increment: True if this is an increment.
843 *
844 * Exposed for unit tests.
845 */
846static void encode_slab_journal_entry(struct slab_journal_block_header *tail_header,
847				      slab_journal_payload *payload,
848				      slab_block_number sbn,
849				      enum journal_operation operation,
850				      bool increment)
851{
852	journal_entry_count_t entry_number = tail_header->entry_count++;
853
854	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
855		if (!tail_header->has_block_map_increments) {
856			memset(payload->full_entries.entry_types, 0,
857			       VDO_SLAB_JOURNAL_ENTRY_TYPES_SIZE);
858			tail_header->has_block_map_increments = true;
859		}
860
861		payload->full_entries.entry_types[entry_number / 8] |=
862			((u8)1 << (entry_number % 8));
863	}
864
865	vdo_pack_slab_journal_entry(&payload->entries[entry_number], sbn, increment);
866}
867
/**
 * expand_journal_point() - Convert a recovery journal journal_point which refers to both an
 *                          increment and a decrement to a single point which refers to one or the
 *                          other.
 * @recovery_point: The journal point to convert.
 * @increment: Whether the current entry is an increment.
 *
 * Return: The expanded journal point.
 *
 * Each data_vio has only a single recovery journal point, but it may need to make both an
 * increment and a decrement entry in the same slab journal. In order to distinguish the two
 * entries, the entry count of the expanded journal point is twice the actual recovery journal
 * entry count for increments, and one more than that for decrements.
 */
882static struct journal_point expand_journal_point(struct journal_point recovery_point,
883						 bool increment)
884{
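	/* E.g., recovery point 5.3 expands to 5.6 for its increment and 5.7 for its decrement. */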
885	recovery_point.entry_count *= 2;
886	if (!increment)
887		recovery_point.entry_count++;
888
889	return recovery_point;
890}
891
892/**
893 * add_entry() - Actually add an entry to the slab journal, potentially firing off a write if a
894 *               block becomes full.
895 * @journal: The slab journal to append to.
896 * @pbn: The pbn being adjusted.
897 * @operation: The type of entry to make.
898 * @increment: True if this is an increment.
899 * @recovery_point: The expanded recovery point.
900 *
901 * This function is synchronous.
902 */
903static void add_entry(struct slab_journal *journal, physical_block_number_t pbn,
904		      enum journal_operation operation, bool increment,
905		      struct journal_point recovery_point)
906{
907	struct packed_slab_journal_block *block = journal->block;
908	int result;
909
910	result = VDO_ASSERT(vdo_before_journal_point(&journal->tail_header.recovery_point,
911						     &recovery_point),
912			    "recovery journal point is monotonically increasing, recovery point: %llu.%u, block recovery point: %llu.%u",
913			    (unsigned long long) recovery_point.sequence_number,
914			    recovery_point.entry_count,
915			    (unsigned long long) journal->tail_header.recovery_point.sequence_number,
916			    journal->tail_header.recovery_point.entry_count);
917	if (result != VDO_SUCCESS) {
918		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
919		return;
920	}
921
922	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
923		result = VDO_ASSERT((journal->tail_header.entry_count <
924				     journal->full_entries_per_block),
925				    "block has room for full entries");
926		if (result != VDO_SUCCESS) {
927			vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo,
928						 result);
929			return;
930		}
931	}
932
933	encode_slab_journal_entry(&journal->tail_header, &block->payload,
934				  pbn - journal->slab->start, operation, increment);
935	journal->tail_header.recovery_point = recovery_point;
936	if (block_is_full(journal))
937		commit_tail(journal);
938}
939
940static inline block_count_t journal_length(const struct slab_journal *journal)
941{
942	return journal->tail - journal->head;
943}
944
945/**
946 * vdo_attempt_replay_into_slab() - Replay a recovery journal entry into a slab's journal.
947 * @slab: The slab to play into.
948 * @pbn: The PBN for the entry.
949 * @operation: The type of entry to add.
950 * @increment: True if this entry is an increment.
951 * @recovery_point: The recovery journal point corresponding to this entry.
952 * @parent: The completion to notify when there is space to add the entry if the entry could not be
953 *          added immediately.
954 *
955 * Return: true if the entry was added immediately.
956 */
957bool vdo_attempt_replay_into_slab(struct vdo_slab *slab, physical_block_number_t pbn,
958				  enum journal_operation operation, bool increment,
959				  struct journal_point *recovery_point,
960				  struct vdo_completion *parent)
961{
962	struct slab_journal *journal = &slab->journal;
963	struct slab_journal_block_header *header = &journal->tail_header;
964	struct journal_point expanded = expand_journal_point(*recovery_point, increment);
965
966	/* Only accept entries after the current recovery point. */
967	if (!vdo_before_journal_point(&journal->tail_header.recovery_point, &expanded))
968		return true;
969
970	if ((header->entry_count >= journal->full_entries_per_block) &&
971	    (header->has_block_map_increments || (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING))) {
972		/*
973		 * The tail block does not have room for the entry we are attempting to add so
974		 * commit the tail block now.
975		 */
976		commit_tail(journal);
977	}
978
979	if (journal->waiting_to_commit) {
980		vdo_start_operation_with_waiter(&journal->slab->state,
981						VDO_ADMIN_STATE_WAITING_FOR_RECOVERY,
982						parent, NULL);
983		return false;
984	}
985
986	if (journal_length(journal) >= journal->size) {
987		/*
988		 * We must have reaped the current head before the crash, since the blocked
989		 * threshold keeps us from having more entries than fit in a slab journal; hence we
990		 * can just advance the head (and unreapable block), as needed.
991		 */
992		journal->head++;
993		journal->unreapable++;
994	}
995
996	if (journal->slab->status == VDO_SLAB_REBUILT)
997		journal->slab->status = VDO_SLAB_REPLAYING;
998
999	add_entry(journal, pbn, operation, increment, expanded);
1000	return true;
1001}
1002
1003/**
1004 * requires_reaping() - Check whether the journal must be reaped before adding new entries.
1005 * @journal: The journal to check.
1006 *
1007 * Return: true if the journal must be reaped.
1008 */
1009static bool requires_reaping(const struct slab_journal *journal)
1010{
1011	return (journal_length(journal) >= journal->blocking_threshold);
1012}
1013
1014/** finish_summary_update() - A waiter callback that resets the writing state of a slab. */
1015static void finish_summary_update(struct vdo_waiter *waiter, void *context)
1016{
1017	struct vdo_slab *slab = container_of(waiter, struct vdo_slab, summary_waiter);
1018	int result = *((int *) context);
1019
1020	slab->active_count--;
1021
1022	if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
1023		vdo_log_error_strerror(result, "failed to update slab summary");
1024		vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1025	}
1026
1027	check_if_slab_drained(slab);
1028}
1029
1030static void write_reference_block(struct vdo_waiter *waiter, void *context);
1031
1032/**
1033 * launch_reference_block_write() - Launch the write of a dirty reference block by first acquiring
1034 *                                  a VIO for it from the pool.
1035 * @waiter: The waiter of the block which is starting to write.
1036 * @context: The parent slab of the block.
1037 *
1038 * This can be asynchronous since the writer will have to wait if all VIOs in the pool are
1039 * currently in use.
1040 */
1041static void launch_reference_block_write(struct vdo_waiter *waiter, void *context)
1042{
1043	struct vdo_slab *slab = context;
1044
1045	if (vdo_is_read_only(slab->allocator->depot->vdo))
1046		return;
1047
1048	slab->active_count++;
1049	container_of(waiter, struct reference_block, waiter)->is_writing = true;
1050	waiter->callback = write_reference_block;
1051	acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
1052}
1053
1054static void save_dirty_reference_blocks(struct vdo_slab *slab)
1055{
1056	vdo_waitq_notify_all_waiters(&slab->dirty_blocks,
1057				     launch_reference_block_write, slab);
1058	check_if_slab_drained(slab);
1059}
1060
1061/**
 * finish_reference_block_write() - After a reference block has been written, clean it, release
 *                                  its locks, and return its VIO to the pool.
1064 * @completion: The VIO that just finished writing.
1065 */
1066static void finish_reference_block_write(struct vdo_completion *completion)
1067{
1068	struct vio *vio = as_vio(completion);
1069	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
1070	struct reference_block *block = completion->parent;
1071	struct vdo_slab *slab = block->slab;
1072	tail_block_offset_t offset;
1073
1074	slab->active_count--;
1075
1076	/* Release the slab journal lock. */
1077	adjust_slab_journal_block_reference(&slab->journal,
1078					    block->slab_journal_lock_to_release, -1);
1079	return_vio_to_pool(slab->allocator->vio_pool, pooled);
1080
1081	/*
1082	 * We can't clear the is_writing flag earlier as releasing the slab journal lock may cause
1083	 * us to be dirtied again, but we don't want to double enqueue.
1084	 */
1085	block->is_writing = false;
1086
1087	if (vdo_is_read_only(completion->vdo)) {
1088		check_if_slab_drained(slab);
1089		return;
1090	}
1091
1092	/* Re-queue the block if it was re-dirtied while it was writing. */
1093	if (block->is_dirty) {
1094		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1095		if (vdo_is_state_draining(&slab->state)) {
1096			/* We must be saving, and this block will otherwise not be relaunched. */
1097			save_dirty_reference_blocks(slab);
1098		}
1099
1100		return;
1101	}
1102
1103	/*
1104	 * Mark the slab as clean in the slab summary if there are no dirty or writing blocks
1105	 * and no summary update in progress.
1106	 */
1107	if ((slab->active_count > 0) || vdo_waitq_has_waiters(&slab->dirty_blocks)) {
1108		check_if_slab_drained(slab);
1109		return;
1110	}
1111
1112	offset = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
1113	slab->active_count++;
1114	slab->summary_waiter.callback = finish_summary_update;
1115	update_slab_summary_entry(slab, &slab->summary_waiter, offset,
1116				  true, true, slab->free_blocks);
1117}
1118
1119/**
1120 * get_reference_counters_for_block() - Find the reference counters for a given block.
1121 * @block: The reference_block in question.
1122 *
1123 * Return: A pointer to the reference counters for this block.
1124 */
1125static vdo_refcount_t * __must_check get_reference_counters_for_block(struct reference_block *block)
1126{
1127	size_t block_index = block - block->slab->reference_blocks;
1128
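	/* Each reference block covers a contiguous run of COUNTS_PER_BLOCK counters. */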
1129	return &block->slab->counters[block_index * COUNTS_PER_BLOCK];
1130}
1131
1132/**
1133 * pack_reference_block() - Copy data from a reference block to a buffer ready to be written out.
1134 * @block: The block to copy.
1135 * @buffer: The char buffer to fill with the packed block.
1136 */
1137static void pack_reference_block(struct reference_block *block, void *buffer)
1138{
1139	struct packed_reference_block *packed = buffer;
1140	vdo_refcount_t *counters = get_reference_counters_for_block(block);
1141	sector_count_t i;
1142	struct packed_journal_point commit_point;
1143
1144	vdo_pack_journal_point(&block->slab->slab_journal_point, &commit_point);
1145
1146	for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
1147		packed->sectors[i].commit_point = commit_point;
1148		memcpy(packed->sectors[i].counts, counters + (i * COUNTS_PER_SECTOR),
1149		       (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
1150	}
1151}
1152
1153static void write_reference_block_endio(struct bio *bio)
1154{
1155	struct vio *vio = bio->bi_private;
1156	struct reference_block *block = vio->completion.parent;
1157	thread_id_t thread_id = block->slab->allocator->thread_id;
1158
1159	continue_vio_after_io(vio, finish_reference_block_write, thread_id);
1160}
1161
1162/**
1163 * handle_io_error() - Handle an I/O error reading or writing a reference count block.
1164 * @completion: The VIO doing the I/O as a completion.
1165 */
1166static void handle_io_error(struct vdo_completion *completion)
1167{
1168	int result = completion->result;
1169	struct vio *vio = as_vio(completion);
1170	struct vdo_slab *slab = ((struct reference_block *) completion->parent)->slab;
1171
1172	vio_record_metadata_io_error(vio);
1173	return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio));
1174	slab->active_count--;
1175	vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1176	check_if_slab_drained(slab);
1177}
1178
1179/**
1180 * write_reference_block() - After a dirty block waiter has gotten a VIO from the VIO pool, copy
1181 *                           its counters and associated data into the VIO, and launch the write.
1182 * @waiter: The waiter of the dirty block.
1183 * @context: The VIO returned by the pool.
1184 */
1185static void write_reference_block(struct vdo_waiter *waiter, void *context)
1186{
1187	size_t block_offset;
1188	physical_block_number_t pbn;
1189	struct pooled_vio *pooled = context;
1190	struct vdo_completion *completion = &pooled->vio.completion;
1191	struct reference_block *block = container_of(waiter, struct reference_block,
1192						     waiter);
1193
1194	pack_reference_block(block, pooled->vio.data);
1195	block_offset = (block - block->slab->reference_blocks);
1196	pbn = (block->slab->ref_counts_origin + block_offset);
1197	block->slab_journal_lock_to_release = block->slab_journal_lock;
1198	completion->parent = block;
1199
1200	/*
1201	 * Mark the block as clean, since we won't be committing any updates that happen after this
1202	 * moment. As long as VIO order is preserved, two VIOs updating this block at once will not
1203	 * cause complications.
1204	 */
1205	block->is_dirty = false;
1206
1207	/*
1208	 * Flush before writing to ensure that the recovery journal and slab journal entries which
1209	 * cover this reference update are stable. This prevents data corruption that can be caused
1210	 * by out of order writes.
1211	 */
1212	WRITE_ONCE(block->slab->allocator->ref_counts_statistics.blocks_written,
1213		   block->slab->allocator->ref_counts_statistics.blocks_written + 1);
1214
1215	completion->callback_thread_id = ((struct block_allocator *) pooled->context)->thread_id;
1216	vdo_submit_metadata_vio(&pooled->vio, pbn, write_reference_block_endio,
1217				handle_io_error, REQ_OP_WRITE | REQ_PREFLUSH);
1218}
1219
1220static void reclaim_journal_space(struct slab_journal *journal)
1221{
1222	block_count_t length = journal_length(journal);
1223	struct vdo_slab *slab = journal->slab;
1224	block_count_t write_count = vdo_waitq_num_waiters(&slab->dirty_blocks);
1225	block_count_t written;
1226
1227	if ((length < journal->flushing_threshold) || (write_count == 0))
1228		return;
1229
1230	/* The slab journal is over the first threshold, schedule some reference block writes. */
1231	WRITE_ONCE(journal->events->flush_count, journal->events->flush_count + 1);
1232	if (length < journal->flushing_deadline) {
1233		/* Schedule more writes the closer to the deadline we get. */
1234		write_count /= journal->flushing_deadline - length + 1;
1235		write_count = max_t(block_count_t, write_count, 1);
1236	}
1237
1238	for (written = 0; written < write_count; written++) {
1239		vdo_waitq_notify_next_waiter(&slab->dirty_blocks,
1240					     launch_reference_block_write, slab);
1241	}
1242}
1243
1244/**
1245 * reference_count_to_status() - Convert a reference count to a reference status.
1246 * @count: The count to convert.
1247 *
1248 * Return: The appropriate reference status.
1249 */
1250static enum reference_status __must_check reference_count_to_status(vdo_refcount_t count)
1251{
1252	if (count == EMPTY_REFERENCE_COUNT)
1253		return RS_FREE;
1254	else if (count == 1)
1255		return RS_SINGLE;
1256	else if (count == PROVISIONAL_REFERENCE_COUNT)
1257		return RS_PROVISIONAL;
1258	else
1259		return RS_SHARED;
1260}
1261
1262/**
1263 * dirty_block() - Mark a reference count block as dirty, potentially adding it to the dirty queue
1264 *                 if it wasn't already dirty.
1265 * @block: The reference block to mark as dirty.
1266 */
1267static void dirty_block(struct reference_block *block)
1268{
1269	if (block->is_dirty)
1270		return;
1271
1272	block->is_dirty = true;
1273	if (!block->is_writing)
1274		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1275}
1276
/**
 * get_reference_block() - Get the reference block that covers the given block index.
 * @slab: The slab owning the block.
 * @index: The slab block number of the block.
 */
1280static struct reference_block * __must_check get_reference_block(struct vdo_slab *slab,
1281								 slab_block_number index)
1282{
1283	return &slab->reference_blocks[index / COUNTS_PER_BLOCK];
1284}
1285
1286/**
1287 * slab_block_number_from_pbn() - Determine the index within the slab of a particular physical
1288 *                                block number.
1289 * @slab: The slab.
 * @pbn: The physical block number.
1291 * @slab_block_number_ptr: A pointer to the slab block number.
1292 *
1293 * Return: VDO_SUCCESS or an error code.
1294 */
1295static int __must_check slab_block_number_from_pbn(struct vdo_slab *slab,
1296						   physical_block_number_t pbn,
1297						   slab_block_number *slab_block_number_ptr)
1298{
1299	u64 slab_block_number;
1300
1301	if (pbn < slab->start)
1302		return VDO_OUT_OF_RANGE;
1303
1304	slab_block_number = pbn - slab->start;
1305	if (slab_block_number >= slab->allocator->depot->slab_config.data_blocks)
1306		return VDO_OUT_OF_RANGE;
1307
1308	*slab_block_number_ptr = slab_block_number;
1309	return VDO_SUCCESS;
1310}
1311
1312/**
1313 * get_reference_counter() - Get the reference counter that covers the given physical block number.
1314 * @slab: The slab to query.
1315 * @pbn: The physical block number.
 * @counter_ptr: A pointer which will be set to the reference counter for the block.
 *
 * Return: VDO_SUCCESS or an error code.
 */
1318static int __must_check get_reference_counter(struct vdo_slab *slab,
1319					      physical_block_number_t pbn,
1320					      vdo_refcount_t **counter_ptr)
1321{
1322	slab_block_number index;
1323	int result = slab_block_number_from_pbn(slab, pbn, &index);
1324
1325	if (result != VDO_SUCCESS)
1326		return result;
1327
1328	*counter_ptr = &slab->counters[index];
1329
1330	return VDO_SUCCESS;
1331}
1332
1333static unsigned int calculate_slab_priority(struct vdo_slab *slab)
1334{
1335	block_count_t free_blocks = slab->free_blocks;
1336	unsigned int unopened_slab_priority = slab->allocator->unopened_slab_priority;
1337	unsigned int priority;
1338
1339	/*
1340	 * Wholly full slabs must be the only ones with lowest priority, 0.
1341	 *
1342	 * Slabs that have never been opened (empty, newly initialized, and never been written to)
1343	 * have lower priority than previously opened slabs that have a significant number of free
1344	 * blocks. This ranking causes VDO to avoid writing physical blocks for the first time
1345	 * unless there are very few free blocks that have been previously written to.
1346	 *
1347	 * Since VDO doesn't discard blocks currently, reusing previously written blocks makes VDO
1348	 * a better client of any underlying storage that is thinly-provisioned (though discarding
1349	 * would be better).
1350	 *
1351	 * For all other slabs, the priority is derived from the logarithm of the number of free
1352	 * blocks. Slabs with the same order of magnitude of free blocks have the same priority.
1353	 * With 2^23 blocks, the priority will range from 1 to 25. The reserved
1354	 * unopened_slab_priority divides the range and is skipped by the logarithmic mapping.
1355	 */
1356
1357	if (free_blocks == 0)
1358		return 0;
1359
1360	if (is_slab_journal_blank(slab))
1361		return unopened_slab_priority;
1362
1363	priority = (1 + ilog2(free_blocks));
1364	return ((priority < unopened_slab_priority) ? priority : priority + 1);
1365}
1366
1367/*
1368 * Slabs are essentially prioritized by an approximation of the number of free blocks in the slab
1369 * so slabs with lots of free blocks will be opened for allocation before slabs that have few free
1370 * blocks.
1371 */
1372static void prioritize_slab(struct vdo_slab *slab)
1373{
1374	VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
1375			    "a slab must not already be on a ring when prioritizing");
1376	slab->priority = calculate_slab_priority(slab);
1377	vdo_priority_table_enqueue(slab->allocator->prioritized_slabs,
1378				   slab->priority, &slab->allocq_entry);
1379}
1380
/**
 * adjust_free_block_count() - Adjust the free block count and (if needed) reprioritize the slab.
 * @slab: The slab whose free block count changed.
 * @incremented: true if the free block count went up.
 */
1385static void adjust_free_block_count(struct vdo_slab *slab, bool incremented)
1386{
1387	struct block_allocator *allocator = slab->allocator;
1388
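	/* allocated_blocks counts in-use blocks, so it moves opposite to the free block count. */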
1389	WRITE_ONCE(allocator->allocated_blocks,
1390		   allocator->allocated_blocks + (incremented ? -1 : 1));
1391
1392	/* The open slab doesn't need to be reprioritized until it is closed. */
1393	if (slab == allocator->open_slab)
1394		return;
1395
1396	/* Don't bother adjusting the priority table if unneeded. */
1397	if (slab->priority == calculate_slab_priority(slab))
1398		return;
1399
1400	/*
1401	 * Reprioritize the slab to reflect the new free block count by removing it from the table
1402	 * and re-enqueuing it with the new priority.
1403	 */
1404	vdo_priority_table_remove(allocator->prioritized_slabs, &slab->allocq_entry);
1405	prioritize_slab(slab);
1406}
1407
1408/**
1409 * increment_for_data() - Increment the reference count for a data block.
1410 * @slab: The slab which owns the block.
1411 * @block: The reference block which contains the block being updated.
1412 * @block_number: The block to update.
1413 * @old_status: The reference status of the data block before this increment.
1414 * @lock: The pbn_lock associated with this increment (may be NULL).
1415 * @counter_ptr: A pointer to the count for the data block (in, out).
1416 * @adjust_block_count: Whether to update the allocator's free block count.
1417 *
1418 * Return: VDO_SUCCESS or an error.
1419 */
1420static int increment_for_data(struct vdo_slab *slab, struct reference_block *block,
1421			      slab_block_number block_number,
1422			      enum reference_status old_status,
1423			      struct pbn_lock *lock, vdo_refcount_t *counter_ptr,
1424			      bool adjust_block_count)
1425{
1426	switch (old_status) {
1427	case RS_FREE:
1428		*counter_ptr = 1;
1429		block->allocated_count++;
1430		slab->free_blocks--;
1431		if (adjust_block_count)
1432			adjust_free_block_count(slab, false);
1433
1434		break;
1435
1436	case RS_PROVISIONAL:
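		/* Convert the existing provisional reference into a real, single reference. */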
1437		*counter_ptr = 1;
1438		break;
1439
1440	default:
1441		/* Single or shared */
1442		if (*counter_ptr >= MAXIMUM_REFERENCE_COUNT) {
1443			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1444						      "Incrementing a block already having 254 references (slab %u, offset %u)",
1445						      slab->slab_number, block_number);
1446		}
1447		(*counter_ptr)++;
1448	}
1449
1450	if (lock != NULL)
1451		vdo_unassign_pbn_lock_provisional_reference(lock);
1452	return VDO_SUCCESS;
1453}
1454
1455/**
1456 * decrement_for_data() - Decrement the reference count for a data block.
1457 * @slab: The slab which owns the block.
1458 * @block: The reference block which contains the block being updated.
1459 * @block_number: The block to update.
1460 * @old_status: The reference status of the data block before this decrement.
1461 * @updater: The reference updater doing this operation in case we need to look up the pbn lock.
1463 * @counter_ptr: A pointer to the count for the data block (in, out).
1464 * @adjust_block_count: Whether to update the allocator's free block count.
1465 *
1466 * Return: VDO_SUCCESS or an error.
1467 */
1468static int decrement_for_data(struct vdo_slab *slab, struct reference_block *block,
1469			      slab_block_number block_number,
1470			      enum reference_status old_status,
1471			      struct reference_updater *updater,
1472			      vdo_refcount_t *counter_ptr, bool adjust_block_count)
1473{
1474	switch (old_status) {
1475	case RS_FREE:
1476		return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1477					      "Decrementing free block at offset %u in slab %u",
1478					      block_number, slab->slab_number);
1479
1480	case RS_PROVISIONAL:
1481	case RS_SINGLE:
1482		if (updater->zpbn.zone != NULL) {
1483			struct pbn_lock *lock = vdo_get_physical_zone_pbn_lock(updater->zpbn.zone,
1484									       updater->zpbn.pbn);
1485
1486			if (lock != NULL) {
1487				/*
1488				 * There is a read lock on this block, so the block must not become
1489				 * unreferenced.
1490				 */
1491				*counter_ptr = PROVISIONAL_REFERENCE_COUNT;
1492				vdo_assign_pbn_lock_provisional_reference(lock);
1493				break;
1494			}
1495		}
1496
1497		*counter_ptr = EMPTY_REFERENCE_COUNT;
1498		block->allocated_count--;
1499		slab->free_blocks++;
1500		if (adjust_block_count)
1501			adjust_free_block_count(slab, true);
1502
1503		break;
1504
1505	default:
1506		/* Shared */
1507		(*counter_ptr)--;
1508	}
1509
1510	return VDO_SUCCESS;
1511}
1512
1513/**
1514 * increment_for_block_map() - Increment the reference count for a block map page.
1515 * @slab: The slab which owns the block.
1516 * @block: The reference block which contains the block being updated.
1517 * @block_number: The block to update.
1518 * @old_status: The reference status of the block before this increment.
1519 * @lock: The pbn_lock associated with this increment (may be NULL).
1520 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1521 * @counter_ptr: A pointer to the count for the block (in, out).
1522 * @adjust_block_count: Whether to update the allocator's free block count.
1523 *
1524 * All block map increments should be from provisional to MAXIMUM_REFERENCE_COUNT. Since block map
1525 * blocks never dedupe they should never be adjusted from any other state. The adjustment always
1526 * results in MAXIMUM_REFERENCE_COUNT as this value is used to prevent dedupe against block map
1527 * blocks.
1528 *
1529 * Return: VDO_SUCCESS or an error.
1530 */
1531static int increment_for_block_map(struct vdo_slab *slab, struct reference_block *block,
1532				   slab_block_number block_number,
1533				   enum reference_status old_status,
1534				   struct pbn_lock *lock, bool normal_operation,
1535				   vdo_refcount_t *counter_ptr, bool adjust_block_count)
1536{
1537	switch (old_status) {
1538	case RS_FREE:
1539		if (normal_operation) {
1540			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1541						      "Incrementing unallocated block map block (slab %u, offset %u)",
1542						      slab->slab_number, block_number);
1543		}
1544
1545		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
1546		block->allocated_count++;
1547		slab->free_blocks--;
1548		if (adjust_block_count)
1549			adjust_free_block_count(slab, false);
1550
1551		return VDO_SUCCESS;
1552
1553	case RS_PROVISIONAL:
1554		if (!normal_operation)
1555			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1556						      "Block map block had provisional reference during replay (slab %u, offset %u)",
1557						      slab->slab_number, block_number);
1558
1559		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
1560		if (lock != NULL)
1561			vdo_unassign_pbn_lock_provisional_reference(lock);
1562		return VDO_SUCCESS;
1563
1564	default:
1565		return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1566					      "Incrementing a block map block which is already referenced %u times (slab %u, offset %u)",
1567					      *counter_ptr, slab->slab_number,
1568					      block_number);
1569	}
1570}
1571
1572static bool __must_check is_valid_journal_point(const struct journal_point *point)
1573{
1574	return ((point != NULL) && (point->sequence_number > 0));
1575}
1576
1577/**
1578 * update_reference_count() - Update the reference count of a block.
1579 * @slab: The slab which owns the block.
1580 * @block: The reference block which contains the block being updated.
1581 * @block_number: The block to update.
1582 * @slab_journal_point: The slab journal point at which this update is journaled.
1583 * @updater: The reference updater.
1584 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1585 * @adjust_block_count: Whether to update the slab's free block count.
1586 * @provisional_decrement_ptr: A pointer which will be set to true if this update was a decrement
1587 *                             of a provisional reference.
1588 *
1589 * Return: VDO_SUCCESS or an error.
1590 */
1591static int update_reference_count(struct vdo_slab *slab, struct reference_block *block,
1592				  slab_block_number block_number,
1593				  const struct journal_point *slab_journal_point,
1594				  struct reference_updater *updater,
1595				  bool normal_operation, bool adjust_block_count,
1596				  bool *provisional_decrement_ptr)
1597{
1598	vdo_refcount_t *counter_ptr = &slab->counters[block_number];
1599	enum reference_status old_status = reference_count_to_status(*counter_ptr);
1600	int result;
1601
1602	if (!updater->increment) {
1603		result = decrement_for_data(slab, block, block_number, old_status,
1604					    updater, counter_ptr, adjust_block_count);
1605		if ((result == VDO_SUCCESS) && (old_status == RS_PROVISIONAL)) {
1606			if (provisional_decrement_ptr != NULL)
1607				*provisional_decrement_ptr = true;
1608			return VDO_SUCCESS;
1609		}
1610	} else if (updater->operation == VDO_JOURNAL_DATA_REMAPPING) {
1611		result = increment_for_data(slab, block, block_number, old_status,
1612					    updater->lock, counter_ptr, adjust_block_count);
1613	} else {
1614		result = increment_for_block_map(slab, block, block_number, old_status,
1615						 updater->lock, normal_operation,
1616						 counter_ptr, adjust_block_count);
1617	}
1618
1619	if (result != VDO_SUCCESS)
1620		return result;
1621
1622	if (is_valid_journal_point(slab_journal_point))
1623		slab->slab_journal_point = *slab_journal_point;
1624
1625	return VDO_SUCCESS;
1626}
1627
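/**
 * adjust_reference_count() - Adjust the reference count of a block during normal operation.
 * @slab: The slab which owns the block.
 * @updater: The reference updater describing the adjustment to make.
 * @slab_journal_point: The slab journal point at which this adjustment is journaled.
 *
 * In addition to updating the count, this releases the per-entry slab journal lock if the
 * reference block is already dirty and holds a slab journal lock, or converts that per-entry lock
 * into an uncommitted reference block lock otherwise.
 *
 * Return: VDO_SUCCESS or an error.
 */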
1628static int __must_check adjust_reference_count(struct vdo_slab *slab,
1629					       struct reference_updater *updater,
1630					       const struct journal_point *slab_journal_point)
1631{
1632	slab_block_number block_number;
1633	int result;
1634	struct reference_block *block;
1635	bool provisional_decrement = false;
1636
1637	if (!is_slab_open(slab))
1638		return VDO_INVALID_ADMIN_STATE;
1639
1640	result = slab_block_number_from_pbn(slab, updater->zpbn.pbn, &block_number);
1641	if (result != VDO_SUCCESS)
1642		return result;
1643
1644	block = get_reference_block(slab, block_number);
1645	result = update_reference_count(slab, block, block_number, slab_journal_point,
1646					updater, NORMAL_OPERATION, true,
1647					&provisional_decrement);
1648	if ((result != VDO_SUCCESS) || provisional_decrement)
1649		return result;
1650
1651	if (block->is_dirty && (block->slab_journal_lock > 0)) {
1652		sequence_number_t entry_lock = slab_journal_point->sequence_number;
1653		/*
1654		 * This block is already dirty and a slab journal entry has been made for it since
1655		 * the last time it was clean. We must release the per-entry slab journal lock for
1656		 * the entry associated with the update we are now doing.
1657		 */
1658		result = VDO_ASSERT(is_valid_journal_point(slab_journal_point),
1659				    "Reference count adjustments need slab journal points.");
1660		if (result != VDO_SUCCESS)
1661			return result;
1662
1663		adjust_slab_journal_block_reference(&slab->journal, entry_lock, -1);
1664		return VDO_SUCCESS;
1665	}
1666
1667	/*
1668	 * This may be the first time we are applying an update for which there is a slab journal
1669	 * entry to this block since the block was cleaned. Therefore, we convert the per-entry
1670	 * slab journal lock to an uncommitted reference block lock, if there is a per-entry lock.
1671	 */
1672	if (is_valid_journal_point(slab_journal_point))
1673		block->slab_journal_lock = slab_journal_point->sequence_number;
1674	else
1675		block->slab_journal_lock = 0;
1676
1677	dirty_block(block);
1678	return VDO_SUCCESS;
1679}
1680
1681/**
1682 * add_entry_from_waiter() - Add an entry to the slab journal.
1683 * @waiter: The vio which should make an entry now.
1684 * @context: The slab journal to make an entry in.
1685 *
1686 * This callback is invoked by add_entries() once it has determined that we are ready to make
1687 * another entry in the slab journal. Implements waiter_callback_fn.
1688 */
1689static void add_entry_from_waiter(struct vdo_waiter *waiter, void *context)
1690{
1691	int result;
1692	struct reference_updater *updater =
1693		container_of(waiter, struct reference_updater, waiter);
1694	struct data_vio *data_vio = data_vio_from_reference_updater(updater);
1695	struct slab_journal *journal = context;
1696	struct slab_journal_block_header *header = &journal->tail_header;
1697	struct journal_point slab_journal_point = {
1698		.sequence_number = header->sequence_number,
1699		.entry_count = header->entry_count,
1700	};
1701	sequence_number_t recovery_block = data_vio->recovery_journal_point.sequence_number;
1702
1703	if (header->entry_count == 0) {
1704		/*
1705		 * This is the first entry in the current tail block, so get a lock on the recovery
1706		 * journal which we will hold until this tail block is committed.
1707		 */
1708		get_lock(journal, header->sequence_number)->recovery_start = recovery_block;
1709		if (journal->recovery_journal != NULL) {
1710			zone_count_t zone_number = journal->slab->allocator->zone_number;
1711
1712			vdo_acquire_recovery_journal_block_reference(journal->recovery_journal,
1713								     recovery_block,
1714								     VDO_ZONE_TYPE_PHYSICAL,
1715								     zone_number);
1716		}
1717
1718		mark_slab_journal_dirty(journal, recovery_block);
1719		reclaim_journal_space(journal);
1720	}
1721
1722	add_entry(journal, updater->zpbn.pbn, updater->operation, updater->increment,
1723		  expand_journal_point(data_vio->recovery_journal_point,
1724				       updater->increment));
1725
1726	if (journal->slab->status != VDO_SLAB_REBUILT) {
1727		/*
1728		 * If the slab is unrecovered, scrubbing will take care of the count since the
1729		 * update is now recorded in the journal.
1730		 */
1731		adjust_slab_journal_block_reference(journal,
1732						    slab_journal_point.sequence_number, -1);
1733		result = VDO_SUCCESS;
1734	} else {
1735		/* Now that an entry has been made in the slab journal, update the counter. */
1736		result = adjust_reference_count(journal->slab, updater,
1737						&slab_journal_point);
1738	}
1739
1740	if (updater->increment)
1741		continue_data_vio_with_error(data_vio, result);
1742	else
1743		vdo_continue_completion(&data_vio->decrement_completion, result);
1744}
1745
1746/**
1747 * is_next_entry_a_block_map_increment() - Check whether the next entry to be made is a block map
1748 *                                         increment.
1749 * @journal: The journal.
1750 *
1751 * Return: true if the first entry waiter's operation is a block map increment.
1752 */
1753static inline bool is_next_entry_a_block_map_increment(struct slab_journal *journal)
1754{
1755	struct vdo_waiter *waiter = vdo_waitq_get_first_waiter(&journal->entry_waiters);
1756	struct reference_updater *updater =
1757		container_of(waiter, struct reference_updater, waiter);
1758
1759	return (updater->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING);
1760}
1761
1762/**
1763 * add_entries() - Add as many entries as possible from the queue of vios waiting to make entries.
1764 * @journal: The journal to which entries may be added.
1765 *
1766 * By processing the queue in order, we ensure that slab journal entries are made in the same order
1767 * as recovery journal entries for the same increment or decrement.
1768 */
1769static void add_entries(struct slab_journal *journal)
1770{
1771	if (journal->adding_entries) {
1772		/* Protect against re-entrancy. */
1773		return;
1774	}
1775
1776	journal->adding_entries = true;
1777	while (vdo_waitq_has_waiters(&journal->entry_waiters)) {
1778		struct slab_journal_block_header *header = &journal->tail_header;
1779
1780		if (journal->partial_write_in_progress ||
1781		    (journal->slab->status == VDO_SLAB_REBUILDING)) {
1782			/*
1783			 * Don't add entries while rebuilding or while a partial write is
1784			 * outstanding, as it could result in reference count corruption.
1785			 */
1786			break;
1787		}
1788
1789		if (journal->waiting_to_commit) {
1790			/*
1791			 * If we are waiting for resources to write the tail block, and the tail
1792			 * block is full, we can't make another entry.
1793			 */
1794			WRITE_ONCE(journal->events->tail_busy_count,
1795				   journal->events->tail_busy_count + 1);
1796			break;
1797		} else if (is_next_entry_a_block_map_increment(journal) &&
1798			   (header->entry_count >= journal->full_entries_per_block)) {
1799			/*
1800			 * The tail block does not have room for a block map increment, so commit
1801			 * it now.
1802			 */
1803			commit_tail(journal);
1804			if (journal->waiting_to_commit) {
1805				WRITE_ONCE(journal->events->tail_busy_count,
1806					   journal->events->tail_busy_count + 1);
1807				break;
1808			}
1809		}
1810
1811		/* If the slab is over the blocking threshold, make the vio wait. */
1812		if (requires_reaping(journal)) {
1813			WRITE_ONCE(journal->events->blocked_count,
1814				   journal->events->blocked_count + 1);
1815			save_dirty_reference_blocks(journal->slab);
1816			break;
1817		}
1818
1819		if (header->entry_count == 0) {
1820			struct journal_lock *lock =
1821				get_lock(journal, header->sequence_number);
1822
1823			/*
1824			 * Check if the on disk slab journal is full. Because of the blocking and
1825			 * scrubbing thresholds, this should never happen.
1826			 */
1827			if (lock->count > 0) {
1828				VDO_ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail,
1829						    "New block has locks, but journal is not full");
1830
1831				/*
1832				 * The blocking threshold must let the journal fill up if the new
1833				 * block has locks; if the blocking threshold is smaller than the
1834				 * journal size, the new block cannot possibly have locks already.
1835				 */
1836				VDO_ASSERT_LOG_ONLY((journal->blocking_threshold >= journal->size),
1837						    "New block can have locks already iff blocking threshold is at the end of the journal");
1838
1839				WRITE_ONCE(journal->events->disk_full_count,
1840					   journal->events->disk_full_count + 1);
1841				save_dirty_reference_blocks(journal->slab);
1842				break;
1843			}
1844
1845			/*
1846			 * Don't allow the new block to be reaped until all of the reference count
1847			 * blocks are written and the journal block has been fully committed as
1848			 * well.
1849			 */
1850			lock->count = journal->entries_per_block + 1;
1851
1852			if (header->sequence_number == 1) {
1853				struct vdo_slab *slab = journal->slab;
1854				block_count_t i;
1855
1856				/*
1857				 * This is the first entry in this slab journal, ever. Dirty all of
1858				 * the reference count blocks. Each will acquire a lock on the tail
1859				 * block so that the journal won't be reaped until the reference
1860				 * counts are initialized. The lock acquisition must be done by the
1861				 * ref_counts since here we don't know how many reference blocks
1862				 * the ref_counts has.
1863				 */
1864				for (i = 0; i < slab->reference_block_count; i++) {
1865					slab->reference_blocks[i].slab_journal_lock = 1;
1866					dirty_block(&slab->reference_blocks[i]);
1867				}
1868
1869				adjust_slab_journal_block_reference(journal, 1,
1870								    slab->reference_block_count);
1871			}
1872		}
1873
1874		vdo_waitq_notify_next_waiter(&journal->entry_waiters,
1875					     add_entry_from_waiter, journal);
1876	}
1877
1878	journal->adding_entries = false;
1879
1880	/* If there are no waiters, and we are flushing or saving, commit the tail block. */
1881	if (vdo_is_state_draining(&journal->slab->state) &&
1882	    !vdo_is_state_suspending(&journal->slab->state) &&
1883	    !vdo_waitq_has_waiters(&journal->entry_waiters))
1884		commit_tail(journal);
1885}
1886
1887/**
1888 * reset_search_cursor() - Reset the free block search back to the first reference counter in the
 *                         first reference block of a slab.
 * @slab: The slab whose search cursor is to be reset.
1890 */
1891static void reset_search_cursor(struct vdo_slab *slab)
1892{
1893	struct search_cursor *cursor = &slab->search_cursor;
1894
1895	cursor->block = cursor->first_block;
1896	cursor->index = 0;
1897	/* Unit tests have slabs with only one reference block (and it's a runt). */
1898	cursor->end_index = min_t(u32, COUNTS_PER_BLOCK, slab->block_count);
1899}
1900
1901/**
1902 * advance_search_cursor() - Advance the search cursor to the start of the next reference block in
 *                           a slab.
 * @slab: The slab whose search cursor is to be advanced.
1904 *
1905 * Wraps around to the first reference block if the current block is the last reference block.
1906 *
1907 * Return: true unless the cursor was at the last reference block.
1908 */
1909static bool advance_search_cursor(struct vdo_slab *slab)
1910{
1911	struct search_cursor *cursor = &slab->search_cursor;
1912
1913	/*
1914	 * If we just finished searching the last reference block, then wrap back around to the
1915	 * start of the array.
1916	 */
1917	if (cursor->block == cursor->last_block) {
1918		reset_search_cursor(slab);
1919		return false;
1920	}
1921
	/* We're not already at the end, so advance the cursor to the next block. */
1923	cursor->block++;
1924	cursor->index = cursor->end_index;
1925
1926	if (cursor->block == cursor->last_block) {
1927		/* The last reference block will usually be a runt. */
1928		cursor->end_index = slab->block_count;
1929	} else {
1930		cursor->end_index += COUNTS_PER_BLOCK;
1931	}
1932
1933	return true;
1934}
1935
1936/**
1937 * vdo_adjust_reference_count_for_rebuild() - Adjust the reference count of a block during rebuild.
 * @depot: The slab depot.
 * @pbn: The physical block number of the block to adjust.
 * @operation: The type of journal operation which necessitates the adjustment.
 *
1939 * Return: VDO_SUCCESS or an error.
1940 */
1941int vdo_adjust_reference_count_for_rebuild(struct slab_depot *depot,
1942					   physical_block_number_t pbn,
1943					   enum journal_operation operation)
1944{
1945	int result;
1946	slab_block_number block_number;
1947	struct reference_block *block;
1948	struct vdo_slab *slab = vdo_get_slab(depot, pbn);
1949	struct reference_updater updater = {
1950		.operation = operation,
1951		.increment = true,
1952	};
1953
1954	result = slab_block_number_from_pbn(slab, pbn, &block_number);
1955	if (result != VDO_SUCCESS)
1956		return result;
1957
1958	block = get_reference_block(slab, block_number);
1959	result = update_reference_count(slab, block, block_number, NULL,
1960					&updater, !NORMAL_OPERATION, false, NULL);
1961	if (result != VDO_SUCCESS)
1962		return result;
1963
1964	dirty_block(block);
1965	return VDO_SUCCESS;
1966}
1967
1968/**
1969 * replay_reference_count_change() - Replay the reference count adjustment from a slab journal
1970 *                                   entry into the reference count for a block.
1971 * @slab: The slab.
1972 * @entry_point: The slab journal point for the entry.
1973 * @entry: The slab journal entry being replayed.
1974 *
1975 * The adjustment will be ignored if it was already recorded in the reference count.
1976 *
1977 * Return: VDO_SUCCESS or an error code.
1978 */
1979static int replay_reference_count_change(struct vdo_slab *slab,
1980					 const struct journal_point *entry_point,
1981					 struct slab_journal_entry entry)
1982{
1983	int result;
1984	struct reference_block *block = get_reference_block(slab, entry.sbn);
1985	sector_count_t sector = (entry.sbn % COUNTS_PER_BLOCK) / COUNTS_PER_SECTOR;
1986	struct reference_updater updater = {
1987		.operation = entry.operation,
1988		.increment = entry.increment,
1989	};
1990
1991	if (!vdo_before_journal_point(&block->commit_points[sector], entry_point)) {
1992		/* This entry is already reflected in the existing counts, so do nothing. */
1993		return VDO_SUCCESS;
1994	}
1995
1996	/* This entry is not yet counted in the reference counts. */
1997	result = update_reference_count(slab, block, entry.sbn, entry_point,
1998					&updater, !NORMAL_OPERATION, false, NULL);
1999	if (result != VDO_SUCCESS)
2000		return result;
2001
2002	dirty_block(block);
2003	return VDO_SUCCESS;
2004}
2005
2006/**
 * find_zero_byte_in_word() - Find the array index of the first zero byte in a word-sized range of
 *                            reference counters.
2009 * @word_ptr: A pointer to the eight counter bytes to check.
2010 * @start_index: The array index corresponding to word_ptr[0].
2011 * @fail_index: The array index to return if no zero byte is found.
2012 *
2013 * The search does no bounds checking; the function relies on the array being sufficiently padded.
2014 *
2015 * Return: The array index of the first zero byte in the word, or the value passed as fail_index if
2016 *         no zero byte was found.
2017 */
2018static inline slab_block_number find_zero_byte_in_word(const u8 *word_ptr,
2019						       slab_block_number start_index,
2020						       slab_block_number fail_index)
2021{
2022	u64 word = get_unaligned_le64(word_ptr);
2023
2024	/* This looks like a loop, but GCC will unroll the eight iterations for us. */
2025	unsigned int offset;
2026
2027	for (offset = 0; offset < BYTES_PER_WORD; offset++) {
2028		/* Assumes little-endian byte order, which we have on X86. */
2029		if ((word & 0xFF) == 0)
2030			return (start_index + offset);
2031		word >>= 8;
2032	}
2033
2034	return fail_index;
2035}
2036
2037/**
2038 * find_free_block() - Find the first block with a reference count of zero in the specified
2039 *                     range of reference counter indexes.
2040 * @slab: The slab counters to scan.
2041 * @index_ptr: A pointer to hold the array index of the free block.
2042 *
2043 * Exposed for unit testing.
2044 *
2045 * Return: true if a free block was found in the specified range.
2046 */
2047static bool find_free_block(const struct vdo_slab *slab, slab_block_number *index_ptr)
2048{
2049	slab_block_number zero_index;
2050	slab_block_number next_index = slab->search_cursor.index;
2051	slab_block_number end_index = slab->search_cursor.end_index;
2052	u8 *next_counter = &slab->counters[next_index];
2053	u8 *end_counter = &slab->counters[end_index];
2054
2055	/*
2056	 * Search every byte of the first unaligned word. (Array is padded so reading past end is
2057	 * safe.)
2058	 */
2059	zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2060	if (zero_index < end_index) {
2061		*index_ptr = zero_index;
2062		return true;
2063	}
2064
2065	/*
2066	 * On architectures where unaligned word access is expensive, this would be a good place to
2067	 * advance to an alignment boundary.
2068	 */
2069	next_index += BYTES_PER_WORD;
2070	next_counter += BYTES_PER_WORD;
2071
2072	/*
	 * Now we're word-aligned; check a word at a time until we find a word containing a zero.
2074	 * (Array is padded so reading past end is safe.)
2075	 */
2076	while (next_counter < end_counter) {
2077		/*
2078		 * The following code is currently an exact copy of the code preceding the loop,
2079		 * but if you try to merge them by using a do loop, it runs slower because a jump
2080		 * instruction gets added at the start of the iteration.
2081		 */
2082		zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2083		if (zero_index < end_index) {
2084			*index_ptr = zero_index;
2085			return true;
2086		}
2087
2088		next_index += BYTES_PER_WORD;
2089		next_counter += BYTES_PER_WORD;
2090	}
2091
2092	return false;
2093}
2094
2095/**
2096 * search_current_reference_block() - Search the reference block currently saved in the search
2097 *                                    cursor for a reference count of zero, starting at the saved
2098 *                                    counter index.
2099 * @slab: The slab to search.
2100 * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2101 *
2102 * Return: true if an unreferenced counter was found.
2103 */
2104static bool search_current_reference_block(const struct vdo_slab *slab,
2105					   slab_block_number *free_index_ptr)
2106{
2107	/* Don't bother searching if the current block is known to be full. */
2108	return ((slab->search_cursor.block->allocated_count < COUNTS_PER_BLOCK) &&
2109		find_free_block(slab, free_index_ptr));
2110}
2111
2112/**
2113 * search_reference_blocks() - Search each reference block for a reference count of zero.
2114 * @slab: The slab to search.
2115 * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2116 *
2117 * Searches each reference block for a reference count of zero, starting at the reference block and
2118 * counter index saved in the search cursor and searching up to the end of the last reference
2119 * block. The search does not wrap.
2120 *
2121 * Return: true if an unreferenced counter was found.
2122 */
2123static bool search_reference_blocks(struct vdo_slab *slab,
2124				    slab_block_number *free_index_ptr)
2125{
2126	/* Start searching at the saved search position in the current block. */
2127	if (search_current_reference_block(slab, free_index_ptr))
2128		return true;
2129
2130	/* Search each reference block up to the end of the slab. */
2131	while (advance_search_cursor(slab)) {
2132		if (search_current_reference_block(slab, free_index_ptr))
2133			return true;
2134	}
2135
2136	return false;
2137}
2138
2139/**
 * make_provisional_reference() - Do the bookkeeping for making a provisional reference.
 * @slab: The slab which owns the block.
 * @block_number: The block to provisionally reference.
2141 */
2142static void make_provisional_reference(struct vdo_slab *slab,
2143				       slab_block_number block_number)
2144{
2145	struct reference_block *block = get_reference_block(slab, block_number);
2146
2147	/*
2148	 * Make the initial transition from an unreferenced block to a
2149	 * provisionally allocated block.
2150	 */
2151	slab->counters[block_number] = PROVISIONAL_REFERENCE_COUNT;
2152
2153	/* Account for the allocation. */
2154	block->allocated_count++;
2155	slab->free_blocks--;
2156}
2157
2158/**
2159 * dirty_all_reference_blocks() - Mark all reference count blocks in a slab as dirty.
2160 */
2161static void dirty_all_reference_blocks(struct vdo_slab *slab)
2162{
2163	block_count_t i;
2164
2165	for (i = 0; i < slab->reference_block_count; i++)
2166		dirty_block(&slab->reference_blocks[i]);
2167}
2168
2169/**
2170 * clear_provisional_references() - Clear the provisional reference counts from a reference block.
2171 * @block: The block to clear.
2172 */
2173static void clear_provisional_references(struct reference_block *block)
2174{
2175	vdo_refcount_t *counters = get_reference_counters_for_block(block);
2176	block_count_t j;
2177
2178	for (j = 0; j < COUNTS_PER_BLOCK; j++) {
2179		if (counters[j] == PROVISIONAL_REFERENCE_COUNT) {
2180			counters[j] = EMPTY_REFERENCE_COUNT;
2181			block->allocated_count--;
2182		}
2183	}
2184}
2185
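/* Check whether two journal points are the same. */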
2186static inline bool journal_points_equal(struct journal_point first,
2187					struct journal_point second)
2188{
2189	return ((first.sequence_number == second.sequence_number) &&
2190		(first.entry_count == second.entry_count));
2191}
2192
2193/**
 * unpack_reference_block() - Unpack a written reference block into its in-memory representation.
2195 * @packed: The written reference block to be unpacked.
2196 * @block: The internal reference block to be loaded.
2197 */
2198static void unpack_reference_block(struct packed_reference_block *packed,
2199				   struct reference_block *block)
2200{
2201	block_count_t index;
2202	sector_count_t i;
2203	struct vdo_slab *slab = block->slab;
2204	vdo_refcount_t *counters = get_reference_counters_for_block(block);
2205
2206	for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
2207		struct packed_reference_sector *sector = &packed->sectors[i];
2208
2209		vdo_unpack_journal_point(&sector->commit_point, &block->commit_points[i]);
2210		memcpy(counters + (i * COUNTS_PER_SECTOR), sector->counts,
2211		       (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
2212		/* The slab_journal_point must be the latest point found in any sector. */
2213		if (vdo_before_journal_point(&slab->slab_journal_point,
2214					     &block->commit_points[i]))
2215			slab->slab_journal_point = block->commit_points[i];
2216
2217		if ((i > 0) &&
2218		    !journal_points_equal(block->commit_points[0],
2219					  block->commit_points[i])) {
2220			size_t block_index = block - block->slab->reference_blocks;
2221
2222			vdo_log_warning("Torn write detected in sector %u of reference block %zu of slab %u",
2223					i, block_index, block->slab->slab_number);
2224		}
2225	}
2226
2227	block->allocated_count = 0;
2228	for (index = 0; index < COUNTS_PER_BLOCK; index++) {
2229		if (counters[index] != EMPTY_REFERENCE_COUNT)
2230			block->allocated_count++;
2231	}
2232}
2233
2234/**
2235 * finish_reference_block_load() - After a reference block has been read, unpack it.
2236 * @completion: The VIO that just finished reading.
2237 */
2238static void finish_reference_block_load(struct vdo_completion *completion)
2239{
2240	struct vio *vio = as_vio(completion);
2241	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
2242	struct reference_block *block = completion->parent;
2243	struct vdo_slab *slab = block->slab;
2244
2245	unpack_reference_block((struct packed_reference_block *) vio->data, block);
2246	return_vio_to_pool(slab->allocator->vio_pool, pooled);
2247	slab->active_count--;
2248	clear_provisional_references(block);
2249
2250	slab->free_blocks -= block->allocated_count;
2251	check_if_slab_drained(slab);
2252}
2253
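/* Continue the reference block load on the allocator's thread once the read completes. */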
2254static void load_reference_block_endio(struct bio *bio)
2255{
2256	struct vio *vio = bio->bi_private;
2257	struct reference_block *block = vio->completion.parent;
2258
2259	continue_vio_after_io(vio, finish_reference_block_load,
2260			      block->slab->allocator->thread_id);
2261}
2262
2263/**
2264 * load_reference_block() - After a block waiter has gotten a VIO from the VIO pool, load the
2265 *                          block.
2266 * @waiter: The waiter of the block to load.
2267 * @context: The VIO returned by the pool.
2268 */
2269static void load_reference_block(struct vdo_waiter *waiter, void *context)
2270{
2271	struct pooled_vio *pooled = context;
2272	struct vio *vio = &pooled->vio;
2273	struct reference_block *block =
2274		container_of(waiter, struct reference_block, waiter);
2275	size_t block_offset = (block - block->slab->reference_blocks);
2276
2277	vio->completion.parent = block;
2278	vdo_submit_metadata_vio(vio, block->slab->ref_counts_origin + block_offset,
2279				load_reference_block_endio, handle_io_error,
2280				REQ_OP_READ);
2281}
2282
2283/**
 * load_reference_blocks() - Load a slab's reference blocks from the underlying storage into the
 *                           slab's pre-allocated reference counters.
 * @slab: The slab whose reference blocks are to be loaded.
2286 */
2287static void load_reference_blocks(struct vdo_slab *slab)
2288{
2289	block_count_t i;
2290
2291	slab->free_blocks = slab->block_count;
2292	slab->active_count = slab->reference_block_count;
2293	for (i = 0; i < slab->reference_block_count; i++) {
2294		struct vdo_waiter *waiter = &slab->reference_blocks[i].waiter;
2295
2296		waiter->callback = load_reference_block;
2297		acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
2298	}
2299}
2300
2301/**
 * drain_slab() - Drain all reference count I/O for a slab.
 * @slab: The slab to drain.
 *
 * Depending upon the type of drain being performed (as recorded in the slab's admin state), the
 * reference blocks may be loaded from disk or dirty reference blocks may be written out.
2306 */
2307static void drain_slab(struct vdo_slab *slab)
2308{
2309	bool save;
2310	bool load;
2311	const struct admin_state_code *state = vdo_get_admin_state_code(&slab->state);
2312
2313	if (state == VDO_ADMIN_STATE_SUSPENDING)
2314		return;
2315
2316	if ((state != VDO_ADMIN_STATE_REBUILDING) &&
2317	    (state != VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING))
2318		commit_tail(&slab->journal);
2319
2320	if ((state == VDO_ADMIN_STATE_RECOVERING) || (slab->counters == NULL))
2321		return;
2322
2323	save = false;
2324	load = slab->allocator->summary_entries[slab->slab_number].load_ref_counts;
2325	if (state == VDO_ADMIN_STATE_SCRUBBING) {
2326		if (load) {
2327			load_reference_blocks(slab);
2328			return;
2329		}
2330	} else if (state == VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING) {
2331		if (!load) {
2332			/* These reference counts were never written, so mark them all dirty. */
2333			dirty_all_reference_blocks(slab);
2334		}
2335		save = true;
2336	} else if (state == VDO_ADMIN_STATE_REBUILDING) {
2337		/*
2338		 * Write out the counters if the slab has written them before, or it has any
2339		 * non-zero reference counts, or there are any slab journal blocks.
2340		 */
2341		block_count_t data_blocks = slab->allocator->depot->slab_config.data_blocks;
2342
2343		if (load || (slab->free_blocks != data_blocks) ||
2344		    !is_slab_journal_blank(slab)) {
2345			dirty_all_reference_blocks(slab);
2346			save = true;
2347		}
2348	} else if (state == VDO_ADMIN_STATE_SAVING) {
2349		save = (slab->status == VDO_SLAB_REBUILT);
2350	} else {
2351		vdo_finish_draining_with_result(&slab->state, VDO_SUCCESS);
2352		return;
2353	}
2354
2355	if (save)
2356		save_dirty_reference_blocks(slab);
2357}
2358
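/**
 * allocate_slab_counters() - Allocate the reference blocks and reference counter array for a slab.
 * @slab: The slab to allocate counters for.
 *
 * Return: VDO_SUCCESS or an error.
 */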
2359static int allocate_slab_counters(struct vdo_slab *slab)
2360{
2361	int result;
2362	size_t index, bytes;
2363
2364	result = VDO_ASSERT(slab->reference_blocks == NULL,
2365			    "vdo_slab %u doesn't allocate refcounts twice",
2366			    slab->slab_number);
2367	if (result != VDO_SUCCESS)
2368		return result;
2369
2370	result = vdo_allocate(slab->reference_block_count, struct reference_block,
2371			      __func__, &slab->reference_blocks);
2372	if (result != VDO_SUCCESS)
2373		return result;
2374
2375	/*
2376	 * Allocate such that the runt slab has a full-length memory array, plus a little padding
2377	 * so we can word-search even at the very end.
2378	 */
2379	bytes = (slab->reference_block_count * COUNTS_PER_BLOCK) + (2 * BYTES_PER_WORD);
2380	result = vdo_allocate(bytes, vdo_refcount_t, "ref counts array",
2381			      &slab->counters);
2382	if (result != VDO_SUCCESS) {
2383		vdo_free(vdo_forget(slab->reference_blocks));
2384		return result;
2385	}
2386
2387	slab->search_cursor.first_block = slab->reference_blocks;
2388	slab->search_cursor.last_block = &slab->reference_blocks[slab->reference_block_count - 1];
2389	reset_search_cursor(slab);
2390
2391	for (index = 0; index < slab->reference_block_count; index++) {
2392		slab->reference_blocks[index] = (struct reference_block) {
2393			.slab = slab,
2394		};
2395	}
2396
2397	return VDO_SUCCESS;
2398}
2399
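/* Allocate the slab's reference counters only if the slab is performing a clean load. */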
2400static int allocate_counters_if_clean(struct vdo_slab *slab)
2401{
2402	if (vdo_is_state_clean_load(&slab->state))
2403		return allocate_slab_counters(slab);
2404
2405	return VDO_SUCCESS;
2406}
2407
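/* Decode the slab journal tail block which was just read and initialize the journal from it. */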
2408static void finish_loading_journal(struct vdo_completion *completion)
2409{
2410	struct vio *vio = as_vio(completion);
2411	struct slab_journal *journal = completion->parent;
2412	struct vdo_slab *slab = journal->slab;
2413	struct packed_slab_journal_block *block = (struct packed_slab_journal_block *) vio->data;
2414	struct slab_journal_block_header header;
2415
2416	vdo_unpack_slab_journal_block_header(&block->header, &header);
2417
2418	/* FIXME: should it be an error if the following conditional fails? */
2419	if ((header.metadata_type == VDO_METADATA_SLAB_JOURNAL) &&
2420	    (header.nonce == slab->allocator->nonce)) {
2421		journal->tail = header.sequence_number + 1;
2422
2423		/*
2424		 * If the slab is clean, this implies the slab journal is empty, so advance the
2425		 * head appropriately.
2426		 */
2427		journal->head = (slab->allocator->summary_entries[slab->slab_number].is_dirty ?
2428				 header.head : journal->tail);
2429		journal->tail_header = header;
2430		initialize_journal_state(journal);
2431	}
2432
2433	return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio));
2434	vdo_finish_loading_with_result(&slab->state, allocate_counters_if_clean(slab));
2435}
2436
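/* Continue loading the journal on the allocator's thread once the tail block has been read. */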
2437static void read_slab_journal_tail_endio(struct bio *bio)
2438{
2439	struct vio *vio = bio->bi_private;
2440	struct slab_journal *journal = vio->completion.parent;
2441
2442	continue_vio_after_io(vio, finish_loading_journal,
2443			      journal->slab->allocator->thread_id);
2444}
2445
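/* Handle an error reading the slab journal tail by returning the vio and failing the load. */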
2446static void handle_load_error(struct vdo_completion *completion)
2447{
2448	int result = completion->result;
2449	struct slab_journal *journal = completion->parent;
2450	struct vio *vio = as_vio(completion);
2451
2452	vio_record_metadata_io_error(vio);
2453	return_vio_to_pool(journal->slab->allocator->vio_pool, vio_as_pooled_vio(vio));
2454	vdo_finish_loading_with_result(&journal->slab->state, result);
2455}
2456
2457/**
2458 * read_slab_journal_tail() - Read the slab journal tail block by using a vio acquired from the vio
2459 *                            pool.
2460 * @waiter: The vio pool waiter which has just been notified.
2461 * @context: The vio pool entry given to the waiter.
2462 *
2463 * This is the success callback from acquire_vio_from_pool() when loading a slab journal.
2464 */
2465static void read_slab_journal_tail(struct vdo_waiter *waiter, void *context)
2466{
2467	struct slab_journal *journal =
2468		container_of(waiter, struct slab_journal, resource_waiter);
2469	struct vdo_slab *slab = journal->slab;
2470	struct pooled_vio *pooled = context;
2471	struct vio *vio = &pooled->vio;
2472	tail_block_offset_t last_commit_point =
2473		slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2474
2475	/*
2476	 * Slab summary keeps the commit point offset, so the tail block is the block before that.
2477	 * Calculation supports small journals in unit tests.
2478	 */
2479	tail_block_offset_t tail_block = ((last_commit_point == 0) ?
2480					  (tail_block_offset_t)(journal->size - 1) :
2481					  (last_commit_point - 1));
2482
2483	vio->completion.parent = journal;
2484	vio->completion.callback_thread_id = slab->allocator->thread_id;
2485	vdo_submit_metadata_vio(vio, slab->journal_origin + tail_block,
2486				read_slab_journal_tail_endio, handle_load_error,
2487				REQ_OP_READ);
2488}
2489
2490/**
2491 * load_slab_journal() - Load a slab's journal by reading the journal's tail.
2492 */
2493static void load_slab_journal(struct vdo_slab *slab)
2494{
2495	struct slab_journal *journal = &slab->journal;
2496	tail_block_offset_t last_commit_point;
2497
2498	last_commit_point = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2499	if ((last_commit_point == 0) &&
2500	    !slab->allocator->summary_entries[slab->slab_number].load_ref_counts) {
2501		/*
2502		 * This slab claims that it has a tail block at (journal->size - 1), but a head of
2503		 * 1. This is impossible, due to the scrubbing threshold, on a real system, so
2504		 * don't bother reading the (bogus) data off disk.
2505		 */
2506		VDO_ASSERT_LOG_ONLY(((journal->size < 16) ||
2507				     (journal->scrubbing_threshold < (journal->size - 1))),
2508				    "Scrubbing threshold protects against reads of unwritten slab journal blocks");
2509		vdo_finish_loading_with_result(&slab->state,
2510					       allocate_counters_if_clean(slab));
2511		return;
2512	}
2513
2514	journal->resource_waiter.callback = read_slab_journal_tail;
2515	acquire_vio_from_pool(slab->allocator->vio_pool, &journal->resource_waiter);
2516}
2517
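/* Queue a slab which requires scrubbing on the scrubber's normal or high-priority list. */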
2518static void register_slab_for_scrubbing(struct vdo_slab *slab, bool high_priority)
2519{
2520	struct slab_scrubber *scrubber = &slab->allocator->scrubber;
2521
2522	VDO_ASSERT_LOG_ONLY((slab->status != VDO_SLAB_REBUILT),
2523			    "slab to be scrubbed is unrecovered");
2524
2525	if (slab->status != VDO_SLAB_REQUIRES_SCRUBBING)
2526		return;
2527
2528	list_del_init(&slab->allocq_entry);
2529	if (!slab->was_queued_for_scrubbing) {
2530		WRITE_ONCE(scrubber->slab_count, scrubber->slab_count + 1);
2531		slab->was_queued_for_scrubbing = true;
2532	}
2533
2534	if (high_priority) {
2535		slab->status = VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING;
2536		list_add_tail(&slab->allocq_entry, &scrubber->high_priority_slabs);
2537		return;
2538	}
2539
2540	list_add_tail(&slab->allocq_entry, &scrubber->slabs);
2541}
2542
2543/* Queue a slab for allocation or scrubbing. */
2544static void queue_slab(struct vdo_slab *slab)
2545{
2546	struct block_allocator *allocator = slab->allocator;
2547	block_count_t free_blocks;
2548	int result;
2549
2550	VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
2551			"a requeued slab must not already be on a ring");
2552
2553	if (vdo_is_read_only(allocator->depot->vdo))
2554		return;
2555
2556	free_blocks = slab->free_blocks;
2557	result = VDO_ASSERT((free_blocks <= allocator->depot->slab_config.data_blocks),
2558			    "rebuilt slab %u must have a valid free block count (has %llu, expected maximum %llu)",
2559			    slab->slab_number, (unsigned long long) free_blocks,
2560			    (unsigned long long) allocator->depot->slab_config.data_blocks);
2561	if (result != VDO_SUCCESS) {
2562		vdo_enter_read_only_mode(allocator->depot->vdo, result);
2563		return;
2564	}
2565
2566	if (slab->status != VDO_SLAB_REBUILT) {
2567		register_slab_for_scrubbing(slab, false);
2568		return;
2569	}
2570
2571	if (!vdo_is_state_resuming(&slab->state)) {
2572		/*
2573		 * If the slab is resuming, we've already accounted for it here, so don't do it
2574		 * again.
2575		 * FIXME: under what situation would the slab be resuming here?
2576		 */
2577		WRITE_ONCE(allocator->allocated_blocks,
2578			   allocator->allocated_blocks - free_blocks);
2579		if (!is_slab_journal_blank(slab)) {
2580			WRITE_ONCE(allocator->statistics.slabs_opened,
2581				   allocator->statistics.slabs_opened + 1);
2582		}
2583	}
2584
2585	if (allocator->depot->vdo->suspend_type == VDO_ADMIN_STATE_SAVING)
2586		reopen_slab_journal(slab);
2587
2588	prioritize_slab(slab);
2589}
2590
2591/**
2592 * initiate_slab_action() - Initiate a slab action.
2593 *
2594 * Implements vdo_admin_initiator_fn.
2595 */
2596static void initiate_slab_action(struct admin_state *state)
2597{
2598	struct vdo_slab *slab = container_of(state, struct vdo_slab, state);
2599
2600	if (vdo_is_state_draining(state)) {
2601		const struct admin_state_code *operation = vdo_get_admin_state_code(state);
2602
2603		if (operation == VDO_ADMIN_STATE_SCRUBBING)
2604			slab->status = VDO_SLAB_REBUILDING;
2605
2606		drain_slab(slab);
2607		check_if_slab_drained(slab);
2608		return;
2609	}
2610
2611	if (vdo_is_state_loading(state)) {
2612		load_slab_journal(slab);
2613		return;
2614	}
2615
2616	if (vdo_is_state_resuming(state)) {
2617		queue_slab(slab);
2618		vdo_finish_resuming(state);
2619		return;
2620	}
2621
2622	vdo_finish_operation(state, VDO_INVALID_ADMIN_STATE);
2623}
2624
2625/**
2626 * get_next_slab() - Get the next slab to scrub.
2627 * @scrubber: The slab scrubber.
2628 *
2629 * Return: The next slab to scrub or NULL if there are none.
2630 */
2631static struct vdo_slab *get_next_slab(struct slab_scrubber *scrubber)
2632{
2633	struct vdo_slab *slab;
2634
2635	slab = list_first_entry_or_null(&scrubber->high_priority_slabs,
2636					struct vdo_slab, allocq_entry);
2637	if (slab != NULL)
2638		return slab;
2639
2640	return list_first_entry_or_null(&scrubber->slabs, struct vdo_slab,
2641					allocq_entry);
2642}
2643
2644/**
2645 * has_slabs_to_scrub() - Check whether a scrubber has slabs to scrub.
2646 * @scrubber: The scrubber to check.
2647 *
2648 * Return: true if the scrubber has slabs to scrub.
2649 */
2650static inline bool __must_check has_slabs_to_scrub(struct slab_scrubber *scrubber)
2651{
2652	return (get_next_slab(scrubber) != NULL);
2653}
2654
2655/**
2656 * uninitialize_scrubber_vio() - Clean up the slab_scrubber's vio.
2657 * @scrubber: The scrubber.
2658 */
2659static void uninitialize_scrubber_vio(struct slab_scrubber *scrubber)
2660{
2661	vdo_free(vdo_forget(scrubber->vio.data));
2662	free_vio_components(&scrubber->vio);
2663}
2664
2665/**
2666 * finish_scrubbing() - Stop scrubbing, either because there are no more slabs to scrub or because
2667 *                      there's been an error.
 * @scrubber: The scrubber.
 * @result: The result of the scrubbing operation.
2669 */
2670static void finish_scrubbing(struct slab_scrubber *scrubber, int result)
2671{
2672	bool notify = vdo_waitq_has_waiters(&scrubber->waiters);
2673	bool done = !has_slabs_to_scrub(scrubber);
2674	struct block_allocator *allocator =
2675		container_of(scrubber, struct block_allocator, scrubber);
2676
2677	if (done)
2678		uninitialize_scrubber_vio(scrubber);
2679
2680	if (scrubber->high_priority_only) {
2681		scrubber->high_priority_only = false;
2682		vdo_fail_completion(vdo_forget(scrubber->vio.completion.parent), result);
2683	} else if (done && (atomic_add_return(-1, &allocator->depot->zones_to_scrub) == 0)) {
2684		/* All of our slabs were scrubbed, and we're the last allocator to finish. */
2685		enum vdo_state prior_state =
2686			atomic_cmpxchg(&allocator->depot->vdo->state, VDO_RECOVERING,
2687				       VDO_DIRTY);
2688
2689		/*
2690		 * To be safe, even if the CAS failed, ensure anything that follows is ordered with
2691		 * respect to whatever state change did happen.
2692		 */
2693		smp_mb__after_atomic();
2694
2695		/*
2696		 * We must check the VDO state here and not the depot's read_only_notifier since
2697		 * the compare-swap-above could have failed due to a read-only entry which our own
2698		 * thread does not yet know about.
2699		 */
2700		if (prior_state == VDO_DIRTY)
2701			vdo_log_info("VDO commencing normal operation");
2702		else if (prior_state == VDO_RECOVERING)
2703			vdo_log_info("Exiting recovery mode");
2704	}
2705
2706	/*
2707	 * Note that the scrubber has stopped, and inform anyone who might be waiting for that to
2708	 * happen.
2709	 */
2710	if (!vdo_finish_draining(&scrubber->admin_state))
2711		WRITE_ONCE(scrubber->admin_state.current_state,
2712			   VDO_ADMIN_STATE_SUSPENDED);
2713
2714	/*
2715	 * We can't notify waiters until after we've finished draining or they'll just requeue.
2716	 * Fortunately if there were waiters, we can't have been freed yet.
2717	 */
2718	if (notify)
2719		vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2720}
2721
2722static void scrub_next_slab(struct slab_scrubber *scrubber);
2723
2724/**
2725 * slab_scrubbed() - Notify the scrubber that a slab has been scrubbed.
2726 * @completion: The slab rebuild completion.
2727 *
2728 * This callback is registered in apply_journal_entries().
2729 */
2730static void slab_scrubbed(struct vdo_completion *completion)
2731{
2732	struct slab_scrubber *scrubber =
2733		container_of(as_vio(completion), struct slab_scrubber, vio);
2734	struct vdo_slab *slab = scrubber->slab;
2735
2736	slab->status = VDO_SLAB_REBUILT;
2737	queue_slab(slab);
2738	reopen_slab_journal(slab);
2739	WRITE_ONCE(scrubber->slab_count, scrubber->slab_count - 1);
2740	scrub_next_slab(scrubber);
2741}
2742
2743/**
2744 * abort_scrubbing() - Abort scrubbing due to an error.
2745 * @scrubber: The slab scrubber.
2746 * @result: The error.
2747 */
2748static void abort_scrubbing(struct slab_scrubber *scrubber, int result)
2749{
2750	vdo_enter_read_only_mode(scrubber->vio.completion.vdo, result);
2751	finish_scrubbing(scrubber, result);
2752}
2753
2754/**
2755 * handle_scrubber_error() - Handle errors while rebuilding a slab.
2756 * @completion: The slab rebuild completion.
2757 */
2758static void handle_scrubber_error(struct vdo_completion *completion)
2759{
2760	struct vio *vio = as_vio(completion);
2761
2762	vio_record_metadata_io_error(vio);
2763	abort_scrubbing(container_of(vio, struct slab_scrubber, vio),
2764			completion->result);
2765}
2766
2767/**
2768 * apply_block_entries() - Apply all the entries in a block to the reference counts.
2769 * @block: A block with entries to apply.
2770 * @entry_count: The number of entries to apply.
2771 * @block_number: The sequence number of the block.
2772 * @slab: The slab to apply the entries to.
2773 *
2774 * Return: VDO_SUCCESS or an error code.
2775 */
2776static int apply_block_entries(struct packed_slab_journal_block *block,
2777			       journal_entry_count_t entry_count,
2778			       sequence_number_t block_number, struct vdo_slab *slab)
2779{
2780	struct journal_point entry_point = {
2781		.sequence_number = block_number,
2782		.entry_count = 0,
2783	};
2784	int result;
2785	slab_block_number max_sbn = slab->end - slab->start;
2786
2787	while (entry_point.entry_count < entry_count) {
2788		struct slab_journal_entry entry =
2789			vdo_decode_slab_journal_entry(block, entry_point.entry_count);
2790
2791		if (entry.sbn > max_sbn) {
2792			/* This entry is out of bounds. */
2793			return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
2794						      "vdo_slab journal entry (%llu, %u) had invalid offset %u in slab (size %u blocks)",
2795						      (unsigned long long) block_number,
2796						      entry_point.entry_count,
2797						      entry.sbn, max_sbn);
2798		}
2799
2800		result = replay_reference_count_change(slab, &entry_point, entry);
2801		if (result != VDO_SUCCESS) {
2802			vdo_log_error_strerror(result,
2803					       "vdo_slab journal entry (%llu, %u) (%s of offset %u) could not be applied in slab %u",
2804					       (unsigned long long) block_number,
2805					       entry_point.entry_count,
2806					       vdo_get_journal_operation_name(entry.operation),
2807					       entry.sbn, slab->slab_number);
2808			return result;
2809		}
2810		entry_point.entry_count++;
2811	}
2812
2813	return VDO_SUCCESS;
2814}
2815
2816/**
2817 * apply_journal_entries() - Find the relevant vio of the slab journal and apply all valid entries.
2818 * @completion: The metadata read vio completion.
2819 *
2820 * This is a callback registered in start_scrubbing().
2821 */
2822static void apply_journal_entries(struct vdo_completion *completion)
2823{
2824	int result;
2825	struct slab_scrubber *scrubber =
2826		container_of(as_vio(completion), struct slab_scrubber, vio);
2827	struct vdo_slab *slab = scrubber->slab;
2828	struct slab_journal *journal = &slab->journal;
2829
2830	/* Find the boundaries of the useful part of the journal. */
2831	sequence_number_t tail = journal->tail;
2832	tail_block_offset_t end_index = (tail - 1) % journal->size;
2833	char *end_data = scrubber->vio.data + (end_index * VDO_BLOCK_SIZE);
2834	struct packed_slab_journal_block *end_block =
2835		(struct packed_slab_journal_block *) end_data;
2836
2837	sequence_number_t head = __le64_to_cpu(end_block->header.head);
2838	tail_block_offset_t head_index = head % journal->size;
2839	block_count_t index = head_index;
2840
2841	struct journal_point ref_counts_point = slab->slab_journal_point;
2842	struct journal_point last_entry_applied = ref_counts_point;
2843	sequence_number_t sequence;
2844
2845	for (sequence = head; sequence < tail; sequence++) {
2846		char *block_data = scrubber->vio.data + (index * VDO_BLOCK_SIZE);
2847		struct packed_slab_journal_block *block =
2848			(struct packed_slab_journal_block *) block_data;
2849		struct slab_journal_block_header header;
2850
2851		vdo_unpack_slab_journal_block_header(&block->header, &header);
2852
2853		if ((header.nonce != slab->allocator->nonce) ||
2854		    (header.metadata_type != VDO_METADATA_SLAB_JOURNAL) ||
2855		    (header.sequence_number != sequence) ||
2856		    (header.entry_count > journal->entries_per_block) ||
2857		    (header.has_block_map_increments &&
2858		     (header.entry_count > journal->full_entries_per_block))) {
2859			/* The block is not what we expect it to be. */
2860			vdo_log_error("vdo_slab journal block for slab %u was invalid",
2861				      slab->slab_number);
2862			abort_scrubbing(scrubber, VDO_CORRUPT_JOURNAL);
2863			return;
2864		}
2865
2866		result = apply_block_entries(block, header.entry_count, sequence, slab);
2867		if (result != VDO_SUCCESS) {
2868			abort_scrubbing(scrubber, result);
2869			return;
2870		}
2871
2872		last_entry_applied.sequence_number = sequence;
2873		last_entry_applied.entry_count = header.entry_count - 1;
2874		index++;
2875		if (index == journal->size)
2876			index = 0;
2877	}
2878
2879	/*
2880	 * At the end of rebuild, the reference counters should be accurate to the end of the
2881	 * journal we just applied.
2882	 */
2883	result = VDO_ASSERT(!vdo_before_journal_point(&last_entry_applied,
2884						      &ref_counts_point),
2885			    "Refcounts are not more accurate than the slab journal");
2886	if (result != VDO_SUCCESS) {
2887		abort_scrubbing(scrubber, result);
2888		return;
2889	}
2890
2891	/* Save out the rebuilt reference blocks. */
2892	vdo_prepare_completion(completion, slab_scrubbed, handle_scrubber_error,
2893			       slab->allocator->thread_id, completion->parent);
2894	vdo_start_operation_with_waiter(&slab->state,
2895					VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING,
2896					completion, initiate_slab_action);
2897}
2898
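/* Apply the slab journal entries on the allocator's thread once the journal has been read. */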
2899static void read_slab_journal_endio(struct bio *bio)
2900{
2901	struct vio *vio = bio->bi_private;
2902	struct slab_scrubber *scrubber = container_of(vio, struct slab_scrubber, vio);
2903
	continue_vio_after_io(vio, apply_journal_entries,
2905			      scrubber->slab->allocator->thread_id);
2906}
2907
2908/**
2909 * start_scrubbing() - Read the current slab's journal from disk now that it has been flushed.
2910 * @completion: The scrubber's vio completion.
2911 *
2912 * This callback is registered in scrub_next_slab().
2913 */
2914static void start_scrubbing(struct vdo_completion *completion)
2915{
2916	struct slab_scrubber *scrubber =
2917		container_of(as_vio(completion), struct slab_scrubber, vio);
2918	struct vdo_slab *slab = scrubber->slab;
2919
2920	if (!slab->allocator->summary_entries[slab->slab_number].is_dirty) {
2921		slab_scrubbed(completion);
2922		return;
2923	}
2924
2925	vdo_submit_metadata_vio(&scrubber->vio, slab->journal_origin,
2926				read_slab_journal_endio, handle_scrubber_error,
2927				REQ_OP_READ);
2928}
2929
2930/**
2931 * scrub_next_slab() - Scrub the next slab if there is one.
2932 * @scrubber: The scrubber.
2933 */
2934static void scrub_next_slab(struct slab_scrubber *scrubber)
2935{
2936	struct vdo_completion *completion = &scrubber->vio.completion;
2937	struct vdo_slab *slab;
2938
2939	/*
	 * Note: this notify call is only safe because scrubbing can only be started when
2941	 * the VDO is quiescent.
2942	 */
2943	vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2944
2945	if (vdo_is_read_only(completion->vdo)) {
2946		finish_scrubbing(scrubber, VDO_READ_ONLY);
2947		return;
2948	}
2949
2950	slab = get_next_slab(scrubber);
2951	if ((slab == NULL) ||
2952	    (scrubber->high_priority_only && list_empty(&scrubber->high_priority_slabs))) {
2953		finish_scrubbing(scrubber, VDO_SUCCESS);
2954		return;
2955	}
2956
2957	if (vdo_finish_draining(&scrubber->admin_state))
2958		return;
2959
2960	list_del_init(&slab->allocq_entry);
2961	scrubber->slab = slab;
2962	vdo_prepare_completion(completion, start_scrubbing, handle_scrubber_error,
2963			       slab->allocator->thread_id, completion->parent);
2964	vdo_start_operation_with_waiter(&slab->state, VDO_ADMIN_STATE_SCRUBBING,
2965					completion, initiate_slab_action);
2966}
2967
2968/**
2969 * scrub_slabs() - Scrub all of an allocator's slabs that are eligible for scrubbing.
2970 * @allocator: The block_allocator to scrub.
2971 * @parent: The completion to notify when scrubbing is done, implies high_priority, may be NULL.
2972 */
2973static void scrub_slabs(struct block_allocator *allocator, struct vdo_completion *parent)
2974{
2975	struct slab_scrubber *scrubber = &allocator->scrubber;
2976
2977	scrubber->vio.completion.parent = parent;
2978	scrubber->high_priority_only = (parent != NULL);
2979	if (!has_slabs_to_scrub(scrubber)) {
2980		finish_scrubbing(scrubber, VDO_SUCCESS);
2981		return;
2982	}
2983
2984	if (scrubber->high_priority_only &&
2985	    vdo_is_priority_table_empty(allocator->prioritized_slabs) &&
2986	    list_empty(&scrubber->high_priority_slabs))
2987		register_slab_for_scrubbing(get_next_slab(scrubber), true);
2988
2989	vdo_resume_if_quiescent(&scrubber->admin_state);
2990	scrub_next_slab(scrubber);
2991}
2992
2993static inline void assert_on_allocator_thread(thread_id_t thread_id,
2994					      const char *function_name)
2995{
2996	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == thread_id),
2997			    "%s called on correct thread", function_name);
2998}
2999
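/* Record that a slab has been added to the allocator, tracking its count and last slab number. */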
3000static void register_slab_with_allocator(struct block_allocator *allocator,
3001					 struct vdo_slab *slab)
3002{
3003	allocator->slab_count++;
3004	allocator->last_slab = slab->slab_number;
3005}
3006
3007/**
3008 * get_depot_slab_iterator() - Return a slab_iterator over the slabs in a slab_depot.
3009 * @depot: The depot over which to iterate.
3010 * @start: The number of the slab to start iterating from.
3011 * @end: The number of the last slab which may be returned.
3012 * @stride: The difference in slab number between successive slabs.
3013 *
3014 * Iteration always occurs from higher to lower numbered slabs.
3015 *
3016 * Return: An initialized iterator structure.
3017 */
3018static struct slab_iterator get_depot_slab_iterator(struct slab_depot *depot,
3019						    slab_count_t start, slab_count_t end,
3020						    slab_count_t stride)
3021{
3022	struct vdo_slab **slabs = depot->slabs;
3023
3024	return (struct slab_iterator) {
3025		.slabs = slabs,
3026		.next = (((slabs == NULL) || (start < end)) ? NULL : slabs[start]),
3027		.end = end,
3028		.stride = stride,
3029	};
3030}
3031
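/* Get an iterator over all of an allocator's slabs, from highest to lowest numbered. */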
3032static struct slab_iterator get_slab_iterator(const struct block_allocator *allocator)
3033{
3034	return get_depot_slab_iterator(allocator->depot, allocator->last_slab,
3035				       allocator->zone_number,
3036				       allocator->depot->zone_count);
3037}
3038
3039/**
 * next_slab() - Get the next slab from a slab_iterator and advance the iterator.
3041 * @iterator: The slab_iterator.
3042 *
3043 * Return: The next slab or NULL if the iterator is exhausted.
3044 */
3045static struct vdo_slab *next_slab(struct slab_iterator *iterator)
3046{
3047	struct vdo_slab *slab = iterator->next;
3048
3049	if ((slab == NULL) || (slab->slab_number < iterator->end + iterator->stride))
3050		iterator->next = NULL;
3051	else
3052		iterator->next = iterator->slabs[slab->slab_number - iterator->stride];
3053
3054	return slab;
3055}
3056
3057/**
3058 * abort_waiter() - Abort vios waiting to make journal entries when read-only.
3059 *
3060 * This callback is invoked on all vios waiting to make slab journal entries after the VDO has gone
3061 * into read-only mode. Implements waiter_callback_fn.
3062 */
3063static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
3064{
3065	struct reference_updater *updater =
3066		container_of(waiter, struct reference_updater, waiter);
3067	struct data_vio *data_vio = data_vio_from_reference_updater(updater);
3068
3069	if (updater->increment) {
3070		continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
3071		return;
3072	}
3073
3074	vdo_continue_completion(&data_vio->decrement_completion, VDO_READ_ONLY);
3075}
3076
3077/* Implements vdo_read_only_notification_fn. */
3078static void notify_block_allocator_of_read_only_mode(void *listener,
3079						     struct vdo_completion *parent)
3080{
3081	struct block_allocator *allocator = listener;
3082	struct slab_iterator iterator;
3083
3084	assert_on_allocator_thread(allocator->thread_id, __func__);
3085	iterator = get_slab_iterator(allocator);
3086	while (iterator.next != NULL) {
3087		struct vdo_slab *slab = next_slab(&iterator);
3088
3089		vdo_waitq_notify_all_waiters(&slab->journal.entry_waiters,
3090					     abort_waiter, &slab->journal);
3091		check_if_slab_drained(slab);
3092	}
3093
3094	vdo_finish_completion(parent);
3095}
3096
3097/**
3098 * vdo_acquire_provisional_reference() - Acquire a provisional reference on behalf of a PBN lock if
3099 *                                       the block it locks is unreferenced.
3100 * @slab: The slab which contains the block.
3101 * @pbn: The physical block to reference.
3102 * @lock: The lock.
3103 *
3104 * Return: VDO_SUCCESS or an error.
3105 */
3106int vdo_acquire_provisional_reference(struct vdo_slab *slab, physical_block_number_t pbn,
3107				      struct pbn_lock *lock)
3108{
3109	slab_block_number block_number;
3110	int result;
3111
3112	if (vdo_pbn_lock_has_provisional_reference(lock))
3113		return VDO_SUCCESS;
3114
3115	if (!is_slab_open(slab))
3116		return VDO_INVALID_ADMIN_STATE;
3117
3118	result = slab_block_number_from_pbn(slab, pbn, &block_number);
3119	if (result != VDO_SUCCESS)
3120		return result;
3121
3122	if (slab->counters[block_number] == EMPTY_REFERENCE_COUNT) {
3123		make_provisional_reference(slab, block_number);
3124		if (lock != NULL)
3125			vdo_assign_pbn_lock_provisional_reference(lock);
3126	}
3127
3128	if (vdo_pbn_lock_has_provisional_reference(lock))
3129		adjust_free_block_count(slab, false);
3130
3131	return VDO_SUCCESS;
3132}
3133
3134static int __must_check allocate_slab_block(struct vdo_slab *slab,
3135					    physical_block_number_t *block_number_ptr)
3136{
3137	slab_block_number free_index;
3138
3139	if (!is_slab_open(slab))
3140		return VDO_INVALID_ADMIN_STATE;
3141
3142	if (!search_reference_blocks(slab, &free_index))
3143		return VDO_NO_SPACE;
3144
3145	VDO_ASSERT_LOG_ONLY((slab->counters[free_index] == EMPTY_REFERENCE_COUNT),
3146			    "free block must have ref count of zero");
3147	make_provisional_reference(slab, free_index);
3148	adjust_free_block_count(slab, false);
3149
3150	/*
3151	 * Update the search hint so the next search will start at the array index just past the
3152	 * free block we just found.
3153	 */
3154	slab->search_cursor.index = (free_index + 1);
3155
3156	*block_number_ptr = slab->start + free_index;
3157	return VDO_SUCCESS;
3158}
3159
3160/**
3161 * open_slab() - Prepare a slab to be allocated from.
3162 * @slab: The slab.
3163 */
3164static void open_slab(struct vdo_slab *slab)
3165{
3166	reset_search_cursor(slab);
3167	if (is_slab_journal_blank(slab)) {
3168		WRITE_ONCE(slab->allocator->statistics.slabs_opened,
3169			   slab->allocator->statistics.slabs_opened + 1);
3170		dirty_all_reference_blocks(slab);
3171	} else {
3172		WRITE_ONCE(slab->allocator->statistics.slabs_reopened,
3173			   slab->allocator->statistics.slabs_reopened + 1);
3174	}
3175
3176	slab->allocator->open_slab = slab;
3177}
3178
3179
3180/*
3181 * The block allocated will have a provisional reference and the reference must be either confirmed
3182 * with a subsequent increment or vacated with a subsequent decrement via
3183 * vdo_release_block_reference().
3184 */
3185int vdo_allocate_block(struct block_allocator *allocator,
3186		       physical_block_number_t *block_number_ptr)
3187{
3188	int result;
3189
3190	if (allocator->open_slab != NULL) {
3191		/* Try to allocate the next block in the currently open slab. */
3192		result = allocate_slab_block(allocator->open_slab, block_number_ptr);
3193		if ((result == VDO_SUCCESS) || (result != VDO_NO_SPACE))
3194			return result;
3195
3196		/* Put the exhausted open slab back into the priority table. */
3197		prioritize_slab(allocator->open_slab);
3198	}
3199
3200	/* Remove the highest priority slab from the priority table and make it the open slab. */
3201	open_slab(list_entry(vdo_priority_table_dequeue(allocator->prioritized_slabs),
3202			     struct vdo_slab, allocq_entry));
3203
3204	/*
3205	 * Try allocating again. If we're out of space immediately after opening a slab, then every
3206	 * slab must be fully allocated.
3207	 */
3208	return allocate_slab_block(allocator->open_slab, block_number_ptr);
3209}
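
/*
 * A minimal sketch of the expected caller pattern (illustrative only; the actual call sites are
 * elsewhere):
 *
 *	physical_block_number_t pbn;
 *	int result = vdo_allocate_block(allocator, &pbn);
 *
 *	if (result == VDO_SUCCESS) {
 *		// Either confirm the provisional reference with an increment, or return the
 *		// block with vdo_release_block_reference(allocator, pbn).
 *	}
 */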
3210
3211/**
3212 * vdo_enqueue_clean_slab_waiter() - Wait for a clean slab.
3213 * @allocator: The block_allocator on which to wait.
3214 * @waiter: The waiter.
3215 *
 * Return: VDO_SUCCESS if the waiter was queued, VDO_NO_SPACE if there are no slabs to scrub, or
 *         another error (such as VDO_READ_ONLY) otherwise.
3218 */
3219int vdo_enqueue_clean_slab_waiter(struct block_allocator *allocator,
3220				  struct vdo_waiter *waiter)
3221{
3222	if (vdo_is_read_only(allocator->depot->vdo))
3223		return VDO_READ_ONLY;
3224
3225	if (vdo_is_state_quiescent(&allocator->scrubber.admin_state))
3226		return VDO_NO_SPACE;
3227
3228	vdo_waitq_enqueue_waiter(&allocator->scrubber.waiters, waiter);
3229	return VDO_SUCCESS;
3230}
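
/*
 * A sketch of the intended use (illustrative; the actual waiter callbacks are defined by the
 * callers): an allocation path which gets VDO_NO_SPACE while slabs are still being scrubbed can
 * enqueue a waiter here and retry its allocation from the waiter callback once the scrubber has
 * produced a clean slab.
 */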
3231
3232/**
3233 * vdo_modify_reference_count() - Modify the reference count of a block by first making a slab
3234 *                                journal entry and then updating the reference counter.
3235 *
 * @completion: The data_vio completion for which to add the entry.
3237 * @updater: Which of the data_vio's reference updaters is being submitted.
3238 */
3239void vdo_modify_reference_count(struct vdo_completion *completion,
3240				struct reference_updater *updater)
3241{
3242	struct vdo_slab *slab = vdo_get_slab(completion->vdo->depot, updater->zpbn.pbn);
3243
3244	if (!is_slab_open(slab)) {
3245		vdo_continue_completion(completion, VDO_INVALID_ADMIN_STATE);
3246		return;
3247	}
3248
3249	if (vdo_is_read_only(completion->vdo)) {
3250		vdo_continue_completion(completion, VDO_READ_ONLY);
3251		return;
3252	}
3253
3254	vdo_waitq_enqueue_waiter(&slab->journal.entry_waiters, &updater->waiter);
3255	if ((slab->status != VDO_SLAB_REBUILT) && requires_reaping(&slab->journal))
3256		register_slab_for_scrubbing(slab, true);
3257
3258	add_entries(&slab->journal);
3259}
3260
3261/* Release an unused provisional reference. */
3262int vdo_release_block_reference(struct block_allocator *allocator,
3263				physical_block_number_t pbn)
3264{
3265	struct reference_updater updater;
3266
3267	if (pbn == VDO_ZERO_BLOCK)
3268		return VDO_SUCCESS;
3269
3270	updater = (struct reference_updater) {
3271		.operation = VDO_JOURNAL_DATA_REMAPPING,
3272		.increment = false,
3273		.zpbn = {
3274			.pbn = pbn,
3275		},
3276	};
3277
3278	return adjust_reference_count(vdo_get_slab(allocator->depot, pbn),
3279				      &updater, NULL);
3280}
3281
3282/*
 * This is a min_heap callback function that orders slab_status structures using the 'is_clean'
 * field as the primary key and the 'emptiness' field as the secondary key.
3285 *
3286 * Slabs need to be pushed onto the rings in the same order they are to be popped off. Popping
3287 * should always get the most empty first, so pushing should be from most empty to least empty.
3288 * Thus, the ordering is reversed from the usual sense since min_heap returns smaller elements
3289 * before larger ones.
3290 */
3291static bool slab_status_is_less_than(const void *item1, const void *item2)
3292{
3293	const struct slab_status *info1 = item1;
3294	const struct slab_status *info2 = item2;
3295
3296	if (info1->is_clean != info2->is_clean)
3297		return info1->is_clean;
3298	if (info1->emptiness != info2->emptiness)
3299		return info1->emptiness > info2->emptiness;
3300	return info1->slab_number < info2->slab_number;
3301}
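
/*
 * For example (with hypothetical values): a status with is_clean = true sorts before one with
 * is_clean = false; between two clean slabs, the one with the larger emptiness hint sorts first;
 * remaining ties go to the lower slab number. Popping the heap therefore yields clean,
 * mostly-empty slabs before dirty or fuller ones.
 */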
3302
3303static void swap_slab_statuses(void *item1, void *item2)
3304{
3305	struct slab_status *info1 = item1;
3306	struct slab_status *info2 = item2;
3307
3308	swap(*info1, *info2);
3309}
3310
3311static const struct min_heap_callbacks slab_status_min_heap = {
3312	.elem_size = sizeof(struct slab_status),
3313	.less = slab_status_is_less_than,
3314	.swp = swap_slab_statuses,
3315};
3316
/* Inform the slab actor that an action has finished on some slab; used by apply_to_slabs(). */
3318static void slab_action_callback(struct vdo_completion *completion)
3319{
3320	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3321	struct slab_actor *actor = &allocator->slab_actor;
3322
3323	if (--actor->slab_action_count == 0) {
3324		actor->callback(completion);
3325		return;
3326	}
3327
3328	vdo_reset_completion(completion);
3329}
3330
3331/* Preserve the error from part of an action and continue. */
3332static void handle_operation_error(struct vdo_completion *completion)
3333{
3334	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3335
3336	if (allocator->state.waiter != NULL)
3337		vdo_set_completion_result(allocator->state.waiter, completion->result);
3338	completion->callback(completion);
3339}
3340
3341/* Perform an action on each of an allocator's slabs in parallel. */
3342static void apply_to_slabs(struct block_allocator *allocator, vdo_action_fn callback)
3343{
3344	struct slab_iterator iterator;
3345
3346	vdo_prepare_completion(&allocator->completion, slab_action_callback,
3347			       handle_operation_error, allocator->thread_id, NULL);
3348	allocator->completion.requeue = false;
3349
3350	/*
3351	 * Since we are going to dequeue all of the slabs, the open slab will become invalid, so
3352	 * clear it.
3353	 */
3354	allocator->open_slab = NULL;
3355
3356	/* Ensure that we don't finish before we're done starting. */
3357	allocator->slab_actor = (struct slab_actor) {
3358		.slab_action_count = 1,
3359		.callback = callback,
3360	};
3361
3362	iterator = get_slab_iterator(allocator);
3363	while (iterator.next != NULL) {
3364		const struct admin_state_code *operation =
3365			vdo_get_admin_state_code(&allocator->state);
3366		struct vdo_slab *slab = next_slab(&iterator);
3367
3368		list_del_init(&slab->allocq_entry);
3369		allocator->slab_actor.slab_action_count++;
3370		vdo_start_operation_with_waiter(&slab->state, operation,
3371						&allocator->completion,
3372						initiate_slab_action);
3373	}
3374
3375	slab_action_callback(&allocator->completion);
3376}
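
/*
 * A worked example of the counting above (the slab count is hypothetical): with 3 slabs,
 * slab_action_count starts at 1 and is incremented to 4 while the per-slab operations are
 * started. It is then decremented once by the final slab_action_callback() call above and once as
 * each slab's operation completes; only the last decrement reaches zero and invokes the actor's
 * callback, so the action cannot finish while slabs are still being started.
 */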
3377
3378static void finish_loading_allocator(struct vdo_completion *completion)
3379{
3380	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3381	const struct admin_state_code *operation =
3382		vdo_get_admin_state_code(&allocator->state);
3383
3384	if (allocator->eraser != NULL)
3385		dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));
3386
3387	if (operation == VDO_ADMIN_STATE_LOADING_FOR_RECOVERY) {
3388		void *context =
3389			vdo_get_current_action_context(allocator->depot->action_manager);
3390
3391		vdo_replay_into_slab_journals(allocator, context);
3392		return;
3393	}
3394
3395	vdo_finish_loading(&allocator->state);
3396}
3397
3398static void erase_next_slab_journal(struct block_allocator *allocator);
3399
3400static void copy_callback(int read_err, unsigned long write_err, void *context)
3401{
3402	struct block_allocator *allocator = context;
3403	int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO);
3404
3405	if (result != VDO_SUCCESS) {
3406		vdo_fail_completion(&allocator->completion, result);
3407		return;
3408	}
3409
3410	erase_next_slab_journal(allocator);
3411}
3412
3413/* erase_next_slab_journal() - Erase the next slab journal. */
3414static void erase_next_slab_journal(struct block_allocator *allocator)
3415{
3416	struct vdo_slab *slab;
3417	physical_block_number_t pbn;
3418	struct dm_io_region regions[1];
3419	struct slab_depot *depot = allocator->depot;
3420	block_count_t blocks = depot->slab_config.slab_journal_blocks;
3421
3422	if (allocator->slabs_to_erase.next == NULL) {
3423		vdo_finish_completion(&allocator->completion);
3424		return;
3425	}
3426
3427	slab = next_slab(&allocator->slabs_to_erase);
3428	pbn = slab->journal_origin - depot->vdo->geometry.bio_offset;
3429	regions[0] = (struct dm_io_region) {
3430		.bdev = vdo_get_backing_device(depot->vdo),
3431		.sector = pbn * VDO_SECTORS_PER_BLOCK,
3432		.count = blocks * VDO_SECTORS_PER_BLOCK,
3433	};
3434	dm_kcopyd_zero(allocator->eraser, 1, regions, 0, copy_callback, allocator);
3435}
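
/*
 * For scale (the journal size here is hypothetical; this assumes the usual 4096-byte VDO block,
 * i.e. 8 sectors per block): a slab journal of 224 blocks would be erased as a single zeroed
 * region of 224 * 8 = 1792 sectors.
 */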
3436
3437/* Implements vdo_admin_initiator_fn. */
3438static void initiate_load(struct admin_state *state)
3439{
3440	struct block_allocator *allocator =
3441		container_of(state, struct block_allocator, state);
3442	const struct admin_state_code *operation = vdo_get_admin_state_code(state);
3443
3444	if (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD) {
3445		/*
3446		 * Must requeue because the kcopyd client cannot be freed in the same stack frame
3447		 * as the kcopyd callback, lest it deadlock.
3448		 */
3449		vdo_prepare_completion_for_requeue(&allocator->completion,
3450						   finish_loading_allocator,
3451						   handle_operation_error,
3452						   allocator->thread_id, NULL);
3453		allocator->eraser = dm_kcopyd_client_create(NULL);
3454		if (IS_ERR(allocator->eraser)) {
3455			vdo_fail_completion(&allocator->completion,
3456					    PTR_ERR(allocator->eraser));
3457			allocator->eraser = NULL;
3458			return;
3459		}
3460		allocator->slabs_to_erase = get_slab_iterator(allocator);
3461
3462		erase_next_slab_journal(allocator);
3463		return;
3464	}
3465
3466	apply_to_slabs(allocator, finish_loading_allocator);
3467}
3468
3469/**
3470 * vdo_notify_slab_journals_are_recovered() - Inform a block allocator that its slab journals have
3471 *                                            been recovered from the recovery journal.
 * @completion: The allocator completion.
3473 */
3474void vdo_notify_slab_journals_are_recovered(struct vdo_completion *completion)
3475{
3476	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3477
3478	vdo_finish_loading_with_result(&allocator->state, completion->result);
3479}
3480
3481static int get_slab_statuses(struct block_allocator *allocator,
3482			     struct slab_status **statuses_ptr)
3483{
3484	int result;
3485	struct slab_status *statuses;
3486	struct slab_iterator iterator = get_slab_iterator(allocator);
3487
3488	result = vdo_allocate(allocator->slab_count, struct slab_status, __func__,
3489			      &statuses);
3490	if (result != VDO_SUCCESS)
3491		return result;
3492
3493	*statuses_ptr = statuses;
3494
3495	while (iterator.next != NULL)  {
3496		slab_count_t slab_number = next_slab(&iterator)->slab_number;
3497
3498		*statuses++ = (struct slab_status) {
3499			.slab_number = slab_number,
3500			.is_clean = !allocator->summary_entries[slab_number].is_dirty,
3501			.emptiness = allocator->summary_entries[slab_number].fullness_hint,
3502		};
3503	}
3504
3505	return VDO_SUCCESS;
3506}
3507
3508/* Prepare slabs for allocation or scrubbing. */
3509static int __must_check vdo_prepare_slabs_for_allocation(struct block_allocator *allocator)
3510{
3511	struct slab_status current_slab_status;
3512	struct min_heap heap;
3513	int result;
3514	struct slab_status *slab_statuses;
3515	struct slab_depot *depot = allocator->depot;
3516
3517	WRITE_ONCE(allocator->allocated_blocks,
3518		   allocator->slab_count * depot->slab_config.data_blocks);
3519	result = get_slab_statuses(allocator, &slab_statuses);
3520	if (result != VDO_SUCCESS)
3521		return result;
3522
3523	/* Sort the slabs by cleanliness, then by emptiness hint. */
3524	heap = (struct min_heap) {
3525		.data = slab_statuses,
3526		.nr = allocator->slab_count,
3527		.size = allocator->slab_count,
3528	};
3529	min_heapify_all(&heap, &slab_status_min_heap);
3530
3531	while (heap.nr > 0) {
3532		bool high_priority;
3533		struct vdo_slab *slab;
3534		struct slab_journal *journal;
3535
3536		current_slab_status = slab_statuses[0];
3537		min_heap_pop(&heap, &slab_status_min_heap);
3538		slab = depot->slabs[current_slab_status.slab_number];
3539
3540		if ((depot->load_type == VDO_SLAB_DEPOT_REBUILD_LOAD) ||
3541		    (!allocator->summary_entries[slab->slab_number].load_ref_counts &&
3542		     current_slab_status.is_clean)) {
3543			queue_slab(slab);
3544			continue;
3545		}
3546
3547		slab->status = VDO_SLAB_REQUIRES_SCRUBBING;
3548		journal = &slab->journal;
3549		high_priority = ((current_slab_status.is_clean &&
3550				 (depot->load_type == VDO_SLAB_DEPOT_NORMAL_LOAD)) ||
3551				 (journal_length(journal) >= journal->scrubbing_threshold));
3552		register_slab_for_scrubbing(slab, high_priority);
3553	}
3554
3555	vdo_free(slab_statuses);
3556	return VDO_SUCCESS;
3557}
3558
3559static const char *status_to_string(enum slab_rebuild_status status)
3560{
3561	switch (status) {
3562	case VDO_SLAB_REBUILT:
3563		return "REBUILT";
3564	case VDO_SLAB_REQUIRES_SCRUBBING:
3565		return "SCRUBBING";
3566	case VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING:
3567		return "PRIORITY_SCRUBBING";
3568	case VDO_SLAB_REBUILDING:
3569		return "REBUILDING";
3570	case VDO_SLAB_REPLAYING:
3571		return "REPLAYING";
3572	default:
3573		return "UNKNOWN";
3574	}
3575}
3576
3577void vdo_dump_block_allocator(const struct block_allocator *allocator)
3578{
3579	unsigned int pause_counter = 0;
3580	struct slab_iterator iterator = get_slab_iterator(allocator);
3581	const struct slab_scrubber *scrubber = &allocator->scrubber;
3582
3583	vdo_log_info("block_allocator zone %u", allocator->zone_number);
3584	while (iterator.next != NULL) {
3585		struct vdo_slab *slab = next_slab(&iterator);
3586		struct slab_journal *journal = &slab->journal;
3587
3588		if (slab->reference_blocks != NULL) {
3589			/* Terse because there are a lot of slabs to dump and syslog is lossy. */
3590			vdo_log_info("slab %u: P%u, %llu free", slab->slab_number,
3591				     slab->priority,
3592				     (unsigned long long) slab->free_blocks);
3593		} else {
3594			vdo_log_info("slab %u: status %s", slab->slab_number,
3595				     status_to_string(slab->status));
3596		}
3597
3598		vdo_log_info("  slab journal: entry_waiters=%zu waiting_to_commit=%s updating_slab_summary=%s head=%llu unreapable=%llu tail=%llu next_commit=%llu summarized=%llu last_summarized=%llu recovery_lock=%llu dirty=%s",
3599			     vdo_waitq_num_waiters(&journal->entry_waiters),
3600			     vdo_bool_to_string(journal->waiting_to_commit),
3601			     vdo_bool_to_string(journal->updating_slab_summary),
3602			     (unsigned long long) journal->head,
3603			     (unsigned long long) journal->unreapable,
3604			     (unsigned long long) journal->tail,
3605			     (unsigned long long) journal->next_commit,
3606			     (unsigned long long) journal->summarized,
3607			     (unsigned long long) journal->last_summarized,
3608			     (unsigned long long) journal->recovery_lock,
3609			     vdo_bool_to_string(journal->recovery_lock != 0));
3610		/*
3611		 * Given the frequency with which the locks are just a tiny bit off, it might be
3612		 * worth dumping all the locks, but that might be too much logging.
3613		 */
3614
3615		if (slab->counters != NULL) {
3616			/* Terse because there are a lot of slabs to dump and syslog is lossy. */
3617			vdo_log_info("  slab: free=%u/%u blocks=%u dirty=%zu active=%zu journal@(%llu,%u)",
3618				     slab->free_blocks, slab->block_count,
3619				     slab->reference_block_count,
3620				     vdo_waitq_num_waiters(&slab->dirty_blocks),
3621				     slab->active_count,
3622				     (unsigned long long) slab->slab_journal_point.sequence_number,
3623				     slab->slab_journal_point.entry_count);
3624		} else {
3625			vdo_log_info("  no counters");
3626		}
3627
3628		/*
3629		 * Wait for a while after each batch of 32 slabs dumped, an arbitrary number,
3630		 * allowing the kernel log a chance to be flushed instead of being overrun.
3631		 */
3632		if (pause_counter++ == 31) {
3633			pause_counter = 0;
3634			vdo_pause_for_logger();
3635		}
3636	}
3637
3638	vdo_log_info("slab_scrubber slab_count %u waiters %zu %s%s",
3639		     READ_ONCE(scrubber->slab_count),
3640		     vdo_waitq_num_waiters(&scrubber->waiters),
3641		     vdo_get_admin_state_code(&scrubber->admin_state)->name,
3642		     scrubber->high_priority_only ? ", high_priority_only " : "");
3643}
3644
3645static void free_slab(struct vdo_slab *slab)
3646{
3647	if (slab == NULL)
3648		return;
3649
3650	list_del(&slab->allocq_entry);
3651	vdo_free(vdo_forget(slab->journal.block));
3652	vdo_free(vdo_forget(slab->journal.locks));
3653	vdo_free(vdo_forget(slab->counters));
3654	vdo_free(vdo_forget(slab->reference_blocks));
3655	vdo_free(slab);
3656}
3657
3658static int initialize_slab_journal(struct vdo_slab *slab)
3659{
3660	struct slab_journal *journal = &slab->journal;
3661	const struct slab_config *slab_config = &slab->allocator->depot->slab_config;
3662	int result;
3663
3664	result = vdo_allocate(slab_config->slab_journal_blocks, struct journal_lock,
3665			      __func__, &journal->locks);
3666	if (result != VDO_SUCCESS)
3667		return result;
3668
3669	result = vdo_allocate(VDO_BLOCK_SIZE, char, "struct packed_slab_journal_block",
3670			      (char **) &journal->block);
3671	if (result != VDO_SUCCESS)
3672		return result;
3673
3674	journal->slab = slab;
3675	journal->size = slab_config->slab_journal_blocks;
3676	journal->flushing_threshold = slab_config->slab_journal_flushing_threshold;
3677	journal->blocking_threshold = slab_config->slab_journal_blocking_threshold;
3678	journal->scrubbing_threshold = slab_config->slab_journal_scrubbing_threshold;
3679	journal->entries_per_block = VDO_SLAB_JOURNAL_ENTRIES_PER_BLOCK;
3680	journal->full_entries_per_block = VDO_SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;
3681	journal->events = &slab->allocator->slab_journal_statistics;
3682	journal->recovery_journal = slab->allocator->depot->vdo->recovery_journal;
3683	journal->tail = 1;
3684	journal->head = 1;
3685
3686	journal->flushing_deadline = journal->flushing_threshold;
3687	/*
	 * Leave some time between the flushing deadline and the blocking threshold, so that
	 * hopefully all the flushing is done before any entries have to block.
3690	 */
3691	if ((journal->blocking_threshold - journal->flushing_threshold) > 5)
3692		journal->flushing_deadline = journal->blocking_threshold - 5;
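	/*
	 * Worked example (the thresholds are hypothetical): with flushing_threshold = 80 and
	 * blocking_threshold = 100 the gap is 20 > 5, so the deadline becomes 95; were the gap 5
	 * or less, the deadline would remain at the flushing threshold itself.
	 */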
3693
3694	journal->slab_summary_waiter.callback = release_journal_locks;
3695
3696	INIT_LIST_HEAD(&journal->dirty_entry);
3697	INIT_LIST_HEAD(&journal->uncommitted_blocks);
3698
3699	journal->tail_header.nonce = slab->allocator->nonce;
3700	journal->tail_header.metadata_type = VDO_METADATA_SLAB_JOURNAL;
3701	initialize_journal_state(journal);
3702	return VDO_SUCCESS;
3703}
3704
3705/**
3706 * make_slab() - Construct a new, empty slab.
3707 * @slab_origin: The physical block number within the block allocator partition of the first block
3708 *               in the slab.
3709 * @allocator: The block allocator to which the slab belongs.
3710 * @slab_number: The slab number of the slab.
3711 * @is_new: true if this slab is being allocated as part of a resize.
3712 * @slab_ptr: A pointer to receive the new slab.
3713 *
3714 * Return: VDO_SUCCESS or an error code.
3715 */
3716static int __must_check make_slab(physical_block_number_t slab_origin,
3717				  struct block_allocator *allocator,
3718				  slab_count_t slab_number, bool is_new,
3719				  struct vdo_slab **slab_ptr)
3720{
3721	const struct slab_config *slab_config = &allocator->depot->slab_config;
3722	struct vdo_slab *slab;
3723	int result;
3724
3725	result = vdo_allocate(1, struct vdo_slab, __func__, &slab);
3726	if (result != VDO_SUCCESS)
3727		return result;
3728
3729	*slab = (struct vdo_slab) {
3730		.allocator = allocator,
3731		.start = slab_origin,
3732		.end = slab_origin + slab_config->slab_blocks,
3733		.slab_number = slab_number,
3734		.ref_counts_origin = slab_origin + slab_config->data_blocks,
3735		.journal_origin =
3736			vdo_get_slab_journal_start_block(slab_config, slab_origin),
3737		.block_count = slab_config->data_blocks,
3738		.free_blocks = slab_config->data_blocks,
3739		.reference_block_count =
3740			vdo_get_saved_reference_count_size(slab_config->data_blocks),
3741	};
3742	INIT_LIST_HEAD(&slab->allocq_entry);
3743
3744	result = initialize_slab_journal(slab);
3745	if (result != VDO_SUCCESS) {
3746		free_slab(slab);
3747		return result;
3748	}
3749
3750	if (is_new) {
3751		vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NEW);
3752		result = allocate_slab_counters(slab);
3753		if (result != VDO_SUCCESS) {
3754			free_slab(slab);
3755			return result;
3756		}
3757	} else {
3758		vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
3759	}
3760
3761	*slab_ptr = slab;
3762	return VDO_SUCCESS;
3763}
3764
3765/**
3766 * allocate_slabs() - Allocate a new slab pointer array.
3767 * @depot: The depot.
3768 * @slab_count: The number of slabs the depot should have in the new array.
3769 *
3770 * Any existing slab pointers will be copied into the new array, and slabs will be allocated as
3771 * needed. The newly allocated slabs will not be distributed for use by the block allocators.
3772 *
3773 * Return: VDO_SUCCESS or an error code.
3774 */
3775static int allocate_slabs(struct slab_depot *depot, slab_count_t slab_count)
3776{
3777	block_count_t slab_size;
3778	bool resizing = false;
3779	physical_block_number_t slab_origin;
3780	int result;
3781
3782	result = vdo_allocate(slab_count, struct vdo_slab *,
3783			      "slab pointer array", &depot->new_slabs);
3784	if (result != VDO_SUCCESS)
3785		return result;
3786
3787	if (depot->slabs != NULL) {
3788		memcpy(depot->new_slabs, depot->slabs,
3789		       depot->slab_count * sizeof(struct vdo_slab *));
3790		resizing = true;
3791	}
3792
3793	slab_size = depot->slab_config.slab_blocks;
3794	slab_origin = depot->first_block + (depot->slab_count * slab_size);
3795
3796	for (depot->new_slab_count = depot->slab_count;
3797	     depot->new_slab_count < slab_count;
3798	     depot->new_slab_count++, slab_origin += slab_size) {
3799		struct block_allocator *allocator =
3800			&depot->allocators[depot->new_slab_count % depot->zone_count];
3801		struct vdo_slab **slab_ptr = &depot->new_slabs[depot->new_slab_count];
3802
3803		result = make_slab(slab_origin, allocator, depot->new_slab_count,
3804				   resizing, slab_ptr);
3805		if (result != VDO_SUCCESS)
3806			return result;
3807	}
3808
3809	return VDO_SUCCESS;
3810}
3811
3812/**
3813 * vdo_abandon_new_slabs() - Abandon any new slabs in this depot, freeing them as needed.
3814 * @depot: The depot.
3815 */
3816void vdo_abandon_new_slabs(struct slab_depot *depot)
3817{
3818	slab_count_t i;
3819
3820	if (depot->new_slabs == NULL)
3821		return;
3822
3823	for (i = depot->slab_count; i < depot->new_slab_count; i++)
3824		free_slab(vdo_forget(depot->new_slabs[i]));
3825	depot->new_slab_count = 0;
3826	depot->new_size = 0;
3827	vdo_free(vdo_forget(depot->new_slabs));
3828}
3829
3830/**
3831 * get_allocator_thread_id() - Get the ID of the thread on which a given allocator operates.
3832 *
3833 * Implements vdo_zone_thread_getter_fn.
3834 */
3835static thread_id_t get_allocator_thread_id(void *context, zone_count_t zone_number)
3836{
3837	return ((struct slab_depot *) context)->allocators[zone_number].thread_id;
3838}
3839
3840/**
3841 * release_recovery_journal_lock() - Request the slab journal to release the recovery journal lock
3842 *                                   it may hold on a specified recovery journal block.
3843 * @journal: The slab journal.
3844 * @recovery_lock: The sequence number of the recovery journal block whose locks should be
3845 *                 released.
3846 *
3847 * Return: true if the journal does hold a lock on the specified block (which it will release).
3848 */
3849static bool __must_check release_recovery_journal_lock(struct slab_journal *journal,
3850						       sequence_number_t recovery_lock)
3851{
3852	if (recovery_lock > journal->recovery_lock) {
3853		VDO_ASSERT_LOG_ONLY((recovery_lock < journal->recovery_lock),
3854				    "slab journal recovery lock is not older than the recovery journal head");
3855		return false;
3856	}
3857
3858	if ((recovery_lock < journal->recovery_lock) ||
3859	    vdo_is_read_only(journal->slab->allocator->depot->vdo))
3860		return false;
3861
3862	/* All locks are held by the block which is in progress; write it. */
3863	commit_tail(journal);
3864	return true;
3865}
3866
3867/*
3868 * Request a commit of all dirty tail blocks which are locking the recovery journal block the depot
3869 * is seeking to release.
3870 *
3871 * Implements vdo_zone_action_fn.
3872 */
3873static void release_tail_block_locks(void *context, zone_count_t zone_number,
3874				     struct vdo_completion *parent)
3875{
3876	struct slab_journal *journal, *tmp;
3877	struct slab_depot *depot = context;
3878	struct list_head *list = &depot->allocators[zone_number].dirty_slab_journals;
3879
3880	list_for_each_entry_safe(journal, tmp, list, dirty_entry) {
3881		if (!release_recovery_journal_lock(journal,
3882						   depot->active_release_request))
3883			break;
3884	}
3885
3886	vdo_finish_completion(parent);
3887}
3888
3889/**
3890 * prepare_for_tail_block_commit() - Prepare to commit oldest tail blocks.
3891 *
3892 * Implements vdo_action_preamble_fn.
3893 */
3894static void prepare_for_tail_block_commit(void *context, struct vdo_completion *parent)
3895{
3896	struct slab_depot *depot = context;
3897
3898	depot->active_release_request = depot->new_release_request;
3899	vdo_finish_completion(parent);
3900}
3901
3902/**
3903 * schedule_tail_block_commit() - Schedule a tail block commit if necessary.
3904 *
3905 * This method should not be called directly. Rather, call vdo_schedule_default_action() on the
3906 * depot's action manager.
3907 *
3908 * Implements vdo_action_scheduler_fn.
3909 */
3910static bool schedule_tail_block_commit(void *context)
3911{
3912	struct slab_depot *depot = context;
3913
3914	if (depot->new_release_request == depot->active_release_request)
3915		return false;
3916
3917	return vdo_schedule_action(depot->action_manager,
3918				   prepare_for_tail_block_commit,
3919				   release_tail_block_locks,
3920				   NULL, NULL);
3921}
3922
3923/**
3924 * initialize_slab_scrubber() - Initialize an allocator's slab scrubber.
 * @allocator: The allocator being initialized.
3926 *
3927 * Return: VDO_SUCCESS or an error.
3928 */
3929static int initialize_slab_scrubber(struct block_allocator *allocator)
3930{
3931	struct slab_scrubber *scrubber = &allocator->scrubber;
3932	block_count_t slab_journal_size =
3933		allocator->depot->slab_config.slab_journal_blocks;
3934	char *journal_data;
3935	int result;
3936
3937	result = vdo_allocate(VDO_BLOCK_SIZE * slab_journal_size,
3938			      char, __func__, &journal_data);
3939	if (result != VDO_SUCCESS)
3940		return result;
3941
3942	result = allocate_vio_components(allocator->completion.vdo,
3943					 VIO_TYPE_SLAB_JOURNAL,
3944					 VIO_PRIORITY_METADATA,
3945					 allocator, slab_journal_size,
3946					 journal_data, &scrubber->vio);
3947	if (result != VDO_SUCCESS) {
3948		vdo_free(journal_data);
3949		return result;
3950	}
3951
3952	INIT_LIST_HEAD(&scrubber->high_priority_slabs);
3953	INIT_LIST_HEAD(&scrubber->slabs);
3954	vdo_set_admin_state_code(&scrubber->admin_state, VDO_ADMIN_STATE_SUSPENDED);
3955	return VDO_SUCCESS;
3956}
3957
3958/**
3959 * initialize_slab_summary_block() - Initialize a slab_summary_block.
3960 * @allocator: The allocator which owns the block.
3961 * @index: The index of this block in its zone's summary.
3962 *
3963 * Return: VDO_SUCCESS or an error.
3964 */
3965static int __must_check initialize_slab_summary_block(struct block_allocator *allocator,
3966						      block_count_t index)
3967{
3968	struct slab_summary_block *block = &allocator->summary_blocks[index];
3969	int result;
3970
3971	result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &block->outgoing_entries);
3972	if (result != VDO_SUCCESS)
3973		return result;
3974
3975	result = allocate_vio_components(allocator->depot->vdo, VIO_TYPE_SLAB_SUMMARY,
3976					 VIO_PRIORITY_METADATA, NULL, 1,
3977					 block->outgoing_entries, &block->vio);
3978	if (result != VDO_SUCCESS)
3979		return result;
3980
3981	block->allocator = allocator;
3982	block->entries = &allocator->summary_entries[VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK * index];
3983	block->index = index;
3984	return VDO_SUCCESS;
3985}
3986
3987static int __must_check initialize_block_allocator(struct slab_depot *depot,
3988						   zone_count_t zone)
3989{
3990	int result;
3991	block_count_t i;
3992	struct block_allocator *allocator = &depot->allocators[zone];
3993	struct vdo *vdo = depot->vdo;
3994	block_count_t max_free_blocks = depot->slab_config.data_blocks;
3995	unsigned int max_priority = (2 + ilog2(max_free_blocks));
3996
3997	*allocator = (struct block_allocator) {
3998		.depot = depot,
3999		.zone_number = zone,
4000		.thread_id = vdo->thread_config.physical_threads[zone],
4001		.nonce = vdo->states.vdo.nonce,
4002	};
4003
4004	INIT_LIST_HEAD(&allocator->dirty_slab_journals);
4005	vdo_set_admin_state_code(&allocator->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
4006	result = vdo_register_read_only_listener(vdo, allocator,
4007						 notify_block_allocator_of_read_only_mode,
4008						 allocator->thread_id);
4009	if (result != VDO_SUCCESS)
4010		return result;
4011
4012	vdo_initialize_completion(&allocator->completion, vdo, VDO_BLOCK_ALLOCATOR_COMPLETION);
4013	result = make_vio_pool(vdo, BLOCK_ALLOCATOR_VIO_POOL_SIZE, allocator->thread_id,
4014			       VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
4015			       allocator, &allocator->vio_pool);
4016	if (result != VDO_SUCCESS)
4017		return result;
4018
4019	result = initialize_slab_scrubber(allocator);
4020	if (result != VDO_SUCCESS)
4021		return result;
4022
4023	result = vdo_make_priority_table(max_priority, &allocator->prioritized_slabs);
4024	if (result != VDO_SUCCESS)
4025		return result;
4026
4027	result = vdo_allocate(VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE,
4028			      struct slab_summary_block, __func__,
4029			      &allocator->summary_blocks);
4030	if (result != VDO_SUCCESS)
4031		return result;
4032
4033	vdo_set_admin_state_code(&allocator->summary_state,
4034				 VDO_ADMIN_STATE_NORMAL_OPERATION);
4035	allocator->summary_entries = depot->summary_entries + (MAX_VDO_SLABS * zone);
4036
4037	/* Initialize each summary block. */
4038	for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4039		result = initialize_slab_summary_block(allocator, i);
4040		if (result != VDO_SUCCESS)
4041			return result;
4042	}
4043
4044	/*
4045	 * Performing well atop thin provisioned storage requires either that VDO discards freed
4046	 * blocks, or that the block allocator try to use slabs that already have allocated blocks
4047	 * in preference to slabs that have never been opened. For reasons we have not been able to
	 * fully understand, some SSD machines have been very sensitive (50% reduction in
4049	 * test throughput) to very slight differences in the timing and locality of block
4050	 * allocation. Assigning a low priority to unopened slabs (max_priority/2, say) would be
4051	 * ideal for the story, but anything less than a very high threshold (max_priority - 1)
4052	 * hurts on these machines.
4053	 *
4054	 * This sets the free block threshold for preferring to open an unopened slab to the binary
4055	 * floor of 3/4ths the total number of data blocks in a slab, which will generally evaluate
4056	 * to about half the slab size.
4057	 */
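	/*
	 * Worked example (the slab size is hypothetical): with 32768 data blocks per slab,
	 * max_priority = 2 + ilog2(32768) = 17 and unopened_slab_priority =
	 * 1 + ilog2((32768 * 3) / 4) = 1 + ilog2(24576) = 15.
	 */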
4058	allocator->unopened_slab_priority = (1 + ilog2((max_free_blocks * 3) / 4));
4059
4060	return VDO_SUCCESS;
4061}
4062
4063static int allocate_components(struct slab_depot *depot,
4064			       struct partition *summary_partition)
4065{
4066	int result;
4067	zone_count_t zone;
4068	slab_count_t slab_count;
4069	u8 hint;
4070	u32 i;
4071	const struct thread_config *thread_config = &depot->vdo->thread_config;
4072
4073	result = vdo_make_action_manager(depot->zone_count, get_allocator_thread_id,
4074					 thread_config->journal_thread, depot,
4075					 schedule_tail_block_commit,
4076					 depot->vdo, &depot->action_manager);
4077	if (result != VDO_SUCCESS)
4078		return result;
4079
4080	depot->origin = depot->first_block;
4081
4082	/* block size must be a multiple of entry size */
4083	BUILD_BUG_ON((VDO_BLOCK_SIZE % sizeof(struct slab_summary_entry)) != 0);
4084
4085	depot->summary_origin = summary_partition->offset;
4086	depot->hint_shift = vdo_get_slab_summary_hint_shift(depot->slab_size_shift);
4087	result = vdo_allocate(MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES,
4088			      struct slab_summary_entry, __func__,
4089			      &depot->summary_entries);
4090	if (result != VDO_SUCCESS)
4091		return result;
4092
4093
4094	/* Initialize all the entries. */
4095	hint = compute_fullness_hint(depot, depot->slab_config.data_blocks);
4096	for (i = 0; i < MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES; i++) {
4097		/*
4098		 * This default tail block offset must be reflected in
4099		 * slabJournal.c::read_slab_journal_tail().
4100		 */
4101		depot->summary_entries[i] = (struct slab_summary_entry) {
4102			.tail_block_offset = 0,
4103			.fullness_hint = hint,
4104			.load_ref_counts = false,
4105			.is_dirty = false,
4106		};
4107	}
4108
4109	slab_count = vdo_compute_slab_count(depot->first_block, depot->last_block,
4110					    depot->slab_size_shift);
4111	if (thread_config->physical_zone_count > slab_count) {
4112		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
4113					      "%u physical zones exceeds slab count %u",
4114					      thread_config->physical_zone_count,
4115					      slab_count);
4116	}
4117
4118	/* Initialize the block allocators. */
4119	for (zone = 0; zone < depot->zone_count; zone++) {
4120		result = initialize_block_allocator(depot, zone);
4121		if (result != VDO_SUCCESS)
4122			return result;
4123	}
4124
4125	/* Allocate slabs. */
4126	result = allocate_slabs(depot, slab_count);
4127	if (result != VDO_SUCCESS)
4128		return result;
4129
4130	/* Use the new slabs. */
4131	for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4132		struct vdo_slab *slab = depot->new_slabs[i];
4133
4134		register_slab_with_allocator(slab->allocator, slab);
4135		WRITE_ONCE(depot->slab_count, depot->slab_count + 1);
4136	}
4137
4138	depot->slabs = depot->new_slabs;
4139	depot->new_slabs = NULL;
4140	depot->new_slab_count = 0;
4141
4142	return VDO_SUCCESS;
4143}
4144
4145/**
4146 * vdo_decode_slab_depot() - Make a slab depot and configure it with the state read from the super
4147 *                           block.
4148 * @state: The slab depot state from the super block.
4149 * @vdo: The VDO which will own the depot.
4150 * @summary_partition: The partition which holds the slab summary.
4151 * @depot_ptr: A pointer to hold the depot.
4152 *
4153 * Return: A success or error code.
4154 */
4155int vdo_decode_slab_depot(struct slab_depot_state_2_0 state, struct vdo *vdo,
4156			  struct partition *summary_partition,
4157			  struct slab_depot **depot_ptr)
4158{
4159	unsigned int slab_size_shift;
4160	struct slab_depot *depot;
4161	int result;
4162
4163	/*
4164	 * Calculate the bit shift for efficiently mapping block numbers to slabs. Using a shift
4165	 * requires that the slab size be a power of two.
4166	 */
4167	block_count_t slab_size = state.slab_config.slab_blocks;
4168
4169	if (!is_power_of_2(slab_size)) {
4170		return vdo_log_error_strerror(UDS_INVALID_ARGUMENT,
4171					      "slab size must be a power of two");
4172	}
4173	slab_size_shift = ilog2(slab_size);
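	/*
	 * For example (a hypothetical configuration): slab_blocks = 32768 gives slab_size_shift =
	 * 15, so get_slab_number() can map a physical block to its slab with a single shift:
	 * (pbn - first_block) >> 15.
	 */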
4174
4175	result = vdo_allocate_extended(struct slab_depot,
4176				       vdo->thread_config.physical_zone_count,
4177				       struct block_allocator, __func__, &depot);
4178	if (result != VDO_SUCCESS)
4179		return result;
4180
4181	depot->vdo = vdo;
4182	depot->old_zone_count = state.zone_count;
4183	depot->zone_count = vdo->thread_config.physical_zone_count;
4184	depot->slab_config = state.slab_config;
4185	depot->first_block = state.first_block;
4186	depot->last_block = state.last_block;
4187	depot->slab_size_shift = slab_size_shift;
4188
4189	result = allocate_components(depot, summary_partition);
4190	if (result != VDO_SUCCESS) {
4191		vdo_free_slab_depot(depot);
4192		return result;
4193	}
4194
4195	*depot_ptr = depot;
4196	return VDO_SUCCESS;
4197}
4198
4199static void uninitialize_allocator_summary(struct block_allocator *allocator)
4200{
4201	block_count_t i;
4202
4203	if (allocator->summary_blocks == NULL)
4204		return;
4205
4206	for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4207		free_vio_components(&allocator->summary_blocks[i].vio);
4208		vdo_free(vdo_forget(allocator->summary_blocks[i].outgoing_entries));
4209	}
4210
4211	vdo_free(vdo_forget(allocator->summary_blocks));
4212}
4213
4214/**
4215 * vdo_free_slab_depot() - Destroy a slab depot.
4216 * @depot: The depot to destroy.
4217 */
4218void vdo_free_slab_depot(struct slab_depot *depot)
4219{
4220	zone_count_t zone = 0;
4221
4222	if (depot == NULL)
4223		return;
4224
4225	vdo_abandon_new_slabs(depot);
4226
4227	for (zone = 0; zone < depot->zone_count; zone++) {
4228		struct block_allocator *allocator = &depot->allocators[zone];
4229
4230		if (allocator->eraser != NULL)
4231			dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));
4232
4233		uninitialize_allocator_summary(allocator);
4234		uninitialize_scrubber_vio(&allocator->scrubber);
4235		free_vio_pool(vdo_forget(allocator->vio_pool));
4236		vdo_free_priority_table(vdo_forget(allocator->prioritized_slabs));
4237	}
4238
4239	if (depot->slabs != NULL) {
4240		slab_count_t i;
4241
4242		for (i = 0; i < depot->slab_count; i++)
4243			free_slab(vdo_forget(depot->slabs[i]));
4244	}
4245
4246	vdo_free(vdo_forget(depot->slabs));
4247	vdo_free(vdo_forget(depot->action_manager));
4248	vdo_free(vdo_forget(depot->summary_entries));
4249	vdo_free(depot);
4250}
4251
4252/**
4253 * vdo_record_slab_depot() - Record the state of a slab depot for encoding into the super block.
4254 * @depot: The depot to encode.
4255 *
4256 * Return: The depot state.
4257 */
4258struct slab_depot_state_2_0 vdo_record_slab_depot(const struct slab_depot *depot)
4259{
4260	/*
4261	 * If this depot is currently using 0 zones, it must have been synchronously loaded by a
4262	 * tool and is now being saved. We did not load and combine the slab summary, so we still
4263	 * need to do that next time we load with the old zone count rather than 0.
4264	 */
4265	struct slab_depot_state_2_0 state;
4266	zone_count_t zones_to_record = depot->zone_count;
4267
4268	if (depot->zone_count == 0)
4269		zones_to_record = depot->old_zone_count;
4270
4271	state = (struct slab_depot_state_2_0) {
4272		.slab_config = depot->slab_config,
4273		.first_block = depot->first_block,
4274		.last_block = depot->last_block,
4275		.zone_count = zones_to_record,
4276	};
4277
4278	return state;
4279}
4280
4281/**
4282 * vdo_allocate_reference_counters() - Allocate the reference counters for all slabs in the depot.
4283 *
4284 * Context: This method may be called only before entering normal operation from the load thread.
4285 *
4286 * Return: VDO_SUCCESS or an error.
4287 */
4288int vdo_allocate_reference_counters(struct slab_depot *depot)
4289{
4290	struct slab_iterator iterator =
4291		get_depot_slab_iterator(depot, depot->slab_count - 1, 0, 1);
4292
4293	while (iterator.next != NULL) {
4294		int result = allocate_slab_counters(next_slab(&iterator));
4295
4296		if (result != VDO_SUCCESS)
4297			return result;
4298	}
4299
4300	return VDO_SUCCESS;
4301}
4302
4303/**
4304 * get_slab_number() - Get the number of the slab that contains a specified block.
4305 * @depot: The slab depot.
4306 * @pbn: The physical block number.
4307 * @slab_number_ptr: A pointer to hold the slab number.
4308 *
4309 * Return: VDO_SUCCESS or an error.
4310 */
4311static int __must_check get_slab_number(const struct slab_depot *depot,
4312					physical_block_number_t pbn,
4313					slab_count_t *slab_number_ptr)
4314{
4315	slab_count_t slab_number;
4316
4317	if (pbn < depot->first_block)
4318		return VDO_OUT_OF_RANGE;
4319
4320	slab_number = (pbn - depot->first_block) >> depot->slab_size_shift;
4321	if (slab_number >= depot->slab_count)
4322		return VDO_OUT_OF_RANGE;
4323
4324	*slab_number_ptr = slab_number;
4325	return VDO_SUCCESS;
4326}
4327
4328/**
4329 * vdo_get_slab() - Get the slab object for the slab that contains a specified block.
4330 * @depot: The slab depot.
4331 * @pbn: The physical block number.
4332 *
4333 * Will put the VDO in read-only mode if the PBN is not a valid data block nor the zero block.
4334 *
4335 * Return: The slab containing the block, or NULL if the block number is the zero block or
4336 * otherwise out of range.
4337 */
4338struct vdo_slab *vdo_get_slab(const struct slab_depot *depot,
4339			      physical_block_number_t pbn)
4340{
4341	slab_count_t slab_number;
4342	int result;
4343
4344	if (pbn == VDO_ZERO_BLOCK)
4345		return NULL;
4346
4347	result = get_slab_number(depot, pbn, &slab_number);
4348	if (result != VDO_SUCCESS) {
4349		vdo_enter_read_only_mode(depot->vdo, result);
4350		return NULL;
4351	}
4352
4353	return depot->slabs[slab_number];
4354}
4355
4356/**
4357 * vdo_get_increment_limit() - Determine how many new references a block can acquire.
4358 * @depot: The slab depot.
4359 * @pbn: The physical block number that is being queried.
4360 *
4361 * Context: This method must be called from the physical zone thread of the PBN.
4362 *
4363 * Return: The number of available references.
4364 */
4365u8 vdo_get_increment_limit(struct slab_depot *depot, physical_block_number_t pbn)
4366{
4367	struct vdo_slab *slab = vdo_get_slab(depot, pbn);
4368	vdo_refcount_t *counter_ptr = NULL;
4369	int result;
4370
4371	if ((slab == NULL) || (slab->status != VDO_SLAB_REBUILT))
4372		return 0;
4373
4374	result = get_reference_counter(slab, pbn, &counter_ptr);
4375	if (result != VDO_SUCCESS)
4376		return 0;
4377
4378	if (*counter_ptr == PROVISIONAL_REFERENCE_COUNT)
4379		return (MAXIMUM_REFERENCE_COUNT - 1);
4380
4381	return (MAXIMUM_REFERENCE_COUNT - *counter_ptr);
4382}
4383
4384/**
4385 * vdo_is_physical_data_block() - Determine whether the given PBN refers to a data block.
4386 * @depot: The depot.
4387 * @pbn: The physical block number to ask about.
4388 *
4389 * Return: True if the PBN corresponds to a data block.
4390 */
4391bool vdo_is_physical_data_block(const struct slab_depot *depot,
4392				physical_block_number_t pbn)
4393{
4394	slab_count_t slab_number;
4395	slab_block_number sbn;
4396
4397	return ((pbn == VDO_ZERO_BLOCK) ||
4398		((get_slab_number(depot, pbn, &slab_number) == VDO_SUCCESS) &&
4399		 (slab_block_number_from_pbn(depot->slabs[slab_number], pbn, &sbn) ==
4400		  VDO_SUCCESS)));
4401}
4402
4403/**
4404 * vdo_get_slab_depot_allocated_blocks() - Get the total number of data blocks allocated across all
4405 * the slabs in the depot.
4406 * @depot: The slab depot.
4407 *
4408 * This is the total number of blocks with a non-zero reference count.
4409 *
4410 * Context: This may be called from any thread.
4411 *
4412 * Return: The total number of blocks with a non-zero reference count.
4413 */
4414block_count_t vdo_get_slab_depot_allocated_blocks(const struct slab_depot *depot)
4415{
4416	block_count_t total = 0;
4417	zone_count_t zone;
4418
4419	for (zone = 0; zone < depot->zone_count; zone++) {
4420		/* The allocators are responsible for thread safety. */
4421		total += READ_ONCE(depot->allocators[zone].allocated_blocks);
4422	}
4423
4424	return total;
4425}
4426
4427/**
4428 * vdo_get_slab_depot_data_blocks() - Get the total number of data blocks in all the slabs in the
4429 *                                    depot.
4430 * @depot: The slab depot.
4431 *
4432 * Context: This may be called from any thread.
4433 *
4434 * Return: The total number of data blocks in all slabs.
4435 */
4436block_count_t vdo_get_slab_depot_data_blocks(const struct slab_depot *depot)
4437{
4438	return (READ_ONCE(depot->slab_count) * depot->slab_config.data_blocks);
4439}
4440
4441/**
4442 * finish_combining_zones() - Clean up after saving out the combined slab summary.
4443 * @completion: The vio which was used to write the summary data.
4444 */
4445static void finish_combining_zones(struct vdo_completion *completion)
4446{
4447	int result = completion->result;
4448	struct vdo_completion *parent = completion->parent;
4449
4450	free_vio(as_vio(vdo_forget(completion)));
4451	vdo_fail_completion(parent, result);
4452}
4453
4454static void handle_combining_error(struct vdo_completion *completion)
4455{
4456	vio_record_metadata_io_error(as_vio(completion));
4457	finish_combining_zones(completion);
4458}
4459
4460static void write_summary_endio(struct bio *bio)
4461{
4462	struct vio *vio = bio->bi_private;
4463	struct vdo *vdo = vio->completion.vdo;
4464
4465	continue_vio_after_io(vio, finish_combining_zones,
4466			      vdo->thread_config.admin_thread);
4467}
4468
4469/**
4470 * combine_summaries() - Treating the current entries buffer as the on-disk value of all zones,
4471 *                       update every zone to the correct values for every slab.
4472 * @depot: The depot whose summary entries should be combined.
4473 */
4474static void combine_summaries(struct slab_depot *depot)
4475{
4476	/*
4477	 * Combine all the old summary data into the portion of the buffer corresponding to the
4478	 * first zone.
4479	 */
4480	zone_count_t zone = 0;
4481	struct slab_summary_entry *entries = depot->summary_entries;
4482
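	/*
	 * For example (the zone count is hypothetical): with old_zone_count = 2, the loop below
	 * takes slab 0's entry from zone 0's region, slab 1's entry from zone 1's region, slab 2's
	 * from zone 0's again, and so on, since each zone previously wrote summaries only for the
	 * slabs it owned.
	 */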
4483	if (depot->old_zone_count > 1) {
4484		slab_count_t entry_number;
4485
4486		for (entry_number = 0; entry_number < MAX_VDO_SLABS; entry_number++) {
4487			if (zone != 0) {
4488				memcpy(entries + entry_number,
4489				       entries + (zone * MAX_VDO_SLABS) + entry_number,
4490				       sizeof(struct slab_summary_entry));
4491			}
4492
4493			zone++;
4494			if (zone == depot->old_zone_count)
4495				zone = 0;
4496		}
4497	}
4498
	/* Copy the combined data to each zone's region of the buffer. */
4500	for (zone = 1; zone < MAX_VDO_PHYSICAL_ZONES; zone++) {
4501		memcpy(entries + (zone * MAX_VDO_SLABS), entries,
4502		       MAX_VDO_SLABS * sizeof(struct slab_summary_entry));
4503	}
4504}
4505
4506/**
4507 * finish_loading_summary() - Finish loading slab summary data.
4508 * @completion: The vio which was used to read the summary data.
4509 *
4510 * Combines the slab summary data from all the previously written zones and copies the combined
4511 * summary to each partition's data region. Then writes the combined summary back out to disk. This
4512 * callback is registered in load_summary_endio().
4513 */
4514static void finish_loading_summary(struct vdo_completion *completion)
4515{
4516	struct slab_depot *depot = completion->vdo->depot;
4517
4518	/* Combine the summary from each zone so each zone is correct for all slabs. */
4519	combine_summaries(depot);
4520
4521	/* Write the combined summary back out. */
4522	vdo_submit_metadata_vio(as_vio(completion), depot->summary_origin,
4523				write_summary_endio, handle_combining_error,
4524				REQ_OP_WRITE);
4525}
4526
4527static void load_summary_endio(struct bio *bio)
4528{
4529	struct vio *vio = bio->bi_private;
4530	struct vdo *vdo = vio->completion.vdo;
4531
4532	continue_vio_after_io(vio, finish_loading_summary,
4533			      vdo->thread_config.admin_thread);
4534}
4535
4536/**
4537 * load_slab_summary() - The preamble of a load operation.
4538 *
4539 * Implements vdo_action_preamble_fn.
4540 */
4541static void load_slab_summary(void *context, struct vdo_completion *parent)
4542{
4543	int result;
4544	struct vio *vio;
4545	struct slab_depot *depot = context;
4546	const struct admin_state_code *operation =
4547		vdo_get_current_manager_operation(depot->action_manager);
4548
4549	result = create_multi_block_metadata_vio(depot->vdo, VIO_TYPE_SLAB_SUMMARY,
4550						 VIO_PRIORITY_METADATA, parent,
4551						 VDO_SLAB_SUMMARY_BLOCKS,
4552						 (char *) depot->summary_entries, &vio);
4553	if (result != VDO_SUCCESS) {
4554		vdo_fail_completion(parent, result);
4555		return;
4556	}
4557
4558	if ((operation == VDO_ADMIN_STATE_FORMATTING) ||
4559	    (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD)) {
4560		finish_loading_summary(&vio->completion);
4561		return;
4562	}
4563
4564	vdo_submit_metadata_vio(vio, depot->summary_origin, load_summary_endio,
4565				handle_combining_error, REQ_OP_READ);
4566}
4567
4568/* Implements vdo_zone_action_fn. */
4569static void load_allocator(void *context, zone_count_t zone_number,
4570			   struct vdo_completion *parent)
4571{
4572	struct slab_depot *depot = context;
4573
4574	vdo_start_loading(&depot->allocators[zone_number].state,
4575			  vdo_get_current_manager_operation(depot->action_manager),
4576			  parent, initiate_load);
4577}
4578
4579/**
4580 * vdo_load_slab_depot() - Asynchronously load any slab depot state that isn't included in the
4581 *                         super_block component.
4582 * @depot: The depot to load.
4583 * @operation: The type of load to perform.
4584 * @parent: The completion to notify when the load is complete.
4585 * @context: Additional context for the load operation; may be NULL.
4586 *
4587 * This method may be called only before entering normal operation from the load thread.
4588 */
4589void vdo_load_slab_depot(struct slab_depot *depot,
4590			 const struct admin_state_code *operation,
4591			 struct vdo_completion *parent, void *context)
4592{
4593	if (!vdo_assert_load_operation(operation, parent))
4594		return;
4595
4596	vdo_schedule_operation_with_context(depot->action_manager, operation,
4597					    load_slab_summary, load_allocator,
4598					    NULL, context, parent);
4599}
4600
4601/* Implements vdo_zone_action_fn. */
4602static void prepare_to_allocate(void *context, zone_count_t zone_number,
4603				struct vdo_completion *parent)
4604{
4605	struct slab_depot *depot = context;
4606	struct block_allocator *allocator = &depot->allocators[zone_number];
4607	int result;
4608
4609	result = vdo_prepare_slabs_for_allocation(allocator);
4610	if (result != VDO_SUCCESS) {
4611		vdo_fail_completion(parent, result);
4612		return;
4613	}
4614
4615	scrub_slabs(allocator, parent);
4616}
4617
4618/**
4619 * vdo_prepare_slab_depot_to_allocate() - Prepare the slab depot to come online and start
4620 *                                        allocating blocks.
4621 * @depot: The depot to prepare.
4622 * @load_type: The load type.
4623 * @parent: The completion to notify when the operation is complete.
4624 *
4625 * This method may be called only before entering normal operation from the load thread. It must be
4626 * called before allocation may proceed.
4627 */
4628void vdo_prepare_slab_depot_to_allocate(struct slab_depot *depot,
4629					enum slab_depot_load_type load_type,
4630					struct vdo_completion *parent)
4631{
4632	depot->load_type = load_type;
4633	atomic_set(&depot->zones_to_scrub, depot->zone_count);
4634	vdo_schedule_action(depot->action_manager, NULL,
4635			    prepare_to_allocate, NULL, parent);
4636}
4637
4638/**
4639 * vdo_update_slab_depot_size() - Update the slab depot to reflect its new size in memory.
4640 * @depot: The depot to update.
4641 *
4642 * This size is saved to disk as part of the super block.
4643 */
4644void vdo_update_slab_depot_size(struct slab_depot *depot)
4645{
4646	depot->last_block = depot->new_last_block;
4647}
4648
4649/**
4650 * vdo_prepare_to_grow_slab_depot() - Allocate new memory needed for a resize of a slab depot to
4651 *                                    the given size.
4652 * @depot: The depot to prepare to resize.
 * @partition: The new depot partition.
4654 *
4655 * Return: VDO_SUCCESS or an error.
4656 */
4657int vdo_prepare_to_grow_slab_depot(struct slab_depot *depot,
4658				   const struct partition *partition)
4659{
4660	struct slab_depot_state_2_0 new_state;
4661	int result;
4662	slab_count_t new_slab_count;
4663
4664	if ((partition->count >> depot->slab_size_shift) <= depot->slab_count)
4665		return VDO_INCREMENT_TOO_SMALL;
4666
4667	/* Generate the depot configuration for the new block count. */
4668	VDO_ASSERT_LOG_ONLY(depot->first_block == partition->offset,
4669			    "New slab depot partition doesn't change origin");
4670	result = vdo_configure_slab_depot(partition, depot->slab_config,
4671					  depot->zone_count, &new_state);
4672	if (result != VDO_SUCCESS)
4673		return result;
4674
4675	new_slab_count = vdo_compute_slab_count(depot->first_block,
4676						new_state.last_block,
4677						depot->slab_size_shift);
4678	if (new_slab_count <= depot->slab_count)
4679		return vdo_log_error_strerror(VDO_INCREMENT_TOO_SMALL,
4680					      "Depot can only grow");
4681	if (new_slab_count == depot->new_slab_count) {
4682		/* Check it out, we've already got all the new slabs allocated! */
4683		return VDO_SUCCESS;
4684	}
4685
	vdo_abandon_new_slabs(depot);
	result = allocate_slabs(depot, new_slab_count);
	if (result != VDO_SUCCESS) {
		vdo_abandon_new_slabs(depot);
		return result;
	}

	depot->new_size = partition->count;
	depot->old_last_block = depot->last_block;
	depot->new_last_block = new_state.last_block;

	return VDO_SUCCESS;
}

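/*
 * A sketch of the expected grow sequence, as inferred from the functions in this file:
 * vdo_prepare_to_grow_slab_depot() allocates the new slabs, vdo_use_new_slabs() registers them
 * with their allocators while the depot is suspended, and vdo_update_slab_depot_size() records
 * the new last_block so that it is saved in the super block.
 */
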
/**
 * finish_registration() - Finish registering new slabs now that all of the allocators have
 *                         received their new slabs.
 *
 * Implements vdo_action_conclusion_fn.
 */
static int finish_registration(void *context)
{
	struct slab_depot *depot = context;

	WRITE_ONCE(depot->slab_count, depot->new_slab_count);
	vdo_free(depot->slabs);
	depot->slabs = depot->new_slabs;
	depot->new_slabs = NULL;
	depot->new_slab_count = 0;
	return VDO_SUCCESS;
}

/* Implements vdo_zone_action_fn. */
static void register_new_slabs(void *context, zone_count_t zone_number,
			       struct vdo_completion *parent)
{
	struct slab_depot *depot = context;
	struct block_allocator *allocator = &depot->allocators[zone_number];
	slab_count_t i;

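	/* Register only those new slabs which belong to this allocator's zone. */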
	for (i = depot->slab_count; i < depot->new_slab_count; i++) {
		struct vdo_slab *slab = depot->new_slabs[i];

		if (slab->allocator == allocator)
			register_slab_with_allocator(allocator, slab);
	}

	vdo_finish_completion(parent);
}

/**
 * vdo_use_new_slabs() - Use the new slabs allocated for resize.
 * @depot: The depot.
 * @parent: The object to notify when complete.
 */
void vdo_use_new_slabs(struct slab_depot *depot, struct vdo_completion *parent)
{
	VDO_ASSERT_LOG_ONLY(depot->new_slabs != NULL, "Must have new slabs to use");
	vdo_schedule_operation(depot->action_manager,
			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
			       NULL, register_new_slabs,
			       finish_registration, parent);
}

/**
 * stop_scrubbing() - Tell the scrubber to stop scrubbing after it finishes the slab it is
 *                    currently working on.
 * @allocator: The allocator owning the scrubber to stop.
 */
static void stop_scrubbing(struct block_allocator *allocator)
{
	struct slab_scrubber *scrubber = &allocator->scrubber;

	if (vdo_is_state_quiescent(&scrubber->admin_state)) {
		vdo_finish_completion(&allocator->completion);
	} else {
		vdo_start_draining(&scrubber->admin_state,
				   VDO_ADMIN_STATE_SUSPENDING,
				   &allocator->completion, NULL);
	}
}

/* Implements vdo_admin_initiator_fn. */
static void initiate_summary_drain(struct admin_state *state)
{
	check_summary_drain_complete(container_of(state, struct block_allocator,
						  summary_state));
}

static void do_drain_step(struct vdo_completion *completion)
{
	struct block_allocator *allocator = vdo_as_block_allocator(completion);

	vdo_prepare_completion_for_requeue(&allocator->completion, do_drain_step,
					   handle_operation_error, allocator->thread_id,
					   NULL);
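	/* Each intermediate drain step re-enters this function via the allocator's completion. */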
	switch (++allocator->drain_step) {
	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
		stop_scrubbing(allocator);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
		apply_to_slabs(allocator, do_drain_step);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
		vdo_start_draining(&allocator->summary_state,
				   vdo_get_admin_state_code(&allocator->state),
				   completion, initiate_summary_drain);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_FINISHED:
		VDO_ASSERT_LOG_ONLY(!is_vio_pool_busy(allocator->vio_pool),
				    "vio pool not busy");
		vdo_finish_draining_with_result(&allocator->state, completion->result);
		return;

	default:
		vdo_finish_draining_with_result(&allocator->state, UDS_BAD_STATE);
	}
}

/* Implements vdo_admin_initiator_fn. */
static void initiate_drain(struct admin_state *state)
{
	struct block_allocator *allocator =
		container_of(state, struct block_allocator, state);

	allocator->drain_step = VDO_DRAIN_ALLOCATOR_START;
	do_drain_step(&allocator->completion);
}

/*
 * Drain all allocator I/O. Depending upon the type of drain, some or all dirty metadata may be
 * written to disk. The type of drain is determined by the state of the allocator's depot.
 *
 * Implements vdo_zone_action_fn.
 */
static void drain_allocator(void *context, zone_count_t zone_number,
			    struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_draining(&depot->allocators[zone_number].state,
			   vdo_get_current_manager_operation(depot->action_manager),
			   parent, initiate_drain);
}

/**
 * vdo_drain_slab_depot() - Drain all slab depot I/O.
 * @depot: The depot to drain.
 * @operation: The drain operation (flush, rebuild, suspend, or save).
 * @parent: The completion to finish when the drain is complete.
 *
 * If saving or flushing, all dirty depot metadata will be written out. If saving or suspending,
 * the depot will be left in a suspended state.
 */
void vdo_drain_slab_depot(struct slab_depot *depot,
			  const struct admin_state_code *operation,
			  struct vdo_completion *parent)
{
	vdo_schedule_operation(depot->action_manager, operation,
			       NULL, drain_allocator, NULL, parent);
}

/**
 * resume_scrubbing() - Tell the scrubber to resume scrubbing if it has been stopped.
 * @allocator: The allocator being resumed.
 */
static void resume_scrubbing(struct block_allocator *allocator)
{
	int result;
	struct slab_scrubber *scrubber = &allocator->scrubber;

	if (!has_slabs_to_scrub(scrubber)) {
		vdo_finish_completion(&allocator->completion);
		return;
	}

	result = vdo_resume_if_quiescent(&scrubber->admin_state);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(&allocator->completion, result);
		return;
	}

	scrub_next_slab(scrubber);
	vdo_finish_completion(&allocator->completion);
}

static void do_resume_step(struct vdo_completion *completion)
{
	struct block_allocator *allocator = vdo_as_block_allocator(completion);

	vdo_prepare_completion_for_requeue(&allocator->completion, do_resume_step,
					   handle_operation_error,
					   allocator->thread_id, NULL);
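	/* Resume undoes the drain steps in reverse order by walking drain_step backwards. */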
	switch (--allocator->drain_step) {
	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
		vdo_fail_completion(completion,
				    vdo_resume_if_quiescent(&allocator->summary_state));
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
		apply_to_slabs(allocator, do_resume_step);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
		resume_scrubbing(allocator);
		return;

	case VDO_DRAIN_ALLOCATOR_START:
		vdo_finish_resuming_with_result(&allocator->state, completion->result);
		return;

	default:
		vdo_finish_resuming_with_result(&allocator->state, UDS_BAD_STATE);
	}
}

/* Implements vdo_admin_initiator_fn. */
static void initiate_resume(struct admin_state *state)
{
	struct block_allocator *allocator =
		container_of(state, struct block_allocator, state);

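	/* Start from the fully-drained state and step backwards through the drain steps. */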
	allocator->drain_step = VDO_DRAIN_ALLOCATOR_STEP_FINISHED;
	do_resume_step(&allocator->completion);
}

/* Implements vdo_zone_action_fn. */
static void resume_allocator(void *context, zone_count_t zone_number,
			     struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_resuming(&depot->allocators[zone_number].state,
			   vdo_get_current_manager_operation(depot->action_manager),
			   parent, initiate_resume);
}

/**
 * vdo_resume_slab_depot() - Resume a suspended slab depot.
 * @depot: The depot to resume.
 * @parent: The completion to finish when the depot has resumed.
 */
void vdo_resume_slab_depot(struct slab_depot *depot, struct vdo_completion *parent)
{
	if (vdo_is_read_only(depot->vdo)) {
		vdo_continue_completion(parent, VDO_READ_ONLY);
		return;
	}

	vdo_schedule_operation(depot->action_manager, VDO_ADMIN_STATE_RESUMING,
			       NULL, resume_allocator, NULL, parent);
}

/**
 * vdo_commit_oldest_slab_journal_tail_blocks() - Commit all dirty tail blocks which are locking a
 *                                                given recovery journal block.
 * @depot: The depot.
 * @recovery_block_number: The sequence number of the recovery journal block whose locks should be
 *                         released.
 *
 * Context: This method must be called from the journal zone thread.
 */
void vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot *depot,
						sequence_number_t recovery_block_number)
{
	if (depot == NULL)
		return;

	depot->new_release_request = recovery_block_number;
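	/*
	 * The action manager's default action is expected to propagate the new release request
	 * to each zone the next time it runs.
	 */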
	vdo_schedule_default_action(depot->action_manager);
}

/* Implements vdo_zone_action_fn. */
static void scrub_all_unrecovered_slabs(void *context, zone_count_t zone_number,
					struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	scrub_slabs(&depot->allocators[zone_number], NULL);
	vdo_launch_completion(parent);
}

/**
 * vdo_scrub_all_unrecovered_slabs() - Scrub all unrecovered slabs.
 * @depot: The depot to scrub.
 * @parent: The object to notify when scrubbing has been launched for all zones.
 */
void vdo_scrub_all_unrecovered_slabs(struct slab_depot *depot,
				     struct vdo_completion *parent)
{
	vdo_schedule_action(depot->action_manager, NULL,
			    scrub_all_unrecovered_slabs,
			    NULL, parent);
}

/**
 * get_block_allocator_statistics() - Get the total of the statistics from all the block allocators
 *                                    in the depot.
 * @depot: The slab depot.
 *
 * Return: The statistics from all block allocators in the depot.
 */
static struct block_allocator_statistics __must_check
get_block_allocator_statistics(const struct slab_depot *depot)
{
	struct block_allocator_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		const struct block_allocator *allocator = &depot->allocators[zone];
		const struct block_allocator_statistics *stats = &allocator->statistics;

		totals.slab_count += allocator->slab_count;
		totals.slabs_opened += READ_ONCE(stats->slabs_opened);
		totals.slabs_reopened += READ_ONCE(stats->slabs_reopened);
	}

	return totals;
}

/**
 * get_ref_counts_statistics() - Get the cumulative ref_counts statistics for the depot.
 * @depot: The slab depot.
 *
 * Return: The cumulative statistics for all ref_counts in the depot.
 */
static struct ref_counts_statistics __must_check
get_ref_counts_statistics(const struct slab_depot *depot)
{
	struct ref_counts_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		totals.blocks_written +=
			READ_ONCE(depot->allocators[zone].ref_counts_statistics.blocks_written);
	}

	return totals;
}

/**
 * get_slab_journal_statistics() - Get the aggregated slab journal statistics for the depot.
 * @depot: The slab depot.
 *
 * Return: The aggregated statistics for all slab journals in the depot.
 */
static struct slab_journal_statistics __must_check
get_slab_journal_statistics(const struct slab_depot *depot)
{
	struct slab_journal_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		const struct slab_journal_statistics *stats =
			&depot->allocators[zone].slab_journal_statistics;

		totals.disk_full_count += READ_ONCE(stats->disk_full_count);
		totals.flush_count += READ_ONCE(stats->flush_count);
		totals.blocked_count += READ_ONCE(stats->blocked_count);
		totals.blocks_written += READ_ONCE(stats->blocks_written);
		totals.tail_busy_count += READ_ONCE(stats->tail_busy_count);
	}

	return totals;
}

/**
 * vdo_get_slab_depot_statistics() - Get all the vdo_statistics fields that are properties of the
 *                                   slab depot.
 * @depot: The slab depot.
 * @stats: The vdo statistics structure to partially fill.
 */
void vdo_get_slab_depot_statistics(const struct slab_depot *depot,
				   struct vdo_statistics *stats)
{
	slab_count_t slab_count = READ_ONCE(depot->slab_count);
	slab_count_t unrecovered = 0;
	zone_count_t zone;

	for (zone = 0; zone < depot->zone_count; zone++) {
		/* The allocators are responsible for thread safety. */
		unrecovered += READ_ONCE(depot->allocators[zone].scrubber.slab_count);
	}

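	/*
	 * The recovery percentage is the fraction of slabs which are not still waiting to be
	 * scrubbed. (A loaded depot is assumed to always have at least one slab, so slab_count
	 * is non-zero here.)
	 */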
	stats->recovery_percentage = (slab_count - unrecovered) * 100 / slab_count;
	stats->allocator = get_block_allocator_statistics(depot);
	stats->ref_counts = get_ref_counts_statistics(depot);
	stats->slab_journal = get_slab_journal_statistics(depot);
	stats->slab_summary = (struct slab_summary_statistics) {
		.blocks_written = atomic64_read(&depot->summary_statistics.blocks_written),
	};
}

/**
 * vdo_dump_slab_depot() - Dump the slab depot, in a thread-unsafe fashion.
 * @depot: The slab depot.
 */
void vdo_dump_slab_depot(const struct slab_depot *depot)
{
	vdo_log_info("vdo slab depot");
	vdo_log_info("  zone_count=%u old_zone_count=%u slab_count=%u active_release_request=%llu new_release_request=%llu",
		     (unsigned int) depot->zone_count,
		     (unsigned int) depot->old_zone_count, READ_ONCE(depot->slab_count),
		     (unsigned long long) depot->active_release_request,
		     (unsigned long long) depot->new_release_request);
}
