1/* Copyright (C) 2021 Free Software Foundation, Inc. 2 Contributed by Oracle. 3 4 This file is part of GNU Binutils. 5 6 This program is free software; you can redistribute it and/or modify 7 it under the terms of the GNU General Public License as published by 8 the Free Software Foundation; either version 3, or (at your option) 9 any later version. 10 11 This program is distributed in the hope that it will be useful, 12 but WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 GNU General Public License for more details. 15 16 You should have received a copy of the GNU General Public License 17 along with this program; if not, write to the Free Software 18 Foundation, 51 Franklin Street - Fifth Floor, Boston, 19 MA 02110-1301, USA. */ 20 21#include "config.h" 22#include <dlfcn.h> 23#include <pthread.h> 24#include <errno.h> 25#include <fcntl.h> 26#include <stdio.h> 27#include <stdlib.h> 28#include <string.h> 29#include <unistd.h> 30#include <sys/mman.h> 31#include <sys/param.h> 32#include <sys/stat.h> 33 34#include "gp-defs.h" 35#include "collector.h" 36#include "gp-experiment.h" 37#include "memmgr.h" 38 39/* TprintfT(<level>,...) definitions. Adjust per module as needed */ 40#define DBG_LT0 0 // for high-level configuration, unexpected errors/warnings 41#define DBG_LT1 1 // for configuration details, warnings 42#define DBG_LT2 2 43#define DBG_LT3 3 44 45/* ------------- Data and prototypes for block management --------- */ 46#define IO_BLK 0 /* Concurrent requests */ 47#define IO_SEQ 1 /* All requests are sequential, f.e. JAVA_CLASSES */ 48#define IO_TXT 2 /* Sequential requests. Text strings. */ 49#define ST_INIT 0 /* Initial state. 
Not allocated */
#define ST_FREE 1 /* Available */
#define ST_BUSY 2 /* Not available */

/* IO_BLK, IO_SEQ */
#define NCHUNKS 64

/* IO_TXT */
#define NBUFS 64 /* Number of text buffers; must fit in the 6-bit indx field below */

/* Pack/unpack macros for the 64-bit "current position" word (DataHandle.curpos):
   a busy (lock) bit, a Buffer index, and the file offset, updated with CAS.  */
#define CUR_BUSY(x) ((uint32_t) ((x)>>63)) /* bit 63 */
#define CUR_INDX(x) ((uint32_t) (((x)>>57) & 0x3fULL)) /* bits 62:57 */
#define CUR_FOFF(x) ((x) & 0x01ffffffffffffffULL) /* bits 56: 0 */
#define CUR_MAKE(busy, indx, foff) ((((uint64_t)(busy))<<63) | (((uint64_t)(indx))<<57) | ((uint64_t)(foff)) )

/* One mmapped window onto an IO_TXT file (see mapBuffer/newBuffer).  */
typedef struct Buffer
{
  uint8_t *vaddr;  /* virtual address the buffer is mapped at */
  uint32_t left;   /* bytes left */
  uint32_t state;  /* ST_FREE or ST_BUSY */
} Buffer;

/* Book-keeping for one experiment data file.  */
typedef struct DataHandle
{
  Pckt_type kind;          /* obsolete (to be removed) */
  int iotype;              /* IO_BLK, IO_SEQ, IO_TXT */
  int active;              /* nonzero while the handle is usable */
  char fname[MAXPATHLEN];  /* data file name */

  /* IO_BLK, IO_SEQ */
  uint32_t nflow;          /* number of data flows */
  uint32_t *blkstate;      /* block states, nflow*NCHUNKS array */
  uint32_t *blkoff;        /* block offset, nflow*NCHUNKS array */
  uint32_t nchnk;          /* number of active chunks, probably small for IO_BLK */
  uint8_t *chunks[NCHUNKS]; /* chunks (nflow contiguous blocks in virtual memory) */
  uint32_t chblk[NCHUNKS]; /* number of active blocks in a chunk */
  uint32_t nblk;           /* number of blocks in data file */
  int exempt;              /* if exempt from experiment size limit */

  /* IO_TXT */
  Buffer *buffers;         /* array of text buffers */
  uint64_t curpos;         /* current buffer and file offset, packed; see CUR_* */
} DataHandle;

#define PROFILE_DATAHNDL_MAX 16
static DataHandle data_hndls[PROFILE_DATAHNDL_MAX];
static int initialized = 0;
static long blksz; /* Block size. Multiple of page size. Power of two to make (x%blksz)==(x&(blksz-1)) fast. */
static long log2blksz; /* log2(blksz) to make (x/blksz)==(x>>log2blksz) fast.
*/
static uint32_t size_limit; /* Experiment size limit */
static uint32_t cur_size; /* Current experiment size */
static void init ();
static void deleteHandle (DataHandle *hndl);
static int exp_size_ck (int nblocks, char *fname);

/* IO_BLK, IO_SEQ */
static int allocateChunk (DataHandle *hndl, unsigned ichunk);
static uint8_t *getBlock (DataHandle *hndl, unsigned iflow, unsigned ichunk);
static int remapBlock (DataHandle *hndl, unsigned iflow, unsigned ichunk);
static int newBlock (DataHandle *hndl, unsigned iflow, unsigned ichunk);
static void deleteBlock (DataHandle *hndl, unsigned iflow, unsigned ichunk);

/* IO_TXT */
static int is_not_the_log_file (char *fname);
static int mapBuffer (char *fname, Buffer *buf, off64_t foff);
static int newBuffer (DataHandle *hndl, uint64_t pos);
static void writeBuffer (Buffer *buf, int blk_off, char *src, int len);
static void deleteBuffer (Buffer *buf);

/*
 * Common buffer management routines
 */

/* One-time module initialization: derive blksz (a power of two, at least
   64K and at least the page size) and log2blksz, and reset the size
   accounting globals.  Not mt-safe; callers invoke it lazily.  */
static void
init ()
{
  /* set the block size */
  long pgsz = CALL_UTIL (sysconf)(_SC_PAGESIZE);
  blksz = pgsz;
  log2blksz = 16; /* ensure a minimum size */
  while ((1 << log2blksz) < blksz)
    log2blksz += 1;
  blksz = 1L << log2blksz; /* ensure that blksz is a power of two */
  TprintfT (DBG_LT1, "iolib init: page size=%ld (0x%lx) blksz=%ld (0x%lx) log2blksz=%ld\n",
	    pgsz, pgsz, (long) blksz, (long) blksz, (long) log2blksz);
  size_limit = 0;
  cur_size = 0;
  initialized = 1;
}

