1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5#include "volume-index.h"
6
7#include <linux/bitops.h>
8#include <linux/bits.h>
9#include <linux/cache.h>
10#include <linux/compiler.h>
11#include <linux/log2.h>
12
13#include "errors.h"
14#include "logger.h"
15#include "memory-alloc.h"
16#include "numeric.h"
17#include "permassert.h"
18#include "thread-utils.h"
19
20#include "config.h"
21#include "geometry.h"
22#include "hash-utils.h"
23#include "indexer.h"
24
25/*
26 * The volume index is a combination of two separate subindexes, one containing sparse hook entries
27 * (retained for all chapters), and one containing the remaining entries (retained only for the
 * dense chapters). If there are no sparse chapters, only the non-hook subindex is used, and it
29 * will contain all records for all chapters.
30 *
31 * The volume index is also divided into zones, with one thread operating on each zone. Each
32 * incoming request is dispatched to the appropriate thread, and then to the appropriate subindex.
33 * Each delta list is handled by a single zone. To ensure that the distribution of delta lists to
34 * zones doesn't underflow (leaving some zone with no delta lists), the minimum number of delta
35 * lists must be the square of the maximum zone count for both subindexes.
36 *
37 * Each subindex zone is a delta index where the payload is a chapter number. The volume index can
38 * compute the delta list number, address, and zone number from the record name in order to
39 * dispatch record handling to the correct structures.
40 *
41 * Most operations that use all the zones take place either before request processing is allowed,
42 * or after all requests have been flushed in order to shut down. The only multi-threaded operation
43 * supported during normal operation is the uds_lookup_volume_index_name() method, used to determine
44 * whether a new chapter should be loaded into the sparse index cache. This operation only uses the
45 * sparse hook subindex, and the zone mutexes are used to make this operation safe.
46 *
47 * There are three ways of expressing chapter numbers in the volume index: virtual, index, and
48 * rolling. The interface to the volume index uses virtual chapter numbers, which are 64 bits long.
49 * Internally the subindex stores only the minimal number of bits necessary by masking away the
50 * high-order bits. When the index needs to deal with ordering of index chapter numbers, as when
51 * flushing entries from older chapters, it rolls the index chapter number around so that the
52 * smallest one in use is mapped to 0. See convert_index_to_virtual() or flush_invalid_entries()
53 * for an example of this technique.
54 *
55 * For efficiency, when older chapter numbers become invalid, the index does not immediately remove
56 * the invalidated entries. Instead it lazily removes them from a given delta list the next time it
57 * walks that list during normal operation. Because of this, the index size must be increased
58 * somewhat to accommodate all the invalid entries that have not yet been removed. For the standard
59 * index sizes, this requires about 4 chapters of old entries per 1024 chapters of valid entries in
60 * the index.
61 */
62
/*
 * The sizing parameters for a single subindex, as computed by
 * compute_volume_sub_index_parameters() from a configuration and its geometry.
 */
struct sub_index_parameters {
	/* The number of bits needed to represent an address within a delta list */
	u8 address_bits;
	/* The number of bits needed to represent an index chapter number */
	u8 chapter_bits;
	/* The mean delta: the address span of all lists divided by the expected entry count */
	u32 mean_delta;
	/* The number of delta lists */
	u64 list_count;
	/* The number of chapters used (taken from the geometry's chapters_per_volume) */
	u32 chapter_count;
	/* The number of delta index bits needed for one chapter's worth of records */
	size_t chapter_size_in_bits;
	/* The number of bytes of delta list memory (the expected index size plus 6%) */
	size_t memory_size;
	/* The number of bytes the index should keep free at all times */
	size_t target_free_bytes;
};
81
/*
 * A pair of derived configurations, one per subindex, filled in by split_configuration(). Each
 * configuration's geometry pointer is set to refer to the geometry copy stored alongside it in
 * this structure.
 */
struct split_config {
	/* The hook subindex configuration */
	struct uds_configuration hook_config;
	struct index_geometry hook_geometry;

	/* The non-hook subindex configuration */
	struct uds_configuration non_hook_config;
	struct index_geometry non_hook_geometry;
};
91
/*
 * A range of chapters whose entries should be flushed from a delta list, expressed as index
 * chapter numbers (masked virtual chapter numbers).
 */
struct chapter_range {
	/* The first index chapter number in the range */
	u32 chapter_start;
	/* The number of consecutive chapters in the range; 0 means nothing is left to flush */
	u32 chapter_count;
};
96
/* The number of magic bytes compared in a saved header (the terminating NUL is not included). */
#define MAGIC_SIZE 8

/* The magic string expected at the start of each saved subindex header. */
static const char MAGIC_START_5[] = "MI5-0005";

/*
 * The header decoded from each saved subindex stream by start_restoring_volume_sub_index(). In
 * the stream, the header is followed by list_count 64-bit flush chapter values, which belong to
 * the delta lists starting at first_list.
 */
struct sub_index_data {
	char magic[MAGIC_SIZE]; /* MAGIC_START_5 */
	/* The volume nonce; it must be the same in every stream of one subindex */
	u64 volume_nonce;
	/* The lowest virtual chapter recorded by this stream */
	u64 virtual_chapter_low;
	/* The highest virtual chapter recorded by this stream */
	u64 virtual_chapter_high;
	/* The number of the first delta list described by this stream */
	u32 first_list;
	/* The number of delta lists described by this stream */
	u32 list_count;
};
109
/* The magic string for the volume index header described by struct volume_index_data. */
static const char MAGIC_START_6[] = "MI6-0001";

/*
 * The volume index header used by start_restoring_volume_index() when the index has sparse
 * chapters. NOTE(review): presumably this header precedes the two subindex headers in each saved
 * stream; confirm against the save code.
 */
