1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5
6#include "recovery-journal.h"
7
8#include <linux/atomic.h>
9#include <linux/bio.h>
10
11#include "logger.h"
12#include "memory-alloc.h"
13#include "permassert.h"
14
15#include "block-map.h"
16#include "completion.h"
17#include "constants.h"
18#include "data-vio.h"
19#include "encodings.h"
20#include "io-submitter.h"
21#include "slab-depot.h"
22#include "types.h"
23#include "vdo.h"
24#include "vio.h"
25#include "wait-queue.h"
26
27static const u64 RECOVERY_COUNT_MASK = 0xff;
28
29/*
30 * The number of reserved blocks must be large enough to prevent a new recovery journal
31 * block write from overwriting a block which appears to still be a valid head block of the
32 * journal. Currently, that means reserving enough space for all 2048 data_vios.
33 */
34#define RECOVERY_JOURNAL_RESERVED_BLOCKS				\
35	((MAXIMUM_VDO_USER_VIOS / RECOVERY_JOURNAL_ENTRIES_PER_BLOCK) + 2)
36
37/**
38 * DOC: Lock Counters.
39 *
40 * A lock_counter is intended to keep all of the locks for the blocks in the recovery journal. The
41 * per-zone counters are all kept in a single array which is arranged by zone (i.e. zone 0's lock 0
42 * is at index 0, zone 0's lock 1 is at index 1, and zone 1's lock 0 is at index 'locks'). This
43 * arrangement is intended to minimize cache-line contention for counters from different zones.
44 *
45 * The locks are implemented as a single object instead of as a lock counter per lock both to
46 * afford this opportunity to reduce cache line contention and also to eliminate the need to have a
47 * completion per lock.
48 *
49 * Lock sets are laid out with the set for the recovery journal first, followed by the logical
50 * zones, and then the physical zones.
51 */
52
53enum lock_counter_state {
54	LOCK_COUNTER_STATE_NOT_NOTIFYING,
55	LOCK_COUNTER_STATE_NOTIFYING,
56	LOCK_COUNTER_STATE_SUSPENDED,
57};
58
59/**
60 * get_zone_count_ptr() - Get a pointer to the zone count for a given lock on a given zone.
61 * @journal: The recovery journal.
62 * @lock_number: The lock to get.
63 * @zone_type: The zone type whose count is desired.
64 *
65 * Return: A pointer to the zone count for the given lock and zone.
66 */
67static inline atomic_t *get_zone_count_ptr(struct recovery_journal *journal,
68					   block_count_t lock_number,
69					   enum vdo_zone_type zone_type)
70{
71	return ((zone_type == VDO_ZONE_TYPE_LOGICAL)
72		? &journal->lock_counter.logical_zone_counts[lock_number]
73		: &journal->lock_counter.physical_zone_counts[lock_number]);
74}
75
76/**
77 * get_counter() - Get the zone counter for a given lock on a given zone.
78 * @journal: The recovery journal.
79 * @lock_number: The lock to get.
80 * @zone_type: The zone type whose count is desired.
81 * @zone_id: The zone index whose count is desired.
82 *
83 * Return: The counter for the given lock and zone.
84 */
85static inline u16 *get_counter(struct recovery_journal *journal,
86			       block_count_t lock_number, enum vdo_zone_type zone_type,
87			       zone_count_t zone_id)
88{
89	struct lock_counter *counter = &journal->lock_counter;
90	block_count_t zone_counter = (counter->locks * zone_id) + lock_number;
91
92	if (zone_type == VDO_ZONE_TYPE_JOURNAL)
93		return &counter->journal_counters[zone_counter];
94
95	if (zone_type == VDO_ZONE_TYPE_LOGICAL)
96		return &counter->logical_counters[zone_counter];
97
98	return &counter->physical_counters[zone_counter];
99}
100
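/**
 * get_decrement_counter() - Get the journal zone's decrement counter for a given lock.
 * @journal: The recovery journal.
 * @lock_number: The lock whose decrement counter is desired.
 *
 * Return: A pointer to the decrement counter for the given lock.
 */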
101static atomic_t *get_decrement_counter(struct recovery_journal *journal,
102				       block_count_t lock_number)
103{
104	return &journal->lock_counter.journal_decrement_counts[lock_number];
105}
106
107/**
108 * is_journal_zone_locked() - Check whether the journal zone is locked for a given lock.
109 * @journal: The recovery journal.
110 * @lock_number: The lock to check.
111 *
112 * Return: true if the journal zone is locked.
113 */
114static bool is_journal_zone_locked(struct recovery_journal *journal,
115				   block_count_t lock_number)
116{
117	u16 journal_value = *get_counter(journal, lock_number, VDO_ZONE_TYPE_JOURNAL, 0);
118	u32 decrements = atomic_read(get_decrement_counter(journal, lock_number));
119
120	/* Pairs with barrier in vdo_release_journal_entry_lock() */
121	smp_rmb();
122	VDO_ASSERT_LOG_ONLY((decrements <= journal_value),
123			    "journal zone lock counter must not underflow");
124	return (journal_value != decrements);
125}
126
127/**
128 * vdo_release_recovery_journal_block_reference() - Release a reference to a recovery journal
129 *                                                  block.
130 * @journal: The recovery journal.
131 * @sequence_number: The journal sequence number of the referenced block.
132 * @zone_type: The type of the zone making the adjustment.
133 * @zone_id: The ID of the zone making the adjustment.
134 *
135 * If this is the last reference for a given zone type, an attempt will be made to reap the
136 * journal.
137 */
138void vdo_release_recovery_journal_block_reference(struct recovery_journal *journal,
139						  sequence_number_t sequence_number,
140						  enum vdo_zone_type zone_type,
141						  zone_count_t zone_id)
142{
143	u16 *current_value;
144	block_count_t lock_number;
145	int prior_state;
146
147	if (sequence_number == 0)
148		return;
149
150	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
151	current_value = get_counter(journal, lock_number, zone_type, zone_id);
152
153	VDO_ASSERT_LOG_ONLY((*current_value >= 1),
154			    "decrement of lock counter must not underflow");
155	*current_value -= 1;
156
157	if (zone_type == VDO_ZONE_TYPE_JOURNAL) {
158		if (is_journal_zone_locked(journal, lock_number))
159			return;
160	} else {
161		atomic_t *zone_count;
162
163		if (*current_value != 0)
164			return;
165
166		zone_count = get_zone_count_ptr(journal, lock_number, zone_type);
167
168		if (atomic_add_return(-1, zone_count) > 0)
169			return;
170	}
171
172	/*
173	 * Extra barriers because this was originally developed using a CAS operation that implicitly
174	 * had them.
175	 */
176	smp_mb__before_atomic();
177	prior_state = atomic_cmpxchg(&journal->lock_counter.state,
178				     LOCK_COUNTER_STATE_NOT_NOTIFYING,
179				     LOCK_COUNTER_STATE_NOTIFYING);
180	/* same as before_atomic */
181	smp_mb__after_atomic();
182
183	if (prior_state != LOCK_COUNTER_STATE_NOT_NOTIFYING)
184		return;
185
186	vdo_launch_completion(&journal->lock_counter.completion);
187}
188
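/**
 * get_journal_block() - Get the first block from a list of journal blocks.
 * @list: The list from which to get the block.
 *
 * Return: The first block on the list, or NULL if the list is empty.
 */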
189static inline struct recovery_journal_block * __must_check get_journal_block(struct list_head *list)
190{
191	return list_first_entry_or_null(list, struct recovery_journal_block, list_node);
192}
193
194/**
195 * pop_free_list() - Get a block from the end of the free list.
196 * @journal: The journal.
197 *
198 * Return: The block or NULL if the list is empty.
199 */
200static struct recovery_journal_block * __must_check pop_free_list(struct recovery_journal *journal)
201{
202	struct recovery_journal_block *block;
203
204	if (list_empty(&journal->free_tail_blocks))
205		return NULL;
206
207	block = list_last_entry(&journal->free_tail_blocks,
208				struct recovery_journal_block, list_node);
209	list_del_init(&block->list_node);
210	return block;
211}
212
213/**
214 * is_block_dirty() - Check whether a recovery block is dirty.
215 * @block: The block to check.
216 *
217 * A block is dirty if it has any uncommitted entries, which includes both entries not yet
218 * written and entries written but not yet acknowledged.
219 *
220 * Return: true if the block has any uncommitted entries.
221 */
222static inline bool __must_check is_block_dirty(const struct recovery_journal_block *block)
223{
224	return (block->uncommitted_entry_count > 0);
225}
226
227/**
228 * is_block_empty() - Check whether a journal block is empty.
229 * @block: The block to check.
230 *
231 * Return: true if the block has no entries.
232 */
233static inline bool __must_check is_block_empty(const struct recovery_journal_block *block)
234{
235	return (block->entry_count == 0);
236}
237
238/**
239 * is_block_full() - Check whether a journal block is full.
240 * @block: The block to check.
241 *
242 * Return: true if the block is full.
243 */
244static inline bool __must_check is_block_full(const struct recovery_journal_block *block)
245{
246	return ((block == NULL) || (block->journal->entries_per_block == block->entry_count));
247}
248
249/**
250 * assert_on_journal_thread() - Assert that we are running on the journal thread.
251 * @journal: The journal.
252 * @function_name: The function doing the check (for logging).
253 */
254static void assert_on_journal_thread(struct recovery_journal *journal,
255				     const char *function_name)
256{
257	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == journal->thread_id),
258			    "%s() must be called on journal thread", function_name);
259}
260
261/**
262 * continue_waiter() - Release a data_vio from the journal.
263 *
264 * Invoked whenever a data_vio is to be released from the journal, either because its entry was
265 * committed to disk, or because there was an error. Implements waiter_callback_fn.
266 */
267static void continue_waiter(struct vdo_waiter *waiter, void *context)
268{
269	continue_data_vio_with_error(vdo_waiter_as_data_vio(waiter), *((int *) context));
270}
271
272/**
273 * has_block_waiters() - Check whether the journal has any waiters on any blocks.
274 * @journal: The journal in question.
275 *
276 * Return: true if any block has a waiter.
277 */
278static inline bool has_block_waiters(struct recovery_journal *journal)
279{
280	struct recovery_journal_block *block = get_journal_block(&journal->active_tail_blocks);
281
282	/*
283	 * Either the first active tail block (if it exists) has waiters, or no active tail block
284	 * has waiters.
285	 */
286	return ((block != NULL) &&
287		(vdo_waitq_has_waiters(&block->entry_waiters) ||
288		 vdo_waitq_has_waiters(&block->commit_waiters)));
289}
290
291static void recycle_journal_blocks(struct recovery_journal *journal);
292static void recycle_journal_block(struct recovery_journal_block *block);
293static void notify_commit_waiters(struct recovery_journal *journal);
294
295/**
296 * suspend_lock_counter() - Prevent the lock counter from notifying.
297 * @counter: The counter.
298 *
299 * Return: true if the lock counter was not notifying and hence the suspend was efficacious.
300 */
301static bool suspend_lock_counter(struct lock_counter *counter)
302{
303	int prior_state;
304
305	/*
306	 * Extra barriers because this was originally developed using a CAS operation that
307	 * implicitly had them.
308	 */
309	smp_mb__before_atomic();
310	prior_state = atomic_cmpxchg(&counter->state, LOCK_COUNTER_STATE_NOT_NOTIFYING,
311				     LOCK_COUNTER_STATE_SUSPENDED);
312	/* same as before_atomic */
313	smp_mb__after_atomic();
314
315	return ((prior_state == LOCK_COUNTER_STATE_SUSPENDED) ||
316		(prior_state == LOCK_COUNTER_STATE_NOT_NOTIFYING));
317}
318
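/**
 * is_read_only() - Check whether the VDO to which a journal belongs is in read-only mode.
 * @journal: The journal to check.
 *
 * Return: true if the VDO is in read-only mode.
 */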
319static inline bool is_read_only(struct recovery_journal *journal)
320{
321	return vdo_is_read_only(journal->flush_vio->completion.vdo);
322}
323
324/**
325 * check_for_drain_complete() - Check whether the journal has drained.
326 * @journal: The journal which may have just drained.
327 */
328static void check_for_drain_complete(struct recovery_journal *journal)
329{
330	int result = VDO_SUCCESS;
331
332	if (is_read_only(journal)) {
333		result = VDO_READ_ONLY;
334		/*
335		 * Clean up any full active blocks which were not written due to read-only mode.
336		 *
337		 * FIXME: This would probably be better as a short-circuit in write_block().
338		 */
339		notify_commit_waiters(journal);
340		recycle_journal_blocks(journal);
341
342		/* Release any data_vios waiting to be assigned entries. */
343		vdo_waitq_notify_all_waiters(&journal->entry_waiters,
344					     continue_waiter, &result);
345	}
346
347	if (!vdo_is_state_draining(&journal->state) ||
348	    journal->reaping ||
349	    has_block_waiters(journal) ||
350	    vdo_waitq_has_waiters(&journal->entry_waiters) ||
351	    !suspend_lock_counter(&journal->lock_counter))
352		return;
353
354	if (vdo_is_state_saving(&journal->state)) {
355		if (journal->active_block != NULL) {
356			VDO_ASSERT_LOG_ONLY(((result == VDO_READ_ONLY) ||
357					     !is_block_dirty(journal->active_block)),
358					    "journal being saved has clean active block");
359			recycle_journal_block(journal->active_block);
360		}
361
362		VDO_ASSERT_LOG_ONLY(list_empty(&journal->active_tail_blocks),
363				    "all blocks in a journal being saved must be inactive");
364	}
365
366	vdo_finish_draining_with_result(&journal->state, result);
367}
368
369/**
370 * notify_recovery_journal_of_read_only_mode() - Notify a recovery journal that the VDO has gone
371 *                                               read-only.
372 * @listener: The journal.
373 * @parent: The completion to notify in order to acknowledge the notification.
374 *
375 * Implements vdo_read_only_notification_fn.
376 */
377static void notify_recovery_journal_of_read_only_mode(void *listener,
378						      struct vdo_completion *parent)
379{
380	check_for_drain_complete(listener);
381	vdo_finish_completion(parent);
382}
383
384/**
385 * enter_journal_read_only_mode() - Put the journal in read-only mode.
386 * @journal: The journal which has failed.
387 * @error_code: The error result triggering this call.
388 *
389 * All attempts to add entries after this function is called will fail. All VIOs waiting for
390 * commits will be awakened with an error.
391 */
392static void enter_journal_read_only_mode(struct recovery_journal *journal,
393					 int error_code)
394{
395	vdo_enter_read_only_mode(journal->flush_vio->completion.vdo, error_code);
396	check_for_drain_complete(journal);
397}
398
399/**
400 * vdo_get_recovery_journal_current_sequence_number() - Obtain the recovery journal's current
401 *                                                      sequence number.
402 * @journal: The journal in question.
403 *
404 * Exposed only so the block map can be initialized therefrom.
405 *
406 * Return: The sequence number of the tail block.
407 */
408sequence_number_t vdo_get_recovery_journal_current_sequence_number(struct recovery_journal *journal)
409{
410	return journal->tail;
411}
412
413/**
414 * get_recovery_journal_head() - Get the head of the recovery journal.
415 * @journal: The journal.
416 *
417 * The head is the lower of the block map head and the slab journal head sequence numbers.
418 *
419 * Return: the head of the journal.
420 */
421static inline sequence_number_t get_recovery_journal_head(const struct recovery_journal *journal)
422{
423	return min(journal->block_map_head, journal->slab_journal_head);
424}
425
426/**
427 * compute_recovery_count_byte() - Compute the recovery count byte for a given recovery count.
428 * @recovery_count: The recovery count.
429 *
430 * Return: The byte corresponding to the recovery count.
431 */
432static inline u8 __must_check compute_recovery_count_byte(u64 recovery_count)
433{
434	return (u8)(recovery_count & RECOVERY_COUNT_MASK);
435}
436
437/**
438 * check_slab_journal_commit_threshold() - Check whether the journal is over the threshold, and if
439 *                                         so, force the oldest slab journal tail block to commit.
440 * @journal: The journal.
441 */
442static void check_slab_journal_commit_threshold(struct recovery_journal *journal)
443{
444	block_count_t current_length = journal->tail - journal->slab_journal_head;
445
446	if (current_length > journal->slab_journal_commit_threshold) {
447		journal->events.slab_journal_commits_requested++;
448		vdo_commit_oldest_slab_journal_tail_blocks(journal->depot,
449							   journal->slab_journal_head);
450	}
451}
452
453static void reap_recovery_journal(struct recovery_journal *journal);
454static void assign_entries(struct recovery_journal *journal);
455
456/**
457 * finish_reaping() - Finish reaping the journal.
458 * @journal: The journal being reaped.
459 */
460static void finish_reaping(struct recovery_journal *journal)
461{
462	block_count_t blocks_reaped;
463	sequence_number_t old_head = get_recovery_journal_head(journal);
464
465	journal->block_map_head = journal->block_map_reap_head;
466	journal->slab_journal_head = journal->slab_journal_reap_head;
467	blocks_reaped = get_recovery_journal_head(journal) - old_head;
468	journal->available_space += blocks_reaped * journal->entries_per_block;
469	journal->reaping = false;
470	check_slab_journal_commit_threshold(journal);
471	assign_entries(journal);
472	check_for_drain_complete(journal);
473}
474
475/**
476 * complete_reaping() - Finish reaping the journal after flushing the lower layer.
477 * @completion: The journal's flush VIO.
478 *
479 * This is the callback registered in reap_recovery_journal().
480 */
481static void complete_reaping(struct vdo_completion *completion)
482{
483	struct recovery_journal *journal = completion->parent;
484
485	finish_reaping(journal);
486
487	/* Try reaping again in case more locks were released while flush was out. */
488	reap_recovery_journal(journal);
489}
490
491/**
492 * handle_flush_error() - Handle an error when flushing the lower layer due to reaping.
493 * @completion: The journal's flush VIO.
494 */
495static void handle_flush_error(struct vdo_completion *completion)
496{
497	struct recovery_journal *journal = completion->parent;
498
499	vio_record_metadata_io_error(as_vio(completion));
500	journal->reaping = false;
501	enter_journal_read_only_mode(journal, completion->result);
502}
503
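/**
 * flush_endio() - Handle completion of the flush issued before reaping.
 * @bio: The flush bio.
 *
 * Resumes processing on the journal thread with complete_reaping() as the callback.
 */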
504static void flush_endio(struct bio *bio)
505{
506	struct vio *vio = bio->bi_private;
507	struct recovery_journal *journal = vio->completion.parent;
508
509	continue_vio_after_io(vio, complete_reaping, journal->thread_id);
510}
511
512/**
513 * initialize_journal_state() - Set all journal fields appropriately to start journaling from the
514 *                              current active block.
515 * @journal: The journal to be reset based on its active block.
516 */
517static void initialize_journal_state(struct recovery_journal *journal)
518{
519	journal->append_point.sequence_number = journal->tail;
520	journal->last_write_acknowledged = journal->tail;
521	journal->block_map_head = journal->tail;
522	journal->slab_journal_head = journal->tail;
523	journal->block_map_reap_head = journal->tail;
524	journal->slab_journal_reap_head = journal->tail;
525	journal->block_map_head_block_number =
526		vdo_get_recovery_journal_block_number(journal, journal->block_map_head);
527	journal->slab_journal_head_block_number =
528		vdo_get_recovery_journal_block_number(journal,
529						      journal->slab_journal_head);
530	journal->available_space =
531		(journal->entries_per_block * vdo_get_recovery_journal_length(journal->size));
532}
533
534/**
535 * vdo_get_recovery_journal_length() - Get the number of usable recovery journal blocks.
536 * @journal_size: The size of the recovery journal in blocks.
537 *
538 * Return: the number of recovery journal blocks usable for entries.
539 */
540block_count_t vdo_get_recovery_journal_length(block_count_t journal_size)
541{
542	block_count_t reserved_blocks = journal_size / 4;
543
544	if (reserved_blocks > RECOVERY_JOURNAL_RESERVED_BLOCKS)
545		reserved_blocks = RECOVERY_JOURNAL_RESERVED_BLOCKS;
546	return (journal_size - reserved_blocks);
547}
548
549/**
550 * reap_recovery_journal_callback() - Attempt to reap the journal.
551 * @completion: The lock counter completion.
552 *
553 * Attempts to reap the journal now that all the locks on some journal block have been released.
554 * This is the callback registered with the lock counter.
555 */
556static void reap_recovery_journal_callback(struct vdo_completion *completion)
557{
558	struct recovery_journal *journal = (struct recovery_journal *) completion->parent;
559	/*
560	 * The acknowledgment must be done before reaping so that there is no race between
561	 * acknowledging the notification and unlocks wishing to notify.
562	 */
563	smp_wmb();
564	atomic_set(&journal->lock_counter.state, LOCK_COUNTER_STATE_NOT_NOTIFYING);
565
566	if (vdo_is_state_quiescing(&journal->state)) {
567		/*
568		 * Don't start reaping when the journal is trying to quiesce. Do check if this
569		 * notification is the last thing the journal is waiting on.
570		 */
571		check_for_drain_complete(journal);
572		return;
573	}
574
575	reap_recovery_journal(journal);
576	check_slab_journal_commit_threshold(journal);
577}
578
579/**
580 * initialize_lock_counter() - Initialize a lock counter.
581 *
582 * @journal: The recovery journal.
583 * @vdo: The vdo.
584 *
585 * Return: VDO_SUCCESS or an error.
586 */
587static int __must_check initialize_lock_counter(struct recovery_journal *journal,
588						struct vdo *vdo)
589{
590	int result;
591	struct thread_config *config = &vdo->thread_config;
592	struct lock_counter *counter = &journal->lock_counter;
593
594	result = vdo_allocate(journal->size, u16, __func__, &counter->journal_counters);
595	if (result != VDO_SUCCESS)
596		return result;
597
598	result = vdo_allocate(journal->size, atomic_t, __func__,
599			      &counter->journal_decrement_counts);
600	if (result != VDO_SUCCESS)
601		return result;
602
603	result = vdo_allocate(journal->size * config->logical_zone_count, u16, __func__,
604			      &counter->logical_counters);
605	if (result != VDO_SUCCESS)
606		return result;
607
608	result = vdo_allocate(journal->size, atomic_t, __func__,
609			      &counter->logical_zone_counts);
610	if (result != VDO_SUCCESS)
611		return result;
612
613	result = vdo_allocate(journal->size * config->physical_zone_count, u16, __func__,
614			      &counter->physical_counters);
615	if (result != VDO_SUCCESS)
616		return result;
617
618	result = vdo_allocate(journal->size, atomic_t, __func__,
619			      &counter->physical_zone_counts);
620	if (result != VDO_SUCCESS)
621		return result;
622
623	vdo_initialize_completion(&counter->completion, vdo,
624				  VDO_LOCK_COUNTER_COMPLETION);
625	vdo_prepare_completion(&counter->completion, reap_recovery_journal_callback,
626			       reap_recovery_journal_callback, config->journal_thread,
627			       journal);
628	counter->logical_zones = config->logical_zone_count;
629	counter->physical_zones = config->physical_zone_count;
630	counter->locks = journal->size;
631	return VDO_SUCCESS;
632}
633
634/**
635 * set_journal_tail() - Set the journal's tail sequence number.
636 * @journal: The journal whose tail is to be set.
637 * @tail: The new tail value.
638 */
639static void set_journal_tail(struct recovery_journal *journal, sequence_number_t tail)
640{
641	/* VDO does not support sequence numbers above 1 << 48 in the slab journal. */
642	if (tail >= (1ULL << 48))
643		enter_journal_read_only_mode(journal, VDO_JOURNAL_OVERFLOW);
644
645	journal->tail = tail;
646}
647
648/**
649 * initialize_recovery_block() - Initialize a journal block.
650 * @vdo: The vdo from which to construct vios.
651 * @journal: The journal to which the block will belong.
652 * @block: The block to initialize.
653 *
654 * Return: VDO_SUCCESS or an error.
655 */
656static int initialize_recovery_block(struct vdo *vdo, struct recovery_journal *journal,
657				     struct recovery_journal_block *block)
658{
659	char *data;
660	int result;
661
662	/*
663	 * Ensure that a block is large enough to store RECOVERY_JOURNAL_ENTRIES_PER_BLOCK entries.
664	 */
665	BUILD_BUG_ON(RECOVERY_JOURNAL_ENTRIES_PER_BLOCK >
666		     ((VDO_BLOCK_SIZE - sizeof(struct packed_journal_header)) /
667		      sizeof(struct packed_recovery_journal_entry)));
668
669	/*
670	 * Allocate a full block for the journal block even though not all of the space is used
671	 * since the VIO needs to write a full disk block.
672	 */
673	result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &data);
674	if (result != VDO_SUCCESS)
675		return result;
676
677	result = allocate_vio_components(vdo, VIO_TYPE_RECOVERY_JOURNAL,
678					 VIO_PRIORITY_HIGH, block, 1, data, &block->vio);
679	if (result != VDO_SUCCESS) {
680		vdo_free(data);
681		return result;
682	}
683
684	list_add_tail(&block->list_node, &journal->free_tail_blocks);
685	block->journal = journal;
686	return VDO_SUCCESS;
687}
688
689/**
690 * vdo_decode_recovery_journal() - Make a recovery journal and initialize it with the state that
691 *                                 was decoded from the super block.
692 *
693 * @state: The decoded state of the journal.
694 * @nonce: The nonce of the VDO.
695 * @vdo: The VDO.
696 * @partition: The partition for the journal.
697 * @recovery_count: The VDO's number of completed recoveries.
698 * @journal_size: The number of blocks in the journal on disk.
699 * @journal_ptr: The pointer to hold the new recovery journal.
700 *
701 * Return: A success or error code.
702 */
703int vdo_decode_recovery_journal(struct recovery_journal_state_7_0 state, nonce_t nonce,
704				struct vdo *vdo, struct partition *partition,
705				u64 recovery_count, block_count_t journal_size,
706				struct recovery_journal **journal_ptr)
707{
708	block_count_t i;
709	struct recovery_journal *journal;
710	int result;
711
712	result = vdo_allocate_extended(struct recovery_journal,
713				       RECOVERY_JOURNAL_RESERVED_BLOCKS,
714				       struct recovery_journal_block, __func__,
715				       &journal);
716	if (result != VDO_SUCCESS)
717		return result;
718
719	INIT_LIST_HEAD(&journal->free_tail_blocks);
720	INIT_LIST_HEAD(&journal->active_tail_blocks);
721	vdo_waitq_init(&journal->pending_writes);
722
723	journal->thread_id = vdo->thread_config.journal_thread;
724	journal->origin = partition->offset;
725	journal->nonce = nonce;
726	journal->recovery_count = compute_recovery_count_byte(recovery_count);
727	journal->size = journal_size;
728	journal->slab_journal_commit_threshold = (journal_size * 2) / 3;
729	journal->logical_blocks_used = state.logical_blocks_used;
730	journal->block_map_data_blocks = state.block_map_data_blocks;
731	journal->entries_per_block = RECOVERY_JOURNAL_ENTRIES_PER_BLOCK;
732	set_journal_tail(journal, state.journal_start);
733	initialize_journal_state(journal);
734	/* TODO: this will have to change if we make initial resume of a VDO a real resume */
735	vdo_set_admin_state_code(&journal->state, VDO_ADMIN_STATE_SUSPENDED);
736
737	for (i = 0; i < RECOVERY_JOURNAL_RESERVED_BLOCKS; i++) {
738		struct recovery_journal_block *block = &journal->blocks[i];
739
740		result = initialize_recovery_block(vdo, journal, block);
741		if (result != VDO_SUCCESS) {
742			vdo_free_recovery_journal(journal);
743			return result;
744		}
745	}
746
747	result = initialize_lock_counter(journal, vdo);
748	if (result != VDO_SUCCESS) {
749		vdo_free_recovery_journal(journal);
750		return result;
751	}
752
753	result = create_metadata_vio(vdo, VIO_TYPE_RECOVERY_JOURNAL, VIO_PRIORITY_HIGH,
754				     journal, NULL, &journal->flush_vio);
755	if (result != VDO_SUCCESS) {
756		vdo_free_recovery_journal(journal);
757		return result;
758	}
759
760	result = vdo_register_read_only_listener(vdo, journal,
761						 notify_recovery_journal_of_read_only_mode,
762						 journal->thread_id);
763	if (result != VDO_SUCCESS) {
764		vdo_free_recovery_journal(journal);
765		return result;
766	}
767
768	result = vdo_make_default_thread(vdo, journal->thread_id);
769	if (result != VDO_SUCCESS) {
770		vdo_free_recovery_journal(journal);
771		return result;
772	}
773
774	journal->flush_vio->completion.callback_thread_id = journal->thread_id;
775	*journal_ptr = journal;
776	return VDO_SUCCESS;
777}
778
779/**
780 * vdo_free_recovery_journal() - Free a recovery journal.
781 * @journal: The recovery journal to free.
782 */
783void vdo_free_recovery_journal(struct recovery_journal *journal)
784{
785	block_count_t i;
786
787	if (journal == NULL)
788		return;
789
790	vdo_free(vdo_forget(journal->lock_counter.logical_zone_counts));
791	vdo_free(vdo_forget(journal->lock_counter.physical_zone_counts));
792	vdo_free(vdo_forget(journal->lock_counter.journal_counters));
793	vdo_free(vdo_forget(journal->lock_counter.journal_decrement_counts));
794	vdo_free(vdo_forget(journal->lock_counter.logical_counters));
795	vdo_free(vdo_forget(journal->lock_counter.physical_counters));
796	free_vio(vdo_forget(journal->flush_vio));
797
798	/*
799	 * FIXME: eventually, the journal should be constructed in a quiescent state which
800	 *        requires opening before use.
801	 */
802	if (!vdo_is_state_quiescent(&journal->state)) {
803		VDO_ASSERT_LOG_ONLY(list_empty(&journal->active_tail_blocks),
804				    "journal being freed has no active tail blocks");
805	} else if (!vdo_is_state_saved(&journal->state) &&
806		   !list_empty(&journal->active_tail_blocks)) {
807		vdo_log_warning("journal being freed has uncommitted entries");
808	}
809
810	for (i = 0; i < RECOVERY_JOURNAL_RESERVED_BLOCKS; i++) {
811		struct recovery_journal_block *block = &journal->blocks[i];
812
813		vdo_free(vdo_forget(block->vio.data));
814		free_vio_components(&block->vio);
815	}
816
817	vdo_free(journal);
818}
819
820/**
821 * vdo_initialize_recovery_journal_post_repair() - Initialize the journal after a repair.
822 * @journal: The journal in question.
823 * @recovery_count: The number of completed recoveries.
824 * @tail: The new tail block sequence number.
825 * @logical_blocks_used: The new number of logical blocks used.
826 * @block_map_data_blocks: The new number of block map data blocks.
827 */
828void vdo_initialize_recovery_journal_post_repair(struct recovery_journal *journal,
829						 u64 recovery_count,
830						 sequence_number_t tail,
831						 block_count_t logical_blocks_used,
832						 block_count_t block_map_data_blocks)
833{
834	set_journal_tail(journal, tail + 1);
835	journal->recovery_count = compute_recovery_count_byte(recovery_count);
836	initialize_journal_state(journal);
837	journal->logical_blocks_used = logical_blocks_used;
838	journal->block_map_data_blocks = block_map_data_blocks;
839}
840
841/**
842 * vdo_get_journal_block_map_data_blocks_used() - Get the number of block map pages, allocated from
843 *                                                data blocks, currently in use.
844 * @journal: The journal in question.
845 *
846 * Return: The number of block map pages allocated from slabs.
847 */
848block_count_t vdo_get_journal_block_map_data_blocks_used(struct recovery_journal *journal)
849{
850	return journal->block_map_data_blocks;
851}
852
853/**
854 * vdo_get_recovery_journal_thread_id() - Get the ID of a recovery journal's thread.
855 * @journal: The journal to query.
856 *
857 * Return: The ID of the journal's thread.
858 */
859thread_id_t vdo_get_recovery_journal_thread_id(struct recovery_journal *journal)
860{
861	return journal->thread_id;
862}
863
864/**
865 * vdo_open_recovery_journal() - Prepare the journal for new entries.
866 * @journal: The journal in question.
867 * @depot: The slab depot for this VDO.
868 * @block_map: The block map for this VDO.
869 */
870void vdo_open_recovery_journal(struct recovery_journal *journal,
871			       struct slab_depot *depot, struct block_map *block_map)
872{
873	journal->depot = depot;
874	journal->block_map = block_map;
875	WRITE_ONCE(journal->state.current_state, VDO_ADMIN_STATE_NORMAL_OPERATION);
876}
877
878/**
879 * vdo_record_recovery_journal() - Record the state of a recovery journal for encoding in the super
880 *                                 block.
881 * @journal: the recovery journal.
882 *
883 * Return: the state of the journal.
884 */
885struct recovery_journal_state_7_0
886vdo_record_recovery_journal(const struct recovery_journal *journal)
887{
888	struct recovery_journal_state_7_0 state = {
889		.logical_blocks_used = journal->logical_blocks_used,
890		.block_map_data_blocks = journal->block_map_data_blocks,
891	};
892
893	if (vdo_is_state_saved(&journal->state)) {
894		/*
895		 * If the journal is saved, we should start one past the active block (since the
896		 * active block is not guaranteed to be empty).
897		 */
898		state.journal_start = journal->tail;
899	} else {
900		/*
901		 * When we're merely suspended or have gone read-only, we must record the first
902		 * block that might have entries that need to be applied.
903		 */
904		state.journal_start = get_recovery_journal_head(journal);
905	}
906
907	return state;
908}
909
910/**
911 * get_block_header() - Get a pointer to the packed journal block header in the block buffer.
912 * @block: The recovery block.
913 *
914 * Return: The block's header.
915 */
916static inline struct packed_journal_header *
917get_block_header(const struct recovery_journal_block *block)
918{
919	return (struct packed_journal_header *) block->vio.data;
920}
921
922/**
923 * set_active_sector() - Set the current sector of the current block and initialize it.
924 * @block: The block to update.
925 * @sector: A pointer to the first byte of the new sector.
926 */
927static void set_active_sector(struct recovery_journal_block *block, void *sector)
928{
929	block->sector = sector;
930	block->sector->check_byte = get_block_header(block)->check_byte;
931	block->sector->recovery_count = block->journal->recovery_count;
932	block->sector->entry_count = 0;
933}
934
935/**
936 * advance_tail() - Advance the tail of the journal.
937 * @journal: The journal whose tail should be advanced.
938 *
939 * Return: true if the tail was advanced.
940 */
941static bool advance_tail(struct recovery_journal *journal)
942{
943	struct recovery_block_header unpacked;
944	struct packed_journal_header *header;
945	struct recovery_journal_block *block;
946
947	block = journal->active_block = pop_free_list(journal);
948	if (block == NULL)
949		return false;
950
951	list_move_tail(&block->list_node, &journal->active_tail_blocks);
952
953	unpacked = (struct recovery_block_header) {
954		.metadata_type = VDO_METADATA_RECOVERY_JOURNAL_2,
955		.block_map_data_blocks = journal->block_map_data_blocks,
956		.logical_blocks_used = journal->logical_blocks_used,
957		.nonce = journal->nonce,
958		.recovery_count = journal->recovery_count,
959		.sequence_number = journal->tail,
960		.check_byte = vdo_compute_recovery_journal_check_byte(journal,
961								      journal->tail),
962	};
963
964	header = get_block_header(block);
965	memset(block->vio.data, 0x0, VDO_BLOCK_SIZE);
966	block->sequence_number = journal->tail;
967	block->entry_count = 0;
968	block->uncommitted_entry_count = 0;
969	block->block_number = vdo_get_recovery_journal_block_number(journal,
970								    journal->tail);
971
972	vdo_pack_recovery_block_header(&unpacked, header);
973	set_active_sector(block, vdo_get_journal_block_sector(header, 1));
974	set_journal_tail(journal, journal->tail + 1);
975	vdo_advance_block_map_era(journal->block_map, journal->tail);
976	return true;
977}
978
979/**
980 * initialize_lock_count() - Initialize the value of the journal zone's counter for the lock of
981 *                           the journal's active block.
981 * @journal: The recovery journal.
982 *
983 * Context: This must be called from the journal zone.
984 */
985static void initialize_lock_count(struct recovery_journal *journal)
986{
987	u16 *journal_value;
988	block_count_t lock_number = journal->active_block->block_number;
989	atomic_t *decrement_counter = get_decrement_counter(journal, lock_number);
990
991	journal_value = get_counter(journal, lock_number, VDO_ZONE_TYPE_JOURNAL, 0);
992	VDO_ASSERT_LOG_ONLY((*journal_value == atomic_read(decrement_counter)),
993			    "count to be initialized not in use");
994	*journal_value = journal->entries_per_block + 1;
995	atomic_set(decrement_counter, 0);
996}
997
998/**
999 * prepare_to_assign_entry() - Prepare the currently active block to receive an entry and check
1000 *			       whether an entry may be assigned at this time.
1001 * @journal: The journal receiving an entry.
1002 *
1003 * Return: true if there is space in the journal to store an entry.
1004 */
1005static bool prepare_to_assign_entry(struct recovery_journal *journal)
1006{
1007	if (journal->available_space == 0)
1008		return false;
1009
1010	if (is_block_full(journal->active_block) && !advance_tail(journal))
1011		return false;
1012
1013	if (!is_block_empty(journal->active_block))
1014		return true;
1015
1016	if ((journal->tail - get_recovery_journal_head(journal)) > journal->size) {
1017		/* Cannot use this block since the journal is full. */
1018		journal->events.disk_full++;
1019		return false;
1020	}
1021
1022	/*
1023	 * Don't allow the new block to be reaped until all of its entries have been committed to
1024	 * the block map and until the journal block has been fully committed as well. Because the
1025	 * block map update is done only after any slab journal entries have been made, the
1026	 * per-entry lock for the block map entry serves to protect those as well.
1027	 */
1028	initialize_lock_count(journal);
1029	return true;
1030}
1031
1032static void write_blocks(struct recovery_journal *journal);
1033
1034/**
1035 * schedule_block_write() - Queue a block for writing.
1036 * @journal: The journal in question.
1037 * @block: The block which is now ready to write.
1038 *
1039 * The block is expected to be full. If the block is currently writing, this is a noop as the block
1040 * will be queued for writing when the write finishes. The block must not currently be queued for
1041 * writing.
1042 */
1043static void schedule_block_write(struct recovery_journal *journal,
1044				 struct recovery_journal_block *block)
1045{
1046	if (!block->committing)
1047		vdo_waitq_enqueue_waiter(&journal->pending_writes, &block->write_waiter);
1048	/*
1049	 * At the end of adding entries, or discovering this partial block is now full and ready to
1050	 * rewrite, we will call write_blocks() and write a whole batch.
1051	 */
1052}
1053
1054/**
1055 * release_journal_block_reference() - Release a reference to a journal block.
1056 * @block: The journal block from which to release a reference.
1057 */
1058static void release_journal_block_reference(struct recovery_journal_block *block)
1059{
1060	vdo_release_recovery_journal_block_reference(block->journal,
1061						     block->sequence_number,
1062						     VDO_ZONE_TYPE_JOURNAL, 0);
1063}
1064
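/**
 * update_usages() - Update the journal's logical and block map data block usage counts to
 *                   reflect the entry being made for a data_vio.
 * @journal: The journal making the entry.
 * @data_vio: The data_vio for which the entry will be made.
 */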
1065static void update_usages(struct recovery_journal *journal, struct data_vio *data_vio)
1066{
1067	if (data_vio->increment_updater.operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
1068		journal->block_map_data_blocks++;
1069		return;
1070	}
1071
1072	if (data_vio->new_mapped.state != VDO_MAPPING_STATE_UNMAPPED)
1073		journal->logical_blocks_used++;
1074
1075	if (data_vio->mapped.state != VDO_MAPPING_STATE_UNMAPPED)
1076		journal->logical_blocks_used--;
1077}
1078
1079/**
1080 * assign_entry() - Assign an entry waiter to the active block.
1081 *
1082 * Implements waiter_callback_fn.
1083 */
1084static void assign_entry(struct vdo_waiter *waiter, void *context)
1085{
1086	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1087	struct recovery_journal_block *block = context;
1088	struct recovery_journal *journal = block->journal;
1089
1090	/* Record the point at which we will make the journal entry. */
1091	data_vio->recovery_journal_point = (struct journal_point) {
1092		.sequence_number = block->sequence_number,
1093		.entry_count = block->entry_count,
1094	};
1095
1096	update_usages(journal, data_vio);
1097	journal->available_space--;
1098
1099	if (!vdo_waitq_has_waiters(&block->entry_waiters))
1100		journal->events.blocks.started++;
1101
1102	vdo_waitq_enqueue_waiter(&block->entry_waiters, &data_vio->waiter);
1103	block->entry_count++;
1104	block->uncommitted_entry_count++;
1105	journal->events.entries.started++;
1106
1107	if (is_block_full(block)) {
1108		/*
1109		 * The block is full, so we can write it anytime henceforth. If it is already
1110		 * committing, we'll queue it for writing when it comes back.
1111		 */
1112		schedule_block_write(journal, block);
1113	}
1114
1115	/* Force out slab journal tail blocks when threshold is reached. */
1116	check_slab_journal_commit_threshold(journal);
1117}
1118
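/**
 * assign_entries() - Assign waiting entries to the active block until there are no more waiters
 *                    or no more space in the journal.
 * @journal: The journal whose entry waiters are to be assigned.
 */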
1119static void assign_entries(struct recovery_journal *journal)
1120{
1121	if (journal->adding_entries) {
1122		/* Protect against re-entrancy. */
1123		return;
1124	}
1125
1126	journal->adding_entries = true;
1127	while (vdo_waitq_has_waiters(&journal->entry_waiters) &&
1128	       prepare_to_assign_entry(journal)) {
1129		vdo_waitq_notify_next_waiter(&journal->entry_waiters,
1130					     assign_entry, journal->active_block);
1131	}
1132
1133	/* Now that we've finished with entries, see if we have a batch of blocks to write. */
1134	write_blocks(journal);
1135	journal->adding_entries = false;
1136}
1137
1138/**
1139 * recycle_journal_block() - Prepare an in-memory journal block to be reused now that it has been
1140 *                           fully committed.
1141 * @block: The block to be recycled.
1142 */
1143static void recycle_journal_block(struct recovery_journal_block *block)
1144{
1145	struct recovery_journal *journal = block->journal;
1146	block_count_t i;
1147
1148	list_move_tail(&block->list_node, &journal->free_tail_blocks);
1149
1150	/* Release any unused entry locks. */
1151	for (i = block->entry_count; i < journal->entries_per_block; i++)
1152		release_journal_block_reference(block);
1153
1154	/*
1155	 * Release our own lock against reaping now that the block is completely committed, or
1156	 * we're giving up because we're in read-only mode.
1157	 */
1158	if (block->entry_count > 0)
1159		release_journal_block_reference(block);
1160
1161	if (block == journal->active_block)
1162		journal->active_block = NULL;
1163}
1164
1165/**
1166 * continue_committed_waiter() - Invoked whenever a VIO is to be released from the journal
1167 *                               because its entry was committed to disk.
1168 *
1169 * Implements waiter_callback_fn.
1170 */
1171static void continue_committed_waiter(struct vdo_waiter *waiter, void *context)
1172{
1173	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1174	struct recovery_journal *journal = context;
1175	int result = (is_read_only(journal) ? VDO_READ_ONLY : VDO_SUCCESS);
1176	bool has_decrement;
1177
1178	VDO_ASSERT_LOG_ONLY(vdo_before_journal_point(&journal->commit_point,
1179						     &data_vio->recovery_journal_point),
1180			    "DataVIOs released from recovery journal in order. Recovery journal point is (%llu, %u), but commit waiter point is (%llu, %u)",
1181			    (unsigned long long) journal->commit_point.sequence_number,
1182			    journal->commit_point.entry_count,
1183			    (unsigned long long) data_vio->recovery_journal_point.sequence_number,
1184			    data_vio->recovery_journal_point.entry_count);
1185
1186	journal->commit_point = data_vio->recovery_journal_point;
1187	data_vio->last_async_operation = VIO_ASYNC_OP_UPDATE_REFERENCE_COUNTS;
1188	if (result != VDO_SUCCESS) {
1189		continue_data_vio_with_error(data_vio, result);
1190		return;
1191	}
1192
1193	/*
1194	 * The increment must be launched first since it must come before the
1195	 * decrement if they are in the same slab.
1196	 */
1197	has_decrement = (data_vio->decrement_updater.zpbn.pbn != VDO_ZERO_BLOCK);
1198	if ((data_vio->increment_updater.zpbn.pbn != VDO_ZERO_BLOCK) || !has_decrement)
1199		continue_data_vio(data_vio);
1200
1201	if (has_decrement)
1202		vdo_launch_completion(&data_vio->decrement_completion);
1203}
1204
1205/**
1206 * notify_commit_waiters() - Notify any VIOs whose entries have now committed.
1207 * @journal: The recovery journal to update.
1208 */
1209static void notify_commit_waiters(struct recovery_journal *journal)
1210{
1211	struct recovery_journal_block *block;
1212
1213	list_for_each_entry(block, &journal->active_tail_blocks, list_node) {
1214		if (block->committing)
1215			return;
1216
1217		vdo_waitq_notify_all_waiters(&block->commit_waiters,
1218					     continue_committed_waiter, journal);
1219		if (is_read_only(journal)) {
1220			vdo_waitq_notify_all_waiters(&block->entry_waiters,
1221						     continue_committed_waiter,
1222						     journal);
1223		} else if (is_block_dirty(block) || !is_block_full(block)) {
1224			/* Stop at partially-committed or partially-filled blocks. */
1225			return;
1226		}
1227	}
1228}
1229
1230/**
1231 * recycle_journal_blocks() - Recycle any journal blocks which have been fully committed.
1232 * @journal: The recovery journal to update.
1233 */
1234static void recycle_journal_blocks(struct recovery_journal *journal)
1235{
1236	struct recovery_journal_block *block, *tmp;
1237
1238	list_for_each_entry_safe(block, tmp, &journal->active_tail_blocks, list_node) {
1239		if (block->committing) {
1240			/* Don't recycle committing blocks. */
1241			return;
1242		}
1243
1244		if (!is_read_only(journal) &&
1245		    (is_block_dirty(block) || !is_block_full(block))) {
1246			/*
1247			 * Don't recycle partially written or partially full blocks, except in
1248			 * read-only mode.
1249			 */
1250			return;
1251		}
1252
1253		recycle_journal_block(block);
1254	}
1255}
1256
1257/**
1258 * complete_write() - Handle post-commit processing.
1259 * @completion: The completion of the VIO writing this block.
1260 *
1261 * This is the callback registered by write_block(). If more entries accumulated in the block being
1262 * committed while the commit was in progress, another commit will be initiated.
1263 */
1264static void complete_write(struct vdo_completion *completion)
1265{
1266	struct recovery_journal_block *block = completion->parent;
1267	struct recovery_journal *journal = block->journal;
1268	struct recovery_journal_block *last_active_block;
1269
1270	assert_on_journal_thread(journal, __func__);
1271
1272	journal->pending_write_count -= 1;
1273	journal->events.blocks.committed += 1;
1274	journal->events.entries.committed += block->entries_in_commit;
1275	block->uncommitted_entry_count -= block->entries_in_commit;
1276	block->entries_in_commit = 0;
1277	block->committing = false;
1278
1279	/* If this block is the latest block to be acknowledged, record that fact. */
1280	if (block->sequence_number > journal->last_write_acknowledged)
1281		journal->last_write_acknowledged = block->sequence_number;
1282
1283	last_active_block = get_journal_block(&journal->active_tail_blocks);
1284	VDO_ASSERT_LOG_ONLY((block->sequence_number >= last_active_block->sequence_number),
1285			    "completed journal write is still active");
1286
1287	notify_commit_waiters(journal);
1288
1289	/*
1290	 * Is this block now full? Reaping, and adding entries, might have already sent it off for
1291	 * rewriting; else, queue it for rewrite.
1292	 */
1293	if (is_block_dirty(block) && is_block_full(block))
1294		schedule_block_write(journal, block);
1295
1296	recycle_journal_blocks(journal);
1297	write_blocks(journal);
1298
1299	check_for_drain_complete(journal);
1300}
1301
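/**
 * handle_write_error() - Handle an error writing a recovery journal block.
 * @completion: The completion of the VIO which failed to write its block.
 *
 * Logs the error, puts the journal in read-only mode, and then finishes processing the write.
 */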
1302static void handle_write_error(struct vdo_completion *completion)
1303{
1304	struct recovery_journal_block *block = completion->parent;
1305	struct recovery_journal *journal = block->journal;
1306
1307	vio_record_metadata_io_error(as_vio(completion));
1308	vdo_log_error_strerror(completion->result,
1309			       "cannot write recovery journal block %llu",
1310			       (unsigned long long) block->sequence_number);
1311	enter_journal_read_only_mode(journal, completion->result);
1312	complete_write(completion);
1313}
1314
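/**
 * complete_write_endio() - Handle completion of a recovery journal block write.
 * @bio: The write bio.
 *
 * Resumes processing on the journal thread with complete_write() as the callback.
 */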
1315static void complete_write_endio(struct bio *bio)
1316{
1317	struct vio *vio = bio->bi_private;
1318	struct recovery_journal_block *block = vio->completion.parent;
1319	struct recovery_journal *journal = block->journal;
1320
1321	continue_vio_after_io(vio, complete_write, journal->thread_id);
1322}
1323
1324/**
1325 * add_queued_recovery_entries() - Actually add entries from the queue to the given block.
1326 * @block: The journal block.
1327 */
1328static void add_queued_recovery_entries(struct recovery_journal_block *block)
1329{
1330	while (vdo_waitq_has_waiters(&block->entry_waiters)) {
1331		struct data_vio *data_vio =
1332			vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&block->entry_waiters));
1333		struct tree_lock *lock = &data_vio->tree_lock;
1334		struct packed_recovery_journal_entry *packed_entry;
1335		struct recovery_journal_entry new_entry;
1336
1337		if (block->sector->entry_count == RECOVERY_JOURNAL_ENTRIES_PER_SECTOR)
1338			set_active_sector(block,
1339					  (char *) block->sector + VDO_SECTOR_SIZE);
1340
1341		/* Compose and encode the entry. */
1342		packed_entry = &block->sector->entries[block->sector->entry_count++];
1343		new_entry = (struct recovery_journal_entry) {
1344			.mapping = {
1345				.pbn = data_vio->increment_updater.zpbn.pbn,
1346				.state = data_vio->increment_updater.zpbn.state,
1347			},
1348			.unmapping = {
1349				.pbn = data_vio->decrement_updater.zpbn.pbn,
1350				.state = data_vio->decrement_updater.zpbn.state,
1351			},
1352			.operation = data_vio->increment_updater.operation,
1353			.slot = lock->tree_slots[lock->height].block_map_slot,
1354		};
1355		*packed_entry = vdo_pack_recovery_journal_entry(&new_entry);
1356		data_vio->recovery_sequence_number = block->sequence_number;
1357
1358		/* Enqueue the data_vio to wait for its entry to commit. */
1359		vdo_waitq_enqueue_waiter(&block->commit_waiters, &data_vio->waiter);
1360	}
1361}
1362
1363/**
1364 * write_block() - Issue a block for writing.
1365 *
1366 * Implements waiter_callback_fn.
1367 */
1368static void write_block(struct vdo_waiter *waiter, void *context __always_unused)
1369{
1370	struct recovery_journal_block *block =
1371		container_of(waiter, struct recovery_journal_block, write_waiter);
1372	struct recovery_journal *journal = block->journal;
1373	struct packed_journal_header *header = get_block_header(block);
1374
1375	if (block->committing || !vdo_waitq_has_waiters(&block->entry_waiters) ||
1376	    is_read_only(journal))
1377		return;
1378
1379	block->entries_in_commit = vdo_waitq_num_waiters(&block->entry_waiters);
1380	add_queued_recovery_entries(block);
1381
1382	journal->pending_write_count += 1;
1383	journal->events.blocks.written += 1;
1384	journal->events.entries.written += block->entries_in_commit;
1385
1386	header->block_map_head = __cpu_to_le64(journal->block_map_head);
1387	header->slab_journal_head = __cpu_to_le64(journal->slab_journal_head);
1388	header->entry_count = __cpu_to_le16(block->entry_count);
1389
1390	block->committing = true;
1391
1392	/*
1393	 * We must issue a flush and a FUA for every commit. The flush is necessary to ensure that
1394	 * the data being referenced is stable. The FUA is necessary to ensure that the journal
1395	 * block itself is stable before allowing overwrites of the lbn's previous data.
1396	 */
1397	vdo_submit_metadata_vio(&block->vio, journal->origin + block->block_number,
1398				complete_write_endio, handle_write_error,
1399				REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH | REQ_SYNC | REQ_FUA);
1400}
1401
1403/**
1404 * write_blocks() - Attempt to commit blocks, according to write policy.
1405 * @journal: The recovery journal.
1406 */
1407static void write_blocks(struct recovery_journal *journal)
1408{
1409	assert_on_journal_thread(journal, __func__);
1410	/*
1411	 * We call this function after adding entries to the journal and after finishing a block
1412	 * write. Thus, when this function terminates we must either have no VIOs waiting in the
1413	 * journal or have some outstanding IO to provide a future wakeup.
1414	 *
1415	 * We want to only issue full blocks if there are no pending writes. However, if there are
1416	 * no outstanding writes and some unwritten entries, we must issue a block, even if it's
1417	 * the active block and it isn't full.
1418	 */
1419	if (journal->pending_write_count > 0)
1420		return;
1421
1422	/* Write all the full blocks. */
1423	vdo_waitq_notify_all_waiters(&journal->pending_writes, write_block, NULL);
1424
1425	/*
1426	 * Do we need to write the active block? Only if we have no outstanding writes, even after
1427	 * issuing all of the full writes.
1428	 */
1429	if ((journal->pending_write_count == 0) && (journal->active_block != NULL))
1430		write_block(&journal->active_block->write_waiter, NULL);
1431}
1432
1433/**
1434 * vdo_add_recovery_journal_entry() - Add an entry to a recovery journal.
1435 * @journal: The journal in which to make an entry.
1436 * @data_vio: The data_vio for which to add the entry. The entry will be taken
1437 *	      from the logical and new_mapped fields of the data_vio. The
1438 *	      data_vio's recovery_sequence_number field will be set to the
1439 *	      sequence number of the journal block in which the entry was
1440 *	      made.
1441 *
1442 * This method is asynchronous. The data_vio will not be called back until the entry is committed
1443 * to the on-disk journal.
1444 */
1445void vdo_add_recovery_journal_entry(struct recovery_journal *journal,
1446				    struct data_vio *data_vio)
1447{
1448	assert_on_journal_thread(journal, __func__);
1449	if (!vdo_is_state_normal(&journal->state)) {
1450		continue_data_vio_with_error(data_vio, VDO_INVALID_ADMIN_STATE);
1451		return;
1452	}
1453
1454	if (is_read_only(journal)) {
1455		continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
1456		return;
1457	}
1458
1459	VDO_ASSERT_LOG_ONLY(data_vio->recovery_sequence_number == 0,
1460			    "journal lock not held for new entry");
1461
1462	vdo_advance_journal_point(&journal->append_point, journal->entries_per_block);
1463	vdo_waitq_enqueue_waiter(&journal->entry_waiters, &data_vio->waiter);
1464	assign_entries(journal);
1465}
1466
1467/**
1468 * is_lock_locked() - Check whether a lock is locked for a zone type.
1469 * @journal: The recovery journal.
1470 * @lock_number: The lock to check.
1471 * @zone_type: The type of the zone.
1472 *
1473 * If the recovery journal has a lock on the lock number, both logical and physical zones are
1474 * considered locked.
1475 *
1476 * Return: true if the specified lock has references (is locked).
1477 */
1478static bool is_lock_locked(struct recovery_journal *journal, block_count_t lock_number,
1479			   enum vdo_zone_type zone_type)
1480{
1481	atomic_t *zone_count;
1482	bool locked;
1483
1484	if (is_journal_zone_locked(journal, lock_number))
1485		return true;
1486
1487	zone_count = get_zone_count_ptr(journal, lock_number, zone_type);
1488	locked = (atomic_read(zone_count) != 0);
1489	/* Pairs with implicit barrier in vdo_release_recovery_journal_block_reference() */
1490	smp_rmb();
1491	return locked;
1492}
1493
1494/**
1495 * reap_recovery_journal() - Conduct a sweep on a recovery journal to reclaim unreferenced blocks.
1496 * @journal: The recovery journal.
1497 */
1498static void reap_recovery_journal(struct recovery_journal *journal)
1499{
1500	if (journal->reaping) {
1501		/*
1502		 * We already have an outstanding reap in progress. We need to wait for it to
1503		 * finish.
1504		 */
1505		return;
1506	}
1507
1508	if (vdo_is_state_quiescent(&journal->state)) {
1509		/* We are supposed to not do IO. Don't botch it by reaping. */
1510		return;
1511	}
1512
1513	/*
1514	 * Start reclaiming blocks only when the journal head has no references. Then stop when a
1515	 * block is referenced.
1516	 */
	while ((journal->block_map_reap_head < journal->last_write_acknowledged) &&
		!is_lock_locked(journal, journal->block_map_head_block_number,
				VDO_ZONE_TYPE_LOGICAL)) {
		journal->block_map_reap_head++;
		if (++journal->block_map_head_block_number == journal->size)
			journal->block_map_head_block_number = 0;
	}

	while ((journal->slab_journal_reap_head < journal->last_write_acknowledged) &&
		!is_lock_locked(journal, journal->slab_journal_head_block_number,
				VDO_ZONE_TYPE_PHYSICAL)) {
		journal->slab_journal_reap_head++;
		if (++journal->slab_journal_head_block_number == journal->size)
			journal->slab_journal_head_block_number = 0;
	}

	if ((journal->block_map_reap_head == journal->block_map_head) &&
	    (journal->slab_journal_reap_head == journal->slab_journal_head)) {
		/* Nothing happened. */
		return;
	}

	/*
	 * If the block map head will advance, we must flush any block map page modified by the
	 * entries we are reaping. If the slab journal head will advance, we must flush the slab
	 * summary update covering the slab journal that just released some lock.
	 */
	journal->reaping = true;
	vdo_submit_flush_vio(journal->flush_vio, flush_endio, handle_flush_error);
}

/**
 * vdo_acquire_recovery_journal_block_reference() - Acquire a reference to a recovery journal block
 *                                                  from somewhere other than the journal itself.
 * @journal: The recovery journal.
 * @sequence_number: The journal sequence number of the referenced block.
 * @zone_type: The type of the zone making the adjustment.
 * @zone_id: The ID of the zone making the adjustment.
 */
void vdo_acquire_recovery_journal_block_reference(struct recovery_journal *journal,
						  sequence_number_t sequence_number,
						  enum vdo_zone_type zone_type,
						  zone_count_t zone_id)
{
	block_count_t lock_number;
	u16 *current_value;

	if (sequence_number == 0)
		return;

	VDO_ASSERT_LOG_ONLY((zone_type != VDO_ZONE_TYPE_JOURNAL),
			    "invalid lock count increment from journal zone");

	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
	current_value = get_counter(journal, lock_number, zone_type, zone_id);
	VDO_ASSERT_LOG_ONLY(*current_value < U16_MAX,
			    "increment of lock counter must not overflow");

	if (*current_value == 0) {
		/*
		 * This zone is acquiring this lock for the first time. Extra barriers because this
		 * was originally developed using an atomic add operation that implicitly had them.
		 */
		smp_mb__before_atomic();
		atomic_inc(get_zone_count_ptr(journal, lock_number, zone_type));
		/* same as before_atomic */
		smp_mb__after_atomic();
	}

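	/*
	 * The per-zone counters themselves are only updated from their owning zone's thread,
	 * so a plain, non-atomic increment is sufficient here.
	 */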
	*current_value += 1;
}

/**
 * vdo_release_journal_entry_lock() - Release a single per-entry reference count for a recovery
 *                                    journal block.
 * @journal: The recovery journal.
 * @sequence_number: The journal sequence number of the referenced block.
 */
void vdo_release_journal_entry_lock(struct recovery_journal *journal,
				    sequence_number_t sequence_number)
{
	block_count_t lock_number;

	if (sequence_number == 0)
		return;

	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
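	/*
	 * Releases may come from outside the journal zone, so the release is recorded in a
	 * dedicated atomic decrement counter rather than by modifying the journal zone's
	 * non-atomic counter directly; the journal zone reconciles the two counts when it
	 * checks whether it still holds the lock.
	 */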
	/*
	 * Extra barriers because this was originally developed using an atomic add operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	atomic_inc(get_decrement_counter(journal, lock_number));
	/* same as before_atomic */
	smp_mb__after_atomic();
}

/**
 * initiate_drain() - Initiate a drain.
 *
 * Implements vdo_admin_initiator_fn.
 */
static void initiate_drain(struct admin_state *state)
{
	check_for_drain_complete(container_of(state, struct recovery_journal, state));
}

/**
 * vdo_drain_recovery_journal() - Drain recovery journal I/O.
 * @journal: The journal to drain.
 * @operation: The drain operation (suspend or save).
 * @parent: The completion to notify once the journal is drained.
 *
 * All uncommitted entries will be written out.
 */
void vdo_drain_recovery_journal(struct recovery_journal *journal,
				const struct admin_state_code *operation,
				struct vdo_completion *parent)
{
	assert_on_journal_thread(journal, __func__);
	vdo_start_draining(&journal->state, operation, parent, initiate_drain);
}

/**
 * resume_lock_counter() - Re-allow notifications from a suspended lock counter.
 * @counter: The counter.
 *
 * Return: true if the lock counter was suspended.
 */
static bool resume_lock_counter(struct lock_counter *counter)
{
	int prior_state;

	/*
	 * Extra barriers because this was originally developed using a CAS operation that implicitly
	 * had them.
	 */
	smp_mb__before_atomic();
	prior_state = atomic_cmpxchg(&counter->state, LOCK_COUNTER_STATE_SUSPENDED,
				     LOCK_COUNTER_STATE_NOT_NOTIFYING);
	/* same as before_atomic */
	smp_mb__after_atomic();

	return (prior_state == LOCK_COUNTER_STATE_SUSPENDED);
}

/**
 * vdo_resume_recovery_journal() - Resume a recovery journal which has been drained.
 * @journal: The journal to resume.
 * @parent: The completion to finish once the journal is resumed.
 */
void vdo_resume_recovery_journal(struct recovery_journal *journal,
				 struct vdo_completion *parent)
{
	bool saved;

	assert_on_journal_thread(journal, __func__);
	saved = vdo_is_state_saved(&journal->state);
	vdo_set_completion_result(parent, vdo_resume_if_quiescent(&journal->state));
	if (is_read_only(journal)) {
		vdo_continue_completion(parent, VDO_READ_ONLY);
		return;
	}

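	/*
	 * A saved journal has committed all of its entries, so reset the in-memory journal
	 * state before accepting new entries.
	 */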
	if (saved)
		initialize_journal_state(journal);

	if (resume_lock_counter(&journal->lock_counter)) {
		/* We might have missed a notification. */
		reap_recovery_journal(journal);
	}

	vdo_launch_completion(parent);
}

/**
 * vdo_get_recovery_journal_logical_blocks_used() - Get the number of logical blocks in use by the
 *                                                  VDO.
 * @journal: The journal.
 *
 * Return: The number of logical blocks in use by the VDO.
 */
block_count_t vdo_get_recovery_journal_logical_blocks_used(const struct recovery_journal *journal)
{
	return journal->logical_blocks_used;
}

/**
 * vdo_get_recovery_journal_statistics() - Get the current statistics from the recovery journal.
 * @journal: The recovery journal to query.
 *
 * Return: A copy of the current statistics for the journal.
 */
struct recovery_journal_statistics
vdo_get_recovery_journal_statistics(const struct recovery_journal *journal)
{
	return journal->events;
}

/**
 * dump_recovery_block() - Dump the contents of the recovery block to the log.
 * @block: The block to dump.
 */
static void dump_recovery_block(const struct recovery_journal_block *block)
{
	vdo_log_info("    sequence number %llu; entries %u; %s; %zu entry waiters; %zu commit waiters",
		     (unsigned long long) block->sequence_number, block->entry_count,
		     (block->committing ? "committing" : "waiting"),
		     vdo_waitq_num_waiters(&block->entry_waiters),
		     vdo_waitq_num_waiters(&block->commit_waiters));
}

/**
 * vdo_dump_recovery_journal_statistics() - Dump some current statistics and other debug info from
 *                                          the recovery journal.
 * @journal: The recovery journal to dump.
 */
void vdo_dump_recovery_journal_statistics(const struct recovery_journal *journal)
{
	const struct recovery_journal_block *block;
	struct recovery_journal_statistics stats = vdo_get_recovery_journal_statistics(journal);

	vdo_log_info("Recovery Journal");
	vdo_log_info("	block_map_head=%llu slab_journal_head=%llu last_write_acknowledged=%llu tail=%llu block_map_reap_head=%llu slab_journal_reap_head=%llu disk_full=%llu slab_journal_commits_requested=%llu entry_waiters=%zu",
		     (unsigned long long) journal->block_map_head,
		     (unsigned long long) journal->slab_journal_head,
		     (unsigned long long) journal->last_write_acknowledged,
		     (unsigned long long) journal->tail,
		     (unsigned long long) journal->block_map_reap_head,
		     (unsigned long long) journal->slab_journal_reap_head,
		     (unsigned long long) stats.disk_full,
		     (unsigned long long) stats.slab_journal_commits_requested,
		     vdo_waitq_num_waiters(&journal->entry_waiters));
	vdo_log_info("	entries: started=%llu written=%llu committed=%llu",
		     (unsigned long long) stats.entries.started,
		     (unsigned long long) stats.entries.written,
		     (unsigned long long) stats.entries.committed);
	vdo_log_info("	blocks: started=%llu written=%llu committed=%llu",
		     (unsigned long long) stats.blocks.started,
		     (unsigned long long) stats.blocks.written,
		     (unsigned long long) stats.blocks.committed);

	vdo_log_info("	active blocks:");
	list_for_each_entry(block, &journal->active_tail_blocks, list_node)
		dump_recovery_block(block);
}
