fifolog_write_poll.c revision 176998
1/*- 2 * Copyright (c) 2005-2008 Poul-Henning Kamp 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD: head/usr.sbin/fifolog/lib/fifolog_write_poll.c 176998 2008-03-09 19:14:36Z phk $ 27 */ 28 29#include <assert.h> 30#include <stdio.h> 31#include <string.h> 32#include <stdlib.h> 33#include <unistd.h> 34#include <time.h> 35#include <sys/endian.h> 36 37#include <zlib.h> 38 39#include "fifolog.h" 40#include "libfifolog.h" 41#include "libfifolog_int.h" 42#include "fifolog_write.h" 43#include "miniobj.h" 44 45#define ALLOC(ptr, size) do { \ 46 (*(ptr)) = calloc(size, 1); \ 47 assert(*(ptr) != NULL); \ 48} while (0) 49 50 51const char *fifolog_write_statnames[] = { 52[FIFOLOG_PT_BYTES_PRE] = "Bytes before compression", 53[FIFOLOG_PT_BYTES_POST] = "Bytes after compression", 54[FIFOLOG_PT_WRITES] = "Writes", 55[FIFOLOG_PT_FLUSH] = "Flushes", 56[FIFOLOG_PT_SYNC] = "Syncs", 57[FIFOLOG_PT_RUNTIME] = "Runtime" 58}; 59 60/* 61 * Check that everything is all right 62 */ 63static void 64fifolog_write_assert(const struct fifolog_writer *f) 65{ 66 67 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 68 assert(f->iptr == f->ff->zs->next_in + f->ff->zs->avail_in); 69 assert(f->ff->zs->next_out + f->ff->zs->avail_out == \ 70 f->ff->recbuf + f->ff->recsize); 71} 72 73struct fifolog_writer * 74fifolog_write_new(void) 75{ 76 struct fifolog_writer *f; 77 78 ALLOC(&f, sizeof *f); 79 f->magic = FIFOLOG_WRITER_MAGIC; 80 return (f); 81} 82 83void 84fifolog_write_destroy(struct fifolog_writer *f) 85{ 86 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 87 free(f); 88} 89 90void 91fifolog_write_close(struct fifolog_writer *f) 92{ 93 94 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 95 fifolog_int_close(&f->ff); 96 if (f->ibuf != NULL) 97 free(f->ibuf); 98 free(f); 99} 100 101static void 102fifo_prepobuf(struct fifolog_writer *f, time_t now, int flag) 103{ 104 105 memset(f->ff->recbuf, 0, f->ff->recsize); 106 f->ff->zs->next_out = f->ff->recbuf + 5; 107 f->ff->zs->avail_out = f->ff->recsize - 5; 108 if (f->recno == 0 && f->seq == 0) { 109 srandomdev(); 110 do { 111 f->seq = random(); 112 } while (f->seq == 0); 113 } 114 be32enc(f->ff->recbuf, f->seq++); 115 f->ff->recbuf[4] = f->flag; 116 f->flag = 0; 117 if (flag) { 118 f->ff->recbuf[4] |= FIFOLOG_FLG_SYNC; 119 be32enc(f->ff->recbuf + 5, (u_int)now); 120 f->ff->zs->next_out += 4; 121 f->ff->zs->avail_out -= 4; 122 } 123 fifolog_write_assert(f); 124} 125 126const char * 127fifolog_write_open(struct fifolog_writer *f, const char *fn, unsigned writerate, unsigned syncrate, int compression) 128{ 129 const char *es; 130 int i; 131 time_t now; 132 off_t o; 133 134 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 135 136 /* Check for legal compression value */ 137 if (compression < Z_DEFAULT_COMPRESSION || 138 compression > Z_BEST_COMPRESSION) 139 return ("Illegal compression value"); 140 141 f->writerate = writerate; 142 f->syncrate = syncrate; 143 f->compression = compression; 144 145 /* Reset statistics */ 146 memset(f->cnt, 0, sizeof f->cnt); 147 148 es = fifolog_int_open(&f->ff, fn, 1); 149 if (es != NULL) 150 return (es); 151 es = fifolog_int_findend(f->ff, &o); 152 if (es != NULL) 153 return (es); 154 if (o == 0) { 155 f->seq = 0; 156 f->recno = 0; 157 } else { 158 i = fifolog_int_read(f->ff, o); 159 if (i) 160 return ("Read error, looking for seq"); 161 f->seq = be32dec(f->ff->recbuf) + 1; 162 f->recno = o + 1; 163 } 164 165 f->ibufsize = 32768; 166 ALLOC(&f->ibuf, f->ibufsize); 167 f->iptr = f->ibuf; 168 f->ff->zs->next_in = f->iptr; 169 i = deflateInit(f->ff->zs, (int)f->compression); 170 assert(i == Z_OK); 171 172 f->flag |= FIFOLOG_FLG_RESTART; 173 174 time(&now); 175 fifo_prepobuf(f, now, 1); 176 f->starttime = now; 177 178 fifolog_write_assert(f); 179 return (NULL); 180} 181 182static void 183fifo_writerec(struct fifolog_writer *f) 184{ 185 int i; 186 time_t t; 187 188 fifolog_write_assert(f); 189 f->writes_since_sync++; 190 191 assert(f->recno < f->ff->logsize); 192 f->cnt[FIFOLOG_PT_BYTES_POST] += f->ff->recsize - f->ff->zs->avail_out; 193 if (f->ff->zs->avail_out == 0) { 194 /* nothing */ 195 } else if (f->ff->zs->avail_out <= 255) { 196 f->ff->recbuf[f->ff->recsize - 1] = 197 (u_char)f->ff->zs->avail_out; 198 f->ff->recbuf[4] |= FIFOLOG_FLG_1BYTE; 199 } else { 200 be32enc(f->ff->recbuf + f->ff->recsize - 4, 201 f->ff->zs->avail_out); 202 f->ff->recbuf[4] |= FIFOLOG_FLG_4BYTE; 203 } 204 i = pwrite(f->ff->fd, f->ff->recbuf, f->ff->recsize, 205 (f->recno + 1) * f->ff->recsize); 206 assert (i == (int)f->ff->recsize); 207 if (++f->recno == f->ff->logsize) 208 f->recno = 0; 209 f->cnt[FIFOLOG_PT_WRITES]++; 210 time(&t); 211 f->cnt[FIFOLOG_PT_RUNTIME] = t - f->starttime; /*lint !e776 */ 212 fifolog_write_assert(f); 213} 214 215int 216fifolog_write_poll(struct fifolog_writer *f, time_t now) 217{ 218 int i, fl, bo, bf; 219 220 if (now == 0) 221 time(&now); 222 223 fifolog_write_assert(f); 224 if (f->cleanup || now >= (int)(f->lastsync + f->syncrate)) { 225 /* 226 * We always check the sync timer, otherwise a flood of data 227 * would not get any sync records at all 228 */ 229 f->cleanup = 0; 230 fl = Z_FINISH; 231 f->lastsync = now; 232 f->lastwrite = now; 233 f->cnt[FIFOLOG_PT_SYNC]++; 234 } else if (f->ff->zs->avail_in == 0 && 235 now >= (int)(f->lastwrite + f->writerate)) { 236 /* 237 * We only check for writerate timeouts when the input 238 * buffer is empty. It would be silly to force a write if 239 * pending input could cause it to happen on its own. 240 */ 241 fl = Z_SYNC_FLUSH; 242 f->lastwrite = now; 243 f->cnt[FIFOLOG_PT_FLUSH]++; 244 } else if (f->ff->zs->avail_in == 0) 245 return (0); /* nothing to do */ 246 else 247 fl = Z_NO_FLUSH; 248 249 for (;;) { 250 assert(f->ff->zs->avail_out > 0); 251 252 bf = f->ff->zs->avail_out; 253 254 i = deflate(f->ff->zs, fl); 255 assert (i == Z_OK || i == Z_BUF_ERROR || i == Z_STREAM_END); 256 257 bo = f->ff->zs->avail_out; 258 259 /* If we have output space and not in a hurry.. */ 260 if (bo > 0 && fl == Z_NO_FLUSH) 261 break; 262 263 /* Write output buffer, if anything in it */ 264 if (bo != bf) 265 fifo_writerec(f); 266 267 /* If the buffer were full, we need to check again */ 268 if (bo == 0) { 269 fifo_prepobuf(f, now, 0); 270 continue; 271 } 272 273 if (fl == Z_FINISH) { 274 /* Make next record a SYNC record */ 275 fifo_prepobuf(f, now, 1); 276 /* And reset the zlib engine */ 277 i = deflateReset(f->ff->zs); 278 assert(i == Z_OK); 279 f->writes_since_sync = 0; 280 } else { 281 fifo_prepobuf(f, now, 0); 282 } 283 break; 284 } 285 286 if (f->ff->zs->avail_in == 0) { 287 /* Reset input buffer when empty */ 288 f->iptr = f->ibuf; 289 f->ff->zs->next_in = f->iptr; 290 } 291 292 fifolog_write_assert(f); 293 return (1); 294} 295 296static void 297fifolog_acct(struct fifolog_writer *f, unsigned bytes) 298{ 299 300 f->ff->zs->avail_in += bytes; 301 f->iptr += bytes; 302 f->cnt[FIFOLOG_PT_BYTES_PRE] += bytes; 303} 304 305/* 306 * Attempt to write an entry. 307 * Return zero if there is no space, one otherwise 308 */ 309 310int 311fifolog_write_bytes(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len) 312{ 313 u_int l; 314 const unsigned char *p; 315 316 fifolog_write_assert(f); 317 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH))); 318 assert(ptr != NULL); 319 320 if (len == 0) { 321 len = strlen(ptr); 322 l = 4 + len; /* id */ 323 p = ptr; 324 } else { 325 assert(len <= 255); 326 p = ptr; 327 id |= FIFOLOG_LENGTH; 328 l = 5 + len; /* id + len */ 329 } 330 331 l += 4; /* A timestamp may be necessary */ 332 333 /* Now do timestamp, if needed */ 334 if (now == 0) 335 time(&now); 336 337 assert(l < f->ibufsize); 338 339 /* Wait until there is sufficient space without the lock */ 340 if (f->iptr + l > f->ibuf + f->ibufsize) 341 return (0); 342 343 if (now != f->last) { 344 id |= FIFOLOG_TIMESTAMP; 345 f->last = now; 346 } 347 348 /* Emit instance+flag and length */ 349 be32enc(f->iptr, id); 350 fifolog_acct(f, 4); 351 352 if (id & FIFOLOG_TIMESTAMP) { 353 be32enc(f->iptr, (uint32_t)f->last); 354 fifolog_acct(f, 4); 355 } 356 if (id & FIFOLOG_LENGTH) { 357 f->iptr[0] = (u_char)len; 358 fifolog_acct(f, 1); 359 } 360 361 assert (len > 0); 362 memcpy(f->iptr, p, len); 363 fifolog_acct(f, len); 364 fifolog_write_assert(f); 365 return (1); 366} 367 368/* 369 * Write an entry, polling until success. 370 * Long binary entries are broken into 255 byte chunks. 371 */ 372 373void 374fifolog_write_bytes_poll(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len) 375{ 376 u_int l; 377 const unsigned char *p; 378 379 fifolog_write_assert(f); 380 381 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH))); 382 assert(ptr != NULL); 383 384 if (len == 0) { 385 while (!fifolog_write_bytes(f, id, now, ptr, len)) { 386 (void)fifolog_write_poll(f, now); 387 (void)usleep(10000); 388 } 389 } else { 390 p = ptr; 391 for (p = ptr; len > 0; len -= l, p += l) { 392 l = len; 393 if (l > 255) 394 l = 255; 395 while (!fifolog_write_bytes(f, id, now, p, l)) { 396 (void)fifolog_write_poll(f, now); 397 (void)usleep(10000); 398 } 399 } 400 } 401 fifolog_write_assert(f); 402} 403 404int 405fifolog_write_flush(struct fifolog_writer *f) 406{ 407 int i; 408 409 fifolog_write_assert(f); 410 411 f->cleanup = 1; 412 for (i = 0; fifolog_write_poll(f, 0); i = 1) 413 continue; 414 fifolog_write_assert(f); 415 return (i); 416} 417