/* Create (and truncate) the experiment data file named by DESCP and return a
   handle for writing to it.  A leading '*' in DESCP marks the file as exempt
   from the experiment size limit.  The file-name suffix selects the packet
   kind and the I/O style (IO_BLK, IO_SEQ, or IO_TXT).  Returns NULL on any
   error; errors are logged (or written to stderr if the log file itself
   cannot be created).  Not mt-safe (handle allocation is unsynchronized).  */
DataHandle *
__collector_create_handle (char *descp)
{
  int exempt = 0;
  char *desc = descp;
  if (desc[0] == '*')
    {
      desc++;
      exempt = 1;
    }
  if (!initialized)
    init ();

  /* set up header for file, file name, etc. */
  if (__collector_exp_dir_name == NULL)
    {
      __collector_log_write ("<event kind=\"%s\" id=\"%d\">__collector_exp_dir_name==NULL</event>\n",
			     SP_JCMD_CERROR, COL_ERROR_EXPOPEN);
      return NULL;
    }
  char fname[MAXPATHLEN];
  CALL_UTIL (strlcpy)(fname, __collector_exp_dir_name, sizeof (fname));
  CALL_UTIL (strlcat)(fname, "/", sizeof (fname));
  Pckt_type kind = 0;
  int iotype = IO_BLK;
  if (__collector_strcmp (desc, SP_HEAPTRACE_FILE) == 0)
    kind = HEAP_PCKT;
  else if (__collector_strcmp (desc, SP_SYNCTRACE_FILE) == 0)
    kind = SYNC_PCKT;
  else if (__collector_strcmp (desc, SP_IOTRACE_FILE) == 0)
    kind = IOTRACE_PCKT;
  else if (__collector_strcmp (desc, SP_RACETRACE_FILE) == 0)
    kind = RACE_PCKT;
  else if (__collector_strcmp (desc, SP_PROFILE_FILE) == 0)
    kind = PROF_PCKT;
  else if (__collector_strcmp (desc, SP_OMPTRACE_FILE) == 0)
    kind = OMP_PCKT;
  else if (__collector_strcmp (desc, SP_HWCNTR_FILE) == 0)
    kind = HW_PCKT;
  else if (__collector_strcmp (desc, SP_DEADLOCK_FILE) == 0)
    kind = DEADLOCK_PCKT;
  else if (__collector_strcmp (desc, SP_FRINFO_FILE) == 0)
    CALL_UTIL (strlcat)(fname, "data.", sizeof (fname));
  else if (__collector_strcmp (desc, SP_LOG_FILE) == 0)
    iotype = IO_TXT;
  else if (__collector_strcmp (desc, SP_MAP_FILE) == 0)
    iotype = IO_TXT;
  else if (__collector_strcmp (desc, SP_JCLASSES_FILE) == 0)
    iotype = IO_SEQ;
  else
    {
      __collector_log_write ("<event kind=\"%s\" id=\"%d\">iolib unknown file desc %s</event>\n",
			     SP_JCMD_CERROR, COL_ERROR_EXPOPEN, desc);
      return NULL;
    }

  CALL_UTIL (strlcat)(fname, desc, sizeof (fname));
  TprintfT (DBG_LT1, "createHandle calling open on fname = `%s', desc = `%s' %s\n",
	    fname, desc, (exempt == 0 ? "non-exempt" : "exempt"));

  /* allocate a handle -- not mt-safe */
  DataHandle *hndl = NULL;
  for (int i = 0; i < PROFILE_DATAHNDL_MAX; ++i)
    if (data_hndls[i].active == 0)
      {
	hndl = &data_hndls[i];
	break;
      }

  /* out of handles? */
  if (hndl == NULL)
    {
      __collector_log_write ("<event kind=\"%s\" id=\"%d\">%s</event>\n",
			     SP_JCMD_CERROR, COL_ERROR_NOHNDL, fname);
      return NULL;
    }

  hndl->kind = kind;
  hndl->nblk = 0;
  hndl->exempt = exempt;
  CALL_UTIL (strlcpy)(hndl->fname, fname, sizeof (hndl->fname));
  /* Create the file eagerly (O_EXCL: it must not already exist); later
     writers reopen and mmap it per block/buffer.  */
  int fd = CALL_UTIL (open)(hndl->fname,
			    O_RDWR | O_CREAT | O_TRUNC | O_EXCL,
			    S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
  if (fd < 0)
    {
      TprintfT (0, "createHandle open failed -- hndl->fname = `%s', SP_LOG_FILE = `%s': %s\n",
		hndl->fname, SP_LOG_FILE, CALL_UTIL (strerror)(errno));
      if (is_not_the_log_file (hndl->fname) == 0)
	{
	  char errbuf[4096];
	  /* If we are trying to create the handle for the log file, write to stderr, not the experiment */
	  CALL_UTIL (snprintf)(errbuf, sizeof (errbuf),
			       "create_handle: COL_ERROR_LOG_OPEN %s: %s\n", hndl->fname, CALL_UTIL (strerror)(errno));
	  CALL_UTIL (write)(2, errbuf, CALL_UTIL (strlen)(errbuf));

	}
      else
	__collector_log_write ("<event kind=\"%s\" id=\"%d\" ec=\"%d\">%s: create_handle</event>\n",
			       SP_JCMD_CERROR, COL_ERROR_FILEOPN, errno, hndl->fname);
      return NULL;
    }
  CALL_UTIL (close)(fd);

  hndl->iotype = iotype;
  if (hndl->iotype == IO_TXT)
    {
      /* allocate our buffers in virtual memory */
      /* later, we will remap buffers individually to the file */
      uint8_t *memory = (uint8_t*) CALL_UTIL (mmap64)(0,
						      (size_t) (NBUFS * blksz),
						      PROT_READ | PROT_WRITE,
#if ARCH(SPARC)
						      MAP_SHARED | MAP_ANON,
#else
						      MAP_PRIVATE | MAP_ANON,
#endif
						      -1,
						      (off64_t) 0);
      if (memory == MAP_FAILED)
	{
	  TprintfT (0, "create_handle: can't mmap MAP_ANON (for %s): %s\n", hndl->fname, CALL_UTIL (strerror)(errno));
	  /* see if this is the log file */
	  if (is_not_the_log_file (hndl->fname) == 0)
	    {
	      /* If we are trying to map the log file, write to stderr, not to the experiment */
	      char errbuf[4096];
	      CALL_UTIL (snprintf)(errbuf, sizeof (errbuf),
				   "create_handle: can't mmap MAP_ANON (for %s): %s\n", hndl->fname, CALL_UTIL (strerror)(errno));
	      CALL_UTIL (write)(2, errbuf, CALL_UTIL (strlen)(errbuf));
	    }
	  else /* write the error message into the experiment */
	    __collector_log_write ("<event kind=\"%s\" id=\"%d\" ec=\"%d\">MAP_ANON (for %s); create_handle</event>\n",
				   SP_JCMD_CERROR, COL_ERROR_FILEMAP, errno, hndl->fname);
	  return NULL;
	}
      TprintfT (DBG_LT2, " create_handle IO_TXT data buffer length=%ld (0x%lx) file='%s' memory=%p -- %p\n",
		(long) (NBUFS * blksz), (long) (NBUFS * blksz), hndl->fname,
		memory, memory + (NBUFS * blksz) - 1);

      /* set up an array of buffers, pointing them to the virtual addresses */
      TprintfT (DBG_LT2, "create_handle IO_TXT Buffer structures fname = `%s', NBUFS= %d, size = %ld (0x%lx)\n", fname,
		NBUFS, (long) NBUFS * sizeof (Buffer), (long) NBUFS * sizeof (Buffer));
      hndl->buffers = (Buffer*) __collector_allocCSize (__collector_heap, NBUFS * sizeof (Buffer), 1);
      if (hndl->buffers == NULL)
	{
	  TprintfT (0, "create_handle allocCSize for hndl->buffers failed\n");
	  CALL_UTIL (munmap)(memory, NBUFS * blksz);
	  return NULL;
	}
      for (int i = 0; i < NBUFS; i++)
	{
	  Buffer *buf = &hndl->buffers[i];
	  buf->vaddr = memory + i * blksz;
	  buf->state = ST_FREE;
	}
      /* set the file pointer to the beginning of the file */
      hndl->curpos = CUR_MAKE (0, 0, 0);
    }
  else
    {
      if (hndl->iotype == IO_BLK)
	{
	  /* one data flow per CPU, with a floor of 16 */
	  long nflow = CALL_UTIL (sysconf)(_SC_NPROCESSORS_ONLN);
	  if (nflow < 16)
	    nflow = 16;
	  hndl->nflow = (uint32_t) nflow;
	}
      else if (hndl->iotype == IO_SEQ)
	hndl->nflow = 1;
      TprintfT (DBG_LT2, "create_handle calling allocCSize blkstate fname=`%s' nflow=%d NCHUNKS=%d size=%ld (0x%lx)\n",
		fname, hndl->nflow, NCHUNKS,
		(long) (hndl->nflow * NCHUNKS * sizeof (uint32_t)),
		(long) (hndl->nflow * NCHUNKS * sizeof (uint32_t)));
      uint32_t *blkstate = (uint32_t*) __collector_allocCSize (__collector_heap, hndl->nflow * NCHUNKS * sizeof (uint32_t), 1);
      if (blkstate == NULL)
	return NULL;
      for (int j = 0; j < hndl->nflow * NCHUNKS; ++j)
	blkstate[j] = ST_INIT;
      hndl->blkstate = blkstate;
      TprintfT (DBG_LT2, "create_handle calling allocCSize blkoff fname=`%s' nflow=%d NCHUNKS=%d size=%ld (0x%lx)\n",
		fname, hndl->nflow, NCHUNKS,
		(long) (hndl->nflow * NCHUNKS * sizeof (uint32_t)),
		(long) (hndl->nflow * NCHUNKS * sizeof (uint32_t)));
      hndl->blkoff = (uint32_t*) __collector_allocCSize (__collector_heap, hndl->nflow * NCHUNKS * sizeof (uint32_t), 1);
      if (hndl->blkoff == NULL)
	return NULL;
      hndl->nchnk = 0;
      for (int j = 0; j < NCHUNKS; ++j)
	{
	  hndl->chunks[j] = NULL;
	  hndl->chblk[j] = 0;
	}
    }
  hndl->active = 1;
  return hndl;
}

/* Deactivate HNDL and release its blocks/buffers.  Best-effort: access to
   hndl->active is not synchronized with concurrent writers.  */
static void
deleteHandle (DataHandle *hndl)
{
  if (hndl->active == 0)
    return;
  hndl->active = 0;

  if (hndl->iotype == IO_BLK || hndl->iotype == IO_SEQ)
    {
      /* Delete all blocks. */
      /* Since access to hndl->active is not synchronized it's still
       * possible that we leave some blocks undeleted.
       */
      for (int j = 0; j < hndl->nflow * NCHUNKS; ++j)
	{
	  uint32_t oldstate = hndl->blkstate[j];
	  if (oldstate != ST_FREE)
	    continue;
	  /* Mark as busy */
	  uint32_t state = __collector_cas_32 (hndl->blkstate + j, oldstate, ST_BUSY);
	  if (state != oldstate)
	    continue;
	  deleteBlock (hndl, j / NCHUNKS, j % NCHUNKS);
	}
    }
  else if (hndl->iotype == IO_TXT)
    {
      /*
       * First, make sure that buffers are in some "coherent" state:
       *
       * At this point, the handle is no longer active. But some threads
       * might already have passed the active-handle check and are now
       * trying to schedule writes. So, set the handle pointer to "busy".
       * This will prevent new writes from being scheduled. Threads that
       * are polling will time out.
       */
      hrtime_t timeout = __collector_gethrtime () + 10 * ((hrtime_t) 1000000000);
      volatile uint32_t busy = 0;
      while (1)
	{
	  uint32_t indx;
	  uint64_t opos, npos, foff;
	  int blk_off;
	  /* read the current pointer */
	  opos = hndl->curpos;
	  busy = CUR_BUSY (opos);
	  indx = CUR_INDX (opos);
	  foff = CUR_FOFF (opos);
	  if (busy == 1)
	    {
	      if (__collector_gethrtime () > timeout)
		{
		  TprintfT (0, "deleteHandle ERROR: timeout cleaning up handle for %s\n", hndl->fname);
		  return;
		}
	      continue;
	    }
	  /* round foff up to the next block boundary */
	  blk_off = foff & (blksz - 1);
	  if (blk_off > 0)
	    foff += blksz - blk_off;
	  npos = CUR_MAKE (1, indx, foff);

	  /* try to update the handle position atomically */
	  if (__collector_cas_64p (&hndl->curpos, &opos, &npos) != opos)
	    continue;

	  /*
	   * If the last buffer won't be filled, account for
	   * the white space at the end so that the buffer will
	   * be deleted properly.
405 */ 406 if (blk_off > 0) 407 { 408 Buffer *buf = &hndl->buffers[indx]; 409 if (__collector_subget_32 (&buf->left, blksz - blk_off) == 0) 410 deleteBuffer (buf); 411 } 412 break; 413 } 414 /* wait for buffers to be deleted */ 415 timeout = __collector_gethrtime () + 10 * ((hrtime_t) 1000000000); 416 for (int i = 0; i < NBUFS; i++) 417 { 418 Buffer *buf = &hndl->buffers[i]; 419 while (__collector_cas_32 (&buf->state, ST_FREE, ST_INIT) != ST_FREE) 420 { 421 if (__collector_gethrtime () > timeout) 422 { 423 TprintfT (0, "deleteHandle ERROR: timeout waiting for buffer %d for %s\n", i, hndl->fname); 424 return; 425 } 426 } 427 CALL_UTIL (munmap)(buf->vaddr, blksz); 428 } 429 430 /* free buffer array */ 431 __collector_freeCSize (__collector_heap, hndl->buffers, NBUFS * sizeof (Buffer)); 432 } 433} 434 435void 436__collector_delete_handle (DataHandle *hndl) 437{ 438 if (hndl == NULL) 439 return; 440 deleteHandle (hndl); 441} 442 443static int 444exp_size_ck (int nblocks, char *fname) 445{ 446 if (size_limit == 0) 447 return 0; 448 /* do an atomic add to the cur_size */ 449 uint32_t old_size = cur_size; 450 uint32_t new_size; 451 for (;;) 452 { 453 new_size = __collector_cas_32 (&cur_size, old_size, old_size + nblocks); 454 if (new_size == old_size) 455 { 456 new_size = old_size + nblocks; 457 break; 458 } 459 old_size = new_size; 460 } 461 TprintfT (DBG_LT2, "exp_size_ck() adding %d block(s); new_size = %d, limit = %d blocks; fname = %s\n", 462 nblocks, new_size, size_limit, fname); 463 464 /* pause the entire collector if we have exceeded the limit */ 465 if (old_size < size_limit && new_size >= size_limit) 466 { 467 TprintfT (0, "exp_size_ck() experiment size limit exceeded; new_size = %ld, limit = %ld blocks; fname = %s\n", 468 (long) new_size, (long) size_limit, fname); 469 (void) __collector_log_write ("<event kind=\"%s\" id=\"%d\">%ld blocks (each %ld bytes)</event>\n", 470 SP_JCMD_CWARN, COL_ERROR_SIZELIM, (long) size_limit, (long) blksz); 471 
__collector_pause_m ("size-limit"); 472 __collector_terminate_expt (); 473 return -1; 474 } 475 return 0; 476} 477 478int 479__collector_set_size_limit (char *par) 480{ 481 if (!initialized) 482 init (); 483 484 int lim = CALL_UTIL (strtol)(par, &par, 0); 485 size_limit = (uint32_t) ((uint64_t) lim * 1024 * 1024 / blksz); 486 TprintfT (DBG_LT0, "collector_size_limit set to %d MB. = %d blocks\n", 487 lim, size_limit); 488 (void) __collector_log_write ("<setting limit=\"%d\"/>\n", lim); 489 return COL_ERROR_NONE; 490} 491 492/* 493 * IO_BLK and IO_SEQ files 494 */ 495 496/* 497 * Allocate a chunk (nflow blocks) contiguously in virtual memory. 498 * Its blocks will be mmapped to the file individually. 499 */ 500static int 501allocateChunk (DataHandle *hndl, unsigned ichunk) 502{ 503 /* 504 * hndl->chunks[ichunk] is one of: 505 * - NULL (initial value) 506 * - CHUNK_BUSY (transition state when allocating the chunk) 507 * - some address (the allocated chunk) 508 */ 509 uint8_t *CHUNK_BUSY = (uint8_t *) 1; 510 hrtime_t timeout = 0; 511 while (1) 512 { 513 if (hndl->chunks[ichunk] > CHUNK_BUSY) 514 return 0; /* the chunk has already been allocated */ 515 /* try to allocate the chunk (change: NULL => CHUNK_BUSY) */ 516 if (__collector_cas_ptr (&hndl->chunks[ichunk], NULL, CHUNK_BUSY) == NULL) 517 { 518 /* allocate virtual memory */ 519 uint8_t *newchunk = (uint8_t*) CALL_UTIL (mmap64)(0, 520 (size_t) (blksz * hndl->nflow), 521 PROT_READ | PROT_WRITE, 522#if ARCH(SPARC) 523 MAP_SHARED | MAP_ANON, 524#else 525 MAP_PRIVATE | MAP_ANON, 526#endif 527 -1, (off64_t) 0); 528 if (newchunk == MAP_FAILED) 529 { 530 deleteHandle (hndl); 531 TprintfT (DBG_LT1, " allocateChunk mmap: start=0x%x length=%ld (0x%lx), offset=%d ret=%p\n", 532 0, (long) (blksz * hndl->nflow), 533 (long) (blksz * hndl->nflow), 0, newchunk); 534 TprintfT (0, "allocateChunk: can't mmap MAP_ANON (for %s): %s\n", hndl->fname, CALL_UTIL (strerror) (errno)); 535 __collector_log_write ("<event kind=\"%s\" id=\"%d\" 
ec=\"%d\">MAP_ANON (for %s)</event>\n", 536 SP_JCMD_CERROR, COL_ERROR_FILEMAP, errno, hndl->fname); 537 return 1; 538 } 539 540 /* assign allocated address to our chunk */ 541 if (__collector_cas_ptr (&hndl->chunks[ichunk], CHUNK_BUSY, newchunk) != CHUNK_BUSY) 542 { 543 TprintfT (0, "allocateChunk: can't release chunk CAS lock for %s\n", hndl->fname); 544 __collector_log_write ("<event kind=\"%s\" id=\"%d\">couldn't release chunk CAS lock (%s)</event>\n", 545 SP_JCMD_CERROR, COL_ERROR_GENERAL, hndl->fname); 546 } 547 __collector_inc_32 (&hndl->nchnk); 548 return 0; 549 } 550 551 /* check for time out */ 552 if (timeout == 0) 553 timeout = __collector_gethrtime () + 10 * ((hrtime_t) 1000000000); 554 if (__collector_gethrtime () > timeout) 555 { 556 TprintfT (0, "allocateChunk: timeout for %s\n", hndl->fname); 557 __collector_log_write ("<event kind=\"%s\" id=\"%d\">timeout allocating chunk for %s</event>\n", 558 SP_JCMD_CERROR, COL_ERROR_GENERAL, hndl->fname); 559 return 1; 560 } 561 } 562} 563 564/* 565 * Get the address for block (iflow,ichunk). 566 */ 567static uint8_t * 568getBlock (DataHandle *hndl, unsigned iflow, unsigned ichunk) 569{ 570 return hndl->chunks[ichunk] + iflow * blksz; 571} 572 573/* 574 * Map block (iflow,ichunk) to the next part of the file. 575 */ 576static int 577remapBlock (DataHandle *hndl, unsigned iflow, unsigned ichunk) 578{ 579 int rc = 0; 580 int fd; 581 /* Get the old file nblk and increment it atomically. */ 582 uint32_t oldblk = hndl->nblk; 583 for (;;) 584 { 585 uint32_t newblk = __collector_cas_32 (&hndl->nblk, oldblk, oldblk + 1); 586 if (newblk == oldblk) 587 break; 588 oldblk = newblk; 589 } 590 off64_t offset = (off64_t) oldblk * blksz; 591 592 /* 6618470: disable thread cancellation */ 593 int old_cstate; 594 pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &old_cstate); 595 596 /* Open the file. 
*/
  int iter = 0;
  hrtime_t tso = __collector_gethrtime ();
  for (;;)
    {
      fd = CALL_UTIL (open)(hndl->fname, O_RDWR, 0);
      if (fd < 0)
	{
	  if (errno == EMFILE)
	    {
	      /* too many open files */
	      iter++;
	      if (iter > 1000)
		{
		  /* we've tried 1000 times; kick error back to caller */
		  char errmsg[MAXPATHLEN + 50];
		  hrtime_t teo = __collector_gethrtime ();
		  double deltato = (double) (teo - tso) / 1000000.;
		  /* NOTE(review): %d for __collector_thr_self() may mismatch its
		     actual return type -- confirm against its declaration.  */
		  (void) CALL_UTIL (snprintf) (errmsg, sizeof (errmsg), " t=%d, %s: open-retries-failed = %d, %3.6f ms.; remap",
					       __collector_thr_self (), hndl->fname, iter, deltato);
		  __collector_log_write ("<event kind=\"%s\" id=\"%d\">%s</event>\n",
					 SP_JCMD_COMMENT, COL_COMMENT_NONE, errmsg);
		  rc = 1;
		  goto exit;
		}
	      /* keep trying */
	      continue;
	    }
	  deleteHandle (hndl);
	  TprintfT (0, "remapBlock: can't open file: %s: %s\n", hndl->fname, STR (CALL_UTIL (strerror)(errno)));
	  __collector_log_write ("<event kind=\"%s\" id=\"%d\" ec=\"%d\">t=%llu, %s: remap </event>\n",
				 SP_JCMD_CERROR, COL_ERROR_FILEOPN, errno,
				 (unsigned long long) __collector_thr_self (),
				 hndl->fname);
	  rc = 1;
	  goto exit;
	}
      else
	break;
    }

  /* report number of retries of the open due to too many open fd's */
  if (iter > 0)
    {
      char errmsg[MAXPATHLEN + 50];
      hrtime_t teo = __collector_gethrtime ();
      double deltato = (double) (teo - tso) / 1000000.;
      (void) CALL_UTIL (snprintf) (errmsg, sizeof (errmsg), " t=%d, %s: open-retries = %d, %3.6f ms.; remap",
				   __collector_thr_self (), hndl->fname, iter, deltato);
      __collector_log_write ("<event kind=\"%s\" id=\"%d\">%s</event>\n",
			     SP_JCMD_COMMENT, COL_COMMENT_NONE, errmsg);
    }

  /* Ensure disk space is allocated and the block offset is 0 */
  uint32_t zero = 0;
  int n = CALL_UTIL (pwrite64)(fd, &zero, sizeof (zero), (off64_t) (offset + blksz - sizeof (zero)));
  if (n <= 0)
    {
      deleteHandle (hndl);
      TprintfT (0, "remapBlock: can't pwrite file: %s : errno=%d\n", hndl->fname, errno);
      __collector_log_write ("<event kind=\"%s\" id=\"%d\" ec=\"%d\">%s: remap</event>\n",
			     SP_JCMD_CERROR, COL_ERROR_NOSPACE, errno, hndl->fname);
      CALL_UTIL (close)(fd);
      rc = 1;
      goto exit;
    }
  hndl->blkoff[iflow * NCHUNKS + ichunk] = 0;

  /* Map block to file */
  uint8_t *bptr = getBlock (hndl, iflow, ichunk);
  uint8_t *vaddr = (uint8_t *) CALL_UTIL (mmap64)(
						  (void*) bptr,
						  (size_t) blksz,
						  PROT_READ | PROT_WRITE,
						  MAP_SHARED | MAP_FIXED,
						  fd,
						  offset);

  if (vaddr != bptr)
    {
      deleteHandle (hndl);
      TprintfT (DBG_LT1, " remapBlock mmap: start=%p length=%ld (0x%lx) offset=0x%llx ret=%p\n",
		bptr, (long) blksz, (long) blksz, (long long) offset, vaddr);
      TprintfT (0, "remapBlock: can't mmap file: %s : errno=%d\n", hndl->fname, errno);
      (void) __collector_log_write ("<event kind=\"%s\" id=\"%d\" ec=\"%d\">%s: remap</event>\n",
				    SP_JCMD_CERROR, COL_ERROR_FILEMAP, errno, hndl->fname);
      CALL_UTIL (close)(fd);
      rc = 1;
      goto exit;
    }
  CALL_UTIL (close)(fd);

  if (hndl->exempt == 0)
    exp_size_ck (1, hndl->fname);
  else
    Tprintf (DBG_LT1, "exp_size_ck() bypassed for %d block(s); exempt fname = %s\n",
	     1, hndl->fname);
