1// RequestConnection.cpp
2
3#include <new>
4
5#include <OS.h>
6
7#include "Channel.h"
8#include "Connection.h"
9#include "RequestChannel.h"
10#include "RequestConnection.h"
11#include "RequestHandler.h"
12
13// DownStreamThread
14class RequestConnection::DownStreamThread {
15public:
16	DownStreamThread()
17		: fThread(-1),
18		  fConnection(NULL),
19		  fChannel(NULL),
20		  fHandler(NULL),
21		  fTerminating(false)
22	{
23	}
24
25	~DownStreamThread()
26	{
27		Terminate();
28		delete fChannel;
29	}
30
31	status_t Init(RequestConnection* connection, Channel* channel,
32		RequestHandler* handler)
33	{
34		if (!connection || !channel || !handler)
35			return B_BAD_VALUE;
36		fConnection = connection;
37		fHandler = handler;
38		fChannel = new(std::nothrow) RequestChannel(channel);
39		if (!fChannel)
40			return B_NO_MEMORY;
41		fThread = spawn_thread(_LoopEntry, "down stream thread",
42			B_NORMAL_PRIORITY, this);
43		if (fThread < 0)
44			return fThread;
45		return B_OK;
46	}
47
48	void Run()
49	{
50		resume_thread(fThread);
51	}
52
53	void Terminate()
54	{
55		fTerminating = true;
56		if (fThread > 0 && find_thread(NULL) != fThread) {
57			int32 result;
58			wait_for_thread(fThread, &result);
59		}
60	}
61
62	RequestChannel* GetRequestChannel()
63	{
64		return fChannel;
65	}
66
67private:
68	static int32 _LoopEntry(void* data)
69	{
70		return ((DownStreamThread*)data)->_Loop();
71	}
72
73	int32 _Loop()
74	{
75		while (!fTerminating) {
76			Request* request;
77			status_t error = fChannel->ReceiveRequest(&request);
78			if (error == B_OK) {
79				error = fHandler->HandleRequest(request, fChannel);
80				delete request;
81			}
82			if (error != B_OK)
83				fTerminating = fConnection->DownStreamChannelError(this, error);
84		}
85		return 0;
86	}
87
88private:
89	thread_id			fThread;
90	RequestConnection*	fConnection;
91	RequestChannel*		fChannel;
92	RequestHandler*		fHandler;
93	volatile bool		fTerminating;
94};
95
96
97// RequestConnection
98
99// constructor
100RequestConnection::RequestConnection(Connection* connection,
101	RequestHandler* requestHandler, bool ownsRequestHandler)
102	: fConnection(connection),
103	  fRequestHandler(requestHandler),
104	  fOwnsRequestHandler(ownsRequestHandler),
105	  fThreads(NULL),
106	  fThreadCount(0),
107	  fTerminationCount(0)
108{
109}
110
111// destructor
112RequestConnection::~RequestConnection()
113{
114	Close();
115	delete[] fThreads;
116	delete fConnection;
117	if (fOwnsRequestHandler)
118		delete fRequestHandler;
119}
120
121// Init
122status_t
123RequestConnection::Init()
124{
125	// check parameters
126	if (!fConnection || !fRequestHandler)
127		return B_BAD_VALUE;
128	if (fConnection->CountDownStreamChannels() < 1)
129		return B_ERROR;
130	// create a thread per down-stream channel
131	fThreadCount = fConnection->CountDownStreamChannels();
132	fThreads = new(std::nothrow) DownStreamThread[fThreadCount];
133	if (!fThreads)
134		return B_NO_MEMORY;
135	// initialize the threads
136	for (int32 i = 0; i < fThreadCount; i++) {
137		status_t error = fThreads[i].Init(this,
138			fConnection->DownStreamChannelAt(i), fRequestHandler);
139		if (error != B_OK)
140			return error;
141	}
142	// run the threads
143	for (int32 i = 0; i < fThreadCount; i++)
144		fThreads[i].Run();
145	return B_OK;
146}
147
148// Close
149void
150RequestConnection::Close()
151{
152	atomic_add(&fTerminationCount, 1);
153	if (fConnection)
154		fConnection->Close();
155	if (fThreads) {
156		for (int32 i = 0; i < fThreadCount; i++)
157			fThreads[i].Terminate();
158	}
159}
160
161// SendRequest
162status_t
163RequestConnection::SendRequest(Request* request, Request** reply)
164{
165	return _SendRequest(request, reply, NULL);
166}
167
168// SendRequest
169status_t
170RequestConnection::SendRequest(Request* request, RequestHandler* replyHandler)
171{
172	if (!replyHandler)
173		return B_BAD_VALUE;
174	return _SendRequest(request, NULL, replyHandler);
175}
176
177// DownStreamChannelError
178bool
179RequestConnection::DownStreamChannelError(DownStreamThread* thread,
180	status_t error)
181{
182	if (atomic_add(&fTerminationCount, 1) == 0 && fRequestHandler) {
183		ConnectionBrokenRequest request;
184		request.error = error;
185		fRequestHandler->HandleRequest(&request, thread->GetRequestChannel());
186	}
187	return true;
188}
189
190// _SendRequest
191status_t
192RequestConnection::_SendRequest(Request* request, Request** _reply,
193	RequestHandler* replyHandler)
194{
195	// check parameters
196	if (!request)
197		return B_BAD_VALUE;
198	// get a channel
199	Channel* channel = NULL;
200	status_t error = fConnection->GetUpStreamChannel(&channel);
201	if (error != B_OK)
202		return error;
203	// send the request
204	RequestChannel requestChannel(channel);
205	error = requestChannel.SendRequest(request);
206	// receive the reply
207	Request* reply = NULL;
208	if (error == B_OK && (_reply || replyHandler)) {
209		error = requestChannel.ReceiveRequest(&reply);
210		// handle the reply
211		if (error == B_OK) {
212			if (replyHandler)
213				error = replyHandler->HandleRequest(reply, &requestChannel);
214			if (error == B_OK && _reply)
215				*_reply = reply;
216			else
217				delete reply;
218		}
219	}
220	// cleanup
221	if (fConnection->PutUpStreamChannel(channel) != B_OK) {
222		// Ugh! A serious error. Probably insufficient memory.
223		delete channel;
224	}
225	return error;
226}
227
228