1/* Copyright (C) 2021 Free Software Foundation, Inc.
2   Contributed by Oracle.
3
4   This file is part of GNU Binutils.
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 3, or (at your option)
9   any later version.
10
11   This program is distributed in the hope that it will be useful,
12   but WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14   GNU General Public License for more details.
15
16   You should have received a copy of the GNU General Public License
17   along with this program; if not, write to the Free Software
18   Foundation, 51 Franklin Street - Fifth Floor, Boston,
19   MA 02110-1301, USA.  */
20
#include "config.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <queue>
#include "vec.h"
#include "util.h"
#include "ipcio.h"
#include "DbeThread.h"
#include "Experiment.h"
35
/* Tracing helpers: each forwards its arguments to the matching ipc_*_log
   function only when ipc_flags is non-zero.  They are written as a single
   conditional expression rather than a bare `if' so that using one as the
   body of an `if' that has an `else' cannot capture that `else'.  */
#define ipc_trace               !ipc_flags ? (void) 0 : (void) ipc_default_log
#define ipc_request_trace       !ipc_flags ? (void) 0 : (void) ipc_request_log
#define ipc_response_trace      !ipc_flags ? (void) 0 : (void) ipc_response_log
39
40using namespace std;
41
42// IPC implementation
// One-byte type tags that prefix every value on the wire.
static const int L_PROGRESS = 0;	// progress report (see setProgress)
static const int L_INTEGER = 1;		// 32-bit integer
static const int L_BOOLEAN = 2;		// boolean, one byte
static const int L_LONG = 3;		// 64-bit integer
static const int L_STRING = 4;		// length-prefixed string
static const int L_DOUBLE = 5;		// double, transported as a string
static const int L_ARRAY = 6;		// array / array-of-arrays marker
static const int L_OBJECT = 7;		// array whose elements are arrays
static const int L_CHAR = 8;		// single character, one byte
52
53int currentRequestID;
54int currentChannelID;
55static long maxSize;
56
57extern int cancellableChannelID;
58extern int error_flag;
59extern int ipc_delay_microsec;
60extern FILE *responseLogFileP;
61
62IPCresponse *IPCresponseGlobal;
63
64BufferPool *responseBufferPool;
65
66IPCrequest::IPCrequest (int sz, int reqID, int chID)
67{
68  size = sz;
69  requestID = reqID;
70  channelID = chID;
71  status = INITIALIZED;
72  idx = 0;
73  buf = (char *) malloc (size);
74  cancelImmediate = false;
75}
76
77IPCrequest::~IPCrequest ()
78{
79  free (buf);
80}
81
82void
83IPCrequest::read (void)
84{
85  for (int i = 0; i < size; i++)
86    {
87      int c = getc (stdin);
88      ipc_request_trace (TRACE_LVL_4, "  IPCrequest:getc(stdin): %02x\n", c);
89      buf[i] = c;
90    }
91}
92
93IPCrequestStatus
94IPCrequest::getStatus (void)
95{
96  return status;
97}
98
99void
100IPCrequest::setStatus (IPCrequestStatus newStatus)
101{
102  status = newStatus;
103}
104
105static int
106readByte (IPCrequest* req)
107{
108  int c;
109  int val = 0;
110  for (int i = 0; i < 2; i++)
111    {
112      if (req == NULL)
113	{
114	  c = getc (stdin);
115	  ipc_request_trace (TRACE_LVL_4, "  readByte:getc(stdin): %02x\n", c);
116	}
117      else
118	c = req->rgetc ();
119      switch (c)
120	{
121	case '0': case '1': case '2': case '3':
122	case '4': case '5': case '6': case '7':
123	case '8': case '9':
124	  val = val * 16 + c - '0';
125	  break;
126	case 'a': case 'b': case 'c': case 'd': case 'e': case 'f':
127	  val = val * 16 + c - 'a' + 10;
128	  break;
129	case EOF:
130	  val = EOF;
131	  break;
132	default:
133	  fprintf (stderr, "readByte: Unknown byte: %d\n", c);
134	  break;
135	}
136    }
137  return val;
138}
139
140static int
141readIVal (IPCrequest *req)
142{
143  int val = readByte (req);
144  for (int i = 0; i < 3; i++)
145    val = val * 256 + readByte (req);
146  ipc_trace ("  readIVal: %d\n", val);
147  return val;
148}
149
150static String
151readSVal (IPCrequest *req)
152{
153  int len = readIVal (req);
154  if (len == -1)
155    {
156      ipc_trace ("  readSVal: <NULL>\n");
157      return NULL;
158    }
159  char *str = (char *) malloc (len + 1);
160  char *s = str;
161  *s = (char) 0;
162  while (len--)
163    *s++ = req->rgetc ();
164  *s = (char) 0;
165  ipc_trace ("  readSVal: '%s'\n", str);
166  return str;
167}
168
169static long long
170readLVal (IPCrequest *req)
171{
172  long long val = readByte (req);
173  for (int i = 0; i < 7; i++)
174    val = val * 256 + readByte (req);
175  ipc_trace ("  readLVal: %lld\n", val);
176  return val;
177}
178
179static bool
180readBVal (IPCrequest *req)
181{
182  int val = readByte (req);
183  ipc_trace ("  readBVal: %s\n", val == 0 ? "true" : "false");
184  return val != 0;
185}
186
187static char
188readCVal (IPCrequest *req)
189{
190  int val = readByte (req);
191  ipc_trace ("  readCVal: %d\n", val);
192  return (char) val;
193}
194
195static double
196readDVal (IPCrequest *req)
197{
198  String s = readSVal (req);
199  double d = atof (s);
200  free (s);
201  return d;
202}
203
204static Object
205readAVal (IPCrequest *req)
206{
207  bool twoD = false;
208  int type = readByte (req);
209  if (type == L_ARRAY)
210    {
211      twoD = true;
212      type = readByte (req);
213    }
214  ipc_trace ("readAVal: twoD=%s type=%d\n", twoD ? "true" : "false", type);
215
216  int len = readIVal (req);
217  if (len == -1)
218    return NULL;
219  switch (type)
220    {
221    case L_INTEGER:
222      if (twoD)
223	{
224	  Vector<Vector<int>*> *array = new Vector<Vector<int>*>(len);
225	  for (int i = 0; i < len; i++)
226	    array->store (i, (Vector<int>*)readAVal (req));
227	  return array;
228	}
229      else
230	{
231	  Vector<int> *array = new Vector<int>(len);
232	  for (int i = 0; i < len; i++)
233	    array->store (i, readIVal (req));
234	  return array;
235	}
236      //break;
237    case L_LONG:
238      if (twoD)
239	{
240	  Vector<Vector<long long>*> *array = new Vector<Vector<long long>*>(len);
241	  for (int i = 0; i < len; i++)
242	    array->store (i, (Vector<long long>*)readAVal (req));
243	  return array;
244	}
245      else
246	{
247	  Vector<long long> *array = new Vector<long long>(len);
248	  for (int i = 0; i < len; i++)
249	    array->store (i, readLVal (req));
250	  return array;
251	}
252      //break;
253    case L_DOUBLE:
254      if (twoD)
255	{
256	  Vector<Vector<double>*> *array = new Vector<Vector<double>*>(len);
257	  for (int i = 0; i < len; i++)
258	    array->store (i, (Vector<double>*)readAVal (req));
259	  return array;
260	}
261      else
262	{
263	  Vector<double> *array = new Vector<double>(len);
264	  for (int i = 0; i < len; i++)
265	    array->store (i, readDVal (req));
266	  return array;
267	}
268      //break;
269    case L_BOOLEAN:
270      if (twoD)
271	{
272	  Vector < Vector<bool>*> *array = new Vector < Vector<bool>*>(len);
273	  for (int i = 0; i < len; i++)
274	    array->store (i, (Vector<bool>*)readAVal (req));
275	  return array;
276	}
277      else
278	{
279	  Vector<bool> *array = new Vector<bool>(len);
280	  for (int i = 0; i < len; i++)
281	    array->store (i, readBVal (req));
282	  return array;
283	}
284      //break;
285    case L_CHAR:
286      if (twoD)
287	{
288	  Vector<Vector<char>*> *array = new Vector<Vector<char>*>(len);
289	  for (int i = 0; i < len; i++)
290	    array->store (i, (Vector<char>*)readAVal (req));
291	  return array;
292	}
293      else
294	{
295	  Vector<char> *array = new Vector<char>(len);
296	  for (int i = 0; i < len; i++)
297	    array->store (i, readCVal (req));
298	  return array;
299	}
300      //break;
301    case L_STRING:
302      if (twoD)
303	{
304	  Vector<Vector<String>*> *array = new Vector<Vector<String>*>(len);
305	  for (int i = 0; i < len; i++)
306	    array->store (i, (Vector<String>*)readAVal (req));
307	  return array;
308	}
309      else
310	{
311	  Vector<String> *array = new Vector<String>(len);
312	  for (int i = 0; i < len; i++)
313	    array->store (i, readSVal (req));
314	  return array;
315	}
316      //break;
317    case L_OBJECT:
318      if (twoD)
319	{
320	  Vector<Vector<Object>*> *array = new Vector<Vector<Object>*>(len);
321	  for (int i = 0; i < len; i++)
322	    array->store (i, (Vector<Object>*)readAVal (req));
323	  return array;
324	}
325      else
326	{
327	  Vector<Object> *array = new Vector<Object>(len);
328	  for (int i = 0; i < len; i++)
329	    array->store (i, readAVal (req));
330	  return array;
331	}
332      //break;
333    default:
334      fprintf (stderr, "readAVal: Unknown code: %d\n", type);
335      break;
336    }
337  return NULL;
338}
339
340static int iVal;
341static bool bVal;
342static long long lVal;
343static String sVal;
344static double dVal;
345static Object aVal;
346
347static void
348readResult (int type, IPCrequest *req)
349{
350  int tVal = readByte (req);
351  switch (tVal)
352    {
353    case L_INTEGER:
354      iVal = readIVal (req);
355      break;
356    case L_LONG:
357      lVal = readLVal (req);
358      break;
359    case L_BOOLEAN:
360      bVal = readBVal (req);
361      break;
362    case L_DOUBLE:
363      dVal = readDVal (req);
364      break;
365    case L_STRING:
366      sVal = readSVal (req);
367      break;
368    case L_ARRAY:
369      aVal = readAVal (req);
370      break;
371    case EOF:
372      fprintf (stderr, "EOF read in readResult\n");
373      sVal = NULL;
374      return;
375    default:
376      fprintf (stderr, "Unknown code: %d\n", tVal);
377      abort ();
378    }
379  if (type != tVal)
380    {
381      fprintf (stderr, "Internal error: readResult: parameter mismatch: type=%d should be %d\n", tVal, type);
382      abort ();
383    }
384}
385
386int
387readInt (IPCrequest *req)
388{
389  readResult (L_INTEGER, req);
390  return iVal;
391}
392
393String
394readString (IPCrequest *req)
395{
396  readResult (L_STRING, req);
397  return sVal;
398}
399
400long long
401readLong (IPCrequest *req)
402{
403  readResult (L_LONG, req);
404  return lVal;
405}
406
407double
408readDouble (IPCrequest *req)
409{
410  readResult (L_DOUBLE, req);
411  return dVal;
412}
413
414bool
415readBoolean (IPCrequest *req)
416{
417  readResult (L_BOOLEAN, req);
418  return bVal;
419}
420
421DbeObj
422readObject (IPCrequest *req)
423{
424  readResult (L_LONG, req);
425  return (DbeObj) lVal;
426}
427
428Object
429readArray (IPCrequest *req)
430{
431  readResult (L_ARRAY, req);
432  return aVal;
433}
434
435// Write
436IPCresponse::IPCresponse (int sz)
437{
438  requestID = -1;
439  channelID = -1;
440  responseType = -1;
441  responseStatus = RESPONSE_STATUS_SUCCESS;
442  sb = new StringBuilder (sz);
443  next = NULL;
444}
445
446IPCresponse::~IPCresponse ()
447{
448  delete sb;
449}
450
451void
452IPCresponse::reset ()
453{
454  requestID = -1;
455  channelID = -1;
456  responseType = -1;
457  responseStatus = RESPONSE_STATUS_SUCCESS;
458  sb->setLength (0);
459}
460
461void
462IPCresponse::sendByte (int b)
463{
464  ipc_trace ("sendByte: %02x %d\n", b, b);
465  sb->appendf ("%02x", b);
466}
467
468void
469IPCresponse::sendIVal (int i)
470{
471  ipc_trace ("sendIVal: %08x %d\n", i, i);
472  sb->appendf ("%08x", i);
473}
474
475void
476IPCresponse::sendLVal (long long l)
477{
478  ipc_trace ("sendLVal: %016llx %lld\n", l, l);
479  sb->appendf ("%016llx", l);
480}
481
482void
483IPCresponse::sendSVal (const char *s)
484{
485  if (s == NULL)
486    {
487      sendIVal (-1);
488      return;
489    }
490  sendIVal ((int) strlen (s));
491  ipc_trace ("sendSVal: %s\n", s);
492  sb->appendf ("%s", s);
493}
494
495void
496IPCresponse::sendBVal (bool b)
497{
498  sendByte (b ? 1 : 0);
499}
500
501void
502IPCresponse::sendCVal (char c)
503{
504  sendByte (c);
505}
506
507void
508IPCresponse::sendDVal (double d)
509{
510  char str[32];
511  snprintf (str, sizeof (str), "%.12f", d);
512  sendSVal (str);
513}
514
515void
516IPCresponse::sendAVal (void *ptr)
517{
518  if (ptr == NULL)
519    {
520      sendByte (L_INTEGER);
521      sendIVal (-1);
522      return;
523    }
524
525  VecType type = ((Vector<void*>*)ptr)->type ();
526  switch (type)
527    {
528    case VEC_INTEGER:
529      {
530	sendByte (L_INTEGER);
531	Vector<int> *array = (Vector<int>*)ptr;
532	sendIVal (array->size ());
533	for (int i = 0; i < array->size (); i++)
534	  sendIVal (array->fetch (i));
535	break;
536      }
537    case VEC_BOOL:
538      {
539	sendByte (L_BOOLEAN);
540	Vector<bool> *array = (Vector<bool>*)ptr;
541	sendIVal (array->size ());
542	for (int i = 0; i < array->size (); i++)
543	  sendBVal (array->fetch (i));
544	break;
545      }
546    case VEC_CHAR:
547      {
548	sendByte (L_CHAR);
549	Vector<char> *array = (Vector<char>*)ptr;
550	sendIVal (array->size ());
551	for (int i = 0; i < array->size (); i++)
552	  sendCVal (array->fetch (i));
553	break;
554      }
555    case VEC_LLONG:
556      {
557	sendByte (L_LONG);
558	Vector<long long> *array = (Vector<long long>*)ptr;
559	sendIVal (array->size ());
560	for (int i = 0; i < array->size (); i++)
561	  sendLVal (array->fetch (i));
562	break;
563      }
564    case VEC_DOUBLE:
565      {
566	sendByte (L_DOUBLE);
567	Vector<double> *array = (Vector<double>*)ptr;
568	sendIVal (array->size ());
569	for (int i = 0; i < array->size (); i++)
570	  sendDVal (array->fetch (i));
571	break;
572      }
573    case VEC_STRING:
574      {
575	sendByte (L_STRING);
576	Vector<String> *array = (Vector<String>*)ptr;
577	sendIVal (array->size ());
578	for (int i = 0; i < array->size (); i++)
579	  sendSVal (array->fetch (i));
580	break;
581      }
582    case VEC_STRINGARR:
583      {
584	sendByte (L_ARRAY);
585	sendByte (L_STRING);
586	Vector<void*> *array = (Vector<void*>*)ptr;
587	sendIVal (array->size ());
588	for (int i = 0; i < array->size (); i++)
589	  sendAVal (array->fetch (i));
590	break;
591      }
592    case VEC_INTARR:
593      {
594	sendByte (L_ARRAY);
595	sendByte (L_INTEGER);
596	Vector<void*> *array = (Vector<void*>*)ptr;
597	sendIVal (array->size ());
598	for (int i = 0; i < array->size (); i++)
599	  sendAVal (array->fetch (i));
600	break;
601      }
602    case VEC_LLONGARR:
603      {
604	sendByte (L_ARRAY);
605	sendByte (L_LONG);
606	Vector<void*> *array = (Vector<void*>*)ptr;
607	sendIVal (array->size ());
608	for (int i = 0; i < array->size (); i++)
609	  sendAVal (array->fetch (i));
610	break;
611      }
612    case VEC_VOIDARR:
613      {
614	sendByte (L_OBJECT);
615	Vector<void*> *array = (Vector<void*>*)ptr;
616	sendIVal (array->size ());
617	for (int i = 0; i < array->size (); i++)
618	  sendAVal (array->fetch (i));
619	break;
620      }
621    default:
622      fprintf (stderr, "sendAVal: Unknown type: %d\n", type);
623      abort ();
624    }
625}
626
627static void
628writeResponseHeader (int requestID, int responseType, int responseStatus, int nBytes)
629{
630  if (responseType == RESPONSE_TYPE_HANDSHAKE)
631    nBytes = IPC_VERSION_NUMBER;
632  int use_write = 2;
633  ipc_response_trace (TRACE_LVL_1, "ResponseHeaderBegin----- %x ---- %x ----- %x -----%x -------\n", requestID, responseType, responseStatus, nBytes);
634  if (use_write)
635    {
636      char buf[23];
637      if (use_write == 1)
638	{
639	  int i = 0;
640	  snprintf (buf + i, 3, "%2x", HEADER_MARKER);
641	  i += 2;
642	  snprintf (buf + i, 9, "%8x", requestID);
643	  i += 8;
644	  snprintf (buf + i, 3, "%2x", responseType);
645	  i += 2;
646	  snprintf (buf + i, 3, "%2x", responseStatus);
647	  i += 2;
648	  snprintf (buf + i, 9, "%8x", nBytes);
649	}
650      else
651	snprintf (buf, 23, "%02x%08x%02x%02x%08x", HEADER_MARKER, requestID,
652		  responseType, responseStatus, nBytes);
653      buf[22] = 0;
654      write (1, buf, 22);
655    }
656  else
657    {
658      cout << setfill ('0') << setw (2) << hex << HEADER_MARKER;
659      cout << setfill ('0') << setw (8) << hex << requestID;
660      cout << setfill ('0') << setw (2) << hex << responseType;
661      cout << setfill ('0') << setw (2) << hex << responseStatus;
662      cout << setfill ('0') << setw (8) << hex << nBytes;
663      cout.flush ();
664    }
665  ipc_response_trace (TRACE_LVL_1, "----------------------------ResponseHeaderEnd\n");
666  if (nBytes > maxSize)
667    {
668      maxSize = nBytes;
669      ipc_trace ("New maxsize %ld\n", maxSize);
670    }
671}
672
673bool
674cancelNeeded (int chID)
675{
676  if (chID == cancellableChannelID && chID == cancelRequestedChannelID)
677    return true;
678  else
679    return false;
680}
681
682static void
683writeResponseWithHeader (int requestID, int channelID, int responseType,
684			 int responseStatus, IPCresponse* os)
685{
686  if (cancelNeeded (channelID))
687    {
688      responseStatus = RESPONSE_STATUS_CANCELLED;
689      ipc_trace ("CANCELLING %d %d\n", requestID, channelID);
690      // This is for gracefully cancelling regular ops like openExperiment - getFiles should never reach here
691    }
692  os->setRequestID (requestID);
693  os->setChannelID (channelID);
694  os->setResponseType (responseType);
695  os->setResponseStatus (responseStatus);
696  os->print ();
697  os->reset ();
698  responseBufferPool->recycle (os);
699}
700
701void
702writeAckFast (int requestID)
703{
704  writeResponseHeader (requestID, RESPONSE_TYPE_ACK, RESPONSE_STATUS_SUCCESS, 0);
705}
706
707void
708writeAck (int requestID, int channelID)
709{
710#if DEBUG
711  char *s = getenv (NTXT ("SP_NO_IPC_ACK"));
712#else /* ^DEBUG */
713  char *s = NULL;
714#endif /* ^DEBUG */
715  if (s)
716    {
717      int i = requestID;
718      int j = channelID;
719      ipc_request_trace (TRACE_LVL_4, "ACK skipped: requestID=%d channelID=%d\n", i, j);
720    }
721  else
722    {
723      IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
724      writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_ACK,
725			       RESPONSE_STATUS_SUCCESS, OUTS);
726    }
727}
728
729void
730writeHandshake (int requestID, int channelID)
731{
732  IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
733  writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS);
734  // writeResponseHeader(requestID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, IPC_VERSION_NUMBER);
735}
736
737void
738writeResponseGeneric (int responseStatus, int requestID, int channelID)
739{
740  IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
741  writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_COMPLETE, responseStatus, OUTS);
742}
743
744BufferPool::BufferPool ()
745{
746  pthread_mutex_init (&p_mutex, NULL);
747  smallBuf = NULL;
748  largeBuf = NULL;
749}
750
751BufferPool::~BufferPool ()
752{
753  for (IPCresponse *p = smallBuf; p;)
754    {
755      IPCresponse *tmp = p;
756      p = tmp->next;
757      delete tmp;
758    }
759  for (IPCresponse *p = largeBuf; p;)
760    {
761      IPCresponse *tmp = p;
762      p = tmp->next;
763      delete tmp;
764    }
765}
766
767IPCresponse*
768BufferPool::getNewResponse (int size)
769{
770  pthread_mutex_lock (&p_mutex);
771  if (ipc_single_threaded_mode && size < BUFFER_SIZE_LARGE)
772    size = BUFFER_SIZE_LARGE;
773  IPCresponse *newResponse = NULL;
774  if (size >= BUFFER_SIZE_LARGE)
775    {
776      if (largeBuf)
777	{
778	  newResponse = largeBuf;
779	  largeBuf = largeBuf->next;
780	}
781    }
782  else if (smallBuf)
783    {
784      newResponse = smallBuf;
785      smallBuf = smallBuf->next;
786    }
787  if (newResponse)
788    newResponse->reset ();
789  else
790    {
791      newResponse = new IPCresponse (size);
792      ipc_trace ("GETNEWBUFFER %d\n", size);
793    }
794  pthread_mutex_unlock (&p_mutex);
795  return newResponse;
796}
797
798void
799BufferPool::recycle (IPCresponse *respB)
800{
801  pthread_mutex_lock (&p_mutex);
802  if (respB->getCurBufSize () >= BUFFER_SIZE_LARGE)
803    {
804      respB->next = largeBuf;
805      largeBuf = respB;
806    }
807  else
808    {
809      respB->next = smallBuf;
810      smallBuf = respB;
811    }
812  pthread_mutex_unlock (&p_mutex);
813}
814
815void
816writeArray (void *ptr, IPCrequest* req)
817{
818  if (req->getStatus () == CANCELLED_IMMEDIATE)
819    return;
820  IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
821  OUTS->sendByte (L_ARRAY);
822  OUTS->sendAVal (ptr);
823  writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
824			   RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
825}
826
827void
828writeString (const char *s, IPCrequest* req)
829{
830  if (req->getStatus () == CANCELLED_IMMEDIATE)
831    return;
832  IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
833  OUTS->sendByte (L_STRING);
834  OUTS->sendSVal (s);
835  writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
836			   RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
837}
838
839void
840writeObject (DbeObj obj, IPCrequest* req)
841{
842  writeLong ((long long) obj, req);
843}
844
845void
846writeBoolean (bool b, IPCrequest* req)
847{
848  if (req->getStatus () == CANCELLED_IMMEDIATE)
849    return;
850  IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
851  OUTS->sendByte (L_BOOLEAN);
852  OUTS->sendBVal (b);
853  writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
854			   RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
855}
856
857void
858writeInt (int i, IPCrequest* req)
859{
860  if (req->getStatus () == CANCELLED_IMMEDIATE)
861    return;
862  IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
863  OUTS->sendByte (L_INTEGER);
864  OUTS->sendIVal (i);
865  writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
866}
867
868void
869writeChar (char c, IPCrequest* req)
870{
871  if (req->getStatus () == CANCELLED_IMMEDIATE)
872    return;
873  IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
874  OUTS->sendByte (L_CHAR);
875  OUTS->sendCVal (c);
876  writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
877}
878
879void
880writeLong (long long l, IPCrequest* req)
881{
882  if (req->getStatus () == CANCELLED_IMMEDIATE)
883    return;
884  IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
885  OUTS->sendByte (L_LONG);
886  OUTS->sendLVal (l);
887  writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
888}
889
890void
891writeDouble (double d, IPCrequest* req)
892{
893  if (req->getStatus () == CANCELLED_IMMEDIATE) return;
894  IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
895  OUTS->sendByte (L_DOUBLE);
896  OUTS->sendDVal (d);
897  writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
898}
899
900int
901setProgress (int percentage, const char *proc_str)
902{
903  if (cancelNeeded (currentChannelID))
904    {
905      // ExperimentLoadCancelException *e1 = new ExperimentLoadCancelException();
906      // throw (e1);
907      return 1;
908    }
909  if (NULL == proc_str)
910    return 1;
911  int size = strlen (proc_str) + 100; // 100 bytes for additional data
912  int bs = BUFFER_SIZE_MEDIUM;
913  if (size > BUFFER_SIZE_MEDIUM)
914    {
915      if (size > BUFFER_SIZE_LARGE) return 1; // This should never happen
916      bs = BUFFER_SIZE_LARGE;
917    }
918  IPCresponse *OUTS = responseBufferPool->getNewResponse (bs);
919  OUTS->sendByte (L_PROGRESS);
920  OUTS->sendIVal (percentage);
921  OUTS->sendSVal (proc_str);
922  writeResponseWithHeader (currentRequestID, currentChannelID, RESPONSE_TYPE_PROGRESS, RESPONSE_STATUS_SUCCESS, OUTS);
923  return 0;
924}
925
926void
927IPCresponse::print (void)
928{
929  if (ipc_delay_microsec)
930    usleep (ipc_delay_microsec);
931  int stringSize = sb->length ();
932  writeResponseHeader (requestID, responseType, responseStatus, stringSize);
933  if (stringSize > 0)
934    {
935      char *s = sb->toString ();
936      hrtime_t start_time = gethrtime ();
937      int use_write = 1;
938      if (use_write)
939	write (1, s, stringSize); // write(1, sb->toString(), stringSize);
940      else
941	{
942	  cout << s;
943	  cout.flush ();
944	}
945      hrtime_t end_time = gethrtime ();
946      unsigned long long time_stamp = end_time - start_time;
947      ipc_response_log (TRACE_LVL_3, "ReqID %x flush time %llu  nanosec \n", requestID, time_stamp);
948      free (s);
949    }
950}
951
952void
953setCancelRequestedCh (int chID)
954{
955  cancelRequestedChannelID = chID;
956}
957
958void
959readRequestHeader ()
960{
961  int marker = readByte (NULL);
962  if (marker != HEADER_MARKER)
963    {
964      fprintf (stderr, "Internal error: received request (%d) without header marker\n", marker);
965      error_flag = 1;
966      return;
967    }
968  else
969    ipc_request_trace (TRACE_LVL_1, "RequestHeaderBegin------------------------\n");
970  int requestID = readIVal (NULL);
971  int requestType = readByte (NULL);
972  int channelID = readIVal (NULL);
973  int nBytes = readIVal (NULL);
974  if (requestType == REQUEST_TYPE_HANDSHAKE)
975    {
976      // write the ack directly to the wire, not through the response queue
977      // writeAckFast(requestID);
978      writeAck (requestID, channelID);
979      maxSize = 0;
980      writeHandshake (requestID, channelID);
981      ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
982    }
983  else if (requestType == REQUEST_TYPE_CANCEL)
984    {
985      writeAck (requestID, channelID);
986      ipc_request_trace (TRACE_LVL_1, "RQ: CANCEL --- RQ: %x ----- %x --- CH:  %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
987      if (channelID == cancellableChannelID)
988	{
989	  // we have worked on at least one request belonging to this channel
990	  writeResponseGeneric (RESPONSE_STATUS_SUCCESS, requestID, channelID);
991	  setCancelRequestedCh (channelID);
992	  ipc_trace ("CANCELLABLE %x %x\n", channelID, currentChannelID);
993	  if (channelID == currentChannelID)
994	    //  request for this channel is currently in progress
995	    ipc_request_trace (TRACE_LVL_1, "IN PROGRESS REQUEST NEEDS CANCELLATION");
996	    //              ssp_post_cond(waitingToFinish);
997	}
998      else
999	{
1000	  // FIXME:
1001	  // it is possible that a request for this channel is on the requestQ
1002	  // or has been submitted to the work group queue but is waiting for a thread to pick it up
1003	  writeResponseGeneric (RESPONSE_STATUS_FAILURE, requestID, channelID);
1004	  setCancelRequestedCh (channelID);
1005	  ipc_request_trace (TRACE_LVL_1, "RETURNING FAILURE TO CANCEL REQUEST channel %d\n", channelID);
1006	}
1007    }
1008  else
1009    {
1010      writeAck (requestID, channelID);
1011      ipc_request_trace (TRACE_LVL_1, "RQ: --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
1012      IPCrequest *nreq = new IPCrequest (nBytes, requestID, channelID);
1013      nreq->read ();
1014      ipc_request_trace (TRACE_LVL_1, "RQ: --- %x Read from stream \n", requestID);
1015      if (cancelNeeded (channelID))
1016	{
1017	  ipc_request_trace (TRACE_LVL_1, "CANCELLABLE REQ RECVD %x %x\n", channelID, requestID);
1018	  writeResponseGeneric (RESPONSE_STATUS_CANCELLED, requestID, channelID);
1019	  delete nreq;
1020	  return;
1021	}
1022      DbeQueue *q = new DbeQueue (ipc_doWork, nreq);
1023      ipcThreadPool->put_queue (q);
1024    }
1025}
1026