1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5
6#include "physical-zone.h"
7
8#include <linux/list.h>
9
10#include "logger.h"
11#include "memory-alloc.h"
12#include "permassert.h"
13
14#include "block-map.h"
15#include "completion.h"
16#include "constants.h"
17#include "data-vio.h"
18#include "dedupe.h"
19#include "encodings.h"
20#include "flush.h"
21#include "int-map.h"
22#include "slab-depot.h"
23#include "status-codes.h"
24#include "vdo.h"
25
/*
 * Each user data_vio needs a PBN read lock and write lock, so a lock pool is sized to hold two
 * locks for each of the MAXIMUM_VDO_USER_VIOS. initialize_zone() creates one pool of this
 * capacity for every physical zone.
 */
#define LOCK_POOL_CAPACITY (2 * MAXIMUM_VDO_USER_VIOS)
28
/**
 * struct pbn_lock_implementation - Static, per-type properties of a PBN lock.
 *
 * A pbn_lock refers to one of these records through its implementation pointer; which record it
 * points at is what determines the type of the lock.
 */
struct pbn_lock_implementation {
	/** @type: The lock type this record describes. */
	enum pbn_lock_type type;
	/** @name: A short human-readable name for the lock type. */
	const char *name;
	/**
	 * @release_reason: Text describing the locked block, used in the error logged when
	 *		    releasing the lock's provisional reference fails.
	 */
	const char *release_reason;
};
34
/*
 * The table of lock implementations, indexed by pbn_lock_type. This array must have an entry for
 * every pbn_lock_type value, because a lock's type is recorded (and later tested) as a pointer to
 * the matching entry in this table.
 */
static const struct pbn_lock_implementation LOCK_IMPLEMENTATIONS[] = {
	[VIO_READ_LOCK] = {
		.type = VIO_READ_LOCK,
		.name = "read",
		.release_reason = "candidate duplicate",
	},
	[VIO_WRITE_LOCK] = {
		.type = VIO_WRITE_LOCK,
		.name = "write",
		.release_reason = "newly allocated",
	},
	[VIO_BLOCK_MAP_WRITE_LOCK] = {
		.type = VIO_BLOCK_MAP_WRITE_LOCK,
		.name = "block map write",
		.release_reason = "block map write",
	},
};
53
54static inline bool has_lock_type(const struct pbn_lock *lock, enum pbn_lock_type type)
55{
56	return (lock->implementation == &LOCK_IMPLEMENTATIONS[type]);
57}
58
59/**
60 * vdo_is_pbn_read_lock() - Check whether a pbn_lock is a read lock.
61 * @lock: The lock to check.
62 *
63 * Return: true if the lock is a read lock.
64 */
65bool vdo_is_pbn_read_lock(const struct pbn_lock *lock)
66{
67	return has_lock_type(lock, VIO_READ_LOCK);
68}
69
70static inline void set_pbn_lock_type(struct pbn_lock *lock, enum pbn_lock_type type)
71{
72	lock->implementation = &LOCK_IMPLEMENTATIONS[type];
73}
74
75/**
76 * vdo_downgrade_pbn_write_lock() - Downgrade a PBN write lock to a PBN read lock.
77 * @lock: The PBN write lock to downgrade.
78 *
79 * The lock holder count is cleared and the caller is responsible for setting the new count.
80 */
81void vdo_downgrade_pbn_write_lock(struct pbn_lock *lock, bool compressed_write)
82{
83	VDO_ASSERT_LOG_ONLY(!vdo_is_pbn_read_lock(lock),
84			    "PBN lock must not already have been downgraded");
85	VDO_ASSERT_LOG_ONLY(!has_lock_type(lock, VIO_BLOCK_MAP_WRITE_LOCK),
86			    "must not downgrade block map write locks");
87	VDO_ASSERT_LOG_ONLY(lock->holder_count == 1,
88			    "PBN write lock should have one holder but has %u",
89			    lock->holder_count);
90	/*
91	 * data_vio write locks are downgraded in place--the writer retains the hold on the lock.
92	 * If this was a compressed write, the holder has not yet journaled its own inc ref,
93	 * otherwise, it has.
94	 */
95	lock->increment_limit =
96		(compressed_write ? MAXIMUM_REFERENCE_COUNT : MAXIMUM_REFERENCE_COUNT - 1);
97	set_pbn_lock_type(lock, VIO_READ_LOCK);
98}
99
100/**
101 * vdo_claim_pbn_lock_increment() - Try to claim one of the available reference count increments on
102 *				    a read lock.
103 * @lock: The PBN read lock from which to claim an increment.
104 *
105 * Claims may be attempted from any thread. A claim is only valid until the PBN lock is released.
106 *
107 * Return: true if the claim succeeded, guaranteeing one increment can be made without overflowing
108 *	   the PBN's reference count.
109 */
110bool vdo_claim_pbn_lock_increment(struct pbn_lock *lock)
111{
112	/*
113	 * Claim the next free reference atomically since hash locks from multiple hash zone
114	 * threads might be concurrently deduplicating against a single PBN lock on compressed
115	 * block. As long as hitting the increment limit will lead to the PBN lock being released
116	 * in a sane time-frame, we won't overflow a 32-bit claim counter, allowing a simple add
117	 * instead of a compare-and-swap.
118	 */
119	u32 claim_number = (u32) atomic_add_return(1, &lock->increments_claimed);
120
121	return (claim_number <= lock->increment_limit);
122}
123
/**
 * vdo_assign_pbn_lock_provisional_reference() - Inform a PBN lock that it is responsible for a
 *						 provisional reference.
 * @lock: The PBN lock.
 *
 * The lock is expected not to be responsible for a provisional reference already. The flag is set
 * unconditionally after that expectation has been checked.
 */
