1/*
2 * Copyright 2009-2011, Axel Dörfler, axeld@pinc-software.de.
3 * Copyright 2016, Rene Gollent, rene@gollent.com.
4 */
5
6
7#include <SocketMessenger.h>
8
9#include <Message.h>
10#include <MessageQueue.h>
11#include <Messenger.h>
12
13#include <AutoDeleter.h>
14#include <AutoLocker.h>
15#include <HashMap.h>
16
17
18static const char* kReplySenderIDField = "socket_messenger:sender_reply_id";
19static const char* kReplyReceiverIDField = "socket_messenger:reply_id";
20
21
22// #pragma mark - BSocketMessenger::Private
23
24
25struct BSocketMessenger::Private {
26			typedef SynchronizedHashMap<HashKey64<int64>,
27									BMessage> ReplyMessageMap;
28
29								Private();
30	virtual						~Private();
31
32			void				ClearMessages();
33
34			sem_id				fMessageWaiters;
35			thread_id			fReplyReader;
36			ReplyMessageMap		fReceivedReplies;
37			BMessageQueue		fReceivedMessages;
38			int64				fReplyIDCounter;
39};
40
41
42BSocketMessenger::Private::Private()
43	:
44	fMessageWaiters(-1),
45	fReplyReader(-1),
46	fReceivedReplies(),
47	fReceivedMessages(),
48	fReplyIDCounter(0)
49{
50}
51
52
53BSocketMessenger::Private::~Private()
54{
55	if (fMessageWaiters > 0)
56		delete_sem(fMessageWaiters);
57	if (fReplyReader > 0)
58		wait_for_thread(fReplyReader, NULL);
59
60	ClearMessages();
61}
62
63
64void
65BSocketMessenger::Private::ClearMessages()
66{
67	fReceivedReplies.Clear();
68	AutoLocker<BMessageQueue> queueLocker(fReceivedMessages);
69	while (!fReceivedMessages.IsEmpty())
70		delete fReceivedMessages.NextMessage();
71}
72
73
74// #pragma mark - BSocketMessenger
75
76
77BSocketMessenger::BSocketMessenger()
78	:
79	fPrivateData(NULL),
80	fSocket(),
81	fInitStatus(B_NO_INIT)
82{
83	_Init();
84}
85
86
87BSocketMessenger::BSocketMessenger(const BNetworkAddress& address,
88	bigtime_t timeout)
89	:
90	fPrivateData(NULL),
91	fSocket(),
92	fInitStatus(B_NO_INIT)
93{
94	_Init();
95	SetTo(address, timeout);
96}
97
98
99BSocketMessenger::BSocketMessenger(const BSocket& socket)
100	:
101	fPrivateData(NULL),
102	fSocket(socket),
103	fInitStatus(B_NO_INIT)
104{
105	_Init();
106	if (fPrivateData == NULL)
107		return;
108
109	fInitStatus = socket.InitCheck();
110	if (fInitStatus != B_OK)
111		return;
112
113	fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
114		"Message Reader", B_NORMAL_PRIORITY, this);
115	if (fPrivateData->fReplyReader < 0)
116		fInitStatus = fPrivateData->fReplyReader;
117	if (fInitStatus != B_OK) {
118		exit_thread(fPrivateData->fReplyReader);
119		fPrivateData->fReplyReader = -1;
120		return;
121	}
122
123	fInitStatus = resume_thread(fPrivateData->fReplyReader);
124}
125
126
127BSocketMessenger::~BSocketMessenger()
128{
129	Unset();
130
131	delete fPrivateData;
132}
133
134
135void
136BSocketMessenger::Unset()
137{
138	if (fPrivateData == NULL)
139		return;
140
141	fSocket.Disconnect();
142	wait_for_thread(fPrivateData->fReplyReader, NULL);
143	fPrivateData->fReplyReader = -1;
144	fPrivateData->ClearMessages();
145
146	release_sem_etc(fPrivateData->fMessageWaiters, 1, B_RELEASE_ALL);
147
148	fInitStatus = B_NO_INIT;
149}
150
151
152status_t
153BSocketMessenger::SetTo(const BNetworkAddress& address, bigtime_t timeout)
154{
155	Unset();
156
157	if (fPrivateData == NULL)
158		return B_NO_MEMORY;
159
160	fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
161		"Message Reader", B_NORMAL_PRIORITY, this);
162	if (fPrivateData->fReplyReader < 0)
163		return fPrivateData->fReplyReader;
164	status_t error = fSocket.Connect(address, timeout);
165	if (error != B_OK) {
166		Unset();
167		return error;
168	}
169
170	return fInitStatus = resume_thread(fPrivateData->fReplyReader);
171}
172
173
174status_t
175BSocketMessenger::SetTo(const BSocketMessenger& target, bigtime_t timeout)
176{
177	return SetTo(target.Address(), timeout);
178}
179
180
181status_t
182BSocketMessenger::SendMessage(const BMessage& message)
183{
184	return _SendMessage(message);
185}
186
187
188status_t
189BSocketMessenger::SendMessage(const BMessage& message, BMessage& _reply,
190	bigtime_t timeout)
191{
192	int64 replyID = atomic_add64(&fPrivateData->fReplyIDCounter, 1);
193	BMessage temp(message);
194	temp.AddInt64(kReplySenderIDField, replyID);
195	status_t error = _SendMessage(temp);
196	if (error != B_OK)
197		return error;
198
199	return _ReadReply(replyID, _reply, timeout);
200}
201
202
203status_t
204BSocketMessenger::SendMessage(const BMessage& message,
205	BMessenger& replyTarget, bigtime_t timeout)
206{
207	BMessage reply;
208	status_t error = SendMessage(message, reply, timeout);
209	if (error != B_OK)
210		return error;
211
212	return replyTarget.SendMessage(&reply);
213}
214
215
216status_t
217BSocketMessenger::SendReply(const BMessage& message, const BMessage& reply)
218{
219	int64 replyID;
220	if (message.FindInt64(kReplySenderIDField, &replyID) != B_OK)
221		return B_NOT_ALLOWED;
222
223	BMessage replyMessage(reply);
224	replyMessage.AddInt64(kReplyReceiverIDField, replyID);
225	return SendMessage(replyMessage);
226}
227
228
229status_t
230BSocketMessenger::ReceiveMessage(BMessage& _message, bigtime_t timeout)
231{
232	status_t error = B_OK;
233	AutoLocker<BMessageQueue> queueLocker(fPrivateData->fReceivedMessages);
234	for (;;) {
235		if (!fPrivateData->fReceivedMessages.IsEmpty()) {
236			BMessage* nextMessage
237				= fPrivateData->fReceivedMessages.NextMessage();
238			_message = *nextMessage;
239			delete nextMessage;
240			break;
241		}
242
243		queueLocker.Unlock();
244		error = _WaitForMessage(timeout);
245		if (error != B_OK)
246			break;
247		if (!fSocket.IsConnected()) {
248			error = B_CANCELED;
249			break;
250		}
251		queueLocker.Lock();
252	}
253
254	return error;
255}
256
257
258void
259BSocketMessenger::_Init()
260{
261	if (fPrivateData != NULL)
262		return;
263
264	BSocketMessenger::Private* data
265		= new(std::nothrow) BSocketMessenger::Private;
266
267	if (data == NULL) {
268		fInitStatus = B_NO_MEMORY;
269		return;
270	}
271
272	data->fMessageWaiters = create_sem(0, "message waiters");
273	if (data->fMessageWaiters < 0) {
274		fInitStatus = data->fMessageWaiters;
275		delete data;
276		return;
277	}
278
279	fPrivateData = data;
280}
281
282
283status_t
284BSocketMessenger::_WaitForMessage(bigtime_t timeout)
285{
286	for (;;) {
287		status_t error = acquire_sem_etc(fPrivateData->fMessageWaiters, 1,
288			B_RELATIVE_TIMEOUT, timeout);
289		if (error == B_INTERRUPTED) {
290			if (timeout != B_INFINITE_TIMEOUT)
291				timeout -= system_time();
292			continue;
293		}
294		if (error != B_OK)
295			return error;
296		break;
297	}
298
299	return B_OK;
300}
301
302
303status_t
304BSocketMessenger::_SendMessage(const BMessage& message)
305{
306	ssize_t flatSize = message.FlattenedSize();
307	ssize_t totalSize = flatSize + sizeof(ssize_t);
308
309	char* buffer = new(std::nothrow) char[totalSize];
310	if (buffer == NULL)
311		return B_NO_MEMORY;
312
313	ArrayDeleter<char> bufferDeleter(buffer);
314	*(ssize_t*)buffer = flatSize;
315	char* messageBuffer = buffer + sizeof(ssize_t);
316	status_t error = message.Flatten(messageBuffer, flatSize);
317	if (error != B_OK)
318		return error;
319
320	ssize_t size = fSocket.Write(buffer, totalSize);
321	if (size < 0)
322		return size;
323
324	return B_OK;
325}
326
327
328status_t
329BSocketMessenger::_ReadMessage(BMessage& _message)
330{
331	status_t error = fSocket.WaitForReadable(B_INFINITE_TIMEOUT);
332	if (error != B_OK)
333		return error;
334
335	ssize_t size = 0;
336	ssize_t readSize = fSocket.Read(&size, sizeof(ssize_t));
337	if (readSize < 0)
338		return readSize;
339
340	if (readSize != sizeof(ssize_t))
341		return B_BAD_VALUE;
342
343	if (size <= 0)
344		return B_MISMATCHED_VALUES;
345
346	char* buffer = new(std::nothrow) char[size];
347	if (buffer == NULL)
348		return B_NO_MEMORY;
349
350	ArrayDeleter<char> bufferDeleter(buffer);
351	readSize = fSocket.Read(buffer, size);
352	if (readSize < 0)
353		return readSize;
354
355	if (readSize != size)
356		return B_MISMATCHED_VALUES;
357
358	return _message.Unflatten(buffer);
359}
360
361
362status_t
363BSocketMessenger::_ReadReply(const int64 replyID, BMessage& reply,
364	bigtime_t timeout)
365{
366	status_t error = B_OK;
367	for (;;) {
368		if (fPrivateData->fReceivedReplies.ContainsKey(replyID)) {
369			reply = fPrivateData->fReceivedReplies.Remove(replyID);
370			break;
371		}
372
373		error = _WaitForMessage(timeout);
374		if (error != B_OK)
375			break;
376		if (!fSocket.IsConnected()) {
377			error = B_CANCELED;
378			break;
379		}
380	}
381
382	return error;
383}
384
385
386status_t
387BSocketMessenger::_MessageReader(void* arg)
388{
389	BSocketMessenger* messenger = (BSocketMessenger*)arg;
390	BSocketMessenger::Private* data = messenger->fPrivateData;
391	status_t error = B_OK;
392
393	for (;;) {
394		BMessage message;
395		error = messenger->_ReadMessage(message);
396		if (error != B_OK)
397			break;
398
399		int64 replyID;
400		if (message.FindInt64(kReplyReceiverIDField, &replyID) == B_OK) {
401			error = data->fReceivedReplies.Put(replyID, message);
402			if (error != B_OK)
403				break;
404		} else {
405			BMessage* queueMessage = new(std::nothrow) BMessage(message);
406			if (queueMessage == NULL) {
407				error = B_NO_MEMORY;
408				break;
409			}
410
411			AutoLocker<BMessageQueue> queueLocker(
412				data->fReceivedMessages);
413			data->fReceivedMessages.AddMessage(queueMessage);
414		}
415
416
417		release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
418	}
419
420	// if we exit our message loop, ensure everybody wakes up and knows
421	// we're no longer receiving messages.
422	messenger->fSocket.Disconnect();
423	release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
424	return error;
425}
426