fifolog_write_poll.c revision 216257
1/*- 2 * Copyright (c) 2005-2008 Poul-Henning Kamp 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD: head/usr.sbin/fifolog/lib/fifolog_write_poll.c 216257 2010-12-07 16:30:52Z phk $ 27 */ 28 29#include <assert.h> 30#include <stdio.h> 31#include <string.h> 32#include <stdlib.h> 33#include <unistd.h> 34#include <time.h> 35#include <sys/endian.h> 36#if 0 37#include <sys/uio.h> 38#endif 39 40#include <zlib.h> 41 42#include "fifolog.h" 43#include "libfifolog.h" 44#include "libfifolog_int.h" 45#include "fifolog_write.h" 46#include "miniobj.h" 47 48#define ALLOC(ptr, size) do { \ 49 (*(ptr)) = calloc(size, 1); \ 50 assert(*(ptr) != NULL); \ 51} while (0) 52 53 54const char *fifolog_write_statnames[] = { 55[FIFOLOG_PT_BYTES_PRE] = "Bytes before compression", 56[FIFOLOG_PT_BYTES_POST] = "Bytes after compression", 57[FIFOLOG_PT_WRITES] = "Writes", 58[FIFOLOG_PT_FLUSH] = "Flushes", 59[FIFOLOG_PT_SYNC] = "Syncs", 60[FIFOLOG_PT_RUNTIME] = "Runtime" 61}; 62 63/* 64 * Check that everything is all right 65 */ 66static void 67fifolog_write_assert(const struct fifolog_writer *f) 68{ 69 70 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 71 assert(f->ff->zs->next_out + f->ff->zs->avail_out == \ 72 f->obuf + f->obufsize); 73} 74 75struct fifolog_writer * 76fifolog_write_new(void) 77{ 78 struct fifolog_writer *f; 79 80 ALLOC_OBJ(f, FIFOLOG_WRITER_MAGIC); 81 assert(f != NULL); 82 return (f); 83} 84 85void 86fifolog_write_destroy(struct fifolog_writer *f) 87{ 88 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 89 free(f); 90} 91 92void 93fifolog_write_close(struct fifolog_writer *f) 94{ 95 96 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 97 fifolog_int_close(&f->ff); 98 free(f->ff); 99 if (f->obuf != NULL) 100 free(f->obuf); 101 free(f); 102} 103 104const char * 105fifolog_write_open(struct fifolog_writer *f, const char *fn, unsigned writerate, unsigned syncrate, int compression) 106{ 107 const char *es; 108 int i; 109 time_t now; 110 off_t o; 111 112 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 113 114 /* Check for legal compression value */ 115 if (compression < Z_DEFAULT_COMPRESSION || 116 compression > Z_BEST_COMPRESSION) 117 return ("Illegal compression value"); 118 119 f->writerate = writerate; 120 f->syncrate = syncrate; 121 f->compression = compression; 122 123 /* Reset statistics */ 124 memset(f->cnt, 0, sizeof f->cnt); 125 126 es = fifolog_int_open(&f->ff, fn, 1); 127 if (es != NULL) 128 return (es); 129 es = fifolog_int_findend(f->ff, &o); 130 if (es != NULL) 131 return (es); 132 i = fifolog_int_read(f->ff, o); 133 if (i) 134 return ("Read error, looking for seq"); 135 f->seq = be32dec(f->ff->recbuf); 136 if (f->seq == 0) { 137 /* Empty fifolog */ 138 f->seq = random(); 139 } else { 140 f->recno = o + 1; 141 f->seq++; 142 } 143 144 f->obufsize = f->ff->recsize; 145 ALLOC(&f->obuf, f->obufsize); 146 147 i = deflateInit(f->ff->zs, (int)f->compression); 148 assert(i == Z_OK); 149 150 f->flag |= FIFOLOG_FLG_RESTART; 151 f->flag |= FIFOLOG_FLG_SYNC; 152 f->ff->zs->next_out = f->obuf + 9; 153 f->ff->zs->avail_out = f->obufsize - 9; 154 155 time(&now); 156 f->starttime = now; 157 f->lastsync = now; 158 f->lastwrite = now; 159 160 fifolog_write_assert(f); 161 return (NULL); 162} 163 164static int 165fifolog_write_output(struct fifolog_writer *f, int fl, time_t now) 166{ 167 long h, l = f->ff->zs->next_out - f->obuf; 168 int i, w; 169 170 h = 4; /* seq */ 171 be32enc(f->obuf, f->seq); 172 f->obuf[h] = f->flag; 173 h += 1; /* flag */ 174 if (f->flag & FIFOLOG_FLG_SYNC) { 175 be32enc(f->obuf + h, now); 176 h += 4; /* timestamp */ 177 } 178 179 assert(l <= (long)f->ff->recsize); 180 assert(l >= h); 181 if (l == h) 182 return (0); 183 184 185 if (h + l < (long)f->ff->recsize && fl == Z_NO_FLUSH) 186 return (0); 187 188 w = f->ff->recsize - l; 189 if (w > 255) { 190 be32enc(f->obuf + f->ff->recsize - 4, w); 191 f->obuf[4] |= FIFOLOG_FLG_4BYTE; 192 } else if (w > 0) { 193 f->obuf[f->ff->recsize - 1] = w; 194 f->obuf[4] |= FIFOLOG_FLG_1BYTE; 195 } 196 197 f->cnt[FIFOLOG_PT_BYTES_POST] += w; 198 199#ifdef DBG 200fprintf(stderr, "W: fl=%d h=%ld l=%ld w=%d recno=%jd fx %02x\n", 201 fl, h, l, w, f->recno, f->obuf[4]); 202#endif 203 204 i = pwrite(f->ff->fd, f->obuf, f->ff->recsize, 205 (f->recno + 1) * f->ff->recsize); 206 assert(i == (int)f->ff->recsize); 207 208 f->cnt[FIFOLOG_PT_WRITES]++; 209 210 f->lastwrite = now; 211 f->seq++; 212 f->recno++; 213#ifdef DBG 214if (f->flag) 215fprintf(stderr, "SYNC- %d\n", __LINE__); 216#endif 217 f->flag = 0; 218 219 memset(f->obuf, 0, f->obufsize); 220 f->ff->zs->next_out = f->obuf + 5; 221 f->ff->zs->avail_out = f->obufsize - 5; 222 return (1); 223} 224 225static void 226fifolog_write_gzip(struct fifolog_writer *f, const void *p, int len, time_t now, int fin) 227{ 228 int i, fl; 229 230 f->cnt[FIFOLOG_PT_BYTES_PRE] += len; 231 232 if (fin == 0) 233 fl = Z_NO_FLUSH; 234 else if (f->cleanup || now >= (int)(f->lastsync + f->syncrate)) { 235 f->cleanup = 0; 236 fl = Z_FINISH; 237 f->cnt[FIFOLOG_PT_SYNC]++; 238 } else if (now >= (int)(f->lastwrite + f->writerate)) { 239 fl = Z_SYNC_FLUSH; 240 f->cnt[FIFOLOG_PT_FLUSH]++; 241 } else if (p == NULL) 242 return; 243 else 244 fl = Z_NO_FLUSH; 245 246 f->ff->zs->avail_in = len; 247 f->ff->zs->next_in = (void*)(uintptr_t)p; 248#ifdef DBG 249if (fl != Z_NO_FLUSH) 250fprintf(stderr, "Z len %3d fin %d now %ld fl %d ai %u ao %u\n", 251 len, fin, now, fl, 252 f->ff->zs->avail_in, 253 f->ff->zs->avail_out); 254#endif 255 256 while (1) { 257 i = deflate(f->ff->zs, fl); 258 259#ifdef DBG 260if (i || f->ff->zs->avail_in) 261fprintf(stderr, "fl = %d, i = %d ai = %u ao = %u fx=%02x\n", fl, i, 262 f->ff->zs->avail_in, 263 f->ff->zs->avail_out, f->flag); 264#endif 265 266 assert(i == Z_OK || i == Z_BUF_ERROR || i == Z_STREAM_END); 267 assert(f->ff->zs->avail_in == 0); 268 269 if (!fifolog_write_output(f, fl, now)) 270 break; 271 } 272 assert(f->ff->zs->avail_in == 0); 273 if (fl == Z_FINISH) { 274 f->flag |= FIFOLOG_FLG_SYNC; 275 f->ff->zs->next_out = f->obuf + 9; 276 f->ff->zs->avail_out = f->obufsize - 9; 277 f->lastsync = now; 278#ifdef DBG 279fprintf(stderr, "SYNC %d\n", __LINE__); 280#endif 281 assert(Z_OK == deflateReset(f->ff->zs)); 282 } 283} 284 285int 286fifolog_write_poll(struct fifolog_writer *f, time_t now) 287{ 288 if (now == 0) 289 time(&now); 290 fifolog_write_gzip(f, NULL, 0, now, 1); 291 return (0); 292} 293 294/* 295 * Attempt to write an entry. 296 * Return zero if there is no space, one otherwise 297 */ 298 299int 300fifolog_write_bytes(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len) 301{ 302 const unsigned char *p; 303 uint8_t buf[4]; 304 305 fifolog_write_assert(f); 306 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH))); 307 assert(ptr != NULL); 308 309 p = ptr; 310 if (len == 0) { 311 len = strlen(ptr) + 1; 312 } else { 313 assert(len <= 255); 314 id |= FIFOLOG_LENGTH; 315 } 316 317 /* Now do timestamp, if needed */ 318 if (now == 0) 319 time(&now); 320 321 if (now != f->last) { 322 id |= FIFOLOG_TIMESTAMP; 323 f->last = now; 324 } 325 326 /* Emit instance+flag */ 327 be32enc(buf, id); 328 fifolog_write_gzip(f, buf, 4, now, 0); 329 330 if (id & FIFOLOG_TIMESTAMP) { 331 be32enc(buf, (uint32_t)f->last); 332 fifolog_write_gzip(f, buf, 4, now, 0); 333 } 334 if (id & FIFOLOG_LENGTH) { 335 buf[0] = (u_char)len; 336 fifolog_write_gzip(f, buf, 1, now, 0); 337 } 338 339 assert (len > 0); 340#if 1 341 if (len > f->ibufsize) { 342 free(f->ibuf); 343 f->ibufsize = len; 344 ALLOC(&f->ibuf, f->ibufsize); 345 } 346 memcpy(f->ibuf, p, len); 347 fifolog_write_gzip(f, f->ibuf, len, now, 1); 348#else 349 fifolog_write_gzip(f, p, len, now, 1); 350#endif 351 fifolog_write_assert(f); 352 return (1); 353} 354 355/* 356 * Write an entry, polling until success. 357 * Long binary entries are broken into 255 byte chunks. 358 */ 359 360void 361fifolog_write_bytes_poll(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len) 362{ 363 u_int l; 364 const unsigned char *p; 365 366 fifolog_write_assert(f); 367 368 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH))); 369 assert(ptr != NULL); 370 371 if (len == 0) { 372 while (!fifolog_write_bytes(f, id, now, ptr, len)) { 373 (void)usleep(10000); 374 } 375 } else { 376 p = ptr; 377 for (p = ptr; len > 0; len -= l, p += l) { 378 l = len; 379 if (l > 255) 380 l = 255; 381 while (!fifolog_write_bytes(f, id, now, p, l)) { 382 (void)usleep(10000); 383 } 384 } 385 } 386 fifolog_write_assert(f); 387} 388 389int 390fifolog_write_flush(struct fifolog_writer *f) 391{ 392 int i; 393 394 fifolog_write_assert(f); 395 396 f->cleanup = 1; 397 for (i = 0; fifolog_write_poll(f, 0); i = 1) 398 continue; 399 fifolog_write_assert(f); 400 return (i); 401} 402