exit:
  /* Restore the previous cancellation state */
  pthread_setcancelstate (old_cstate, NULL);

  return rc;
}

/* Make block (iflow,ichunk) ready for writing: allocate its chunk if
   needed, map it to a fresh region of the file, and bump the chunk's
   active-block count.  Returns 0 on success, 1 on error.  */
static int
newBlock (DataHandle *hndl, unsigned iflow, unsigned ichunk)
{
  if (allocateChunk (hndl, ichunk) != 0)
    return 1;
  if (remapBlock (hndl, iflow, ichunk) != 0)
    return 1;

  /* Update the number of active blocks */
  __collector_inc_32 (hndl->chblk + ichunk);
  return 0;
}

/* Unmap block (iflow,ichunk), reset its state to ST_INIT, and decrement
   the chunk's active-block count.  */
static void
deleteBlock (DataHandle *hndl, unsigned iflow, unsigned ichunk)
{
  uint8_t *bptr = getBlock (hndl, iflow, ichunk);
  CALL_UTIL (munmap)((void*) bptr, blksz);
  hndl->blkstate[iflow * NCHUNKS + ichunk] = ST_INIT;

  /* Update the number of active blocks */
  __collector_dec_32 (hndl->chblk + ichunk);
}

/* Fill in any zero-valued Common_packet header fields of PCKT with
   defaults (kind, timestamp, lwp/thread/cpu ids, size) and write it.
   Returns 0 on success, 1 on failure.  */
