1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5
6#include "volume.h"
7
8#include <linux/atomic.h>
9#include <linux/dm-bufio.h>
10#include <linux/err.h>
11
12#include "errors.h"
13#include "logger.h"
14#include "memory-alloc.h"
15#include "permassert.h"
16#include "string-utils.h"
17#include "thread-utils.h"
18
19#include "chapter-index.h"
20#include "config.h"
21#include "geometry.h"
22#include "hash-utils.h"
23#include "index.h"
24#include "sparse-cache.h"
25
26/*
27 * The first block of the volume layout is reserved for the volume header, which is no longer used.
28 * The remainder of the volume is divided into chapters consisting of several pages of records, and
29 * several pages of static index to use to find those records. The index pages are recorded first,
30 * followed by the record pages. The chapters are written in order as they are filled, so the
31 * volume storage acts as a circular log of the most recent chapters, with each new chapter
32 * overwriting the oldest saved one.
33 *
34 * When a new chapter is filled and closed, the records from that chapter are sorted and
35 * interleaved in approximate temporal order, and assigned to record pages. Then a static delta
36 * index is generated to store which record page contains each record. The in-memory index page map
37 * is also updated to indicate which delta lists fall on each chapter index page. This means that
38 * when a record is read, the volume only has to load a single index page and a single record page,
39 * rather than search the entire chapter. These index and record pages are written to storage, and
40 * the index pages are transferred to the page cache under the theory that the most recently
41 * written chapter is likely to be accessed again soon.
42 *
43 * When reading a record, the volume index will indicate which chapter should contain it. The
44 * volume uses the index page map to determine which chapter index page needs to be loaded, and
45 * then reads the relevant record page number from the chapter index. Both index and record pages
46 * are stored in a page cache when read for the common case that subsequent records need the same
47 * pages. The page cache evicts the least recently accessed entries when caching new pages. In
48 * addition, the volume uses dm-bufio to manage access to the storage, which may allow for
49 * additional caching depending on available system resources.
50 *
51 * Record requests are handled from cached pages when possible. If a page needs to be read, it is
52 * placed on a queue along with the request that wants to read it. Any requests for the same page
53 * that arrive while the read is pending are added to the queue entry. A separate reader thread
54 * handles the queued reads, adding the page to the cache and updating any requests queued with it
55 * so they can continue processing. This allows the index zone threads to continue processing new
56 * requests rather than wait for the storage reads.
57 *
58 * When an index rebuild is necessary, the volume reads each stored chapter to determine which
59 * range of chapters contain valid records, so that those records can be used to reconstruct the
60 * in-memory volume index.
61 */
62
/* The maximum allowable number of contiguous bad chapters */
#define MAX_BAD_CHAPTERS 100
/* The cache index must fit in 15 bits; the top bit of an index entry is the queued flag. */
#define VOLUME_CACHE_MAX_ENTRIES (U16_MAX >> 1)
/* Set in a cache index entry to mean the page is queued for read, not yet cached. */
#define VOLUME_CACHE_QUEUED_FLAG (1 << 15)
/* The number of slots in the circular read queue. */
#define VOLUME_CACHE_MAX_QUEUED_READS 4096

/* Sentinel chapter number used to mark a chapter that could not be read. */
static const u64 BAD_CHAPTER = U64_MAX;
70
/*
 * The invalidate counter is two 32-bit fields stored together atomically. The low order 32 bits
 * are the physical page number of the cached page being read. The high order 32 bits are a
 * sequence number. This value is written when the zone that owns it begins or completes a cache
 * search. Any other thread will only read the counter in wait_for_pending_searches() while waiting
 * to update the cache contents.
 */
union invalidate_counter {
	/* The whole counter, for atomic reads and writes of both fields together. */
	u64 value;
	struct {
		/* The physical page number the owning zone is searching for. */
		u32 page;
		/* The sequence number; odd while a search is in progress. */
		u32 counter;
	};
};
85
86static inline u32 map_to_page_number(struct index_geometry *geometry, u32 physical_page)
87{
88	return (physical_page - HEADER_PAGES_PER_VOLUME) % geometry->pages_per_chapter;
89}
90
91static inline u32 map_to_chapter_number(struct index_geometry *geometry, u32 physical_page)
92{
93	return (physical_page - HEADER_PAGES_PER_VOLUME) / geometry->pages_per_chapter;
94}
95
96static inline bool is_record_page(struct index_geometry *geometry, u32 physical_page)
97{
98	return map_to_page_number(geometry, physical_page) >= geometry->index_pages_per_chapter;
99}
100
101static u32 map_to_physical_page(const struct index_geometry *geometry, u32 chapter, u32 page)
102{
103	/* Page zero is the header page, so the first chapter index page is page one. */
104	return HEADER_PAGES_PER_VOLUME + (geometry->pages_per_chapter * chapter) + page;
105}
106
/* Atomically read the invalidate counter owned by one zone. */
static inline union invalidate_counter get_invalidate_counter(struct page_cache *cache,
							      unsigned int zone_number)
{
	return (union invalidate_counter) {
		.value = READ_ONCE(cache->search_pending_counters[zone_number].atomic_value),
	};
}
114
/* Atomically write the invalidate counter owned by one zone. */
static inline void set_invalidate_counter(struct page_cache *cache,
					  unsigned int zone_number,
					  union invalidate_counter invalidate_counter)
{
	WRITE_ONCE(cache->search_pending_counters[zone_number].atomic_value,
		   invalidate_counter.value);
}
122
123static inline bool search_pending(union invalidate_counter invalidate_counter)
124{
125	return (invalidate_counter.counter & 1) != 0;
126}
127
/*
 * Lock the cache for a zone in order to search for a page. Records the target page and makes the
 * zone's sequence number odd so that wait_for_pending_searches() will wait for this search.
 */
static void begin_pending_search(struct page_cache *cache, u32 physical_page,
				 unsigned int zone_number)
{
	union invalidate_counter invalidate_counter =
		get_invalidate_counter(cache, zone_number);

	invalidate_counter.page = physical_page;
	invalidate_counter.counter++;
	set_invalidate_counter(cache, zone_number, invalidate_counter);
	VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
			    "Search is pending for zone %u", zone_number);
	/*
	 * This memory barrier ensures that the write to the invalidate counter is seen by other
	 * threads before this thread accesses the cached page. The corresponding read memory
	 * barrier is in wait_for_pending_searches().
	 */
	smp_mb();
}
147
/*
 * Unlock the cache for a zone by clearing its invalidate counter, making the sequence number even
 * again so that other threads know no search is in progress.
 */
static void end_pending_search(struct page_cache *cache, unsigned int zone_number)
{
	union invalidate_counter invalidate_counter;

	/*
	 * This memory barrier ensures that this thread completes reads of the
	 * cached page before other threads see the write to the invalidate
	 * counter.
	 */
	smp_mb();

	invalidate_counter = get_invalidate_counter(cache, zone_number);
	VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
			    "Search is pending for zone %u", zone_number);
	invalidate_counter.counter++;
	set_invalidate_counter(cache, zone_number, invalidate_counter);
}
166
/*
 * Wait until no zone has a search pending on the given physical page. Called with the
 * read_threads_mutex held, before the page's cache slot may be reused.
 */
