// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "index.h"

#include "logger.h"
#include "memory-alloc.h"

#include "funnel-requestqueue.h"
#include "hash-utils.h"
#include "sparse-cache.h"

static const u64 NO_LAST_SAVE = U64_MAX;
/*
 * When searching for deduplication records, the index first searches the volume index, and then
 * searches the chapter index for the relevant chapter. If the chapter has been fully committed to
 * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been
 * committed (either the open chapter or a recently closed one), the index searches the in-memory
 * representation of the chapter. Finally, if the volume index does not find a record and the index
 * is sparse, the index will search the sparse cache.
 *
 * The index sends two kinds of messages to coordinate between zones: chapter close messages for
 * the chapter writer, and sparse cache barrier messages for the sparse cache.
 *
 * The chapter writer is responsible for committing chapters of records to storage. Since zones can
 * get different numbers of records, some zones may fall behind others. Each time a zone fills up
 * its available space in a chapter, it informs the chapter writer that the chapter is complete,
 * and also informs all other zones that it has closed the chapter. Each other zone will then close
 * the chapter immediately, regardless of how full it is, in order to minimize skew between zones.
 * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage.
 *
 * The last zone to close the chapter also removes the oldest chapter from the volume index.
 * Although that chapter is invalid for zones that have moved on, the existence of the open chapter
 * means that those zones will never ask the volume index about it. No zone is allowed to get more
 * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another
 * chapter before the previous one has been closed by all zones, it is forced to wait.
 *
 * The sparse cache relies on having the same set of chapter indexes available to all zones. When a
 * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone
 * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and
 * paused its operations, the cache membership is changed and each zone is then informed that it
 * can proceed. More details can be found in the sparse cache documentation.
 *
 * If a sparse index has only one zone, the index will not create a triage queue, but it still
 * needs the barrier message to change the sparse cache membership, so the index simulates the
 * message by invoking the handler directly.
 */
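
/*
 * As a rough sketch, the sparse cache barrier flow implemented below is (names are the static
 * functions in this file, not an external API):
 *
 *	triage_request(request)
 *	    -> triage_index_request()                finds a sparse chapter worth caching
 *	    -> enqueue_barrier_messages()            one barrier message per zone
 *	    -> dispatch_index_zone_control_request() on each zone thread
 *	           -> uds_update_sparse_cache()      all zones rendezvous here
 *
 * Each zone handles its barrier message on its own thread, so once uds_update_sparse_cache()
 * returns in every zone, the new cache membership is visible to all of them.
 */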

struct chapter_writer {
	/* The index to which we belong */
	struct uds_index *index;
	/* The thread to do the writing */
	struct thread *thread;
	/* The lock protecting the following fields */
	struct mutex mutex;
	/* The condition signalled on state changes */
	struct cond_var cond;
	/* Set to true to stop the thread */
	bool stop;
	/* The result from the most recent write */
	int result;
	/* The number of bytes allocated by the chapter writer */
	size_t memory_size;
	/* The number of zones which have submitted a chapter for writing */
	unsigned int zones_to_write;
	/* Open chapter index used by uds_close_open_chapter() */
	struct open_chapter_index *open_chapter_index;
	/* Collated records used by uds_close_open_chapter() */
	struct uds_volume_record *collated_records;
	/* The chapters to write (one per zone) */
	struct open_chapter_zone *chapters[];
};
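
/*
 * A simplified timeline of the chapter-close handshake, sketching the logic of
 * start_closing_chapter() and close_chapters() below (not additional code):
 *
 *	zone thread:				writer thread (close_chapters):
 *	    lock(mutex)				    lock(mutex)
 *	    chapters[zone] = chapter		    while (zones_to_write < zone_count)
 *	    zones_to_write++			        wait(cond)
 *	    broadcast(cond)			    unlock(mutex), write the chapter
 *	    unlock(mutex)			    lock(mutex)
 *						    newest_virtual_chapter++
 *						    zones_to_write = 0
 *						    broadcast(cond)
 *
 * finish_previous_chapter() waits on the same condition variable until newest_virtual_chapter
 * shows that the zone's previous chapter has been committed.
 */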

static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter)
{
	return uds_is_chapter_sparse(zone->index->volume->geometry,
				     zone->oldest_virtual_chapter,
				     zone->newest_virtual_chapter, virtual_chapter);
}

static int launch_zone_message(struct uds_zone_message message, unsigned int zone,
			       struct uds_index *index)
{
	int result;
	struct uds_request *request;

	result = vdo_allocate(1, struct uds_request, __func__, &request);
	if (result != VDO_SUCCESS)
		return result;

	request->index = index;
	request->unbatched = true;
	request->zone_number = zone;
	request->zone_message = message;

	uds_enqueue_request(request, STAGE_MESSAGE);
	return UDS_SUCCESS;
}

static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter)
{
	struct uds_zone_message message = {
		.type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
		.virtual_chapter = virtual_chapter,
	};
	unsigned int zone;

	for (zone = 0; zone < index->zone_count; zone++) {
		int result = launch_zone_message(message, zone, index);

		VDO_ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
	}
}

/*
 * Determine whether this request should trigger a sparse cache barrier message to change the
 * membership of the sparse cache. If a change in membership is desired, the function returns the
 * chapter number to add.
 */
static u64 triage_index_request(struct uds_index *index, struct uds_request *request)
{
	u64 virtual_chapter;
	struct index_zone *zone;

	virtual_chapter = uds_lookup_volume_index_name(index->volume_index,
						       &request->record_name);
	if (virtual_chapter == NO_CHAPTER)
		return NO_CHAPTER;

	zone = index->zones[request->zone_number];
	if (!is_zone_chapter_sparse(zone, virtual_chapter))
		return NO_CHAPTER;

	/*
	 * FIXME: Optimize for a common case by remembering the chapter from the most recent
	 * barrier message and skipping this chapter if it is the same.
	 */

	return virtual_chapter;
}

/*
 * Simulate a message to change the sparse cache membership for a single-zone sparse index. This
 * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind
 * of index does nothing here.
 */
static int simulate_index_zone_barrier_message(struct index_zone *zone,
					       struct uds_request *request)
{
	u64 sparse_virtual_chapter;

	if ((zone->index->zone_count > 1) ||
	    !uds_is_sparse_index_geometry(zone->index->volume->geometry))
		return UDS_SUCCESS;

	sparse_virtual_chapter = triage_index_request(zone->index, request);
	if (sparse_virtual_chapter == NO_CHAPTER)
		return UDS_SUCCESS;

	return uds_update_sparse_cache(zone, sparse_virtual_chapter);
}

/* This is the request processing function for the triage queue. */
static void triage_request(struct uds_request *request)
{
	struct uds_index *index = request->index;
	u64 sparse_virtual_chapter = triage_index_request(index, request);

	if (sparse_virtual_chapter != NO_CHAPTER)
		enqueue_barrier_messages(index, sparse_virtual_chapter);

	uds_enqueue_request(request, STAGE_INDEX);
}

static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number)
{
	int result;
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	while (index->newest_virtual_chapter < current_chapter_number)
		uds_wait_cond(&writer->cond, &writer->mutex);
	result = writer->result;
	mutex_unlock(&writer->mutex);

	if (result != UDS_SUCCESS)
		return vdo_log_error_strerror(result,
					      "Writing of previous open chapter failed");

	return UDS_SUCCESS;
}

static int swap_open_chapter(struct index_zone *zone)
{
	int result;
	struct open_chapter_zone *temporary_chapter;

	result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter);
	if (result != UDS_SUCCESS)
		return result;

	temporary_chapter = zone->open_chapter;
	zone->open_chapter = zone->writing_chapter;
	zone->writing_chapter = temporary_chapter;
	return UDS_SUCCESS;
}

/*
 * Inform the chapter writer that this zone is done with this chapter. The chapter will not be
 * written until all zones have closed it.
 */
static unsigned int start_closing_chapter(struct uds_index *index,
					  unsigned int zone_number,
					  struct open_chapter_zone *chapter)
{
	unsigned int finished_zones;
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	finished_zones = ++writer->zones_to_write;
	writer->chapters[zone_number] = chapter;
	uds_broadcast_cond(&writer->cond);
	mutex_unlock(&writer->mutex);

	return finished_zones;
}

static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
{
	int result;
	unsigned int i;
	struct uds_zone_message zone_message = {
		.type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
		.virtual_chapter = closed_chapter,
	};

	for (i = 0; i < zone->index->zone_count; i++) {
		if (zone->id == i)
			continue;

		result = launch_zone_message(zone_message, i, zone->index);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

static int open_next_chapter(struct index_zone *zone)
{
	int result;
	u64 closed_chapter;
	u64 expiring;
	unsigned int finished_zones;
	u32 expire_chapters;

	vdo_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
		      (unsigned long long) zone->newest_virtual_chapter, zone->id,
		      zone->open_chapter->size,
		      zone->open_chapter->capacity - zone->open_chapter->size);

	result = swap_open_chapter(zone);
	if (result != UDS_SUCCESS)
		return result;

	closed_chapter = zone->newest_virtual_chapter++;
	uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
					       zone->newest_virtual_chapter);
	uds_reset_open_chapter(zone->open_chapter);

	finished_zones = start_closing_chapter(zone->index, zone->id,
					       zone->writing_chapter);
	if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
		result = announce_chapter_closed(zone, closed_chapter);
		if (result != UDS_SUCCESS)
			return result;
	}

	expiring = zone->oldest_virtual_chapter;
	expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
						 zone->newest_virtual_chapter);
	zone->oldest_virtual_chapter += expire_chapters;

	if (finished_zones < zone->index->zone_count)
		return UDS_SUCCESS;

	while (expire_chapters-- > 0)
		uds_forget_chapter(zone->index->volume, expiring++);

	return UDS_SUCCESS;
}

static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
{
	if (zone->newest_virtual_chapter == virtual_chapter)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

static int dispatch_index_zone_control_request(struct uds_request *request)
{
	struct uds_zone_message *message = &request->zone_message;
	struct index_zone *zone = request->index->zones[request->zone_number];

	switch (message->type) {
	case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
		return uds_update_sparse_cache(zone, message->virtual_chapter);

	case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
		return handle_chapter_closed(zone, message->virtual_chapter);

	default:
		vdo_log_error("invalid message type: %d", message->type);
		return UDS_INVALID_ARGUMENT;
	}
}

static void set_request_location(struct uds_request *request,
				 enum uds_index_region new_location)
{
	request->location = new_location;
	request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
			  (new_location == UDS_LOCATION_IN_DENSE) ||
			  (new_location == UDS_LOCATION_IN_SPARSE));
}

static void set_chapter_location(struct uds_request *request,
				 const struct index_zone *zone, u64 virtual_chapter)
{
	request->found = true;
	if (virtual_chapter == zone->newest_virtual_chapter)
		request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
	else if (is_zone_chapter_sparse(zone, virtual_chapter))
		request->location = UDS_LOCATION_IN_SPARSE;
	else
		request->location = UDS_LOCATION_IN_DENSE;
}

static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
				       u64 virtual_chapter, bool *found)
{
	int result;
	struct volume *volume;
	u16 record_page_number;
	u32 chapter;

	result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
					 &record_page_number);
	if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
		return result;

	request->virtual_chapter = virtual_chapter;
	volume = zone->index->volume;
	chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
	return uds_search_cached_record_page(volume, request, chapter,
					     record_page_number, found);
}

static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
				bool *found)
{
	struct volume *volume;

	if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
		*found = true;
		return UDS_SUCCESS;
	} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
		*found = false;
		return UDS_SUCCESS;
	}

	if (request->virtual_chapter == zone->newest_virtual_chapter) {
		uds_search_open_chapter(zone->open_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	if ((zone->newest_virtual_chapter > 0) &&
	    (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
	    (zone->writing_chapter->size > 0)) {
		uds_search_open_chapter(zone->writing_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	volume = zone->index->volume;
	if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
	    uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
				      request->zone_number))
		return search_sparse_cache_in_zone(zone, request,
						   request->virtual_chapter, found);

	return uds_search_volume_page_cache(volume, request, found);
}

static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
			      const struct uds_record_data *metadata)
{
	unsigned int remaining;

	remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
					 metadata);
	if (remaining == 0)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

static int search_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;
	bool overflow_record, found = false;
	struct uds_record_data *metadata;
	u64 chapter;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;
	}

	if (found)
		set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * If a record has overflowed a chapter index in more than one chapter (or overflowed in
	 * one chapter and collided with an existing record), it will exist as a collision record
	 * in the volume index, but we won't find it in the volume. This case needs special
	 * handling.
	 */
	overflow_record = (record.is_found && record.is_collision && !found);
	chapter = zone->newest_virtual_chapter;
	if (found || overflow_record) {
		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && overflow_record)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		if (record.virtual_chapter != chapter) {
			/*
			 * Update the volume index to reference the new chapter for the block. If
			 * the record had been deleted or dropped from the chapter index, it will
			 * be back.
			 */
			result = uds_set_volume_index_record_chapter(&record, chapter);
		} else if (request->type != UDS_UPDATE) {
			/* The record is already in the open chapter. */
			return UDS_SUCCESS;
		}
	} else {
		/*
		 * The record wasn't in the volume index, so check whether the
		 * name is in a cached sparse chapter. If we found the name on
		 * a previous search, use that result instead.
		 */
		if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
			found = true;
		} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
			found = false;
		} else if (uds_is_sparse_index_geometry(zone->index->volume->geometry) &&
			   !uds_is_volume_index_sample(zone->index->volume_index,
						       &request->record_name)) {
			result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
							     &found);
			if (result != UDS_SUCCESS)
				return result;
		}

		if (found)
			set_request_location(request, UDS_LOCATION_IN_SPARSE);

		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && !found)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		/*
		 * Add a new entry to the volume index referencing the open chapter. This needs to
		 * be done both for new records, and for records from cached sparse chapters.
		 */
		result = uds_put_volume_index_record(&record, chapter);
	}

	if (result == UDS_OVERFLOW) {
		/*
		 * The volume index encountered a delta list overflow. The condition was already
		 * logged. We will go on without adding the record to the open chapter.
		 */
		return UDS_SUCCESS;
	}

	if (result != UDS_SUCCESS)
		return result;

	if (!found || (request->type == UDS_UPDATE)) {
		/* This is a new record or we're updating an existing record. */
		metadata = &request->new_metadata;
	} else {
		/* Move the existing record to the open chapter. */
		metadata = &request->old_metadata;
	}

	return put_record_in_zone(zone, request, metadata);
}
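
/*
 * A summary of how search_index_zone() above treats each request type, as a reading aid only (the
 * code is authoritative, and a delta list overflow can cause any type to skip the open chapter):
 *
 *  - UDS_QUERY_NO_UPDATE never modifies the index.
 *  - UDS_QUERY moves a record found in an older chapter (or in the sparse cache) into the open
 *    chapter, but does nothing on a miss or for an overflow record.
 *  - UDS_POST and UDS_UPDATE end with the record in the open chapter; UDS_UPDATE stores
 *    new_metadata even when the record already exists.
 */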

static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (!record.is_found)
		return UDS_SUCCESS;

	/* If the request was requeued, check whether the saved state is still valid. */

	if (record.is_collision) {
		set_chapter_location(request, zone, record.virtual_chapter);
	} else {
		/* Non-collision records are hints, so resolve the name in the chapter. */
		bool found;

		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;

		if (!found) {
			/* There is no record to remove. */
			return UDS_SUCCESS;
		}
	}

	set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * Delete the volume index entry for the named record only. Note that a later search might
	 * return stale advice if there is a colliding name in the same chapter, but it's a very
	 * rare case (1 in 2^21).
	 */
	result = uds_remove_volume_index_record(&record);
	if (result != UDS_SUCCESS)
		return result;

	/*
	 * If the record is in the open chapter, we must remove it or mark it deleted to avoid
	 * trouble if the record is added again later.
	 */
	if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
		uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);

	return UDS_SUCCESS;
}

static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
{
	int result;
	struct index_zone *zone = index->zones[request->zone_number];

	if (!request->requeued) {
		result = simulate_index_zone_barrier_message(zone, request);
		if (result != UDS_SUCCESS)
			return result;
	}

	switch (request->type) {
	case UDS_POST:
	case UDS_UPDATE:
	case UDS_QUERY:
	case UDS_QUERY_NO_UPDATE:
		result = search_index_zone(zone, request);
		break;

	case UDS_DELETE:
		result = remove_from_index_zone(zone, request);
		break;

	default:
		result = vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
						  "invalid request type: %d",
						  request->type);
		break;
	}

	return result;
}

/* This is the request processing function invoked by each zone's thread. */
static void execute_zone_request(struct uds_request *request)
{
	int result;
	struct uds_index *index = request->index;

	if (request->zone_message.type != UDS_MESSAGE_NONE) {
		result = dispatch_index_zone_control_request(request);
		if (result != UDS_SUCCESS) {
			vdo_log_error_strerror(result, "error executing message: %d",
					       request->zone_message.type);
		}

		/* Once the message is processed it can be freed. */
		vdo_free(vdo_forget(request));
		return;
	}

	index->need_to_save = true;
	if (request->requeued && (request->status != UDS_SUCCESS)) {
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);
		index->callback(request);
		return;
	}

	result = dispatch_index_request(index, request);
	if (result == UDS_QUEUED) {
		/* The request has been requeued so don't let it complete. */
		return;
	}

	if (!request->found)
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);

	request->status = result;
	index->callback(request);
}

static int initialize_index_queues(struct uds_index *index,
				   const struct index_geometry *geometry)
{
	int result;
	unsigned int i;

	for (i = 0; i < index->zone_count; i++) {
		result = uds_make_request_queue("indexW", &execute_zone_request,
						&index->zone_queues[i]);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* The triage queue is only needed for sparse multi-zone indexes. */
	if ((index->zone_count > 1) && uds_is_sparse_index_geometry(geometry)) {
		result = uds_make_request_queue("triageW", &triage_request,
						&index->triage_queue);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

/* This is the driver function for the chapter writer thread. */
static void close_chapters(void *arg)
{
	int result;
	struct chapter_writer *writer = arg;
	struct uds_index *index = writer->index;

	vdo_log_debug("chapter writer starting");
	mutex_lock(&writer->mutex);
	for (;;) {
		while (writer->zones_to_write < index->zone_count) {
			if (writer->stop && (writer->zones_to_write == 0)) {
				/*
				 * We've been told to stop, and all of the zones are in the same
				 * open chapter, so we can exit now.
				 */
				mutex_unlock(&writer->mutex);
				vdo_log_debug("chapter writer stopping");
				return;
			}
			uds_wait_cond(&writer->cond, &writer->mutex);
		}

		/*
		 * Release the lock while closing a chapter. We probably don't need to do this, but
		 * it seems safer in principle. It's OK to access the chapter and chapter_number
		 * fields without the lock since those aren't allowed to change until we're done.
		 */
		mutex_unlock(&writer->mutex);

		if (index->has_saved_open_chapter) {
			/*
			 * Remove the saved open chapter the first time we close an open chapter
			 * after loading from a clean shutdown, or after doing a clean save. The
			 * lack of the saved open chapter will indicate that a recovery is
			 * necessary.
			 */
			index->has_saved_open_chapter = false;
			result = uds_discard_open_chapter(index->layout);
			if (result == UDS_SUCCESS)
				vdo_log_debug("Discarding saved open chapter");
		}

		result = uds_close_open_chapter(writer->chapters, index->zone_count,
						index->volume,
						writer->open_chapter_index,
						writer->collated_records,
						index->newest_virtual_chapter);

		mutex_lock(&writer->mutex);
		index->newest_virtual_chapter++;
		index->oldest_virtual_chapter +=
			uds_chapters_to_expire(index->volume->geometry,
					       index->newest_virtual_chapter);
		writer->result = result;
		writer->zones_to_write = 0;
		uds_broadcast_cond(&writer->cond);
	}
}

static void stop_chapter_writer(struct chapter_writer *writer)
{
	struct thread *writer_thread = NULL;

	mutex_lock(&writer->mutex);
	if (writer->thread != NULL) {
		writer_thread = writer->thread;
		writer->thread = NULL;
		writer->stop = true;
		uds_broadcast_cond(&writer->cond);
	}
	mutex_unlock(&writer->mutex);

	if (writer_thread != NULL)
		vdo_join_threads(writer_thread);
}

static void free_chapter_writer(struct chapter_writer *writer)
{
	if (writer == NULL)
		return;

	stop_chapter_writer(writer);
	uds_free_open_chapter_index(writer->open_chapter_index);
	vdo_free(writer->collated_records);
	vdo_free(writer);
}

static int make_chapter_writer(struct uds_index *index,
			       struct chapter_writer **writer_ptr)
{
	int result;
	struct chapter_writer *writer;
	size_t collated_records_size =
		(sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter);

	result = vdo_allocate_extended(struct chapter_writer, index->zone_count,
				       struct open_chapter_zone *, "Chapter Writer",
				       &writer);
	if (result != VDO_SUCCESS)
		return result;

	writer->index = index;
	mutex_init(&writer->mutex);
	uds_init_cond(&writer->cond);

	result = vdo_allocate_cache_aligned(collated_records_size, "collated records",
					    &writer->collated_records);
	if (result != VDO_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	result = uds_make_open_chapter_index(&writer->open_chapter_index,
					     index->volume->geometry,
					     index->volume->nonce);
	if (result != UDS_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	writer->memory_size = (sizeof(struct chapter_writer) +
			       index->zone_count * sizeof(struct open_chapter_zone *) +
			       collated_records_size +
			       writer->open_chapter_index->memory_size);

	result = vdo_create_thread(close_chapters, writer, "writer", &writer->thread);
	if (result != VDO_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	*writer_ptr = writer;
	return UDS_SUCCESS;
}

static int load_index(struct uds_index *index)
{
	int result;
	u64 last_save_chapter;

	result = uds_load_index_state(index->layout, index);
	if (result != UDS_SUCCESS)
		return UDS_INDEX_NOT_SAVED_CLEANLY;

	last_save_chapter = ((index->last_save != NO_LAST_SAVE) ? index->last_save : 0);

	vdo_log_info("loaded index from chapter %llu through chapter %llu",
		     (unsigned long long) index->oldest_virtual_chapter,
		     (unsigned long long) last_save_chapter);

	return UDS_SUCCESS;
}

static int rebuild_index_page_map(struct uds_index *index, u64 vcn)
{
	int result;
	struct delta_index_page *chapter_index_page;
	struct index_geometry *geometry = index->volume->geometry;
	u32 chapter = uds_map_to_physical_chapter(geometry, vcn);
	u32 expected_list_number = 0;
	u32 index_page_number;
	u32 lowest_delta_list;
	u32 highest_delta_list;

	for (index_page_number = 0;
	     index_page_number < geometry->index_pages_per_chapter;
	     index_page_number++) {
		result = uds_get_volume_index_page(index->volume, chapter,
						   index_page_number,
						   &chapter_index_page);
		if (result != UDS_SUCCESS) {
			return vdo_log_error_strerror(result,
						      "failed to read index page %u in chapter %u",
						      index_page_number, chapter);
		}

		lowest_delta_list = chapter_index_page->lowest_list_number;
		highest_delta_list = chapter_index_page->highest_list_number;
		if (lowest_delta_list != expected_list_number) {
			return vdo_log_error_strerror(UDS_CORRUPT_DATA,
						      "chapter %u index page %u is corrupt",
						      chapter, index_page_number);
		}

		uds_update_index_page_map(index->volume->index_page_map, vcn, chapter,
					  index_page_number, highest_delta_list);
		expected_list_number = highest_delta_list + 1;
	}

	return UDS_SUCCESS;
}

static int replay_record(struct uds_index *index, const struct uds_record_name *name,
			 u64 virtual_chapter, bool will_be_sparse_chapter)
{
	int result;
	struct volume_index_record record;
	bool update_record;

	if (will_be_sparse_chapter &&
	    !uds_is_volume_index_sample(index->volume_index, name)) {
		/*
		 * This entry will be in a sparse chapter after the rebuild completes, and it is
		 * not a sample, so just skip over it.
		 */
		return UDS_SUCCESS;
	}

	result = uds_get_volume_index_record(index->volume_index, name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		if (record.is_collision) {
			if (record.virtual_chapter == virtual_chapter) {
				/* The record is already correct. */
				return UDS_SUCCESS;
			}

			update_record = true;
		} else if (record.virtual_chapter == virtual_chapter) {
			/*
			 * There is a volume index entry pointing to the current chapter, but we
			 * don't know if it is for the same name as the one we are currently
			 * working on or not. For now, we're just going to assume that it isn't.
			 * This will create one extra collision record if there was a deleted
			 * record in the current chapter.
			 */
			update_record = false;
		} else {
			/*
			 * If we're rebuilding, we don't normally want to go to disk to see if the
			 * record exists, since we will likely have just read the record from disk
			 * (i.e. we know it's there). The exception to this is when we find an
			 * entry in the volume index that has a different chapter. In this case, we
			 * need to search that chapter to determine if the volume index entry was
			 * for the same record or a different one.
			 */
			result = uds_search_volume_page_cache_for_rebuild(index->volume,
									  name,
									  record.virtual_chapter,
									  &update_record);
			if (result != UDS_SUCCESS)
				return result;
		}
	} else {
		update_record = false;
	}

	if (update_record) {
		/*
		 * Update the volume index to reference the new chapter for the block. If the
		 * record had been deleted or dropped from the chapter index, it will be back.
		 */
		result = uds_set_volume_index_record_chapter(&record, virtual_chapter);
	} else {
		/*
		 * Add a new entry to the volume index referencing the open chapter. This should be
		 * done regardless of whether this is a brand new record or a sparse record (one
		 * that exists on disk but not in the volume index), since re-adding a sparse
		 * record effectively un-sparsifies it.
		 */
		result = uds_put_volume_index_record(&record, virtual_chapter);
	}

	if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) {
		/* The rebuilt index will lose these records. */
		return UDS_SUCCESS;
	}

	return result;
}

static bool check_for_suspend(struct uds_index *index)
{
	bool closing;

	if (index->load_context == NULL)
		return false;

	mutex_lock(&index->load_context->mutex);
	if (index->load_context->status != INDEX_SUSPENDING) {
		mutex_unlock(&index->load_context->mutex);
		return false;
	}

	/* Notify that we are suspended and wait for the resume. */
	index->load_context->status = INDEX_SUSPENDED;
	uds_broadcast_cond(&index->load_context->cond);

	while ((index->load_context->status != INDEX_OPENING) &&
	       (index->load_context->status != INDEX_FREEING))
		uds_wait_cond(&index->load_context->cond, &index->load_context->mutex);

	closing = (index->load_context->status == INDEX_FREEING);
	mutex_unlock(&index->load_context->mutex);
	return closing;
}

static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse)
{
	int result;
	u32 i;
	u32 j;
	const struct index_geometry *geometry;
	u32 physical_chapter;

	if (check_for_suspend(index)) {
		vdo_log_info("Replay interrupted by index shutdown at chapter %llu",
			     (unsigned long long) virtual);
		return -EBUSY;
	}

	geometry = index->volume->geometry;
	physical_chapter = uds_map_to_physical_chapter(geometry, virtual);
	uds_prefetch_volume_chapter(index->volume, physical_chapter);
	uds_set_volume_index_open_chapter(index->volume_index, virtual);

	result = rebuild_index_page_map(index, virtual);
	if (result != UDS_SUCCESS) {
		return vdo_log_error_strerror(result,
					      "could not rebuild index page map for chapter %u",
					      physical_chapter);
	}

	for (i = 0; i < geometry->record_pages_per_chapter; i++) {
		u8 *record_page;
		u32 record_page_number;

		record_page_number = geometry->index_pages_per_chapter + i;
		result = uds_get_volume_record_page(index->volume, physical_chapter,
						    record_page_number, &record_page);
		if (result != UDS_SUCCESS) {
			return vdo_log_error_strerror(result, "could not get page %d",
						      record_page_number);
		}

		for (j = 0; j < geometry->records_per_page; j++) {
			const u8 *name_bytes;
			struct uds_record_name name;

			name_bytes = record_page + (j * BYTES_PER_RECORD);
			memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
			result = replay_record(index, &name, virtual, sparse);
			if (result != UDS_SUCCESS)
				return result;
		}
	}

	return UDS_SUCCESS;
}

static int replay_volume(struct uds_index *index)
{
	int result;
	u64 old_map_update;
	u64 new_map_update;
	u64 virtual;
	u64 from_virtual = index->oldest_virtual_chapter;
	u64 upto_virtual = index->newest_virtual_chapter;
	bool will_be_sparse;

	vdo_log_info("Replaying volume from chapter %llu through chapter %llu",
		     (unsigned long long) from_virtual,
		     (unsigned long long) upto_virtual);

	/*
	 * The index failed to load, so the volume index is empty. Add records to the volume index
	 * in order, skipping non-hooks in chapters which will be sparse to save time.
	 *
	 * Go through each record page of each chapter and add the records back to the volume
	 * index. This should not cause anything to be written to either the open chapter or the
	 * on-disk volume. Also skip the on-disk chapter corresponding to upto_virtual, as this
	 * would have already been purged from the volume index when the chapter was opened.
	 *
	 * Also, go through each index page for each chapter and rebuild the index page map.
	 */
	old_map_update = index->volume->index_page_map->last_update;
	for (virtual = from_virtual; virtual < upto_virtual; virtual++) {
		will_be_sparse = uds_is_chapter_sparse(index->volume->geometry,
						       from_virtual, upto_virtual,
						       virtual);
		result = replay_chapter(index, virtual, will_be_sparse);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* Also reap the chapter being replaced by the open chapter. */
	uds_set_volume_index_open_chapter(index->volume_index, upto_virtual);

	new_map_update = index->volume->index_page_map->last_update;
	if (new_map_update != old_map_update) {
		vdo_log_info("replay changed index page map update from %llu to %llu",
			     (unsigned long long) old_map_update,
			     (unsigned long long) new_map_update);
	}

	return UDS_SUCCESS;
}

static int rebuild_index(struct uds_index *index)
{
	int result;
	u64 lowest;
	u64 highest;
	bool is_empty = false;
	u32 chapters_per_volume = index->volume->geometry->chapters_per_volume;

	index->volume->lookup_mode = LOOKUP_FOR_REBUILD;
	result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest,
						    &is_empty);
	if (result != UDS_SUCCESS) {
		return vdo_log_fatal_strerror(result,
					      "cannot rebuild index: unknown volume chapter boundaries");
	}

	if (is_empty) {
		index->newest_virtual_chapter = 0;
		index->oldest_virtual_chapter = 0;
		index->volume->lookup_mode = LOOKUP_NORMAL;
		return UDS_SUCCESS;
	}

	index->newest_virtual_chapter = highest + 1;
	index->oldest_virtual_chapter = lowest;
	if (index->newest_virtual_chapter ==
	    (index->oldest_virtual_chapter + chapters_per_volume)) {
		/* Skip the chapter shadowed by the open chapter. */
		index->oldest_virtual_chapter++;
	}

	result = replay_volume(index);
	if (result != UDS_SUCCESS)
		return result;

	index->volume->lookup_mode = LOOKUP_NORMAL;
	return UDS_SUCCESS;
}
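
/*
 * A worked example of the boundary check above, assuming chapters_per_volume is 1024: if replay
 * finds lowest = 0 and highest = 1023, then newest_virtual_chapter becomes 1024, which equals
 * oldest_virtual_chapter (0) plus chapters_per_volume. The on-disk chapter holding virtual
 * chapter 0 is about to be overwritten by the new open chapter, so oldest_virtual_chapter is
 * advanced to 1 to skip it.
 */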

static void free_index_zone(struct index_zone *zone)
{
	if (zone == NULL)
		return;

	uds_free_open_chapter(zone->open_chapter);
	uds_free_open_chapter(zone->writing_chapter);
	vdo_free(zone);
}

static int make_index_zone(struct uds_index *index, unsigned int zone_number)
{
	int result;
	struct index_zone *zone;

	result = vdo_allocate(1, struct index_zone, "index zone", &zone);
	if (result != VDO_SUCCESS)
		return result;

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->open_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->writing_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	zone->index = index;
	zone->id = zone_number;
	index->zones[zone_number] = zone;

	return UDS_SUCCESS;
}

int uds_make_index(struct uds_configuration *config, enum uds_open_index_type open_type,
		   struct index_load_context *load_context, index_callback_fn callback,
		   struct uds_index **new_index)
{
	int result;
	bool loaded = false;
	bool new = (open_type == UDS_CREATE);
	struct uds_index *index = NULL;
	struct index_zone *zone;
	u64 nonce;
	unsigned int z;

	result = vdo_allocate_extended(struct uds_index, config->zone_count,
				       struct uds_request_queue *, "index", &index);
	if (result != VDO_SUCCESS)
		return result;

	index->zone_count = config->zone_count;

	result = uds_make_index_layout(config, new, &index->layout);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = vdo_allocate(index->zone_count, struct index_zone *, "zones",
			      &index->zones);
	if (result != VDO_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = uds_make_volume(config, index->layout, &index->volume);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	index->volume->lookup_mode = LOOKUP_NORMAL;
	for (z = 0; z < index->zone_count; z++) {
		result = make_index_zone(index, z);
		if (result != UDS_SUCCESS) {
			uds_free_index(index);
			return vdo_log_error_strerror(result,
						      "Could not create index zone");
		}
	}

	nonce = uds_get_volume_nonce(index->layout);
	result = uds_make_volume_index(config, nonce, &index->volume_index);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return vdo_log_error_strerror(result, "could not make volume index");
	}

	index->load_context = load_context;
	index->callback = callback;

	result = initialize_index_queues(index, config->geometry);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = make_chapter_writer(index, &index->chapter_writer);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	if (!new) {
		result = load_index(index);
		switch (result) {
		case UDS_SUCCESS:
			loaded = true;
			break;
		case -ENOMEM:
			/* We should not try a rebuild for this error. */
			vdo_log_error_strerror(result, "index could not be loaded");
			break;
		default:
			vdo_log_error_strerror(result, "index could not be loaded");
			if (open_type == UDS_LOAD) {
				result = rebuild_index(index);
				if (result != UDS_SUCCESS) {
					vdo_log_error_strerror(result,
							       "index could not be rebuilt");
				}
			}
			break;
		}
	}

	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return vdo_log_error_strerror(result, "fatal error in %s()", __func__);
	}

	for (z = 0; z < index->zone_count; z++) {
		zone = index->zones[z];
		zone->oldest_virtual_chapter = index->oldest_virtual_chapter;
		zone->newest_virtual_chapter = index->newest_virtual_chapter;
	}

	if (index->load_context != NULL) {
		mutex_lock(&index->load_context->mutex);
		index->load_context->status = INDEX_READY;
		/*
		 * If we get here, suspend is meaningless, but notify any thread trying to suspend
		 * us so it doesn't hang.
		 */
		uds_broadcast_cond(&index->load_context->cond);
		mutex_unlock(&index->load_context->mutex);
	}

	index->has_saved_open_chapter = loaded;
	index->need_to_save = !loaded;
	*new_index = index;
	return UDS_SUCCESS;
}

void uds_free_index(struct uds_index *index)
{
	unsigned int i;

	if (index == NULL)
		return;

	uds_request_queue_finish(index->triage_queue);
	for (i = 0; i < index->zone_count; i++)
		uds_request_queue_finish(index->zone_queues[i]);

	free_chapter_writer(index->chapter_writer);

	uds_free_volume_index(index->volume_index);
	if (index->zones != NULL) {
		for (i = 0; i < index->zone_count; i++)
			free_index_zone(index->zones[i]);
		vdo_free(index->zones);
	}

	uds_free_volume(index->volume);
	uds_free_index_layout(vdo_forget(index->layout));
	vdo_free(index);
}

/* Wait for the chapter writer to complete any outstanding writes. */
void uds_wait_for_idle_index(struct uds_index *index)
{
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	while (writer->zones_to_write > 0)
		uds_wait_cond(&writer->cond, &writer->mutex);
	mutex_unlock(&writer->mutex);
}

/* This function assumes that all requests have been drained. */
int uds_save_index(struct uds_index *index)
{
	int result;

	if (!index->need_to_save)
		return UDS_SUCCESS;

	uds_wait_for_idle_index(index);
	index->prev_save = index->last_save;
	index->last_save = ((index->newest_virtual_chapter == 0) ?
			    NO_LAST_SAVE : index->newest_virtual_chapter - 1);
	vdo_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save);

	result = uds_save_index_state(index->layout, index);
	if (result != UDS_SUCCESS) {
		vdo_log_info("save index failed");
		index->last_save = index->prev_save;
	} else {
		index->has_saved_open_chapter = true;
		index->need_to_save = false;
		vdo_log_info("finished save (vcn %llu)",
			     (unsigned long long) index->last_save);
	}

	return result;
}

int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev)
{
	return uds_replace_volume_storage(index->volume, index->layout, bdev);
}

/* Accessing statistics should be safe from any thread. */
void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters)
{
	struct volume_index_stats stats;

	uds_get_volume_index_stats(index->volume_index, &stats);
	counters->entries_indexed = stats.record_count;
	counters->collisions = stats.collision_count;
	counters->entries_discarded = stats.discard_count;

	counters->memory_used = (index->volume_index->memory_size +
				 index->volume->cache_size +
				 index->chapter_writer->memory_size);
}

void uds_enqueue_request(struct uds_request *request, enum request_stage stage)
{
	struct uds_index *index = request->index;
	struct uds_request_queue *queue;

	switch (stage) {
	case STAGE_TRIAGE:
		if (index->triage_queue != NULL) {
			queue = index->triage_queue;
			break;
		}

		fallthrough;

	case STAGE_INDEX:
		request->zone_number =
			uds_get_volume_index_zone(index->volume_index, &request->record_name);
		fallthrough;

	case STAGE_MESSAGE:
		queue = index->zone_queues[request->zone_number];
		break;

	default:
		VDO_ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage);
		return;
	}

	uds_request_queue_enqueue(queue, request);
}
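/*
 * For orientation, a sketch of how a caller drives this layer (based only on the functions
 * defined in this file; the index session layer's exact entry points may differ):
 *
 *	struct uds_index *index;
 *
 *	uds_make_index(config, UDS_CREATE, NULL, callback, &index);
 *	// Fill in request->index, request->record_name, and request->type,
 *	// then hand the request to the zone threads:
 *	uds_enqueue_request(request, STAGE_TRIAGE);
 *	// The request completes asynchronously via index->callback.
 *	uds_save_index(index);
 *	uds_free_index(index);
 */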