int
__collector_write_record (DataHandle *hndl, Common_packet *pckt)
{
  if (hndl == NULL || !hndl->active)
    return 1;
  /* fill in the fields of the common packet structure */
  if (pckt->type == 0)
    pckt->type = hndl->kind;
  if (pckt->tstamp == 0)
    pckt->tstamp = __collector_gethrtime ();
  if (pckt->lwp_id == 0)
    pckt->lwp_id = __collector_lwp_self ();
  if (pckt->thr_id == 0)
    pckt->thr_id = __collector_thr_self ();
  if (pckt->cpu_id == 0)
    pckt->cpu_id = CALL_UTIL (getcpuid)();
  if (pckt->tsize == 0)
    pckt->tsize = sizeof (Common_packet);
  TprintfT (DBG_LT3, "collector_write_record to %s, type:%d tsize:%d\n",
	    hndl->fname, pckt->type, pckt->tsize);
  return __collector_write_packet (hndl, (CM_Packet*) pckt);
}

/* Write PCKT to an IO_BLK/IO_SEQ file: pick a data flow by thread id,
   claim a non-busy block in that flow via CAS, map a new block if the
   record doesn't fit, pad the remainder, and copy the record in.
   Returns 0 on success, 1 on failure.  */
int
__collector_write_packet (DataHandle *hndl, CM_Packet *pckt)
{
  if (hndl == NULL || !hndl->active)
    return 1;

  /* if the experiment is not open, there should be no writes */
  if (__collector_expstate != EXP_OPEN)
    {
#ifdef DEBUG
      char *xstate;
      switch (__collector_expstate)
	{
	case EXP_INIT:
	  xstate = "EXP_INIT";
	  break;
	case EXP_OPEN:
	  xstate = "EXP_OPEN";
	  break;
	case EXP_PAUSED:
	  xstate = "EXP_PAUSED";
	  break;
	case EXP_CLOSED:
	  xstate = "EXP_CLOSED";
	  break;
	default:
	  xstate = "Unknown";
	  break;
	}
      TprintfT (0, "collector_write_packet: write to %s while experiment state is %s\n",
		hndl->fname, xstate);
#endif
      return 1;
    }
  int recsz = pckt->tsize;
  if (recsz > blksz)
    {
      TprintfT (0, "collector_write_packet: packet too long: %d (max %ld)\n", recsz, blksz);
      return 1;
    }
  unsigned tid = (__collector_no_threads ? __collector_lwp_self () : __collector_thr_self ());
  unsigned iflow = tid % hndl->nflow;

  /* Acquire block */
  uint32_t *sptr = &hndl->blkstate[iflow * NCHUNKS];
  uint32_t state = ST_BUSY;
  unsigned ichunk;
  for (ichunk = 0; ichunk < NCHUNKS; ++ichunk)
    {
      uint32_t oldstate = sptr[ichunk];
      if (oldstate == ST_BUSY)
	continue;
      /* Mark as busy */
      state = __collector_cas_32 (sptr + ichunk, oldstate, ST_BUSY);
      if (state == oldstate)
	break;
      if (state == ST_BUSY)
	continue;
      /* It's possible the state changed from ST_INIT to ST_FREE */
      oldstate = state;
      state = __collector_cas_32 (sptr + ichunk, oldstate, ST_BUSY);
      if (state == oldstate)
	break;
    }

  if (state == ST_BUSY || ichunk == NCHUNKS)
    {
      /* We are out of blocks for this data flow.
       * We might switch to another flow but for now report and return.
       */
      TprintfT (0, "collector_write_packet: all %d blocks on flow %d for %s are busy\n",
		NCHUNKS, iflow, hndl->fname);
      return 1;
    }

  if (state == ST_INIT && newBlock (hndl, iflow, ichunk) != 0)
    return 1;
  uint8_t *bptr = getBlock (hndl, iflow, ichunk);
  uint32_t blkoff = hndl->blkoff[iflow * NCHUNKS + ichunk];
  if (blkoff + recsz > blksz)
    {
      /* The record doesn't fit. Close the block */
      if (blkoff < blksz)
	{
	  Common_packet *closed = (Common_packet *) (bptr + blkoff);
	  closed->type = CLOSED_PCKT;
	  closed->tsize = blksz - blkoff; /* redundant */
	}
      if (remapBlock (hndl, iflow, ichunk) != 0)
	return 1;
      blkoff = hndl->blkoff[iflow * NCHUNKS + ichunk];
    }
  if (blkoff + recsz < blksz)
    {
      /* Set the empty padding */
      Common_packet *empty = (Common_packet *) (bptr + blkoff + recsz);
      empty->type = EMPTY_PCKT;
      empty->tsize = blksz - blkoff - recsz;
    }
  __collector_memcpy (bptr + blkoff, pckt, recsz);

  /* Release block */
  if (hndl->active == 0)
    {
      deleteBlock (hndl, iflow, ichunk);
      return 0;
    }
  hndl->blkoff[iflow * NCHUNKS + ichunk] += recsz;
  sptr[ichunk] = ST_FREE;
  return 0;
}