static void wait_for_pending_searches(struct page_cache *cache, u32 physical_page)
{
	union invalidate_counter initial_counters[MAX_ZONES];
	unsigned int i;

	/*
	 * We hold the read_threads_mutex. We are waiting for threads that do not hold the
	 * read_threads_mutex. Those threads have "locked" their targeted page by setting the
	 * search_pending_counter. The corresponding write memory barrier is in
	 * begin_pending_search().
	 */
	smp_mb();

	for (i = 0; i < cache->zone_count; i++)
		initial_counters[i] = get_invalidate_counter(cache, i);
	for (i = 0; i < cache->zone_count; i++) {
		if (search_pending(initial_counters[i]) &&
		    (initial_counters[i].page == physical_page)) {
			/*
			 * There is an active search using the physical page. We need to wait for
			 * the search to finish.
			 *
			 * FIXME: Investigate using wait_event() to wait for the search to finish.
			 */
			while (initial_counters[i].value ==
			       get_invalidate_counter(cache, i).value)
				cond_resched();
		}
	}
}
197
198static void release_page_buffer(struct cached_page *page)
199{
200	if (page->buffer != NULL)
201		dm_bufio_release(vdo_forget(page->buffer));
202}
203
/* Reset a cache slot to the empty state, releasing any buffer it holds. */
static void clear_cache_page(struct page_cache *cache, struct cached_page *page)
{
	/* Do not clear read_pending because the read queue relies on it. */
	release_page_buffer(page);
	/* cache->indexable_pages is out of range, so it marks the slot as holding no page. */
	page->physical_page = cache->indexable_pages;
	WRITE_ONCE(page->last_used, 0);
}
211
/* Mark a page most recently used, advancing the cache clock only when necessary. */
static void make_page_most_recent(struct page_cache *cache, struct cached_page *page)
{
	/*
	 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
	 * thread holding the read_threads_mutex.
	 */
	if (atomic64_read(&cache->clock) != READ_ONCE(page->last_used))
		WRITE_ONCE(page->last_used, atomic64_inc_return(&cache->clock));
}
221
/* Select a page to remove from the cache to make space for a new entry. */
static struct cached_page *select_victim_in_cache(struct page_cache *cache)
{
	struct cached_page *page;
	int oldest_index = 0;
	s64 oldest_time = S64_MAX;
	s64 last_used;
	u16 i;

	/* Find the oldest unclaimed page. We hold the read_threads_mutex. */
	for (i = 0; i < cache->cache_slots; i++) {
		/* A page with a pending read must not be replaced. */
		if (cache->cache[i].read_pending)
			continue;

		last_used = READ_ONCE(cache->cache[i].last_used);
		if (last_used <= oldest_time) {
			oldest_time = last_used;
			oldest_index = i;
		}
	}

	page = &cache->cache[oldest_index];
	if (page->physical_page != cache->indexable_pages) {
		/*
		 * The slot holds a valid page: unmap it and wait for any zone still searching it
		 * before the slot's contents are replaced.
		 */
		WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
		wait_for_pending_searches(cache, page->physical_page);
	}

	/* Claim the slot so it cannot be selected again while its new page is being read. */
	page->read_pending = true;
	clear_cache_page(cache, page);
	return page;
}
254
/* Make a newly filled cache entry available to other threads. */
static int put_page_in_cache(struct page_cache *cache, u32 physical_page,
			     struct cached_page *page)
{
	int result;

	/* We hold the read_threads_mutex. */
	result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
	if (result != VDO_SUCCESS)
		return result;

	/* Fill in the page's mapping before publishing it in the cache index. */
	page->physical_page = physical_page;
	make_page_most_recent(cache, page);
	page->read_pending = false;

	/*
	 * We hold the read_threads_mutex, but we must have a write memory barrier before making
	 * the cached_page available to the readers that do not hold the mutex. The corresponding
	 * read memory barrier is in get_page_and_index().
	 */
	smp_wmb();

	/* This assignment also clears the queued flag. */
	WRITE_ONCE(cache->index[physical_page], page - cache->cache);
	return UDS_SUCCESS;
}
281
/* Abandon a cache entry whose read failed or was invalidated, freeing its slot. */
static void cancel_page_in_cache(struct page_cache *cache, u32 physical_page,
				 struct cached_page *page)
{
	int result;

	/* We hold the read_threads_mutex. */
	result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
	if (result != VDO_SUCCESS)
		return;

	clear_cache_page(cache, page);
	page->read_pending = false;

	/* Clear the mapping and the queued flag for the new page. */
	WRITE_ONCE(cache->index[physical_page], cache->cache_slots);
}
298
299static inline u16 next_queue_position(u16 position)
300{
301	return (position + 1) % VOLUME_CACHE_MAX_QUEUED_READS;
302}
303
304static inline void advance_queue_position(u16 *position)
305{
306	*position = next_queue_position(*position);
307}
308
309static inline bool read_queue_is_full(struct page_cache *cache)
310{
311	return cache->read_queue_first == next_queue_position(cache->read_queue_last);
312}
313
/*
 * Add a request to the read queue entry for a physical page, creating a new entry if the page is
 * not already queued. Returns false if a new entry is needed but the queue is full.
 */
static bool enqueue_read(struct page_cache *cache, struct uds_request *request,
			 u32 physical_page)
{
	struct queued_read *queue_entry;
	u16 last = cache->read_queue_last;
	u16 read_queue_index;

	/* We hold the read_threads_mutex. */
	if ((cache->index[physical_page] & VOLUME_CACHE_QUEUED_FLAG) == 0) {
		/* This page has no existing entry in the queue. */
		if (read_queue_is_full(cache))
			return false;

		/* Fill in the read queue entry. */
		cache->read_queue[last].physical_page = physical_page;
		cache->read_queue[last].invalid = false;
		cache->read_queue[last].first_request = NULL;
		cache->read_queue[last].last_request = NULL;

		/* Point the cache index to the read queue entry. */
		read_queue_index = last;
		WRITE_ONCE(cache->index[physical_page],
			   read_queue_index | VOLUME_CACHE_QUEUED_FLAG);

		advance_queue_position(&cache->read_queue_last);
	} else {
		/* It's already queued, so add this request to the existing entry. */
		read_queue_index = cache->index[physical_page] & ~VOLUME_CACHE_QUEUED_FLAG;
	}

	/* Append the request to the entry's singly-linked list of waiting requests. */
	request->next_request = NULL;
	queue_entry = &cache->read_queue[read_queue_index];
	if (queue_entry->first_request == NULL)
		queue_entry->first_request = request;
	else
		queue_entry->last_request->next_request = request;
	queue_entry->last_request = request;

	return true;
}
354
/*
 * Queue a page read on behalf of a request, waiting for queue space if necessary, and then wake a
 * reader thread. We hold the read_threads_mutex.
 */
static void enqueue_page_read(struct volume *volume, struct uds_request *request,
			      u32 physical_page)
{
	/* Mark the page as queued, so that chapter invalidation knows to cancel a read. */
	while (!enqueue_read(&volume->page_cache, request, physical_page)) {
		vdo_log_debug("Read queue full, waiting for reads to finish");
		uds_wait_cond(&volume->read_threads_read_done_cond,
			      &volume->read_threads_mutex);
	}

	uds_signal_cond(&volume->read_threads_cond);
}
367
/*
 * Reserve the next read queue entry for processing, but do not actually remove it from the queue.
 * Must be followed by release_queued_requests(). Returns NULL if there is nothing to dequeue.
 */
static struct queued_read *reserve_read_queue_entry(struct page_cache *cache)
{
	/* We hold the read_threads_mutex. */
	struct queued_read *entry;
	u16 index_value;
	bool queued;

	/* No items to dequeue */
	if (cache->read_queue_next_read == cache->read_queue_last)
		return NULL;

	entry = &cache->read_queue[cache->read_queue_next_read];
	index_value = cache->index[entry->physical_page];
	queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
	/* Check to see if it's still queued before resetting. */
	if (entry->invalid && queued)
		WRITE_ONCE(cache->index[entry->physical_page], cache->cache_slots);

	/*
	 * If a synchronous read has taken this page, set invalid to true so it doesn't get
	 * overwritten. Requests will just be requeued.
	 */
	if (!queued)
		entry->invalid = true;

	/* The entry stays reserved until release_queued_requests() clears this flag. */
	entry->reserved = true;
	advance_queue_position(&cache->read_queue_next_read);
	return entry;
}
401
402static inline struct queued_read *wait_to_reserve_read_queue_entry(struct volume *volume)
403{
404	struct queued_read *queue_entry = NULL;
405
406	while (!volume->read_threads_exiting) {
407		queue_entry = reserve_read_queue_entry(&volume->page_cache);
408		if (queue_entry != NULL)
409			break;
410
411		uds_wait_cond(&volume->read_threads_cond, &volume->read_threads_mutex);
412	}
413
414	return queue_entry;
415}
416
/*
 * Decode a chapter index page and, unless the index is being rebuilt, verify that the page's
 * contents agree with the in-memory index page map for the given chapter and page number.
 */