struct volume_index_data {
	char magic[MAGIC_SIZE]; /* MAGIC_START_6 */
	/* The sparse sample rate in effect for the saved index */
	u32 sparse_sample_rate;
};
116
117static inline u32 extract_address(const struct volume_sub_index *sub_index,
118				  const struct uds_record_name *name)
119{
120	return uds_extract_volume_index_bytes(name) & sub_index->address_mask;
121}
122
123static inline u32 extract_dlist_num(const struct volume_sub_index *sub_index,
124				    const struct uds_record_name *name)
125{
126	u64 bits = uds_extract_volume_index_bytes(name);
127
128	return (bits >> sub_index->address_bits) % sub_index->list_count;
129}
130
131static inline const struct volume_sub_index_zone *
132get_zone_for_record(const struct volume_index_record *record)
133{
134	return &record->sub_index->zones[record->zone_number];
135}
136
137static inline u64 convert_index_to_virtual(const struct volume_index_record *record,
138					   u32 index_chapter)
139{
140	const struct volume_sub_index_zone *volume_index_zone = get_zone_for_record(record);
141	u32 rolling_chapter = ((index_chapter - volume_index_zone->virtual_chapter_low) &
142			       record->sub_index->chapter_mask);
143
144	return volume_index_zone->virtual_chapter_low + rolling_chapter;
145}
146
147static inline u32 convert_virtual_to_index(const struct volume_sub_index *sub_index,
148					   u64 virtual_chapter)
149{
150	return virtual_chapter & sub_index->chapter_mask;
151}
152
153static inline bool is_virtual_chapter_indexed(const struct volume_index_record *record,
154					      u64 virtual_chapter)
155{
156	const struct volume_sub_index_zone *volume_index_zone = get_zone_for_record(record);
157
158	return ((virtual_chapter >= volume_index_zone->virtual_chapter_low) &&
159		(virtual_chapter <= volume_index_zone->virtual_chapter_high));
160}
161
162static inline bool has_sparse(const struct volume_index *volume_index)
163{
164	return volume_index->sparse_sample_rate > 0;
165}
166
167bool uds_is_volume_index_sample(const struct volume_index *volume_index,
168				const struct uds_record_name *name)
169{
170	if (!has_sparse(volume_index))
171		return false;
172
173	return (uds_extract_sampling_bytes(name) % volume_index->sparse_sample_rate) == 0;
174}
175
176static inline const struct volume_sub_index *
177get_volume_sub_index(const struct volume_index *volume_index,
178		     const struct uds_record_name *name)
179{
180	return (uds_is_volume_index_sample(volume_index, name) ?
181		&volume_index->vi_hook :
182		&volume_index->vi_non_hook);
183}
184
185static unsigned int get_volume_sub_index_zone(const struct volume_sub_index *sub_index,
186					      const struct uds_record_name *name)
187{
188	return extract_dlist_num(sub_index, name) / sub_index->delta_index.lists_per_zone;
189}
190
/* Get the zone number that handles a record name, using the subindex the name belongs to. */
unsigned int uds_get_volume_index_zone(const struct volume_index *volume_index,
				       const struct uds_record_name *name)
{
	const struct volume_sub_index *sub_index = get_volume_sub_index(volume_index, name);

	return get_volume_sub_index_zone(sub_index, name);
}
196
/* The number of records per delta list used when choosing the list count and address range. */
#define DELTA_LIST_SIZE 256

/*
 * Compute the sizing parameters of a subindex from its configuration.
 *
 * @config: the configuration (and geometry) describing the subindex
 * @params: the parameters to fill in
 *
 * Return: UDS_SUCCESS, or UDS_INVALID_ARGUMENT if the list count does not fit in 32 bits or if
 *         more than 31 address bits would be needed
 */
static int compute_volume_sub_index_parameters(const struct uds_configuration *config,
					       struct sub_index_parameters *params)
{
	u64 entries_in_volume_index, address_span;
	u32 chapters_in_volume_index, invalid_chapters;
	u32 rounded_chapters;
	u64 delta_list_records;
	u32 address_count;
	u64 index_size_in_bits;
	size_t expected_index_size;
	/* Never use fewer lists than the square of the maximum zone count. */
	u64 min_delta_lists = MAX_ZONES * MAX_ZONES;
	struct index_geometry *geometry = config->geometry;
	u64 records_per_chapter = geometry->records_per_chapter;

	params->chapter_count = geometry->chapters_per_volume;
	/*
	 * Make sure that the number of delta list records in the volume index does not change when
	 * the volume is reduced by one chapter. This preserves the mapping from name to volume
	 * index delta list.
	 */
	rounded_chapters = params->chapter_count;
	if (uds_is_reduced_index_geometry(geometry))
		rounded_chapters += 1;
	delta_list_records = records_per_chapter * rounded_chapters;
	address_count = config->volume_index_mean_delta * DELTA_LIST_SIZE;
	params->list_count = max(delta_list_records / DELTA_LIST_SIZE, min_delta_lists);
	params->address_bits = bits_per(address_count - 1);
	params->chapter_bits = bits_per(rounded_chapters - 1);
	/* The list count is computed in 64 bits; reject any value that needs more than 32. */
	if ((u32) params->list_count != params->list_count) {
		return vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
						"cannot initialize volume index with %llu delta lists",
						(unsigned long long) params->list_count);
	}

	if (params->address_bits > 31) {
		return vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
						"cannot initialize volume index with %u address bits",
						params->address_bits);
	}

	/*
	 * The probability that a given delta list is not touched during the writing of an entire
	 * chapter is:
	 *
	 * double p_not_touched = pow((double) (params->list_count - 1) / params->list_count,
	 *                            records_per_chapter);
	 *
	 * For the standard index sizes, about 78% of the delta lists are not touched, and
	 * therefore contain old index entries that have not been eliminated by the lazy LRU
	 * processing. Then the number of old index entries that accumulate over the entire index,
	 * in terms of full chapters worth of entries, is:
	 *
	 * double invalid_chapters = p_not_touched / (1.0 - p_not_touched);
	 *
	 * For the standard index sizes, the index needs about 3.5 chapters of space for the old
	 * entries in a 1024 chapter index, so round this up to use 4 chapters per 1024 chapters in
	 * the index.
	 */
	/* One extra chapter per 256 chapters (4 per 1024), but never fewer than 2. */
	invalid_chapters = max(rounded_chapters / 256, 2U);
	chapters_in_volume_index = rounded_chapters + invalid_chapters;
	entries_in_volume_index = records_per_chapter * chapters_in_volume_index;

	/* The mean delta is the total address space of all lists divided among all entries. */
	address_span = params->list_count << params->address_bits;
	params->mean_delta = address_span / entries_in_volume_index;

	/*
	 * Compute the expected size of a full index, then set the total memory to be 6% larger
	 * than that expected size. This number should be large enough that there are not many
	 * rebalances when the index is full.
	 */
	params->chapter_size_in_bits = uds_compute_delta_index_size(records_per_chapter,
								    params->mean_delta,
								    params->chapter_bits);
	index_size_in_bits = params->chapter_size_in_bits * chapters_in_volume_index;
	expected_index_size = index_size_in_bits / BITS_PER_BYTE;
	params->memory_size = expected_index_size * 106 / 100;

	/* Keep one twentieth (5%) of the expected index size free. */
	params->target_free_bytes = expected_index_size / 20;
	return UDS_SUCCESS;
}
279
/*
 * Release the resources held by a subindex: its flush chapter array, its zone array, and its
 * delta index. The subindex structure itself is not freed here.
 */