/*
 * IO_TXT files
 *
 * IO_TXT covers the case where many threads are trying to write text messages
 * sequentially (atomically) to a file. Examples include SP_LOG_FILE and SP_MAP_FILE.
 *
 * The file is not written directly, but by writing to mmapped virtual memory.
 * The granularity of the mapping is a "Buffer". There may be as many as
 * NBUFS buffers at any one time.
 *
 * The current position of the file is handled via hndl->curpos.
 *
 * * It is accessed atomically with 64-bit CAS instructions.
 *
 * * This 64-bit word encapsulates:
 * - busy: a bit to lock access to hndl->curpos
 * - indx: an index indicating which Buffer to use for the current position
 * - foff: the file offset
 *
 * * The contents are accessed with:
 * - unpack macros: CUR_BUSY CUR_INDX CUR_FOFF
 * - pack macro : CUR_MAKE
 *
 * Conceptually, what happens when a thread wants to write a message is:
 * - acquire the hndl->curpos "busy" lock
 * . acquire and map new Buffers if needed to complete the message
 * . update the file offset
 * .
release the lock
 * - write to the corresponding buffers
 *
 * Each Buffer has a buf->left field that tracks how many more bytes
 * need to be written to the Buffer. After a thread writes to a Buffer,
 * it decrements buf->left atomically. When buf->left reaches 0, the
 * Buffer (mapping) is deleted, freeing the Buffer for a new mapping.
 *
 * The actual implementation has some twists:
 *
 * * If the entire text message fits into the current Buffer -- that is,
 * no new Buffers are needed -- the thread does not acquire the lock.
 * It simply updates hndl->curpos atomically to the new file offset.
 *
 * * There are various timeouts to prevent hangs in case of abnormalities.
 */

