// SPDX-License-Identifier: GPL-2.0
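/*
 * Btree write buffer: btree updates coming from the journal are accumulated
 * here (first in wb->inc, then moved to wb->flushing), sorted, deduplicated,
 * and flushed to the btree in bulk - mostly via direct inserts into leaf
 * nodes, with a slowpath commit for keys that can't be inserted that way.
 */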

#include "bcachefs.h"
#include "btree_locking.h"
#include "btree_update.h"
#include "btree_update_interior.h"
#include "btree_write_buffer.h"
#include "error.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"

#include <linux/prefetch.h>
#include <linux/sort.h>

static int bch2_btree_write_buffer_journal_flush(struct journal *,
				struct journal_entry_pin *, u64);

static int bch2_journal_keys_to_write_buffer(struct bch_fs *, struct journal_buf *);

static inline bool __wb_key_ref_cmp(const struct wb_key_ref *l, const struct wb_key_ref *r)
{
	return (cmp_int(l->hi, r->hi) ?:
		cmp_int(l->mi, r->mi) ?:
		cmp_int(l->lo, r->lo)) >= 0;
}

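/*
 * Returns true if l >= r: on x86-64 the three 64-bit words are compared as a
 * single 192-bit value with a sub/sbb chain, reading the "above or equal"
 * flag; debug builds (EBUG_ON) check the result against __wb_key_ref_cmp().
 */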
static inline bool wb_key_ref_cmp(const struct wb_key_ref *l, const struct wb_key_ref *r)
{
#ifdef CONFIG_X86_64
	int cmp;

	asm("mov   (%[l]), %%rax;"
	    "sub   (%[r]), %%rax;"
	    "mov  8(%[l]), %%rax;"
	    "sbb  8(%[r]), %%rax;"
	    "mov 16(%[l]), %%rax;"
	    "sbb 16(%[r]), %%rax;"
	    : "=@ccae" (cmp)
	    : [l] "r" (l), [r] "r" (r)
	    : "rax", "cc");

	EBUG_ON(cmp != __wb_key_ref_cmp(l, r));
	return cmp;
#else
	return __wb_key_ref_cmp(l, r);
#endif
}

static int wb_key_seq_cmp(const void *_l, const void *_r)
{
	const struct btree_write_buffered_key *l = _l;
	const struct btree_write_buffered_key *r = _r;

	return cmp_int(l->journal_seq, r->journal_seq);
}

/* Compare excluding idx, the low 24 bits: */
static inline bool wb_key_eq(const void *_l, const void *_r)
{
	const struct wb_key_ref *l = _l;
	const struct wb_key_ref *r = _r;

	return !((l->hi ^ r->hi)|
		 (l->mi ^ r->mi)|
		 ((l->lo >> 24) ^ (r->lo >> 24)));
}

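/*
 * Open-coded heapsort over the wb_key_ref index array (rather than calling
 * sort()), presumably so that wb_key_ref_cmp() can be inlined; see the comment
 * in the sift-down loop below for why the bottom-up variant is used.
 */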
static noinline void wb_sort(struct wb_key_ref *base, size_t num)
{
	size_t n = num, a = num / 2;

	if (!a)		/* num < 2 || size == 0 */
		return;

	for (;;) {
		size_t b, c, d;

		if (a)			/* Building heap: sift down --a */
			--a;
		else if (--n)		/* Sorting: Extract root to --n */
			swap(base[0], base[n]);
		else			/* Sort complete */
			break;

		/*
		 * Sift element at "a" down into heap.  This is the
		 * "bottom-up" variant, which significantly reduces
		 * calls to wb_key_ref_cmp(): we find the sift-down path
		 * all the way to the leaves (one compare per level),
		 * then backtrack to find where to insert the target
		 * element.
		 *
		 * Because elements tend to sift down close to the leaves,
		 * this uses fewer compares than doing two per level
		 * on the way down.  (A bit more than half as many on
		 * average, 3/4 worst-case.)
		 */
		for (b = a; c = 2*b + 1, (d = c + 1) < n;)
			b = wb_key_ref_cmp(base + c, base + d) ? c : d;
		if (d == n)		/* Special case last leaf with no sibling */
			b = c;

		/* Now backtrack from "b" to the correct location for "a" */
		while (b != a && wb_key_ref_cmp(base + a, base + b))
			b = (b - 1) / 2;
		c = b;			/* Where "a" belongs */
		while (b != a) {	/* Shift it into place */
			b = (b - 1) / 2;
			swap(base[b], base[c]);
		}
	}
}

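/*
 * Slowpath for when a key doesn't fit in the leaf node: drop the node write
 * lock and go through a full transaction commit, reusing the key's original
 * journal seq rather than taking a new journal reservation.
 */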
static noinline int wb_flush_one_slowpath(struct btree_trans *trans,
					  struct btree_iter *iter,
					  struct btree_write_buffered_key *wb)
{
	struct btree_path *path = btree_iter_path(trans, iter);

	bch2_btree_node_unlock_write(trans, path, path->l[0].b);

	trans->journal_res.seq = wb->journal_seq;

	return bch2_trans_update(trans, iter, &wb->k,
				 BTREE_UPDATE_INTERNAL_SNAPSHOT_NODE) ?:
		bch2_trans_commit(trans, NULL, NULL,
				  BCH_TRANS_COMMIT_no_enospc|
				  BCH_TRANS_COMMIT_no_check_rw|
				  BCH_TRANS_COMMIT_no_journal_res|
				  BCH_TRANS_COMMIT_journal_reclaim);
}

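/*
 * Fast path for flushing a single write buffered key: write lock the leaf
 * node (if we don't hold the lock already) and insert directly into it,
 * falling back to wb_flush_one_slowpath() if the key doesn't fit.
 */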
static inline int wb_flush_one(struct btree_trans *trans, struct btree_iter *iter,
			       struct btree_write_buffered_key *wb,
			       bool *write_locked, size_t *fast)
{
	struct btree_path *path;
	int ret;

	EBUG_ON(!wb->journal_seq);
	EBUG_ON(!trans->c->btree_write_buffer.flushing.pin.seq);
	EBUG_ON(trans->c->btree_write_buffer.flushing.pin.seq > wb->journal_seq);

	ret = bch2_btree_iter_traverse(iter);
	if (ret)
		return ret;

	/*
	 * We can't clone a path that has write locks: unshare it now, before
	 * set_pos and traverse():
	 */
	if (btree_iter_path(trans, iter)->ref > 1)
		iter->path = __bch2_btree_path_make_mut(trans, iter->path, true, _THIS_IP_);

	path = btree_iter_path(trans, iter);

	if (!*write_locked) {
		ret = bch2_btree_node_lock_write(trans, path, &path->l[0].b->c);
		if (ret)
			return ret;

		bch2_btree_node_prep_for_write(trans, path, path->l[0].b);
		*write_locked = true;
	}

	if (unlikely(!bch2_btree_node_insert_fits(path->l[0].b, wb->k.k.u64s))) {
		*write_locked = false;
		return wb_flush_one_slowpath(trans, iter, wb);
	}

	bch2_btree_insert_key_leaf(trans, path, &wb->k, wb->journal_seq);
	(*fast)++;
	return 0;
}

/*
 * Update a btree with a write buffered key using the journal seq of the
 * original write buffer insert.
 *
 * It is not safe to rejournal the key once it has been inserted into the write
 * buffer because that may break recovery ordering. For example, the key may
 * have already been modified in the active write buffer in a seq that comes
 * before the current transaction. If we were to journal this key again and
 * crash, recovery would process updates in the wrong order.
 */
static int
btree_write_buffered_insert(struct btree_trans *trans,
			  struct btree_write_buffered_key *wb)
{
	struct btree_iter iter;
	int ret;

	bch2_trans_iter_init(trans, &iter, wb->btree, bkey_start_pos(&wb->k.k),
			     BTREE_ITER_CACHED|BTREE_ITER_INTENT);

	trans->journal_res.seq = wb->journal_seq;

	ret   = bch2_btree_iter_traverse(&iter) ?:
		bch2_trans_update(trans, &iter, &wb->k,
				  BTREE_UPDATE_INTERNAL_SNAPSHOT_NODE);
	bch2_trans_iter_exit(trans, &iter);
	return ret;
}

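/*
 * Move as many keys as possible from wb->inc to wb->flushing, transferring the
 * corresponding journal pins; called with both wb->inc.lock and
 * wb->flushing.lock held.
 */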
static void move_keys_from_inc_to_flushing(struct btree_write_buffer *wb)
{
	struct bch_fs *c = container_of(wb, struct bch_fs, btree_write_buffer);
	struct journal *j = &c->journal;

	if (!wb->inc.keys.nr)
		return;

	bch2_journal_pin_add(j, wb->inc.keys.data[0].journal_seq, &wb->flushing.pin,
			     bch2_btree_write_buffer_journal_flush);

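	/*
	 * These resizes may fail; if they do, we only move as many keys as fit
	 * and leave the rest in wb->inc (see the comment in
	 * btree_write_buffer_flush_seq()):
	 */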
	darray_resize(&wb->flushing.keys, min_t(size_t, 1U << 20, wb->flushing.keys.nr + wb->inc.keys.nr));
	darray_resize(&wb->sorted, wb->flushing.keys.size);

	if (!wb->flushing.keys.nr && wb->sorted.size >= wb->inc.keys.nr) {
		swap(wb->flushing.keys, wb->inc.keys);
		goto out;
	}

	size_t nr = min(darray_room(wb->flushing.keys),
			wb->sorted.size - wb->flushing.keys.nr);
	nr = min(nr, wb->inc.keys.nr);

	memcpy(&darray_top(wb->flushing.keys),
	       wb->inc.keys.data,
	       sizeof(wb->inc.keys.data[0]) * nr);

	memmove(wb->inc.keys.data,
		wb->inc.keys.data + nr,
		sizeof(wb->inc.keys.data[0]) * (wb->inc.keys.nr - nr));

	wb->flushing.keys.nr	+= nr;
	wb->inc.keys.nr		-= nr;
out:
	if (!wb->inc.keys.nr)
		bch2_journal_pin_drop(j, &wb->inc.pin);
	else
		bch2_journal_pin_update(j, wb->inc.keys.data[0].journal_seq, &wb->inc.pin,
					bch2_btree_write_buffer_journal_flush);

	if (j->watermark) {
		spin_lock(&j->lock);
		bch2_journal_set_watermark(j);
		spin_unlock(&j->lock);
	}

	BUG_ON(wb->sorted.size < wb->flushing.keys.nr);
}

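/*
 * Flush the keys in wb->flushing (after pulling in whatever we can from
 * wb->inc) out to the btree; caller must hold wb->flushing.lock.
 */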
static int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;
	struct journal *j = &c->journal;
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	struct btree_iter iter = { NULL };
	size_t skipped = 0, fast = 0, slowpath = 0;
	bool write_locked = false;
	int ret = 0;

	bch2_trans_unlock(trans);
	bch2_trans_begin(trans);

	mutex_lock(&wb->inc.lock);
	move_keys_from_inc_to_flushing(wb);
	mutex_unlock(&wb->inc.lock);

	for (size_t i = 0; i < wb->flushing.keys.nr; i++) {
		wb->sorted.data[i].idx = i;
		wb->sorted.data[i].btree = wb->flushing.keys.data[i].btree;
		memcpy(&wb->sorted.data[i].pos, &wb->flushing.keys.data[i].k.k.p, sizeof(struct bpos));
	}
	wb->sorted.nr = wb->flushing.keys.nr;

	/*
	 * We first sort so that we can detect and skip redundant updates, and
	 * then we attempt to flush in sorted btree order, as this is most
	 * efficient.
	 *
	 * However, since we're not flushing keys in the order they appear in
	 * the journal we won't be able to drop our journal pin until everything
	 * is flushed - which means this could deadlock the journal if we
	 * weren't passing BCH_TRANS_COMMIT_journal_reclaim. That flag causes
	 * the update to fail if it would block taking a journal reservation.
	 *
	 * If that happens, simply skip the key so we can optimistically insert
	 * as many keys as possible in the fast path.
	 */
	wb_sort(wb->sorted.data, wb->sorted.nr);

	darray_for_each(wb->sorted, i) {
		struct btree_write_buffered_key *k = &wb->flushing.keys.data[i->idx];

		for (struct wb_key_ref *n = i + 1; n < min(i + 4, &darray_top(wb->sorted)); n++)
			prefetch(&wb->flushing.keys.data[n->idx]);

		BUG_ON(!k->journal_seq);

		if (i + 1 < &darray_top(wb->sorted) &&
		    wb_key_eq(i, i + 1)) {
			struct btree_write_buffered_key *n = &wb->flushing.keys.data[i[1].idx];

			skipped++;
			n->journal_seq = min_t(u64, n->journal_seq, k->journal_seq);
			k->journal_seq = 0;
			continue;
		}

		if (write_locked) {
			struct btree_path *path = btree_iter_path(trans, &iter);

			if (path->btree_id != i->btree ||
			    bpos_gt(k->k.k.p, path->l[0].b->key.k.p)) {
				bch2_btree_node_unlock_write(trans, path, path->l[0].b);
				write_locked = false;

				ret = lockrestart_do(trans,
					bch2_btree_iter_traverse(&iter) ?:
					bch2_foreground_maybe_merge(trans, iter.path, 0,
							BCH_WATERMARK_reclaim|
							BCH_TRANS_COMMIT_journal_reclaim|
							BCH_TRANS_COMMIT_no_check_rw|
							BCH_TRANS_COMMIT_no_enospc));
				if (ret)
					goto err;
			}
		}

		if (!iter.path || iter.btree_id != k->btree) {
			bch2_trans_iter_exit(trans, &iter);
			bch2_trans_iter_init(trans, &iter, k->btree, k->k.k.p,
					     BTREE_ITER_INTENT|BTREE_ITER_ALL_SNAPSHOTS);
		}

		bch2_btree_iter_set_pos(&iter, k->k.k.p);
		btree_iter_path(trans, &iter)->preserve = false;

		do {
			if (race_fault()) {
				ret = -BCH_ERR_journal_reclaim_would_deadlock;
				break;
			}

			ret = wb_flush_one(trans, &iter, k, &write_locked, &fast);
			if (!write_locked)
				bch2_trans_begin(trans);
		} while (bch2_err_matches(ret, BCH_ERR_transaction_restart));

		if (!ret) {
			k->journal_seq = 0;
		} else if (ret == -BCH_ERR_journal_reclaim_would_deadlock) {
			slowpath++;
			ret = 0;
		} else
			break;
	}

	if (write_locked) {
		struct btree_path *path = btree_iter_path(trans, &iter);
		bch2_btree_node_unlock_write(trans, path, path->l[0].b);
	}
	bch2_trans_iter_exit(trans, &iter);

	if (ret)
		goto err;

	if (slowpath) {
		/*
		 * Flush the remaining keys in the order they appeared in the
		 * journal, so that we can release journal pins as we go: the
		 * fast path zeroed the journal seq of keys it successfully
		 * flushed, so we skip those here.
		 */
		trace_and_count(c, write_buffer_flush_slowpath, trans, slowpath, wb->flushing.keys.nr);

		sort(wb->flushing.keys.data,
		     wb->flushing.keys.nr,
		     sizeof(wb->flushing.keys.data[0]),
		     wb_key_seq_cmp, NULL);

		darray_for_each(wb->flushing.keys, i) {
			if (!i->journal_seq)
				continue;

			bch2_journal_pin_update(j, i->journal_seq, &wb->flushing.pin,
						bch2_btree_write_buffer_journal_flush);

			bch2_trans_begin(trans);

			ret = commit_do(trans, NULL, NULL,
					BCH_WATERMARK_reclaim|
					BCH_TRANS_COMMIT_journal_reclaim|
					BCH_TRANS_COMMIT_no_check_rw|
					BCH_TRANS_COMMIT_no_enospc|
					BCH_TRANS_COMMIT_no_journal_res,
					btree_write_buffered_insert(trans, i));
			if (ret)
				goto err;
		}
	}
err:
	bch2_fs_fatal_err_on(ret, c, "%s", bch2_err_str(ret));
	trace_write_buffer_flush(trans, wb->flushing.keys.nr, skipped, fast, 0);
	bch2_journal_pin_drop(j, &wb->flushing.pin);
	wb->flushing.keys.nr = 0;
	return ret;
}

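/*
 * Copy write buffer keys from any journal buffers up to @seq that still need
 * it into the write buffer; bch2_next_write_buffer_flush_journal_buf() returns
 * with j->buf_lock held.
 */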
static int fetch_wb_keys_from_journal(struct bch_fs *c, u64 seq)
{
	struct journal *j = &c->journal;
	struct journal_buf *buf;
	int ret = 0;

	while (!ret && (buf = bch2_next_write_buffer_flush_journal_buf(j, seq))) {
		ret = bch2_journal_keys_to_write_buffer(c, buf);
		mutex_unlock(&j->buf_lock);
	}

	return ret;
}

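/*
 * Flush the write buffer until every key with a journal seq <= @seq has been
 * written to the btree, i.e. until neither wb->inc nor wb->flushing hold a
 * journal pin at or before @seq.
 */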
static int btree_write_buffer_flush_seq(struct btree_trans *trans, u64 seq)
{
	struct bch_fs *c = trans->c;
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret = 0, fetch_from_journal_err;

	do {
		bch2_trans_unlock(trans);

		fetch_from_journal_err = fetch_wb_keys_from_journal(c, seq);

		/*
		 * On memory allocation failure, bch2_btree_write_buffer_flush_locked()
		 * is not guaranteed to empty wb->inc:
		 */
		mutex_lock(&wb->flushing.lock);
		ret = bch2_btree_write_buffer_flush_locked(trans);
		mutex_unlock(&wb->flushing.lock);
	} while (!ret &&
		 (fetch_from_journal_err ||
		  (wb->inc.pin.seq && wb->inc.pin.seq <= seq) ||
		  (wb->flushing.pin.seq && wb->flushing.pin.seq <= seq)));

	return ret;
}

static int bch2_btree_write_buffer_journal_flush(struct journal *j,
				struct journal_entry_pin *_pin, u64 seq)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);

	return bch2_trans_run(c, btree_write_buffer_flush_seq(trans, seq));
}

int bch2_btree_write_buffer_flush_sync(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;

	trace_and_count(c, write_buffer_flush_sync, trans, _RET_IP_);

	return btree_write_buffer_flush_seq(trans, journal_cur_seq(&c->journal));
}

int bch2_btree_write_buffer_flush_nocheck_rw(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret = 0;

	if (mutex_trylock(&wb->flushing.lock)) {
		ret = bch2_btree_write_buffer_flush_locked(trans);
		mutex_unlock(&wb->flushing.lock);
	}

	return ret;
}

int bch2_btree_write_buffer_tryflush(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;

	if (!bch2_write_ref_tryget(c, BCH_WRITE_REF_btree_write_buffer))
		return -BCH_ERR_erofs_no_writes;

	int ret = bch2_btree_write_buffer_flush_nocheck_rw(trans);
	bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);
	return ret;
}

static void bch2_btree_write_buffer_flush_work(struct work_struct *work)
{
	struct bch_fs *c = container_of(work, struct bch_fs, btree_write_buffer.flush_work);
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret;

	mutex_lock(&wb->flushing.lock);
	do {
		ret = bch2_trans_run(c, bch2_btree_write_buffer_flush_locked(trans));
	} while (!ret && bch2_btree_write_buffer_should_flush(c));
	mutex_unlock(&wb->flushing.lock);

	bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);
}

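/*
 * Out of line path for bch2_journal_key_to_wb(), for when the destination
 * buffer is out of room: grow it, or - if we were adding directly to
 * wb->flushing and can't grow it - fall back to wb->inc.
 */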
int bch2_journal_key_to_wb_slowpath(struct bch_fs *c,
			     struct journal_keys_to_wb *dst,
			     enum btree_id btree, struct bkey_i *k)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret;
retry:
	ret = darray_make_room_gfp(&dst->wb->keys, 1, GFP_KERNEL);
	if (!ret && dst->wb == &wb->flushing)
		ret = darray_resize(&wb->sorted, wb->flushing.keys.size);

	if (unlikely(ret)) {
		if (dst->wb == &c->btree_write_buffer.flushing) {
			mutex_unlock(&dst->wb->lock);
			dst->wb = &c->btree_write_buffer.inc;
			bch2_journal_pin_add(&c->journal, dst->seq, &dst->wb->pin,
					     bch2_btree_write_buffer_journal_flush);
			goto retry;
		}

		return ret;
	}

	dst->room = darray_room(dst->wb->keys);
	if (dst->wb == &wb->flushing)
		dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
	BUG_ON(!dst->room);
	BUG_ON(!dst->seq);

	struct btree_write_buffered_key *wb_k = &darray_top(dst->wb->keys);
	wb_k->journal_seq	= dst->seq;
	wb_k->btree		= btree;
	bkey_copy(&wb_k->k, k);
	dst->wb->keys.nr++;
	dst->room--;
	return 0;
}

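/*
 * Begin copying keys from a journal buffer into the write buffer: add directly
 * to wb->flushing if we can take its lock and wb->inc is empty (saving a copy
 * later), otherwise to wb->inc, and pin @seq. Used together with
 * bch2_journal_key_to_wb() and bch2_journal_keys_to_write_buffer_end() - see
 * bch2_journal_keys_to_write_buffer() below.
 */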
void bch2_journal_keys_to_write_buffer_start(struct bch_fs *c, struct journal_keys_to_wb *dst, u64 seq)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	if (mutex_trylock(&wb->flushing.lock)) {
		mutex_lock(&wb->inc.lock);
		move_keys_from_inc_to_flushing(wb);

		/*
		 * Attempt to skip wb->inc, and add keys directly to
		 * wb->flushing, saving us a copy later:
		 */

		if (!wb->inc.keys.nr) {
			dst->wb = &wb->flushing;
		} else {
			mutex_unlock(&wb->flushing.lock);
			dst->wb = &wb->inc;
		}
	} else {
		mutex_lock(&wb->inc.lock);
		dst->wb = &wb->inc;
	}

	dst->room = darray_room(dst->wb->keys);
	if (dst->wb == &wb->flushing)
		dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
	dst->seq = seq;

	bch2_journal_pin_add(&c->journal, seq, &dst->wb->pin,
			     bch2_btree_write_buffer_journal_flush);
}

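/*
 * Finish copying keys from a journal buffer: drop the journal pin if nothing
 * was added, kick off a background flush if the write buffer needs flushing,
 * and release the locks taken in bch2_journal_keys_to_write_buffer_start().
 */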
void bch2_journal_keys_to_write_buffer_end(struct bch_fs *c, struct journal_keys_to_wb *dst)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	if (!dst->wb->keys.nr)
		bch2_journal_pin_drop(&c->journal, &dst->wb->pin);

	if (bch2_btree_write_buffer_should_flush(c) &&
	    __bch2_write_ref_tryget(c, BCH_WRITE_REF_btree_write_buffer) &&
	    !queue_work(system_unbound_wq, &c->btree_write_buffer.flush_work))
		bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);

	if (dst->wb == &wb->flushing)
		mutex_unlock(&wb->flushing.lock);
	mutex_unlock(&wb->inc.lock);
}

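/*
 * Copy every BCH_JSET_ENTRY_write_buffer_keys entry in @buf into the write
 * buffer, retagging the entries as BCH_JSET_ENTRY_btree_keys, then clear
 * buf->need_flush_to_write_buffer.
 */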
static int bch2_journal_keys_to_write_buffer(struct bch_fs *c, struct journal_buf *buf)
{
	struct journal_keys_to_wb dst;
	int ret = 0;

	bch2_journal_keys_to_write_buffer_start(c, &dst, le64_to_cpu(buf->data->seq));

	for_each_jset_entry_type(entry, buf->data, BCH_JSET_ENTRY_write_buffer_keys) {
		jset_entry_for_each_key(entry, k) {
			ret = bch2_journal_key_to_wb(c, &dst, entry->btree_id, k);
			if (ret)
				goto out;
		}

		entry->type = BCH_JSET_ENTRY_btree_keys;
	}

	spin_lock(&c->journal.lock);
	buf->need_flush_to_write_buffer = false;
	spin_unlock(&c->journal.lock);
out:
	bch2_journal_keys_to_write_buffer_end(c, &dst);
	return ret;
}

static int wb_keys_resize(struct btree_write_buffer_keys *wb, size_t new_size)
{
	if (wb->keys.size >= new_size)
		return 0;

	if (!mutex_trylock(&wb->lock))
		return -EINTR;

	int ret = darray_resize(&wb->keys, new_size);
	mutex_unlock(&wb->lock);
	return ret;
}

int bch2_btree_write_buffer_resize(struct bch_fs *c, size_t new_size)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	return wb_keys_resize(&wb->flushing, new_size) ?:
		wb_keys_resize(&wb->inc, new_size);
}

void bch2_fs_btree_write_buffer_exit(struct bch_fs *c)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	BUG_ON((wb->inc.keys.nr || wb->flushing.keys.nr) &&
	       !bch2_journal_error(&c->journal));

	darray_exit(&wb->sorted);
	darray_exit(&wb->flushing.keys);
	darray_exit(&wb->inc.keys);
}

int bch2_fs_btree_write_buffer_init(struct bch_fs *c)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	mutex_init(&wb->inc.lock);
	mutex_init(&wb->flushing.lock);
	INIT_WORK(&wb->flush_work, bch2_btree_write_buffer_flush_work);

	/* Will be resized by journal as needed: */
	unsigned initial_size = 1 << 16;

	return  darray_make_room(&wb->inc.keys, initial_size) ?:
		darray_make_room(&wb->flushing.keys, initial_size) ?:
		darray_make_room(&wb->sorted, initial_size);
}