/* fifolog_write_poll.c revision 177381 */
/*-
 * Copyright (c) 2005-2008 Poul-Henning Kamp
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
25 * 26 * $FreeBSD: head/usr.sbin/fifolog/lib/fifolog_write_poll.c 177381 2008-03-19 10:56:51Z phk $ 27 */ 28 29#include <assert.h> 30#include <stdio.h> 31#include <string.h> 32#include <stdlib.h> 33#include <unistd.h> 34#include <time.h> 35#include <sys/endian.h> 36 37#include <zlib.h> 38 39#include "fifolog.h" 40#include "libfifolog.h" 41#include "libfifolog_int.h" 42#include "fifolog_write.h" 43#include "miniobj.h" 44 45#define ALLOC(ptr, size) do { \ 46 (*(ptr)) = calloc(size, 1); \ 47 assert(*(ptr) != NULL); \ 48} while (0) 49 50 51const char *fifolog_write_statnames[] = { 52[FIFOLOG_PT_BYTES_PRE] = "Bytes before compression", 53[FIFOLOG_PT_BYTES_POST] = "Bytes after compression", 54[FIFOLOG_PT_WRITES] = "Writes", 55[FIFOLOG_PT_FLUSH] = "Flushes", 56[FIFOLOG_PT_SYNC] = "Syncs", 57[FIFOLOG_PT_RUNTIME] = "Runtime" 58}; 59 60/* 61 * Check that everything is all right 62 */ 63static void 64fifolog_write_assert(const struct fifolog_writer *f) 65{ 66 67 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 68 assert(f->iptr == f->ff->zs->next_in + f->ff->zs->avail_in); 69 assert(f->ff->zs->next_out + f->ff->zs->avail_out == \ 70 f->ff->recbuf + f->ff->recsize); 71} 72 73struct fifolog_writer * 74fifolog_write_new(void) 75{ 76 struct fifolog_writer *f; 77 78 ALLOC(&f, sizeof *f); 79 f->magic = FIFOLOG_WRITER_MAGIC; 80 return (f); 81} 82 83void 84fifolog_write_destroy(struct fifolog_writer *f) 85{ 86 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 87 free(f); 88} 89 90void 91fifolog_write_close(struct fifolog_writer *f) 92{ 93 94 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 95 fifolog_int_close(&f->ff); 96 free(f->ff); 97 if (f->ibuf != NULL) 98 free(f->ibuf); 99 free(f); 100} 101 102static void 103fifo_prepobuf(struct fifolog_writer *f, time_t now, int flag) 104{ 105 106 memset(f->ff->recbuf, 0, f->ff->recsize); 107 f->ff->zs->next_out = f->ff->recbuf + 5; 108 f->ff->zs->avail_out = f->ff->recsize - 5; 109 if (f->recno == 0 && f->seq == 0) { 110 srandomdev(); 111 do { 112 f->seq = 
random(); 113 } while (f->seq == 0); 114 } 115 be32enc(f->ff->recbuf, f->seq++); 116 f->ff->recbuf[4] = f->flag; 117 f->flag = 0; 118 if (flag) { 119 f->ff->recbuf[4] |= FIFOLOG_FLG_SYNC; 120 be32enc(f->ff->recbuf + 5, (u_int)now); 121 f->ff->zs->next_out += 4; 122 f->ff->zs->avail_out -= 4; 123 } 124 fifolog_write_assert(f); 125} 126 127const char * 128fifolog_write_open(struct fifolog_writer *f, const char *fn, unsigned writerate, unsigned syncrate, int compression) 129{ 130 const char *es; 131 int i; 132 time_t now; 133 off_t o; 134 135 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 136 137 /* Check for legal compression value */ 138 if (compression < Z_DEFAULT_COMPRESSION || 139 compression > Z_BEST_COMPRESSION) 140 return ("Illegal compression value"); 141 142 f->writerate = writerate; 143 f->syncrate = syncrate; 144 f->compression = compression; 145 146 /* Reset statistics */ 147 memset(f->cnt, 0, sizeof f->cnt); 148 149 es = fifolog_int_open(&f->ff, fn, 1); 150 if (es != NULL) 151 return (es); 152 es = fifolog_int_findend(f->ff, &o); 153 if (es != NULL) 154 return (es); 155 if (o == 0) { 156 f->seq = 0; 157 f->recno = 0; 158 } else { 159 i = fifolog_int_read(f->ff, o); 160 if (i) 161 return ("Read error, looking for seq"); 162 f->seq = be32dec(f->ff->recbuf) + 1; 163 f->recno = o + 1; 164 } 165 166 f->ibufsize = 32768; 167 ALLOC(&f->ibuf, f->ibufsize); 168 f->iptr = f->ibuf; 169 f->ff->zs->next_in = f->iptr; 170 i = deflateInit(f->ff->zs, (int)f->compression); 171 assert(i == Z_OK); 172 173 f->flag |= FIFOLOG_FLG_RESTART; 174 175 time(&now); 176 fifo_prepobuf(f, now, 1); 177 f->starttime = now; 178 179 fifolog_write_assert(f); 180 return (NULL); 181} 182 183static void 184fifo_writerec(struct fifolog_writer *f) 185{ 186 int i; 187 time_t t; 188 189 fifolog_write_assert(f); 190 f->writes_since_sync++; 191 192 assert(f->recno < f->ff->logsize); 193 f->cnt[FIFOLOG_PT_BYTES_POST] += f->ff->recsize - f->ff->zs->avail_out; 194 if (f->ff->zs->avail_out == 0) { 195 /* 
nothing */ 196 } else if (f->ff->zs->avail_out <= 255) { 197 f->ff->recbuf[f->ff->recsize - 1] = 198 (u_char)f->ff->zs->avail_out; 199 f->ff->recbuf[4] |= FIFOLOG_FLG_1BYTE; 200 } else { 201 be32enc(f->ff->recbuf + f->ff->recsize - 4, 202 f->ff->zs->avail_out); 203 f->ff->recbuf[4] |= FIFOLOG_FLG_4BYTE; 204 } 205 i = pwrite(f->ff->fd, f->ff->recbuf, f->ff->recsize, 206 (f->recno + 1) * f->ff->recsize); 207 assert (i == (int)f->ff->recsize); 208 if (++f->recno == f->ff->logsize) 209 f->recno = 0; 210 f->cnt[FIFOLOG_PT_WRITES]++; 211 time(&t); 212 f->cnt[FIFOLOG_PT_RUNTIME] = t - f->starttime; /*lint !e776 */ 213 fifolog_write_assert(f); 214} 215 216int 217fifolog_write_poll(struct fifolog_writer *f, time_t now) 218{ 219 int i, fl, bo, bf; 220 221 if (now == 0) 222 time(&now); 223 224 fifolog_write_assert(f); 225 if (f->cleanup || now >= (int)(f->lastsync + f->syncrate)) { 226 /* 227 * We always check the sync timer, otherwise a flood of data 228 * would not get any sync records at all 229 */ 230 f->cleanup = 0; 231 fl = Z_FINISH; 232 f->lastsync = now; 233 f->lastwrite = now; 234 f->cnt[FIFOLOG_PT_SYNC]++; 235 } else if (f->ff->zs->avail_in == 0 && 236 now >= (int)(f->lastwrite + f->writerate)) { 237 /* 238 * We only check for writerate timeouts when the input 239 * buffer is empty. It would be silly to force a write if 240 * pending input could cause it to happen on its own. 241 */ 242 fl = Z_SYNC_FLUSH; 243 f->lastwrite = now; 244 f->cnt[FIFOLOG_PT_FLUSH]++; 245 } else if (f->ff->zs->avail_in == 0) 246 return (0); /* nothing to do */ 247 else 248 fl = Z_NO_FLUSH; 249 250 for (;;) { 251 assert(f->ff->zs->avail_out > 0); 252 253 bf = f->ff->zs->avail_out; 254 255 i = deflate(f->ff->zs, fl); 256 assert (i == Z_OK || i == Z_BUF_ERROR || i == Z_STREAM_END); 257 258 bo = f->ff->zs->avail_out; 259 260 /* If we have output space and not in a hurry.. 
*/ 261 if (bo > 0 && fl == Z_NO_FLUSH) 262 break; 263 264 /* Write output buffer, if anything in it */ 265 if (bo != bf) 266 fifo_writerec(f); 267 268 /* If the buffer were full, we need to check again */ 269 if (bo == 0) { 270 fifo_prepobuf(f, now, 0); 271 continue; 272 } 273 274 if (fl == Z_FINISH) { 275 /* Make next record a SYNC record */ 276 fifo_prepobuf(f, now, 1); 277 /* And reset the zlib engine */ 278 i = deflateReset(f->ff->zs); 279 assert(i == Z_OK); 280 f->writes_since_sync = 0; 281 } else { 282 fifo_prepobuf(f, now, 0); 283 } 284 break; 285 } 286 287 if (f->ff->zs->avail_in == 0) { 288 /* Reset input buffer when empty */ 289 f->iptr = f->ibuf; 290 f->ff->zs->next_in = f->iptr; 291 } 292 293 fifolog_write_assert(f); 294 return (1); 295} 296 297static void 298fifolog_acct(struct fifolog_writer *f, unsigned bytes) 299{ 300 301 f->ff->zs->avail_in += bytes; 302 f->iptr += bytes; 303 f->cnt[FIFOLOG_PT_BYTES_PRE] += bytes; 304} 305 306/* 307 * Attempt to write an entry. 308 * Return zero if there is no space, one otherwise 309 */ 310 311int 312fifolog_write_bytes(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len) 313{ 314 u_int l; 315 const unsigned char *p; 316 317 fifolog_write_assert(f); 318 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH))); 319 assert(ptr != NULL); 320 321 p = ptr; 322 if (len == 0) { 323 len = strlen(ptr) + 1; 324 l = 4 + len; /* id */ 325 } else { 326 assert(len <= 255); 327 id |= FIFOLOG_LENGTH; 328 l = 5 + len; /* id + len */ 329 } 330 331 l += 4; /* A timestamp may be necessary */ 332 333 /* Now do timestamp, if needed */ 334 if (now == 0) 335 time(&now); 336 337 assert(l < f->ibufsize); 338 339 /* Return if there is not enough space */ 340 if (f->iptr + l > f->ibuf + f->ibufsize) 341 return (0); 342 343 if (now != f->last) { 344 id |= FIFOLOG_TIMESTAMP; 345 f->last = now; 346 } 347 348 /* Emit instance+flag and length */ 349 be32enc(f->iptr, id); 350 fifolog_acct(f, 4); 351 352 if (id & 
FIFOLOG_TIMESTAMP) { 353 be32enc(f->iptr, (uint32_t)f->last); 354 fifolog_acct(f, 4); 355 } 356 if (id & FIFOLOG_LENGTH) { 357 f->iptr[0] = (u_char)len; 358 fifolog_acct(f, 1); 359 } 360 361 assert (len > 0); 362 memcpy(f->iptr, p, len); 363 fifolog_acct(f, len); 364 fifolog_write_assert(f); 365 return (1); 366} 367 368/* 369 * Write an entry, polling until success. 370 * Long binary entries are broken into 255 byte chunks. 371 */ 372 373void 374fifolog_write_bytes_poll(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len) 375{ 376 u_int l; 377 const unsigned char *p; 378 379 fifolog_write_assert(f); 380 381 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH))); 382 assert(ptr != NULL); 383 384 if (len == 0) { 385 while (!fifolog_write_bytes(f, id, now, ptr, len)) { 386 (void)fifolog_write_poll(f, now); 387 (void)usleep(10000); 388 } 389 } else { 390 p = ptr; 391 for (p = ptr; len > 0; len -= l, p += l) { 392 l = len; 393 if (l > 255) 394 l = 255; 395 while (!fifolog_write_bytes(f, id, now, p, l)) { 396 (void)fifolog_write_poll(f, now); 397 (void)usleep(10000); 398 } 399 } 400 } 401 fifolog_write_assert(f); 402} 403 404int 405fifolog_write_flush(struct fifolog_writer *f) 406{ 407 int i; 408 409 fifolog_write_assert(f); 410 411 f->cleanup = 1; 412 for (i = 0; fifolog_write_poll(f, 0); i = 1) 413 continue; 414 fifolog_write_assert(f); 415 return (i); 416} 417