/* Return nonzero iff FNAME is not the experiment log file (used to decide
   whether an error may be reported via the log file itself).  */
static int
is_not_the_log_file (char *fname)
{
  if (CALL_UTIL (strstr)(fname, SP_LOG_FILE) == NULL)
    return 1;
  return 0;
}

/* Map BUF onto FNAME at file offset FOFF (one blksz window): open the file,
   extend it so the window is backed by disk, then MAP_FIXED-map buf->vaddr
   over it and reset buf->left.  Returns 0 on success, 1 on error.  */
static int
mapBuffer (char *fname, Buffer *buf, off64_t foff)
{
  int rc = 0;
  /* open fname */
  int fd = CALL_UTIL (open)(fname, O_RDWR, 0);
  if (fd < 0)
    {
      /* NOTE(review): errno may be stale below if TprintfT clobbers it -- verify */
      TprintfT (0, "mapBuffer ERROR: can't open file: %s\n", fname);
      if (is_not_the_log_file (fname))
	__collector_log_write ("<event kind=\"%s\" id=\"%d\" ec=\"%d\">%s: mapBuffer</event>\n",
			       SP_JCMD_CERROR, COL_ERROR_FILEOPN, errno, fname);
      return 1;
    }
  TprintfT (DBG_LT2, "mapBuffer pwrite file %s at 0x%llx\n", fname, (long long) foff);

  /* ensure disk space is allocated */
  char nl = '\n';
  int n = CALL_UTIL (pwrite64)(fd, &nl, sizeof (nl), (off64_t) (foff + blksz - sizeof (nl)));
  if (n <= 0)
    {
      TprintfT (0, "mapBuffer ERROR: can't pwrite file %s at 0x%llx\n", fname,
		(long long) (foff + blksz - sizeof (nl)));
      if (is_not_the_log_file (fname))
	__collector_log_write ("<event kind=\"%s\" id=\"%d\" ec=\"%d\">%s: mapBuffer</event>\n",
			       SP_JCMD_CERROR, COL_ERROR_FILETRNC, errno, fname);
      rc = 1;
      goto exit;
    }
  /* mmap buf->vaddr to fname at foff */
  uint8_t *vaddr = CALL_UTIL (mmap64)(buf->vaddr, (size_t) blksz,
				      PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, fd, foff);
  if (vaddr != buf->vaddr)
    {
      TprintfT (DBG_LT1, " mapBuffer mmap: start=%p length=%ld (0x%lx) offset=0x%llx ret=%p\n",
		buf->vaddr, blksz, blksz, (long long) foff, vaddr);
      TprintfT (0, "mapBuffer ERROR: can't mmap %s: vaddr=%p size=%ld (0x%lx) ret=%p off=0x%llx errno=%d\n",
		fname, buf->vaddr, blksz, blksz, vaddr, (long long) foff, errno);
      if (is_not_the_log_file (fname))
	__collector_log_write ("<event kind=\"%s\" id=\"%d\" ec=\"%d\">%s: mapBuffer</event>\n",
			       SP_JCMD_CERROR, COL_ERROR_FILEMAP, errno, fname);
      rc = 1;
    }
  else
    buf->left = blksz;
exit:
  CALL_UTIL (close)(fd);

  /* Should we check buffer size? Let's not since:
   * - IO_TXT is typically not going to be that big
   * - we want log.xml to be treated specially
   */
  /* exp_size_ck( 1, fname ); */
  return rc;
}