static int init_chapter_index_page(const struct volume *volume, u8 *index_page,
				   u32 chapter, u32 index_page_number,
				   struct delta_index_page *chapter_index_page)
{
	u64 ci_virtual;
	u32 ci_chapter;
	u32 lowest_list;
	u32 highest_list;
	struct index_geometry *geometry = volume->geometry;
	int result;

	result = uds_initialize_chapter_index_page(chapter_index_page, geometry,
						   index_page, volume->nonce);
	/* During rebuild, errors are returned without logging and the map checks are skipped. */
	if (volume->lookup_mode == LOOKUP_FOR_REBUILD)
		return result;

	if (result != UDS_SUCCESS) {
		return vdo_log_error_strerror(result,
					      "Reading chapter index page for chapter %u page %u",
					      chapter, index_page_number);
	}

	/* Verify the page's chapter and delta list range against the index page map. */
	uds_get_list_number_bounds(volume->index_page_map, chapter, index_page_number,
				   &lowest_list, &highest_list);
	ci_virtual = chapter_index_page->virtual_chapter_number;
	ci_chapter = uds_map_to_physical_chapter(geometry, ci_virtual);
	if ((chapter == ci_chapter) &&
	    (lowest_list == chapter_index_page->lowest_list_number) &&
	    (highest_list == chapter_index_page->highest_list_number))
		return UDS_SUCCESS;

	vdo_log_warning("Index page map updated to %llu",
			(unsigned long long) volume->index_page_map->last_update);
	vdo_log_warning("Page map expects that chapter %u page %u has range %u to %u, but chapter index page has chapter %llu with range %u to %u",
			chapter, index_page_number, lowest_list, highest_list,
			(unsigned long long) ci_virtual,
			chapter_index_page->lowest_list_number,
			chapter_index_page->highest_list_number);
	return vdo_log_error_strerror(UDS_CORRUPT_DATA,
				      "index page map mismatch with chapter index");
}
458
459static int initialize_index_page(const struct volume *volume, u32 physical_page,
460				 struct cached_page *page)
461{
462	u32 chapter = map_to_chapter_number(volume->geometry, physical_page);
463	u32 index_page_number = map_to_page_number(volume->geometry, physical_page);
464
465	return init_chapter_index_page(volume, dm_bufio_get_block_data(page->buffer),
466				       chapter, index_page_number, &page->index_page);
467}
468
469static bool search_record_page(const u8 record_page[],
470			       const struct uds_record_name *name,
471			       const struct index_geometry *geometry,
472			       struct uds_record_data *metadata)
473{
474	/*
475	 * The array of records is sorted by name and stored as a binary tree in heap order, so the
476	 * root of the tree is the first array element.
477	 */
478	u32 node = 0;
479	const struct uds_volume_record *records = (const struct uds_volume_record *) record_page;
480
481	while (node < geometry->records_per_page) {
482		int result;
483		const struct uds_volume_record *record = &records[node];
484
485		result = memcmp(name, &record->name, UDS_RECORD_NAME_SIZE);
486		if (result == 0) {
487			if (metadata != NULL)
488				*metadata = record->data;
489			return true;
490		}
491
492		/* The children of node N are at indexes 2N+1 and 2N+2. */
493		node = ((2 * node) + ((result < 0) ? 1 : 2));
494	}
495
496	return false;
497}
498
/*
 * If we've read in a record page, we're going to do an immediate search, to speed up processing by
 * avoiding get_record_from_zone(), and to ensure that requests make progress even when queued. If
 * we've read in an index page, we save the record page number so we don't have to resolve the
 * index page again. We use the location, virtual_chapter, and old_metadata fields in the request
 * to allow the index code to know where to begin processing the request again.
 */
static int search_page(struct cached_page *page, const struct volume *volume,
		       struct uds_request *request, u32 physical_page)
{
	int result;
	enum uds_index_region location;
	u16 record_page_number;

	if (is_record_page(volume->geometry, physical_page)) {
		/* A record page: search it directly, storing any metadata found. */
		if (search_record_page(dm_bufio_get_block_data(page->buffer),
				       &request->record_name, volume->geometry,
				       &request->old_metadata))
			location = UDS_LOCATION_RECORD_PAGE_LOOKUP;
		else
			location = UDS_LOCATION_UNAVAILABLE;
	} else {
		/* An index page: resolve the record page number for the request. */
		result = uds_search_chapter_index_page(&page->index_page,
						       volume->geometry,
						       &request->record_name,
						       &record_page_number);
		if (result != UDS_SUCCESS)
			return result;

		if (record_page_number == NO_CHAPTER_INDEX_ENTRY) {
			location = UDS_LOCATION_UNAVAILABLE;
		} else {
			location = UDS_LOCATION_INDEX_PAGE_LOOKUP;
			/* Stash the record page number so the index page isn't searched again. */
			*((u16 *) &request->old_metadata) = record_page_number;
		}
	}

	request->location = location;
	request->found = false;
	return UDS_SUCCESS;
}
540
/*
 * Read the page for a reserved queue entry into the cache, then search it on behalf of every
 * request attached to the entry. We hold the read_threads_mutex, dropping it only for the
 * duration of the storage read.
 */
static int process_entry(struct volume *volume, struct queued_read *entry)
{
	u32 page_number = entry->physical_page;
	struct uds_request *request;
	struct cached_page *page = NULL;
	u8 *page_data;
	int result;

	if (entry->invalid) {
		vdo_log_debug("Requeuing requests for invalid page");
		return UDS_SUCCESS;
	}

	page = select_victim_in_cache(&volume->page_cache);

	/* Drop the mutex during the storage read so other threads can make progress. */
	mutex_unlock(&volume->read_threads_mutex);
	page_data = dm_bufio_read(volume->client, page_number, &page->buffer);
	mutex_lock(&volume->read_threads_mutex);
	if (IS_ERR(page_data)) {
		result = -PTR_ERR(page_data);
		vdo_log_warning_strerror(result,
					 "error reading physical page %u from volume",
					 page_number);
		cancel_page_in_cache(&volume->page_cache, page_number, page);
		return result;
	}

	/* The entry may have been invalidated while the mutex was dropped. */
	if (entry->invalid) {
		vdo_log_warning("Page %u invalidated after read", page_number);
		cancel_page_in_cache(&volume->page_cache, page_number, page);
		return UDS_SUCCESS;
	}

	if (!is_record_page(volume->geometry, page_number)) {
		result = initialize_index_page(volume, page_number, page);
		if (result != UDS_SUCCESS) {
			vdo_log_warning("Error initializing chapter index page");
			cancel_page_in_cache(&volume->page_cache, page_number, page);
			return result;
		}
	}

	result = put_page_in_cache(&volume->page_cache, page_number, page);
	if (result != UDS_SUCCESS) {
		vdo_log_warning("Error putting page %u in cache", page_number);
		cancel_page_in_cache(&volume->page_cache, page_number, page);
		return result;
	}

	/* Search the newly cached page for each queued request. */
	request = entry->first_request;
	while ((request != NULL) && (result == UDS_SUCCESS)) {
		result = search_page(page, volume, request, page_number);
		request = request->next_request;
	}

	return result;
}
598
/*
 * Requeue every request attached to a processed queue entry, release the entry, and wake any
 * threads waiting for read queue space. We hold the read_threads_mutex.
 */
static void release_queued_requests(struct volume *volume, struct queued_read *entry,
				    int result)
{
	struct page_cache *cache = &volume->page_cache;
	u16 next_read = cache->read_queue_next_read;
	struct uds_request *request;
	struct uds_request *next;

	for (request = entry->first_request; request != NULL; request = next) {
		next = request->next_request;
		request->status = result;
		request->requeued = true;
		uds_enqueue_request(request, STAGE_INDEX);
	}

	entry->reserved = false;

	/* Move the read_queue_first pointer as far as we can. */
	while ((cache->read_queue_first != next_read) &&
	       (!cache->read_queue[cache->read_queue_first].reserved))
		advance_queue_position(&cache->read_queue_first);
	uds_broadcast_cond(&volume->read_threads_read_done_cond);
}
622
623static void read_thread_function(void *arg)
624{
625	struct volume *volume = arg;
626
627	vdo_log_debug("reader starting");
628	mutex_lock(&volume->read_threads_mutex);
629	while (true) {
630		struct queued_read *queue_entry;
631		int result;
632
633		queue_entry = wait_to_reserve_read_queue_entry(volume);
634		if (volume->read_threads_exiting)
635			break;
636
637		result = process_entry(volume, queue_entry);
638		release_queued_requests(volume, queue_entry, result);
639	}
640	mutex_unlock(&volume->read_threads_mutex);
641	vdo_log_debug("reader done");
642}
643
/*
 * Look up a physical page in the cache index. On a hit, *page_ptr is set to the cached page;
 * otherwise it is set to NULL. *queue_index is set to the page's read queue index if the page is
 * queued for reading, or -1 if it is not.
 */