static void uninitialize_volume_sub_index(struct volume_sub_index *sub_index)
{
	vdo_free(vdo_forget(sub_index->flush_chapters));
	vdo_free(vdo_forget(sub_index->zones));
	uds_uninitialize_delta_index(&sub_index->delta_index);
}
286
287void uds_free_volume_index(struct volume_index *volume_index)
288{
289	if (volume_index == NULL)
290		return;
291
292	if (volume_index->zones != NULL)
293		vdo_free(vdo_forget(volume_index->zones));
294
295	uninitialize_volume_sub_index(&volume_index->vi_non_hook);
296	uninitialize_volume_sub_index(&volume_index->vi_hook);
297	vdo_free(volume_index);
298}
299
300
301static int compute_volume_sub_index_save_bytes(const struct uds_configuration *config,
302					       size_t *bytes)
303{
304	struct sub_index_parameters params = { .address_bits = 0 };
305	int result;
306
307	result = compute_volume_sub_index_parameters(config, &params);
308	if (result != UDS_SUCCESS)
309		return result;
310
311	*bytes = (sizeof(struct sub_index_data) + params.list_count * sizeof(u64) +
312		  uds_compute_delta_index_save_bytes(params.list_count,
313						     params.memory_size));
314	return UDS_SUCCESS;
315}
316
317/* This function is only useful if the configuration includes sparse chapters. */
318static void split_configuration(const struct uds_configuration *config,
319				struct split_config *split)
320{
321	u64 sample_rate, sample_records;
322	u64 dense_chapters, sparse_chapters;
323
324	/* Start with copies of the base configuration. */
325	split->hook_config = *config;
326	split->hook_geometry = *config->geometry;
327	split->hook_config.geometry = &split->hook_geometry;
328	split->non_hook_config = *config;
329	split->non_hook_geometry = *config->geometry;
330	split->non_hook_config.geometry = &split->non_hook_geometry;
331
332	sample_rate = config->sparse_sample_rate;
333	sparse_chapters = config->geometry->sparse_chapters_per_volume;
334	dense_chapters = config->geometry->chapters_per_volume - sparse_chapters;
335	sample_records = config->geometry->records_per_chapter / sample_rate;
336
337	/* Adjust the number of records indexed for each chapter. */
338	split->hook_geometry.records_per_chapter = sample_records;
339	split->non_hook_geometry.records_per_chapter -= sample_records;
340
341	/* Adjust the number of chapters indexed. */
342	split->hook_geometry.sparse_chapters_per_volume = 0;
343	split->non_hook_geometry.sparse_chapters_per_volume = 0;
344	split->non_hook_geometry.chapters_per_volume = dense_chapters;
345}
346
347static int compute_volume_index_save_bytes(const struct uds_configuration *config,
348					   size_t *bytes)
349{
350	size_t hook_bytes, non_hook_bytes;
351	struct split_config split;
352	int result;
353
354	if (!uds_is_sparse_index_geometry(config->geometry))
355		return compute_volume_sub_index_save_bytes(config, bytes);
356
357	split_configuration(config, &split);
358	result = compute_volume_sub_index_save_bytes(&split.hook_config, &hook_bytes);
359	if (result != UDS_SUCCESS)
360		return result;
361
362	result = compute_volume_sub_index_save_bytes(&split.non_hook_config,
363						     &non_hook_bytes);
364	if (result != UDS_SUCCESS)
365		return result;
366
367	*bytes = sizeof(struct volume_index_data) + hook_bytes + non_hook_bytes;
368	return UDS_SUCCESS;
369}
370
371int uds_compute_volume_index_save_blocks(const struct uds_configuration *config,
372					 size_t block_size, u64 *block_count)
373{
374	size_t bytes;
375	int result;
376
377	result = compute_volume_index_save_bytes(config, &bytes);
378	if (result != UDS_SUCCESS)
379		return result;
380
381	bytes += sizeof(struct delta_list_save_info);
382	*block_count = DIV_ROUND_UP(bytes, block_size) + MAX_ZONES;
383	return UDS_SUCCESS;
384}
385
386/* Flush invalid entries while walking the delta list. */
387static inline int flush_invalid_entries(struct volume_index_record *record,
388					struct chapter_range *flush_range,
389					u32 *next_chapter_to_invalidate)
390{
391	int result;
392
393	result = uds_next_delta_index_entry(&record->delta_entry);
394	if (result != UDS_SUCCESS)
395		return result;
396
397	while (!record->delta_entry.at_end) {
398		u32 index_chapter = uds_get_delta_entry_value(&record->delta_entry);
399		u32 relative_chapter = ((index_chapter - flush_range->chapter_start) &
400					record->sub_index->chapter_mask);
401
402		if (likely(relative_chapter >= flush_range->chapter_count)) {
403			if (relative_chapter < *next_chapter_to_invalidate)
404				*next_chapter_to_invalidate = relative_chapter;
405			break;
406		}
407
408		result = uds_remove_delta_index_entry(&record->delta_entry);
409		if (result != UDS_SUCCESS)
410			return result;
411	}
412
413	return UDS_SUCCESS;
414}
415
/*
 * Find the matching record, or the list offset where the record would go, while flushing every
 * entry of the delta list that falls in the given flush range.
 *
 * @record: the record to fill in; its sub_index and name must already be set
 * @list_number: the delta list to search
 * @key: the address to search for
 * @flush_range: the range of index chapters to flush; on success it is rewritten so that
 *               chapter_start is the oldest index chapter still present in the list and
 *               chapter_count is 0
 *
 * Return: UDS_SUCCESS or the first error returned by the delta index
 */
static int get_volume_index_entry(struct volume_index_record *record, u32 list_number,
				  u32 key, struct chapter_range *flush_range)
{
	struct volume_index_record other_record;
	const struct volume_sub_index *sub_index = record->sub_index;
	/* Tracks the smallest relative chapter of any entry that survives the flush. */
	u32 next_chapter_to_invalidate = sub_index->chapter_mask;
	int result;

	/* Start from key 0 so that the whole list is walked, not just the part near the key. */
	result = uds_start_delta_index_search(&sub_index->delta_index, list_number, 0,
					      &record->delta_entry);
	if (result != UDS_SUCCESS)
		return result;

	/* Walk forward to the first surviving entry whose key is not less than the target. */
	do {
		result = flush_invalid_entries(record, flush_range,
					       &next_chapter_to_invalidate);
		if (result != UDS_SUCCESS)
			return result;
	} while (!record->delta_entry.at_end && (key > record->delta_entry.key));

	result = uds_remember_delta_index_offset(&record->delta_entry);
	if (result != UDS_SUCCESS)
		return result;

	/* Check any collision records for a more precise match. */
	other_record = *record;
	if (!other_record.delta_entry.at_end && (key == other_record.delta_entry.key)) {
		for (;;) {
			u8 collision_name[UDS_RECORD_NAME_SIZE];

			result = flush_invalid_entries(&other_record, flush_range,
						       &next_chapter_to_invalidate);
			if (result != UDS_SUCCESS)
				return result;

			if (other_record.delta_entry.at_end ||
			    !other_record.delta_entry.is_collision)
				break;

			result = uds_get_delta_entry_collision(&other_record.delta_entry,
							       collision_name);
			if (result != UDS_SUCCESS)
				return result;

			/* A collision entry with the full name is the exact record. */
			if (memcmp(collision_name, record->name, UDS_RECORD_NAME_SIZE) == 0) {
				*record = other_record;
				break;
			}
		}
	}
	/* Keep walking the copy to the end of the list so that every entry gets flushed. */
	while (!other_record.delta_entry.at_end) {
		result = flush_invalid_entries(&other_record, flush_range,
					       &next_chapter_to_invalidate);
		if (result != UDS_SUCCESS)
			return result;
	}
	/* Convert the relative chapter back to an index chapter and report it to the caller. */
	next_chapter_to_invalidate += flush_range->chapter_start;
	next_chapter_to_invalidate &= sub_index->chapter_mask;
	flush_range->chapter_start = next_chapter_to_invalidate;
	flush_range->chapter_count = 0;
	return UDS_SUCCESS;
}
479
/*
 * Look up a record name in a subindex and fill in the record. If the name's delta list has not
 * been flushed since the zone's lowest virtual chapter advanced, the expired entries are
 * flushed from the list as part of the search.
 *
 * @sub_index: the subindex to search
 * @name: the record name to look up
 * @record: the record to fill in
 *
 * Return: UDS_SUCCESS or an error code from the delta index
 */