/* Claim a free Buffer (CAS ST_FREE -> ST_BUSY) and map it at file offset
   FOFF.  Returns the buffer index, or -1 if none is free or mapping fails.  */
static int
newBuffer (DataHandle *hndl, uint64_t foff)
{
  /* find a ST_FREE buffer and mark it ST_BUSY */
  int ibuf;
  for (ibuf = 0; ibuf < NBUFS; ibuf++)
    if (__collector_cas_32 (&hndl->buffers[ibuf].state, ST_FREE, ST_BUSY) == ST_FREE)
      break;
  if (ibuf >= NBUFS)
    {
      TprintfT (0, "newBuffer ERROR: all buffers busy for %s\n", hndl->fname);
      return -1;
    }
  Buffer *nbuf = hndl->buffers + ibuf;

  /* map buffer */
  if (mapBuffer (hndl->fname, nbuf, foff) != 0)
    {
      nbuf->state = ST_FREE;
      ibuf = -1;
      goto exit;
    }
exit:
  return ibuf;
}

/* Copy LEN bytes of SRC into BUF at BLK_OFF and atomically subtract LEN
   from buf->left; the writer that drives it to 0 deletes the buffer.  */
static void
writeBuffer (Buffer *buf, int blk_off, char *src, int len)
{
  __collector_memcpy (buf->vaddr + blk_off, src, len);
  if (__collector_subget_32 (&buf->left, len) == 0)
    deleteBuffer (buf);
}

/* Return BUF to the free pool (its mapping is reused/replaced later).  */
static void
deleteBuffer (Buffer *buf)
{
  buf->state = ST_FREE;
}

/* Append LEN bytes of SRC atomically to an IO_TXT file (see the IO_TXT
   design comment above).  Returns 0 on success, 1 on failure.  */
