1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5
6#include "packer.h"
7
8#include <linux/atomic.h>
9#include <linux/blkdev.h>
10
11#include "logger.h"
12#include "memory-alloc.h"
13#include "permassert.h"
14#include "string-utils.h"
15
16#include "admin-state.h"
17#include "completion.h"
18#include "constants.h"
19#include "data-vio.h"
20#include "dedupe.h"
21#include "encodings.h"
22#include "io-submitter.h"
23#include "physical-zone.h"
24#include "status-codes.h"
25#include "vdo.h"
26#include "vio.h"
27
28static const struct version_number COMPRESSED_BLOCK_1_0 = {
29	.major_version = 1,
30	.minor_version = 0,
31};
32
33#define COMPRESSED_BLOCK_1_0_SIZE (4 + 4 + (2 * VDO_MAX_COMPRESSION_SLOTS))
34
35/**
36 * vdo_get_compressed_block_fragment() - Get a reference to a compressed fragment from a compressed
37 *                                       block.
38 * @mapping_state [in] The mapping state for the look up.
39 * @compressed_block [in] The compressed block that was read from disk.
40 * @fragment_offset [out] The offset of the fragment within a compressed block.
41 * @fragment_size [out] The size of the fragment.
42 *
43 * Return: If a valid compressed fragment is found, VDO_SUCCESS; otherwise, VDO_INVALID_FRAGMENT if
44 *         the fragment is invalid.
45 */
46int vdo_get_compressed_block_fragment(enum block_mapping_state mapping_state,
47				      struct compressed_block *block,
48				      u16 *fragment_offset, u16 *fragment_size)
49{
50	u16 compressed_size;
51	u16 offset = 0;
52	unsigned int i;
53	u8 slot;
54	struct version_number version;
55
56	if (!vdo_is_state_compressed(mapping_state))
57		return VDO_INVALID_FRAGMENT;
58
59	version = vdo_unpack_version_number(block->header.version);
60	if (!vdo_are_same_version(version, COMPRESSED_BLOCK_1_0))
61		return VDO_INVALID_FRAGMENT;
62
63	slot = mapping_state - VDO_MAPPING_STATE_COMPRESSED_BASE;
64	if (slot >= VDO_MAX_COMPRESSION_SLOTS)
65		return VDO_INVALID_FRAGMENT;
66
67	compressed_size = __le16_to_cpu(block->header.sizes[slot]);
68	for (i = 0; i < slot; i++) {
69		offset += __le16_to_cpu(block->header.sizes[i]);
70		if (offset >= VDO_COMPRESSED_BLOCK_DATA_SIZE)
71			return VDO_INVALID_FRAGMENT;
72	}
73
74	if ((offset + compressed_size) > VDO_COMPRESSED_BLOCK_DATA_SIZE)
75		return VDO_INVALID_FRAGMENT;
76
77	*fragment_offset = offset;
78	*fragment_size = compressed_size;
79	return VDO_SUCCESS;
80}
81
82/**
83 * assert_on_packer_thread() - Check that we are on the packer thread.
84 * @packer: The packer.
85 * @caller: The function which is asserting.
86 */
87static inline void assert_on_packer_thread(struct packer *packer, const char *caller)
88{
89	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == packer->thread_id),
90			    "%s() called from packer thread", caller);
91}
92
93/**
94 * insert_in_sorted_list() - Insert a bin to the list.
95 * @packer: The packer.
96 * @bin: The bin to move to its sorted position.
97 *
98 * The list is in ascending order of free space. Since all bins are already in the list, this
99 * actually moves the bin to the correct position in the list.
100 */
101static void insert_in_sorted_list(struct packer *packer, struct packer_bin *bin)
102{
103	struct packer_bin *active_bin;
104
105	list_for_each_entry(active_bin, &packer->bins, list)
106		if (active_bin->free_space > bin->free_space) {
107			list_move_tail(&bin->list, &active_bin->list);
108			return;
109		}
110
111	list_move_tail(&bin->list, &packer->bins);
112}
113
114/**
115 * make_bin() - Allocate a bin and put it into the packer's list.
116 * @packer: The packer.
117 */
118static int __must_check make_bin(struct packer *packer)
119{
120	struct packer_bin *bin;
121	int result;
122
123	result = vdo_allocate_extended(struct packer_bin, VDO_MAX_COMPRESSION_SLOTS,
124				       struct vio *, __func__, &bin);
125	if (result != VDO_SUCCESS)
126		return result;
127
128	bin->free_space = VDO_COMPRESSED_BLOCK_DATA_SIZE;
129	INIT_LIST_HEAD(&bin->list);
130	list_add_tail(&bin->list, &packer->bins);
131	return VDO_SUCCESS;
132}
133
134/**
135 * vdo_make_packer() - Make a new block packer.
136 *
137 * @vdo: The vdo to which this packer belongs.
138 * @bin_count: The number of partial bins to keep in memory.
139 * @packer_ptr: A pointer to hold the new packer.
140 *
141 * Return: VDO_SUCCESS or an error
142 */
143int vdo_make_packer(struct vdo *vdo, block_count_t bin_count, struct packer **packer_ptr)
144{
145	struct packer *packer;
146	block_count_t i;
147	int result;
148
149	result = vdo_allocate(1, struct packer, __func__, &packer);
150	if (result != VDO_SUCCESS)
151		return result;
152
153	packer->thread_id = vdo->thread_config.packer_thread;
154	packer->size = bin_count;
155	INIT_LIST_HEAD(&packer->bins);
156	vdo_set_admin_state_code(&packer->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
157
158	for (i = 0; i < bin_count; i++) {
159		result = make_bin(packer);
160		if (result != VDO_SUCCESS) {
161			vdo_free_packer(packer);
162			return result;
163		}
164	}
165
166	/*
167	 * The canceled bin can hold up to half the number of user vios. Every canceled vio in the
168	 * bin must have a canceler for which it is waiting, and any canceler will only have
169	 * canceled one lock holder at a time.
170	 */
171	result = vdo_allocate_extended(struct packer_bin, MAXIMUM_VDO_USER_VIOS / 2,
172				       struct vio *, __func__, &packer->canceled_bin);
173	if (result != VDO_SUCCESS) {
174		vdo_free_packer(packer);
175		return result;
176	}
177
178	result = vdo_make_default_thread(vdo, packer->thread_id);
179	if (result != VDO_SUCCESS) {
180		vdo_free_packer(packer);
181		return result;
182	}
183
184	*packer_ptr = packer;
185	return VDO_SUCCESS;
186}
187
188/**
189 * vdo_free_packer() - Free a block packer.
190 * @packer: The packer to free.
191 */
192void vdo_free_packer(struct packer *packer)
193{
194	struct packer_bin *bin, *tmp;
195
196	if (packer == NULL)
197		return;
198
199	list_for_each_entry_safe(bin, tmp, &packer->bins, list) {
200		list_del_init(&bin->list);
201		vdo_free(bin);
202	}
203
204	vdo_free(vdo_forget(packer->canceled_bin));
205	vdo_free(packer);
206}
207
208/**
209 * get_packer_from_data_vio() - Get the packer from a data_vio.
210 * @data_vio: The data_vio.
211 *
212 * Return: The packer from the VDO to which the data_vio belongs.
213 */
214static inline struct packer *get_packer_from_data_vio(struct data_vio *data_vio)
215{
216	return vdo_from_data_vio(data_vio)->packer;
217}
218
219/**
220 * vdo_get_packer_statistics() - Get the current statistics from the packer.
221 * @packer: The packer to query.
222 *
223 * Return: a copy of the current statistics for the packer.
224 */
225struct packer_statistics vdo_get_packer_statistics(const struct packer *packer)
226{
227	const struct packer_statistics *stats = &packer->statistics;
228
229	return (struct packer_statistics) {
230		.compressed_fragments_written = READ_ONCE(stats->compressed_fragments_written),
231		.compressed_blocks_written = READ_ONCE(stats->compressed_blocks_written),
232		.compressed_fragments_in_packer = READ_ONCE(stats->compressed_fragments_in_packer),
233	};
234}
235
236/**
237 * abort_packing() - Abort packing a data_vio.
238 * @data_vio: The data_vio to abort.
239 */
240static void abort_packing(struct data_vio *data_vio)
241{
242	struct packer *packer = get_packer_from_data_vio(data_vio);
243
244	WRITE_ONCE(packer->statistics.compressed_fragments_in_packer,
245		   packer->statistics.compressed_fragments_in_packer - 1);
246
247	write_data_vio(data_vio);
248}
249
250/**
251 * release_compressed_write_waiter() - Update a data_vio for which a successful compressed write
252 *                                     has completed and send it on its way.
253
254 * @data_vio: The data_vio to release.
255 * @allocation: The allocation to which the compressed block was written.
256 */
257static void release_compressed_write_waiter(struct data_vio *data_vio,
258					    struct allocation *allocation)
259{
260	data_vio->new_mapped = (struct zoned_pbn) {
261		.pbn = allocation->pbn,
262		.zone = allocation->zone,
263		.state = data_vio->compression.slot + VDO_MAPPING_STATE_COMPRESSED_BASE,
264	};
265
266	vdo_share_compressed_write_lock(data_vio, allocation->lock);
267	update_metadata_for_data_vio_write(data_vio, allocation->lock);
268}
269
270/**
271 * finish_compressed_write() - Finish a compressed block write.
272 * @completion: The compressed write completion.
273 *
274 * This callback is registered in continue_after_allocation().
275 */
276static void finish_compressed_write(struct vdo_completion *completion)
277{
278	struct data_vio *agent = as_data_vio(completion);
279	struct data_vio *client, *next;
280
281	assert_data_vio_in_allocated_zone(agent);
282
283	/*
284	 * Process all the non-agent waiters first to ensure that the pbn lock can not be released
285	 * until all of them have had a chance to journal their increfs.
286	 */
287	for (client = agent->compression.next_in_batch; client != NULL; client = next) {
288		next = client->compression.next_in_batch;
289		release_compressed_write_waiter(client, &agent->allocation);
290	}
291
292	completion->error_handler = handle_data_vio_error;
293	release_compressed_write_waiter(agent, &agent->allocation);
294}
295
296static void handle_compressed_write_error(struct vdo_completion *completion)
297{
298	struct data_vio *agent = as_data_vio(completion);
299	struct allocation *allocation = &agent->allocation;
300	struct data_vio *client, *next;
301
302	if (vdo_requeue_completion_if_needed(completion, allocation->zone->thread_id))
303		return;
304
305	update_vio_error_stats(as_vio(completion),
306			       "Completing compressed write vio for physical block %llu with error",
307			       (unsigned long long) allocation->pbn);
308
309	for (client = agent->compression.next_in_batch; client != NULL; client = next) {
310		next = client->compression.next_in_batch;
311		write_data_vio(client);
312	}
313
314	/* Now that we've released the batch from the packer, forget the error and continue on. */
315	vdo_reset_completion(completion);
316	completion->error_handler = handle_data_vio_error;
317	write_data_vio(agent);
318}
319
320/**
321 * add_to_bin() - Put a data_vio in a specific packer_bin in which it will definitely fit.
322 * @bin: The bin in which to put the data_vio.
323 * @data_vio: The data_vio to add.
324 */
325static void add_to_bin(struct packer_bin *bin, struct data_vio *data_vio)
326{
327	data_vio->compression.bin = bin;
328	data_vio->compression.slot = bin->slots_used;
329	bin->incoming[bin->slots_used++] = data_vio;
330}
331
332/**
333 * remove_from_bin() - Get the next data_vio whose compression has not been canceled from a bin.
334 * @packer: The packer.
335 * @bin: The bin from which to get a data_vio.
336 *
337 * Any canceled data_vios will be moved to the canceled bin.
338 * Return: An uncanceled data_vio from the bin or NULL if there are none.
339 */
340static struct data_vio *remove_from_bin(struct packer *packer, struct packer_bin *bin)
341{
342	while (bin->slots_used > 0) {
343		struct data_vio *data_vio = bin->incoming[--bin->slots_used];
344
345		if (!advance_data_vio_compression_stage(data_vio).may_not_compress) {
346			data_vio->compression.bin = NULL;
347			return data_vio;
348		}
349
350		add_to_bin(packer->canceled_bin, data_vio);
351	}
352
353	/* The bin is now empty. */
354	bin->free_space = VDO_COMPRESSED_BLOCK_DATA_SIZE;
355	return NULL;
356}
357
358/**
359 * initialize_compressed_block() - Initialize a compressed block.
360 * @block: The compressed block to initialize.
361 * @size: The size of the agent's fragment.
362 *
363 * This method initializes the compressed block in the compressed write agent. Because the
364 * compressor already put the agent's compressed fragment at the start of the compressed block's
365 * data field, it needn't be copied. So all we need do is initialize the header and set the size of
366 * the agent's fragment.
367 */
368static void initialize_compressed_block(struct compressed_block *block, u16 size)
369{
370	/*
371	 * Make sure the block layout isn't accidentally changed by changing the length of the
372	 * block header.
373	 */
374	BUILD_BUG_ON(sizeof(struct compressed_block_header) != COMPRESSED_BLOCK_1_0_SIZE);
375
376	block->header.version = vdo_pack_version_number(COMPRESSED_BLOCK_1_0);
377	block->header.sizes[0] = __cpu_to_le16(size);
378}
379
380/**
381 * pack_fragment() - Pack a data_vio's fragment into the compressed block in which it is already
382 *                   known to fit.
383 * @compression: The agent's compression_state to pack in to.
384 * @data_vio: The data_vio to pack.
385 * @offset: The offset into the compressed block at which to pack the fragment.
386 * @compressed_block: The compressed block which will be written out when batch is fully packed.
387 *
388 * Return: The new amount of space used.
389 */
390static block_size_t __must_check pack_fragment(struct compression_state *compression,
391					       struct data_vio *data_vio,
392					       block_size_t offset, slot_number_t slot,
393					       struct compressed_block *block)
394{
395	struct compression_state *to_pack = &data_vio->compression;
396	char *fragment = to_pack->block->data;
397
398	to_pack->next_in_batch = compression->next_in_batch;
399	compression->next_in_batch = data_vio;
400	to_pack->slot = slot;
401	block->header.sizes[slot] = __cpu_to_le16(to_pack->size);
402	memcpy(&block->data[offset], fragment, to_pack->size);
403	return (offset + to_pack->size);
404}
405
406/**
407 * compressed_write_end_io() - The bio_end_io for a compressed block write.
408 * @bio: The bio for the compressed write.
409 */
410static void compressed_write_end_io(struct bio *bio)
411{
412	struct data_vio *data_vio = vio_as_data_vio(bio->bi_private);
413
414	vdo_count_completed_bios(bio);
415	set_data_vio_allocated_zone_callback(data_vio, finish_compressed_write);
416	continue_data_vio_with_error(data_vio, blk_status_to_errno(bio->bi_status));
417}
418
419/**
420 * write_bin() - Write out a bin.
421 * @packer: The packer.
422 * @bin: The bin to write.
423 */
424static void write_bin(struct packer *packer, struct packer_bin *bin)
425{
426	int result;
427	block_size_t offset;
428	slot_number_t slot = 1;
429	struct compression_state *compression;
430	struct compressed_block *block;
431	struct data_vio *agent = remove_from_bin(packer, bin);
432	struct data_vio *client;
433	struct packer_statistics *stats;
434
435	if (agent == NULL)
436		return;
437
438	compression = &agent->compression;
439	compression->slot = 0;
440	block = compression->block;
441	initialize_compressed_block(block, compression->size);
442	offset = compression->size;
443
444	while ((client = remove_from_bin(packer, bin)) != NULL)
445		offset = pack_fragment(compression, client, offset, slot++, block);
446
447	/*
448	 * If the batch contains only a single vio, then we save nothing by saving the compressed
449	 * form. Continue processing the single vio in the batch.
450	 */
451	if (slot == 1) {
452		abort_packing(agent);
453		return;
454	}
455
456	if (slot < VDO_MAX_COMPRESSION_SLOTS) {
457		/* Clear out the sizes of the unused slots */
458		memset(&block->header.sizes[slot], 0,
459		       (VDO_MAX_COMPRESSION_SLOTS - slot) * sizeof(__le16));
460	}
461
462	agent->vio.completion.error_handler = handle_compressed_write_error;
463	if (vdo_is_read_only(vdo_from_data_vio(agent))) {
464		continue_data_vio_with_error(agent, VDO_READ_ONLY);
465		return;
466	}
467
468	result = vio_reset_bio(&agent->vio, (char *) block, compressed_write_end_io,
469			       REQ_OP_WRITE, agent->allocation.pbn);
470	if (result != VDO_SUCCESS) {
471		continue_data_vio_with_error(agent, result);
472		return;
473	}
474
475	/*
476	 * Once the compressed write is submitted, the fragments are no longer in the packer, so
477	 * update stats now.
478	 */
479	stats = &packer->statistics;
480	WRITE_ONCE(stats->compressed_fragments_in_packer,
481		   (stats->compressed_fragments_in_packer - slot));
482	WRITE_ONCE(stats->compressed_fragments_written,
483		   (stats->compressed_fragments_written + slot));
484	WRITE_ONCE(stats->compressed_blocks_written,
485		   stats->compressed_blocks_written + 1);
486
487	vdo_submit_data_vio(agent);
488}
489
490/**
491 * add_data_vio_to_packer_bin() - Add a data_vio to a bin's incoming queue
492 * @packer: The packer.
493 * @bin: The bin to which to add the data_vio.
494 * @data_vio: The data_vio to add to the bin's queue.
495 *
496 * Adds a data_vio to a bin's incoming queue, handles logical space change, and calls physical
497 * space processor.
498 */
499static void add_data_vio_to_packer_bin(struct packer *packer, struct packer_bin *bin,
500				       struct data_vio *data_vio)
501{
502	/* If the selected bin doesn't have room, start a new batch to make room. */
503	if (bin->free_space < data_vio->compression.size)
504		write_bin(packer, bin);
505
506	add_to_bin(bin, data_vio);
507	bin->free_space -= data_vio->compression.size;
508
509	/* If we happen to exactly fill the bin, start a new batch. */
510	if ((bin->slots_used == VDO_MAX_COMPRESSION_SLOTS) ||
511	    (bin->free_space == 0))
512		write_bin(packer, bin);
513
514	/* Now that we've finished changing the free space, restore the sort order. */
515	insert_in_sorted_list(packer, bin);
516}
517
518/**
519 * select_bin() - Select the bin that should be used to pack the compressed data in a data_vio with
520 *                other data_vios.
521 * @packer: The packer.
522 * @data_vio: The data_vio.
523 */
524static struct packer_bin * __must_check select_bin(struct packer *packer,
525						   struct data_vio *data_vio)
526{
527	/*
528	 * First best fit: select the bin with the least free space that has enough room for the
529	 * compressed data in the data_vio.
530	 */
531	struct packer_bin *bin, *fullest_bin;
532
533	list_for_each_entry(bin, &packer->bins, list) {
534		if (bin->free_space >= data_vio->compression.size)
535			return bin;
536	}
537
538	/*
539	 * None of the bins have enough space for the data_vio. We're not allowed to create new
540	 * bins, so we have to overflow one of the existing bins. It's pretty intuitive to select
541	 * the fullest bin, since that "wastes" the least amount of free space in the compressed
542	 * block. But if the space currently used in the fullest bin is smaller than the compressed
543	 * size of the incoming block, it seems wrong to force that bin to write when giving up on
544	 * compressing the incoming data_vio would likewise "waste" the least amount of free space.
545	 */
546	fullest_bin = list_first_entry(&packer->bins, struct packer_bin, list);
547	if (data_vio->compression.size >=
548	    (VDO_COMPRESSED_BLOCK_DATA_SIZE - fullest_bin->free_space))
549		return NULL;
550
551	/*
552	 * The fullest bin doesn't have room, but writing it out and starting a new batch with the
553	 * incoming data_vio will increase the packer's free space.
554	 */
555	return fullest_bin;
556}
557
558/**
559 * vdo_attempt_packing() - Attempt to rewrite the data in this data_vio as part of a compressed
560 *                         block.
561 * @data_vio: The data_vio to pack.
562 */
563void vdo_attempt_packing(struct data_vio *data_vio)
564{
565	int result;
566	struct packer_bin *bin;
567	struct data_vio_compression_status status = get_data_vio_compression_status(data_vio);
568	struct packer *packer = get_packer_from_data_vio(data_vio);
569
570	assert_on_packer_thread(packer, __func__);
571
572	result = VDO_ASSERT((status.stage == DATA_VIO_COMPRESSING),
573			    "attempt to pack data_vio not ready for packing, stage: %u",
574			    status.stage);
575	if (result != VDO_SUCCESS)
576		return;
577
578	/*
579	 * Increment whether or not this data_vio will be packed or not since abort_packing()
580	 * always decrements the counter.
581	 */
582	WRITE_ONCE(packer->statistics.compressed_fragments_in_packer,
583		   packer->statistics.compressed_fragments_in_packer + 1);
584
585	/*
586	 * If packing of this data_vio is disallowed for administrative reasons, give up before
587	 * making any state changes.
588	 */
589	if (!vdo_is_state_normal(&packer->state) ||
590	    (data_vio->flush_generation < packer->flush_generation)) {
591		abort_packing(data_vio);
592		return;
593	}
594
595	/*
596	 * The advance_data_vio_compression_stage() check here verifies that the data_vio is
597	 * allowed to be compressed (if it has already been canceled, we'll fall out here). Once
598	 * the data_vio is in the DATA_VIO_PACKING state, it must be guaranteed to be put in a bin
599	 * before any more requests can be processed by the packer thread. Otherwise, a canceling
600	 * data_vio could attempt to remove the canceled data_vio from the packer and fail to
601	 * rendezvous with it. Thus, we must call select_bin() first to ensure that we will
602	 * actually add the data_vio to a bin before advancing to the DATA_VIO_PACKING stage.
603	 */
604	bin = select_bin(packer, data_vio);
605	if ((bin == NULL) ||
606	    (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_PACKING)) {
607		abort_packing(data_vio);
608		return;
609	}
610
611	add_data_vio_to_packer_bin(packer, bin, data_vio);
612}
613
614/**
615 * check_for_drain_complete() - Check whether the packer has drained.
616 * @packer: The packer.
617 */
618static void check_for_drain_complete(struct packer *packer)
619{
620	if (vdo_is_state_draining(&packer->state) && (packer->canceled_bin->slots_used == 0))
621		vdo_finish_draining(&packer->state);
622}
623
624/**
625 * write_all_non_empty_bins() - Write out all non-empty bins on behalf of a flush or suspend.
626 * @packer: The packer being flushed.
627 */
628static void write_all_non_empty_bins(struct packer *packer)
629{
630	struct packer_bin *bin;
631
632	list_for_each_entry(bin, &packer->bins, list)
633		write_bin(packer, bin);
634		/*
635		 * We don't need to re-sort the bin here since this loop will make every bin have
636		 * the same amount of free space, so every ordering is sorted.
637		 */
638
639	check_for_drain_complete(packer);
640}
641
642/**
643 * vdo_flush_packer() - Request that the packer flush asynchronously.
644 * @packer: The packer to flush.
645 *
646 * All bins with at least two compressed data blocks will be written out, and any solitary pending
647 * VIOs will be released from the packer. While flushing is in progress, any VIOs submitted to
648 * vdo_attempt_packing() will be continued immediately without attempting to pack them.
649 */
650void vdo_flush_packer(struct packer *packer)
651{
652	assert_on_packer_thread(packer, __func__);
653	if (vdo_is_state_normal(&packer->state))
654		write_all_non_empty_bins(packer);
655}
656
657/**
658 * vdo_remove_lock_holder_from_packer() - Remove a lock holder from the packer.
659 * @completion: The data_vio which needs a lock held by a data_vio in the packer. The data_vio's
660 *              compression.lock_holder field will point to the data_vio to remove.
661 */
662void vdo_remove_lock_holder_from_packer(struct vdo_completion *completion)
663{
664	struct data_vio *data_vio = as_data_vio(completion);
665	struct packer *packer = get_packer_from_data_vio(data_vio);
666	struct data_vio *lock_holder;
667	struct packer_bin *bin;
668	slot_number_t slot;
669
670	assert_data_vio_in_packer_zone(data_vio);
671
672	lock_holder = vdo_forget(data_vio->compression.lock_holder);
673	bin = lock_holder->compression.bin;
674	VDO_ASSERT_LOG_ONLY((bin != NULL), "data_vio in packer has a bin");
675
676	slot = lock_holder->compression.slot;
677	bin->slots_used--;
678	if (slot < bin->slots_used) {
679		bin->incoming[slot] = bin->incoming[bin->slots_used];
680		bin->incoming[slot]->compression.slot = slot;
681	}
682
683	lock_holder->compression.bin = NULL;
684	lock_holder->compression.slot = 0;
685
686	if (bin != packer->canceled_bin) {
687		bin->free_space += lock_holder->compression.size;
688		insert_in_sorted_list(packer, bin);
689	}
690
691	abort_packing(lock_holder);
692	check_for_drain_complete(packer);
693}
694
695/**
696 * vdo_increment_packer_flush_generation() - Increment the flush generation in the packer.
697 * @packer: The packer.
698 *
699 * This will also cause the packer to flush so that any VIOs from previous generations will exit
700 * the packer.
701 */
702void vdo_increment_packer_flush_generation(struct packer *packer)
703{
704	assert_on_packer_thread(packer, __func__);
705	packer->flush_generation++;
706	vdo_flush_packer(packer);
707}
708
709/**
710 * initiate_drain() - Initiate a drain.
711 *
712 * Implements vdo_admin_initiator_fn.
713 */
714static void initiate_drain(struct admin_state *state)
715{
716	struct packer *packer = container_of(state, struct packer, state);
717
718	write_all_non_empty_bins(packer);
719}
720
721/**
722 * vdo_drain_packer() - Drain the packer by preventing any more VIOs from entering the packer and
723 *                      then flushing.
724 * @packer: The packer to drain.
725 * @completion: The completion to finish when the packer has drained.
726 */
727void vdo_drain_packer(struct packer *packer, struct vdo_completion *completion)
728{
729	assert_on_packer_thread(packer, __func__);
730	vdo_start_draining(&packer->state, VDO_ADMIN_STATE_SUSPENDING, completion,
731			   initiate_drain);
732}
733
734/**
735 * vdo_resume_packer() - Resume a packer which has been suspended.
736 * @packer: The packer to resume.
737 * @parent: The completion to finish when the packer has resumed.
738 */
739void vdo_resume_packer(struct packer *packer, struct vdo_completion *parent)
740{
741	assert_on_packer_thread(packer, __func__);
742	vdo_continue_completion(parent, vdo_resume_if_quiescent(&packer->state));
743}
744
745static void dump_packer_bin(const struct packer_bin *bin, bool canceled)
746{
747	if (bin->slots_used == 0)
748		/* Don't dump empty bins. */
749		return;
750
751	vdo_log_info("	  %sBin slots_used=%u free_space=%zu",
752		     (canceled ? "Canceled" : ""), bin->slots_used, bin->free_space);
753
754	/*
755	 * FIXME: dump vios in bin->incoming? The vios should have been dumped from the vio pool.
756	 * Maybe just dump their addresses so it's clear they're here?
757	 */
758}
759
760/**
761 * vdo_dump_packer() - Dump the packer.
762 * @packer: The packer.
763 *
764 * Context: dumps in a thread-unsafe fashion.
765 */
766void vdo_dump_packer(const struct packer *packer)
767{
768	struct packer_bin *bin;
769
770	vdo_log_info("packer");
771	vdo_log_info("	flushGeneration=%llu state %s  packer_bin_count=%llu",
772		     (unsigned long long) packer->flush_generation,
773		     vdo_get_admin_state_code(&packer->state)->name,
774		     (unsigned long long) packer->size);
775
776	list_for_each_entry(bin, &packer->bins, list)
777		dump_packer_bin(bin, false);
778
779	dump_packer_bin(packer->canceled_bin, true);
780}
781