void vdo_assign_pbn_lock_provisional_reference(struct pbn_lock *lock)
{
	/* Note that the message states the expected condition rather than the failure. */
	VDO_ASSERT_LOG_ONLY(!lock->has_provisional_reference,
			    "lock does not have a provisional reference");
	lock->has_provisional_reference = true;
}
135
/**
 * vdo_unassign_pbn_lock_provisional_reference() - Inform a PBN lock that it is no longer
 *						   responsible for a provisional reference.
 * @lock: The PBN lock.
 *
 * This only clears the lock's flag; it does not release any reference itself.
 */
void vdo_unassign_pbn_lock_provisional_reference(struct pbn_lock *lock)
{
	lock->has_provisional_reference = false;
}
145
146/**
147 * release_pbn_lock_provisional_reference() - If the lock is responsible for a provisional
148 *					      reference, release that reference.
149 * @lock: The lock.
150 * @locked_pbn: The PBN covered by the lock.
151 * @allocator: The block allocator from which to release the reference.
152 *
153 * This method is called when the lock is released.
154 */
155static void release_pbn_lock_provisional_reference(struct pbn_lock *lock,
156						   physical_block_number_t locked_pbn,
157						   struct block_allocator *allocator)
158{
159	int result;
160
161	if (!vdo_pbn_lock_has_provisional_reference(lock))
162		return;
163
164	result = vdo_release_block_reference(allocator, locked_pbn);
165	if (result != VDO_SUCCESS) {
166		vdo_log_error_strerror(result,
167				       "Failed to release reference to %s physical block %llu",
168				       lock->implementation->release_reason,
169				       (unsigned long long) locked_pbn);
170	}
171
172	vdo_unassign_pbn_lock_provisional_reference(lock);
173}
174
/**
 * typedef idle_pbn_lock - PBN lock list entries.
 *
 * Unused (idle) PBN locks are kept in a list. Just like in a malloc implementation, the lock
 * structure is unused memory while it is idle, so we can save a bit of space (and not pollute the
 * lock structure proper) by using a union to overlay the lock structure with the free list
 * linkage. Since the members share storage, only one of them is meaningful at any given time.
 */
typedef union {
	/** @entry: Only used while locks are in the pool. */
	struct list_head entry;
	/** @lock: Only used while locks are not in the pool. */
	struct pbn_lock lock;
} idle_pbn_lock;
188
/**
 * struct pbn_lock_pool - A fixed-capacity pool of PBN locks.
 *
 * The lock pool is little more than the memory allocated for the locks, together with a list of
 * the locks which are currently idle and a count of those which are on loan.
 */
