/*-
 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
 *
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
struct alq {
	char	*aq_entbuf;		/* Buffer for stored entries */
	int	aq_entmax;		/* Max entries */
	int	aq_entlen;		/* Entry length */
	int	aq_freebytes;		/* Bytes available in buffer */
	int	aq_buflen;		/* Total length of our buffer */
	int	aq_writehead;		/* Location for next write */
	int	aq_writetail;		/* Flush starts at this location */
	int	aq_wrapearly;		/* # bytes left blank at end of buf */
	int	aq_flags;		/* Queue flags */
	int	aq_waiters;		/* Num threads waiting for resources
					 * NB: Used as a wait channel so must
					 * not be first field in the alq struct
					 */
	struct	ale	aq_getpost;	/* ALE for use by get/post */
	struct mtx	aq_mtx;		/* Queue lock */
	struct vnode	*aq_vp;		/* Open vnode handle */
	struct ucred	*aq_cred;	/* Credentials of the opening thread */
	LIST_ENTRY(alq)	aq_act;		/* List of active queues */
	LIST_ENTRY(alq)	aq_link;	/* List of all queues */
};

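/*
 * The entry buffer is used as a ring: unflushed data lives between
 * aq_writetail (where the next flush will start) and aq_writehead (where
 * the next write will go), modulo aq_buflen.  If a write wraps early
 * because the space at the end of the buffer is too small for a
 * contiguous entry, aq_wrapearly records how many bytes were left blank
 * at the end so that alq_doio() can skip over them.
 */
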
#define	AQ_WANTED	0x0001		/* Wakeup sleeper when io is done */
#define	AQ_ACTIVE	0x0002		/* on the active list */
#define	AQ_FLUSHING	0x0004		/* doing IO */
#define	AQ_SHUTDOWN	0x0008		/* Queue no longer valid */
#define	AQ_ORDERED	0x0010		/* Queue enforces ordered writes */
#define	AQ_LEGACY	0x0020		/* Legacy queue (fixed length writes) */

#define	ALQ_LOCK(alq)	mtx_lock_spin(&(alq)->aq_mtx)
#define	ALQ_UNLOCK(alq)	mtx_unlock_spin(&(alq)->aq_mtx)

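/* True iff the buffer holds data that has not yet been flushed to disk. */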
#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)

static MALLOC_DEFINE(M_ALD, "ALD", "ALD");

/*
 * The ald_mtx protects the ald_queues list and the ald_active list.
 */
static struct mtx ald_mtx;
static LIST_HEAD(, alq) ald_queues;
static LIST_HEAD(, alq) ald_active;
static int ald_shutingdown = 0;
struct thread *ald_thread;
static struct proc *ald_proc;
static eventhandler_tag alq_eventhandler_tag = NULL;

#define	ALD_LOCK()	mtx_lock(&ald_mtx)
#define	ALD_UNLOCK()	mtx_unlock(&ald_mtx)

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);

/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
	int error;

	error = 0;

	ALD_LOCK();
	if (ald_shutingdown) {
		error = EBUSY;
		goto done;
	}
	LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
	ALD_UNLOCK();
	return (error);
}

/*
 * Remove a queue from the global list unless we're shutting down.  If so,
 * the ald will take care of cleaning up its resources.
 */
static int
ald_rem(struct alq *alq)
{
	int error;

	error = 0;

	ALD_LOCK();
	if (ald_shutingdown) {
		error = EBUSY;
		goto done;
	}
	LIST_REMOVE(alq, aq_link);
done:
	ALD_UNLOCK();
	return (error);
}

/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
	LIST_INSERT_HEAD(&ald_active, alq, aq_act);
	wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
	LIST_REMOVE(alq, aq_act);
	alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
	mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
	LIST_INIT(&ald_queues);
	LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
	int needwakeup;
	struct alq *alq;

	ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

	alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync,
	    ald_shutdown, NULL, SHUTDOWN_PRI_FIRST);

	ALD_LOCK();

	for (;;) {
		while ((alq = LIST_FIRST(&ald_active)) == NULL &&
		    !ald_shutingdown)
			mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);

		/* Don't shut down until all active ALQs are flushed. */
		if (ald_shutingdown && alq == NULL) {
			ALD_UNLOCK();
			break;
		}

		ALQ_LOCK(alq);
		ald_deactivate(alq);
		ALD_UNLOCK();
		needwakeup = alq_doio(alq);
		ALQ_UNLOCK(alq);
		if (needwakeup)
			wakeup_one(alq);
		ALD_LOCK();
	}

	kproc_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
	struct alq *alq;

	ALD_LOCK();

	/* Ensure no new queues can be created. */
	ald_shutingdown = 1;

	/* Shutdown all ALQs prior to terminating the ald_daemon. */
	while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
		LIST_REMOVE(alq, aq_link);
		ALD_UNLOCK();
		alq_shutdown(alq);
		ALD_LOCK();
	}

	/* At this point, all ALQs are flushed and shutdown. */

	/*
	 * Wake ald_daemon so that it exits. It won't be able to do
	 * anything until we mtx_sleep because we hold the ald_mtx.
	 */
	wakeup(&ald_active);

	/* Wait for ald_daemon to exit. */
	mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

	ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
	ALQ_LOCK(alq);

	/* Stop any new writers. */
	alq->aq_flags |= AQ_SHUTDOWN;

	/*
	 * If the ALQ isn't active but has unwritten data (possible if
	 * the ALQ_NOACTIVATE flag has been used), explicitly activate the
	 * ALQ here so that the pending data gets flushed by the ald_daemon.
	 */
	if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
		alq->aq_flags |= AQ_ACTIVE;
		ALQ_UNLOCK(alq);
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
		ALQ_LOCK(alq);
	}

	/* Drain IO */
	while (alq->aq_flags & AQ_ACTIVE) {
		alq->aq_flags |= AQ_WANTED;
		msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
	}
	ALQ_UNLOCK(alq);

	vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
	    curthread);
	crfree(alq->aq_cred);
}

void
alq_destroy(struct alq *alq)
{
	/* Drain all pending IO. */
	alq_shutdown(alq);

	mtx_destroy(&alq->aq_mtx);
	free(alq->aq_entbuf, M_ALD);
	free(alq, M_ALD);
}

/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
	struct thread *td;
	struct mount *mp;
	struct vnode *vp;
	struct uio auio;
	struct iovec aiov[2];
	int totlen;
	int iov;
	int wrapearly;

	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

	vp = alq->aq_vp;
	td = curthread;
	totlen = 0;
	iov = 1;
	wrapearly = alq->aq_wrapearly;

	bzero(&aiov, sizeof(aiov));
	bzero(&auio, sizeof(auio));

	/* Start the write from the location of our buffer tail pointer. */
	aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

	if (alq->aq_writetail < alq->aq_writehead) {
		/* Buffer not wrapped. */
		totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
	} else if (alq->aq_writehead == 0) {
		/* Buffer not wrapped (special case to avoid an empty iov). */
		totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
		    wrapearly;
	} else {
		/*
		 * Buffer wrapped, requires 2 aiov entries:
		 * - first is from writetail to end of buffer
		 * - second is from start of buffer to writehead
		 */
		aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
		    wrapearly;
		iov++;
		aiov[1].iov_base = alq->aq_entbuf;
		aiov[1].iov_len = alq->aq_writehead;
		totlen = aiov[0].iov_len + aiov[1].iov_len;
	}

	alq->aq_flags |= AQ_FLUSHING;
	ALQ_UNLOCK(alq);

	auio.uio_iov = &aiov[0];
	auio.uio_offset = 0;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_WRITE;
	auio.uio_iovcnt = iov;
	auio.uio_resid = totlen;
	auio.uio_td = td;

	/*
	 * Do all of the junk required to write now.
	 */
	vn_start_write(vp, &mp, V_WAIT);
	vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
	/*
	 * XXX: VOP_WRITE error checks are ignored.
	 */
#ifdef MAC
	if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
		VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
	VOP_UNLOCK(vp, 0);
	vn_finished_write(mp);

	ALQ_LOCK(alq);
	alq->aq_flags &= ~AQ_FLUSHING;

	/* Adjust writetail as required, taking into account wrapping. */
	alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
	    alq->aq_buflen;
	alq->aq_freebytes += totlen + wrapearly;

	/*
	 * If we just flushed part of the buffer which wrapped, reset the
	 * wrapearly indicator.
	 */
	if (wrapearly)
		alq->aq_wrapearly = 0;

	/*
	 * If we just flushed the buffer completely, reset indexes to 0 to
	 * minimise buffer wraps.
	 * This is also required to ensure alq_getn() can't wedge itself.
	 */
	if (!HAS_PENDING_DATA(alq))
		alq->aq_writehead = alq->aq_writetail = 0;

	KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
	    ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

	if (alq->aq_flags & AQ_WANTED) {
		alq->aq_flags &= ~AQ_WANTED;
		return (1);
	}

	return (0);
}

static struct kproc_desc ald_kp = {
	"ALQ Daemon",
	ald_daemon,
	&ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);

/* User visible queue functions */

/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */

int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int flags)
{
	struct thread *td;
	struct nameidata nd;
	struct alq *alq;
	int oflags;
	int error;

	KASSERT((size > 0), ("%s: size <= 0", __func__));

	*alqp = NULL;
	td = curthread;

	NDINIT(&nd, LOOKUP, NOFOLLOW, UIO_SYSSPACE, file, td);
	oflags = FWRITE | O_NOFOLLOW | O_CREAT;

	error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
	if (error)
		return (error);

	NDFREE(&nd, NDF_ONLY_PNBUF);
	/* We just unlock so we hold a reference */
	VOP_UNLOCK(nd.ni_vp, 0);

	alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
	alq->aq_vp = nd.ni_vp;
	alq->aq_cred = crhold(cred);

	mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

	alq->aq_buflen = size;
	alq->aq_entmax = 0;
	alq->aq_entlen = 0;

	alq->aq_freebytes = alq->aq_buflen;
	alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
	alq->aq_writehead = alq->aq_writetail = 0;
	if (flags & ALQ_ORDERED)
		alq->aq_flags |= AQ_ORDERED;

	if ((error = ald_add(alq)) != 0) {
		alq_destroy(alq);
		return (error);
	}

	*alqp = alq;

	return (0);
}

int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
	int ret;

	KASSERT((count >= 0), ("%s: count < 0", __func__));

	if (count > 0) {
		if ((ret = alq_open_flags(alqp, file, cred, cmode,
		    size*count, 0)) == 0) {
			(*alqp)->aq_flags |= AQ_LEGACY;
			(*alqp)->aq_entmax = count;
			(*alqp)->aq_entlen = size;
		}
	} else
		ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

	return (ret);
}

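/*
 * Illustrative usage sketch (comment only, not compiled): the path, record
 * type and queue dimensions below are hypothetical, but the calls are the
 * functions and flags provided by this file and declared in sys/alq.h.
 *
 *	struct alq *alq;
 *	struct foo_record rec;
 *	int error;
 *
 *	// Legacy fixed-length queue: room for 4096 records of sizeof(rec).
 *	error = alq_open(&alq, "/var/log/foo.alq", curthread->td_ucred,
 *	    0600, sizeof(rec), 4096);
 *	if (error == 0) {
 *		// Copy a record in, sleeping if the buffer is full.
 *		alq_write(alq, &rec, ALQ_WAITOK);
 *		// Force any buffered data out to the vnode now.
 *		alq_flush(alq);
 *		// Flush remaining data, close the file and free resources.
 *		alq_close(alq);
 *	}
 *
 * A variable-length queue is opened with count == 0 (or via
 * alq_open_flags()) and written with alq_writen(alq, data, len, flags).
 */
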
/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait or return an error, depending on whether ALQ_WAITOK or ALQ_NOWAIT
 * was passed in flags.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
	int activate, copy, ret;
	void *waitchan;

	KASSERT((len > 0 && len <= alq->aq_buflen),
	    ("%s: len <= 0 || len > aq_buflen", __func__));

	activate = ret = 0;
	copy = len;
	waitchan = NULL;

	ALQ_LOCK(alq);

	/*
	 * Fail to perform the write and return EWOULDBLOCK if:
	 * - The message is larger than our underlying buffer.
	 * - The ALQ is being shutdown.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the user can't wait for space.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the alq is inactive due to prior
	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
	 */
	if (len > alq->aq_buflen ||
	    alq->aq_flags & AQ_SHUTDOWN ||
	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
	    HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
		ALQ_UNLOCK(alq);
		return (EWOULDBLOCK);
	}

	/*
	 * If we want ordered writes and there is already at least one thread
	 * waiting for resources to become available, sleep until we're woken.
	 */
	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_waiters++;
		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
		alq->aq_waiters--;
	}

	/*
	 * Either ALQ_WAITOK was set and aq_freebytes < len, in which case we
	 * enter the while loop and sleep until enough free bytes are
	 * available, or aq_freebytes >= len and we skip the loop entirely.
	 * If AQ_ORDERED is set, only 1 thread at a time will be in this loop.
	 * Otherwise, multiple threads may be sleeping here competing for ALQ
	 * resources.
	 */
	while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_flags |= AQ_WANTED;
		alq->aq_waiters++;
		if (waitchan)
			wakeup(waitchan);
		msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
		alq->aq_waiters--;

		/*
		 * If we're the first thread to wake after an AQ_WANTED wakeup
		 * but there isn't enough free space for us, we're going to loop
		 * and sleep again. If there are other threads waiting in this
		 * loop, schedule a wakeup so that they can see if the space
		 * they require is available.
		 */
		if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
		    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
			waitchan = alq;
		else
			waitchan = NULL;
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the above
	 * while loop, so we use a different wait channel in this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	/* Bail if we're shutting down. */
	if (alq->aq_flags & AQ_SHUTDOWN) {
		ret = EWOULDBLOCK;
		goto unlock;
	}

	/*
	 * If we need to wrap the buffer to accommodate the write,
	 * we'll need 2 calls to bcopy.
	 */
	if ((alq->aq_buflen - alq->aq_writehead) < len)
		copy = alq->aq_buflen - alq->aq_writehead;

	/* Copy message (or part thereof if wrap required) to the buffer. */
	bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
	alq->aq_writehead += copy;

	if (alq->aq_writehead >= alq->aq_buflen) {
		KASSERT((alq->aq_writehead == alq->aq_buflen),
		    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
		    __func__,
		    alq->aq_writehead,
		    alq->aq_buflen));
		alq->aq_writehead = 0;
	}

	if (copy != len) {
		/*
		 * Wrap the buffer by copying the remainder of our message
		 * to the start of the buffer and resetting aq_writehead.
		 */
		bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
		alq->aq_writehead = len - copy;
	}

	KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
	    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

	alq->aq_freebytes -= len;

	if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
		alq->aq_flags |= AQ_ACTIVE;
		activate = 1;
	}

	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
	ALQ_UNLOCK(alq);

	if (activate) {
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
	}

	/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
	if (waitchan != NULL)
		wakeup_one(waitchan);

	return (ret);
}

int
alq_write(struct alq *alq, void *data, int flags)
{
	/* Should only be called in fixed length message (legacy) mode. */
	KASSERT((alq->aq_flags & AQ_LEGACY),
	    ("%s: fixed length write on variable length queue", __func__));
	return (alq_writen(alq, data, alq->aq_entlen, flags));
}

/*
 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
	int contigbytes;
	void *waitchan;

	KASSERT((len > 0 && len <= alq->aq_buflen),
	    ("%s: len <= 0 || len > alq->aq_buflen", __func__));

	waitchan = NULL;

	ALQ_LOCK(alq);

	/*
	 * Determine the number of free contiguous bytes.
	 * We ensure elsewhere that if aq_writehead == aq_writetail because
	 * the buffer is empty, they will both be set to 0 and therefore
	 * aq_freebytes == aq_buflen and is fully contiguous.
	 * If they are equal and the buffer is not empty, aq_freebytes will
	 * be 0 indicating the buffer is full.
	 */
	if (alq->aq_writehead <= alq->aq_writetail)
		contigbytes = alq->aq_freebytes;
	else {
		contigbytes = alq->aq_buflen - alq->aq_writehead;

		if (contigbytes < len) {
			/*
			 * Insufficient space at end of buffer to handle a
			 * contiguous write. Wrap early if there's space at
			 * the beginning. This will leave a hole at the end
			 * of the buffer which we will have to skip over when
			 * flushing the buffer to disk.
			 */
			if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
				/* Keep track of # bytes left blank. */
				alq->aq_wrapearly = contigbytes;
				/* Do the wrap and adjust counters. */
				contigbytes = alq->aq_freebytes =
				    alq->aq_writetail;
				alq->aq_writehead = 0;
			}
		}
	}

	/*
	 * Return a NULL ALE if:
	 * - The message is larger than our underlying buffer.
	 * - The ALQ is being shutdown.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the user can't wait for space.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the alq is inactive due to prior
	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
	 */
	if (len > alq->aq_buflen ||
	    alq->aq_flags & AQ_SHUTDOWN ||
	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
	    HAS_PENDING_DATA(alq))) && contigbytes < len)) {
		ALQ_UNLOCK(alq);
		return (NULL);
	}

	/*
	 * If we want ordered writes and there is already at least one thread
	 * waiting for resources to become available, sleep until we're woken.
	 */
	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_waiters++;
		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
		alq->aq_waiters--;
	}

	/*
	 * Either ALQ_WAITOK was set and contigbytes < len, in which case we
	 * enter the while loop and sleep until enough contiguous free bytes
	 * are available, or contigbytes >= len and we skip the loop entirely.
	 * If AQ_ORDERED is set, only 1 thread at a time will be in this loop.
	 * Otherwise, multiple threads may be sleeping here competing for ALQ
	 * resources.
	 */
	while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_flags |= AQ_WANTED;
		alq->aq_waiters++;
		if (waitchan)
			wakeup(waitchan);
		msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
		alq->aq_waiters--;

		if (alq->aq_writehead <= alq->aq_writetail)
			contigbytes = alq->aq_freebytes;
		else
			contigbytes = alq->aq_buflen - alq->aq_writehead;

		/*
		 * If we're the first thread to wake after an AQ_WANTED wakeup
		 * but there isn't enough free space for us, we're going to loop
		 * and sleep again. If there are other threads waiting in this
		 * loop, schedule a wakeup so that they can see if the space
		 * they require is available.
		 */
		if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
		    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
			waitchan = alq;
		else
			waitchan = NULL;
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the above
	 * while loop, so we use a different wait channel in this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	/* Bail if we're shutting down. */
	if (alq->aq_flags & AQ_SHUTDOWN) {
		ALQ_UNLOCK(alq);
		if (waitchan != NULL)
			wakeup_one(waitchan);
		return (NULL);
	}

	/*
	 * If we are here, we have a contiguous number of bytes >= len
	 * available in our buffer starting at aq_writehead.
	 */
	alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
	alq->aq_getpost.ae_bytesused = len;

	return (&alq->aq_getpost);
}

struct ale *
alq_get(struct alq *alq, int flags)
{
	/* Should only be called in fixed length message (legacy) mode. */
	KASSERT((alq->aq_flags & AQ_LEGACY),
	    ("%s: fixed length get on variable length queue", __func__));
	return (alq_getn(alq, alq->aq_entlen, flags));
}

void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
	int activate;
	void *waitchan;

	activate = 0;

	if (ale->ae_bytesused > 0) {
		if (!(alq->aq_flags & AQ_ACTIVE) &&
		    !(flags & ALQ_NOACTIVATE)) {
			alq->aq_flags |= AQ_ACTIVE;
			activate = 1;
		}

		alq->aq_writehead += ale->ae_bytesused;
		alq->aq_freebytes -= ale->ae_bytesused;

		/* Wrap aq_writehead if we filled to the end of the buffer. */
		if (alq->aq_writehead == alq->aq_buflen)
			alq->aq_writehead = 0;

		KASSERT((alq->aq_writehead >= 0 &&
		    alq->aq_writehead < alq->aq_buflen),
		    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
		    __func__));

		KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the
	 * alq_getn() while loop, so we use a different wait channel in this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	ALQ_UNLOCK(alq);

	if (activate) {
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
	}

	/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
	if (waitchan != NULL)
		wakeup_one(waitchan);
}

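/*
 * Illustrative zero-copy usage sketch (comment only, not compiled): "buf"
 * and "len" below are hypothetical.  alq_getn() returns with the queue
 * lock held, so the caller should fill the entry and hand it back with
 * alq_post_flags(), which drops the lock:
 *
 *	struct ale *ale;
 *
 *	ale = alq_getn(alq, len, ALQ_WAITOK);
 *	if (ale != NULL) {
 *		bcopy(buf, ale->ae_data, len);
 *		alq_post_flags(alq, ale, 0);
 *	}
 *
 * alq_getn() presets ae_bytesused to len; alq_post_flags() uses that value
 * to advance the write head and consume free space.
 */
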
void
alq_flush(struct alq *alq)
{
	int needwakeup = 0;

	ALD_LOCK();
	ALQ_LOCK(alq);

	/*
	 * Pull the lever iff there is data to flush and we're
	 * not already in the middle of a flush operation.
	 */
	if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
		if (alq->aq_flags & AQ_ACTIVE)
			ald_deactivate(alq);

		ALD_UNLOCK();
		needwakeup = alq_doio(alq);
	} else
		ALD_UNLOCK();

	ALQ_UNLOCK(alq);

	if (needwakeup)
		wakeup_one(alq);
}

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
	/* Only flush and destroy alq if not already shutting down. */
	if (ald_rem(alq) == 0)
		alq_destroy(alq);
}

static int
alq_load_handler(module_t mod, int what, void *arg)
{
	int ret;

	ret = 0;

	switch (what) {
	case MOD_LOAD:
	case MOD_SHUTDOWN:
		break;

	case MOD_QUIESCE:
		ALD_LOCK();
		/* Only allow unload if there are no open queues. */
		if (LIST_FIRST(&ald_queues) == NULL) {
			ald_shutingdown = 1;
			ALD_UNLOCK();
			EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
			    alq_eventhandler_tag);
			ald_shutdown(NULL, 0);
			mtx_destroy(&ald_mtx);
		} else {
			ALD_UNLOCK();
			ret = EBUSY;
		}
		break;

	case MOD_UNLOAD:
		/* If MOD_QUIESCE failed we must fail here too. */
		if (ald_shutingdown == 0)
			ret = EBUSY;
		break;

	default:
		ret = EINVAL;
		break;
	}

	return (ret);
}

static moduledata_t alq_mod =
{
	"alq",
	alq_load_handler,
	NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_LAST, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);