int
__collector_write_string (DataHandle *hndl, char *src, int len)
{
1009 if (hndl == NULL || !hndl->active) 1010 return 1; 1011 if (len <= 0) 1012 return 0; 1013 1014 hrtime_t timeout = __collector_gethrtime () + 20 * ((hrtime_t) 1000000000); 1015 volatile uint32_t busy = 0; 1016 while (1) 1017 { 1018 uint32_t indx; 1019 uint64_t opos, foff, base; 1020 int blk_off, buf_indices[NBUFS], ibuf, nbufs; 1021 1022 /* read and decode the current pointer */ 1023 opos = hndl->curpos; 1024 busy = CUR_BUSY (opos); 1025 indx = CUR_INDX (opos); 1026 foff = CUR_FOFF (opos); 1027 if (busy == 1) 1028 { 1029 if (__collector_gethrtime () > timeout) 1030 { 1031 /* 1032 * E.g., if another thread deleted the handle 1033 * after we checked hndl->active. 1034 */ 1035 TprintfT (0, "__collector_write_string ERROR: timeout writing length=%d to text file: %s\n", len, hndl->fname); 1036 return 1; 1037 } 1038 continue; 1039 } 1040 1041 /* initial block offset */ 1042 blk_off = foff & (blksz - 1); 1043 1044 /* number of new buffers to map */ 1045 int lastbuf = ((foff + len - 1) >> log2blksz); /* last block file index we will write */ 1046 int firstbuf = ((foff - 1) >> log2blksz); /* last block file index we have written */ 1047 nbufs = lastbuf - firstbuf; 1048 TprintfT (DBG_LT2, "__collector_write_string firstbuf = %d, lastbuf = %d, nbufs = %d, log2blksz = %ld\n", 1049 firstbuf, lastbuf, nbufs, log2blksz); 1050 if (nbufs >= NBUFS) 1051 { 1052 Tprintf (0, "__collector_write_string ERROR: string of length %d too long to be written to text file: %s\n", len, hndl->fname); 1053 return 1; 1054 } 1055 1056 /* things are simple if we don't need new buffers */ 1057 if (nbufs == 0) 1058 { 1059 /* try to update the handle position atomically */ 1060 uint64_t npos = CUR_MAKE (0, indx, foff + len); 1061 if (__collector_cas_64p (&hndl->curpos, &opos, &npos) != opos) 1062 continue; 1063 1064 /* success! 
copy our string and we're done */ 1065 TprintfT (DBG_LT2, "__collector_write_string writeBuffer[%d]: vaddr = %p, len = %d, foff = %lld, '%s'\n", 1066 indx, hndl->buffers[indx].vaddr, len, (long long) foff, src); 1067 writeBuffer (&hndl->buffers[indx], foff & (blksz - 1), src, len); 1068 break; 1069 } 1070 1071 /* initialize the new signal mask */ 1072 sigset_t new_mask; 1073 sigset_t old_mask; 1074 CALL_UTIL (sigfillset)(&new_mask); 1075 1076 /* 6618470: disable thread cancellation */ 1077 int old_cstate; 1078 pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &old_cstate); 1079 /* block all signals */ 1080 CALL_UTIL (sigprocmask)(SIG_SETMASK, &new_mask, &old_mask); 1081 1082 /* but if we need new buffers, "lock" the handle pointer */ 1083 uint64_t lpos = CUR_MAKE (1, indx, foff); 1084 if (__collector_cas_64p (&hndl->curpos, &opos, &lpos) != opos) 1085 { 1086 /* restore signal mask */ 1087 CALL_UTIL (sigprocmask)(SIG_SETMASK, &old_mask, NULL); 1088 /* Restore the previous cancellation state */ 1089 pthread_setcancelstate (old_cstate, NULL); 1090 continue; 1091 } 1092 1093 /* map new buffers */ 1094 base = ((foff - 1) & ~(blksz - 1)); /* last buffer to have been mapped */ 1095 for (ibuf = 0; ibuf < nbufs; ibuf++) 1096 { 1097 base += blksz; 1098 buf_indices[ibuf] = newBuffer (hndl, base); 1099 if (buf_indices[ibuf] < 0) 1100 break; 1101 } 1102 1103 /* "unlock" the handle pointer */ 1104 uint64_t npos = CUR_MAKE (0, indx, foff); 1105 if (ibuf == nbufs) 1106 npos = CUR_MAKE (0, buf_indices[nbufs - 1], foff + len); 1107 if (__collector_cas_64p (&hndl->curpos, &lpos, &npos) != lpos) 1108 { 1109 TprintfT (0, "__collector_write_string ERROR: file handle corrupted: %s\n", hndl->fname); 1110 /* 1111 * At this point, the handle is apparently corrupted and 1112 * presumably locked. No telling what's going on. Still 1113 * let's proceed and write our data and let a later thread 1114 * raise an error if it encounters one. 
1115 */ 1116 } 1117 1118 /* restore signal mask */ 1119 CALL_UTIL (sigprocmask)(SIG_SETMASK, &old_mask, NULL); 1120 /* Restore the previous cancellation state */ 1121 pthread_setcancelstate (old_cstate, NULL); 1122 1123 /* if we couldn't map all the buffers we needed, don't write any part of the string */ 1124 if (ibuf < nbufs) 1125 { 1126 TprintfT (0, "__collector_write_string ERROR: can't map new buffer: %s\n", hndl->fname); 1127 return 1; 1128 } 1129 1130 /* write any data to the old block */ 1131 if (blk_off > 0) 1132 { 1133 TprintfT (DBG_LT2, "__collector_write_string partial writeBuffer[%d]: len=%ld, foff = %d '%s'\n", 1134 indx, blksz - blk_off, blk_off, src); 1135 writeBuffer (&hndl->buffers[indx], blk_off, src, blksz - blk_off); 1136 src += blksz - blk_off; 1137 len -= blksz - blk_off; 1138 } 1139 1140 /* write data to the new blocks */ 1141 for (ibuf = 0; ibuf < nbufs; ibuf++) 1142 { 1143 int clen = blksz; 1144 if (clen > len) 1145 clen = len; 1146 TprintfT (DBG_LT2, "__collector_write_string continue writeBuffer[%d]: len= %d, %s", 1147 ibuf, clen, src); 1148 writeBuffer (&hndl->buffers[buf_indices[ibuf]], 0, src, clen); 1149 src += clen; 1150 len -= clen; 1151 } 1152 break; 1153 } 1154 return 0; 1155} 1156 1157