static void get_page_and_index(struct page_cache *cache, u32 physical_page,
			       int *queue_index, struct cached_page **page_ptr)
{
	u16 index_value;
	u16 index;
	bool queued;

	/*
	 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
	 * thread holding the read_threads_mutex.
	 *
	 * Holding only a search_pending_counter is the most frequent case.
	 */
	/*
	 * It would be unlikely for the compiler to turn the usage of index_value into two reads of
	 * cache->index, but it would be possible and very bad if those reads did not return the
	 * same bits.
	 */
	index_value = READ_ONCE(cache->index[physical_page]);
	queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
	index = index_value & ~VOLUME_CACHE_QUEUED_FLAG;

	if (!queued && (index < cache->cache_slots)) {
		*page_ptr = &cache->cache[index];
		/*
		 * We have acquired access to the cached page, but unless we hold the
		 * read_threads_mutex, we need a read memory barrier now. The corresponding write
		 * memory barrier is in put_page_in_cache().
		 */
		smp_rmb();
	} else {
		*page_ptr = NULL;
	}

	*queue_index = queued ? index : -1;
}
680
681static void get_page_from_cache(struct page_cache *cache, u32 physical_page,
682				struct cached_page **page)
683{
684	/*
685	 * ASSERTION: We are in a zone thread.
686	 * ASSERTION: We holding a search_pending_counter or the read_threads_mutex.
687	 */
688	int queue_index = -1;
689
690	get_page_and_index(cache, physical_page, &queue_index, page);
691}
692
/*
 * Read a page from storage directly into a cache slot, bypassing the read queue. We hold the
 * read_threads_mutex for the duration of the read.
 */
static int read_page_locked(struct volume *volume, u32 physical_page,
			    struct cached_page **page_ptr)
{
	int result = UDS_SUCCESS;
	struct cached_page *page = NULL;
	u8 *page_data;

	page = select_victim_in_cache(&volume->page_cache);
	page_data = dm_bufio_read(volume->client, physical_page, &page->buffer);
	if (IS_ERR(page_data)) {
		result = -PTR_ERR(page_data);
		vdo_log_warning_strerror(result,
					 "error reading physical page %u from volume",
					 physical_page);
		cancel_page_in_cache(&volume->page_cache, physical_page, page);
		return result;
	}

	if (!is_record_page(volume->geometry, physical_page)) {
		result = initialize_index_page(volume, physical_page, page);
		if (result != UDS_SUCCESS) {
			/* Corrupt pages are expected while probing chapters during rebuild. */
			if (volume->lookup_mode != LOOKUP_FOR_REBUILD)
				vdo_log_warning("Corrupt index page %u", physical_page);
			cancel_page_in_cache(&volume->page_cache, physical_page, page);
			return result;
		}
	}

	result = put_page_in_cache(&volume->page_cache, physical_page, page);
	if (result != UDS_SUCCESS) {
		vdo_log_warning("Error putting page %u in cache", physical_page);
		cancel_page_in_cache(&volume->page_cache, physical_page, page);
		return result;
	}

	*page_ptr = page;
	return UDS_SUCCESS;
}
731
732/* Retrieve a page from the cache while holding the read threads mutex. */
733static int get_volume_page_locked(struct volume *volume, u32 physical_page,
734				  struct cached_page **page_ptr)
735{
736	int result;
737	struct cached_page *page = NULL;
738
739	get_page_from_cache(&volume->page_cache, physical_page, &page);
740	if (page == NULL) {
741		result = read_page_locked(volume, physical_page, &page);
742		if (result != UDS_SUCCESS)
743			return result;
744	} else {
745		make_page_most_recent(&volume->page_cache, page);
746	}
747
748	*page_ptr = page;
749	return UDS_SUCCESS;
750}
751
/*
 * Retrieve a page from the cache while holding a search_pending lock. If the page is not cached,
 * queue an asynchronous read and return UDS_QUEUED; the request will be retried later.
 */
static int get_volume_page_protected(struct volume *volume, struct uds_request *request,
				     u32 physical_page, struct cached_page **page_ptr)
{
	struct cached_page *page;

	get_page_from_cache(&volume->page_cache, physical_page, &page);
	if (page != NULL) {
		if (request->zone_number == 0) {
			/* Only one zone is allowed to update the LRU. */
			make_page_most_recent(&volume->page_cache, page);
		}

		*page_ptr = page;
		return UDS_SUCCESS;
	}

	/*
	 * Prepare to enqueue a read for the page. End the pending search before taking the mutex:
	 * a reader thread holding the mutex may be waiting for this zone's search to end.
	 */
	end_pending_search(&volume->page_cache, request->zone_number);
	mutex_lock(&volume->read_threads_mutex);

	/*
	 * Do the lookup again while holding the read mutex (no longer the fast case so this should
	 * be fine to repeat). We need to do this because a page may have been added to the cache
	 * by a reader thread between the time we searched above and the time we went to actually
	 * try to enqueue it below. This could result in us enqueuing another read for a page which
	 * is already in the cache, which would mean we end up with two entries in the cache for
	 * the same page.
	 */
	get_page_from_cache(&volume->page_cache, physical_page, &page);
	if (page == NULL) {
		enqueue_page_read(volume, request, physical_page);
		/*
		 * The performance gain from unlocking first, while "search pending" mode is off,
		 * turns out to be significant in some cases. The page is not available yet so
		 * the order does not matter for correctness as it does below.
		 */
		mutex_unlock(&volume->read_threads_mutex);
		begin_pending_search(&volume->page_cache, physical_page,
				     request->zone_number);
		return UDS_QUEUED;
	}

	/*
	 * Now that the page is loaded, the volume needs to switch to "reader thread unlocked" and
	 * "search pending" state in careful order so no other thread can mess with the data before
	 * the caller gets to look at it.
	 */
	begin_pending_search(&volume->page_cache, physical_page, request->zone_number);
	mutex_unlock(&volume->read_threads_mutex);
	*page_ptr = page;
	return UDS_SUCCESS;
}
805
806static int get_volume_page(struct volume *volume, u32 chapter, u32 page_number,
807			   struct cached_page **page_ptr)
808{
809	int result;
810	u32 physical_page = map_to_physical_page(volume->geometry, chapter, page_number);
811
812	mutex_lock(&volume->read_threads_mutex);
813	result = get_volume_page_locked(volume, physical_page, page_ptr);
814	mutex_unlock(&volume->read_threads_mutex);
815	return result;
816}
817
818int uds_get_volume_record_page(struct volume *volume, u32 chapter, u32 page_number,
819			       u8 **data_ptr)
820{
821	int result;
822	struct cached_page *page = NULL;
823
824	result = get_volume_page(volume, chapter, page_number, &page);
825	if (result == UDS_SUCCESS)
826		*data_ptr = dm_bufio_get_block_data(page->buffer);
827	return result;
828}
829
830int uds_get_volume_index_page(struct volume *volume, u32 chapter, u32 page_number,
831			      struct delta_index_page **index_page_ptr)
832{
833	int result;
834	struct cached_page *page = NULL;
835
836	result = get_volume_page(volume, chapter, page_number, &page);
837	if (result == UDS_SUCCESS)
838		*index_page_ptr = &page->index_page;
839	return result;
840}
841
/*
 * Find the record page associated with a name in a given index page. This will return UDS_QUEUED
 * if the page in question must be read from storage.
 */
