1// RequestConnection.cpp 2 3#include <new> 4 5#include <OS.h> 6 7#include "Channel.h" 8#include "Connection.h" 9#include "RequestChannel.h" 10#include "RequestConnection.h" 11#include "RequestHandler.h" 12 13// DownStreamThread 14class RequestConnection::DownStreamThread { 15public: 16 DownStreamThread() 17 : fThread(-1), 18 fConnection(NULL), 19 fChannel(NULL), 20 fHandler(NULL), 21 fTerminating(false) 22 { 23 } 24 25 ~DownStreamThread() 26 { 27 Terminate(); 28 delete fChannel; 29 } 30 31 status_t Init(RequestConnection* connection, Channel* channel, 32 RequestHandler* handler) 33 { 34 if (!connection || !channel || !handler) 35 return B_BAD_VALUE; 36 fConnection = connection; 37 fHandler = handler; 38 fChannel = new(std::nothrow) RequestChannel(channel); 39 if (!fChannel) 40 return B_NO_MEMORY; 41 fThread = spawn_thread(_LoopEntry, "down stream thread", 42 B_NORMAL_PRIORITY, this); 43 if (fThread < 0) 44 return fThread; 45 return B_OK; 46 } 47 48 void Run() 49 { 50 resume_thread(fThread); 51 } 52 53 void Terminate() 54 { 55 fTerminating = true; 56 if (fThread > 0 && find_thread(NULL) != fThread) { 57 int32 result; 58 wait_for_thread(fThread, &result); 59 } 60 } 61 62 RequestChannel* GetRequestChannel() 63 { 64 return fChannel; 65 } 66 67private: 68 static int32 _LoopEntry(void* data) 69 { 70 return ((DownStreamThread*)data)->_Loop(); 71 } 72 73 int32 _Loop() 74 { 75 while (!fTerminating) { 76 Request* request; 77 status_t error = fChannel->ReceiveRequest(&request); 78 if (error == B_OK) { 79 error = fHandler->HandleRequest(request, fChannel); 80 delete request; 81 } 82 if (error != B_OK) 83 fTerminating = fConnection->DownStreamChannelError(this, error); 84 } 85 return 0; 86 } 87 88private: 89 thread_id fThread; 90 RequestConnection* fConnection; 91 RequestChannel* fChannel; 92 RequestHandler* fHandler; 93 volatile bool fTerminating; 94}; 95 96 97// RequestConnection 98 99// constructor 100RequestConnection::RequestConnection(Connection* connection, 101 RequestHandler* requestHandler, bool ownsRequestHandler) 102 : fConnection(connection), 103 fRequestHandler(requestHandler), 104 fOwnsRequestHandler(ownsRequestHandler), 105 fThreads(NULL), 106 fThreadCount(0), 107 fTerminationCount(0) 108{ 109} 110 111// destructor 112RequestConnection::~RequestConnection() 113{ 114 Close(); 115 delete[] fThreads; 116 delete fConnection; 117 if (fOwnsRequestHandler) 118 delete fRequestHandler; 119} 120 121// Init 122status_t 123RequestConnection::Init() 124{ 125 // check parameters 126 if (!fConnection || !fRequestHandler) 127 return B_BAD_VALUE; 128 if (fConnection->CountDownStreamChannels() < 1) 129 return B_ERROR; 130 // create a thread per down-stream channel 131 fThreadCount = fConnection->CountDownStreamChannels(); 132 fThreads = new(std::nothrow) DownStreamThread[fThreadCount]; 133 if (!fThreads) 134 return B_NO_MEMORY; 135 // initialize the threads 136 for (int32 i = 0; i < fThreadCount; i++) { 137 status_t error = fThreads[i].Init(this, 138 fConnection->DownStreamChannelAt(i), fRequestHandler); 139 if (error != B_OK) 140 return error; 141 } 142 // run the threads 143 for (int32 i = 0; i < fThreadCount; i++) 144 fThreads[i].Run(); 145 return B_OK; 146} 147 148// Close 149void 150RequestConnection::Close() 151{ 152 atomic_add(&fTerminationCount, 1); 153 if (fConnection) 154 fConnection->Close(); 155 if (fThreads) { 156 for (int32 i = 0; i < fThreadCount; i++) 157 fThreads[i].Terminate(); 158 } 159} 160 161// SendRequest 162status_t 163RequestConnection::SendRequest(Request* request, Request** reply) 164{ 165 return _SendRequest(request, reply, NULL); 166} 167 168// SendRequest 169status_t 170RequestConnection::SendRequest(Request* request, RequestHandler* replyHandler) 171{ 172 if (!replyHandler) 173 return B_BAD_VALUE; 174 return _SendRequest(request, NULL, replyHandler); 175} 176 177// DownStreamChannelError 178bool 179RequestConnection::DownStreamChannelError(DownStreamThread* thread, 180 status_t error) 181{ 182 if (atomic_add(&fTerminationCount, 1) == 0 && fRequestHandler) { 183 ConnectionBrokenRequest request; 184 request.error = error; 185 fRequestHandler->HandleRequest(&request, thread->GetRequestChannel()); 186 } 187 return true; 188} 189 190// _SendRequest 191status_t 192RequestConnection::_SendRequest(Request* request, Request** _reply, 193 RequestHandler* replyHandler) 194{ 195 // check parameters 196 if (!request) 197 return B_BAD_VALUE; 198 // get a channel 199 Channel* channel = NULL; 200 status_t error = fConnection->GetUpStreamChannel(&channel); 201 if (error != B_OK) 202 return error; 203 // send the request 204 RequestChannel requestChannel(channel); 205 error = requestChannel.SendRequest(request); 206 // receive the reply 207 Request* reply = NULL; 208 if (error == B_OK && (_reply || replyHandler)) { 209 error = requestChannel.ReceiveRequest(&reply); 210 // handle the reply 211 if (error == B_OK) { 212 if (replyHandler) 213 error = replyHandler->HandleRequest(reply, &requestChannel); 214 if (error == B_OK && _reply) 215 *_reply = reply; 216 else 217 delete reply; 218 } 219 } 220 // cleanup 221 if (fConnection->PutUpStreamChannel(channel) != B_OK) { 222 // Ugh! A serious error. Probably insufficient memory. 223 delete channel; 224 } 225 return error; 226} 227 228