static int get_volume_sub_index_record(struct volume_sub_index *sub_index,
				       const struct uds_record_name *name,
				       struct volume_index_record *record)
{
	int result;
	const struct volume_sub_index_zone *volume_index_zone;
	u32 address = extract_address(sub_index, name);
	u32 delta_list_number = extract_dlist_num(sub_index, name);
	u64 flush_chapter = sub_index->flush_chapters[delta_list_number];

	record->sub_index = sub_index;
	record->mutex = NULL;
	record->name = name;
	record->zone_number = delta_list_number / sub_index->delta_index.lists_per_zone;
	volume_index_zone = get_zone_for_record(record);

	if (flush_chapter < volume_index_zone->virtual_chapter_low) {
		struct chapter_range range;
		u64 flush_count = volume_index_zone->virtual_chapter_low - flush_chapter;

		range.chapter_start = convert_virtual_to_index(sub_index, flush_chapter);
		/* Cap the count at the number of distinct index chapter values. */
		range.chapter_count = (flush_count > sub_index->chapter_mask ?
				       sub_index->chapter_mask + 1 :
				       flush_count);
		result = get_volume_index_entry(record, delta_list_number, address,
						&range);
		/*
		 * NOTE(review): result is not checked until after flush_chapters is updated
		 * below, so a failed search still records a new flush chapter for this list.
		 * Confirm whether that is intended.
		 */
		flush_chapter = convert_index_to_virtual(record, range.chapter_start);
		if (flush_chapter > volume_index_zone->virtual_chapter_high)
			flush_chapter = volume_index_zone->virtual_chapter_high;
		sub_index->flush_chapters[delta_list_number] = flush_chapter;
	} else {
		/* Nothing to flush in this list, so do a plain delta index lookup. */
		result = uds_get_delta_index_entry(&sub_index->delta_index,
						   delta_list_number, address,
						   name->name, &record->delta_entry);
	}

	if (result != UDS_SUCCESS)
		return result;

	record->is_found =
		(!record->delta_entry.at_end && (record->delta_entry.key == address));
	if (record->is_found) {
		u32 index_chapter = uds_get_delta_entry_value(&record->delta_entry);

		record->virtual_chapter = convert_index_to_virtual(record, index_chapter);
	}