static int search_cached_index_page(struct volume *volume, struct uds_request *request,
				    u32 chapter, u32 index_page_number,
				    u16 *record_page_number)
{
	int result;
	struct cached_page *page = NULL;
	u32 physical_page = map_to_physical_page(volume->geometry, chapter,
						 index_page_number);

	/*
	 * Make sure the invalidate counter is updated before we try and read the mapping. This
	 * prevents this thread from reading a page in the cache which has already been marked for
	 * invalidation by the reader thread, before the reader thread has noticed that the
	 * invalidate_counter has been incremented.
	 */
	begin_pending_search(&volume->page_cache, physical_page, request->zone_number);

	result = get_volume_page_protected(volume, request, physical_page, &page);
	if (result != UDS_SUCCESS) {
		/* The search lock must be released on every exit path. */
		end_pending_search(&volume->page_cache, request->zone_number);
		return result;
	}

	result = uds_search_chapter_index_page(&page->index_page, volume->geometry,
					       &request->record_name,
					       record_page_number);
	end_pending_search(&volume->page_cache, request->zone_number);
	return result;
}
875
876/*
877 * Find the metadata associated with a name in a given record page. This will return UDS_QUEUED if
878 * the page in question must be read from storage.
879 */
int uds_search_cached_record_page(struct volume *volume, struct uds_request *request,
				  u32 chapter, u16 record_page_number, bool *found)
{
	struct cached_page *record_page;
	struct index_geometry *geometry = volume->geometry;
	int result;
	u32 physical_page, page_number;

	*found = false;
	/* The chapter index had no entry for this name, so there is nothing to search. */
	if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
		return UDS_SUCCESS;

	result = VDO_ASSERT(record_page_number < geometry->record_pages_per_chapter,
			    "0 <= %d < %u", record_page_number,
			    geometry->record_pages_per_chapter);
	if (result != VDO_SUCCESS)
		return result;

	/* Record pages are laid out after the chapter's index pages. */
	page_number = geometry->index_pages_per_chapter + record_page_number;

	physical_page = map_to_physical_page(volume->geometry, chapter, page_number);

	/*
	 * Make sure the invalidate counter is updated before we try and read the mapping. This
	 * prevents this thread from reading a page in the cache which has already been marked for
	 * invalidation by the reader thread, before the reader thread has noticed that the
	 * invalidate_counter has been incremented.
	 */
	begin_pending_search(&volume->page_cache, physical_page, request->zone_number);

	result = get_volume_page_protected(volume, request, physical_page, &record_page);
	if (result != UDS_SUCCESS) {
		/* Every exit path must balance the begin_pending_search() above. */
		end_pending_search(&volume->page_cache, request->zone_number);
		return result;
	}

	/* On a hit, the record's metadata is copied into the request. */
	if (search_record_page(dm_bufio_get_block_data(record_page->buffer),
			       &request->record_name, geometry, &request->old_metadata))
		*found = true;

	end_pending_search(&volume->page_cache, request->zone_number);
	return UDS_SUCCESS;
}
923
924void uds_prefetch_volume_chapter(const struct volume *volume, u32 chapter)
925{
926	const struct index_geometry *geometry = volume->geometry;
927	u32 physical_page = map_to_physical_page(geometry, chapter, 0);
928
929	dm_bufio_prefetch(volume->client, physical_page, geometry->pages_per_chapter);
930}
931
932int uds_read_chapter_index_from_volume(const struct volume *volume, u64 virtual_chapter,
933				       struct dm_buffer *volume_buffers[],
934				       struct delta_index_page index_pages[])
935{
936	int result;
937	u32 i;
938	const struct index_geometry *geometry = volume->geometry;
939	u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
940	u32 physical_page = map_to_physical_page(geometry, physical_chapter, 0);
941
942	dm_bufio_prefetch(volume->client, physical_page, geometry->index_pages_per_chapter);
943	for (i = 0; i < geometry->index_pages_per_chapter; i++) {
944		u8 *index_page;
945
946		index_page = dm_bufio_read(volume->client, physical_page + i,
947					   &volume_buffers[i]);
948		if (IS_ERR(index_page)) {
949			result = -PTR_ERR(index_page);
950			vdo_log_warning_strerror(result,
951						 "error reading physical page %u",
952						 physical_page);
953			return result;
954		}
955
956		result = init_chapter_index_page(volume, index_page, physical_chapter, i,
957						 &index_pages[i]);
958		if (result != UDS_SUCCESS)
959			return result;
960	}
961
962	return UDS_SUCCESS;
963}
964
/*
 * Search the page cache for a record name. This will return UDS_QUEUED if an index or record
 * page must first be read from storage by a reader thread.
 */
int uds_search_volume_page_cache(struct volume *volume, struct uds_request *request,
				 bool *found)
{
	int result;
	u32 physical_chapter =
		uds_map_to_physical_chapter(volume->geometry, request->virtual_chapter);
	u32 index_page_number;
	u16 record_page_number;

	index_page_number = uds_find_index_page_number(volume->index_page_map,
						       &request->record_name,
						       physical_chapter);

	if (request->location == UDS_LOCATION_INDEX_PAGE_LOOKUP) {
		/*
		 * A prior pass already determined the record page number and stashed it
		 * in the request's old_metadata field, so skip the index page search.
		 */
		record_page_number = *((u16 *) &request->old_metadata);
	} else {
		result = search_cached_index_page(volume, request, physical_chapter,
						  index_page_number,
						  &record_page_number);
		if (result != UDS_SUCCESS)
			return result;
	}

	return uds_search_cached_record_page(volume, request, physical_chapter,
					     record_page_number, found);
}
991
/*
 * Search a chapter for a record name during rebuild. Unlike the regular lookup path, this
 * reads pages synchronously (via get_volume_page) rather than queueing reads, and does not
 * copy any metadata out of a matching record.
 */
int uds_search_volume_page_cache_for_rebuild(struct volume *volume,
					     const struct uds_record_name *name,
					     u64 virtual_chapter, bool *found)
{
	int result;
	struct index_geometry *geometry = volume->geometry;
	struct cached_page *page;
	u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
	u32 index_page_number;
	u16 record_page_number;
	u32 page_number;

	*found = false;
	index_page_number =
		uds_find_index_page_number(volume->index_page_map, name,
					   physical_chapter);
	result = get_volume_page(volume, physical_chapter, index_page_number, &page);
	if (result != UDS_SUCCESS)
		return result;

	result = uds_search_chapter_index_page(&page->index_page, geometry, name,
					       &record_page_number);
	if (result != UDS_SUCCESS)
		return result;

	/* No chapter index entry means the name is not in this chapter. */
	if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
		return UDS_SUCCESS;

	/* Record pages follow the chapter's index pages. */
	page_number = geometry->index_pages_per_chapter + record_page_number;
	result = get_volume_page(volume, physical_chapter, page_number, &page);
	if (result != UDS_SUCCESS)
		return result;

	/* NULL metadata pointer: rebuild only needs to know whether the record exists. */
	*found = search_record_page(dm_bufio_get_block_data(page->buffer), name,
				    geometry, NULL);
	return UDS_SUCCESS;
}
1029
/* Remove a physical page from the cache, or poison its pending read if it is still queued. */
static void invalidate_page(struct page_cache *cache, u32 physical_page)
{
	struct cached_page *page;
	int queue_index = -1;

	/* We hold the read_threads_mutex. */
	get_page_and_index(cache, physical_page, &queue_index, &page);
	if (page != NULL) {
		/*
		 * Unmap the page first so no new searches can find it, then wait for any
		 * in-flight searches to drain before reclaiming the cache slot.
		 */
		WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
		wait_for_pending_searches(cache, page->physical_page);
		clear_cache_page(cache, page);
	} else if (queue_index > -1) {
		/* The page is not cached yet, but a read for it is queued; invalidate it. */
		vdo_log_debug("setting pending read to invalid");
		cache->read_queue[queue_index].invalid = true;
	}
}
1046
1047void uds_forget_chapter(struct volume *volume, u64 virtual_chapter)
1048{
1049	u32 physical_chapter =
1050		uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
1051	u32 first_page = map_to_physical_page(volume->geometry, physical_chapter, 0);
1052	u32 i;
1053
1054	vdo_log_debug("forgetting chapter %llu", (unsigned long long) virtual_chapter);
1055	mutex_lock(&volume->read_threads_mutex);
1056	for (i = 0; i < volume->geometry->pages_per_chapter; i++)
1057		invalidate_page(&volume->page_cache, first_page + i);
1058	mutex_unlock(&volume->read_threads_mutex);
1059}
1060
1061/*
1062 * Donate an index pages from a newly written chapter to the page cache since it is likely to be
1063 * used again soon. The caller must already hold the reader thread mutex.
1064 */
1065static int donate_index_page_locked(struct volume *volume, u32 physical_chapter,
1066				    u32 index_page_number, struct dm_buffer *page_buffer)
1067{
1068	int result;
1069	struct cached_page *page = NULL;
1070	u32 physical_page =
1071		map_to_physical_page(volume->geometry, physical_chapter,
1072				     index_page_number);
1073
1074	page = select_victim_in_cache(&volume->page_cache);
1075	page->buffer = page_buffer;
1076	result = init_chapter_index_page(volume, dm_bufio_get_block_data(page_buffer),
1077					 physical_chapter, index_page_number,
1078					 &page->index_page);
1079	if (result != UDS_SUCCESS) {
1080		vdo_log_warning("Error initialize chapter index page");
1081		cancel_page_in_cache(&volume->page_cache, physical_page, page);
1082		return result;
1083	}
1084
1085	result = put_page_in_cache(&volume->page_cache, physical_page, page);
1086	if (result != UDS_SUCCESS) {
1087		vdo_log_warning("Error putting page %u in cache", physical_page);
1088		cancel_page_in_cache(&volume->page_cache, physical_page, page);
1089		return result;
1090	}
1091
1092	return UDS_SUCCESS;
1093}
1094
/*
 * Pack the open chapter's delta index into pages, write them out, update the index page
 * map, and donate each page to the page cache.
 */
