kern_alq.c revision 207223
/*-
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sys/kern/kern_alq.c 207223 2010-04-26 13:48:22Z lstewart $");

#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
struct alq {
	char	*aq_entbuf;		/* Buffer for stored entries */
	int	aq_entmax;		/* Max entries */
	int	aq_entlen;		/* Entry length */
	int	aq_freebytes;		/* Bytes available in buffer */
	int	aq_buflen;		/* Total length of our buffer */
	int	aq_writehead;		/* Location for next write */
	int	aq_writetail;		/* Flush starts at this location */
	int	aq_wrapearly;		/* # bytes left blank at end of buf */
	int	aq_flags;		/* Queue flags */
	int	aq_waiters;		/* Num threads waiting for resources
					 * NB: Used as a wait channel so must
					 * not be first field in the alq struct
					 */
	struct	ale	aq_getpost;	/* ALE for use by get/post */
	struct mtx	aq_mtx;		/* Queue lock */
	struct vnode	*aq_vp;		/* Open vnode handle */
	struct ucred	*aq_cred;	/* Credentials of the opening thread */
	LIST_ENTRY(alq)	aq_act;		/* List of active queues */
	LIST_ENTRY(alq)	aq_link;	/* List of all queues */
};

#define	AQ_WANTED	0x0001		/* Wakeup sleeper when io is done */
#define	AQ_ACTIVE	0x0002		/* on the active list */
#define	AQ_FLUSHING	0x0004		/* doing IO */
#define	AQ_SHUTDOWN	0x0008		/* Queue no longer valid */
#define	AQ_ORDERED	0x0010		/* Queue enforces ordered writes */
#define	AQ_LEGACY	0x0020		/* Legacy queue (fixed length writes) */

#define	ALQ_LOCK(alq)	mtx_lock_spin(&(alq)->aq_mtx)
#define	ALQ_UNLOCK(alq)	mtx_unlock_spin(&(alq)->aq_mtx)

#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
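
/*
 * Illustrative sketch, not part of the original source: the entry buffer is
 * treated as a ring.  aq_writehead is where the next byte will be stored,
 * aq_writetail is where the next flush will start, and aq_freebytes counts
 * the space remaining.  The hypothetical helper below mirrors the
 * contiguous-free-space calculation that alq_getn() performs inline.
 */
#if 0
static int
example_contigbytes(struct alq *alq)
{
	/*
	 * Ring empty (head == tail == 0) or tail ahead of head: all free
	 * space is contiguous, starting at aq_writehead.
	 */
	if (alq->aq_writehead <= alq->aq_writetail)
		return (alq->aq_freebytes);

	/*
	 * Otherwise only the bytes from aq_writehead to the end of the
	 * buffer can be used without wrapping.
	 */
	return (alq->aq_buflen - alq->aq_writehead);
}
#endif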

static MALLOC_DEFINE(M_ALD, "ALD", "ALD");

/*
 * The ald_mtx protects the ald_queues list and the ald_active list.
 */
static struct mtx ald_mtx;
static LIST_HEAD(, alq) ald_queues;
static LIST_HEAD(, alq) ald_active;
static int ald_shutingdown = 0;
struct thread *ald_thread;
static struct proc *ald_proc;

#define	ALD_LOCK()	mtx_lock(&ald_mtx)
#define	ALD_UNLOCK()	mtx_unlock(&ald_mtx)

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);


/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
	int error;

	error = 0;

	ALD_LOCK();
	if (ald_shutingdown) {
		error = EBUSY;
		goto done;
	}
	LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
	ALD_UNLOCK();
	return (error);
}

/*
 * Remove a queue from the global list unless we're shutting down.  If so,
 * the ald will take care of cleaning up its resources.
 */
static int
ald_rem(struct alq *alq)
{
	int error;

	error = 0;

	ALD_LOCK();
	if (ald_shutingdown) {
		error = EBUSY;
		goto done;
	}
	LIST_REMOVE(alq, aq_link);
done:
	ALD_UNLOCK();
	return (error);
}

/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
	LIST_INSERT_HEAD(&ald_active, alq, aq_act);
	wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
	LIST_REMOVE(alq, aq_act);
	alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
	mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
	LIST_INIT(&ald_queues);
	LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
	int needwakeup;
	struct alq *alq;

	ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

	EVENTHANDLER_REGISTER(shutdown_pre_sync, ald_shutdown, NULL,
	    SHUTDOWN_PRI_FIRST);

	ALD_LOCK();

	for (;;) {
		while ((alq = LIST_FIRST(&ald_active)) == NULL &&
		    !ald_shutingdown)
			mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);

		/* Don't shutdown until all active ALQs are flushed. */
		if (ald_shutingdown && alq == NULL) {
			ALD_UNLOCK();
			break;
		}

		ALQ_LOCK(alq);
		ald_deactivate(alq);
		ALD_UNLOCK();
		needwakeup = alq_doio(alq);
		ALQ_UNLOCK(alq);
		if (needwakeup)
			wakeup_one(alq);
		ALD_LOCK();
	}

	kproc_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
	struct alq *alq;

	ALD_LOCK();

	/* Ensure no new queues can be created. */
	ald_shutingdown = 1;

	/* Shutdown all ALQs prior to terminating the ald_daemon. */
	while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
		LIST_REMOVE(alq, aq_link);
		ALD_UNLOCK();
		alq_shutdown(alq);
		ALD_LOCK();
	}

	/* At this point, all ALQs are flushed and shutdown. */

	/*
	 * Wake ald_daemon so that it exits. It won't be able to do
	 * anything until we mtx_sleep because we hold the ald_mtx.
	 */
	wakeup(&ald_active);

	/* Wait for ald_daemon to exit. */
	mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

	ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
	ALQ_LOCK(alq);

	/* Stop any new writers. */
	alq->aq_flags |= AQ_SHUTDOWN;

	/*
	 * If the ALQ isn't active but has unwritten data (possible if
	 * the ALQ_NOACTIVATE flag has been used), explicitly activate the
	 * ALQ here so that the pending data gets flushed by the ald_daemon.
	 */
	if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
		alq->aq_flags |= AQ_ACTIVE;
		ALQ_UNLOCK(alq);
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
		ALQ_LOCK(alq);
	}

	/* Drain IO */
	while (alq->aq_flags & AQ_ACTIVE) {
		alq->aq_flags |= AQ_WANTED;
		msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
	}
	ALQ_UNLOCK(alq);

	vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
	    curthread);
	crfree(alq->aq_cred);
}

void
alq_destroy(struct alq *alq)
{
	/* Drain all pending IO. */
	alq_shutdown(alq);

	mtx_destroy(&alq->aq_mtx);
	free(alq->aq_entbuf, M_ALD);
	free(alq, M_ALD);
}

/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
	struct thread *td;
	struct mount *mp;
	struct vnode *vp;
	struct uio auio;
	struct iovec aiov[2];
	int totlen;
	int iov;
	int vfslocked;
	int wrapearly;

	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

	vp = alq->aq_vp;
	td = curthread;
	totlen = 0;
	iov = 1;
	wrapearly = alq->aq_wrapearly;

	bzero(&aiov, sizeof(aiov));
	bzero(&auio, sizeof(auio));

	/* Start the write from the location of our buffer tail pointer. */
	aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

	if (alq->aq_writetail < alq->aq_writehead) {
		/* Buffer not wrapped. */
		totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
	} else if (alq->aq_writehead == 0) {
		/* Buffer not wrapped (special case to avoid an empty iov). */
		totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
		    wrapearly;
	} else {
		/*
		 * Buffer wrapped, requires 2 aiov entries:
		 * - first is from writetail to end of buffer
		 * - second is from start of buffer to writehead
		 */
		aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
		    wrapearly;
		iov++;
		aiov[1].iov_base = alq->aq_entbuf;
		aiov[1].iov_len = alq->aq_writehead;
		totlen = aiov[0].iov_len + aiov[1].iov_len;
	}

	alq->aq_flags |= AQ_FLUSHING;
	ALQ_UNLOCK(alq);

	auio.uio_iov = &aiov[0];
	auio.uio_offset = 0;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_WRITE;
	auio.uio_iovcnt = iov;
	auio.uio_resid = totlen;
	auio.uio_td = td;

	/*
	 * Do all of the junk required to write now.
	 */
	vfslocked = VFS_LOCK_GIANT(vp->v_mount);
	vn_start_write(vp, &mp, V_WAIT);
	vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
	/*
	 * XXX: VOP_WRITE error checks are ignored.
	 */
#ifdef MAC
	if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
		VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
	VOP_UNLOCK(vp, 0);
	vn_finished_write(mp);
	VFS_UNLOCK_GIANT(vfslocked);

	ALQ_LOCK(alq);
	alq->aq_flags &= ~AQ_FLUSHING;

	/* Adjust writetail as required, taking into account wrapping. */
	alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
	    alq->aq_buflen;
	alq->aq_freebytes += totlen + wrapearly;

	/*
	 * If we just flushed part of the buffer which wrapped, reset the
	 * wrapearly indicator.
	 */
	if (wrapearly)
		alq->aq_wrapearly = 0;

	/*
	 * If we just flushed the buffer completely, reset indexes to 0 to
	 * minimise buffer wraps.
	 * This is also required to ensure alq_getn() can't wedge itself.
	 */
	if (!HAS_PENDING_DATA(alq))
		alq->aq_writehead = alq->aq_writetail = 0;

	KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
	    ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

	if (alq->aq_flags & AQ_WANTED) {
		alq->aq_flags &= ~AQ_WANTED;
		return (1);
	}

	return(0);
}

static struct kproc_desc ald_kp = {
        "ALQ Daemon",
        ald_daemon,
        &ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);


/* User visible queue functions */

/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */

int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int flags)
{
	struct thread *td;
	struct nameidata nd;
	struct alq *alq;
	int oflags;
	int error;
	int vfslocked;

	KASSERT((size > 0), ("%s: size <= 0", __func__));

	*alqp = NULL;
	td = curthread;

	NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
	oflags = FWRITE | O_NOFOLLOW | O_CREAT;

	error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
	if (error)
		return (error);

	vfslocked = NDHASGIANT(&nd);
	NDFREE(&nd, NDF_ONLY_PNBUF);
	/* We just unlock so we hold a reference */
	VOP_UNLOCK(nd.ni_vp, 0);
	VFS_UNLOCK_GIANT(vfslocked);

	alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
	alq->aq_vp = nd.ni_vp;
	alq->aq_cred = crhold(cred);

	mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

	alq->aq_buflen = size;
	alq->aq_entmax = 0;
	alq->aq_entlen = 0;

	alq->aq_freebytes = alq->aq_buflen;
	alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
	alq->aq_writehead = alq->aq_writetail = 0;
	if (flags & ALQ_ORDERED)
		alq->aq_flags |= AQ_ORDERED;

	if ((error = ald_add(alq)) != 0) {
		alq_destroy(alq);
		return (error);
	}

	*alqp = alq;

	return (0);
}

int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
	int ret;

	KASSERT((count >= 0), ("%s: count < 0", __func__));

	if (count > 0) {
		ret = alq_open_flags(alqp, file, cred, cmode, size*count, 0);
		/* Only apply the legacy settings if the open succeeded. */
		if (ret == 0) {
			(*alqp)->aq_flags |= AQ_LEGACY;
			(*alqp)->aq_entmax = count;
			(*alqp)->aq_entlen = size;
		}
	} else
		ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

	return (ret);
}
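
/*
 * Illustrative usage sketch, not part of the original source: the paths,
 * buffer sizes and cmode below are assumptions.  alq_open_flags() creates a
 * variable-length message queue; alq_open() creates a legacy fixed-length
 * queue.
 */
#if 0
static struct alq *example_vq;	/* variable-length message queue */
static struct alq *example_fq;	/* fixed-length (legacy) message queue */

static int
example_open(void)
{
	int error;

	/* 64KB ring buffer, ordered writes, variable-length messages. */
	error = alq_open_flags(&example_vq, "/tmp/example_v.log",
	    curthread->td_ucred, 0600, 64 * 1024, ALQ_ORDERED);
	if (error != 0)
		return (error);

	/* 1024 fixed-length entries of 128 bytes each. */
	error = alq_open(&example_fq, "/tmp/example_f.log",
	    curthread->td_ucred, 0600, 128, 1024);
	if (error != 0)
		alq_close(example_vq);

	return (error);
}
#endif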


/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait or return an error, depending on whether ALQ_WAITOK or ALQ_NOWAIT
 * was specified in flags.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
	int activate, copy, ret;
	void *waitchan;

	KASSERT((len > 0 && len <= alq->aq_buflen),
	    ("%s: len <= 0 || len > aq_buflen", __func__));

	activate = ret = 0;
	copy = len;
	waitchan = NULL;

	ALQ_LOCK(alq);

	/*
	 * Fail to perform the write and return EWOULDBLOCK if:
	 * - The message is larger than our underlying buffer.
	 * - The ALQ is being shutdown.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the user can't wait for space.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the alq is inactive due to prior
	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
	 */
	if (len > alq->aq_buflen ||
	    alq->aq_flags & AQ_SHUTDOWN ||
	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
	    HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
		ALQ_UNLOCK(alq);
		return (EWOULDBLOCK);
	}

	/*
	 * If we want ordered writes and there is already at least one thread
	 * waiting for resources to become available, sleep until we're woken.
	 */
	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_waiters++;
		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
		alq->aq_waiters--;
	}

	/*
	 * (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either
	 * enter while loop and sleep until we have enough free bytes (former)
	 * or skip (latter). If AQ_ORDERED is set, only 1 thread at a time will
	 * be in this loop. Otherwise, multiple threads may be sleeping here
	 * competing for ALQ resources.
	 */
	while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_flags |= AQ_WANTED;
		alq->aq_waiters++;
		if (waitchan)
			wakeup(waitchan);
		msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
		alq->aq_waiters--;

		/*
		 * If we're the first thread to wake after an AQ_WANTED wakeup
		 * but there isn't enough free space for us, we're going to loop
		 * and sleep again. If there are other threads waiting in this
		 * loop, schedule a wakeup so that they can see if the space
		 * they require is available.
		 */
		if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
		    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
			waitchan = alq;
		else
			waitchan = NULL;
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the above
	 * while loop, so we use a different wait channel in this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	/* Bail if we're shutting down. */
	if (alq->aq_flags & AQ_SHUTDOWN) {
		ret = EWOULDBLOCK;
		goto unlock;
	}

	/*
	 * If we need to wrap the buffer to accommodate the write,
	 * we'll need 2 calls to bcopy.
	 */
	if ((alq->aq_buflen - alq->aq_writehead) < len)
		copy = alq->aq_buflen - alq->aq_writehead;

	/* Copy message (or part thereof if wrap required) to the buffer. */
	bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
	alq->aq_writehead += copy;

	if (alq->aq_writehead >= alq->aq_buflen) {
		KASSERT((alq->aq_writehead == alq->aq_buflen),
		    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
		    __func__,
		    alq->aq_writehead,
		    alq->aq_buflen));
		alq->aq_writehead = 0;
	}

	if (copy != len) {
		/*
		 * Wrap the buffer by copying the remainder of our message
		 * to the start of the buffer and resetting aq_writehead.
		 */
		bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
		alq->aq_writehead = len - copy;
	}

	KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
	    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

	alq->aq_freebytes -= len;

	if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
		alq->aq_flags |= AQ_ACTIVE;
		activate = 1;
	}

	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
	ALQ_UNLOCK(alq);

	if (activate) {
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
	}

	/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
	if (waitchan != NULL)
		wakeup_one(waitchan);

	return (ret);
}

int
alq_write(struct alq *alq, void *data, int flags)
{
	/* Should only be called in fixed length message (legacy) mode. */
	KASSERT((alq->aq_flags & AQ_LEGACY),
	    ("%s: fixed length write on variable length queue", __func__));
	return (alq_writen(alq, data, alq->aq_entlen, flags));
}
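
/*
 * Illustrative usage sketch, not part of the original source: a hypothetical
 * caller copies a variable-length record into the queue.  With ALQ_WAITOK
 * the thread may sleep for buffer space; with ALQ_NOWAIT it gets
 * EWOULDBLOCK instead, and ALQ_NOACTIVATE defers scheduling the flush.
 */
#if 0
static void
example_log(struct alq *alq, void *rec, int len)
{
	/* Blocking variant: sleep until the record fits. */
	alq_writen(alq, rec, len, ALQ_WAITOK);

	/* Non-blocking variant: drop the record if there is no room. */
	if (alq_writen(alq, rec, len, ALQ_NOWAIT | ALQ_NOACTIVATE) ==
	    EWOULDBLOCK)
		printf("example: record dropped\n");
}
#endif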

/*
 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
	int contigbytes;
	void *waitchan;

	KASSERT((len > 0 && len <= alq->aq_buflen),
	    ("%s: len <= 0 || len > alq->aq_buflen", __func__));

	waitchan = NULL;

	ALQ_LOCK(alq);

	/*
	 * Determine the number of free contiguous bytes.
	 * We ensure elsewhere that if aq_writehead == aq_writetail because
	 * the buffer is empty, they will both be set to 0 and therefore
	 * aq_freebytes == aq_buflen and is fully contiguous.
	 * If they are equal and the buffer is not empty, aq_freebytes will
	 * be 0 indicating the buffer is full.
	 */
	if (alq->aq_writehead <= alq->aq_writetail)
		contigbytes = alq->aq_freebytes;
	else {
		contigbytes = alq->aq_buflen - alq->aq_writehead;

		if (contigbytes < len) {
			/*
			 * Insufficient space at end of buffer to handle a
			 * contiguous write. Wrap early if there's space at
			 * the beginning. This will leave a hole at the end
			 * of the buffer which we will have to skip over when
			 * flushing the buffer to disk.
			 */
			if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
				/* Keep track of # bytes left blank. */
				alq->aq_wrapearly = contigbytes;
				/* Do the wrap and adjust counters. */
				contigbytes = alq->aq_freebytes =
				    alq->aq_writetail;
				alq->aq_writehead = 0;
			}
		}
	}

	/*
	 * Return a NULL ALE if:
	 * - The message is larger than our underlying buffer.
	 * - The ALQ is being shutdown.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the user can't wait for space.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the alq is inactive due to prior
	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
	 */
	if (len > alq->aq_buflen ||
	    alq->aq_flags & AQ_SHUTDOWN ||
	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
	    HAS_PENDING_DATA(alq))) && contigbytes < len)) {
		ALQ_UNLOCK(alq);
		return (NULL);
	}

	/*
	 * If we want ordered writes and there is already at least one thread
	 * waiting for resources to become available, sleep until we're woken.
	 */
	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_waiters++;
		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
		alq->aq_waiters--;
	}

	/*
	 * (ALQ_WAITOK && contigbytes < len) or contigbytes >= len, either enter
	 * while loop and sleep until we have enough contiguous free bytes
	 * (former) or skip (latter). If AQ_ORDERED is set, only 1 thread at a
	 * time will be in this loop. Otherwise, multiple threads may be
	 * sleeping here competing for ALQ resources.
	 */
	while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_flags |= AQ_WANTED;
		alq->aq_waiters++;
		if (waitchan)
			wakeup(waitchan);
		msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
		alq->aq_waiters--;

		if (alq->aq_writehead <= alq->aq_writetail)
			contigbytes = alq->aq_freebytes;
		else
			contigbytes = alq->aq_buflen - alq->aq_writehead;

		/*
		 * If we're the first thread to wake after an AQ_WANTED wakeup
		 * but there isn't enough free space for us, we're going to loop
		 * and sleep again. If there are other threads waiting in this
		 * loop, schedule a wakeup so that they can see if the space
		 * they require is available.
		 */
		if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
		    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
			waitchan = alq;
		else
			waitchan = NULL;
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the above
	 * while loop, so we use a different wait channel in this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	/* Bail if we're shutting down. */
	if (alq->aq_flags & AQ_SHUTDOWN) {
		ALQ_UNLOCK(alq);
		if (waitchan != NULL)
			wakeup_one(waitchan);
		return (NULL);
	}

	/*
	 * If we are here, we have a contiguous number of bytes >= len
	 * available in our buffer starting at aq_writehead.
	 */
	alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
	alq->aq_getpost.ae_bytesused = len;

	return (&alq->aq_getpost);
}

struct ale *
alq_get(struct alq *alq, int flags)
{
	/* Should only be called in fixed length message (legacy) mode. */
	KASSERT((alq->aq_flags & AQ_LEGACY),
	    ("%s: fixed length get on variable length queue", __func__));
	return (alq_getn(alq, alq->aq_entlen, flags));
}

void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
	int activate;
	void *waitchan;

	activate = 0;

	if (ale->ae_bytesused > 0) {
		if (!(alq->aq_flags & AQ_ACTIVE) &&
		    !(flags & ALQ_NOACTIVATE)) {
			alq->aq_flags |= AQ_ACTIVE;
			activate = 1;
		}

		alq->aq_writehead += ale->ae_bytesused;
		alq->aq_freebytes -= ale->ae_bytesused;

		/* Wrap aq_writehead if we filled to the end of the buffer. */
		if (alq->aq_writehead == alq->aq_buflen)
			alq->aq_writehead = 0;

		KASSERT((alq->aq_writehead >= 0 &&
		    alq->aq_writehead < alq->aq_buflen),
		    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
		    __func__));

		KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the
	 * alq_getn() while loop, so we use a different wait channel in this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	ALQ_UNLOCK(alq);

	if (activate) {
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
	}

	/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
	if (waitchan != NULL)
		wakeup_one(waitchan);
}
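
/*
 * Illustrative usage sketch, not part of the original source: alq_getn()
 * hands back an ALE whose ae_data points directly into the ring buffer, so
 * the caller can format the message in place and avoid a bcopy.  Note that
 * the queue spin lock is held between alq_getn() and alq_post_flags(), so
 * the in-place work should be kept short.
 */
#if 0
static void
example_log_inplace(struct alq *alq, int len)
{
	struct ale *ale;

	ale = alq_getn(alq, len, ALQ_WAITOK);
	if (ale != NULL) {
		/* Fill the reserved region; use at most len bytes. */
		memset(ale->ae_data, 'A', len);
		ale->ae_bytesused = len;
		alq_post_flags(alq, ale, 0);
	}
}
#endif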

void
alq_flush(struct alq *alq)
{
	int needwakeup = 0;

	ALD_LOCK();
	ALQ_LOCK(alq);

	/*
	 * Pull the lever iff there is data to flush and we're
	 * not already in the middle of a flush operation.
	 */
	if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
		if (alq->aq_flags & AQ_ACTIVE)
			ald_deactivate(alq);

		ALD_UNLOCK();
		needwakeup = alq_doio(alq);
	} else
		ALD_UNLOCK();

	ALQ_UNLOCK(alq);

	if (needwakeup)
		wakeup_one(alq);
}

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
	/* Only flush and destroy alq if not already shutting down. */
	if (ald_rem(alq) == 0)
		alq_destroy(alq);
}
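
/*
 * Illustrative teardown sketch, not part of the original source: alq_flush()
 * pushes out anything currently buffered, and alq_close() drains the queue,
 * closes the backing vnode and frees all resources (alq_close() alone is
 * sufficient).
 */
#if 0
static void
example_teardown(struct alq *alq)
{
	alq_flush(alq);
	alq_close(alq);
}
#endif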

static int
alq_load_handler(module_t mod, int what, void *arg)
{
	int ret;

	ret = 0;

	switch (what) {
	case MOD_LOAD:
	case MOD_SHUTDOWN:
		break;

	case MOD_QUIESCE:
		ALD_LOCK();
		/* Only allow unload if there are no open queues. */
		if (LIST_FIRST(&ald_queues) == NULL) {
			ald_shutingdown = 1;
			ALD_UNLOCK();
			ald_shutdown(NULL, 0);
			mtx_destroy(&ald_mtx);
		} else {
			ALD_UNLOCK();
			ret = EBUSY;
		}
		break;

	case MOD_UNLOAD:
		/* If MOD_QUIESCE failed we must fail here too. */
		if (ald_shutingdown == 0)
			ret = EBUSY;
		break;

	default:
		ret = EINVAL;
		break;
	}

	return (ret);
}

static moduledata_t alq_mod =
{
	"alq",
	alq_load_handler,
	NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);