	record->is_collision = record->delta_entry.is_collision;
	return UDS_SUCCESS;
}
530
531int uds_get_volume_index_record(struct volume_index *volume_index,
532				const struct uds_record_name *name,
533				struct volume_index_record *record)
534{
535	int result;
536
537	if (uds_is_volume_index_sample(volume_index, name)) {
538		/*
539		 * Other threads cannot be allowed to call uds_lookup_volume_index_name() while
540		 * this thread is finding the volume index record. Due to the lazy LRU flushing of
541		 * the volume index, uds_get_volume_index_record() is not a read-only operation.
542		 */
543		unsigned int zone =
544			get_volume_sub_index_zone(&volume_index->vi_hook, name);
545		struct mutex *mutex = &volume_index->zones[zone].hook_mutex;
546
547		mutex_lock(mutex);
548		result = get_volume_sub_index_record(&volume_index->vi_hook, name,
549						     record);
550		mutex_unlock(mutex);
551		/* Remember the mutex so that other operations on the index record can use it. */
552		record->mutex = mutex;
553	} else {
554		result = get_volume_sub_index_record(&volume_index->vi_non_hook, name,
555						     record);
556	}
557
558	return result;
559}
560
561int uds_put_volume_index_record(struct volume_index_record *record, u64 virtual_chapter)
562{
563	int result;
564	u32 address;
565	const struct volume_sub_index *sub_index = record->sub_index;
566
567	if (!is_virtual_chapter_indexed(record, virtual_chapter)) {
568		u64 low = get_zone_for_record(record)->virtual_chapter_low;
569		u64 high = get_zone_for_record(record)->virtual_chapter_high;
570
571		return vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
572						"cannot put record into chapter number %llu that is out of the valid range %llu to %llu",
573						(unsigned long long) virtual_chapter,
574						(unsigned long long) low,
575						(unsigned long long) high);
576	}
577	address = extract_address(sub_index, record->name);
578	if (unlikely(record->mutex != NULL))
579		mutex_lock(record->mutex);
580	result = uds_put_delta_index_entry(&record->delta_entry, address,
581					   convert_virtual_to_index(sub_index,
582								    virtual_chapter),
583					   record->is_found ? record->name->name : NULL);
584	if (unlikely(record->mutex != NULL))
585		mutex_unlock(record->mutex);
586	switch (result) {
587	case UDS_SUCCESS:
588		record->virtual_chapter = virtual_chapter;
589		record->is_collision = record->delta_entry.is_collision;
590		record->is_found = true;
591		break;
592	case UDS_OVERFLOW:
593		vdo_log_ratelimit(vdo_log_warning_strerror, UDS_OVERFLOW,
594				  "Volume index entry dropped due to overflow condition");
595		uds_log_delta_index_entry(&record->delta_entry);
596		break;
597	default:
598		break;
599	}
600
601	return result;
602}
603
604int uds_remove_volume_index_record(struct volume_index_record *record)
605{
606	int result;
607
608	if (!record->is_found)
609		return vdo_log_warning_strerror(UDS_BAD_STATE,
610						"illegal operation on new record");
611
612	/* Mark the record so that it cannot be used again */
613	record->is_found = false;
614	if (unlikely(record->mutex != NULL))
615		mutex_lock(record->mutex);
616	result = uds_remove_delta_index_entry(&record->delta_entry);
617	if (unlikely(record->mutex != NULL))
618		mutex_unlock(record->mutex);
619	return result;
620}
621
622static void set_volume_sub_index_zone_open_chapter(struct volume_sub_index *sub_index,
623						   unsigned int zone_number,
624						   u64 virtual_chapter)
625{
626	u64 used_bits = 0;
627	struct volume_sub_index_zone *zone = &sub_index->zones[zone_number];
628	struct delta_zone *delta_zone;
629	u32 i;
630
631	zone->virtual_chapter_low = (virtual_chapter >= sub_index->chapter_count ?
632				     virtual_chapter - sub_index->chapter_count + 1 :
633				     0);
634	zone->virtual_chapter_high = virtual_chapter;
635
636	/* Check to see if the new zone data is too large. */
637	delta_zone = &sub_index->delta_index.delta_zones[zone_number];
638	for (i = 1; i <= delta_zone->list_count; i++)
639		used_bits += delta_zone->delta_lists[i].size;
640
641	if (used_bits > sub_index->max_zone_bits) {
642		/* Expire enough chapters to free the desired space. */
643		u64 expire_count =
644			1 + (used_bits - sub_index->max_zone_bits) / sub_index->chapter_zone_bits;
645
646		if (expire_count == 1) {
647			vdo_log_ratelimit(vdo_log_info,
648					  "zone %u:  At chapter %llu, expiring chapter %llu early",
649					  zone_number,
650					  (unsigned long long) virtual_chapter,
651					  (unsigned long long) zone->virtual_chapter_low);
652			zone->early_flushes++;
653			zone->virtual_chapter_low++;
654		} else {
655			u64 first_expired = zone->virtual_chapter_low;
656
657			if (first_expired + expire_count < zone->virtual_chapter_high) {
658				zone->early_flushes += expire_count;
659				zone->virtual_chapter_low += expire_count;
660			} else {
661				zone->early_flushes +=
662					zone->virtual_chapter_high - zone->virtual_chapter_low;
663				zone->virtual_chapter_low = zone->virtual_chapter_high;
664			}
665			vdo_log_ratelimit(vdo_log_info,
666					  "zone %u:  At chapter %llu, expiring chapters %llu to %llu early",
667					  zone_number,
668					  (unsigned long long) virtual_chapter,
669					  (unsigned long long) first_expired,
670					  (unsigned long long) zone->virtual_chapter_low - 1);
671		}
672	}
673}
674
675void uds_set_volume_index_zone_open_chapter(struct volume_index *volume_index,
676					    unsigned int zone_number,
677					    u64 virtual_chapter)
678{
679	struct mutex *mutex = &volume_index->zones[zone_number].hook_mutex;
680
681	set_volume_sub_index_zone_open_chapter(&volume_index->vi_non_hook, zone_number,
682					       virtual_chapter);
683
684	/*
685	 * Other threads cannot be allowed to call uds_lookup_volume_index_name() while the open
686	 * chapter number is changing.
687	 */
688	if (has_sparse(volume_index)) {
689		mutex_lock(mutex);
690		set_volume_sub_index_zone_open_chapter(&volume_index->vi_hook,
691						       zone_number, virtual_chapter);
692		mutex_unlock(mutex);
693	}
694}
695
696/*
697 * Set the newest open chapter number for the index, while also advancing the oldest valid chapter
698 * number.
699 */
700void uds_set_volume_index_open_chapter(struct volume_index *volume_index,
701				       u64 virtual_chapter)
702{
703	unsigned int zone;
704
705	for (zone = 0; zone < volume_index->zone_count; zone++)
706		uds_set_volume_index_zone_open_chapter(volume_index, zone, virtual_chapter);
707}
708
709int uds_set_volume_index_record_chapter(struct volume_index_record *record,
710					u64 virtual_chapter)
711{
712	const struct volume_sub_index *sub_index = record->sub_index;
713	int result;
714
715	if (!record->is_found)
716		return vdo_log_warning_strerror(UDS_BAD_STATE,
717						"illegal operation on new record");
718
719	if (!is_virtual_chapter_indexed(record, virtual_chapter)) {
720		u64 low = get_zone_for_record(record)->virtual_chapter_low;
721		u64 high = get_zone_for_record(record)->virtual_chapter_high;
722
723		return vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
724						"cannot set chapter number %llu that is out of the valid range %llu to %llu",
725						(unsigned long long) virtual_chapter,
726						(unsigned long long) low,
727						(unsigned long long) high);
728	}
729
730	if (unlikely(record->mutex != NULL))
731		mutex_lock(record->mutex);
732	result = uds_set_delta_entry_value(&record->delta_entry,
733					   convert_virtual_to_index(sub_index,
734								    virtual_chapter));
735	if (unlikely(record->mutex != NULL))
736		mutex_unlock(record->mutex);
737	if (result != UDS_SUCCESS)
738		return result;
739
740	record->virtual_chapter = virtual_chapter;
741	return UDS_SUCCESS;
742}
743
744static u64 lookup_volume_sub_index_name(const struct volume_sub_index *sub_index,
745					const struct uds_record_name *name)
746{
747	int result;
748	u32 address = extract_address(sub_index, name);
749	u32 delta_list_number = extract_dlist_num(sub_index, name);
750	unsigned int zone_number = get_volume_sub_index_zone(sub_index, name);
751	const struct volume_sub_index_zone *zone = &sub_index->zones[zone_number];
752	u64 virtual_chapter;
753	u32 index_chapter;
754	u32 rolling_chapter;
755	struct delta_index_entry delta_entry;
756
757	result = uds_get_delta_index_entry(&sub_index->delta_index, delta_list_number,
758					   address, name->name, &delta_entry);
759	if (result != UDS_SUCCESS)
760		return NO_CHAPTER;
761
762	if (delta_entry.at_end || (delta_entry.key != address))
763		return NO_CHAPTER;
764
765	index_chapter = uds_get_delta_entry_value(&delta_entry);
766	rolling_chapter = (index_chapter - zone->virtual_chapter_low) & sub_index->chapter_mask;
767
768	virtual_chapter = zone->virtual_chapter_low + rolling_chapter;
769	if (virtual_chapter > zone->virtual_chapter_high)
770		return NO_CHAPTER;
771
772	return virtual_chapter;
773}
774
775/* Do a read-only lookup of the record name for sparse cache management. */
776u64 uds_lookup_volume_index_name(const struct volume_index *volume_index,
777				 const struct uds_record_name *name)
778{
779	unsigned int zone_number = uds_get_volume_index_zone(volume_index, name);
780	struct mutex *mutex = &volume_index->zones[zone_number].hook_mutex;
781	u64 virtual_chapter;
782
783	if (!uds_is_volume_index_sample(volume_index, name))
784		return NO_CHAPTER;
785
786	mutex_lock(mutex);
787	virtual_chapter = lookup_volume_sub_index_name(&volume_index->vi_hook, name);
788	mutex_unlock(mutex);
789
790	return virtual_chapter;
791}
792
/* Abandon a restore of a subindex by resetting its delta index. */
static void abort_restoring_volume_sub_index(struct volume_sub_index *sub_index)
{
	uds_reset_delta_index(&sub_index->delta_index);
}
797
798static void abort_restoring_volume_index(struct volume_index *volume_index)
799{
800	abort_restoring_volume_sub_index(&volume_index->vi_non_hook);
801	if (has_sparse(volume_index))
802		abort_restoring_volume_sub_index(&volume_index->vi_hook);
803}
804
805static int start_restoring_volume_sub_index(struct volume_sub_index *sub_index,
806					    struct buffered_reader **readers,
807					    unsigned int reader_count)
808{
809	unsigned int z;
810	int result;
811	u64 virtual_chapter_low = 0, virtual_chapter_high = 0;
812	unsigned int i;
813
814	for (i = 0; i < reader_count; i++) {
815		struct sub_index_data header;
816		u8 buffer[sizeof(struct sub_index_data)];
817		size_t offset = 0;
818		u32 j;
819
820		result = uds_read_from_buffered_reader(readers[i], buffer,
821						       sizeof(buffer));
822		if (result != UDS_SUCCESS) {
823			return vdo_log_warning_strerror(result,
824							"failed to read volume index header");
825		}
826
827		memcpy(&header.magic, buffer, MAGIC_SIZE);
828		offset += MAGIC_SIZE;
829		decode_u64_le(buffer, &offset, &header.volume_nonce);
830		decode_u64_le(buffer, &offset, &header.virtual_chapter_low);
831		decode_u64_le(buffer, &offset, &header.virtual_chapter_high);
832		decode_u32_le(buffer, &offset, &header.first_list);
833		decode_u32_le(buffer, &offset, &header.list_count);
834
835		result = VDO_ASSERT(offset == sizeof(buffer),
836				    "%zu bytes decoded of %zu expected", offset,
837				    sizeof(buffer));
838		if (result != VDO_SUCCESS)
839			result = UDS_CORRUPT_DATA;
840
841		if (memcmp(header.magic, MAGIC_START_5, MAGIC_SIZE) != 0) {
842			return vdo_log_warning_strerror(UDS_CORRUPT_DATA,
843							"volume index file had bad magic number");
844		}
845
846		if (sub_index->volume_nonce == 0) {
847			sub_index->volume_nonce = header.volume_nonce;
848		} else if (header.volume_nonce != sub_index->volume_nonce) {
849			return vdo_log_warning_strerror(UDS_CORRUPT_DATA,
850							"volume index volume nonce incorrect");
851		}
852
853		if (i == 0) {
854			virtual_chapter_low = header.virtual_chapter_low;
855			virtual_chapter_high = header.virtual_chapter_high;
856		} else if (virtual_chapter_high != header.virtual_chapter_high) {
857			u64 low = header.virtual_chapter_low;
858			u64 high = header.virtual_chapter_high;
859
860			return vdo_log_warning_strerror(UDS_CORRUPT_DATA,
861							"Inconsistent volume index zone files: Chapter range is [%llu,%llu], chapter range %d is [%llu,%llu]",
862							(unsigned long long) virtual_chapter_low,
863							(unsigned long long) virtual_chapter_high,
864							i, (unsigned long long) low,
865							(unsigned long long) high);
866		} else if (virtual_chapter_low < header.virtual_chapter_low) {
867			virtual_chapter_low = header.virtual_chapter_low;
868		}
869
870		for (j = 0; j < header.list_count; j++) {
871			u8 decoded[sizeof(u64)];
872
873			result = uds_read_from_buffered_reader(readers[i], decoded,
874							       sizeof(u64));
875			if (result != UDS_SUCCESS) {
876				return vdo_log_warning_strerror(result,
877								"failed to read volume index flush ranges");
878			}
879
880			sub_index->flush_chapters[header.first_list + j] =
881				get_unaligned_le64(decoded);
882		}
883	}
884
885	for (z = 0; z < sub_index->zone_count; z++) {
886		memset(&sub_index->zones[z], 0, sizeof(struct volume_sub_index_zone));
887		sub_index->zones[z].virtual_chapter_low = virtual_chapter_low;
888		sub_index->zones[z].virtual_chapter_high = virtual_chapter_high;
889	}
890
891	result = uds_start_restoring_delta_index(&sub_index->delta_index, readers,
892						 reader_count);
893	if (result != UDS_SUCCESS)
894		return vdo_log_warning_strerror(result, "restoring delta index failed");
895
896	return UDS_SUCCESS;
897}
898
899static int start_restoring_volume_index(struct volume_index *volume_index,
900					struct buffered_reader **buffered_readers,
901					unsigned int reader_count)
902{
903	unsigned int i;
904	int result;
905
906	if (!has_sparse(volume_index)) {
907		return start_restoring_volume_sub_index(&volume_index->vi_non_hook,
908							buffered_readers, reader_count);
909	}
910
911	for (i = 0; i < reader_count; i++) {
912		struct volume_index_data header;
913		u8 buffer[sizeof(struct volume_index_data)];
914		size_t offset = 0;
915
916		result = uds_read_from_buffered_reader(buffered_readers[i], buffer,
917						       sizeof(buffer));
918		if (result != UDS_SUCCESS) {
919			return vdo_log_warning_strerror(result,
920							"failed to read volume index header");
921		}
922
923		memcpy(&header.magic, buffer, MAGIC_SIZE);
924		offset += MAGIC_SIZE;
925		decode_u32_le(buffer, &offset, &header.sparse_sample_rate);
926
927		result = VDO_ASSERT(offset == sizeof(buffer),
928				    "%zu bytes decoded of %zu expected", offset,
929				    sizeof(buffer));
930		if (result != VDO_SUCCESS)
931			result = UDS_CORRUPT_DATA;
932
933		if (memcmp(header.magic, MAGIC_START_6, MAGIC_SIZE) != 0)
934			return vdo_log_warning_strerror(UDS_CORRUPT_DATA,
935							"volume index file had bad magic number");
936
937		if (i == 0) {
938			volume_index->sparse_sample_rate = header.sparse_sample_rate;
939		} else if (volume_index->sparse_sample_rate != header.sparse_sample_rate) {
940			vdo_log_warning_strerror(UDS_CORRUPT_DATA,
941						 "Inconsistent sparse sample rate in delta index zone files: %u vs. %u",
942						 volume_index->sparse_sample_rate,
943						 header.sparse_sample_rate);
944			return UDS_CORRUPT_DATA;
945		}
946	}
947
948	result = start_restoring_volume_sub_index(&volume_index->vi_non_hook,
949						  buffered_readers, reader_count);
950	if (result != UDS_SUCCESS)
951		return result;
952
953	return start_restoring_volume_sub_index(&volume_index->vi_hook, buffered_readers,
954						reader_count);
955}
956
957static int finish_restoring_volume_sub_index(struct volume_sub_index *sub_index,
958					     struct buffered_reader **buffered_readers,
959					     unsigned int reader_count)
960{
961	return uds_finish_restoring_delta_index(&sub_index->delta_index,
962						buffered_readers, reader_count);
963}
964
965static int finish_restoring_volume_index(struct volume_index *volume_index,
966					 struct buffered_reader **buffered_readers,
967					 unsigned int reader_count)
968{
969	int result;
970
971	result = finish_restoring_volume_sub_index(&volume_index->vi_non_hook,
972						   buffered_readers, reader_count);
973	if ((result == UDS_SUCCESS) && has_sparse(volume_index)) {
974		result = finish_restoring_volume_sub_index(&volume_index->vi_hook,
975							   buffered_readers,
976							   reader_count);
977	}
978
979	return result;
980}
981
982int uds_load_volume_index(struct volume_index *volume_index,
983			  struct buffered_reader **readers, unsigned int reader_count)
984{
985	int result;
986
987	/* Start by reading the header section of the stream. */
988	result = start_restoring_volume_index(volume_index, readers, reader_count);
989	if (result != UDS_SUCCESS)
990		return result;
991
992	result = finish_restoring_volume_index(volume_index, readers, reader_count);
993	if (result != UDS_SUCCESS) {
994		abort_restoring_volume_index(volume_index);
995		return result;
996	}
997
998	/* Check the final guard lists to make sure there is no extra data. */
999	result = uds_check_guard_delta_lists(readers, reader_count);
1000	if (result != UDS_SUCCESS)
1001		abort_restoring_volume_index(volume_index);
1002
1003	return result;
1004}
1005
1006static int start_saving_volume_sub_index(const struct volume_sub_index *sub_index,
1007					 unsigned int zone_number,
1008					 struct buffered_writer *buffered_writer)
1009{
1010	int result;
1011	struct volume_sub_index_zone *volume_index_zone = &sub_index->zones[zone_number];
1012	u32 first_list = sub_index->delta_index.delta_zones[zone_number].first_list;
1013	u32 list_count = sub_index->delta_index.delta_zones[zone_number].list_count;
1014	u8 buffer[sizeof(struct sub_index_data)];
1015	size_t offset = 0;
1016	u32 i;
1017
1018	memcpy(buffer, MAGIC_START_5, MAGIC_SIZE);
1019	offset += MAGIC_SIZE;
1020	encode_u64_le(buffer, &offset, sub_index->volume_nonce);
1021	encode_u64_le(buffer, &offset, volume_index_zone->virtual_chapter_low);
1022	encode_u64_le(buffer, &offset, volume_index_zone->virtual_chapter_high);
1023	encode_u32_le(buffer, &offset, first_list);
1024	encode_u32_le(buffer, &offset, list_count);
1025
1026	result =  VDO_ASSERT(offset == sizeof(struct sub_index_data),
1027			     "%zu bytes of config written, of %zu expected", offset,
1028			     sizeof(struct sub_index_data));
1029	if (result != VDO_SUCCESS)
1030		return result;
1031
1032	result = uds_write_to_buffered_writer(buffered_writer, buffer, offset);
1033	if (result != UDS_SUCCESS)
1034		return vdo_log_warning_strerror(result,
1035						"failed to write volume index header");
1036
1037	for (i = 0; i < list_count; i++) {
1038		u8 encoded[sizeof(u64)];
1039
1040		put_unaligned_le64(sub_index->flush_chapters[first_list + i], &encoded);
1041		result = uds_write_to_buffered_writer(buffered_writer, encoded,
1042						      sizeof(u64));
1043		if (result != UDS_SUCCESS) {
1044			return vdo_log_warning_strerror(result,
1045							"failed to write volume index flush ranges");
1046		}
1047	}
1048
1049	return uds_start_saving_delta_index(&sub_index->delta_index, zone_number,
1050					    buffered_writer);
1051}
1052
1053static int start_saving_volume_index(const struct volume_index *volume_index,
1054				     unsigned int zone_number,
1055				     struct buffered_writer *writer)
1056{
1057	u8 buffer[sizeof(struct volume_index_data)];
1058	size_t offset = 0;
1059	int result;
1060
1061	if (!has_sparse(volume_index)) {
1062		return start_saving_volume_sub_index(&volume_index->vi_non_hook,
1063						     zone_number, writer);
1064	}
1065
1066	memcpy(buffer, MAGIC_START_6, MAGIC_SIZE);
1067	offset += MAGIC_SIZE;
1068	encode_u32_le(buffer, &offset, volume_index->sparse_sample_rate);
1069	result = VDO_ASSERT(offset == sizeof(struct volume_index_data),
1070			    "%zu bytes of header written, of %zu expected", offset,
1071			    sizeof(struct volume_index_data));
1072	if (result != VDO_SUCCESS)
1073		return result;
1074
1075	result = uds_write_to_buffered_writer(writer, buffer, offset);
1076	if (result != UDS_SUCCESS) {
1077		vdo_log_warning_strerror(result, "failed to write volume index header");
1078		return result;
1079	}
1080
1081	result = start_saving_volume_sub_index(&volume_index->vi_non_hook, zone_number,
1082					       writer);
1083	if (result != UDS_SUCCESS)
1084		return result;
1085
1086	return start_saving_volume_sub_index(&volume_index->vi_hook, zone_number,
1087					     writer);
1088}
1089
1090static int finish_saving_volume_sub_index(const struct volume_sub_index *sub_index,
1091					  unsigned int zone_number)
1092{
1093	return uds_finish_saving_delta_index(&sub_index->delta_index, zone_number);
1094}
1095
1096static int finish_saving_volume_index(const struct volume_index *volume_index,
1097				      unsigned int zone_number)
1098{
1099	int result;
1100
1101	result = finish_saving_volume_sub_index(&volume_index->vi_non_hook, zone_number);
1102	if ((result == UDS_SUCCESS) && has_sparse(volume_index))
1103		result = finish_saving_volume_sub_index(&volume_index->vi_hook, zone_number);
1104	return result;
1105}
1106
1107int uds_save_volume_index(struct volume_index *volume_index,
1108			  struct buffered_writer **writers, unsigned int writer_count)
1109{
1110	int result = UDS_SUCCESS;
1111	unsigned int zone;
1112
1113	for (zone = 0; zone < writer_count; zone++) {
1114		result = start_saving_volume_index(volume_index, zone, writers[zone]);
1115		if (result != UDS_SUCCESS)
1116			break;
1117
1118		result = finish_saving_volume_index(volume_index, zone);
1119		if (result != UDS_SUCCESS)
1120			break;
1121
1122		result = uds_write_guard_delta_list(writers[zone]);
1123		if (result != UDS_SUCCESS)
1124			break;
1125
1126		result = uds_flush_buffered_writer(writers[zone]);
1127		if (result != UDS_SUCCESS)
1128			break;
1129	}
1130
1131	return result;
1132}
1133
1134static void get_volume_sub_index_stats(const struct volume_sub_index *sub_index,
1135				       struct volume_index_stats *stats)
1136{
1137	struct delta_index_stats dis;
1138	unsigned int z;
1139
1140	uds_get_delta_index_stats(&sub_index->delta_index, &dis);
1141	stats->rebalance_time = dis.rebalance_time;
1142	stats->rebalance_count = dis.rebalance_count;
1143	stats->record_count = dis.record_count;
1144	stats->collision_count = dis.collision_count;
1145	stats->discard_count = dis.discard_count;
1146	stats->overflow_count = dis.overflow_count;
1147	stats->delta_lists = dis.list_count;
1148	stats->early_flushes = 0;
1149	for (z = 0; z < sub_index->zone_count; z++)
1150		stats->early_flushes += sub_index->zones[z].early_flushes;
1151}
1152
1153void uds_get_volume_index_stats(const struct volume_index *volume_index,
1154				struct volume_index_stats *stats)
1155{
1156	struct volume_index_stats sparse_stats;
1157
1158	get_volume_sub_index_stats(&volume_index->vi_non_hook, stats);
1159	if (!has_sparse(volume_index))
1160		return;
1161
1162	get_volume_sub_index_stats(&volume_index->vi_hook, &sparse_stats);
1163	stats->rebalance_time += sparse_stats.rebalance_time;
1164	stats->rebalance_count += sparse_stats.rebalance_count;
1165	stats->record_count += sparse_stats.record_count;
1166	stats->collision_count += sparse_stats.collision_count;
1167	stats->discard_count += sparse_stats.discard_count;
1168	stats->overflow_count += sparse_stats.overflow_count;
1169	stats->delta_lists += sparse_stats.delta_lists;
1170	stats->early_flushes += sparse_stats.early_flushes;
1171}
1172
1173static int initialize_volume_sub_index(const struct uds_configuration *config,
1174				       u64 volume_nonce, u8 tag,
1175				       struct volume_sub_index *sub_index)
1176{
1177	struct sub_index_parameters params = { .address_bits = 0 };
1178	unsigned int zone_count = config->zone_count;
1179	u64 available_bytes = 0;
1180	unsigned int z;
1181	int result;
1182
1183	result = compute_volume_sub_index_parameters(config, &params);
1184	if (result != UDS_SUCCESS)
1185		return result;
1186
1187	sub_index->address_bits = params.address_bits;
1188	sub_index->address_mask = (1u << params.address_bits) - 1;
1189	sub_index->chapter_bits = params.chapter_bits;
1190	sub_index->chapter_mask = (1u << params.chapter_bits) - 1;
1191	sub_index->chapter_count = params.chapter_count;
1192	sub_index->list_count = params.list_count;
1193	sub_index->zone_count = zone_count;
1194	sub_index->chapter_zone_bits = params.chapter_size_in_bits / zone_count;
1195	sub_index->volume_nonce = volume_nonce;
1196
1197	result = uds_initialize_delta_index(&sub_index->delta_index, zone_count,
1198					    params.list_count, params.mean_delta,
1199					    params.chapter_bits, params.memory_size,
1200					    tag);
1201	if (result != UDS_SUCCESS)
1202		return result;
1203
1204	for (z = 0; z < sub_index->delta_index.zone_count; z++)
1205		available_bytes += sub_index->delta_index.delta_zones[z].size;
1206	available_bytes -= params.target_free_bytes;
1207	sub_index->max_zone_bits = (available_bytes * BITS_PER_BYTE) / zone_count;
1208	sub_index->memory_size = (sub_index->delta_index.memory_size +
1209				  sizeof(struct volume_sub_index) +
1210				  (params.list_count * sizeof(u64)) +
1211				  (zone_count * sizeof(struct volume_sub_index_zone)));
1212
1213	/* The following arrays are initialized to all zeros. */
1214	result = vdo_allocate(params.list_count, u64, "first chapter to flush",
1215			      &sub_index->flush_chapters);
1216	if (result != VDO_SUCCESS)
1217		return result;
1218
1219	return vdo_allocate(zone_count, struct volume_sub_index_zone,
1220			    "volume index zones", &sub_index->zones);
1221}
1222
1223int uds_make_volume_index(const struct uds_configuration *config, u64 volume_nonce,
1224			  struct volume_index **volume_index_ptr)
1225{
1226	struct split_config split;
1227	unsigned int zone;
1228	struct volume_index *volume_index;
1229	int result;
1230
1231	result = vdo_allocate(1, struct volume_index, "volume index", &volume_index);
1232	if (result != VDO_SUCCESS)
1233		return result;
1234
1235	volume_index->zone_count = config->zone_count;
1236
1237	if (!uds_is_sparse_index_geometry(config->geometry)) {
1238		result = initialize_volume_sub_index(config, volume_nonce, 'm',
1239						     &volume_index->vi_non_hook);
1240		if (result != UDS_SUCCESS) {
1241			uds_free_volume_index(volume_index);
1242			return result;
1243		}
1244
1245		volume_index->memory_size = volume_index->vi_non_hook.memory_size;
1246		*volume_index_ptr = volume_index;
1247		return UDS_SUCCESS;
1248	}
1249
1250	volume_index->sparse_sample_rate = config->sparse_sample_rate;
1251
1252	result = vdo_allocate(config->zone_count, struct volume_index_zone,
1253			      "volume index zones", &volume_index->zones);
1254	if (result != VDO_SUCCESS) {
1255		uds_free_volume_index(volume_index);
1256		return result;
1257	}
1258
1259	for (zone = 0; zone < config->zone_count; zone++)
1260		mutex_init(&volume_index->zones[zone].hook_mutex);
1261
1262	split_configuration(config, &split);
1263	result = initialize_volume_sub_index(&split.non_hook_config, volume_nonce, 'd',
1264					     &volume_index->vi_non_hook);
1265	if (result != UDS_SUCCESS) {
1266		uds_free_volume_index(volume_index);
1267		return vdo_log_error_strerror(result,
1268					      "Error creating non hook volume index");
1269	}
1270
1271	result = initialize_volume_sub_index(&split.hook_config, volume_nonce, 's',
1272					     &volume_index->vi_hook);
1273	if (result != UDS_SUCCESS) {
1274		uds_free_volume_index(volume_index);
1275		return vdo_log_error_strerror(result,
1276					      "Error creating hook volume index");
1277	}
1278
1279	volume_index->memory_size =
1280		volume_index->vi_non_hook.memory_size + volume_index->vi_hook.memory_size;
1281	*volume_index_ptr = volume_index;
1282	return UDS_SUCCESS;
1283}
1284