static int write_index_pages(struct volume *volume, u32 physical_chapter_number,
			     struct open_chapter_index *chapter_index)
{
	struct index_geometry *geometry = volume->geometry;
	struct dm_buffer *page_buffer;
	u32 first_index_page = map_to_physical_page(geometry, physical_chapter_number, 0);
	u32 delta_list_number = 0;
	u32 index_page_number;

	for (index_page_number = 0;
	     index_page_number < geometry->index_pages_per_chapter;
	     index_page_number++) {
		u8 *page_data;
		u32 physical_page = first_index_page + index_page_number;
		u32 lists_packed;
		bool last_page;
		int result;

		page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
		if (IS_ERR(page_data)) {
			return vdo_log_warning_strerror(-PTR_ERR(page_data),
							"failed to prepare index page");
		}

		/* The last page must absorb all of the remaining delta lists. */
		last_page = ((index_page_number + 1) == geometry->index_pages_per_chapter);
		result = uds_pack_open_chapter_index_page(chapter_index, page_data,
							  delta_list_number, last_page,
							  &lists_packed);
		if (result != UDS_SUCCESS) {
			dm_bufio_release(page_buffer);
			return vdo_log_warning_strerror(result,
							"failed to pack index page");
		}

		dm_bufio_mark_buffer_dirty(page_buffer);

		if (lists_packed == 0) {
			vdo_log_debug("no delta lists packed on chapter %u page %u",
				      physical_chapter_number, index_page_number);
		} else {
			delta_list_number += lists_packed;
		}

		/*
		 * Record the highest delta list on this page. NOTE(review): when
		 * lists_packed is 0 this repeats the previous page's value — presumably
		 * intentional for empty trailing pages; confirm.
		 */
		uds_update_index_page_map(volume->index_page_map,
					  chapter_index->virtual_chapter_number,
					  physical_chapter_number, index_page_number,
					  delta_list_number - 1);

		/* Donation must happen under the reader thread mutex. */
		mutex_lock(&volume->read_threads_mutex);
		result = donate_index_page_locked(volume, physical_chapter_number,
						  index_page_number, page_buffer);
		mutex_unlock(&volume->read_threads_mutex);
		if (result != UDS_SUCCESS) {
			dm_bufio_release(page_buffer);
			return result;
		}
	}

	return UDS_SUCCESS;
}
1155
1156static u32 encode_tree(u8 record_page[],
1157		       const struct uds_volume_record *sorted_pointers[],
1158		       u32 next_record, u32 node, u32 node_count)
1159{
1160	if (node < node_count) {
1161		u32 child = (2 * node) + 1;
1162
1163		next_record = encode_tree(record_page, sorted_pointers, next_record,
1164					  child, node_count);
1165
1166		/*
1167		 * In-order traversal: copy the contents of the next record into the page at the
1168		 * node offset.
1169		 */
1170		memcpy(&record_page[node * BYTES_PER_RECORD],
1171		       sorted_pointers[next_record++], BYTES_PER_RECORD);
1172
1173		next_record = encode_tree(record_page, sorted_pointers, next_record,
1174					  child + 1, node_count);
1175	}
1176
1177	return next_record;
1178}
1179
/* Sort a chapter's worth of records by name and encode them into a record page. */
static int encode_record_page(const struct volume *volume,
			      const struct uds_volume_record records[], u8 record_page[])
{
	int result;
	u32 i;
	u32 records_per_page = volume->geometry->records_per_page;
	const struct uds_volume_record **record_pointers = volume->record_pointers;

	/* Build an array of pointers so the records can be sorted without copying them. */
	for (i = 0; i < records_per_page; i++)
		record_pointers[i] = &records[i];

	/*
	 * Sort the record pointers by using just the names in the records, which is less work than
	 * sorting the entire record values.
	 */
	BUILD_BUG_ON(offsetof(struct uds_volume_record, name) != 0);
	result = uds_radix_sort(volume->radix_sorter, (const u8 **) record_pointers,
				records_per_page, UDS_RECORD_NAME_SIZE);
	if (result != UDS_SUCCESS)
		return result;

	/* Write the sorted records into the page as an implicit binary search tree. */
	encode_tree(record_page, record_pointers, 0, 0, records_per_page);
	return UDS_SUCCESS;
}
1204
/* Encode and write out all of the record pages for a closed chapter. */
static int write_record_pages(struct volume *volume, u32 physical_chapter_number,
			      const struct uds_volume_record *records)
{
	u32 record_page_number;
	struct index_geometry *geometry = volume->geometry;
	struct dm_buffer *page_buffer;
	const struct uds_volume_record *next_record = records;
	/* Record pages are stored after the chapter's index pages. */
	u32 first_record_page = map_to_physical_page(geometry, physical_chapter_number,
						     geometry->index_pages_per_chapter);

	for (record_page_number = 0;
	     record_page_number < geometry->record_pages_per_chapter;
	     record_page_number++) {
		u8 *page_data;
		u32 physical_page = first_record_page + record_page_number;
		int result;

		page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
		if (IS_ERR(page_data)) {
			return vdo_log_warning_strerror(-PTR_ERR(page_data),
							"failed to prepare record page");
		}

		result = encode_record_page(volume, next_record, page_data);
		if (result != UDS_SUCCESS) {
			dm_bufio_release(page_buffer);
			return vdo_log_warning_strerror(result,
							"failed to encode record page %u",
							record_page_number);
		}

		/* Advance to the records belonging to the next page. */
		next_record += geometry->records_per_page;
		dm_bufio_mark_buffer_dirty(page_buffer);
		dm_bufio_release(page_buffer);
	}

	return UDS_SUCCESS;
}
1243
1244int uds_write_chapter(struct volume *volume, struct open_chapter_index *chapter_index,
1245		      const struct uds_volume_record *records)
1246{
1247	int result;
1248	u32 physical_chapter_number =
1249		uds_map_to_physical_chapter(volume->geometry,
1250					    chapter_index->virtual_chapter_number);
1251
1252	result = write_index_pages(volume, physical_chapter_number, chapter_index);
1253	if (result != UDS_SUCCESS)
1254		return result;
1255
1256	result = write_record_pages(volume, physical_chapter_number, records);
1257	if (result != UDS_SUCCESS)
1258		return result;
1259
1260	result = -dm_bufio_write_dirty_buffers(volume->client);
1261	if (result != UDS_SUCCESS)
1262		vdo_log_error_strerror(result, "cannot sync chapter to volume");
1263
1264	return result;
1265}
1266
/*
 * Determine the virtual chapter number of a physical chapter by validating all of its index
 * pages. Sets *virtual_chapter_number to BAD_CHAPTER if the chapter is invalid or
 * inconsistent in any way.
 */
