/*	$NetBSD: vfs_wapbl.c,v 1.113 2024/05/13 00:01:53 msaitoh Exp $	*/

/*-
 * Copyright (c) 2003, 2008, 2009 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Wasabi Systems, Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * This implements file-system-independent write-ahead logging (WAPBL).
 */

#define WAPBL_INTERNAL

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: vfs_wapbl.c,v 1.113 2024/05/13 00:01:53 msaitoh Exp $");

#include <sys/param.h>
#include <sys/bitops.h>
#include <sys/time.h>
#include <sys/wapbl.h>
#include <sys/wapbl_replay.h>

#ifdef _KERNEL

#include <sys/atomic.h>
#include <sys/conf.h>
#include <sys/evcnt.h>
#include <sys/file.h>
#include <sys/kauth.h>
#include <sys/kernel.h>
#include <sys/module.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/resourcevar.h>
#include <sys/sysctl.h>
#include <sys/uio.h>
#include <sys/vnode.h>

#include <miscfs/specfs/specdev.h>

#define	wapbl_alloc(s) kmem_alloc((s), KM_SLEEP)
#define	wapbl_free(a, s) kmem_free((a), (s))
#define	wapbl_calloc(n, s) kmem_zalloc((n)*(s), KM_SLEEP)

static int wapbl_flush_disk_cache = 1;
static int wapbl_verbose_commit = 0;
static int wapbl_allow_dpofua = 0; 	/* switched off by default for now */
static int wapbl_journal_iobufs = 4;

static inline size_t wapbl_space_free(size_t, off_t, off_t);

#else /* !_KERNEL */

#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define	KDASSERT(x) assert(x)
#define	KASSERT(x) assert(x)
#define	wapbl_alloc(s) malloc(s)
#define	wapbl_free(a, s) free(a)
#define	wapbl_calloc(n, s) calloc((n), (s))

#endif /* !_KERNEL */

/*
 * INTERNAL DATA STRUCTURES
 */

/*
 * This structure holds per-mount log information.
 *
 * Legend:	a = atomic access only
 *		r = read-only after init
 *		l = rwlock held
 *		m = mutex held
 *		lm = rwlock held writing or mutex held
 *		u = unlocked access ok
 *		b = bufcache_lock held
 */
LIST_HEAD(wapbl_ino_head, wapbl_ino);
struct wapbl {
	struct vnode *wl_logvp;	/* r:	log here */
	struct vnode *wl_devvp;	/* r:	log on this device */
	struct mount *wl_mount;	/* r:	mountpoint wl is associated with */
	daddr_t wl_logpbn;	/* r:	Physical block number of start of log */
	int wl_log_dev_bshift;	/* r:	logarithm of device block size of log
					device */
	int wl_fs_dev_bshift;	/* r:	logarithm of device block size of
					filesystem device */

	unsigned wl_lock_count;	/* m:	Count of transactions in progress */

	size_t wl_circ_size; 	/* r:	Number of bytes in buffer of log */
	size_t wl_circ_off;	/* r:	Number of bytes reserved at start */

	size_t wl_bufcount_max;	/* r:	Number of buffers reserved for log */
	size_t wl_bufbytes_max;	/* r:	Number of buf bytes reserved for log */

	off_t wl_head;		/* l:	Byte offset of log head */
	off_t wl_tail;		/* l:	Byte offset of log tail */
	/*
	 * WAPBL log layout, stored on wl_devvp at wl_logpbn:
	 *
	 *  ___________________ wl_circ_size __________________
	 * /                                                   \
	 * +---------+---------+-------+--------------+--------+
	 * [ commit0 | commit1 | CCWCW | EEEEEEEEEEEE | CCCWCW ]
	 * +---------+---------+-------+--------------+--------+
	 *       wl_circ_off --^       ^-- wl_head    ^-- wl_tail
	 *
	 * commit0 and commit1 are commit headers.  A commit header has
	 * a generation number, indicating which of the two headers is
	 * more recent, and an assignment of head and tail pointers.
	 * The rest is a circular queue of log records, starting at
	 * the byte offset wl_circ_off.
	 *
	 * E marks empty space for records.
	 * W marks records for block writes issued but waiting.
	 * C marks completed records.
	 *
	 * wapbl_flush writes new records to empty `E' spaces after
	 * wl_head from the current transaction in memory.
	 *
	 * wapbl_truncate advances wl_tail past any completed `C'
	 * records, freeing them up for use.
	 *
	 * head == tail == 0 means log is empty.
	 * head == tail != 0 means log is full.
	 *
	 * See assertions in wapbl_advance() for other boundary
	 * conditions.
	 *
	 * Only wapbl_flush moves the head, except when wapbl_truncate
	 * sets it to 0 to indicate that the log is empty.
	 *
	 * Only wapbl_truncate moves the tail, except when wapbl_flush
	 * sets it to wl_circ_off to indicate that the log is full.
	 */

	struct wapbl_wc_header *wl_wc_header;	/* l	*/
	void *wl_wc_scratch;	/* l:	scratch space (XXX: why?!?) */

	kmutex_t wl_mtx;	/* u:	short-term lock */
	krwlock_t wl_rwlock;	/* u:	File system transaction lock */

	/*
	 * wl_mtx must be held while accessing
	 * wl_count or wl_bufs or head or tail
	 */

#if _KERNEL
	/*
	 * Callback called from within the flush routine to flush any extra
	 * bits.  Note that flush may be skipped without calling this if
	 * there are no outstanding buffers in the transaction.
	 */
	wapbl_flush_fn_t wl_flush;	/* r	*/
	wapbl_flush_fn_t wl_flush_abort;/* r	*/

	/* Event counters */
	char wl_ev_group[EVCNT_STRING_MAX];	/* r	*/
	struct evcnt wl_ev_commit;		/* l	*/
	struct evcnt wl_ev_journalwrite;	/* l	*/
	struct evcnt wl_ev_jbufs_bio_nowait;	/* l	*/
	struct evcnt wl_ev_metawrite;		/* lm	*/
	struct evcnt wl_ev_cacheflush;		/* l	*/
#endif

	size_t wl_bufbytes;	/* m:	Byte count of pages in wl_bufs */
	size_t wl_bufcount;	/* m:	Count of buffers in wl_bufs */
	size_t wl_bcount;	/* m:	Total bcount of wl_bufs */

	TAILQ_HEAD(, buf) wl_bufs; /* m: Buffers in current transaction */

	kcondvar_t wl_reclaimable_cv;	/* m (obviously) */
	size_t wl_reclaimable_bytes; /* m:	Amount of space available for
						reclamation by truncate */
	int wl_error_count;	/* m:	# of wl_entries with errors */
	size_t wl_reserved_bytes; /* never truncate log smaller than this */

#ifdef WAPBL_DEBUG_BUFBYTES
	size_t wl_unsynced_bufbytes; /* Byte count of unsynced buffers */
#endif

#if _KERNEL
	int wl_brperjblock;	/* r Block records per journal block */
#endif

	TAILQ_HEAD(, wapbl_dealloc) wl_dealloclist;	/* lm:	list head */
	int wl_dealloccnt;				/* lm:	total count */
	int wl_dealloclim;				/* r:	max count */

	/* hashtable of inode numbers for allocated but unlinked inodes */
	/* synch ??? */
	struct wapbl_ino_head *wl_inohash;
	u_long wl_inohashmask;
	int wl_inohashcnt;

	SIMPLEQ_HEAD(, wapbl_entry) wl_entries; /* m: On disk transaction
						   accounting */

	/* buffers for wapbl_buffered_write() */
	TAILQ_HEAD(, buf) wl_iobufs;		/* l: Free or filling bufs */
	TAILQ_HEAD(, buf) wl_iobufs_busy;	/* l: In-transit bufs */

	int wl_dkcache;		/* r: 	disk cache flags */
#define WAPBL_USE_FUA(wl)	\
		(wapbl_allow_dpofua && ISSET((wl)->wl_dkcache, DKCACHE_FUA))
#define WAPBL_JFLAGS(wl)	\
		(WAPBL_USE_FUA(wl) ? (wl)->wl_jwrite_flags : 0)
#define WAPBL_JDATA_FLAGS(wl)	\
		(WAPBL_JFLAGS(wl) & B_MEDIA_DPO)	/* only DPO */
	int wl_jwrite_flags;	/* r: 	journal write flags */
};

#ifdef WAPBL_DEBUG_PRINT
int wapbl_debug_print = WAPBL_DEBUG_PRINT;
#endif

/****************************************************************/
#ifdef _KERNEL

#ifdef WAPBL_DEBUG
struct wapbl *wapbl_debug_wl;
#endif

static int wapbl_write_commit(struct wapbl *wl, off_t head, off_t tail);
static int wapbl_write_blocks(struct wapbl *wl, off_t *offp);
static int wapbl_write_revocations(struct wapbl *wl, off_t *offp);
static int wapbl_write_inodes(struct wapbl *wl, off_t *offp);
#endif /* _KERNEL */

static int wapbl_replay_process(struct wapbl_replay *wr, off_t, off_t);

static inline size_t wapbl_space_used(size_t avail, off_t head,
	off_t tail);

#ifdef _KERNEL

static struct pool wapbl_entry_pool;
static struct pool wapbl_dealloc_pool;

#define	WAPBL_INODETRK_SIZE 83
static int wapbl_ino_pool_refcount;
static struct pool wapbl_ino_pool;
struct wapbl_ino {
	LIST_ENTRY(wapbl_ino) wi_hash;
	ino_t wi_ino;
	mode_t wi_mode;
};

static void wapbl_inodetrk_init(struct wapbl *wl, u_int size);
static void wapbl_inodetrk_free(struct wapbl *wl);
static struct wapbl_ino *wapbl_inodetrk_get(struct wapbl *wl, ino_t ino);

static size_t wapbl_transaction_len(struct wapbl *wl);
static inline size_t wapbl_transaction_inodes_len(struct wapbl *wl);

static void wapbl_deallocation_free(struct wapbl *, struct wapbl_dealloc *,
	bool);

static void wapbl_evcnt_init(struct wapbl *);
static void wapbl_evcnt_free(struct wapbl *);

static void wapbl_dkcache_init(struct wapbl *);

#if 0
int wapbl_replay_verify(struct wapbl_replay *, struct vnode *);
#endif

static int wapbl_replay_isopen1(struct wapbl_replay *);

const struct wapbl_ops wapbl_ops = {
	.wo_wapbl_discard	= wapbl_discard,
	.wo_wapbl_replay_isopen	= wapbl_replay_isopen1,
	.wo_wapbl_replay_can_read = wapbl_replay_can_read,
	.wo_wapbl_replay_read	= wapbl_replay_read,
	.wo_wapbl_add_buf	= wapbl_add_buf,
	.wo_wapbl_remove_buf	= wapbl_remove_buf,
	.wo_wapbl_resize_buf	= wapbl_resize_buf,
	.wo_wapbl_begin		= wapbl_begin,
	.wo_wapbl_end		= wapbl_end,
	.wo_wapbl_junlock_assert= wapbl_junlock_assert,
	.wo_wapbl_jlock_assert	= wapbl_jlock_assert,

	/* XXX: the following is only used to say "this is a wapbl buf" */
	.wo_wapbl_biodone	= wapbl_biodone,
};

SYSCTL_SETUP(wapbl_sysctl_init, "wapbl sysctl")
{
	int rv;
	const struct sysctlnode *rnode, *cnode;

	rv = sysctl_createv(clog, 0, NULL, &rnode,
		       CTLFLAG_PERMANENT,
		       CTLTYPE_NODE, "wapbl",
		       SYSCTL_DESCR("WAPBL journaling options"),
		       NULL, 0, NULL, 0,
		       CTL_VFS, CTL_CREATE, CTL_EOL);
	if (rv)
		return;

	rv = sysctl_createv(clog, 0, &rnode, &cnode,
		       CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
		       CTLTYPE_INT, "flush_disk_cache",
		       SYSCTL_DESCR("flush disk cache"),
		       NULL, 0, &wapbl_flush_disk_cache, 0,
		       CTL_CREATE, CTL_EOL);
	if (rv)
		return;

	rv = sysctl_createv(clog, 0, &rnode, &cnode,
		       CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
		       CTLTYPE_INT, "verbose_commit",
		       SYSCTL_DESCR("show time and size of wapbl log commits"),
		       NULL, 0, &wapbl_verbose_commit, 0,
		       CTL_CREATE, CTL_EOL);
	if (rv)
		return;

	rv = sysctl_createv(clog, 0, &rnode, &cnode,
		       CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
		       CTLTYPE_INT, "allow_dpofua",
		       SYSCTL_DESCR("allow use of FUA/DPO instead of cache flush if available"),
		       NULL, 0, &wapbl_allow_dpofua, 0,
		       CTL_CREATE, CTL_EOL);
	if (rv)
		return;

	rv = sysctl_createv(clog, 0, &rnode, &cnode,
		       CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
		       CTLTYPE_INT, "journal_iobufs",
		       SYSCTL_DESCR("count of bufs used for journal I/O (max async count)"),
		       NULL, 0, &wapbl_journal_iobufs, 0,
		       CTL_CREATE, CTL_EOL);
	if (rv)
		return;

	return;
}
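
/*
 * Example usage (illustrative, not from the original source): the knobs
 * created above appear under the vfs.wapbl sysctl node and can be tuned
 * at run time, e.g.:
 *
 *	sysctl -w vfs.wapbl.flush_disk_cache=0
 *	sysctl -w vfs.wapbl.verbose_commit=1
 */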

static void
wapbl_init(void)
{

	pool_init(&wapbl_entry_pool, sizeof(struct wapbl_entry), 0, 0, 0,
	    "wapblentrypl", &pool_allocator_kmem, IPL_VM);
	pool_init(&wapbl_dealloc_pool, sizeof(struct wapbl_dealloc), 0, 0, 0,
	    "wapbldealloc", &pool_allocator_nointr, IPL_NONE);
}

static int
wapbl_fini(void)
{

	pool_destroy(&wapbl_dealloc_pool);
	pool_destroy(&wapbl_entry_pool);

	return 0;
}

static void
wapbl_evcnt_init(struct wapbl *wl)
{
	snprintf(wl->wl_ev_group, sizeof(wl->wl_ev_group),
	    "wapbl fsid 0x%x/0x%x",
	    wl->wl_mount->mnt_stat.f_fsidx.__fsid_val[0],
	    wl->wl_mount->mnt_stat.f_fsidx.__fsid_val[1]
	);

	evcnt_attach_dynamic(&wl->wl_ev_commit, EVCNT_TYPE_MISC,
	    NULL, wl->wl_ev_group, "commit");
	evcnt_attach_dynamic(&wl->wl_ev_journalwrite, EVCNT_TYPE_MISC,
	    NULL, wl->wl_ev_group, "journal write total");
	evcnt_attach_dynamic(&wl->wl_ev_jbufs_bio_nowait, EVCNT_TYPE_MISC,
	    NULL, wl->wl_ev_group, "journal write finished async");
	evcnt_attach_dynamic(&wl->wl_ev_metawrite, EVCNT_TYPE_MISC,
	    NULL, wl->wl_ev_group, "metadata async write");
	evcnt_attach_dynamic(&wl->wl_ev_cacheflush, EVCNT_TYPE_MISC,
	    NULL, wl->wl_ev_group, "cache flush");
}

static void
wapbl_evcnt_free(struct wapbl *wl)
{
	evcnt_detach(&wl->wl_ev_commit);
	evcnt_detach(&wl->wl_ev_journalwrite);
	evcnt_detach(&wl->wl_ev_jbufs_bio_nowait);
	evcnt_detach(&wl->wl_ev_metawrite);
	evcnt_detach(&wl->wl_ev_cacheflush);
}

static void
wapbl_dkcache_init(struct wapbl *wl)
{
	int error;

	/* Get disk cache flags */
	error = VOP_IOCTL(wl->wl_devvp, DIOCGCACHE, &wl->wl_dkcache,
	    FWRITE, FSCRED);
	if (error) {
		/* behave as if there was a write cache */
		wl->wl_dkcache = DKCACHE_WRITE;
	}

	/* Use FUA instead of cache flush if available */
	if (ISSET(wl->wl_dkcache, DKCACHE_FUA))
		wl->wl_jwrite_flags |= B_MEDIA_FUA;

	/* Use DPO for journal writes if available */
	if (ISSET(wl->wl_dkcache, DKCACHE_DPO))
		wl->wl_jwrite_flags |= B_MEDIA_DPO;
}
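
/*
 * Illustrative note: for a disk reporting DKCACHE_WRITE|DKCACHE_FUA,
 * the code above leaves B_MEDIA_FUA set in wl_jwrite_flags, so
 * WAPBL_USE_FUA() becomes true once the wapbl_allow_dpofua knob is
 * enabled and journal writes may rely on FUA rather than a full cache
 * flush.  When DIOCGCACHE fails, assuming DKCACHE_WRITE is the
 * conservative choice: it keeps the cache-flushing path enabled.
 */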

static int
wapbl_start_flush_inodes(struct wapbl *wl, struct wapbl_replay *wr)
{
	int error, i;

	WAPBL_PRINTF(WAPBL_PRINT_REPLAY,
	    ("wapbl_start: reusing log with %d inodes\n", wr->wr_inodescnt));

	/*
	 * It's only valid to reuse the replay log if it's
	 * the same as the new log we just opened.
	 */
	KDASSERT(!wapbl_replay_isopen(wr));
	KASSERT(wl->wl_devvp->v_type == VBLK);
	KASSERT(wr->wr_devvp->v_type == VBLK);
	KASSERT(wl->wl_devvp->v_rdev == wr->wr_devvp->v_rdev);
	KASSERT(wl->wl_logpbn == wr->wr_logpbn);
	KASSERT(wl->wl_circ_size == wr->wr_circ_size);
	KASSERT(wl->wl_circ_off == wr->wr_circ_off);
	KASSERT(wl->wl_log_dev_bshift == wr->wr_log_dev_bshift);
	KASSERT(wl->wl_fs_dev_bshift == wr->wr_fs_dev_bshift);

	wl->wl_wc_header->wc_generation = wr->wr_generation + 1;

	for (i = 0; i < wr->wr_inodescnt; i++)
		wapbl_register_inode(wl, wr->wr_inodes[i].wr_inumber,
		    wr->wr_inodes[i].wr_imode);

	/* Make sure new transaction won't overwrite old inodes list */
	KDASSERT(wapbl_transaction_len(wl) <=
	    wapbl_space_free(wl->wl_circ_size, wr->wr_inodeshead,
	    wr->wr_inodestail));

	wl->wl_head = wl->wl_tail = wr->wr_inodeshead;
	wl->wl_reclaimable_bytes = wl->wl_reserved_bytes =
	    wapbl_transaction_len(wl);

	error = wapbl_write_inodes(wl, &wl->wl_head);
	if (error)
		return error;

	KASSERT(wl->wl_head != wl->wl_tail);
	KASSERT(wl->wl_head != 0);

	return 0;
}

int
wapbl_start(struct wapbl ** wlp, struct mount *mp, struct vnode *vp,
	daddr_t off, size_t count, size_t blksize, struct wapbl_replay *wr,
	wapbl_flush_fn_t flushfn, wapbl_flush_fn_t flushabortfn)
{
	struct wapbl *wl;
	struct vnode *devvp;
	daddr_t logpbn;
	int error;
	int log_dev_bshift = ilog2(blksize);
	int fs_dev_bshift = log_dev_bshift;
	int run;

	WAPBL_PRINTF(WAPBL_PRINT_OPEN, ("wapbl_start: vp=%p off=%" PRId64
	    " count=%zu blksize=%zu\n", vp, off, count, blksize));

	if (log_dev_bshift > fs_dev_bshift) {
		WAPBL_PRINTF(WAPBL_PRINT_OPEN,
			("wapbl: log device's block size cannot be larger "
			 "than filesystem's\n"));
		/*
		 * Not currently implemented, although it could be if
		 * needed someday.
		 */
		return ENOSYS;
	}

	if (off < 0)
		return EINVAL;

	if (blksize < DEV_BSIZE)
		return EINVAL;
	if (blksize % DEV_BSIZE)
		return EINVAL;

	/* XXXTODO: verify that the full load is writable */

	/*
	 * XXX check for minimum log size
	 * minimum is governed by minimum amount of space
	 * to complete a transaction. (probably truncate)
	 */
	/* XXX for now pick something minimal */
	if ((count * blksize) < MAXPHYS) {
		return ENOSPC;
	}

	if ((error = VOP_BMAP(vp, off, &devvp, &logpbn, &run)) != 0) {
		return error;
	}

	wl = wapbl_calloc(1, sizeof(*wl));
	rw_init(&wl->wl_rwlock);
	mutex_init(&wl->wl_mtx, MUTEX_DEFAULT, IPL_NONE);
	cv_init(&wl->wl_reclaimable_cv, "wapblrec");
	TAILQ_INIT(&wl->wl_bufs);
	SIMPLEQ_INIT(&wl->wl_entries);

	wl->wl_logvp = vp;
	wl->wl_devvp = devvp;
	wl->wl_mount = mp;
	wl->wl_logpbn = logpbn;
	wl->wl_log_dev_bshift = log_dev_bshift;
	wl->wl_fs_dev_bshift = fs_dev_bshift;

	wl->wl_flush = flushfn;
	wl->wl_flush_abort = flushabortfn;

	/* Reserve two log device blocks for the commit headers */
	wl->wl_circ_off = 2<<wl->wl_log_dev_bshift;
	wl->wl_circ_size = ((count * blksize) - wl->wl_circ_off);
	/* truncate the log usage to a multiple of log_dev_bshift */
	wl->wl_circ_size >>= wl->wl_log_dev_bshift;
	wl->wl_circ_size <<= wl->wl_log_dev_bshift;
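
	/*
	 * Worked example (illustrative): with blksize = 2048 the shift is
	 * wl_log_dev_bshift = 11, so the two commit headers occupy
	 * wl_circ_off = 2 << 11 = 4096 bytes, and with count = 8192 blocks
	 * the circular area is 8192 * 2048 - 4096 bytes, which the shifts
	 * above leave unchanged since it is already a multiple of 2048.
	 */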

	/*
	 * wl_bufbytes_max limits the size of the in-memory transaction space.
	 * - Since buffers are allocated and accounted for in units of
	 *   PAGE_SIZE it is required to be a multiple of PAGE_SIZE
	 *   (i.e. 1<<PAGE_SHIFT)
	 * - Since the log device has to be written in units of
	 *   1<<wl_log_dev_bshift it is required to be a multiple of
	 *   1<<wl_log_dev_bshift.
	 * - Since the filesystem will provide data in units of
	 *   1<<wl_fs_dev_bshift, it is convenient to be a multiple of
	 *   1<<wl_fs_dev_bshift.
	 * Therefore it must be a multiple of the least common multiple of
	 * those three quantities.  Fortunately, all of those quantities are
	 * guaranteed to be a power of two, and the least common multiple of
	 * a set of numbers which are all powers of two is simply the maximum
	 * of those numbers.  Finally, the maximum logarithm of a power of two
	 * is the same as the log of the maximum power of two.  So we can do
	 * the following operations to size wl_bufbytes_max:
	 */
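
	/*
	 * Worked example (illustrative): with PAGE_SHIFT = 12 and
	 * wl_log_dev_bshift = wl_fs_dev_bshift = 9, the three constraints
	 * are 4096, 512 and 512 bytes; their least common multiple is
	 * max(4096, 512, 512) = 4096, so the shift pairs below simply
	 * round wl_bufbytes_max down to a multiple of 4096.
	 */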

	/* XXX fix actual number of pages reserved per filesystem. */
	wl->wl_bufbytes_max = MIN(wl->wl_circ_size, buf_memcalc() / 2);

	/* Round wl_bufbytes_max to the largest power of two constraint */
	wl->wl_bufbytes_max >>= PAGE_SHIFT;
	wl->wl_bufbytes_max <<= PAGE_SHIFT;
	wl->wl_bufbytes_max >>= wl->wl_log_dev_bshift;
	wl->wl_bufbytes_max <<= wl->wl_log_dev_bshift;
	wl->wl_bufbytes_max >>= wl->wl_fs_dev_bshift;
	wl->wl_bufbytes_max <<= wl->wl_fs_dev_bshift;

	/* XXX maybe use filesystem fragment size instead of 1024 */
	/* XXX fix actual number of buffers reserved per filesystem. */
	wl->wl_bufcount_max = (buf_nbuf() / 2) * 1024;

	wl->wl_brperjblock = ((1<<wl->wl_log_dev_bshift)
	    - offsetof(struct wapbl_wc_blocklist, wc_blocks)) /
	    sizeof(((struct wapbl_wc_blocklist *)0)->wc_blocks[0]);
	KASSERT(wl->wl_brperjblock > 0);

	/* XXX tie this into resource estimation */
	wl->wl_dealloclim = wl->wl_bufbytes_max / mp->mnt_stat.f_bsize / 2;
	TAILQ_INIT(&wl->wl_dealloclist);

	wapbl_inodetrk_init(wl, WAPBL_INODETRK_SIZE);

	wapbl_evcnt_init(wl);

	wapbl_dkcache_init(wl);

	/* Initialize the commit header */
	{
		struct wapbl_wc_header *wc;
		size_t len = 1 << wl->wl_log_dev_bshift;
		wc = wapbl_calloc(1, len);
		wc->wc_type = WAPBL_WC_HEADER;
		wc->wc_len = len;
		wc->wc_circ_off = wl->wl_circ_off;
		wc->wc_circ_size = wl->wl_circ_size;
		/* XXX wc->wc_fsid */
		wc->wc_log_dev_bshift = wl->wl_log_dev_bshift;
		wc->wc_fs_dev_bshift = wl->wl_fs_dev_bshift;
		wl->wl_wc_header = wc;
		wl->wl_wc_scratch = wapbl_alloc(len);
	}

	TAILQ_INIT(&wl->wl_iobufs);
	TAILQ_INIT(&wl->wl_iobufs_busy);
	for (int i = 0; i < wapbl_journal_iobufs; i++) {
		struct buf *bp;

		if ((bp = geteblk(MAXPHYS)) == NULL)
			goto errout;

		mutex_enter(&bufcache_lock);
		mutex_enter(devvp->v_interlock);
		bgetvp(devvp, bp);
		mutex_exit(devvp->v_interlock);
		mutex_exit(&bufcache_lock);

		bp->b_dev = devvp->v_rdev;

		TAILQ_INSERT_TAIL(&wl->wl_iobufs, bp, b_wapbllist);
	}

	/*
	 * if there was an existing set of unlinked but
	 * allocated inodes, preserve it in the new
	 * log.
	 */
	if (wr && wr->wr_inodescnt) {
		error = wapbl_start_flush_inodes(wl, wr);
		if (error)
			goto errout;
	}

	error = wapbl_write_commit(wl, wl->wl_head, wl->wl_tail);
	if (error) {
		goto errout;
	}

	*wlp = wl;
#if defined(WAPBL_DEBUG)
	wapbl_debug_wl = wl;
#endif

	return 0;
 errout:
	wapbl_discard(wl);
	wapbl_free(wl->wl_wc_scratch, wl->wl_wc_header->wc_len);
	wapbl_free(wl->wl_wc_header, wl->wl_wc_header->wc_len);
	while (!TAILQ_EMPTY(&wl->wl_iobufs)) {
		struct buf *bp;

		bp = TAILQ_FIRST(&wl->wl_iobufs);
		TAILQ_REMOVE(&wl->wl_iobufs, bp, b_wapbllist);
		brelse(bp, BC_INVAL);
	}
	wapbl_inodetrk_free(wl);
	wapbl_free(wl, sizeof(*wl));

	return error;
}

/*
 * Like wapbl_flush, only discards the transaction
 * completely
 */

void
wapbl_discard(struct wapbl *wl)
{
	struct wapbl_entry *we;
	struct wapbl_dealloc *wd;
	struct buf *bp;
	int i;

	/*
	 * XXX we may consider using upgrade here
	 * if we want to call flush from inside a transaction
	 */
	rw_enter(&wl->wl_rwlock, RW_WRITER);
	wl->wl_flush(wl->wl_mount, TAILQ_FIRST(&wl->wl_dealloclist));

#ifdef WAPBL_DEBUG_PRINT
	{
		pid_t pid = -1;
		lwpid_t lid = -1;
		if (curproc)
			pid = curproc->p_pid;
		if (curlwp)
			lid = curlwp->l_lid;
#ifdef WAPBL_DEBUG_BUFBYTES
		WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
		    ("wapbl_discard: thread %d.%d discarding "
		    "transaction\n"
		    "\tbufcount=%zu bufbytes=%zu bcount=%zu "
		    "deallocs=%d inodes=%d\n"
		    "\terrcnt = %u, reclaimable=%zu reserved=%zu "
		    "unsynced=%zu\n",
		    pid, lid, wl->wl_bufcount, wl->wl_bufbytes,
		    wl->wl_bcount, wl->wl_dealloccnt,
		    wl->wl_inohashcnt, wl->wl_error_count,
		    wl->wl_reclaimable_bytes, wl->wl_reserved_bytes,
		    wl->wl_unsynced_bufbytes));
		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
			WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
			     "error = %d, unsynced = %zu\n",
			     we->we_bufcount, we->we_reclaimable_bytes,
			     we->we_error, we->we_unsynced_bufbytes));
		}
#else /* !WAPBL_DEBUG_BUFBYTES */
		WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
		    ("wapbl_discard: thread %d.%d discarding transaction\n"
		    "\tbufcount=%zu bufbytes=%zu bcount=%zu "
		    "deallocs=%d inodes=%d\n"
		    "\terrcnt = %u, reclaimable=%zu reserved=%zu\n",
		    pid, lid, wl->wl_bufcount, wl->wl_bufbytes,
		    wl->wl_bcount, wl->wl_dealloccnt,
		    wl->wl_inohashcnt, wl->wl_error_count,
		    wl->wl_reclaimable_bytes, wl->wl_reserved_bytes));
		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
			WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
			     "error = %d\n",
			     we->we_bufcount, we->we_reclaimable_bytes,
			     we->we_error));
		}
#endif /* !WAPBL_DEBUG_BUFBYTES */
	}
#endif /* WAPBL_DEBUG_PRINT */

	for (i = 0; i <= wl->wl_inohashmask; i++) {
		struct wapbl_ino_head *wih;
		struct wapbl_ino *wi;

		wih = &wl->wl_inohash[i];
		while ((wi = LIST_FIRST(wih)) != NULL) {
			LIST_REMOVE(wi, wi_hash);
			pool_put(&wapbl_ino_pool, wi);
			KASSERT(wl->wl_inohashcnt > 0);
			wl->wl_inohashcnt--;
		}
	}

	/*
	 * clean buffer list
	 */
	mutex_enter(&bufcache_lock);
	mutex_enter(&wl->wl_mtx);
	while ((bp = TAILQ_FIRST(&wl->wl_bufs)) != NULL) {
		if (bbusy(bp, 0, 0, &wl->wl_mtx) == 0) {
			KASSERT(bp->b_flags & B_LOCKED);
			KASSERT(bp->b_oflags & BO_DELWRI);
			/*
			 * Buffer is already on BQ_LOCKED queue.
			 * The buffer will be unlocked and
			 * removed from the transaction in brelsel()
			 */
			mutex_exit(&wl->wl_mtx);
			bremfree(bp);
			brelsel(bp, BC_INVAL);
			mutex_enter(&wl->wl_mtx);
		}
	}

	/*
	 * Remove references to this wl from wl_entries, free any which
	 * no longer have buffers, others will be freed in wapbl_biodone()
	 * when they no longer have any buffers.
	 */
	while ((we = SIMPLEQ_FIRST(&wl->wl_entries)) != NULL) {
		SIMPLEQ_REMOVE_HEAD(&wl->wl_entries, we_entries);
		/*
		 * XXX should we be accumulating wl_error_count
		 * and increasing reclaimable bytes ?
		 */
		we->we_wapbl = NULL;
		if (we->we_bufcount == 0) {
#ifdef WAPBL_DEBUG_BUFBYTES
			KASSERT(we->we_unsynced_bufbytes == 0);
#endif
			pool_put(&wapbl_entry_pool, we);
		}
	}

	mutex_exit(&wl->wl_mtx);
	mutex_exit(&bufcache_lock);

	/* Discard list of deallocs */
	while ((wd = TAILQ_FIRST(&wl->wl_dealloclist)) != NULL)
		wapbl_deallocation_free(wl, wd, true);

	/* XXX should we clear wl_reserved_bytes? */

	KASSERT(wl->wl_bufbytes == 0);
	KASSERT(wl->wl_bcount == 0);
	KASSERT(wl->wl_bufcount == 0);
	KASSERT(TAILQ_EMPTY(&wl->wl_bufs));
	KASSERT(SIMPLEQ_EMPTY(&wl->wl_entries));
	KASSERT(wl->wl_inohashcnt == 0);
	KASSERT(TAILQ_EMPTY(&wl->wl_dealloclist));
	KASSERT(wl->wl_dealloccnt == 0);

	rw_exit(&wl->wl_rwlock);
}

int
wapbl_stop(struct wapbl *wl, int force)
{
	int error;

	WAPBL_PRINTF(WAPBL_PRINT_OPEN, ("wapbl_stop called\n"));
	error = wapbl_flush(wl, 1);
	if (error) {
		if (force)
			wapbl_discard(wl);
		else
			return error;
	}

	/* Unlinked inodes persist after a flush */
	if (wl->wl_inohashcnt) {
		if (force) {
			wapbl_discard(wl);
		} else {
			return EBUSY;
		}
	}

	KASSERT(wl->wl_bufbytes == 0);
	KASSERT(wl->wl_bcount == 0);
	KASSERT(wl->wl_bufcount == 0);
	KASSERT(TAILQ_EMPTY(&wl->wl_bufs));
	KASSERT(wl->wl_dealloccnt == 0);
	KASSERT(SIMPLEQ_EMPTY(&wl->wl_entries));
	KASSERT(wl->wl_inohashcnt == 0);
	KASSERT(TAILQ_EMPTY(&wl->wl_dealloclist));
	KASSERT(wl->wl_dealloccnt == 0);
	KASSERT(TAILQ_EMPTY(&wl->wl_iobufs_busy));

	wapbl_free(wl->wl_wc_scratch, wl->wl_wc_header->wc_len);
	wapbl_free(wl->wl_wc_header, wl->wl_wc_header->wc_len);
	while (!TAILQ_EMPTY(&wl->wl_iobufs)) {
		struct buf *bp;

		bp = TAILQ_FIRST(&wl->wl_iobufs);
		TAILQ_REMOVE(&wl->wl_iobufs, bp, b_wapbllist);
		brelse(bp, BC_INVAL);
	}
	wapbl_inodetrk_free(wl);

	wapbl_evcnt_free(wl);

	cv_destroy(&wl->wl_reclaimable_cv);
	mutex_destroy(&wl->wl_mtx);
	rw_destroy(&wl->wl_rwlock);
	wapbl_free(wl, sizeof(*wl));

	return 0;
}

/****************************************************************/
/*
 * Unbuffered disk I/O
 */

static void
wapbl_doio_accounting(struct vnode *devvp, int flags)
{
	struct pstats *pstats = curlwp->l_proc->p_stats;

	if ((flags & (B_WRITE | B_READ)) == B_WRITE) {
		mutex_enter(devvp->v_interlock);
		devvp->v_numoutput++;
		mutex_exit(devvp->v_interlock);
		pstats->p_ru.ru_oublock++;
	} else {
		pstats->p_ru.ru_inblock++;
	}
}

static int
wapbl_doio(void *data, size_t len, struct vnode *devvp, daddr_t pbn, int flags)
{
	struct buf *bp;
	int error;

	KASSERT(devvp->v_type == VBLK);

	wapbl_doio_accounting(devvp, flags);

	bp = getiobuf(devvp, true);
	bp->b_flags = flags;
	bp->b_cflags |= BC_BUSY;	/* mandatory, asserted by biowait() */
	bp->b_dev = devvp->v_rdev;
	bp->b_data = data;
	bp->b_bufsize = bp->b_resid = bp->b_bcount = len;
	bp->b_blkno = pbn;
	BIO_SETPRIO(bp, BPRIO_TIMECRITICAL);

	WAPBL_PRINTF(WAPBL_PRINT_IO,
	    ("wapbl_doio: %s %d bytes at block %"PRId64" on dev 0x%"PRIx64"\n",
	    BUF_ISWRITE(bp) ? "write" : "read", bp->b_bcount,
	    bp->b_blkno, bp->b_dev));

	VOP_STRATEGY(devvp, bp);

	error = biowait(bp);
	putiobuf(bp);

	if (error) {
		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
		    ("wapbl_doio: %s %zu bytes at block %" PRId64
		    " on dev 0x%"PRIx64" failed with error %d\n",
		    (((flags & (B_WRITE | B_READ)) == B_WRITE) ?
		     "write" : "read"),
		    len, pbn, devvp->v_rdev, error));
	}

	return error;
}

/*
 * wapbl_write(data, len, devvp, pbn)
 *
 *	Synchronously write len bytes from data to physical block pbn
 *	on devvp.
 */
int
wapbl_write(void *data, size_t len, struct vnode *devvp, daddr_t pbn)
{

	return wapbl_doio(data, len, devvp, pbn, B_WRITE);
}

/*
 * wapbl_read(data, len, devvp, pbn)
 *
 *	Synchronously read len bytes into data from physical block pbn
 *	on devvp.
 */
int
wapbl_read(void *data, size_t len, struct vnode *devvp, daddr_t pbn)
{

	return wapbl_doio(data, len, devvp, pbn, B_READ);
}

/****************************************************************/
/*
 * Buffered disk writes -- try to coalesce writes and emit
 * MAXPHYS-aligned blocks.
 */

/*
 * wapbl_buffered_write_async(wl, bp)
 *
 *	Send buffer for asynchronous write.
 */
static void
wapbl_buffered_write_async(struct wapbl *wl, struct buf *bp)
{
	wapbl_doio_accounting(wl->wl_devvp, bp->b_flags);

	KASSERT(TAILQ_FIRST(&wl->wl_iobufs) == bp);
	TAILQ_REMOVE(&wl->wl_iobufs, bp, b_wapbllist);

	bp->b_flags |= B_WRITE;
	bp->b_cflags |= BC_BUSY;	/* mandatory, asserted by biowait() */
	bp->b_oflags = 0;
	bp->b_bcount = bp->b_resid;
	BIO_SETPRIO(bp, BPRIO_TIMECRITICAL);

	VOP_STRATEGY(wl->wl_devvp, bp);

	wl->wl_ev_journalwrite.ev_count++;

	TAILQ_INSERT_TAIL(&wl->wl_iobufs_busy, bp, b_wapbllist);
}

/*
 * wapbl_buffered_flush(wl, full)
 *
 *	Flush any buffered writes from wapbl_buffered_write.  If full is
 *	true, wait for all in-flight journal I/O to complete; otherwise
 *	reap at least one buf, preferring bufs whose I/O has already
 *	finished.
 */
static int
wapbl_buffered_flush(struct wapbl *wl, bool full)
{
	int error = 0;
	struct buf *bp, *bnext;
	bool only_done = true, found = false;

	/* if there is outstanding buffered write, send it now */
	if ((bp = TAILQ_FIRST(&wl->wl_iobufs)) && bp->b_resid > 0)
		wapbl_buffered_write_async(wl, bp);

	/* wait for I/O to complete */
again:
	TAILQ_FOREACH_SAFE(bp, &wl->wl_iobufs_busy, b_wapbllist, bnext) {
		if (!full && only_done) {
			/* skip unfinished */
			if (!ISSET(bp->b_oflags, BO_DONE))
				continue;
		}

		if (ISSET(bp->b_oflags, BO_DONE))
			wl->wl_ev_jbufs_bio_nowait.ev_count++;

		TAILQ_REMOVE(&wl->wl_iobufs_busy, bp, b_wapbllist);
		error = biowait(bp);

		/* reset for reuse */
		bp->b_blkno = bp->b_resid = bp->b_flags = 0;
		TAILQ_INSERT_TAIL(&wl->wl_iobufs, bp, b_wapbllist);
		found = true;

		if (!full)
			break;
	}

	if (!found && only_done && !TAILQ_EMPTY(&wl->wl_iobufs_busy)) {
		only_done = false;
		goto again;
	}

	return error;
}
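
/*
 * Design note: the two-pass loop above prefers bufs whose I/O has
 * already completed (BO_DONE), so a partial flush can recycle a buffer
 * without stalling behind I/O still in flight; only if nothing has
 * completed does it fall back to blocking in biowait() on a busy buf.
 */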

/*
 * wapbl_buffered_write(data, len, wl, pbn, bflags)
 *
 *	Write len bytes from data to physical block pbn on
 *	wl->wl_devvp, ORing bflags into the buf's flags for the I/O.
 *	The write may not complete until wapbl_buffered_flush.
 */
static int
wapbl_buffered_write(void *data, size_t len, struct wapbl *wl, daddr_t pbn,
    int bflags)
{
	size_t resid;
	struct buf *bp;

again:
	bp = TAILQ_FIRST(&wl->wl_iobufs);

	if (bp == NULL) {
		/* No more buffers, wait for any previous I/O to finish. */
		wapbl_buffered_flush(wl, false);

		bp = TAILQ_FIRST(&wl->wl_iobufs);
		KASSERT(bp != NULL);
	}

	/*
	 * If not adjacent to buffered data, flush first.  The disk block
	 * address is always valid for a non-empty buffer.
	 */
	if ((bp->b_resid > 0 && pbn != bp->b_blkno + btodb(bp->b_resid))) {
		wapbl_buffered_write_async(wl, bp);
		goto again;
	}

	/*
	 * If this write goes to an empty buffer we have to
	 * save the disk block address first.
	 */
	if (bp->b_blkno == 0) {
		bp->b_blkno = pbn;
		bp->b_flags |= bflags;
	}
	/*
	 * Remaining space so this buffer ends on a buffer size boundary.
	 *
	 * Cannot become less than or equal to zero, since the buffer
	 * would already have been flushed by the previous call.
	 */
	resid = bp->b_bufsize - dbtob(bp->b_blkno % btodb(bp->b_bufsize)) -
	    bp->b_resid;
	KASSERT(resid > 0);
	KASSERT(dbtob(btodb(resid)) == resid);

	if (len < resid)
		resid = len;

	memcpy((uint8_t *)bp->b_data + bp->b_resid, data, resid);
	bp->b_resid += resid;

	if (len >= resid) {
		/* Just filled the buf, or data did not fit */
		wapbl_buffered_write_async(wl, bp);

		data = (uint8_t *)data + resid;
		len -= resid;
		pbn += btodb(resid);

		if (len > 0)
			goto again;
	}

	return 0;
}
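
/*
 * Illustrative example of the coalescing above: two calls
 * wapbl_buffered_write(d1, 512, wl, pbn, 0) and
 * wapbl_buffered_write(d2, 512, wl, pbn + btodb(512), 0) accumulate in
 * the same iobuf; the buf is only submitted once it fills up to its
 * MAXPHYS-sized boundary, or when a non-adjacent pbn (or buf shortage)
 * forces it out early via wapbl_buffered_write_async().
 */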

/*
 * wapbl_circ_write(wl, data, len, offp)
 *
 *	Write len bytes from data to the circular queue of wl, starting
 *	at linear byte offset *offp, and returning the new linear byte
 *	offset in *offp.
 *
 *	If the starting linear byte offset precedes wl->wl_circ_off,
 *	the write instead begins at wl->wl_circ_off.  XXX WTF?  This
 *	should be a KASSERT, not a conditional.
 *
 *	The write is buffered in wl and must be flushed with
 *	wapbl_buffered_flush before it will be submitted to the disk.
 */
static int
wapbl_circ_write(struct wapbl *wl, void *data, size_t len, off_t *offp)
{
	size_t slen;
	off_t off = *offp;
	int error;
	daddr_t pbn;

	KDASSERT(((len >> wl->wl_log_dev_bshift) <<
	    wl->wl_log_dev_bshift) == len);

	if (off < wl->wl_circ_off)
		off = wl->wl_circ_off;
	slen = wl->wl_circ_off + wl->wl_circ_size - off;
	if (slen < len) {
		pbn = wl->wl_logpbn + (off >> wl->wl_log_dev_bshift);
#ifdef _KERNEL
		pbn = btodb(pbn << wl->wl_log_dev_bshift);
#endif
		error = wapbl_buffered_write(data, slen, wl, pbn,
		    WAPBL_JDATA_FLAGS(wl));
		if (error)
			return error;
		data = (uint8_t *)data + slen;
		len -= slen;
		off = wl->wl_circ_off;
	}
	pbn = wl->wl_logpbn + (off >> wl->wl_log_dev_bshift);
#ifdef _KERNEL
	pbn = btodb(pbn << wl->wl_log_dev_bshift);
#endif
	error = wapbl_buffered_write(data, len, wl, pbn,
	    WAPBL_JDATA_FLAGS(wl));
	if (error)
		return error;
	off += len;
	if (off >= wl->wl_circ_off + wl->wl_circ_size)
		off = wl->wl_circ_off;
	*offp = off;
	return 0;
}
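
/*
 * Illustrative wrap-around example: with wl_circ_off = 1024,
 * wl_circ_size = 8192 and *offp = 8704, a 1024-byte wapbl_circ_write
 * is split into 512 bytes at offset 8704 (up to the end of the region
 * at 1024 + 8192 = 9216) followed by 512 bytes at wl_circ_off,
 * leaving *offp = 1536.
 */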

/****************************************************************/
/*
 * WAPBL transactions: entering, adding/removing bufs, and exiting
 */

int
wapbl_begin(struct wapbl *wl, const char *file, int line)
{
	int doflush;
	unsigned lockcount;

	KDASSERT(wl);

	/*
	 * XXX this needs to be made much more sophisticated.
	 * perhaps each wapbl_begin could reserve a specified
	 * number of buffers and bytes.
	 */
	mutex_enter(&wl->wl_mtx);
	lockcount = wl->wl_lock_count;
	doflush = ((wl->wl_bufbytes + (lockcount * MAXPHYS)) >
		   wl->wl_bufbytes_max / 2) ||
		  ((wl->wl_bufcount + (lockcount * 10)) >
		   wl->wl_bufcount_max / 2) ||
		  (wapbl_transaction_len(wl) > wl->wl_circ_size / 2) ||
		  (wl->wl_dealloccnt >= (wl->wl_dealloclim / 2));
	mutex_exit(&wl->wl_mtx);

	if (doflush) {
		WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
		    ("force flush lockcnt=%d bufbytes=%zu "
		    "(max=%zu) bufcount=%zu (max=%zu) "
		    "dealloccnt %d (lim=%d)\n",
		    lockcount, wl->wl_bufbytes,
		    wl->wl_bufbytes_max, wl->wl_bufcount,
		    wl->wl_bufcount_max,
		    wl->wl_dealloccnt, wl->wl_dealloclim));
	}

	if (doflush) {
		int error = wapbl_flush(wl, 0);
		if (error)
			return error;
	}

	rw_enter(&wl->wl_rwlock, RW_READER);
	mutex_enter(&wl->wl_mtx);
	wl->wl_lock_count++;
	mutex_exit(&wl->wl_mtx);

#if defined(WAPBL_DEBUG_PRINT)
	WAPBL_PRINTF(WAPBL_PRINT_TRANSACTION,
	    ("wapbl_begin thread %d.%d with bufcount=%zu "
	    "bufbytes=%zu bcount=%zu at %s:%d\n",
	    curproc->p_pid, curlwp->l_lid, wl->wl_bufcount,
	    wl->wl_bufbytes, wl->wl_bcount, file, line));
#endif

	return 0;
}

void
wapbl_end(struct wapbl *wl)
{

#if defined(WAPBL_DEBUG_PRINT)
	WAPBL_PRINTF(WAPBL_PRINT_TRANSACTION,
	     ("wapbl_end thread %d.%d with bufcount=%zu "
	      "bufbytes=%zu bcount=%zu\n",
	      curproc->p_pid, curlwp->l_lid, wl->wl_bufcount,
	      wl->wl_bufbytes, wl->wl_bcount));
#endif

	/*
	 * XXX this could be handled more gracefully, perhaps place
	 * only a partial transaction in the log and allow the
	 * remaining to flush without the protection of the journal.
	 */
	KASSERTMSG((wapbl_transaction_len(wl) <=
		(wl->wl_circ_size - wl->wl_reserved_bytes)),
	    "wapbl_end: current transaction too big to flush");

	mutex_enter(&wl->wl_mtx);
	KASSERT(wl->wl_lock_count > 0);
	wl->wl_lock_count--;
	mutex_exit(&wl->wl_mtx);

	rw_exit(&wl->wl_rwlock);
}

void
wapbl_add_buf(struct wapbl *wl, struct buf * bp)
{

	KASSERT(bp->b_cflags & BC_BUSY);
	KASSERT(bp->b_vp);

	wapbl_jlock_assert(wl);

#if 0
	/*
	 * XXX this might be an issue for swapfiles.
	 * see uvm_swap.c:1702
	 *
	 * XXX2 why require it then?  leap of semantics?
	 */
	KASSERT((bp->b_cflags & BC_NOCACHE) == 0);
#endif

	mutex_enter(&wl->wl_mtx);
	if (bp->b_flags & B_LOCKED) {
		TAILQ_REMOVE(&wl->wl_bufs, bp, b_wapbllist);
		WAPBL_PRINTF(WAPBL_PRINT_BUFFER2,
		   ("wapbl_add_buf thread %d.%d re-adding buf %p "
		    "with %d bytes %d bcount\n",
		    curproc->p_pid, curlwp->l_lid, bp, bp->b_bufsize,
		    bp->b_bcount));
	} else {
		/* unlocked but dirty buffers shouldn't exist */
		KASSERT(!(bp->b_oflags & BO_DELWRI));
		wl->wl_bufbytes += bp->b_bufsize;
		wl->wl_bcount += bp->b_bcount;
		wl->wl_bufcount++;
		WAPBL_PRINTF(WAPBL_PRINT_BUFFER,
		   ("wapbl_add_buf thread %d.%d adding buf %p "
		    "with %d bytes %d bcount\n",
		    curproc->p_pid, curlwp->l_lid, bp, bp->b_bufsize,
		    bp->b_bcount));
	}
	TAILQ_INSERT_TAIL(&wl->wl_bufs, bp, b_wapbllist);
	mutex_exit(&wl->wl_mtx);

	bp->b_flags |= B_LOCKED;
}

static void
wapbl_remove_buf_locked(struct wapbl * wl, struct buf *bp)
{

	KASSERT(mutex_owned(&wl->wl_mtx));
	KASSERT(bp->b_cflags & BC_BUSY);
	wapbl_jlock_assert(wl);

#if 0
	/*
	 * XXX this might be an issue for swapfiles.
	 * see uvm_swap.c:1725
	 *
	 * XXXdeux: see above
	 */
	KASSERT((bp->b_flags & BC_NOCACHE) == 0);
#endif
	KASSERT(bp->b_flags & B_LOCKED);

	WAPBL_PRINTF(WAPBL_PRINT_BUFFER,
	   ("wapbl_remove_buf thread %d.%d removing buf %p with "
	    "%d bytes %d bcount\n",
	    curproc->p_pid, curlwp->l_lid, bp, bp->b_bufsize, bp->b_bcount));

	KASSERT(wl->wl_bufbytes >= bp->b_bufsize);
	wl->wl_bufbytes -= bp->b_bufsize;
	KASSERT(wl->wl_bcount >= bp->b_bcount);
	wl->wl_bcount -= bp->b_bcount;
	KASSERT(wl->wl_bufcount > 0);
	wl->wl_bufcount--;
	KASSERT((wl->wl_bufcount == 0) == (wl->wl_bufbytes == 0));
	KASSERT((wl->wl_bufcount == 0) == (wl->wl_bcount == 0));
	TAILQ_REMOVE(&wl->wl_bufs, bp, b_wapbllist);

	bp->b_flags &= ~B_LOCKED;
}

/* called from brelsel() in vfs_bio among other places */
void
wapbl_remove_buf(struct wapbl * wl, struct buf *bp)
{

	mutex_enter(&wl->wl_mtx);
	wapbl_remove_buf_locked(wl, bp);
	mutex_exit(&wl->wl_mtx);
}

void
wapbl_resize_buf(struct wapbl *wl, struct buf *bp, long oldsz, long oldcnt)
{

	KASSERT(bp->b_cflags & BC_BUSY);

	/*
	 * XXX: why does this depend on B_LOCKED?  otherwise the buf
	 * is not for a transaction?  if so, why is this called in the
	 * first place?
	 */
	if (bp->b_flags & B_LOCKED) {
		mutex_enter(&wl->wl_mtx);
		wl->wl_bufbytes += bp->b_bufsize - oldsz;
		wl->wl_bcount += bp->b_bcount - oldcnt;
		mutex_exit(&wl->wl_mtx);
	}
}

#endif /* _KERNEL */

/****************************************************************/
/* Some utility inlines */

/*
 * wapbl_space_used(avail, head, tail)
 *
 *	Number of bytes used in a circular queue of avail total bytes,
 *	from tail to head.
 */
static inline size_t
wapbl_space_used(size_t avail, off_t head, off_t tail)
{

	if (tail == 0) {
		KASSERT(head == 0);
		return 0;
	}
	return ((head + (avail - 1) - tail) % avail) + 1;
}
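
/*
 * Worked example (illustrative): for avail = 100, tail = 70, head = 30,
 * the used region wraps around: 100 - 70 = 30 bytes from the tail to
 * the end plus 30 bytes from the start, and indeed
 * ((30 + 99 - 70) % 100) + 1 = 60.  For head == tail != 0 the formula
 * yields avail, i.e. a full queue.
 */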

#ifdef _KERNEL
/*
 * wapbl_advance(size, off, oldoff, delta)
 *
 *	Given a byte offset oldoff into a circular queue of size bytes
 *	starting at off, return a new byte offset oldoff + delta into
 *	the circular queue.
 */
static inline off_t
wapbl_advance(size_t size, size_t off, off_t oldoff, size_t delta)
{
	off_t newoff;

	/* Define acceptable ranges for inputs. */
	KASSERT(delta <= (size_t)size);
	KASSERT((oldoff == 0) || ((size_t)oldoff >= off));
	KASSERT(oldoff < (off_t)(size + off));

	if ((oldoff == 0) && (delta != 0))
		newoff = off + delta;
	else if ((oldoff + delta) < (size + off))
		newoff = oldoff + delta;
	else
		newoff = (oldoff + delta) - size;

	/* Note some interesting axioms */
	KASSERT((delta != 0) || (newoff == oldoff));
	KASSERT((delta == 0) || (newoff != 0));
	KASSERT((delta != (size)) || (newoff == oldoff));

	/* Define acceptable ranges for output. */
	KASSERT((newoff == 0) || ((size_t)newoff >= off));
	KASSERT((size_t)newoff < (size + off));
	return newoff;
}
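
/*
 * Worked example (illustrative): with size = 8192 and off = 1024,
 * wapbl_advance(8192, 1024, 8704, 1024) wraps to 9728 - 8192 = 1536,
 * staying within [off, off + size).  Advancing from oldoff = 0 by a
 * nonzero delta starts at off, matching the convention that 0 means
 * an empty queue.
 */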

/*
 * wapbl_space_free(avail, head, tail)
 *
 *	Number of bytes free in a circular queue of avail total bytes,
 *	in which everything from tail to head is used.
 */
static inline size_t
wapbl_space_free(size_t avail, off_t head, off_t tail)
{

	return avail - wapbl_space_used(avail, head, tail);
}

/*
 * wapbl_advance_head(size, off, delta, headp, tailp)
 *
 *	In a circular queue of size bytes starting at off, given the
 *	old head and tail offsets *headp and *tailp, store the new head
 *	and tail offsets in *headp and *tailp resulting from adding
 *	delta bytes of data to the head.
 */
static inline void
wapbl_advance_head(size_t size, size_t off, size_t delta, off_t *headp,
		   off_t *tailp)
{
	off_t head = *headp;
	off_t tail = *tailp;

	KASSERT(delta <= wapbl_space_free(size, head, tail));
	head = wapbl_advance(size, off, head, delta);
	if ((tail == 0) && (head != 0))
		tail = off;
	*headp = head;
	*tailp = tail;
}

/*
 * wapbl_advance_tail(size, off, delta, headp, tailp)
 *
 *	In a circular queue of size bytes starting at off, given the
 *	old head and tail offsets *headp and *tailp, store the new head
 *	and tail offsets in *headp and *tailp resulting from removing
 *	delta bytes of data from the tail.
 */
static inline void
wapbl_advance_tail(size_t size, size_t off, size_t delta, off_t *headp,
		   off_t *tailp)
{
	off_t head = *headp;
	off_t tail = *tailp;

	KASSERT(delta <= wapbl_space_used(size, head, tail));
	tail = wapbl_advance(size, off, tail, delta);
	if (head == tail) {
		head = tail = 0;
	}
	*headp = head;
	*tailp = tail;
}
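
/*
 * Illustrative sequence: starting from an empty queue (head = tail = 0)
 * with size = 8192 and off = 1024, wapbl_advance_head(..., 512) yields
 * head = 1536 and tail = off = 1024; a later
 * wapbl_advance_tail(..., 512) makes the tail catch up with the head
 * and resets both to 0, marking the queue empty again.
 */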


/****************************************************************/

/*
 * wapbl_truncate(wl, minfree)
 *
 *	Wait until at least minfree bytes are available in the log.
 *
 *	If it was necessary to wait for writes to complete,
 *	advance the circular queue tail to reflect the new write
 *	completions and issue a write commit to the log.
 *
 *	=> Caller must hold wl->wl_rwlock writer lock.
 */
static int
wapbl_truncate(struct wapbl *wl, size_t minfree)
{
	size_t delta;
	size_t avail;
	off_t head;
	off_t tail;
	int error = 0;

	KASSERT(minfree <= (wl->wl_circ_size - wl->wl_reserved_bytes));
	KASSERT(rw_write_held(&wl->wl_rwlock));

	mutex_enter(&wl->wl_mtx);

	/*
	 * First check to see if we have to do a commit
	 * at all.
	 */
	avail = wapbl_space_free(wl->wl_circ_size, wl->wl_head, wl->wl_tail);
	if (minfree < avail) {
		mutex_exit(&wl->wl_mtx);
		return 0;
	}
	minfree -= avail;
	while ((wl->wl_error_count == 0) &&
	    (wl->wl_reclaimable_bytes < minfree)) {
		WAPBL_PRINTF(WAPBL_PRINT_TRUNCATE,
		    ("wapbl_truncate: sleeping on %p wl=%p bytes=%zd "
		    "minfree=%zd\n",
		    &wl->wl_reclaimable_bytes, wl, wl->wl_reclaimable_bytes,
		    minfree));

		cv_wait(&wl->wl_reclaimable_cv, &wl->wl_mtx);
	}
	if (wl->wl_reclaimable_bytes < minfree) {
		KASSERT(wl->wl_error_count);
		/* XXX maybe get actual error from buffer instead someday? */
		error = EIO;
	}
	head = wl->wl_head;
	tail = wl->wl_tail;
	delta = wl->wl_reclaimable_bytes;

	/*
	 * If all of the entries are flushed, then be sure to keep
	 * the reserved bytes reserved.  Watch out for discarded
	 * transactions, which could leave more bytes reserved than are
	 * reclaimable.
	 */
	if (SIMPLEQ_EMPTY(&wl->wl_entries) &&
	    (delta >= wl->wl_reserved_bytes)) {
		delta -= wl->wl_reserved_bytes;
	}
	wapbl_advance_tail(wl->wl_circ_size, wl->wl_circ_off, delta, &head,
			   &tail);
	KDASSERT(wl->wl_reserved_bytes <=
		wapbl_space_used(wl->wl_circ_size, head, tail));
	mutex_exit(&wl->wl_mtx);

	if (error)
		return error;

	/*
	 * This is where head, tail and delta are unprotected
	 * from races against itself or flush.  This is ok since
	 * we only call this routine from inside flush itself.
	 *
	 * XXX: how can it race against itself when accessed only
	 * from behind the write-locked rwlock?
	 */
	error = wapbl_write_commit(wl, head, tail);
	if (error)
		return error;

	wl->wl_head = head;
	wl->wl_tail = tail;

	mutex_enter(&wl->wl_mtx);
	KASSERT(wl->wl_reclaimable_bytes >= delta);
	wl->wl_reclaimable_bytes -= delta;
	mutex_exit(&wl->wl_mtx);
	WAPBL_PRINTF(WAPBL_PRINT_TRUNCATE,
	    ("wapbl_truncate thread %d.%d truncating %zu bytes\n",
	    curproc->p_pid, curlwp->l_lid, delta));

	return 0;
}

/****************************************************************/

void
wapbl_biodone(struct buf *bp)
{
	struct wapbl_entry *we = bp->b_private;
	struct wapbl *wl;
#ifdef WAPBL_DEBUG_BUFBYTES
	const int bufsize = bp->b_bufsize;
#endif

	mutex_enter(&bufcache_lock);
	wl = we->we_wapbl;
	mutex_exit(&bufcache_lock);

	/*
	 * Handle possible flushing of buffers after log has been
	 * decommissioned.
	 */
	if (!wl) {
		KASSERT(we->we_bufcount > 0);
		we->we_bufcount--;
#ifdef WAPBL_DEBUG_BUFBYTES
		KASSERT(we->we_unsynced_bufbytes >= bufsize);
		we->we_unsynced_bufbytes -= bufsize;
#endif

		if (we->we_bufcount == 0) {
#ifdef WAPBL_DEBUG_BUFBYTES
			KASSERT(we->we_unsynced_bufbytes == 0);
#endif
			pool_put(&wapbl_entry_pool, we);
		}

		brelse(bp, 0);
		return;
	}

#ifdef ohbother
	KDASSERT(bp->b_oflags & BO_DONE);
	KDASSERT(!(bp->b_oflags & BO_DELWRI));
	KDASSERT(bp->b_flags & B_ASYNC);
	KDASSERT(bp->b_cflags & BC_BUSY);
	KDASSERT(!(bp->b_flags & B_LOCKED));
	KDASSERT(!(bp->b_flags & B_READ));
	KDASSERT(!(bp->b_cflags & BC_INVAL));
	KDASSERT(!(bp->b_cflags & BC_NOCACHE));
#endif

	if (bp->b_error) {
		/*
		 * If an error occurs, it would be nice to leave the buffer
		 * as a delayed write on the LRU queue so that we can retry
		 * it later. But buffercache(9) can't handle dirty buffer
		 * reuse, so just mark the log permanently errored out.
		 */
		mutex_enter(&wl->wl_mtx);
		if (wl->wl_error_count == 0) {
			wl->wl_error_count++;
			cv_broadcast(&wl->wl_reclaimable_cv);
		}
		mutex_exit(&wl->wl_mtx);
	}

	/*
	 * Make sure that the buf doesn't retain the media flags, so that
	 * e.g. wapbl_allow_dpofua has immediate effect on any following I/O.
	 * The flags will be set again if needed by another I/O.
	 */
	bp->b_flags &= ~B_MEDIA_FLAGS;

	/*
	 * Release the buffer here. wapbl_flush() may wait for the
	 * log to become empty and we better unbusy the buffer before
	 * wapbl_flush() returns.
	 */
	brelse(bp, 0);

	mutex_enter(&wl->wl_mtx);

	KASSERT(we->we_bufcount > 0);
	we->we_bufcount--;
#ifdef WAPBL_DEBUG_BUFBYTES
	KASSERT(we->we_unsynced_bufbytes >= bufsize);
	we->we_unsynced_bufbytes -= bufsize;
	KASSERT(wl->wl_unsynced_bufbytes >= bufsize);
	wl->wl_unsynced_bufbytes -= bufsize;
#endif
	wl->wl_ev_metawrite.ev_count++;

	/*
	 * If the current transaction can be reclaimed, start
	 * at the beginning and reclaim any consecutive reclaimable
	 * transactions.  If we successfully reclaim anything,
	 * then wakeup anyone waiting for the reclaim.
	 */
	if (we->we_bufcount == 0) {
		size_t delta = 0;
		int errcnt = 0;
#ifdef WAPBL_DEBUG_BUFBYTES
		KDASSERT(we->we_unsynced_bufbytes == 0);
#endif
		/*
		 * clear any posted error, since the buffer it came from
		 * has been successfully flushed by now
		 */
		while ((we = SIMPLEQ_FIRST(&wl->wl_entries)) &&
		       (we->we_bufcount == 0)) {
			delta += we->we_reclaimable_bytes;
			if (we->we_error)
				errcnt++;
			SIMPLEQ_REMOVE_HEAD(&wl->wl_entries, we_entries);
			pool_put(&wapbl_entry_pool, we);
		}

		if (delta) {
			wl->wl_reclaimable_bytes += delta;
			KASSERT(wl->wl_error_count >= errcnt);
			wl->wl_error_count -= errcnt;
			cv_broadcast(&wl->wl_reclaimable_cv);
		}
	}

	mutex_exit(&wl->wl_mtx);
}

/*
 * wapbl_flush(wl, waitfor)
 *
 *	Flush pending block writes, deallocations, and inodes from
 *	the current transaction in memory to the log on disk:
 *
 *	1. Call the file system's wl_flush callback to flush any
 *	   per-file-system pending updates.
 *	2. Wait for enough space in the log for the current transaction.
 *	3. Synchronously write the new log records, advancing the
 *	   circular queue head.
 *	4. Issue the pending block writes asynchronously, now that they
 *	   are recorded in the log and can be replayed after crash.
 *	5. If waitfor is true, wait for all writes to complete and for
 *	   the log to become empty.
 *
 *	On failure, call the file system's wl_flush_abort callback.
 */
1750int
1751wapbl_flush(struct wapbl *wl, int waitfor)
1752{
1753	struct buf *bp;
1754	struct wapbl_entry *we;
1755	off_t off;
1756	off_t head;
1757	off_t tail;
1758	size_t delta = 0;
1759	size_t flushsize;
1760	size_t reserved;
1761	int error = 0;
1762
1763	/*
1764	 * Do a quick check to see if a full flush can be skipped
1765	 * This assumes that the flush callback does not need to be called
1766	 * unless there are other outstanding bufs.
1767	 */
1768	if (!waitfor) {
1769		size_t nbufs;
1770		mutex_enter(&wl->wl_mtx);	/* XXX need mutex here to
1771						   protect the KASSERTS */
1772		nbufs = wl->wl_bufcount;
1773		KASSERT((wl->wl_bufcount == 0) == (wl->wl_bufbytes == 0));
1774		KASSERT((wl->wl_bufcount == 0) == (wl->wl_bcount == 0));
1775		mutex_exit(&wl->wl_mtx);
1776		if (nbufs == 0)
1777			return 0;
1778	}
1779
1780	/*
1781	 * XXX we may consider using LK_UPGRADE here
1782	 * if we want to call flush from inside a transaction
1783	 */
1784	rw_enter(&wl->wl_rwlock, RW_WRITER);
1785	wl->wl_flush(wl->wl_mount, TAILQ_FIRST(&wl->wl_dealloclist));
1786
1787	/*
1788	 * Now that we are exclusively locked and the file system has
1789	 * issued any deferred block writes for this transaction, check
1790	 * whether there are any blocks to write to the log.  If not,
1791	 * skip waiting for space or writing any log entries.
1792	 *
1793	 * XXX Shouldn't this also check wl_dealloccnt and
1794	 * wl_inohashcnt?  Perhaps wl_dealloccnt doesn't matter if the
1795	 * file system didn't produce any blocks as a consequence of
1796	 * it, but the same does not seem to be so of wl_inohashcnt.
1797	 */
1798	if (wl->wl_bufcount == 0) {
1799		goto wait_out;
1800	}

#if 0
	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
		     ("wapbl_flush thread %d.%d flushing entries with "
		      "bufcount=%zu bufbytes=%zu\n",
		      curproc->p_pid, curlwp->l_lid, wl->wl_bufcount,
		      wl->wl_bufbytes));
#endif

	/* Calculate amount of space needed to flush */
	flushsize = wapbl_transaction_len(wl);
	if (wapbl_verbose_commit) {
		struct timespec ts;
		getnanotime(&ts);
		printf("%s: %lld.%09ld this transaction = %zu bytes\n",
		    __func__, (long long)ts.tv_sec,
		    (long)ts.tv_nsec, flushsize);
	}

	if (flushsize > (wl->wl_circ_size - wl->wl_reserved_bytes)) {
		/*
		 * XXX this could be handled more gracefully, perhaps place
		 * only a partial transaction in the log and allow the
		 * remaining to flush without the protection of the journal.
		 */
		panic("wapbl_flush: current transaction too big to flush");
	}

	error = wapbl_truncate(wl, flushsize);
	if (error)
		goto out;

	off = wl->wl_head;
	KASSERT((off == 0) || (off >= wl->wl_circ_off));
	KASSERT((off == 0) || (off < wl->wl_circ_off + wl->wl_circ_size));
	error = wapbl_write_blocks(wl, &off);
	if (error)
		goto out;
	error = wapbl_write_revocations(wl, &off);
	if (error)
		goto out;
	error = wapbl_write_inodes(wl, &off);
	if (error)
		goto out;

	reserved = 0;
	if (wl->wl_inohashcnt)
		reserved = wapbl_transaction_inodes_len(wl);

	head = wl->wl_head;
	tail = wl->wl_tail;

	wapbl_advance_head(wl->wl_circ_size, wl->wl_circ_off, flushsize,
	    &head, &tail);

	KASSERTMSG(head == off,
	    "lost head! head=%"PRIdMAX" tail=%"PRIdMAX
	    " off=%"PRIdMAX" flush=%zu",
	    (intmax_t)head, (intmax_t)tail, (intmax_t)off,
	    flushsize);

	/* Opportunistically move the tail forward if we can */
	mutex_enter(&wl->wl_mtx);
	delta = wl->wl_reclaimable_bytes;
	mutex_exit(&wl->wl_mtx);
	wapbl_advance_tail(wl->wl_circ_size, wl->wl_circ_off, delta,
	    &head, &tail);

	error = wapbl_write_commit(wl, head, tail);
	if (error)
		goto out;

	we = pool_get(&wapbl_entry_pool, PR_WAITOK);

#ifdef WAPBL_DEBUG_BUFBYTES
	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
		("wapbl_flush: thread %d.%d head+=%zu tail+=%zu used=%zu"
		 " unsynced=%zu"
		 "\n\tbufcount=%zu bufbytes=%zu bcount=%zu deallocs=%d "
		 "inodes=%d\n",
		 curproc->p_pid, curlwp->l_lid, flushsize, delta,
		 wapbl_space_used(wl->wl_circ_size, head, tail),
		 wl->wl_unsynced_bufbytes, wl->wl_bufcount,
		 wl->wl_bufbytes, wl->wl_bcount, wl->wl_dealloccnt,
		 wl->wl_inohashcnt));
#else
	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
		("wapbl_flush: thread %d.%d head+=%zu tail+=%zu used=%zu"
		 "\n\tbufcount=%zu bufbytes=%zu bcount=%zu deallocs=%d "
		 "inodes=%d\n",
		 curproc->p_pid, curlwp->l_lid, flushsize, delta,
		 wapbl_space_used(wl->wl_circ_size, head, tail),
		 wl->wl_bufcount, wl->wl_bufbytes, wl->wl_bcount,
		 wl->wl_dealloccnt, wl->wl_inohashcnt));
#endif

	mutex_enter(&bufcache_lock);
	mutex_enter(&wl->wl_mtx);

	wl->wl_reserved_bytes = reserved;
	wl->wl_head = head;
	wl->wl_tail = tail;
	KASSERT(wl->wl_reclaimable_bytes >= delta);
	wl->wl_reclaimable_bytes -= delta;
	KDASSERT(wl->wl_dealloccnt == 0);
#ifdef WAPBL_DEBUG_BUFBYTES
	wl->wl_unsynced_bufbytes += wl->wl_bufbytes;
#endif

	we->we_wapbl = wl;
	we->we_bufcount = wl->wl_bufcount;
#ifdef WAPBL_DEBUG_BUFBYTES
	we->we_unsynced_bufbytes = wl->wl_bufbytes;
#endif
	we->we_reclaimable_bytes = flushsize;
	we->we_error = 0;
	SIMPLEQ_INSERT_TAIL(&wl->wl_entries, we, we_entries);

	/*
	 * This flushes bufs in the order they were queued, so the LRU
	 * order is preserved.
	 */
	while ((bp = TAILQ_FIRST(&wl->wl_bufs)) != NULL) {
		if (bbusy(bp, 0, 0, &wl->wl_mtx)) {
			continue;
		}
		bp->b_iodone = wapbl_biodone;
		bp->b_private = we;

		bremfree(bp);
		wapbl_remove_buf_locked(wl, bp);
		mutex_exit(&wl->wl_mtx);
		mutex_exit(&bufcache_lock);
		bawrite(bp);
		mutex_enter(&bufcache_lock);
		mutex_enter(&wl->wl_mtx);
	}
	mutex_exit(&wl->wl_mtx);
	mutex_exit(&bufcache_lock);

#if 0
	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
		     ("wapbl_flush thread %d.%d done flushing entries...\n",
		     curproc->p_pid, curlwp->l_lid));
#endif

 wait_out:

	/*
	 * If the waitfor flag is set, don't return until everything is
	 * fully flushed and the on-disk log is empty.
	 */
	if (waitfor) {
		error = wapbl_truncate(wl, wl->wl_circ_size -
			wl->wl_reserved_bytes);
	}

 out:
	if (error) {
		wl->wl_flush_abort(wl->wl_mount,
		    TAILQ_FIRST(&wl->wl_dealloclist));
	}

#ifdef WAPBL_DEBUG_PRINT
	if (error) {
		pid_t pid = -1;
		lwpid_t lid = -1;
		if (curproc)
			pid = curproc->p_pid;
		if (curlwp)
			lid = curlwp->l_lid;
		mutex_enter(&wl->wl_mtx);
#ifdef WAPBL_DEBUG_BUFBYTES
		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
		    ("wapbl_flush: thread %d.%d aborted flush: "
		    "error = %d\n"
		    "\tbufcount=%zu bufbytes=%zu bcount=%zu "
		    "deallocs=%d inodes=%d\n"
		    "\terrcnt = %d, reclaimable=%zu reserved=%zu "
		    "unsynced=%zu\n",
		    pid, lid, error, wl->wl_bufcount,
		    wl->wl_bufbytes, wl->wl_bcount,
		    wl->wl_dealloccnt, wl->wl_inohashcnt,
		    wl->wl_error_count, wl->wl_reclaimable_bytes,
		    wl->wl_reserved_bytes, wl->wl_unsynced_bufbytes));
		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
			WAPBL_PRINTF(WAPBL_PRINT_ERROR,
			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
			     "error = %d, unsynced = %zu\n",
			     we->we_bufcount, we->we_reclaimable_bytes,
			     we->we_error, we->we_unsynced_bufbytes));
		}
#else
		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
		    ("wapbl_flush: thread %d.%d aborted flush: "
		     "error = %d\n"
		     "\tbufcount=%zu bufbytes=%zu bcount=%zu "
		     "deallocs=%d inodes=%d\n"
		     "\terrcnt = %d, reclaimable=%zu reserved=%zu\n",
		     pid, lid, error, wl->wl_bufcount,
		     wl->wl_bufbytes, wl->wl_bcount,
		     wl->wl_dealloccnt, wl->wl_inohashcnt,
		     wl->wl_error_count, wl->wl_reclaimable_bytes,
		     wl->wl_reserved_bytes));
		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
			WAPBL_PRINTF(WAPBL_PRINT_ERROR,
			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
			     "error = %d\n", we->we_bufcount,
			     we->we_reclaimable_bytes, we->we_error));
		}
#endif
		mutex_exit(&wl->wl_mtx);
	}
#endif

	rw_exit(&wl->wl_rwlock);
	return error;
}
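
/*
 * Editor's note: an illustrative sketch (not part of the original
 * source) of how a file system might drive wapbl_flush() above from
 * its sync entry point.  waitfor != 0 forces a synchronous flush that
 * drains the on-disk log.  Any open transaction must already be
 * ended, since wapbl_flush() takes wl_rwlock as writer.
 */
#if 0
static int
example_fs_sync(struct mount *mp, int waitfor)
{
	struct wapbl *wl = mp->mnt_wapbl;

	if (wl == NULL)
		return 0;
	return wapbl_flush(wl, waitfor);
}
#endif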

/****************************************************************/

void
wapbl_jlock_assert(struct wapbl *wl)
{

	KASSERT(rw_lock_held(&wl->wl_rwlock));
}

void
wapbl_junlock_assert(struct wapbl *wl)
{

	KASSERT(!rw_write_held(&wl->wl_rwlock));
}

/****************************************************************/

/* locks missing */
void
wapbl_print(struct wapbl *wl,
		int full,
		void (*pr)(const char *, ...))
{
	struct buf *bp;
	struct wapbl_entry *we;
	(*pr)("wapbl %p", wl);
	(*pr)("\nlogvp = %p, devvp = %p, logpbn = %"PRId64"\n",
	      wl->wl_logvp, wl->wl_devvp, wl->wl_logpbn);
	(*pr)("circ = %zu, header = %zu, head = %"PRIdMAX" tail = %"PRIdMAX"\n",
	      wl->wl_circ_size, wl->wl_circ_off,
	      (intmax_t)wl->wl_head, (intmax_t)wl->wl_tail);
	(*pr)("log_dev_bshift = %d, fs_dev_bshift = %d\n",
	      wl->wl_log_dev_bshift, wl->wl_fs_dev_bshift);
#ifdef WAPBL_DEBUG_BUFBYTES
	(*pr)("bufcount = %zu, bufbytes = %zu bcount = %zu reclaimable = %zu "
	      "reserved = %zu errcnt = %d unsynced = %zu\n",
	      wl->wl_bufcount, wl->wl_bufbytes, wl->wl_bcount,
	      wl->wl_reclaimable_bytes, wl->wl_reserved_bytes,
	      wl->wl_error_count, wl->wl_unsynced_bufbytes);
#else
	(*pr)("bufcount = %zu, bufbytes = %zu bcount = %zu reclaimable = %zu "
	      "reserved = %zu errcnt = %d\n", wl->wl_bufcount, wl->wl_bufbytes,
	      wl->wl_bcount, wl->wl_reclaimable_bytes, wl->wl_reserved_bytes,
	      wl->wl_error_count);
#endif
	(*pr)("\tdealloccnt = %d, dealloclim = %d\n",
	      wl->wl_dealloccnt, wl->wl_dealloclim);
	(*pr)("\tinohashcnt = %d, inohashmask = 0x%08x\n",
	      wl->wl_inohashcnt, wl->wl_inohashmask);
	(*pr)("entries:\n");
	SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
#ifdef WAPBL_DEBUG_BUFBYTES
		(*pr)("\tbufcount = %zu, reclaimable = %zu, error = %d, "
		      "unsynced = %zu\n",
		      we->we_bufcount, we->we_reclaimable_bytes,
		      we->we_error, we->we_unsynced_bufbytes);
#else
		(*pr)("\tbufcount = %zu, reclaimable = %zu, error = %d\n",
		      we->we_bufcount, we->we_reclaimable_bytes, we->we_error);
#endif
	}
	if (full) {
		int cnt = 0;
		(*pr)("bufs =");
		TAILQ_FOREACH(bp, &wl->wl_bufs, b_wapbllist) {
			if (!TAILQ_NEXT(bp, b_wapbllist)) {
				(*pr)(" %p", bp);
			} else if ((++cnt % 6) == 0) {
				(*pr)(" %p,\n\t", bp);
			} else {
				(*pr)(" %p,", bp);
			}
		}
		(*pr)("\n");

		(*pr)("dealloced blks = ");
		{
			struct wapbl_dealloc *wd;
			cnt = 0;
			TAILQ_FOREACH(wd, &wl->wl_dealloclist, wd_entries) {
				(*pr)(" %"PRId64":%d,",
				      wd->wd_blkno,
				      wd->wd_len);
				if ((++cnt % 4) == 0) {
					(*pr)("\n\t");
				}
			}
		}
		(*pr)("\n");

		(*pr)("registered inodes = ");
		{
			int i;
			cnt = 0;
			for (i = 0; i <= wl->wl_inohashmask; i++) {
				struct wapbl_ino_head *wih;
				struct wapbl_ino *wi;

				wih = &wl->wl_inohash[i];
				LIST_FOREACH(wi, wih, wi_hash) {
					if (wi->wi_ino == 0)
						continue;
					(*pr)(" %"PRIu64"/0%06"PRIo32",",
					    wi->wi_ino, wi->wi_mode);
					if ((++cnt % 4) == 0) {
						(*pr)("\n\t");
					}
				}
			}
			(*pr)("\n");
		}

		cnt = 0;
		(*pr)("iobufs free =");
		TAILQ_FOREACH(bp, &wl->wl_iobufs, b_wapbllist) {
			if (!TAILQ_NEXT(bp, b_wapbllist)) {
				(*pr)(" %p", bp);
			} else if ((++cnt % 6) == 0) {
				(*pr)(" %p,\n\t", bp);
			} else {
				(*pr)(" %p,", bp);
			}
		}
		(*pr)("\n");

		cnt = 0;
		(*pr)("iobufs busy =");
		TAILQ_FOREACH(bp, &wl->wl_iobufs_busy, b_wapbllist) {
			if (!TAILQ_NEXT(bp, b_wapbllist)) {
				(*pr)(" %p", bp);
			} else if ((++cnt % 6) == 0) {
				(*pr)(" %p,\n\t", bp);
			} else {
				(*pr)(" %p,", bp);
			}
		}
		(*pr)("\n");
	}
}

#if defined(WAPBL_DEBUG) || defined(DDB)
void
wapbl_dump(struct wapbl *wl)
{
#if defined(WAPBL_DEBUG)
	if (!wl)
		wl = wapbl_debug_wl;
#endif
	if (!wl)
		return;
	wapbl_print(wl, 1, printf);
}
#endif

/****************************************************************/

int
wapbl_register_deallocation(struct wapbl *wl, daddr_t blk, int len, bool force,
    void **cookiep)
{
	struct wapbl_dealloc *wd;
	int error = 0;

	wapbl_jlock_assert(wl);

	mutex_enter(&wl->wl_mtx);

	if (__predict_false(wl->wl_dealloccnt >= wl->wl_dealloclim)) {
		if (!force) {
			error = EAGAIN;
			goto out;
		}

		/*
		 * Forced registration can only be used when:
		 * 1) the caller can't cope with failure
		 * 2) the path can be triggered only a bounded, small
		 *    number of times per transaction
		 * If this is not fulfilled, and the path would be triggered
		 * many times, this could overflow the maximum transaction
		 * size and panic later.
		 */
		printf("%s: forced dealloc registration over limit: %d >= %d\n",
			wl->wl_mount->mnt_stat.f_mntonname,
			wl->wl_dealloccnt, wl->wl_dealloclim);
	}

	wl->wl_dealloccnt++;
	mutex_exit(&wl->wl_mtx);

	wd = pool_get(&wapbl_dealloc_pool, PR_WAITOK);
	wd->wd_blkno = blk;
	wd->wd_len = len;

	mutex_enter(&wl->wl_mtx);
	TAILQ_INSERT_TAIL(&wl->wl_dealloclist, wd, wd_entries);

	if (cookiep)
		*cookiep = wd;

 out:
	mutex_exit(&wl->wl_mtx);

	WAPBL_PRINTF(WAPBL_PRINT_ALLOC,
	    ("wapbl_register_deallocation: blk=%"PRId64" len=%d error=%d\n",
	    blk, len, error));

	return error;
}
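
/*
 * Editor's sketch (not part of the original source): forced
 * registration is for callers that cannot unwind.  Such a caller
 * keeps the cookie so the registration can be withdrawn if the
 * surrounding operation is later aborted.
 */
#if 0
static void
example_force_dealloc(struct wapbl *wl, daddr_t blk, int len)
{
	void *cookie;

	/*
	 * This path runs a bounded, small number of times per
	 * transaction, so force = true cannot blow up the
	 * transaction size.
	 */
	(void)wapbl_register_deallocation(wl, blk, len, true, &cookie);

	/* ... and on a later abort of the surrounding operation: */
	wapbl_unregister_deallocation(wl, cookie);
}
#endif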

static void
wapbl_deallocation_free(struct wapbl *wl, struct wapbl_dealloc *wd,
	bool locked)
{
	KASSERT(!locked
	    || rw_lock_held(&wl->wl_rwlock) || mutex_owned(&wl->wl_mtx));

	if (!locked)
		mutex_enter(&wl->wl_mtx);

	TAILQ_REMOVE(&wl->wl_dealloclist, wd, wd_entries);
	wl->wl_dealloccnt--;

	if (!locked)
		mutex_exit(&wl->wl_mtx);

	pool_put(&wapbl_dealloc_pool, wd);
}

void
wapbl_unregister_deallocation(struct wapbl *wl, void *cookie)
{
	KASSERT(cookie != NULL);
	wapbl_deallocation_free(wl, cookie, false);
}

/****************************************************************/

static void
wapbl_inodetrk_init(struct wapbl *wl, u_int size)
{

	wl->wl_inohash = hashinit(size, HASH_LIST, true, &wl->wl_inohashmask);
	if (atomic_inc_uint_nv(&wapbl_ino_pool_refcount) == 1) {
		pool_init(&wapbl_ino_pool, sizeof(struct wapbl_ino), 0, 0, 0,
		    "wapblinopl", &pool_allocator_nointr, IPL_NONE);
	}
}

static void
wapbl_inodetrk_free(struct wapbl *wl)
{

	/* XXX this KASSERT needs locking/mutex analysis */
	KASSERT(wl->wl_inohashcnt == 0);
	hashdone(wl->wl_inohash, HASH_LIST, wl->wl_inohashmask);
	membar_release();
	if (atomic_dec_uint_nv(&wapbl_ino_pool_refcount) == 0) {
		membar_acquire();
		pool_destroy(&wapbl_ino_pool);
	}
}

static struct wapbl_ino *
wapbl_inodetrk_get(struct wapbl *wl, ino_t ino)
{
	struct wapbl_ino_head *wih;
	struct wapbl_ino *wi;

	KASSERT(mutex_owned(&wl->wl_mtx));

	wih = &wl->wl_inohash[ino & wl->wl_inohashmask];
	LIST_FOREACH(wi, wih, wi_hash) {
		if (ino == wi->wi_ino)
			return wi;
	}
	return NULL;
}

void
wapbl_register_inode(struct wapbl *wl, ino_t ino, mode_t mode)
{
	struct wapbl_ino_head *wih;
	struct wapbl_ino *wi;

	wi = pool_get(&wapbl_ino_pool, PR_WAITOK);

	mutex_enter(&wl->wl_mtx);
	if (wapbl_inodetrk_get(wl, ino) == NULL) {
		wi->wi_ino = ino;
		wi->wi_mode = mode;
		wih = &wl->wl_inohash[ino & wl->wl_inohashmask];
		LIST_INSERT_HEAD(wih, wi, wi_hash);
		wl->wl_inohashcnt++;
		WAPBL_PRINTF(WAPBL_PRINT_INODE,
		    ("wapbl_register_inode: ino=%"PRId64"\n", ino));
		mutex_exit(&wl->wl_mtx);
	} else {
		mutex_exit(&wl->wl_mtx);
		pool_put(&wapbl_ino_pool, wi);
	}
}

void
wapbl_unregister_inode(struct wapbl *wl, ino_t ino, mode_t mode)
{
	struct wapbl_ino *wi;

	mutex_enter(&wl->wl_mtx);
	wi = wapbl_inodetrk_get(wl, ino);
	if (wi) {
		WAPBL_PRINTF(WAPBL_PRINT_INODE,
		    ("wapbl_unregister_inode: ino=%"PRId64"\n", ino));
		KASSERT(wl->wl_inohashcnt > 0);
		wl->wl_inohashcnt--;
		LIST_REMOVE(wi, wi_hash);
		mutex_exit(&wl->wl_mtx);

		pool_put(&wapbl_ino_pool, wi);
	} else {
		mutex_exit(&wl->wl_mtx);
	}
}

/****************************************************************/

/*
 * wapbl_transaction_inodes_len(wl)
 *
 *	Calculate the number of bytes required for inode registration
 *	log records in wl.
 */
static inline size_t
wapbl_transaction_inodes_len(struct wapbl *wl)
{
	int blocklen = 1<<wl->wl_log_dev_bshift;
	int iph;

	/* Calculate number of inodes described in an inodelist header */
	iph = (blocklen - offsetof(struct wapbl_wc_inodelist, wc_inodes)) /
	    sizeof(((struct wapbl_wc_inodelist *)0)->wc_inodes[0]);

	KASSERT(iph > 0);

	return MAX(1, howmany(wl->wl_inohashcnt, iph)) * blocklen;
}
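
/*
 * Editor's worked example (illustrative numbers): with a 512-byte log
 * block and room for iph = 30 entries per inodelist header, 61
 * registered inodes need howmany(61, 30) = 3 blocks, i.e. 1536 bytes.
 * Zero registered inodes still cost MAX(1, 0) * blocklen = 512 bytes,
 * because wapbl_write_inodes() always emits one (possibly empty)
 * inodelist record in order to clear any previously logged inodes.
 */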

/*
 * wapbl_transaction_len(wl)
 *
 *	Calculate number of bytes required for all log records in wl.
 */
static size_t
wapbl_transaction_len(struct wapbl *wl)
{
	int blocklen = 1<<wl->wl_log_dev_bshift;
	size_t len;

	/* Data bytes, plus one blocklist header per wl_brperjblock blocks */
	len = wl->wl_bcount;
	len += howmany(wl->wl_bufcount, wl->wl_brperjblock) * blocklen;
	len += howmany(wl->wl_dealloccnt, wl->wl_brperjblock) * blocklen;
	len += wapbl_transaction_inodes_len(wl);

	return len;
}
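
/*
 * Editor's worked example (illustrative numbers): with blocklen = 512
 * and wl_brperjblock = 62, a transaction of 100 buffers holding
 * wl_bcount = 409600 bytes, plus 10 pending deallocations and no
 * registered inodes, needs
 *
 *	409600				data bytes
 *	+ howmany(100, 62) * 512 = 1024	blocklist headers
 *	+ howmany(10, 62) * 512 = 512	revocation list
 *	+ 512				(empty) inodelist record
 *	= 411648 bytes total.
 */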

/*
 * wapbl_cache_sync(wl, msg)
 *
 *	Issue DIOCCACHESYNC to wl->wl_devvp.
 *
 *	If sysctl(vfs.wapbl.verbose_commit) >= 2, print a message
 *	including msg about the duration of the cache sync.
 */
static int
wapbl_cache_sync(struct wapbl *wl, const char *msg)
{
	const bool verbose = wapbl_verbose_commit >= 2;
	struct bintime start_time;
	int force = 1;
	int error;

	/* Skip full cache sync if disabled */
	if (!wapbl_flush_disk_cache) {
		return 0;
	}
	if (verbose) {
		bintime(&start_time);
	}
	error = VOP_IOCTL(wl->wl_devvp, DIOCCACHESYNC, &force,
	    FWRITE, FSCRED);
	if (error) {
		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
		    ("wapbl_cache_sync: DIOCCACHESYNC on dev 0x%jx "
		    "returned %d\n", (uintmax_t)wl->wl_devvp->v_rdev, error));
	}
	if (verbose) {
		struct bintime d;
		struct timespec ts;

		bintime(&d);
		bintime_sub(&d, &start_time);
		bintime2timespec(&d, &ts);
		printf("wapbl_cache_sync: %s: dev 0x%jx %ju.%09lu\n",
		    msg, (uintmax_t)wl->wl_devvp->v_rdev,
		    (uintmax_t)ts.tv_sec, ts.tv_nsec);
	}

	wl->wl_ev_cacheflush.ev_count++;

	return error;
}

/*
 * wapbl_write_commit(wl, head, tail)
 *
 *	Issue a disk cache sync to wait for all pending writes to the
 *	log to complete, and then synchronously commit the current
 *	circular queue head and tail to the log, in the next of two
 *	locations for commit headers on disk.
 *
 *	Increment the generation number.  If the generation number
 *	rolls over to zero, then a subsequent commit would appear to
 *	have an older generation than this one -- in that case, issue a
 *	duplicate commit to avoid this.
 *
 *	=> Caller must have exclusive access to wl, either by holding
 *	wl->wl_rwlock for writer or by being wapbl_start before anyone
 *	else has seen wl.
 */
static int
wapbl_write_commit(struct wapbl *wl, off_t head, off_t tail)
{
	struct wapbl_wc_header *wc = wl->wl_wc_header;
	struct timespec ts;
	int error;
	daddr_t pbn;

	error = wapbl_buffered_flush(wl, true);
	if (error)
		return error;
	/*
	 * Flush the disk cache to ensure that the blocks we've written
	 * reach stable storage before the commit header.  This flushes
	 * to disk not only the journal blocks, but also all metadata
	 * blocks written asynchronously since the previous commit.
	 *
	 * XXX We should calculate a checksum here; instead we rely on
	 * this cache flush alone for now.
	 */
	wapbl_cache_sync(wl, "1");

	wc->wc_head = head;
	wc->wc_tail = tail;
	wc->wc_checksum = 0;
	wc->wc_version = 1;
	getnanotime(&ts);
	wc->wc_time = ts.tv_sec;
	wc->wc_timensec = ts.tv_nsec;

	WAPBL_PRINTF(WAPBL_PRINT_WRITE,
	    ("wapbl_write_commit: head = %"PRIdMAX" tail = %"PRIdMAX"\n",
	    (intmax_t)head, (intmax_t)tail));

	/*
	 * Write the commit header.
	 *
	 * XXX if the generation number is about to roll over, first
	 * zero out the second commit header before trying to write
	 * both headers.
	 */

	pbn = wl->wl_logpbn + (wc->wc_generation % 2);
#ifdef _KERNEL
	pbn = btodb(pbn << wc->wc_log_dev_bshift);
#endif
	error = wapbl_buffered_write(wc, wc->wc_len, wl, pbn, WAPBL_JFLAGS(wl));
	if (error)
		return error;
	error = wapbl_buffered_flush(wl, true);
	if (error)
		return error;

	/*
	 * Flush the disk cache to ensure that the commit header is
	 * actually written before the metadata blocks.  The commit
	 * block is written with FUA when enabled; in that case this
	 * flush is not needed.
	 */
	if (!WAPBL_USE_FUA(wl))
		wapbl_cache_sync(wl, "2");

	/*
	 * If the generation number was zero, write it out a second time.
	 * This handles initialization and generation number rollover.
	 */
	if (wc->wc_generation++ == 0) {
		error = wapbl_write_commit(wl, head, tail);
		/*
		 * This panic should be able to be removed if we do the
		 * zeroing mentioned above, and we are certain to roll
		 * back the generation number on failure.
		 */
		if (error)
			panic("wapbl_write_commit: error writing duplicate "
			      "log header: %d", error);
	}

	wl->wl_ev_commit.ev_count++;

	return 0;
}
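
/*
 * Editor's note: the two commit header slots alternate by generation
 * parity, so a crash in the middle of writing one header always
 * leaves the other, older header intact:
 *
 *	generation 0 -> slot 0 (wl_logpbn + 0)
 *	generation 1 -> slot 1 (wl_logpbn + 1)
 *	generation 2 -> slot 0, and so on.
 *
 * Replay reads both slots and believes whichever holds the larger
 * generation number (see wapbl_replay_start() below).
 */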

/*
 * wapbl_write_blocks(wl, offp)
 *
 *	Write all pending physical blocks in the current transaction
 *	from wapbl_add_buf to the log on disk, adding to the circular
 *	queue head at byte offset *offp, and returning the new head's
 *	byte offset in *offp.
 */
static int
wapbl_write_blocks(struct wapbl *wl, off_t *offp)
{
	struct wapbl_wc_blocklist *wc =
	    (struct wapbl_wc_blocklist *)wl->wl_wc_scratch;
	int blocklen = 1<<wl->wl_log_dev_bshift;
	struct buf *bp;
	off_t off = *offp;
	int error;
	size_t padding;

	KASSERT(rw_write_held(&wl->wl_rwlock));

	bp = TAILQ_FIRST(&wl->wl_bufs);

	while (bp) {
		int cnt;
		struct buf *obp = bp;

		KASSERT(bp->b_flags & B_LOCKED);

		wc->wc_type = WAPBL_WC_BLOCKS;
		wc->wc_len = blocklen;
		wc->wc_blkcount = 0;
		wc->wc_unused = 0;
		while (bp && (wc->wc_blkcount < wl->wl_brperjblock)) {
			/*
			 * Make sure all the physical block numbers are up to
			 * date.  If this is not always true on a given
			 * filesystem, then VOP_BMAP must be called.  We
			 * could call VOP_BMAP here, or else in the filesystem
			 * specific flush callback, although neither of those
			 * solutions allow us to take the vnode lock.  If a
			 * filesystem requires that we must take the vnode lock
			 * to call VOP_BMAP, then we can probably do it in
			 * bwrite when the vnode lock should already be held
			 * by the invoking code.
			 */
			KASSERT((bp->b_vp->v_type == VBLK) ||
				 (bp->b_blkno != bp->b_lblkno));
			KASSERT(bp->b_blkno > 0);

			wc->wc_blocks[wc->wc_blkcount].wc_daddr = bp->b_blkno;
			wc->wc_blocks[wc->wc_blkcount].wc_dlen = bp->b_bcount;
			wc->wc_len += bp->b_bcount;
			wc->wc_blkcount++;
			bp = TAILQ_NEXT(bp, b_wapbllist);
		}
		if (wc->wc_len % blocklen != 0) {
			padding = blocklen - wc->wc_len % blocklen;
			wc->wc_len += padding;
		} else {
			padding = 0;
		}

		WAPBL_PRINTF(WAPBL_PRINT_WRITE,
		    ("wapbl_write_blocks: len = %u (padding %zu) "
		    "off = %"PRIdMAX"\n",
		    wc->wc_len, padding, (intmax_t)off));

		error = wapbl_circ_write(wl, wc, blocklen, &off);
		if (error)
			return error;
		bp = obp;
		cnt = 0;
		while (bp && (cnt++ < wl->wl_brperjblock)) {
			error = wapbl_circ_write(wl, bp->b_data,
			    bp->b_bcount, &off);
			if (error)
				return error;
			bp = TAILQ_NEXT(bp, b_wapbllist);
		}
		if (padding) {
			void *zero;

			zero = wapbl_alloc(padding);
			memset(zero, 0, padding);
			error = wapbl_circ_write(wl, zero, padding, &off);
			wapbl_free(zero, padding);
			if (error)
				return error;
		}
	}
	*offp = off;
	return 0;
}
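
/*
 * Editor's note: on disk, each chunk emitted by the loop above looks
 * like
 *
 *	+------------------+------------+----~~-----+---------+
 *	| blocklist header | buf 0 data |    ...    | padding |
 *	+------------------+------------+----~~-----+---------+
 *	|<--- blocklen --->|<------ wc_len - blocklen ------->|
 *
 * with at most wl_brperjblock buffers described per header and the
 * data padded with zeros up to a multiple of blocklen.
 */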

/*
 * wapbl_write_revocations(wl, offp)
 *
 *	Write all pending deallocations in the current transaction from
 *	wapbl_register_deallocation to the log on disk, adding to the
 *	circular queue's head at byte offset *offp, and returning the
 *	new head's byte offset in *offp.
 */
static int
wapbl_write_revocations(struct wapbl *wl, off_t *offp)
{
	struct wapbl_wc_blocklist *wc =
	    (struct wapbl_wc_blocklist *)wl->wl_wc_scratch;
	struct wapbl_dealloc *wd, *lwd;
	int blocklen = 1<<wl->wl_log_dev_bshift;
	off_t off = *offp;
	int error;

	KASSERT(rw_write_held(&wl->wl_rwlock));

	if (wl->wl_dealloccnt == 0)
		return 0;

	while ((wd = TAILQ_FIRST(&wl->wl_dealloclist)) != NULL) {
		wc->wc_type = WAPBL_WC_REVOCATIONS;
		wc->wc_len = blocklen;
		wc->wc_blkcount = 0;
		wc->wc_unused = 0;
		while (wd && (wc->wc_blkcount < wl->wl_brperjblock)) {
			wc->wc_blocks[wc->wc_blkcount].wc_daddr =
			    wd->wd_blkno;
			wc->wc_blocks[wc->wc_blkcount].wc_dlen =
			    wd->wd_len;
			wc->wc_blkcount++;

			wd = TAILQ_NEXT(wd, wd_entries);
		}
		WAPBL_PRINTF(WAPBL_PRINT_WRITE,
		    ("wapbl_write_revocations: len = %u off = %"PRIdMAX"\n",
		    wc->wc_len, (intmax_t)off));
		error = wapbl_circ_write(wl, wc, blocklen, &off);
		if (error)
			return error;

		/* Free all successfully written deallocs. */
		lwd = wd;
		while ((wd = TAILQ_FIRST(&wl->wl_dealloclist)) != NULL) {
			if (wd == lwd)
				break;
			wapbl_deallocation_free(wl, wd, true);
		}
	}
	*offp = off;
	return 0;
}
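
/*
 * Editor's note: revocations keep replay from copying stale journal
 * data over blocks the file system has since freed (and possibly
 * reused for non-journaled data): wapbl_replay_process_revocations()
 * below deletes the revoked blocks from the replay hash, so they are
 * never written back.
 */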

/*
 * wapbl_write_inodes(wl, offp)
 *
 *	Write all pending inode allocations in the current transaction
 *	from wapbl_register_inode to the log on disk, adding to the
 *	circular queue's head at byte offset *offp and returning the
 *	new head's byte offset in *offp.
 */
static int
wapbl_write_inodes(struct wapbl *wl, off_t *offp)
{
	struct wapbl_wc_inodelist *wc =
	    (struct wapbl_wc_inodelist *)wl->wl_wc_scratch;
	int i;
	int blocklen = 1 << wl->wl_log_dev_bshift;
	off_t off = *offp;
	int error;

	struct wapbl_ino_head *wih;
	struct wapbl_ino *wi;
	int iph;

	iph = (blocklen - offsetof(struct wapbl_wc_inodelist, wc_inodes)) /
	    sizeof(((struct wapbl_wc_inodelist *)0)->wc_inodes[0]);

	i = 0;
	wih = &wl->wl_inohash[0];
	wi = NULL;
	do {
		wc->wc_type = WAPBL_WC_INODES;
		wc->wc_len = blocklen;
		wc->wc_inocnt = 0;
		wc->wc_clear = (i == 0);
		while ((i < wl->wl_inohashcnt) && (wc->wc_inocnt < iph)) {
			while (!wi) {
				KASSERT((wih - &wl->wl_inohash[0])
				    <= wl->wl_inohashmask);
				wi = LIST_FIRST(wih++);
			}
			wc->wc_inodes[wc->wc_inocnt].wc_inumber = wi->wi_ino;
			wc->wc_inodes[wc->wc_inocnt].wc_imode = wi->wi_mode;
			wc->wc_inocnt++;
			i++;
			wi = LIST_NEXT(wi, wi_hash);
		}
		WAPBL_PRINTF(WAPBL_PRINT_WRITE,
		    ("wapbl_write_inodes: len = %u off = %"PRIdMAX"\n",
		    wc->wc_len, (intmax_t)off));
		error = wapbl_circ_write(wl, wc, blocklen, &off);
		if (error)
			return error;
	} while (i < wl->wl_inohashcnt);

	*offp = off;
	return 0;
}

#endif /* _KERNEL */

/****************************************************************/

struct wapbl_blk {
	LIST_ENTRY(wapbl_blk) wb_hash;
	daddr_t wb_blk;
	off_t wb_off; /* Offset of this block in the log */
};
#define	WAPBL_BLKPOOL_MIN 83
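
/*
 * Editor's note: during replay, wr_blkhash maps each block number to
 * the log offset of its most recent copy.  wapbl_blkhash_ins() simply
 * overwrites wb_off when a later journal entry mentions the same
 * block, and revocations remove entries, so after scanning the log
 * from tail to head the table holds exactly the blocks that still
 * need to be written back (see wapbl_replay_write()).
 */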

static void
wapbl_blkhash_init(struct wapbl_replay *wr, u_int size)
{
	if (size < WAPBL_BLKPOOL_MIN)
		size = WAPBL_BLKPOOL_MIN;
	KASSERT(wr->wr_blkhash == NULL);
#ifdef _KERNEL
	wr->wr_blkhash = hashinit(size, HASH_LIST, true, &wr->wr_blkhashmask);
#else /* ! _KERNEL */
	/* Manually implement hashinit */
	{
		unsigned long i, hashsize;
		for (hashsize = 1; hashsize < size; hashsize <<= 1)
			continue;
		wr->wr_blkhash = wapbl_alloc(hashsize * sizeof(*wr->wr_blkhash));
		for (i = 0; i < hashsize; i++)
			LIST_INIT(&wr->wr_blkhash[i]);
		wr->wr_blkhashmask = hashsize - 1;
	}
#endif /* ! _KERNEL */
}

static void
wapbl_blkhash_free(struct wapbl_replay *wr)
{
	KASSERT(wr->wr_blkhashcnt == 0);
#ifdef _KERNEL
	hashdone(wr->wr_blkhash, HASH_LIST, wr->wr_blkhashmask);
#else /* ! _KERNEL */
	wapbl_free(wr->wr_blkhash,
	    (wr->wr_blkhashmask + 1) * sizeof(*wr->wr_blkhash));
#endif /* ! _KERNEL */
}

static struct wapbl_blk *
wapbl_blkhash_get(struct wapbl_replay *wr, daddr_t blk)
{
	struct wapbl_blk_head *wbh;
	struct wapbl_blk *wb;
	wbh = &wr->wr_blkhash[blk & wr->wr_blkhashmask];
	LIST_FOREACH(wb, wbh, wb_hash) {
		if (blk == wb->wb_blk)
			return wb;
	}
	return NULL;
}

static void
wapbl_blkhash_ins(struct wapbl_replay *wr, daddr_t blk, off_t off)
{
	struct wapbl_blk_head *wbh;
	struct wapbl_blk *wb;
	wb = wapbl_blkhash_get(wr, blk);
	if (wb) {
		KASSERT(wb->wb_blk == blk);
		wb->wb_off = off;
	} else {
		wb = wapbl_alloc(sizeof(*wb));
		wb->wb_blk = blk;
		wb->wb_off = off;
		wbh = &wr->wr_blkhash[blk & wr->wr_blkhashmask];
		LIST_INSERT_HEAD(wbh, wb, wb_hash);
		wr->wr_blkhashcnt++;
	}
}

static void
wapbl_blkhash_rem(struct wapbl_replay *wr, daddr_t blk)
{
	struct wapbl_blk *wb = wapbl_blkhash_get(wr, blk);
	if (wb) {
		KASSERT(wr->wr_blkhashcnt > 0);
		wr->wr_blkhashcnt--;
		LIST_REMOVE(wb, wb_hash);
		wapbl_free(wb, sizeof(*wb));
	}
}

static void
wapbl_blkhash_clear(struct wapbl_replay *wr)
{
	unsigned long i;
	for (i = 0; i <= wr->wr_blkhashmask; i++) {
		struct wapbl_blk *wb;

		while ((wb = LIST_FIRST(&wr->wr_blkhash[i]))) {
			KASSERT(wr->wr_blkhashcnt > 0);
			wr->wr_blkhashcnt--;
			LIST_REMOVE(wb, wb_hash);
			wapbl_free(wb, sizeof(*wb));
		}
	}
	KASSERT(wr->wr_blkhashcnt == 0);
}

/****************************************************************/

/*
 * wapbl_circ_read(wr, data, len, offp)
 *
 *	Read len bytes into data from the circular queue of wr,
 *	starting at the linear byte offset *offp, and returning the new
 *	linear byte offset in *offp.
 *
 *	If the starting linear byte offset precedes wr->wr_circ_off,
 *	the read instead begins at wr->wr_circ_off.  XXX WTF?  This
 *	should be a KASSERT, not a conditional.
 */
static int
wapbl_circ_read(struct wapbl_replay *wr, void *data, size_t len, off_t *offp)
{
	size_t slen;
	off_t off = *offp;
	int error;
	daddr_t pbn;

	KASSERT(((len >> wr->wr_log_dev_bshift) <<
	    wr->wr_log_dev_bshift) == len);

	if (off < wr->wr_circ_off)
		off = wr->wr_circ_off;
	slen = wr->wr_circ_off + wr->wr_circ_size - off;
	if (slen < len) {
		pbn = wr->wr_logpbn + (off >> wr->wr_log_dev_bshift);
#ifdef _KERNEL
		pbn = btodb(pbn << wr->wr_log_dev_bshift);
#endif
		error = wapbl_read(data, slen, wr->wr_devvp, pbn);
		if (error)
			return error;
		data = (uint8_t *)data + slen;
		len -= slen;
		off = wr->wr_circ_off;
	}
	pbn = wr->wr_logpbn + (off >> wr->wr_log_dev_bshift);
#ifdef _KERNEL
	pbn = btodb(pbn << wr->wr_log_dev_bshift);
#endif
	error = wapbl_read(data, len, wr->wr_devvp, pbn);
	if (error)
		return error;
	off += len;
	if (off >= wr->wr_circ_off + wr->wr_circ_size)
		off = wr->wr_circ_off;
	*offp = off;
	return 0;
}
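
/*
 * Editor's worked example (illustrative numbers): with wr_circ_off =
 * 1024 and wr_circ_size = 8192, the valid region is [1024, 9216).  A
 * 2048-byte read starting at *offp = 8192 has slen = 1024 + 8192 -
 * 8192 = 1024, so it reads 1024 bytes at offset 8192, wraps, reads
 * the remaining 1024 bytes at offset 1024, and returns *offp = 2048.
 */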

/*
 * wapbl_circ_advance(wr, len, offp)
 *
 *	Compute the linear byte offset of the circular queue of wr that
 *	is len bytes past *offp, and store it in *offp.
 *
 *	This is like wapbl_circ_read, but without actually reading
 *	anything.
 *
 *	If the starting linear byte offset precedes wr->wr_circ_off, it
 *	is taken to be wr->wr_circ_off instead.  XXX WTF?  This should
 *	be a KASSERT, not a conditional.
 */
static void
wapbl_circ_advance(struct wapbl_replay *wr, size_t len, off_t *offp)
{
	size_t slen;
	off_t off = *offp;

	KASSERT(((len >> wr->wr_log_dev_bshift) <<
	    wr->wr_log_dev_bshift) == len);

	if (off < wr->wr_circ_off)
		off = wr->wr_circ_off;
	slen = wr->wr_circ_off + wr->wr_circ_size - off;
	if (slen < len) {
		len -= slen;
		off = wr->wr_circ_off;
	}
	off += len;
	if (off >= wr->wr_circ_off + wr->wr_circ_size)
		off = wr->wr_circ_off;
	*offp = off;
}

/****************************************************************/

int
wapbl_replay_start(struct wapbl_replay **wrp, struct vnode *vp,
	daddr_t off, size_t count, size_t blksize)
{
	struct wapbl_replay *wr;
	int error;
	struct vnode *devvp;
	daddr_t logpbn;
	uint8_t *scratch;
	struct wapbl_wc_header *wch;
	struct wapbl_wc_header *wch2;
	/* Use this until we read the actual log header */
	int log_dev_bshift = ilog2(blksize);
	size_t used;
	daddr_t pbn;

	WAPBL_PRINTF(WAPBL_PRINT_REPLAY,
	    ("wapbl_replay_start: vp=%p off=%"PRId64" count=%zu blksize=%zu\n",
	    vp, off, count, blksize));

	if (off < 0)
		return EINVAL;

	if (blksize < DEV_BSIZE)
		return EINVAL;
	if (blksize % DEV_BSIZE)
		return EINVAL;

#ifdef _KERNEL
#if 0
	/*
	 * XXX vp->v_size isn't reliably set for VBLK devices,
	 * especially root.  However, we might still want to verify
	 * that the full load is readable.
	 */
	if ((off + count) * blksize > vp->v_size)
		return EINVAL;
#endif
	if ((error = VOP_BMAP(vp, off, &devvp, &logpbn, 0)) != 0) {
		return error;
	}
#else /* ! _KERNEL */
	devvp = vp;
	logpbn = off;
#endif /* ! _KERNEL */

	scratch = wapbl_alloc(MAXBSIZE);

	pbn = logpbn;
#ifdef _KERNEL
	pbn = btodb(pbn << log_dev_bshift);
#endif
	error = wapbl_read(scratch, 2<<log_dev_bshift, devvp, pbn);
	if (error)
		goto errout;

	wch = (struct wapbl_wc_header *)scratch;
	wch2 =
	    (struct wapbl_wc_header *)(scratch + (1<<log_dev_bshift));
	/* XXX verify checksums and magic numbers */
	if (wch->wc_type != WAPBL_WC_HEADER) {
		printf("Unrecognized wapbl magic: 0x%08x\n", wch->wc_type);
		error = EFTYPE;
		goto errout;
	}

	if (wch2->wc_generation > wch->wc_generation)
		wch = wch2;

	wr = wapbl_calloc(1, sizeof(*wr));

	wr->wr_logvp = vp;
	wr->wr_devvp = devvp;
	wr->wr_logpbn = logpbn;

	wr->wr_scratch = scratch;

	wr->wr_log_dev_bshift = wch->wc_log_dev_bshift;
	wr->wr_fs_dev_bshift = wch->wc_fs_dev_bshift;
	wr->wr_circ_off = wch->wc_circ_off;
	wr->wr_circ_size = wch->wc_circ_size;
	wr->wr_generation = wch->wc_generation;

	used = wapbl_space_used(wch->wc_circ_size, wch->wc_head, wch->wc_tail);

	WAPBL_PRINTF(WAPBL_PRINT_REPLAY,
	    ("wapbl_replay: head=%"PRId64" tail=%"PRId64" off=%"PRId64
	    " len=%"PRId64" used=%zu\n",
	    wch->wc_head, wch->wc_tail, wch->wc_circ_off,
	    wch->wc_circ_size, used));

	wapbl_blkhash_init(wr, (used >> wch->wc_fs_dev_bshift));

	error = wapbl_replay_process(wr, wch->wc_head, wch->wc_tail);
	if (error) {
		wapbl_replay_stop(wr);
		wapbl_replay_free(wr);
		return error;
	}

	*wrp = wr;
	return 0;

 errout:
	wapbl_free(scratch, MAXBSIZE);
	return error;
}

void
wapbl_replay_stop(struct wapbl_replay *wr)
{

	if (!wapbl_replay_isopen(wr))
		return;

	WAPBL_PRINTF(WAPBL_PRINT_REPLAY, ("wapbl_replay_stop called\n"));

	wapbl_free(wr->wr_scratch, MAXBSIZE);
	wr->wr_scratch = NULL;

	wr->wr_logvp = NULL;

	wapbl_blkhash_clear(wr);
	wapbl_blkhash_free(wr);
}

void
wapbl_replay_free(struct wapbl_replay *wr)
{

	KDASSERT(!wapbl_replay_isopen(wr));

	if (wr->wr_inodes)
		wapbl_free(wr->wr_inodes,
		    wr->wr_inodescnt * sizeof(wr->wr_inodes[0]));
	wapbl_free(wr, sizeof(*wr));
}

#ifdef _KERNEL
int
wapbl_replay_isopen1(struct wapbl_replay *wr)
{

	return wapbl_replay_isopen(wr);
}
#endif

/*
 * wapbl_block_daddr(wc, i, j, blen)
 *
 *	Calculate the disk address for the i'th block in the
 *	wc_blocklist, offset by j blocks of size blen.
 *
 *	wc_daddr is always a kernel disk address in DEV_BSIZE units
 *	that was written to the journal.
 *
 *	The kernel needs that address plus the offset in DEV_BSIZE
 *	units.
 *
 *	Userland needs that address plus the offset in blen units.
 */
static daddr_t
wapbl_block_daddr(struct wapbl_wc_blocklist *wc, int i, int j, int blen)
{
	daddr_t pbn;

#ifdef _KERNEL
	pbn = wc->wc_blocks[i].wc_daddr + btodb(j * blen);
#else
	pbn = dbtob(wc->wc_blocks[i].wc_daddr) / blen + j;
#endif

	return pbn;
}
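
/*
 * Editor's worked example (assuming DEV_BSIZE = 512): for wc_daddr =
 * 1000 and blen = 2048, the j = 3 block is at kernel address
 * 1000 + btodb(3 * 2048) = 1012 in DEV_BSIZE units, while userland
 * computes dbtob(1000) / 2048 + 3 = 250 + 3 = 253 in blen units.
 */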

static void
wapbl_replay_process_blocks(struct wapbl_replay *wr, off_t *offp)
{
	struct wapbl_wc_blocklist *wc =
	    (struct wapbl_wc_blocklist *)wr->wr_scratch;
	int fsblklen = 1 << wr->wr_fs_dev_bshift;
	int i, j, n;

	for (i = 0; i < wc->wc_blkcount; i++) {
		/*
		 * Enter each physical block into the hashtable independently.
		 */
		n = wc->wc_blocks[i].wc_dlen >> wr->wr_fs_dev_bshift;
		for (j = 0; j < n; j++) {
			wapbl_blkhash_ins(wr, wapbl_block_daddr(wc, i, j, fsblklen),
			    *offp);
			wapbl_circ_advance(wr, fsblklen, offp);
		}
	}
}

static void
wapbl_replay_process_revocations(struct wapbl_replay *wr)
{
	struct wapbl_wc_blocklist *wc =
	    (struct wapbl_wc_blocklist *)wr->wr_scratch;
	int fsblklen = 1 << wr->wr_fs_dev_bshift;
	int i, j, n;

	for (i = 0; i < wc->wc_blkcount; i++) {
		/*
		 * Remove any blocks found from the hashtable.
		 */
		n = wc->wc_blocks[i].wc_dlen >> wr->wr_fs_dev_bshift;
		for (j = 0; j < n; j++)
			wapbl_blkhash_rem(wr, wapbl_block_daddr(wc, i, j, fsblklen));
	}
}

static void
wapbl_replay_process_inodes(struct wapbl_replay *wr, off_t oldoff, off_t newoff)
{
	struct wapbl_wc_inodelist *wc =
	    (struct wapbl_wc_inodelist *)wr->wr_scratch;
	void *new_inodes;
	const size_t oldsize = wr->wr_inodescnt * sizeof(wr->wr_inodes[0]);

	KASSERT(sizeof(wr->wr_inodes[0]) == sizeof(wc->wc_inodes[0]));

	/*
	 * Keep track of where we found this so the location won't be
	 * overwritten.
	 */
	if (wc->wc_clear) {
		wr->wr_inodestail = oldoff;
		wr->wr_inodescnt = 0;
		if (wr->wr_inodes != NULL) {
			wapbl_free(wr->wr_inodes, oldsize);
			wr->wr_inodes = NULL;
		}
	}
	wr->wr_inodeshead = newoff;
	if (wc->wc_inocnt == 0)
		return;

	new_inodes = wapbl_alloc((wr->wr_inodescnt + wc->wc_inocnt) *
	    sizeof(wr->wr_inodes[0]));
	if (wr->wr_inodes != NULL) {
		memcpy(new_inodes, wr->wr_inodes, oldsize);
		wapbl_free(wr->wr_inodes, oldsize);
	}
	wr->wr_inodes = new_inodes;
	memcpy(&wr->wr_inodes[wr->wr_inodescnt], wc->wc_inodes,
	    wc->wc_inocnt * sizeof(wr->wr_inodes[0]));
	wr->wr_inodescnt += wc->wc_inocnt;
}

static int
wapbl_replay_process(struct wapbl_replay *wr, off_t head, off_t tail)
{
	off_t off;
	int error;

	int logblklen = 1 << wr->wr_log_dev_bshift;

	wapbl_blkhash_clear(wr);

	off = tail;
	while (off != head) {
		struct wapbl_wc_null *wcn;
		off_t saveoff = off;
		error = wapbl_circ_read(wr, wr->wr_scratch, logblklen, &off);
		if (error)
			goto errout;
		wcn = (struct wapbl_wc_null *)wr->wr_scratch;
		switch (wcn->wc_type) {
		case WAPBL_WC_BLOCKS:
			wapbl_replay_process_blocks(wr, &off);
			break;

		case WAPBL_WC_REVOCATIONS:
			wapbl_replay_process_revocations(wr);
			break;

		case WAPBL_WC_INODES:
			wapbl_replay_process_inodes(wr, saveoff, off);
			break;

		default:
			printf("Unrecognized wapbl type: 0x%08x\n",
			       wcn->wc_type);
			error = EFTYPE;
			goto errout;
		}
		wapbl_circ_advance(wr, wcn->wc_len, &saveoff);
		if (off != saveoff) {
			printf("wapbl_replay: corrupted records\n");
			error = EFTYPE;
			goto errout;
		}
	}
	return 0;

 errout:
	wapbl_blkhash_clear(wr);
	return error;
}

#if 0
int
wapbl_replay_verify(struct wapbl_replay *wr, struct vnode *fsdevvp)
{
	off_t off;
	int mismatchcnt = 0;
	int logblklen = 1 << wr->wr_log_dev_bshift;
	int fsblklen = 1 << wr->wr_fs_dev_bshift;
	void *scratch1 = wapbl_alloc(MAXBSIZE);
	void *scratch2 = wapbl_alloc(MAXBSIZE);
	int error = 0;

	KDASSERT(wapbl_replay_isopen(wr));

	off = wch->wc_tail;
	while (off != wch->wc_head) {
		struct wapbl_wc_null *wcn;
#ifdef DEBUG
		off_t saveoff = off;
#endif
		error = wapbl_circ_read(wr, wr->wr_scratch, logblklen, &off);
		if (error)
			goto out;
		wcn = (struct wapbl_wc_null *)wr->wr_scratch;
		switch (wcn->wc_type) {
		case WAPBL_WC_BLOCKS:
			{
				struct wapbl_wc_blocklist *wc =
				    (struct wapbl_wc_blocklist *)wr->wr_scratch;
				int i;
				for (i = 0; i < wc->wc_blkcount; i++) {
					int foundcnt = 0;
					int dirtycnt = 0;
					int j, n;
					/*
					 * Check each physical block against
					 * the hashtable independently.
					 */
					n = wc->wc_blocks[i].wc_dlen >>
					    wch->wc_fs_dev_bshift;
					for (j = 0; j < n; j++) {
						struct wapbl_blk *wb =
						   wapbl_blkhash_get(wr,
						   wapbl_block_daddr(wc, i, j, fsblklen));
						if (wb && (wb->wb_off == off)) {
							foundcnt++;
							error =
							    wapbl_circ_read(wr,
							    scratch1, fsblklen,
							    &off);
							if (error)
								goto out;
							error =
							    wapbl_read(scratch2,
							    fsblklen, fsdevvp,
							    wb->wb_blk);
							if (error)
								goto out;
							if (memcmp(scratch1,
								   scratch2,
								   fsblklen)) {
								printf(
		"wapbl_verify: mismatch block %"PRId64" at off %"PRIdMAX"\n",
		wb->wb_blk, (intmax_t)off);
								dirtycnt++;
								mismatchcnt++;
							}
						} else {
							wapbl_circ_advance(wr,
							    fsblklen, &off);
						}
					}
#if 0
					/*
					 * If all of the blocks in an entry
					 * are clean, then remove all of its
					 * blocks from the hashtable since they
					 * never will need replay.
					 */
					if ((foundcnt != 0) &&
					    (dirtycnt == 0)) {
						off = saveoff;
						wapbl_circ_advance(wr,
						    logblklen, &off);
						for (j = 0; j < n; j++) {
							struct wapbl_blk *wb =
							   wapbl_blkhash_get(wr,
							   wapbl_block_daddr(wc, i, j, fsblklen));
							if (wb &&
							  (wb->wb_off == off)) {
								wapbl_blkhash_rem(wr, wb->wb_blk);
							}
							wapbl_circ_advance(wr,
							    fsblklen, &off);
						}
					}
#endif
				}
			}
			break;
		case WAPBL_WC_REVOCATIONS:
		case WAPBL_WC_INODES:
			break;
		default:
			KASSERT(0);
		}
#ifdef DEBUG
		wapbl_circ_advance(wr, wcn->wc_len, &saveoff);
		KASSERT(off == saveoff);
#endif
	}
 out:
	wapbl_free(scratch1, MAXBSIZE);
	wapbl_free(scratch2, MAXBSIZE);
	if (!error && mismatchcnt)
		error = EFTYPE;
	return error;
}
#endif

int
wapbl_replay_write(struct wapbl_replay *wr, struct vnode *fsdevvp)
{
	struct wapbl_blk *wb;
	size_t i;
	off_t off;
	void *scratch;
	int error = 0;
	int fsblklen = 1 << wr->wr_fs_dev_bshift;

	KDASSERT(wapbl_replay_isopen(wr));

	scratch = wapbl_alloc(MAXBSIZE);

	for (i = 0; i <= wr->wr_blkhashmask; ++i) {
		LIST_FOREACH(wb, &wr->wr_blkhash[i], wb_hash) {
			off = wb->wb_off;
			error = wapbl_circ_read(wr, scratch, fsblklen, &off);
			if (error)
				break;
			error = wapbl_write(scratch, fsblklen, fsdevvp,
			    wb->wb_blk);
			if (error)
				break;
		}
		/* The inner break only leaves this bucket; stop entirely. */
		if (error)
			break;
	}

	wapbl_free(scratch, MAXBSIZE);
	return error;
}

int
wapbl_replay_can_read(struct wapbl_replay *wr, daddr_t blk, long len)
{
	int fsblklen = 1 << wr->wr_fs_dev_bshift;

	KDASSERT(wapbl_replay_isopen(wr));
	KASSERT((len % fsblklen) == 0);

	while (len != 0) {
		struct wapbl_blk *wb = wapbl_blkhash_get(wr, blk);
		if (wb)
			return 1;
		len -= fsblklen;
		blk++;	/* advance to the next block, as in wapbl_replay_read */
	}
	return 0;
}

int
wapbl_replay_read(struct wapbl_replay *wr, void *data, daddr_t blk, long len)
{
	int fsblklen = 1 << wr->wr_fs_dev_bshift;

	KDASSERT(wapbl_replay_isopen(wr));

	KASSERT((len % fsblklen) == 0);

	while (len != 0) {
		struct wapbl_blk *wb = wapbl_blkhash_get(wr, blk);
		if (wb) {
			off_t off = wb->wb_off;
			int error;
			error = wapbl_circ_read(wr, data, fsblklen, &off);
			if (error)
				return error;
		}
		data = (uint8_t *)data + fsblklen;
		len -= fsblklen;
		blk++;
	}
	return 0;
}
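
/*
 * Editor's sketch (not part of the original source) of the replay
 * life cycle using the functions above; error handling is
 * abbreviated.
 */
#if 0
static int
example_replay(struct vnode *logvp, daddr_t off, size_t count,
    size_t blksize, struct vnode *fsdevvp)
{
	struct wapbl_replay *wr;
	int error;

	error = wapbl_replay_start(&wr, logvp, off, count, blksize);
	if (error)
		return error;

	/* Write every logged block back to the file system device. */
	error = wapbl_replay_write(wr, fsdevvp);

	wapbl_replay_stop(wr);
	wapbl_replay_free(wr);
	return error;
}
#endif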

#ifdef _KERNEL

MODULE(MODULE_CLASS_VFS, wapbl, NULL);

static int
wapbl_modcmd(modcmd_t cmd, void *arg)
{

	switch (cmd) {
	case MODULE_CMD_INIT:
		wapbl_init();
		return 0;
	case MODULE_CMD_FINI:
		return wapbl_fini();
	default:
		return ENOTTY;
	}
}
#endif /* _KERNEL */