1/* $NetBSD$ */ 2 3/* 4 * Copyright (c) 2004 by Internet Systems Consortium, Inc. ("ISC") 5 * Copyright (c) 1996-1999 by Internet Software Consortium 6 * 7 * Permission to use, copy, modify, and distribute this software for any 8 * purpose with or without fee is hereby granted, provided that the above 9 * copyright notice and this permission notice appear in all copies. 10 * 11 * THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES 12 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 13 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR 14 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 15 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 16 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 17 * OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 18 */ 19 20/* ev_streams.c - implement asynch stream file IO for the eventlib 21 * vix 04mar96 [initial] 22 */ 23 24#if !defined(LINT) && !defined(CODECENTER) 25static const char rcsid[] = "Id: ev_streams.c,v 1.5 2005/04/27 04:56:36 sra Exp "; 26#endif 27 28#include "port_before.h" 29#include "fd_setsize.h" 30 31#include <sys/types.h> 32#include <sys/uio.h> 33 34#include <errno.h> 35 36#include <isc/eventlib.h> 37#include <isc/assertions.h> 38#include "eventlib_p.h" 39 40#include "port_after.h" 41 42static int copyvec(evStream *str, const struct iovec *iov, int iocnt); 43static void consume(evStream *str, size_t bytes); 44static void done(evContext opaqueCtx, evStream *str); 45static void writable(evContext opaqueCtx, void *uap, int fd, int evmask); 46static void readable(evContext opaqueCtx, void *uap, int fd, int evmask); 47 48struct iovec 49evConsIovec(void *buf, size_t cnt) { 50 struct iovec ret; 51 52 memset(&ret, 0xf5, sizeof ret); 53 ret.iov_base = buf; 54 ret.iov_len = cnt; 55 return (ret); 56} 57 58int 59evWrite(evContext opaqueCtx, int fd, const struct iovec *iov, int iocnt, 60 evStreamFunc func, void *uap, evStreamID *id) 61{ 62 evContext_p *ctx = opaqueCtx.opaque; 63 evStream *new; 64 int save; 65 66 OKNEW(new); 67 new->func = func; 68 new->uap = uap; 69 new->fd = fd; 70 new->flags = 0; 71 if (evSelectFD(opaqueCtx, fd, EV_WRITE, writable, new, &new->file) < 0) 72 goto free; 73 if (copyvec(new, iov, iocnt) < 0) 74 goto free; 75 new->prevDone = NULL; 76 new->nextDone = NULL; 77 if (ctx->streams != NULL) 78 ctx->streams->prev = new; 79 new->prev = NULL; 80 new->next = ctx->streams; 81 ctx->streams = new; 82 if (id != NULL) 83 id->opaque = new; 84 return (0); 85 free: 86 save = errno; 87 FREE(new); 88 errno = save; 89 return (-1); 90} 91 92int 93evRead(evContext opaqueCtx, int fd, const struct iovec *iov, int iocnt, 94 evStreamFunc func, void *uap, evStreamID *id) 95{ 96 evContext_p *ctx = opaqueCtx.opaque; 97 evStream *new; 98 int save; 99 100 OKNEW(new); 101 new->func = func; 102 new->uap = uap; 103 new->fd = fd; 104 new->flags = 0; 105 if (evSelectFD(opaqueCtx, fd, EV_READ, readable, new, &new->file) < 0) 106 goto free; 107 if (copyvec(new, iov, iocnt) < 0) 108 goto free; 109 new->prevDone = NULL; 110 new->nextDone = NULL; 111 if (ctx->streams != NULL) 112 ctx->streams->prev = new; 113 new->prev = NULL; 114 new->next = ctx->streams; 115 ctx->streams = new; 116 if (id) 117 id->opaque = new; 118 return (0); 119 free: 120 save = errno; 121 FREE(new); 122 errno = save; 123 return (-1); 124} 125 126int 127evTimeRW(evContext opaqueCtx, evStreamID id, evTimerID timer) /*ARGSUSED*/ { 128 evStream *str = id.opaque; 129 130 UNUSED(opaqueCtx); 131 132 str->timer = timer; 133 str->flags |= EV_STR_TIMEROK; 134 return (0); 135} 136 137int 138evUntimeRW(evContext opaqueCtx, evStreamID id) /*ARGSUSED*/ { 139 evStream *str = id.opaque; 140 141 UNUSED(opaqueCtx); 142 143 str->flags &= ~EV_STR_TIMEROK; 144 return (0); 145} 146 147int 148evCancelRW(evContext opaqueCtx, evStreamID id) { 149 evContext_p *ctx = opaqueCtx.opaque; 150 evStream *old = id.opaque; 151 152 /* 153 * The streams list is doubly threaded. First, there's ctx->streams 154 * that's used by evDestroy() to find and cancel all streams. Second, 155 * there's ctx->strDone (head) and ctx->strLast (tail) which thread 156 * through the potentially smaller number of "IO completed" streams, 157 * used in evGetNext() to avoid scanning the entire list. 158 */ 159 160 /* Unlink from ctx->streams. */ 161 if (old->prev != NULL) 162 old->prev->next = old->next; 163 else 164 ctx->streams = old->next; 165 if (old->next != NULL) 166 old->next->prev = old->prev; 167 168 /* 169 * If 'old' is on the ctx->strDone list, remove it. Update 170 * ctx->strLast if necessary. 171 */ 172 if (old->prevDone == NULL && old->nextDone == NULL) { 173 /* 174 * Either 'old' is the only item on the done list, or it's 175 * not on the done list. If the former, then we unlink it 176 * from the list. If the latter, we leave the list alone. 177 */ 178 if (ctx->strDone == old) { 179 ctx->strDone = NULL; 180 ctx->strLast = NULL; 181 } 182 } else { 183 if (old->prevDone != NULL) 184 old->prevDone->nextDone = old->nextDone; 185 else 186 ctx->strDone = old->nextDone; 187 if (old->nextDone != NULL) 188 old->nextDone->prevDone = old->prevDone; 189 else 190 ctx->strLast = old->prevDone; 191 } 192 193 /* Deallocate the stream. */ 194 if (old->file.opaque) 195 evDeselectFD(opaqueCtx, old->file); 196 memput(old->iovOrig, sizeof (struct iovec) * old->iovOrigCount); 197 FREE(old); 198 return (0); 199} 200 201/* Copy a scatter/gather vector and initialize a stream handler's IO. */ 202static int 203copyvec(evStream *str, const struct iovec *iov, int iocnt) { 204 int i; 205 206 str->iovOrig = (struct iovec *)memget(sizeof(struct iovec) * iocnt); 207 if (str->iovOrig == NULL) { 208 errno = ENOMEM; 209 return (-1); 210 } 211 str->ioTotal = 0; 212 for (i = 0; i < iocnt; i++) { 213 str->iovOrig[i] = iov[i]; 214 str->ioTotal += iov[i].iov_len; 215 } 216 str->iovOrigCount = iocnt; 217 str->iovCur = str->iovOrig; 218 str->iovCurCount = str->iovOrigCount; 219 str->ioDone = 0; 220 return (0); 221} 222 223/* Pull off or truncate lead iovec(s). */ 224static void 225consume(evStream *str, size_t bytes) { 226 while (bytes > 0U) { 227 if (bytes < (size_t)str->iovCur->iov_len) { 228 str->iovCur->iov_len -= bytes; 229 str->iovCur->iov_base = (void *) 230 ((u_char *)str->iovCur->iov_base + bytes); 231 str->ioDone += bytes; 232 bytes = 0; 233 } else { 234 bytes -= str->iovCur->iov_len; 235 str->ioDone += str->iovCur->iov_len; 236 str->iovCur++; 237 str->iovCurCount--; 238 } 239 } 240} 241 242/* Add a stream to Done list and deselect the FD. */ 243static void 244done(evContext opaqueCtx, evStream *str) { 245 evContext_p *ctx = opaqueCtx.opaque; 246 247 if (ctx->strLast != NULL) { 248 str->prevDone = ctx->strLast; 249 ctx->strLast->nextDone = str; 250 ctx->strLast = str; 251 } else { 252 INSIST(ctx->strDone == NULL); 253 ctx->strDone = ctx->strLast = str; 254 } 255 evDeselectFD(opaqueCtx, str->file); 256 str->file.opaque = NULL; 257 /* evDrop() will call evCancelRW() on us. */ 258} 259 260/* Dribble out some bytes on the stream. (Called by evDispatch().) */ 261static void 262writable(evContext opaqueCtx, void *uap, int fd, int evmask) { 263 evStream *str = uap; 264 int bytes; 265 266 UNUSED(evmask); 267 268 bytes = writev(fd, str->iovCur, str->iovCurCount); 269 if (bytes > 0) { 270 if ((str->flags & EV_STR_TIMEROK) != 0) 271 evTouchIdleTimer(opaqueCtx, str->timer); 272 consume(str, bytes); 273 } else { 274 if (bytes < 0 && errno != EINTR) { 275 str->ioDone = -1; 276 str->ioErrno = errno; 277 } 278 } 279 if (str->ioDone == -1 || str->ioDone == str->ioTotal) 280 done(opaqueCtx, str); 281} 282 283/* Scoop up some bytes from the stream. (Called by evDispatch().) */ 284static void 285readable(evContext opaqueCtx, void *uap, int fd, int evmask) { 286 evStream *str = uap; 287 int bytes; 288 289 UNUSED(evmask); 290 291 bytes = readv(fd, str->iovCur, str->iovCurCount); 292 if (bytes > 0) { 293 if ((str->flags & EV_STR_TIMEROK) != 0) 294 evTouchIdleTimer(opaqueCtx, str->timer); 295 consume(str, bytes); 296 } else { 297 if (bytes == 0) 298 str->ioDone = 0; 299 else { 300 if (errno != EINTR) { 301 str->ioDone = -1; 302 str->ioErrno = errno; 303 } 304 } 305 } 306 if (str->ioDone <= 0 || str->ioDone == str->ioTotal) 307 done(opaqueCtx, str); 308} 309 310/*! \file */ 311