static void probe_chapter(struct volume *volume, u32 chapter_number,
			  u64 *virtual_chapter_number)
{
	const struct index_geometry *geometry = volume->geometry;
	u32 expected_list_number = 0;
	u32 i;
	u64 vcn = BAD_CHAPTER;

	/* Assume the chapter is bad until every page checks out. */
	*virtual_chapter_number = BAD_CHAPTER;
	dm_bufio_prefetch(volume->client,
			  map_to_physical_page(geometry, chapter_number, 0),
			  geometry->index_pages_per_chapter);

	for (i = 0; i < geometry->index_pages_per_chapter; i++) {
		struct delta_index_page *page;
		int result;

		result = uds_get_volume_index_page(volume, chapter_number, i, &page);
		if (result != UDS_SUCCESS)
			return;

		if (page->virtual_chapter_number == BAD_CHAPTER) {
			vdo_log_error("corrupt index page in chapter %u",
				      chapter_number);
			return;
		}

		/* All index pages in a chapter must agree on the virtual chapter number. */
		if (vcn == BAD_CHAPTER) {
			vcn = page->virtual_chapter_number;
		} else if (page->virtual_chapter_number != vcn) {
			vdo_log_error("inconsistent chapter %u index page %u: expected vcn %llu, got vcn %llu",
				      chapter_number, i, (unsigned long long) vcn,
				      (unsigned long long) page->virtual_chapter_number);
			return;
		}

		/* The delta lists must cover the chapter contiguously, page by page. */
		if (expected_list_number != page->lowest_list_number) {
			vdo_log_error("inconsistent chapter %u index page %u: expected list number %u, got list number %u",
				      chapter_number, i, expected_list_number,
				      page->lowest_list_number);
			return;
		}
		expected_list_number = page->highest_list_number + 1;

		result = uds_validate_chapter_index_page(page, geometry);
		if (result != UDS_SUCCESS)
			return;
	}

	/* The virtual chapter must map back to this physical slot. */
	if (chapter_number != uds_map_to_physical_chapter(geometry, vcn)) {
		vdo_log_error("chapter %u vcn %llu is out of phase (%u)", chapter_number,
			      (unsigned long long) vcn, geometry->chapters_per_volume);
		return;
	}

	*virtual_chapter_number = vcn;
}
1324
1325/* Find the last valid physical chapter in the volume. */
/* Find the last valid physical chapter in the volume. */
static void find_real_end_of_volume(struct volume *volume, u32 limit, u32 *limit_ptr)
{
	u32 span = 1;
	u32 tries = 0;

	/*
	 * Probe backwards from the current limit. Each bad probe lowers the limit; repeated
	 * bad probes grow the span (galloping) to skip over runs of bad chapters quickly,
	 * while a good probe shrinks the span to narrow in on the true end.
	 */
	while (limit > 0) {
		u32 chapter = (span > limit) ? 0 : limit - span;
		u64 vcn = 0;

		probe_chapter(volume, chapter, &vcn);
		if (vcn == BAD_CHAPTER) {
			limit = chapter;
			if (++tries > 1)
				span *= 2;
		} else {
			/* A good chapter at span 1 means the limit is exact. */
			if (span == 1)
				break;
			span /= 2;
			tries = 0;
		}
	}

	*limit_ptr = limit;
}
1350
/*
 * Determine the lowest and highest virtual chapter numbers present in the circular volume.
 * Returns UDS_CORRUPT_DATA if too many consecutive bad chapters are found.
 */
static int find_chapter_limits(struct volume *volume, u32 chapter_limit, u64 *lowest_vcn,
			       u64 *highest_vcn)
{
	struct index_geometry *geometry = volume->geometry;
	u64 zero_vcn;
	u64 lowest = BAD_CHAPTER;
	u64 highest = BAD_CHAPTER;
	u64 moved_chapter = BAD_CHAPTER;
	u32 left_chapter = 0;
	u32 right_chapter = 0;
	u32 bad_chapters = 0;

	/*
	 * This method assumes there is at most one run of contiguous bad chapters caused by
	 * unflushed writes. Either the bad spot is at the beginning and end, or somewhere in the
	 * middle. Wherever it is, the highest and lowest VCNs are adjacent to it. Otherwise the
	 * volume is cleanly saved and somewhere in the middle of it the highest VCN immediately
	 * precedes the lowest one.
	 */

	/* It doesn't matter if this results in a bad spot (BAD_CHAPTER). */
	probe_chapter(volume, 0, &zero_vcn);

	/*
	 * Binary search for end of the discontinuity in the monotonically increasing virtual
	 * chapter numbers; bad spots are treated as a span of BAD_CHAPTER values. In effect we're
	 * searching for the index of the smallest value less than zero_vcn. In the case we go off
	 * the end it means that chapter 0 has the lowest vcn.
	 *
	 * If a virtual chapter is out-of-order, it will be the one moved by conversion. Always
	 * skip over the moved chapter when searching, adding it to the range at the end if
	 * necessary.
	 */
	if (geometry->remapped_physical > 0) {
		u64 remapped_vcn;

		probe_chapter(volume, geometry->remapped_physical, &remapped_vcn);
		if (remapped_vcn == geometry->remapped_virtual)
			moved_chapter = geometry->remapped_physical;
	}

	left_chapter = 0;
	right_chapter = chapter_limit;

	while (left_chapter < right_chapter) {
		u64 probe_vcn;
		u32 chapter = (left_chapter + right_chapter) / 2;

		/* Never probe the moved chapter; it is out of sequence. */
		if (chapter == moved_chapter)
			chapter--;

		probe_chapter(volume, chapter, &probe_vcn);
		if (zero_vcn <= probe_vcn) {
			left_chapter = chapter + 1;
			if (left_chapter == moved_chapter)
				left_chapter++;
		} else {
			right_chapter = chapter;
		}
	}

	/* If left_chapter goes off the end, chapter 0 has the lowest virtual chapter number. */
	if (left_chapter >= chapter_limit)
		left_chapter = 0;

	/* At this point, left_chapter is the chapter with the lowest virtual chapter number. */
	probe_chapter(volume, left_chapter, &lowest);

	/* The moved chapter might be the lowest in the range. */
	if ((moved_chapter != BAD_CHAPTER) && (lowest == geometry->remapped_virtual + 1))
		lowest = geometry->remapped_virtual;

	/*
	 * Circularly scan backwards, moving over any bad chapters until encountering a good one,
	 * which is the chapter with the highest vcn.
	 */
	while (highest == BAD_CHAPTER) {
		right_chapter = (right_chapter + chapter_limit - 1) % chapter_limit;
		if (right_chapter == moved_chapter)
			continue;

		probe_chapter(volume, right_chapter, &highest);
		if (bad_chapters++ >= MAX_BAD_CHAPTERS) {
			vdo_log_error("too many bad chapters in volume: %u",
				      bad_chapters);
			return UDS_CORRUPT_DATA;
		}
	}

	*lowest_vcn = lowest;
	*highest_vcn = highest;
	return UDS_SUCCESS;
}
1444
1445/*
1446 * Find the highest and lowest contiguous chapters present in the volume and determine their
1447 * virtual chapter numbers. This is used by rebuild.
1448 */
1449int uds_find_volume_chapter_boundaries(struct volume *volume, u64 *lowest_vcn,
1450				       u64 *highest_vcn, bool *is_empty)
1451{
1452	u32 chapter_limit = volume->geometry->chapters_per_volume;
1453
1454	find_real_end_of_volume(volume, chapter_limit, &chapter_limit);
1455	if (chapter_limit == 0) {
1456		*lowest_vcn = 0;
1457		*highest_vcn = 0;
1458		*is_empty = true;
1459		return UDS_SUCCESS;
1460	}
1461
1462	*is_empty = false;
1463	return find_chapter_limits(volume, chapter_limit, lowest_vcn, highest_vcn);
1464}
1465
/*
 * Point the volume at a new backing device. All cached pages are invalidated and the
 * dm-bufio client is recreated against the new storage.
 */
