1/* Copyright (C) 2021 Free Software Foundation, Inc. 2 Contributed by Oracle. 3 4 This file is part of GNU Binutils. 5 6 This program is free software; you can redistribute it and/or modify 7 it under the terms of the GNU General Public License as published by 8 the Free Software Foundation; either version 3, or (at your option) 9 any later version. 10 11 This program is distributed in the hope that it will be useful, 12 but WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 GNU General Public License for more details. 15 16 You should have received a copy of the GNU General Public License 17 along with this program; if not, write to the Free Software 18 Foundation, 51 Franklin Street - Fifth Floor, Boston, 19 MA 02110-1301, USA. */ 20 21#include "config.h" 22#include <stdio.h> 23#include <stdlib.h> 24#include <signal.h> 25#include <unistd.h> 26#include <iostream> 27#include <iomanip> 28#include <sstream> 29#include <queue> 30#include "vec.h" 31#include "util.h" 32#include "ipcio.h" 33#include "DbeThread.h" 34#include "Experiment.h" 35 36#define ipc_trace if (ipc_flags) ipc_default_log 37#define ipc_request_trace if (ipc_flags) ipc_request_log 38#define ipc_response_trace if (ipc_flags) ipc_response_log 39 40using namespace std; 41 42// IPC implementation 43static const int L_PROGRESS = 0; 44static const int L_INTEGER = 1; 45static const int L_BOOLEAN = 2; 46static const int L_LONG = 3; 47static const int L_STRING = 4; 48static const int L_DOUBLE = 5; 49static const int L_ARRAY = 6; 50static const int L_OBJECT = 7; 51static const int L_CHAR = 8; 52 53int currentRequestID; 54int currentChannelID; 55static long maxSize; 56 57extern int cancellableChannelID; 58extern int error_flag; 59extern int ipc_delay_microsec; 60extern FILE *responseLogFileP; 61 62IPCresponse *IPCresponseGlobal; 63 64BufferPool *responseBufferPool; 65 66IPCrequest::IPCrequest (int sz, int reqID, int chID) 67{ 68 size = sz; 69 requestID = reqID; 70 channelID = chID; 71 status = INITIALIZED; 72 idx = 0; 73 buf = (char *) malloc (size); 74 cancelImmediate = false; 75} 76 77IPCrequest::~IPCrequest () 78{ 79 free (buf); 80} 81 82void 83IPCrequest::read (void) 84{ 85 for (int i = 0; i < size; i++) 86 { 87 int c = getc (stdin); 88 ipc_request_trace (TRACE_LVL_4, " IPCrequest:getc(stdin): %02x\n", c); 89 buf[i] = c; 90 } 91} 92 93IPCrequestStatus 94IPCrequest::getStatus (void) 95{ 96 return status; 97} 98 99void 100IPCrequest::setStatus (IPCrequestStatus newStatus) 101{ 102 status = newStatus; 103} 104 105static int 106readByte (IPCrequest* req) 107{ 108 int c; 109 int val = 0; 110 for (int i = 0; i < 2; i++) 111 { 112 if (req == NULL) 113 { 114 c = getc (stdin); 115 ipc_request_trace (TRACE_LVL_4, " readByte:getc(stdin): %02x\n", c); 116 } 117 else 118 c = req->rgetc (); 119 switch (c) 120 { 121 case '0': case '1': case '2': case '3': 122 case '4': case '5': case '6': case '7': 123 case '8': case '9': 124 val = val * 16 + c - '0'; 125 break; 126 case 'a': case 'b': case 'c': case 'd': case 'e': case 'f': 127 val = val * 16 + c - 'a' + 10; 128 break; 129 case EOF: 130 val = EOF; 131 break; 132 default: 133 fprintf (stderr, "readByte: Unknown byte: %d\n", c); 134 break; 135 } 136 } 137 return val; 138} 139 140static int 141readIVal (IPCrequest *req) 142{ 143 int val = readByte (req); 144 for (int i = 0; i < 3; i++) 145 val = val * 256 + readByte (req); 146 ipc_trace (" readIVal: %d\n", val); 147 return val; 148} 149 150static String 151readSVal (IPCrequest *req) 152{ 153 int len = readIVal (req); 154 if (len == -1) 155 { 156 ipc_trace (" readSVal: <NULL>\n"); 157 return NULL; 158 } 159 char *str = (char *) malloc (len + 1); 160 char *s = str; 161 *s = (char) 0; 162 while (len--) 163 *s++ = req->rgetc (); 164 *s = (char) 0; 165 ipc_trace (" readSVal: '%s'\n", str); 166 return str; 167} 168 169static long long 170readLVal (IPCrequest *req) 171{ 172 long long val = readByte (req); 173 for (int i = 0; i < 7; i++) 174 val = val * 256 + readByte (req); 175 ipc_trace (" readLVal: %lld\n", val); 176 return val; 177} 178 179static bool 180readBVal (IPCrequest *req) 181{ 182 int val = readByte (req); 183 ipc_trace (" readBVal: %s\n", val == 0 ? "true" : "false"); 184 return val != 0; 185} 186 187static char 188readCVal (IPCrequest *req) 189{ 190 int val = readByte (req); 191 ipc_trace (" readCVal: %d\n", val); 192 return (char) val; 193} 194 195static double 196readDVal (IPCrequest *req) 197{ 198 String s = readSVal (req); 199 double d = atof (s); 200 free (s); 201 return d; 202} 203 204static Object 205readAVal (IPCrequest *req) 206{ 207 bool twoD = false; 208 int type = readByte (req); 209 if (type == L_ARRAY) 210 { 211 twoD = true; 212 type = readByte (req); 213 } 214 ipc_trace ("readAVal: twoD=%s type=%d\n", twoD ? "true" : "false", type); 215 216 int len = readIVal (req); 217 if (len == -1) 218 return NULL; 219 switch (type) 220 { 221 case L_INTEGER: 222 if (twoD) 223 { 224 Vector<Vector<int>*> *array = new Vector<Vector<int>*>(len); 225 for (int i = 0; i < len; i++) 226 array->store (i, (Vector<int>*)readAVal (req)); 227 return array; 228 } 229 else 230 { 231 Vector<int> *array = new Vector<int>(len); 232 for (int i = 0; i < len; i++) 233 array->store (i, readIVal (req)); 234 return array; 235 } 236 //break; 237 case L_LONG: 238 if (twoD) 239 { 240 Vector<Vector<long long>*> *array = new Vector<Vector<long long>*>(len); 241 for (int i = 0; i < len; i++) 242 array->store (i, (Vector<long long>*)readAVal (req)); 243 return array; 244 } 245 else 246 { 247 Vector<long long> *array = new Vector<long long>(len); 248 for (int i = 0; i < len; i++) 249 array->store (i, readLVal (req)); 250 return array; 251 } 252 //break; 253 case L_DOUBLE: 254 if (twoD) 255 { 256 Vector<Vector<double>*> *array = new Vector<Vector<double>*>(len); 257 for (int i = 0; i < len; i++) 258 array->store (i, (Vector<double>*)readAVal (req)); 259 return array; 260 } 261 else 262 { 263 Vector<double> *array = new Vector<double>(len); 264 for (int i = 0; i < len; i++) 265 array->store (i, readDVal (req)); 266 return array; 267 } 268 //break; 269 case L_BOOLEAN: 270 if (twoD) 271 { 272 Vector < Vector<bool>*> *array = new Vector < Vector<bool>*>(len); 273 for (int i = 0; i < len; i++) 274 array->store (i, (Vector<bool>*)readAVal (req)); 275 return array; 276 } 277 else 278 { 279 Vector<bool> *array = new Vector<bool>(len); 280 for (int i = 0; i < len; i++) 281 array->store (i, readBVal (req)); 282 return array; 283 } 284 //break; 285 case L_CHAR: 286 if (twoD) 287 { 288 Vector<Vector<char>*> *array = new Vector<Vector<char>*>(len); 289 for (int i = 0; i < len; i++) 290 array->store (i, (Vector<char>*)readAVal (req)); 291 return array; 292 } 293 else 294 { 295 Vector<char> *array = new Vector<char>(len); 296 for (int i = 0; i < len; i++) 297 array->store (i, readCVal (req)); 298 return array; 299 } 300 //break; 301 case L_STRING: 302 if (twoD) 303 { 304 Vector<Vector<String>*> *array = new Vector<Vector<String>*>(len); 305 for (int i = 0; i < len; i++) 306 array->store (i, (Vector<String>*)readAVal (req)); 307 return array; 308 } 309 else 310 { 311 Vector<String> *array = new Vector<String>(len); 312 for (int i = 0; i < len; i++) 313 array->store (i, readSVal (req)); 314 return array; 315 } 316 //break; 317 case L_OBJECT: 318 if (twoD) 319 { 320 Vector<Vector<Object>*> *array = new Vector<Vector<Object>*>(len); 321 for (int i = 0; i < len; i++) 322 array->store (i, (Vector<Object>*)readAVal (req)); 323 return array; 324 } 325 else 326 { 327 Vector<Object> *array = new Vector<Object>(len); 328 for (int i = 0; i < len; i++) 329 array->store (i, readAVal (req)); 330 return array; 331 } 332 //break; 333 default: 334 fprintf (stderr, "readAVal: Unknown code: %d\n", type); 335 break; 336 } 337 return NULL; 338} 339 340static int iVal; 341static bool bVal; 342static long long lVal; 343static String sVal; 344static double dVal; 345static Object aVal; 346 347static void 348readResult (int type, IPCrequest *req) 349{ 350 int tVal = readByte (req); 351 switch (tVal) 352 { 353 case L_INTEGER: 354 iVal = readIVal (req); 355 break; 356 case L_LONG: 357 lVal = readLVal (req); 358 break; 359 case L_BOOLEAN: 360 bVal = readBVal (req); 361 break; 362 case L_DOUBLE: 363 dVal = readDVal (req); 364 break; 365 case L_STRING: 366 sVal = readSVal (req); 367 break; 368 case L_ARRAY: 369 aVal = readAVal (req); 370 break; 371 case EOF: 372 fprintf (stderr, "EOF read in readResult\n"); 373 sVal = NULL; 374 return; 375 default: 376 fprintf (stderr, "Unknown code: %d\n", tVal); 377 abort (); 378 } 379 if (type != tVal) 380 { 381 fprintf (stderr, "Internal error: readResult: parameter mismatch: type=%d should be %d\n", tVal, type); 382 abort (); 383 } 384} 385 386int 387readInt (IPCrequest *req) 388{ 389 readResult (L_INTEGER, req); 390 return iVal; 391} 392 393String 394readString (IPCrequest *req) 395{ 396 readResult (L_STRING, req); 397 return sVal; 398} 399 400long long 401readLong (IPCrequest *req) 402{ 403 readResult (L_LONG, req); 404 return lVal; 405} 406 407double 408readDouble (IPCrequest *req) 409{ 410 readResult (L_DOUBLE, req); 411 return dVal; 412} 413 414bool 415readBoolean (IPCrequest *req) 416{ 417 readResult (L_BOOLEAN, req); 418 return bVal; 419} 420 421DbeObj 422readObject (IPCrequest *req) 423{ 424 readResult (L_LONG, req); 425 return (DbeObj) lVal; 426} 427 428Object 429readArray (IPCrequest *req) 430{ 431 readResult (L_ARRAY, req); 432 return aVal; 433} 434 435// Write 436IPCresponse::IPCresponse (int sz) 437{ 438 requestID = -1; 439 channelID = -1; 440 responseType = -1; 441 responseStatus = RESPONSE_STATUS_SUCCESS; 442 sb = new StringBuilder (sz); 443 next = NULL; 444} 445 446IPCresponse::~IPCresponse () 447{ 448 delete sb; 449} 450 451void 452IPCresponse::reset () 453{ 454 requestID = -1; 455 channelID = -1; 456 responseType = -1; 457 responseStatus = RESPONSE_STATUS_SUCCESS; 458 sb->setLength (0); 459} 460 461void 462IPCresponse::sendByte (int b) 463{ 464 ipc_trace ("sendByte: %02x %d\n", b, b); 465 sb->appendf ("%02x", b); 466} 467 468void 469IPCresponse::sendIVal (int i) 470{ 471 ipc_trace ("sendIVal: %08x %d\n", i, i); 472 sb->appendf ("%08x", i); 473} 474 475void 476IPCresponse::sendLVal (long long l) 477{ 478 ipc_trace ("sendLVal: %016llx %lld\n", l, l); 479 sb->appendf ("%016llx", l); 480} 481 482void 483IPCresponse::sendSVal (const char *s) 484{ 485 if (s == NULL) 486 { 487 sendIVal (-1); 488 return; 489 } 490 sendIVal ((int) strlen (s)); 491 ipc_trace ("sendSVal: %s\n", s); 492 sb->appendf ("%s", s); 493} 494 495void 496IPCresponse::sendBVal (bool b) 497{ 498 sendByte (b ? 1 : 0); 499} 500 501void 502IPCresponse::sendCVal (char c) 503{ 504 sendByte (c); 505} 506 507void 508IPCresponse::sendDVal (double d) 509{ 510 char str[32]; 511 snprintf (str, sizeof (str), "%.12f", d); 512 sendSVal (str); 513} 514 515void 516IPCresponse::sendAVal (void *ptr) 517{ 518 if (ptr == NULL) 519 { 520 sendByte (L_INTEGER); 521 sendIVal (-1); 522 return; 523 } 524 525 VecType type = ((Vector<void*>*)ptr)->type (); 526 switch (type) 527 { 528 case VEC_INTEGER: 529 { 530 sendByte (L_INTEGER); 531 Vector<int> *array = (Vector<int>*)ptr; 532 sendIVal (array->size ()); 533 for (int i = 0; i < array->size (); i++) 534 sendIVal (array->fetch (i)); 535 break; 536 } 537 case VEC_BOOL: 538 { 539 sendByte (L_BOOLEAN); 540 Vector<bool> *array = (Vector<bool>*)ptr; 541 sendIVal (array->size ()); 542 for (int i = 0; i < array->size (); i++) 543 sendBVal (array->fetch (i)); 544 break; 545 } 546 case VEC_CHAR: 547 { 548 sendByte (L_CHAR); 549 Vector<char> *array = (Vector<char>*)ptr; 550 sendIVal (array->size ()); 551 for (int i = 0; i < array->size (); i++) 552 sendCVal (array->fetch (i)); 553 break; 554 } 555 case VEC_LLONG: 556 { 557 sendByte (L_LONG); 558 Vector<long long> *array = (Vector<long long>*)ptr; 559 sendIVal (array->size ()); 560 for (int i = 0; i < array->size (); i++) 561 sendLVal (array->fetch (i)); 562 break; 563 } 564 case VEC_DOUBLE: 565 { 566 sendByte (L_DOUBLE); 567 Vector<double> *array = (Vector<double>*)ptr; 568 sendIVal (array->size ()); 569 for (int i = 0; i < array->size (); i++) 570 sendDVal (array->fetch (i)); 571 break; 572 } 573 case VEC_STRING: 574 { 575 sendByte (L_STRING); 576 Vector<String> *array = (Vector<String>*)ptr; 577 sendIVal (array->size ()); 578 for (int i = 0; i < array->size (); i++) 579 sendSVal (array->fetch (i)); 580 break; 581 } 582 case VEC_STRINGARR: 583 { 584 sendByte (L_ARRAY); 585 sendByte (L_STRING); 586 Vector<void*> *array = (Vector<void*>*)ptr; 587 sendIVal (array->size ()); 588 for (int i = 0; i < array->size (); i++) 589 sendAVal (array->fetch (i)); 590 break; 591 } 592 case VEC_INTARR: 593 { 594 sendByte (L_ARRAY); 595 sendByte (L_INTEGER); 596 Vector<void*> *array = (Vector<void*>*)ptr; 597 sendIVal (array->size ()); 598 for (int i = 0; i < array->size (); i++) 599 sendAVal (array->fetch (i)); 600 break; 601 } 602 case VEC_LLONGARR: 603 { 604 sendByte (L_ARRAY); 605 sendByte (L_LONG); 606 Vector<void*> *array = (Vector<void*>*)ptr; 607 sendIVal (array->size ()); 608 for (int i = 0; i < array->size (); i++) 609 sendAVal (array->fetch (i)); 610 break; 611 } 612 case VEC_VOIDARR: 613 { 614 sendByte (L_OBJECT); 615 Vector<void*> *array = (Vector<void*>*)ptr; 616 sendIVal (array->size ()); 617 for (int i = 0; i < array->size (); i++) 618 sendAVal (array->fetch (i)); 619 break; 620 } 621 default: 622 fprintf (stderr, "sendAVal: Unknown type: %d\n", type); 623 abort (); 624 } 625} 626 627static void 628writeResponseHeader (int requestID, int responseType, int responseStatus, int nBytes) 629{ 630 if (responseType == RESPONSE_TYPE_HANDSHAKE) 631 nBytes = IPC_VERSION_NUMBER; 632 int use_write = 2; 633 ipc_response_trace (TRACE_LVL_1, "ResponseHeaderBegin----- %x ---- %x ----- %x -----%x -------\n", requestID, responseType, responseStatus, nBytes); 634 if (use_write) 635 { 636 char buf[23]; 637 if (use_write == 1) 638 { 639 int i = 0; 640 snprintf (buf + i, 3, "%2x", HEADER_MARKER); 641 i += 2; 642 snprintf (buf + i, 9, "%8x", requestID); 643 i += 8; 644 snprintf (buf + i, 3, "%2x", responseType); 645 i += 2; 646 snprintf (buf + i, 3, "%2x", responseStatus); 647 i += 2; 648 snprintf (buf + i, 9, "%8x", nBytes); 649 } 650 else 651 snprintf (buf, 23, "%02x%08x%02x%02x%08x", HEADER_MARKER, requestID, 652 responseType, responseStatus, nBytes); 653 buf[22] = 0; 654 write (1, buf, 22); 655 } 656 else 657 { 658 cout << setfill ('0') << setw (2) << hex << HEADER_MARKER; 659 cout << setfill ('0') << setw (8) << hex << requestID; 660 cout << setfill ('0') << setw (2) << hex << responseType; 661 cout << setfill ('0') << setw (2) << hex << responseStatus; 662 cout << setfill ('0') << setw (8) << hex << nBytes; 663 cout.flush (); 664 } 665 ipc_response_trace (TRACE_LVL_1, "----------------------------ResponseHeaderEnd\n"); 666 if (nBytes > maxSize) 667 { 668 maxSize = nBytes; 669 ipc_trace ("New maxsize %ld\n", maxSize); 670 } 671} 672 673bool 674cancelNeeded (int chID) 675{ 676 if (chID == cancellableChannelID && chID == cancelRequestedChannelID) 677 return true; 678 else 679 return false; 680} 681 682static void 683writeResponseWithHeader (int requestID, int channelID, int responseType, 684 int responseStatus, IPCresponse* os) 685{ 686 if (cancelNeeded (channelID)) 687 { 688 responseStatus = RESPONSE_STATUS_CANCELLED; 689 ipc_trace ("CANCELLING %d %d\n", requestID, channelID); 690 // This is for gracefully cancelling regular ops like openExperiment - getFiles should never reach here 691 } 692 os->setRequestID (requestID); 693 os->setChannelID (channelID); 694 os->setResponseType (responseType); 695 os->setResponseStatus (responseStatus); 696 os->print (); 697 os->reset (); 698 responseBufferPool->recycle (os); 699} 700 701void 702writeAckFast (int requestID) 703{ 704 writeResponseHeader (requestID, RESPONSE_TYPE_ACK, RESPONSE_STATUS_SUCCESS, 0); 705} 706 707void 708writeAck (int requestID, int channelID) 709{ 710#if DEBUG 711 char *s = getenv (NTXT ("SP_NO_IPC_ACK")); 712#else /* ^DEBUG */ 713 char *s = NULL; 714#endif /* ^DEBUG */ 715 if (s) 716 { 717 int i = requestID; 718 int j = channelID; 719 ipc_request_trace (TRACE_LVL_4, "ACK skipped: requestID=%d channelID=%d\n", i, j); 720 } 721 else 722 { 723 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL); 724 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_ACK, 725 RESPONSE_STATUS_SUCCESS, OUTS); 726 } 727} 728 729void 730writeHandshake (int requestID, int channelID) 731{ 732 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL); 733 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS); 734 // writeResponseHeader(requestID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, IPC_VERSION_NUMBER); 735} 736 737void 738writeResponseGeneric (int responseStatus, int requestID, int channelID) 739{ 740 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL); 741 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_COMPLETE, responseStatus, OUTS); 742} 743 744BufferPool::BufferPool () 745{ 746 pthread_mutex_init (&p_mutex, NULL); 747 smallBuf = NULL; 748 largeBuf = NULL; 749} 750 751BufferPool::~BufferPool () 752{ 753 for (IPCresponse *p = smallBuf; p;) 754 { 755 IPCresponse *tmp = p; 756 p = tmp->next; 757 delete tmp; 758 } 759 for (IPCresponse *p = largeBuf; p;) 760 { 761 IPCresponse *tmp = p; 762 p = tmp->next; 763 delete tmp; 764 } 765} 766 767IPCresponse* 768BufferPool::getNewResponse (int size) 769{ 770 pthread_mutex_lock (&p_mutex); 771 if (ipc_single_threaded_mode && size < BUFFER_SIZE_LARGE) 772 size = BUFFER_SIZE_LARGE; 773 IPCresponse *newResponse = NULL; 774 if (size >= BUFFER_SIZE_LARGE) 775 { 776 if (largeBuf) 777 { 778 newResponse = largeBuf; 779 largeBuf = largeBuf->next; 780 } 781 } 782 else if (smallBuf) 783 { 784 newResponse = smallBuf; 785 smallBuf = smallBuf->next; 786 } 787 if (newResponse) 788 newResponse->reset (); 789 else 790 { 791 newResponse = new IPCresponse (size); 792 ipc_trace ("GETNEWBUFFER %d\n", size); 793 } 794 pthread_mutex_unlock (&p_mutex); 795 return newResponse; 796} 797 798void 799BufferPool::recycle (IPCresponse *respB) 800{ 801 pthread_mutex_lock (&p_mutex); 802 if (respB->getCurBufSize () >= BUFFER_SIZE_LARGE) 803 { 804 respB->next = largeBuf; 805 largeBuf = respB; 806 } 807 else 808 { 809 respB->next = smallBuf; 810 smallBuf = respB; 811 } 812 pthread_mutex_unlock (&p_mutex); 813} 814 815void 816writeArray (void *ptr, IPCrequest* req) 817{ 818 if (req->getStatus () == CANCELLED_IMMEDIATE) 819 return; 820 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE); 821 OUTS->sendByte (L_ARRAY); 822 OUTS->sendAVal (ptr); 823 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), 824 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 825} 826 827void 828writeString (const char *s, IPCrequest* req) 829{ 830 if (req->getStatus () == CANCELLED_IMMEDIATE) 831 return; 832 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE); 833 OUTS->sendByte (L_STRING); 834 OUTS->sendSVal (s); 835 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), 836 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 837} 838 839void 840writeObject (DbeObj obj, IPCrequest* req) 841{ 842 writeLong ((long long) obj, req); 843} 844 845void 846writeBoolean (bool b, IPCrequest* req) 847{ 848 if (req->getStatus () == CANCELLED_IMMEDIATE) 849 return; 850 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); 851 OUTS->sendByte (L_BOOLEAN); 852 OUTS->sendBVal (b); 853 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), 854 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 855} 856 857void 858writeInt (int i, IPCrequest* req) 859{ 860 if (req->getStatus () == CANCELLED_IMMEDIATE) 861 return; 862 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); 863 OUTS->sendByte (L_INTEGER); 864 OUTS->sendIVal (i); 865 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 866} 867 868void 869writeChar (char c, IPCrequest* req) 870{ 871 if (req->getStatus () == CANCELLED_IMMEDIATE) 872 return; 873 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); 874 OUTS->sendByte (L_CHAR); 875 OUTS->sendCVal (c); 876 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 877} 878 879void 880writeLong (long long l, IPCrequest* req) 881{ 882 if (req->getStatus () == CANCELLED_IMMEDIATE) 883 return; 884 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); 885 OUTS->sendByte (L_LONG); 886 OUTS->sendLVal (l); 887 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 888} 889 890void 891writeDouble (double d, IPCrequest* req) 892{ 893 if (req->getStatus () == CANCELLED_IMMEDIATE) return; 894 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); 895 OUTS->sendByte (L_DOUBLE); 896 OUTS->sendDVal (d); 897 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); 898} 899 900int 901setProgress (int percentage, const char *proc_str) 902{ 903 if (cancelNeeded (currentChannelID)) 904 { 905 // ExperimentLoadCancelException *e1 = new ExperimentLoadCancelException(); 906 // throw (e1); 907 return 1; 908 } 909 if (NULL == proc_str) 910 return 1; 911 int size = strlen (proc_str) + 100; // 100 bytes for additional data 912 int bs = BUFFER_SIZE_MEDIUM; 913 if (size > BUFFER_SIZE_MEDIUM) 914 { 915 if (size > BUFFER_SIZE_LARGE) return 1; // This should never happen 916 bs = BUFFER_SIZE_LARGE; 917 } 918 IPCresponse *OUTS = responseBufferPool->getNewResponse (bs); 919 OUTS->sendByte (L_PROGRESS); 920 OUTS->sendIVal (percentage); 921 OUTS->sendSVal (proc_str); 922 writeResponseWithHeader (currentRequestID, currentChannelID, RESPONSE_TYPE_PROGRESS, RESPONSE_STATUS_SUCCESS, OUTS); 923 return 0; 924} 925 926void 927IPCresponse::print (void) 928{ 929 if (ipc_delay_microsec) 930 usleep (ipc_delay_microsec); 931 int stringSize = sb->length (); 932 writeResponseHeader (requestID, responseType, responseStatus, stringSize); 933 if (stringSize > 0) 934 { 935 char *s = sb->toString (); 936 hrtime_t start_time = gethrtime (); 937 int use_write = 1; 938 if (use_write) 939 write (1, s, stringSize); // write(1, sb->toString(), stringSize); 940 else 941 { 942 cout << s; 943 cout.flush (); 944 } 945 hrtime_t end_time = gethrtime (); 946 unsigned long long time_stamp = end_time - start_time; 947 ipc_response_log (TRACE_LVL_3, "ReqID %x flush time %llu nanosec \n", requestID, time_stamp); 948 free (s); 949 } 950} 951 952void 953setCancelRequestedCh (int chID) 954{ 955 cancelRequestedChannelID = chID; 956} 957 958void 959readRequestHeader () 960{ 961 int marker = readByte (NULL); 962 if (marker != HEADER_MARKER) 963 { 964 fprintf (stderr, "Internal error: received request (%d) without header marker\n", marker); 965 error_flag = 1; 966 return; 967 } 968 else 969 ipc_request_trace (TRACE_LVL_1, "RequestHeaderBegin------------------------\n"); 970 int requestID = readIVal (NULL); 971 int requestType = readByte (NULL); 972 int channelID = readIVal (NULL); 973 int nBytes = readIVal (NULL); 974 if (requestType == REQUEST_TYPE_HANDSHAKE) 975 { 976 // write the ack directly to the wire, not through the response queue 977 // writeAckFast(requestID); 978 writeAck (requestID, channelID); 979 maxSize = 0; 980 writeHandshake (requestID, channelID); 981 ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes); 982 } 983 else if (requestType == REQUEST_TYPE_CANCEL) 984 { 985 writeAck (requestID, channelID); 986 ipc_request_trace (TRACE_LVL_1, "RQ: CANCEL --- RQ: %x ----- %x --- CH: %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes); 987 if (channelID == cancellableChannelID) 988 { 989 // we have worked on at least one request belonging to this channel 990 writeResponseGeneric (RESPONSE_STATUS_SUCCESS, requestID, channelID); 991 setCancelRequestedCh (channelID); 992 ipc_trace ("CANCELLABLE %x %x\n", channelID, currentChannelID); 993 if (channelID == currentChannelID) 994 // request for this channel is currently in progress 995 ipc_request_trace (TRACE_LVL_1, "IN PROGRESS REQUEST NEEDS CANCELLATION"); 996 // ssp_post_cond(waitingToFinish); 997 } 998 else 999 { 1000 // FIXME: 1001 // it is possible that a request for this channel is on the requestQ 1002 // or has been submitted to the work group queue but is waiting for a thread to pick it up 1003 writeResponseGeneric (RESPONSE_STATUS_FAILURE, requestID, channelID); 1004 setCancelRequestedCh (channelID); 1005 ipc_request_trace (TRACE_LVL_1, "RETURNING FAILURE TO CANCEL REQUEST channel %d\n", channelID); 1006 } 1007 } 1008 else 1009 { 1010 writeAck (requestID, channelID); 1011 ipc_request_trace (TRACE_LVL_1, "RQ: --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes); 1012 IPCrequest *nreq = new IPCrequest (nBytes, requestID, channelID); 1013 nreq->read (); 1014 ipc_request_trace (TRACE_LVL_1, "RQ: --- %x Read from stream \n", requestID); 1015 if (cancelNeeded (channelID)) 1016 { 1017 ipc_request_trace (TRACE_LVL_1, "CANCELLABLE REQ RECVD %x %x\n", channelID, requestID); 1018 writeResponseGeneric (RESPONSE_STATUS_CANCELLED, requestID, channelID); 1019 delete nreq; 1020 return; 1021 } 1022 DbeQueue *q = new DbeQueue (ipc_doWork, nreq); 1023 ipcThreadPool->put_queue (q); 1024 } 1025} 1026