struct pbn_lock_pool {
	/** @capacity: The number of locks allocated for the pool. */
	size_t capacity;
	/** @borrowed: The number of locks currently borrowed from the pool. */
	size_t borrowed;
	/** @idle_list: A list containing all idle PBN lock instances. */
	struct list_head idle_list;
	/** @locks: The memory for all the locks allocated by this pool. */
	idle_pbn_lock locks[];
};
204
205/**
206 * return_pbn_lock_to_pool() - Return a pbn lock to its pool.
207 * @pool: The pool from which the lock was borrowed.
208 * @lock: The last reference to the lock being returned.
209 *
210 * It must be the last live reference, as if the memory were being freed (the lock memory will
211 * re-initialized or zeroed).
212 */
213static void return_pbn_lock_to_pool(struct pbn_lock_pool *pool, struct pbn_lock *lock)
214{
215	idle_pbn_lock *idle;
216
217	/* A bit expensive, but will promptly catch some use-after-free errors. */
218	memset(lock, 0, sizeof(*lock));
219
220	idle = container_of(lock, idle_pbn_lock, lock);
221	INIT_LIST_HEAD(&idle->entry);
222	list_add_tail(&idle->entry, &pool->idle_list);
223
224	VDO_ASSERT_LOG_ONLY(pool->borrowed > 0, "shouldn't return more than borrowed");
225	pool->borrowed -= 1;
226}
227
228/**
229 * make_pbn_lock_pool() - Create a new PBN lock pool and all the lock instances it can loan out.
230 *
231 * @capacity: The number of PBN locks to allocate for the pool.
232 * @pool_ptr: A pointer to receive the new pool.
233 *
234 * Return: VDO_SUCCESS or an error code.
235 */
236static int make_pbn_lock_pool(size_t capacity, struct pbn_lock_pool **pool_ptr)
237{
238	size_t i;
239	struct pbn_lock_pool *pool;
240	int result;
241
242	result = vdo_allocate_extended(struct pbn_lock_pool, capacity, idle_pbn_lock,
243				       __func__, &pool);
244	if (result != VDO_SUCCESS)
245		return result;
246
247	pool->capacity = capacity;
248	pool->borrowed = capacity;
249	INIT_LIST_HEAD(&pool->idle_list);
250
251	for (i = 0; i < capacity; i++)
252		return_pbn_lock_to_pool(pool, &pool->locks[i].lock);
253
254	*pool_ptr = pool;
255	return VDO_SUCCESS;
256}
257
258/**
259 * free_pbn_lock_pool() - Free a PBN lock pool.
260 * @pool: The lock pool to free.
261 *
262 * This also frees all the PBN locks it allocated, so the caller must ensure that all locks have
263 * been returned to the pool.
264 */
265static void free_pbn_lock_pool(struct pbn_lock_pool *pool)
266{
267	if (pool == NULL)
268		return;
269
270	VDO_ASSERT_LOG_ONLY(pool->borrowed == 0,
271			    "All PBN locks must be returned to the pool before it is freed, but %zu locks are still on loan",
272			    pool->borrowed);
273	vdo_free(pool);
274}
275
276/**
277 * borrow_pbn_lock_from_pool() - Borrow a PBN lock from the pool and initialize it with the
278 *				 provided type.
279 * @pool: The pool from which to borrow.
280 * @type: The type with which to initialize the lock.
281 * @lock_ptr:  A pointer to receive the borrowed lock.
282 *
283 * Pools do not grow on demand or allocate memory, so this will fail if the pool is empty. Borrowed
284 * locks are still associated with this pool and must be returned to only this pool.
285 *
286 * Return: VDO_SUCCESS, or VDO_LOCK_ERROR if the pool is empty.
287 */
288static int __must_check borrow_pbn_lock_from_pool(struct pbn_lock_pool *pool,
289						  enum pbn_lock_type type,
290						  struct pbn_lock **lock_ptr)
291{
292	int result;
293	struct list_head *idle_entry;
294	idle_pbn_lock *idle;
295
296	if (pool->borrowed >= pool->capacity)
297		return vdo_log_error_strerror(VDO_LOCK_ERROR,
298					      "no free PBN locks left to borrow");
299	pool->borrowed += 1;
300
301	result = VDO_ASSERT(!list_empty(&pool->idle_list),
302			    "idle list should not be empty if pool not at capacity");
303	if (result != VDO_SUCCESS)
304		return result;
305
306	idle_entry = pool->idle_list.prev;
307	list_del(idle_entry);
308	memset(idle_entry, 0, sizeof(*idle_entry));
309
310	idle = list_entry(idle_entry, idle_pbn_lock, entry);
311	idle->lock.holder_count = 0;
312	set_pbn_lock_type(&idle->lock, type);
313
314	*lock_ptr = &idle->lock;
315	return VDO_SUCCESS;
316}
317
318/**
319 * initialize_zone() - Initialize a physical zone.
320 * @vdo: The vdo to which the zone will belong.
321 * @zones: The physical_zones to which the zone being initialized belongs
322 *
323 * Return: VDO_SUCCESS or an error code.
324 */
325static int initialize_zone(struct vdo *vdo, struct physical_zones *zones)
326{
327	int result;
328	zone_count_t zone_number = zones->zone_count;
329	struct physical_zone *zone = &zones->zones[zone_number];
330
331	result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->pbn_operations);
332	if (result != VDO_SUCCESS)
333		return result;
334
335	result = make_pbn_lock_pool(LOCK_POOL_CAPACITY, &zone->lock_pool);
336	if (result != VDO_SUCCESS) {
337		vdo_int_map_free(zone->pbn_operations);
338		return result;
339	}
340
341	zone->zone_number = zone_number;
342	zone->thread_id = vdo->thread_config.physical_threads[zone_number];
343	zone->allocator = &vdo->depot->allocators[zone_number];
344	zone->next = &zones->zones[(zone_number + 1) % vdo->thread_config.physical_zone_count];
345	result = vdo_make_default_thread(vdo, zone->thread_id);
346	if (result != VDO_SUCCESS) {
347		free_pbn_lock_pool(vdo_forget(zone->lock_pool));
348		vdo_int_map_free(zone->pbn_operations);
349		return result;
350	}
351	return result;
352}
353
354/**
355 * vdo_make_physical_zones() - Make the physical zones for a vdo.
356 * @vdo: The vdo being constructed
357 * @zones_ptr: A pointer to hold the zones
358 *
359 * Return: VDO_SUCCESS or an error code.
360 */
361int vdo_make_physical_zones(struct vdo *vdo, struct physical_zones **zones_ptr)
362{
363	struct physical_zones *zones;
364	int result;
365	zone_count_t zone_count = vdo->thread_config.physical_zone_count;
366
367	if (zone_count == 0)
368		return VDO_SUCCESS;
369
370	result = vdo_allocate_extended(struct physical_zones, zone_count,
371				       struct physical_zone, __func__, &zones);
372	if (result != VDO_SUCCESS)
373		return result;
374
375	for (zones->zone_count = 0; zones->zone_count < zone_count; zones->zone_count++) {
376		result = initialize_zone(vdo, zones);
377		if (result != VDO_SUCCESS) {
378			vdo_free_physical_zones(zones);
379			return result;
380		}
381	}
382
383	*zones_ptr = zones;
384	return VDO_SUCCESS;
385}
386
387/**
388 * vdo_free_physical_zones() - Destroy the physical zones.
389 * @zones: The zones to free.
390 */
391void vdo_free_physical_zones(struct physical_zones *zones)
392{
393	zone_count_t index;
394
395	if (zones == NULL)
396		return;
397
398	for (index = 0; index < zones->zone_count; index++) {
399		struct physical_zone *zone = &zones->zones[index];
400
401		free_pbn_lock_pool(vdo_forget(zone->lock_pool));
402		vdo_int_map_free(vdo_forget(zone->pbn_operations));
403	}
404
405	vdo_free(zones);
406}
407
408/**
409 * vdo_get_physical_zone_pbn_lock() - Get the lock on a PBN if one exists.
410 * @zone: The physical zone responsible for the PBN.
411 * @pbn: The physical block number whose lock is desired.
412 *
413 * Return: The lock or NULL if the PBN is not locked.
414 */
415struct pbn_lock *vdo_get_physical_zone_pbn_lock(struct physical_zone *zone,
416						physical_block_number_t pbn)
417{
418	return ((zone == NULL) ? NULL : vdo_int_map_get(zone->pbn_operations, pbn));
419}
420
421/**
422 * vdo_attempt_physical_zone_pbn_lock() - Attempt to lock a physical block in the zone responsible
423 *					  for it.
424 * @zone: The physical zone responsible for the PBN.
425 * @pbn: The physical block number to lock.
426 * @type: The type with which to initialize a new lock.
427 * @lock_ptr:  A pointer to receive the lock, existing or new.
428 *
429 * If the PBN is already locked, the existing lock will be returned. Otherwise, a new lock instance
430 * will be borrowed from the pool, initialized, and returned. The lock owner will be NULL for a new
431 * lock acquired by the caller, who is responsible for setting that field promptly. The lock owner
432 * will be non-NULL when there is already an existing lock on the PBN.
433 *
434 * Return: VDO_SUCCESS or an error.
435 */
436int vdo_attempt_physical_zone_pbn_lock(struct physical_zone *zone,
437				       physical_block_number_t pbn,
438				       enum pbn_lock_type type,
439				       struct pbn_lock **lock_ptr)
440{
441	/*
442	 * Borrow and prepare a lock from the pool so we don't have to do two int_map accesses in
443	 * the common case of no lock contention.
444	 */
445	struct pbn_lock *lock, *new_lock = NULL;
446	int result;
447
448	result = borrow_pbn_lock_from_pool(zone->lock_pool, type, &new_lock);
449	if (result != VDO_SUCCESS) {
450		VDO_ASSERT_LOG_ONLY(false, "must always be able to borrow a PBN lock");
451		return result;
452	}
453
454	result = vdo_int_map_put(zone->pbn_operations, pbn, new_lock, false,
455				 (void **) &lock);
456	if (result != VDO_SUCCESS) {
457		return_pbn_lock_to_pool(zone->lock_pool, new_lock);
458		return result;
459	}
460
461	if (lock != NULL) {
462		/* The lock is already held, so we don't need the borrowed one. */
463		return_pbn_lock_to_pool(zone->lock_pool, vdo_forget(new_lock));
464		result = VDO_ASSERT(lock->holder_count > 0, "physical block %llu lock held",
465				    (unsigned long long) pbn);
466		if (result != VDO_SUCCESS)
467			return result;
468		*lock_ptr = lock;
469	} else {
470		*lock_ptr = new_lock;
471	}
472	return VDO_SUCCESS;
473}
474
475/**
476 * allocate_and_lock_block() - Attempt to allocate a block from this zone.
477 * @allocation: The struct allocation of the data_vio attempting to allocate.
478 *
479 * If a block is allocated, the recipient will also hold a lock on it.
480 *
481 * Return: VDO_SUCCESS if a block was allocated, or an error code.
482 */
483static int allocate_and_lock_block(struct allocation *allocation)
484{
485	int result;
486	struct pbn_lock *lock;
487
488	VDO_ASSERT_LOG_ONLY(allocation->lock == NULL,
489			    "must not allocate a block while already holding a lock on one");
490
491	result = vdo_allocate_block(allocation->zone->allocator, &allocation->pbn);
492	if (result != VDO_SUCCESS)
493		return result;
494
495	result = vdo_attempt_physical_zone_pbn_lock(allocation->zone, allocation->pbn,
496						    allocation->write_lock_type, &lock);
497	if (result != VDO_SUCCESS)
498		return result;
499
500	if (lock->holder_count > 0) {
501		/* This block is already locked, which should be impossible. */
502		return vdo_log_error_strerror(VDO_LOCK_ERROR,
503					      "Newly allocated block %llu was spuriously locked (holder_count=%u)",
504					      (unsigned long long) allocation->pbn,
505					      lock->holder_count);
506	}
507
508	/* We've successfully acquired a new lock, so mark it as ours. */
509	lock->holder_count += 1;
510	allocation->lock = lock;
511	vdo_assign_pbn_lock_provisional_reference(lock);
512	return VDO_SUCCESS;
513}
514
515/**
516 * retry_allocation() - Retry allocating a block now that we're done waiting for scrubbing.
517 * @waiter: The allocating_vio that was waiting to allocate.
518 * @context: The context (unused).
519 */
520static void retry_allocation(struct vdo_waiter *waiter, void *context __always_unused)
521{
522	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
523
524	/* Now that some slab has scrubbed, restart the allocation process. */
525	data_vio->allocation.wait_for_clean_slab = false;
526	data_vio->allocation.first_allocation_zone = data_vio->allocation.zone->zone_number;
527	continue_data_vio(data_vio);
528}
529
530/**
531 * continue_allocating() - Continue searching for an allocation by enqueuing to wait for scrubbing
532 *			   or switching to the next zone.
533 * @data_vio: The data_vio attempting to get an allocation.
534 *
535 * This method should only be called from the error handler set in data_vio_allocate_data_block.
536 *
537 * Return: true if the allocation process has continued in another zone.
538 */
539static bool continue_allocating(struct data_vio *data_vio)
540{
541	struct allocation *allocation = &data_vio->allocation;
542	struct physical_zone *zone = allocation->zone;
543	struct vdo_completion *completion = &data_vio->vio.completion;
544	int result = VDO_SUCCESS;
545	bool was_waiting = allocation->wait_for_clean_slab;
546	bool tried_all = (allocation->first_allocation_zone == zone->next->zone_number);
547
548	vdo_reset_completion(completion);
549
550	if (tried_all && !was_waiting) {
551		/*
552		 * We've already looked in all the zones, and found nothing. So go through the
553		 * zones again, and wait for each to scrub before trying to allocate.
554		 */
555		allocation->wait_for_clean_slab = true;
556		allocation->first_allocation_zone = zone->zone_number;
557	}
558
559	if (allocation->wait_for_clean_slab) {
560		data_vio->waiter.callback = retry_allocation;
561		result = vdo_enqueue_clean_slab_waiter(zone->allocator,
562						       &data_vio->waiter);
563		if (result == VDO_SUCCESS) {
564			/* We've enqueued to wait for a slab to be scrubbed. */
565			return true;
566		}
567
568		if ((result != VDO_NO_SPACE) || (was_waiting && tried_all)) {
569			vdo_set_completion_result(completion, result);
570			return false;
571		}
572	}
573
574	allocation->zone = zone->next;
575	completion->callback_thread_id = allocation->zone->thread_id;
576	vdo_launch_completion(completion);
577	return true;
578}
579
580/**
581 * vdo_allocate_block_in_zone() - Attempt to allocate a block in the current physical zone, and if
582 *				  that fails try the next if possible.
583 * @data_vio: The data_vio needing an allocation.
584 *
585 * Return: true if a block was allocated, if not the data_vio will have been dispatched so the
586 *         caller must not touch it.
587 */
588bool vdo_allocate_block_in_zone(struct data_vio *data_vio)
589{
590	int result = allocate_and_lock_block(&data_vio->allocation);
591
592	if (result == VDO_SUCCESS)
593		return true;
594
595	if ((result != VDO_NO_SPACE) || !continue_allocating(data_vio))
596		continue_data_vio_with_error(data_vio, result);
597
598	return false;
599}
600
601/**
602 * vdo_release_physical_zone_pbn_lock() - Release a physical block lock if it is held and return it
603 *                                        to the lock pool.
604 * @zone: The physical zone in which the lock was obtained.
605 * @locked_pbn: The physical block number to unlock.
606 * @lock: The lock being released.
607 *
608 * It must be the last live reference, as if the memory were being freed (the
609 * lock memory will re-initialized or zeroed).
610 */
611void vdo_release_physical_zone_pbn_lock(struct physical_zone *zone,
612					physical_block_number_t locked_pbn,
613					struct pbn_lock *lock)
614{
615	struct pbn_lock *holder;
616
617	if (lock == NULL)
618		return;
619
620	VDO_ASSERT_LOG_ONLY(lock->holder_count > 0,
621			    "should not be releasing a lock that is not held");
622
623	lock->holder_count -= 1;
624	if (lock->holder_count > 0) {
625		/* The lock was shared and is still referenced, so don't release it yet. */
626		return;
627	}
628
629	holder = vdo_int_map_remove(zone->pbn_operations, locked_pbn);
630	VDO_ASSERT_LOG_ONLY((lock == holder), "physical block lock mismatch for block %llu",
631			    (unsigned long long) locked_pbn);
632
633	release_pbn_lock_provisional_reference(lock, locked_pbn, zone->allocator);
634	return_pbn_lock_to_pool(zone->lock_pool, lock);
635}
636
/**
 * vdo_dump_physical_zone() - Dump information about a physical zone to the log for debugging.
 * @zone: The zone to dump.
 *
 * Currently the only thing dumped is the zone's block allocator.
 */
void vdo_dump_physical_zone(const struct physical_zone *zone)
{
	vdo_dump_block_allocator(zone->allocator);
}
645