/*-
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
struct alq {
	char	*aq_entbuf;		/* Buffer for stored entries */
	int	aq_entmax;		/* Max entries */
	int	aq_entlen;		/* Entry length */
	int	aq_freebytes;		/* Bytes available in buffer */
	int	aq_buflen;		/* Total length of our buffer */
	int	aq_writehead;		/* Location for next write */
	int	aq_writetail;		/* Flush starts at this location */
	int	aq_wrapearly;		/* # bytes left blank at end of buf */
	int	aq_flags;		/* Queue flags */
	int	aq_waiters;		/* Num threads waiting for resources
					 * NB: Used as a wait channel so must
					 * not be first field in the alq struct
					 */
	struct	ale	aq_getpost;	/* ALE for use by get/post */
	struct mtx	aq_mtx;		/* Queue lock */
	struct vnode	*aq_vp;		/* Open vnode handle */
	struct ucred	*aq_cred;	/* Credentials of the opening thread */
	LIST_ENTRY(alq)	aq_act;		/* List of active queues */
	LIST_ENTRY(alq)	aq_link;	/* List of all queues */
};

#define	AQ_WANTED	0x0001		/* Wakeup sleeper when io is done */
#define	AQ_ACTIVE	0x0002		/* On the active list */
#define	AQ_FLUSHING	0x0004		/* Doing IO */
#define	AQ_SHUTDOWN	0x0008		/* Queue no longer valid */
#define	AQ_ORDERED	0x0010		/* Queue enforces ordered writes */
#define	AQ_LEGACY	0x0020		/* Legacy queue (fixed length writes) */

#define	ALQ_LOCK(alq)	mtx_lock_spin(&(alq)->aq_mtx)
#define	ALQ_UNLOCK(alq)	mtx_unlock_spin(&(alq)->aq_mtx)

#define	HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
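
/*
 * Worked example (illustrative, not from the original sources): with
 * aq_buflen = 16, aq_writetail = 10 and aq_writehead = 4, the buffer has
 * wrapped; bytes 10-15 and 0-3 are pending, so HAS_PENDING_DATA() is true
 * and aq_freebytes = 6. An empty buffer is always reset to
 * aq_writehead = aq_writetail = 0, which makes aq_freebytes == aq_buflen
 * the unambiguous "empty" test used above.
 */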

static MALLOC_DEFINE(M_ALD, "ALD", "ALD");

/*
 * The ald_mtx protects the ald_queues list and the ald_active list.
 */
static struct mtx ald_mtx;
static LIST_HEAD(, alq) ald_queues;
static LIST_HEAD(, alq) ald_active;
static int ald_shutingdown = 0;
struct thread *ald_thread;
static struct proc *ald_proc;
static eventhandler_tag alq_eventhandler_tag = NULL;

#define	ALD_LOCK()	mtx_lock(&ald_mtx)
#define	ALD_UNLOCK()	mtx_unlock(&ald_mtx)

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);

/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
	int error;

	error = 0;

	ALD_LOCK();
	if (ald_shutingdown) {
		error = EBUSY;
		goto done;
	}
	LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
	ALD_UNLOCK();
	return (error);
}

/*
 * Remove a queue from the global list unless we're shutting down.  If so,
 * the ald will take care of cleaning up its resources.
 */
static int
ald_rem(struct alq *alq)
{
	int error;

	error = 0;

	ALD_LOCK();
	if (ald_shutingdown) {
		error = EBUSY;
		goto done;
	}
	LIST_REMOVE(alq, aq_link);
done:
	ALD_UNLOCK();
	return (error);
}
/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
	LIST_INSERT_HEAD(&ald_active, alq, aq_act);
	wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
	LIST_REMOVE(alq, aq_act);
	alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
	mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
	LIST_INIT(&ald_queues);
	LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
	int needwakeup;
	struct alq *alq;

	ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

	alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync,
	    ald_shutdown, NULL, SHUTDOWN_PRI_FIRST);

	ALD_LOCK();

	for (;;) {
		while ((alq = LIST_FIRST(&ald_active)) == NULL &&
		    !ald_shutingdown)
			mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);

		/* Don't shutdown until all active ALQs are flushed. */
		if (ald_shutingdown && alq == NULL) {
			ALD_UNLOCK();
			break;
		}

		ALQ_LOCK(alq);
		ald_deactivate(alq);
		ALD_UNLOCK();
		needwakeup = alq_doio(alq);
		ALQ_UNLOCK(alq);
		if (needwakeup)
			wakeup_one(alq);
		ALD_LOCK();
	}

	kproc_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
	struct alq *alq;

	ALD_LOCK();

	/* Ensure no new queues can be created. */
	ald_shutingdown = 1;

	/* Shutdown all ALQs prior to terminating the ald_daemon. */
	while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
		LIST_REMOVE(alq, aq_link);
		ALD_UNLOCK();
		alq_shutdown(alq);
		ALD_LOCK();
	}

	/* At this point, all ALQs are flushed and shutdown. */

	/*
	 * Wake ald_daemon so that it exits. It won't be able to do
	 * anything until we mtx_sleep because we hold the ald_mtx.
	 */
	wakeup(&ald_active);

	/* Wait for ald_daemon to exit. */
	mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

	ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
	ALQ_LOCK(alq);

	/* Stop any new writers. */
	alq->aq_flags |= AQ_SHUTDOWN;

	/*
	 * If the ALQ isn't active but has unwritten data (possible if
	 * the ALQ_NOACTIVATE flag has been used), explicitly activate the
	 * ALQ here so that the pending data gets flushed by the ald_daemon.
	 */
	if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
		alq->aq_flags |= AQ_ACTIVE;
		ALQ_UNLOCK(alq);
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
		ALQ_LOCK(alq);
	}

	/* Drain IO */
	while (alq->aq_flags & AQ_ACTIVE) {
		alq->aq_flags |= AQ_WANTED;
		msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
	}
	ALQ_UNLOCK(alq);

	vn_close(alq->aq_vp, FWRITE, alq->aq_cred, curthread);
	crfree(alq->aq_cred);
}

void
alq_destroy(struct alq *alq)
{
	/* Drain all pending IO. */
	alq_shutdown(alq);

	mtx_destroy(&alq->aq_mtx);
	free(alq->aq_entbuf, M_ALD);
	free(alq, M_ALD);
}
/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
	struct thread *td;
	struct mount *mp;
	struct vnode *vp;
	struct uio auio;
	struct iovec aiov[2];
	int totlen;
	int iov;
	int vfslocked;
	int wrapearly;

	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

	vp = alq->aq_vp;
	td = curthread;
	totlen = 0;
	iov = 1;
	wrapearly = alq->aq_wrapearly;

	bzero(&aiov, sizeof(aiov));
	bzero(&auio, sizeof(auio));

	/* Start the write from the location of our buffer tail pointer. */
	aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

	if (alq->aq_writetail < alq->aq_writehead) {
		/* Buffer not wrapped. */
		totlen = aiov[0].iov_len = alq->aq_writehead -
		    alq->aq_writetail;
	} else if (alq->aq_writehead == 0) {
		/* Buffer not wrapped (special case to avoid an empty iov). */
		totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
		    wrapearly;
	} else {
		/*
		 * Buffer wrapped, requires 2 aiov entries:
		 * - first is from writetail to end of buffer
		 * - second is from start of buffer to writehead
		 */
		aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
		    wrapearly;
		iov++;
		aiov[1].iov_base = alq->aq_entbuf;
		aiov[1].iov_len = alq->aq_writehead;
		totlen = aiov[0].iov_len + aiov[1].iov_len;
	}

	alq->aq_flags |= AQ_FLUSHING;
	ALQ_UNLOCK(alq);

	auio.uio_iov = &aiov[0];
	auio.uio_offset = 0;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_WRITE;
	auio.uio_iovcnt = iov;
	auio.uio_resid = totlen;
	auio.uio_td = td;

	/*
	 * Do all of the junk required to write now.
	 */
	vfslocked = VFS_LOCK_GIANT(vp->v_mount);
	vn_start_write(vp, &mp, V_WAIT);
	vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
	/*
	 * XXX: VOP_WRITE error checks are ignored.
	 */
#ifdef MAC
	if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
		VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
	VOP_UNLOCK(vp, 0);
	vn_finished_write(mp);
	VFS_UNLOCK_GIANT(vfslocked);

	ALQ_LOCK(alq);
	alq->aq_flags &= ~AQ_FLUSHING;

	/* Adjust writetail as required, taking into account wrapping. */
	alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
	    alq->aq_buflen;
	alq->aq_freebytes += totlen + wrapearly;

	/*
	 * If we just flushed part of the buffer which wrapped, reset the
	 * wrapearly indicator.
	 */
	if (wrapearly)
		alq->aq_wrapearly = 0;

	/*
	 * If we just flushed the buffer completely, reset indexes to 0 to
	 * minimise buffer wraps.
	 * This is also required to ensure alq_getn() can't wedge itself.
	 */
	if (!HAS_PENDING_DATA(alq))
		alq->aq_writehead = alq->aq_writetail = 0;

	KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
	    ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

	if (alq->aq_flags & AQ_WANTED) {
		alq->aq_flags &= ~AQ_WANTED;
		return (1);
	}

	return (0);
}
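
/*
 * Worked example (illustrative, not from the original sources): with
 * aq_buflen = 100, aq_writetail = 80, aq_writehead = 20 and
 * aq_wrapearly = 5, the buffer has wrapped, so alq_doio() builds two
 * iovecs: 15 bytes (offsets 80-94, skipping the 5 blank wrap-early bytes
 * at the end of the buffer) and 20 bytes (offsets 0-19). It then advances
 * aq_writetail by totlen + wrapearly = 40 modulo aq_buflen, which empties
 * the queue in this example, so both indexes are reset to 0.
 */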

static struct kproc_desc ald_kp = {
	"ALQ Daemon",
	ald_daemon,
	&ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);

/* User visible queue functions */

/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */
int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred,
    int cmode, int size, int flags)
{
	struct thread *td;
	struct nameidata nd;
	struct alq *alq;
	int oflags;
	int error;
	int vfslocked;

	KASSERT((size > 0), ("%s: size <= 0", __func__));

	*alqp = NULL;
	td = curthread;

	NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
	oflags = FWRITE | O_NOFOLLOW | O_CREAT;

	error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
	if (error)
		return (error);

	vfslocked = NDHASGIANT(&nd);
	NDFREE(&nd, NDF_ONLY_PNBUF);
	/* We just unlock so we hold a reference. */
	VOP_UNLOCK(nd.ni_vp, 0);
	VFS_UNLOCK_GIANT(vfslocked);

	alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
	alq->aq_vp = nd.ni_vp;
	alq->aq_cred = crhold(cred);

	mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

	alq->aq_buflen = size;
	alq->aq_entmax = 0;
	alq->aq_entlen = 0;

	alq->aq_freebytes = alq->aq_buflen;
	alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
	alq->aq_writehead = alq->aq_writetail = 0;
	if (flags & ALQ_ORDERED)
		alq->aq_flags |= AQ_ORDERED;

	if ((error = ald_add(alq)) != 0) {
		alq_destroy(alq);
		return (error);
	}

	*alqp = alq;

	return (0);
}

int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
	int ret;

	KASSERT((count >= 0), ("%s: count < 0", __func__));

	if (count > 0) {
		if ((ret = alq_open_flags(alqp, file, cred, cmode,
		    size*count, 0)) == 0) {
			(*alqp)->aq_flags |= AQ_LEGACY;
			(*alqp)->aq_entmax = count;
			(*alqp)->aq_entlen = size;
		}
	} else
		ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

	return (ret);
}
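
#if 0
/*
 * Illustrative sketch only (not compiled): one way a hypothetical kernel
 * consumer might create a variable length ALQ. The file name, buffer size
 * and the "example_alq" pointer are assumptions for illustration, not part
 * of this file.
 */
static struct alq *example_alq;

static int
example_alq_open(void)
{

	/* 64KB buffer, ordered writes, file created with mode 0600. */
	return (alq_open_flags(&example_alq, "/var/log/example.alq",
	    curthread->td_ucred, 0600, 64 * 1024, ALQ_ORDERED));
}
#endif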

/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait or return an error, depending on whether ALQ_WAITOK or ALQ_NOWAIT
 * was set in flags.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
	int activate, copy, ret;
	void *waitchan;

	KASSERT((len > 0 && len <= alq->aq_buflen),
	    ("%s: len <= 0 || len > aq_buflen", __func__));

	activate = ret = 0;
	copy = len;
	waitchan = NULL;

	ALQ_LOCK(alq);

	/*
	 * Fail to perform the write and return EWOULDBLOCK if:
	 * - The message is larger than our underlying buffer.
	 * - The ALQ is being shutdown.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the user can't wait for space.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the alq is inactive due to prior
	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
	 */
	if (len > alq->aq_buflen ||
	    alq->aq_flags & AQ_SHUTDOWN ||
	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
	    HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
		ALQ_UNLOCK(alq);
		return (EWOULDBLOCK);
	}

	/*
	 * If we want ordered writes and there is already at least one thread
	 * waiting for resources to become available, sleep until we're woken.
	 */
	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_waiters++;
		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
		alq->aq_waiters--;
	}

	/*
	 * (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either
	 * enter the while loop and sleep until we have enough free bytes
	 * (former) or skip (latter). If AQ_ORDERED is set, only 1 thread at a
	 * time will be in this loop. Otherwise, multiple threads may be
	 * sleeping here competing for ALQ resources.
	 */
	while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_flags |= AQ_WANTED;
		alq->aq_waiters++;
		if (waitchan)
			wakeup(waitchan);
		msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
		alq->aq_waiters--;

		/*
		 * If we're the first thread to wake after an AQ_WANTED wakeup
		 * but there isn't enough free space for us, we're going to
		 * loop and sleep again. If there are other threads waiting in
		 * this loop, schedule a wakeup so that they can see if the
		 * space they require is available.
		 */
		if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
		    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
			waitchan = alq;
		else
			waitchan = NULL;
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after
	 * we complete our work. The alq ptr is used as a wait channel for
	 * threads requiring resources to be freed up. In the AQ_ORDERED case,
	 * threads are not allowed to concurrently compete for resources in
	 * the above while loop, so we use a different wait channel in this
	 * case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	/* Bail if we're shutting down. */
	if (alq->aq_flags & AQ_SHUTDOWN) {
		ret = EWOULDBLOCK;
		goto unlock;
	}

	/*
	 * If we need to wrap the buffer to accommodate the write,
	 * we'll need 2 calls to bcopy.
	 */
	if ((alq->aq_buflen - alq->aq_writehead) < len)
		copy = alq->aq_buflen - alq->aq_writehead;

	/* Copy message (or part thereof if wrap required) to the buffer. */
	bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
	alq->aq_writehead += copy;

	if (alq->aq_writehead >= alq->aq_buflen) {
		KASSERT((alq->aq_writehead == alq->aq_buflen),
		    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
		    __func__, alq->aq_writehead, alq->aq_buflen));
		alq->aq_writehead = 0;
	}

	if (copy != len) {
		/*
		 * Wrap the buffer by copying the remainder of our message
		 * to the start of the buffer and resetting aq_writehead.
		 */
		bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
		alq->aq_writehead = len - copy;
	}

	KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
	    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

	alq->aq_freebytes -= len;

	if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
		alq->aq_flags |= AQ_ACTIVE;
		activate = 1;
	}

	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
	ALQ_UNLOCK(alq);

	if (activate) {
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
	}

	/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
	if (waitchan != NULL)
		wakeup_one(waitchan);

	return (ret);
}

int
alq_write(struct alq *alq, void *data, int flags)
{
	/* Should only be called in fixed length message (legacy) mode. */
	KASSERT((alq->aq_flags & AQ_LEGACY),
	    ("%s: fixed length write on variable length queue", __func__));
	return (alq_writen(alq, data, alq->aq_entlen, flags));
}
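
#if 0
/*
 * Illustrative sketch only (not compiled): logging a variable length
 * message to the hypothetical queue opened above. ALQ_NOWAIT means a full
 * buffer drops the message rather than sleeping, the usual choice on
 * performance sensitive paths; pass ALQ_WAITOK instead to sleep for space.
 */
static unsigned long example_alq_drops;	/* hypothetical drop counter */

static void
example_alq_log(char *msg, int len)
{

	if (alq_writen(example_alq, msg, len, ALQ_NOWAIT) == EWOULDBLOCK)
		example_alq_drops++;	/* queue full or shutting down */
}
#endif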

/*
 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
	int contigbytes;
	void *waitchan;

	KASSERT((len > 0 && len <= alq->aq_buflen),
	    ("%s: len <= 0 || len > alq->aq_buflen", __func__));

	waitchan = NULL;

	ALQ_LOCK(alq);

	/*
	 * Determine the number of free contiguous bytes.
	 * We ensure elsewhere that if aq_writehead == aq_writetail because
	 * the buffer is empty, they will both be set to 0 and therefore
	 * aq_freebytes == aq_buflen and is fully contiguous.
	 * If they are equal and the buffer is not empty, aq_freebytes will
	 * be 0 indicating the buffer is full.
	 */
	if (alq->aq_writehead <= alq->aq_writetail)
		contigbytes = alq->aq_freebytes;
	else {
		contigbytes = alq->aq_buflen - alq->aq_writehead;

		if (contigbytes < len) {
			/*
			 * Insufficient space at end of buffer to handle a
			 * contiguous write. Wrap early if there's space at
			 * the beginning. This will leave a hole at the end
			 * of the buffer which we will have to skip over when
			 * flushing the buffer to disk.
			 */
			if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
				/* Keep track of # bytes left blank. */
				alq->aq_wrapearly = contigbytes;
				/* Do the wrap and adjust counters. */
				contigbytes = alq->aq_freebytes =
				    alq->aq_writetail;
				alq->aq_writehead = 0;
			}
		}
	}

	/*
	 * Return a NULL ALE if:
	 * - The message is larger than our underlying buffer.
	 * - The ALQ is being shutdown.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the user can't wait for space.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the alq is inactive due to prior
	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
	 */
	if (len > alq->aq_buflen ||
	    alq->aq_flags & AQ_SHUTDOWN ||
	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
	    HAS_PENDING_DATA(alq))) && contigbytes < len)) {
		ALQ_UNLOCK(alq);
		return (NULL);
	}

	/*
	 * If we want ordered writes and there is already at least one thread
	 * waiting for resources to become available, sleep until we're woken.
	 */
	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_waiters++;
		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
		alq->aq_waiters--;
	}

	/*
	 * (ALQ_WAITOK && contigbytes < len) or contigbytes >= len, either
	 * enter the while loop and sleep until we have enough contiguous free
	 * bytes (former) or skip (latter). If AQ_ORDERED is set, only 1
	 * thread at a time will be in this loop. Otherwise, multiple threads
	 * may be sleeping here competing for ALQ resources.
	 */
	while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_flags |= AQ_WANTED;
		alq->aq_waiters++;
		if (waitchan)
			wakeup(waitchan);
		msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
		alq->aq_waiters--;

		if (alq->aq_writehead <= alq->aq_writetail)
			contigbytes = alq->aq_freebytes;
		else
			contigbytes = alq->aq_buflen - alq->aq_writehead;

		/*
		 * If we're the first thread to wake after an AQ_WANTED wakeup
		 * but there isn't enough free space for us, we're going to
		 * loop and sleep again. If there are other threads waiting in
		 * this loop, schedule a wakeup so that they can see if the
		 * space they require is available.
		 */
		if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
		    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
			waitchan = alq;
		else
			waitchan = NULL;
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after
	 * we complete our work. The alq ptr is used as a wait channel for
	 * threads requiring resources to be freed up. In the AQ_ORDERED case,
	 * threads are not allowed to concurrently compete for resources in
	 * the above while loop, so we use a different wait channel in this
	 * case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	/* Bail if we're shutting down. */
	if (alq->aq_flags & AQ_SHUTDOWN) {
		ALQ_UNLOCK(alq);
		if (waitchan != NULL)
			wakeup_one(waitchan);
		return (NULL);
	}

	/*
	 * If we are here, we have a contiguous number of bytes >= len
	 * available in our buffer starting at aq_writehead.
	 */
	alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
	alq->aq_getpost.ae_bytesused = len;

	return (&alq->aq_getpost);
}
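
/*
 * Worked example (illustrative, not from the original sources): with
 * aq_buflen = 100, aq_writetail = 50 and aq_writehead = 90, a request for
 * len = 20 finds only 10 contiguous bytes at the end of the buffer. Since
 * aq_writetail >= len, alq_getn() wraps early: aq_wrapearly becomes 10
 * (bytes 90-99 are left blank and skipped when the buffer is flushed),
 * aq_writehead moves to 0, and 50 contiguous free bytes become available
 * at the start of the buffer. Note that a successful return from
 * alq_getn() leaves the queue locked; alq_post_flags() below unlocks it.
 */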

struct ale *
alq_get(struct alq *alq, int flags)
{
	/* Should only be called in fixed length message (legacy) mode. */
	KASSERT((alq->aq_flags & AQ_LEGACY),
	    ("%s: fixed length get on variable length queue", __func__));
	return (alq_getn(alq, alq->aq_entlen, flags));
}

void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
	int activate;
	void *waitchan;

	activate = 0;

	if (ale->ae_bytesused > 0) {
		if (!(alq->aq_flags & AQ_ACTIVE) &&
		    !(flags & ALQ_NOACTIVATE)) {
			alq->aq_flags |= AQ_ACTIVE;
			activate = 1;
		}

		alq->aq_writehead += ale->ae_bytesused;
		alq->aq_freebytes -= ale->ae_bytesused;

		/* Wrap aq_writehead if we filled to the end of the buffer. */
		if (alq->aq_writehead == alq->aq_buflen)
			alq->aq_writehead = 0;

		KASSERT((alq->aq_writehead >= 0 &&
		    alq->aq_writehead < alq->aq_buflen),
		    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
		    __func__));

		KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!",
		    __func__));
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after
	 * we complete our work. The alq ptr is used as a wait channel for
	 * threads requiring resources to be freed up. In the AQ_ORDERED case,
	 * threads are not allowed to concurrently compete for resources in
	 * the alq_getn() while loop, so we use a different wait channel in
	 * this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	ALQ_UNLOCK(alq);

	if (activate) {
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
	}

	/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
	if (waitchan != NULL)
		wakeup_one(waitchan);
}
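
#if 0
/*
 * Illustrative sketch only (not compiled): the zero-copy get/post cycle
 * that avoids alq_writen()'s bcopy. "example_alq" is the hypothetical
 * queue from the open sketch above. alq_getn() returns with the queue lock
 * held and alq_post() (a wrapper for alq_post_flags() with flags == 0)
 * releases it, so the two calls must be paired and the fill-in kept short.
 */
static void
example_alq_getpost(int len)
{
	struct ale *ale;

	if ((ale = alq_getn(example_alq, len, ALQ_WAITOK)) != NULL) {
		/* Build the record directly in the queue's buffer. */
		memset(ale->ae_data, 'A', len);
		alq_post(example_alq, ale);
	}
}
#endif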

void
alq_flush(struct alq *alq)
{
	int needwakeup = 0;

	ALD_LOCK();
	ALQ_LOCK(alq);

	/*
	 * Pull the lever iff there is data to flush and we're
	 * not already in the middle of a flush operation.
	 */
	if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
		if (alq->aq_flags & AQ_ACTIVE)
			ald_deactivate(alq);

		ALD_UNLOCK();
		needwakeup = alq_doio(alq);
	} else
		ALD_UNLOCK();

	ALQ_UNLOCK(alq);

	if (needwakeup)
		wakeup_one(alq);
}

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
	/* Only flush and destroy the alq if it isn't already shutting down. */
	if (ald_rem(alq) == 0)
		alq_destroy(alq);
}

static int
alq_load_handler(module_t mod, int what, void *arg)
{
	int ret;

	ret = 0;

	switch (what) {
	case MOD_LOAD:
	case MOD_SHUTDOWN:
		break;

	case MOD_QUIESCE:
		ALD_LOCK();
		/* Only allow unload if there are no open queues. */
		if (LIST_FIRST(&ald_queues) == NULL) {
			ald_shutingdown = 1;
			ALD_UNLOCK();
			EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
			    alq_eventhandler_tag);
			ald_shutdown(NULL, 0);
			mtx_destroy(&ald_mtx);
		} else {
			ALD_UNLOCK();
			ret = EBUSY;
		}
		break;

	case MOD_UNLOAD:
		/* If MOD_QUIESCE failed we must fail here too. */
		if (ald_shutingdown == 0)
			ret = EBUSY;
		break;

	default:
		ret = EINVAL;
		break;
	}

	return (ret);
}

static moduledata_t alq_mod =
{
	"alq",
	alq_load_handler,
	NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);
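
#if 0
/*
 * Illustrative sketch only (not compiled): orderly teardown of the
 * hypothetical queue from the sketches above. alq_close() flushes any
 * remaining data itself; the explicit alq_flush() just shows how to force
 * buffered records out to the backing file while keeping the queue open.
 */
static void
example_alq_teardown(void)
{

	if (example_alq != NULL) {
		alq_flush(example_alq);
		alq_close(example_alq);
		example_alq = NULL;
	}
}
#endif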