int __must_check uds_replace_volume_storage(struct volume *volume,
					    struct index_layout *layout,
					    struct block_device *bdev)
{
	int result;
	u32 i;

	result = uds_replace_index_layout_storage(layout, bdev);
	if (result != UDS_SUCCESS)
		return result;

	/* Release all outstanding dm_bufio objects */
	for (i = 0; i < volume->page_cache.indexable_pages; i++)
		volume->page_cache.index[i] = volume->page_cache.cache_slots;
	for (i = 0; i < volume->page_cache.cache_slots; i++)
		clear_cache_page(&volume->page_cache, &volume->page_cache.cache[i]);
	if (volume->sparse_cache != NULL)
		uds_invalidate_sparse_cache(volume->sparse_cache);
	/* The client must be destroyed only after all of its buffers are released. */
	if (volume->client != NULL)
		dm_bufio_client_destroy(vdo_forget(volume->client));

	return uds_open_volume_bufio(layout, volume->geometry->bytes_per_page,
				     volume->reserved_buffers, &volume->client);
}
1490
/*
 * Set up the page cache structures for the given geometry and cache size. On failure,
 * partially-made allocations are left for uninitialize_page_cache() to reclaim (the caller
 * frees the volume on any error).
 */
static int __must_check initialize_page_cache(struct page_cache *cache,
					      const struct index_geometry *geometry,
					      u32 chapters_in_cache,
					      unsigned int zone_count)
{
	int result;
	u32 i;

	/* One extra slot so the index can represent "not cached" as cache_slots. */
	cache->indexable_pages = geometry->pages_per_volume + 1;
	cache->cache_slots = chapters_in_cache * geometry->record_pages_per_chapter;
	cache->zone_count = zone_count;
	atomic64_set(&cache->clock, 1);

	result = VDO_ASSERT((cache->cache_slots <= VOLUME_CACHE_MAX_ENTRIES),
			    "requested cache size, %u, within limit %u",
			    cache->cache_slots, VOLUME_CACHE_MAX_ENTRIES);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(VOLUME_CACHE_MAX_QUEUED_READS, struct queued_read,
			      "volume read queue", &cache->read_queue);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(cache->zone_count, struct search_pending_counter,
			      "Volume Cache Zones", &cache->search_pending_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(cache->indexable_pages, u16, "page cache index",
			      &cache->index);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(cache->cache_slots, struct cached_page, "page cache cache",
			      &cache->cache);
	if (result != VDO_SUCCESS)
		return result;

	/* Initialize index values to invalid values. */
	for (i = 0; i < cache->indexable_pages; i++)
		cache->index[i] = cache->cache_slots;

	for (i = 0; i < cache->cache_slots; i++)
		clear_cache_page(cache, &cache->cache[i]);

	return UDS_SUCCESS;
}
1539
/*
 * Allocate and initialize a volume: geometry, dm-bufio client, radix sorter, optional
 * sparse cache, page cache, index page map, and reader threads. On any failure the
 * partially-built volume is torn down with uds_free_volume(), which tolerates
 * partially-initialized state.
 */
int uds_make_volume(const struct uds_configuration *config, struct index_layout *layout,
		    struct volume **new_volume)
{
	unsigned int i;
	struct volume *volume = NULL;
	struct index_geometry *geometry;
	unsigned int reserved_buffers;
	int result;

	result = vdo_allocate(1, struct volume, "volume", &volume);
	if (result != VDO_SUCCESS)
		return result;

	volume->nonce = uds_get_volume_nonce(layout);

	result = uds_copy_index_geometry(config->geometry, &volume->geometry);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return vdo_log_warning_strerror(result,
						"failed to allocate geometry: error");
	}
	geometry = volume->geometry;

	/*
	 * Reserve a buffer for each entry in the page cache, one for the chapter writer, and one
	 * for each entry in the sparse cache.
	 */
	reserved_buffers = config->cache_chapters * geometry->record_pages_per_chapter;
	reserved_buffers += 1;
	if (uds_is_sparse_index_geometry(geometry))
		reserved_buffers += (config->cache_chapters * geometry->index_pages_per_chapter);
	volume->reserved_buffers = reserved_buffers;
	result = uds_open_volume_bufio(layout, geometry->bytes_per_page,
				       volume->reserved_buffers, &volume->client);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	result = uds_make_radix_sorter(geometry->records_per_page,
				       &volume->radix_sorter);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	/* Scratch space reused by encode_record_page() when closing chapters. */
	result = vdo_allocate(geometry->records_per_page,
			      const struct uds_volume_record *, "record pointers",
			      &volume->record_pointers);
	if (result != VDO_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	if (uds_is_sparse_index_geometry(geometry)) {
		size_t page_size = sizeof(struct delta_index_page) + geometry->bytes_per_page;

		result = uds_make_sparse_cache(geometry, config->cache_chapters,
					       config->zone_count,
					       &volume->sparse_cache);
		if (result != UDS_SUCCESS) {
			uds_free_volume(volume);
			return result;
		}

		/* Account for the sparse cache in the reported cache size. */
		volume->cache_size =
			page_size * geometry->index_pages_per_chapter * config->cache_chapters;
	}

	result = initialize_page_cache(&volume->page_cache, geometry,
				       config->cache_chapters, config->zone_count);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	volume->cache_size += volume->page_cache.cache_slots * sizeof(struct delta_index_page);
	result = uds_make_index_page_map(geometry, &volume->index_page_map);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	mutex_init(&volume->read_threads_mutex);
	uds_init_cond(&volume->read_threads_read_done_cond);
	uds_init_cond(&volume->read_threads_cond);

	result = vdo_allocate(config->read_threads, struct thread *, "reader threads",
			      &volume->reader_threads);
	if (result != VDO_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	for (i = 0; i < config->read_threads; i++) {
		result = vdo_create_thread(read_thread_function, (void *) volume,
					   "reader", &volume->reader_threads[i]);
		if (result != VDO_SUCCESS) {
			uds_free_volume(volume);
			return result;
		}

		/* Track how many threads started so teardown only joins those. */
		volume->read_thread_count = i + 1;
	}

	*new_volume = volume;
	return UDS_SUCCESS;
}
1648
1649static void uninitialize_page_cache(struct page_cache *cache)
1650{
1651	u16 i;
1652
1653	if (cache->cache != NULL) {
1654		for (i = 0; i < cache->cache_slots; i++)
1655			release_page_buffer(&cache->cache[i]);
1656	}
1657	vdo_free(cache->index);
1658	vdo_free(cache->cache);
1659	vdo_free(cache->search_pending_counters);
1660	vdo_free(cache->read_queue);
1661}
1662
/*
 * Tear down a volume: stop the reader threads, release all cached pages, destroy the
 * dm-bufio client, and free every owned structure. Safe to call on a partially-initialized
 * volume (as done from uds_make_volume() error paths) or with NULL.
 */
void uds_free_volume(struct volume *volume)
{
	if (volume == NULL)
		return;

	if (volume->reader_threads != NULL) {
		unsigned int i;

		/* This works even if some threads weren't started. */
		mutex_lock(&volume->read_threads_mutex);
		volume->read_threads_exiting = true;
		uds_broadcast_cond(&volume->read_threads_cond);
		mutex_unlock(&volume->read_threads_mutex);
		/* Only join the threads that were actually created. */
		for (i = 0; i < volume->read_thread_count; i++)
			vdo_join_threads(volume->reader_threads[i]);
		vdo_free(volume->reader_threads);
		volume->reader_threads = NULL;
	}

	/* Must destroy the client AFTER freeing the cached pages. */
	uninitialize_page_cache(&volume->page_cache);
	uds_free_sparse_cache(volume->sparse_cache);
	if (volume->client != NULL)
		dm_bufio_client_destroy(vdo_forget(volume->client));

	uds_free_index_page_map(volume->index_page_map);
	uds_free_radix_sorter(volume->radix_sorter);
	vdo_free(volume->geometry);
	vdo_free(volume->record_pointers);
	vdo_free(volume);
}
1694