1/*
2 * Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 */
5
6#include <map>
7#include <new>
8#include <set>
9#include <string.h>
10
11#include <AutoDeleter.h>
12#include <Autolock.h>
13#include <DataIO.h>
14#include <MessagePrivate.h>
15#include <MessengerPrivate.h>
16#include <OS.h>
17#include <TokenSpace.h>
18#include <util/DoublyLinkedList.h>
19
20#include <messaging.h>
21
22#include "Debug.h"
23#include "MessageDeliverer.h"
24#include "Referenceable.h"
25
26using std::map;
27using std::nothrow;
28using std::set;
29
30// sDeliverer -- the singleton instance
31MessageDeliverer *MessageDeliverer::sDeliverer = NULL;
32
33static const bigtime_t	kRetryDelay			= 100000;			// 100 ms
34
35// per port sanity limits
36static const int32		kMaxMessagesPerPort	= 10000;
37static const int32		kMaxDataPerPort		= 50 * 1024 * 1024;	// 50 MB
38
39
40// MessagingTargetSet
41
42// destructor
43MessagingTargetSet::~MessagingTargetSet()
44{
45}
46
47
48// #pragma mark -
49
50// DefaultMessagingTargetSet
51
52// constructor
53DefaultMessagingTargetSet::DefaultMessagingTargetSet(
54		const messaging_target *targets, int32 targetCount)
55	: MessagingTargetSet(),
56	  fTargets(targets),
57	  fTargetCount(targetCount),
58	  fNextIndex(0)
59{
60}
61
62// destructor
63DefaultMessagingTargetSet::~DefaultMessagingTargetSet()
64{
65}
66
67// HasNext
68bool
69DefaultMessagingTargetSet::HasNext() const
70{
71	return (fNextIndex < fTargetCount);
72}
73
74// Next
75bool
76DefaultMessagingTargetSet::Next(port_id &port, int32 &token)
77{
78	if (fNextIndex >= fTargetCount)
79		return false;
80
81	port = fTargets[fNextIndex].port;
82	token = fTargets[fNextIndex].token;
83	fNextIndex++;
84
85	return true;
86}
87
88// Rewind
89void
90DefaultMessagingTargetSet::Rewind()
91{
92	fNextIndex = 0;
93}
94
95
96// #pragma mark -
97
98// SingleMessagingTargetSet
99
100// constructor
101SingleMessagingTargetSet::SingleMessagingTargetSet(BMessenger target)
102	: MessagingTargetSet(),
103	  fAtBeginning(true)
104{
105	BMessenger::Private messengerPrivate(target);
106	fPort = messengerPrivate.Port();
107	fToken = (messengerPrivate.IsPreferredTarget()
108		? B_PREFERRED_TOKEN : messengerPrivate.Token());
109}
110
111// constructor
112SingleMessagingTargetSet::SingleMessagingTargetSet(port_id port, int32 token)
113	: MessagingTargetSet(),
114	  fPort(port),
115	  fToken(token),
116	  fAtBeginning(true)
117{
118}
119
120// destructor
121SingleMessagingTargetSet::~SingleMessagingTargetSet()
122{
123}
124
125// HasNext
126bool
127SingleMessagingTargetSet::HasNext() const
128{
129	return fAtBeginning;
130}
131
132// Next
133bool
134SingleMessagingTargetSet::Next(port_id &port, int32 &token)
135{
136	if (!fAtBeginning)
137		return false;
138
139	port = fPort;
140	token = fToken;
141	fAtBeginning = false;
142
143	return true;
144}
145
146// Rewind
147void
148SingleMessagingTargetSet::Rewind()
149{
150	fAtBeginning = true;
151}
152
153
154// #pragma mark -
155
156// Message
157/*!	\brief Encapsulates a message to be delivered.
158
159	Besides the flattened message it also stores the when the message was
160	created and when the delivery attempts shall time out.
161*/
162class MessageDeliverer::Message : public BReferenceable {
163public:
164	Message(void *data, int32 dataSize, bigtime_t timeout)
165		: BReferenceable(),
166		  fData(data),
167		  fDataSize(dataSize),
168		  fCreationTime(system_time()),
169		  fBusy(false)
170	{
171		if (B_INFINITE_TIMEOUT - fCreationTime <= timeout)
172			fTimeoutTime = B_INFINITE_TIMEOUT;
173		else if (timeout <= 0)
174			fTimeoutTime = fCreationTime;
175		else
176			fTimeoutTime = fCreationTime + timeout;
177	}
178
179	~Message()
180	{
181		free(fData);
182	}
183
184	void *Data() const
185	{
186		return fData;
187	}
188
189	int32 DataSize() const
190	{
191		return fDataSize;
192	}
193
194	bigtime_t CreationTime() const
195	{
196		return fCreationTime;
197	}
198
199	bigtime_t TimeoutTime() const
200	{
201		return fTimeoutTime;
202	}
203
204	bool HasTimeout() const
205	{
206		return (fTimeoutTime < B_INFINITE_TIMEOUT);
207	}
208
209	void SetBusy(bool busy)
210	{
211		fBusy = busy;
212	}
213
214	bool IsBusy() const
215	{
216		return fBusy;
217	}
218
219private:
220	void		*fData;
221	int32		fDataSize;
222	bigtime_t	fCreationTime;
223	bigtime_t	fTimeoutTime;
224	bool		fBusy;
225};
226
227// TargetMessage
228/*!	\brief Encapsulates a Message to be sent to a specific handler.
229
230	A TargetMessage is always associated with (i.e. queued in) a TargetPort.
231	While a Message stores only the message data and some timing info, this
232	object adds the token of a the target BHandler.
233
234	A Message can be referred to by more than one TargetMessage (when
235	broadcasting), but a TargetMessage is referred to exactly once, by
236	the TargetPort.
237*/
238class MessageDeliverer::TargetMessage
239	: public DoublyLinkedListLinkImpl<MessageDeliverer::TargetMessage> {
240public:
241	TargetMessage(Message *message, int32 token)
242		: fMessage(message),
243		  fToken(token)
244	{
245		if (fMessage)
246			fMessage->AcquireReference();
247	}
248
249	~TargetMessage()
250	{
251		if (fMessage)
252			fMessage->ReleaseReference();
253	}
254
255	Message *GetMessage() const
256	{
257		return fMessage;
258	}
259
260	int32 Token() const
261	{
262		return fToken;
263	}
264
265private:
266	Message				*fMessage;
267	int32				fToken;
268};
269
270// TargetMessageHandle
271/*!	\brief A small wrapper for TargetMessage providing a complete order.
272
273	This class only exists to provide the comparison operators required to
274	put a TargetMessage into a set. The order implemented is by ascending by
275	timeout time (primary) and by TargetMessage pointer (secondary).
276	Hence TargetMessageHandles referring to the same TargetMessage are equal
277	(and only those).
278*/
279class MessageDeliverer::TargetMessageHandle {
280public:
281	TargetMessageHandle(TargetMessage *message)
282		: fMessage(message)
283	{
284	}
285
286	TargetMessageHandle(const TargetMessageHandle &other)
287		: fMessage(other.fMessage)
288	{
289	}
290
291	TargetMessage *GetMessage() const
292	{
293		return fMessage;
294	}
295
296	TargetMessageHandle &operator=(const TargetMessageHandle &other)
297	{
298		fMessage = other.fMessage;
299		return *this;
300	}
301
302	bool operator==(const TargetMessageHandle &other) const
303	{
304		return (fMessage == other.fMessage);
305	}
306
307	bool operator!=(const TargetMessageHandle &other) const
308	{
309		return (fMessage != other.fMessage);
310	}
311
312	bool operator<(const TargetMessageHandle &other) const
313	{
314		bigtime_t timeout = fMessage->GetMessage()->TimeoutTime();
315		bigtime_t otherTimeout = other.fMessage->GetMessage()->TimeoutTime();
316		if (timeout < otherTimeout)
317			return true;
318		if (timeout > otherTimeout)
319			return false;
320		return (fMessage < other.fMessage);
321	}
322
323private:
324	TargetMessage	*fMessage;
325};
326
327// TargetPort
328/*!	\brief Represents a full target port, queuing the not yet delivered
329		   messages.
330
331	A TargetPort internally queues TargetMessages in the order the are to be
332	delivered. Furthermore the object maintains an ordered set of
333	TargetMessages that can timeout (in ascending order of timeout time), so
334	that timed out messages can be dropped easily.
335*/
336class MessageDeliverer::TargetPort {
337public:
338	TargetPort(port_id portID)
339		: fPortID(portID),
340		  fMessages(),
341		  fMessageCount(0),
342		  fMessageSize(0)
343	{
344	}
345
346	~TargetPort()
347	{
348		while (!fMessages.IsEmpty())
349			PopMessage();
350	}
351
352	port_id PortID() const
353	{
354		return fPortID;
355	}
356
357	status_t PushMessage(Message *message, int32 token)
358	{
359PRINT("MessageDeliverer::TargetPort::PushMessage(port: %" B_PRId32 ", %p, %"
360B_PRId32 ")\n", fPortID, message, token);
361		// create a target message
362		TargetMessage *targetMessage
363			= new(nothrow) TargetMessage(message, token);
364		if (!targetMessage)
365			return B_NO_MEMORY;
366
367		// push it
368		fMessages.Insert(targetMessage);
369		fMessageCount++;
370		fMessageSize += targetMessage->GetMessage()->DataSize();
371
372		// add it to the timeoutable messages, if it has a timeout
373		if (message->HasTimeout())
374			fTimeoutableMessages.insert(targetMessage);
375
376		_EnforceLimits();
377
378		return B_OK;
379	}
380
381	Message *PeekMessage(int32 &token) const
382	{
383		if (!fMessages.Head())
384			return NULL;
385
386		token = fMessages.Head()->Token();
387		return fMessages.Head()->GetMessage();
388	}
389
390	void PopMessage()
391	{
392		if (fMessages.Head()) {
393PRINT("MessageDeliverer::TargetPort::PopMessage(): port: %" B_PRId32 ", %p\n",
394fPortID, fMessages.Head()->GetMessage());
395			_RemoveMessage(fMessages.Head());
396		}
397	}
398
399	void DropTimedOutMessages()
400	{
401		bigtime_t now = system_time();
402
403		while (fTimeoutableMessages.begin() != fTimeoutableMessages.end()) {
404			TargetMessage *message = fTimeoutableMessages.begin()->GetMessage();
405			if (message->GetMessage()->TimeoutTime() > now)
406				break;
407
408PRINT("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %" B_PRId32
409": message %p timed out\n", fPortID, message->GetMessage());
410			_RemoveMessage(message);
411		}
412	}
413
414	bool IsEmpty() const
415	{
416		return fMessages.IsEmpty();
417	}
418
419private:
420	void _RemoveMessage(TargetMessage *message)
421	{
422		fMessages.Remove(message);
423		fMessageCount--;
424		fMessageSize -= message->GetMessage()->DataSize();
425
426		if (message->GetMessage()->HasTimeout())
427			fTimeoutableMessages.erase(message);
428
429		delete message;
430	}
431
432	void _EnforceLimits()
433	{
434		// message count
435		while (fMessageCount > kMaxMessagesPerPort) {
436PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
437": hit maximum message count limit.\n", fPortID);
438			PopMessage();
439		}
440
441		// message size
442		while (fMessageSize > kMaxDataPerPort) {
443PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
444": hit maximum message size limit.\n", fPortID);
445			PopMessage();
446		}
447	}
448
449	typedef DoublyLinkedList<TargetMessage>	MessageList;
450
451	port_id						fPortID;
452	MessageList					fMessages;
453	int32						fMessageCount;
454	int32						fMessageSize;
455	set<TargetMessageHandle>	fTimeoutableMessages;
456};
457
458// TargetPortMap
459struct MessageDeliverer::TargetPortMap : public map<port_id, TargetPort*> {
460};
461
462
463// #pragma mark -
464
465/*!	\class MessageDeliverer
466	\brief Service for delivering messages, which retries the delivery as long
467		   as the target port is full.
468
	For the user of the service only the MessageDeliverer::DeliverMessage()
	methods will be of interest. Some of them allow broadcasting a message to
	several recipients.
472
473	The class maintains a TargetPort for each target port which was full at the
474	time a message was to be delivered to it. A TargetPort has a queue of
475	undelivered messages. A separate worker thread retries periodically to send
476	the yet undelivered messages to the respective target ports.
477*/
478
479// constructor
480MessageDeliverer::MessageDeliverer()
481	: fLock("message deliverer"),
482	  fTargetPorts(NULL),
483	  fDelivererThread(-1),
484	  fTerminating(false)
485{
486}
487
488// destructor
489MessageDeliverer::~MessageDeliverer()
490{
491	fTerminating = true;
492
493	if (fDelivererThread >= 0) {
494		int32 result;
495		wait_for_thread(fDelivererThread, &result);
496	}
497
498	delete fTargetPorts;
499}
500
501// Init
502status_t
503MessageDeliverer::Init()
504{
505	// create the target port map
506	fTargetPorts = new(nothrow) TargetPortMap;
507	if (!fTargetPorts)
508		return B_NO_MEMORY;
509
510	// spawn the deliverer thread
511	fDelivererThread = spawn_thread(MessageDeliverer::_DelivererThreadEntry,
512		"message deliverer", B_NORMAL_PRIORITY + 1, this);
513	if (fDelivererThread < 0)
514		return fDelivererThread;
515
516	// resume the deliverer thread
517	resume_thread(fDelivererThread);
518
519	return B_OK;
520}
521
522// CreateDefault
523status_t
524MessageDeliverer::CreateDefault()
525{
526	if (sDeliverer)
527		return B_OK;
528
529	// create the deliverer
530	MessageDeliverer *deliverer = new(nothrow) MessageDeliverer;
531	if (!deliverer)
532		return B_NO_MEMORY;
533
534	// init it
535	status_t error = deliverer->Init();
536	if (error != B_OK) {
537		delete deliverer;
538		return error;
539	}
540
541	sDeliverer = deliverer;
542	return B_OK;
543}
544
545// DeleteDefault
546void
547MessageDeliverer::DeleteDefault()
548{
549	if (sDeliverer) {
550		delete sDeliverer;
551		sDeliverer = NULL;
552	}
553}
554
555// Default
556MessageDeliverer *
557MessageDeliverer::Default()
558{
559	return sDeliverer;
560}
561
562// DeliverMessage
563/*!	\brief Delivers a message to the supplied target.
564
565	The method tries to send the message right now (if there are not already
566	messages pending for the target port). If that fails due to a full target
567	port, the message is queued for later delivery.
568
569	\param message The message to be delivered.
570	\param target A BMessenger identifying the delivery target.
571	\param timeout If given, the message will be dropped, when it couldn't be
572		   delivered after this amount of microseconds.
573	\return
574	- \c B_OK, if sending the message succeeded or if the target port was
575	  full and the message has been queued,
576	- another error code otherwise.
577*/
578status_t
579MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target,
580	bigtime_t timeout)
581{
582	SingleMessagingTargetSet set(target);
583	return DeliverMessage(message, set, timeout);
584}
585
586// DeliverMessage
587/*!	\brief Delivers a message to the supplied targets.
588
589	The method tries to send the message right now to each of the given targets
590	(if there are not already messages pending for a target port). If that
591	fails due to a full target port, the message is queued for later delivery.
592
593	\param message The message to be delivered.
594	\param targets MessagingTargetSet providing the the delivery targets.
595	\param timeout If given, the message will be dropped, when it couldn't be
596		   delivered after this amount of microseconds.
597	\return
598	- \c B_OK, if for each of the given targets sending the message succeeded
599	  or if the target port was full and the message has been queued,
600	- another error code otherwise.
601*/
602status_t
603MessageDeliverer::DeliverMessage(BMessage *message, MessagingTargetSet &targets,
604	bigtime_t timeout)
605{
606	if (!message)
607		return B_BAD_VALUE;
608
609	// flatten the message
610	BMallocIO mallocIO;
611	status_t error = message->Flatten(&mallocIO, NULL);
612	if (error < B_OK)
613		return error;
614
615	return DeliverMessage(mallocIO.Buffer(), mallocIO.BufferLength(), targets,
616		timeout);
617}
618
619// DeliverMessage
620/*!	\brief Delivers a flattened message to the supplied targets.
621
622	The method tries to send the message right now to each of the given targets
623	(if there are not already messages pending for a target port). If that
624	fails due to a full target port, the message is queued for later delivery.
625
626	\param message The flattened message to be delivered. This may be a
627		   flattened BMessage or KMessage.
628	\param messageSize The size of the flattened message buffer.
629	\param targets MessagingTargetSet providing the the delivery targets.
630	\param timeout If given, the message will be dropped, when it couldn't be
631		   delivered after this amount of microseconds.
632	\return
633	- \c B_OK, if for each of the given targets sending the message succeeded
634	  or if the target port was full and the message has been queued,
635	- another error code otherwise.
636*/
637status_t
638MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize,
639	MessagingTargetSet &targets, bigtime_t timeout)
640{
641	if (!messageData || messageSize <= 0)
642		return B_BAD_VALUE;
643
644	// clone the buffer
645	void *data = malloc(messageSize);
646	if (!data)
647		return B_NO_MEMORY;
648	memcpy(data, messageData, messageSize);
649
650	// create a Message
651	Message *message = new(nothrow) Message(data, messageSize, timeout);
652	if (!message) {
653		free(data);
654		return B_NO_MEMORY;
655	}
656	BReference<Message> _(message, true);
657
658	// add the message to the respective target ports
659	BAutolock locker(fLock);
660	for (int32 targetIndex = 0; targets.HasNext(); targetIndex++) {
661		port_id portID;
662		int32 token;
663		targets.Next(portID, token);
664
665		// get the target port
666		TargetPort *port = _GetTargetPort(portID, true);
667		if (!port)
668			return B_NO_MEMORY;
669
670		// try sending the message, if there are no queued messages yet
671		if (port->IsEmpty()) {
672			status_t error = _SendMessage(message, portID, token);
673			// if the message was delivered OK, we're done with the target
674			if (error == B_OK) {
675				_PutTargetPort(port);
676				continue;
677			}
678
679			// if the port is not full, but an error occurred, we skip this target
680			if (error != B_WOULD_BLOCK) {
681				_PutTargetPort(port);
682				if (targetIndex == 0 && !targets.HasNext())
683					return error;
684				continue;
685			}
686		}
687
688		// add the message
689		status_t error = port->PushMessage(message, token);
690		_PutTargetPort(port);
691		if (error != B_OK)
692			return error;
693	}
694
695	return B_OK;
696}
697
698// _GetTargetPort
699MessageDeliverer::TargetPort *
700MessageDeliverer::_GetTargetPort(port_id portID, bool create)
701{
702	// get the port from the map
703	TargetPortMap::iterator it = fTargetPorts->find(portID);
704	if (it != fTargetPorts->end())
705		return it->second;
706
707	if (!create)
708		return NULL;
709
710	// create a port
711	TargetPort *port = new(nothrow) TargetPort(portID);
712	if (!port)
713		return NULL;
714	(*fTargetPorts)[portID] = port;
715
716	return port;
717}
718
719// _PutTargetPort
720void
721MessageDeliverer::_PutTargetPort(TargetPort *port)
722{
723	if (!port)
724		return;
725
726	if (port->IsEmpty()) {
727		fTargetPorts->erase(port->PortID());
728		delete port;
729	}
730}
731
732// _SendMessage
733status_t
734MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token)
735{
736	status_t error = BMessage::Private::SendFlattenedMessage(message->Data(),
737		message->DataSize(), portID, token, 0);
738//PRINT("MessageDeliverer::_SendMessage(%p, port: %ld, token: %ld): %lx\n",
739//message, portID, token, error);
740	return error;
741}
742
743// _DelivererThreadEntry
744int32
745MessageDeliverer::_DelivererThreadEntry(void *data)
746{
747	return ((MessageDeliverer*)data)->_DelivererThread();
748}
749
750// _DelivererThread
751int32
752MessageDeliverer::_DelivererThread()
753{
754	while (!fTerminating) {
755		snooze(kRetryDelay);
756		if (fTerminating)
757			break;
758
759		// iterate through all target ports and try sending the messages
760		BAutolock _(fLock);
761		for (TargetPortMap::iterator it = fTargetPorts->begin();
762			 it != fTargetPorts->end();) {
763			TargetPort *port = it->second;
764			bool portError = false;
765
766			port->DropTimedOutMessages();
767
768			// try sending all messages
769			int32 token;
770			while (Message *message = port->PeekMessage(token)) {
771				status_t error = B_OK;
772//				if (message->TimeoutTime() > system_time()) {
773					error = _SendMessage(message, port->PortID(), token);
774//				} else {
775//					// timeout, drop message
776//					PRINT("MessageDeliverer::_DelivererThread(): port %ld, "
777//						"message %p timed out\n", port->PortID(), message);
778//				}
779
780				if (error == B_OK) {
781					port->PopMessage();
782				} else if (error == B_WOULD_BLOCK) {
783					// no luck yet -- port is still full
784					break;
785				} else {
786					// unexpected error -- probably the port is gone
787					portError = true;
788					break;
789				}
790			}
791
792			// next port
793			if (portError || port->IsEmpty()) {
794				TargetPortMap::iterator oldIt = it;
795				++it;
796				delete port;
797				fTargetPorts->erase(oldIt);
798			} else
799				++it;
800		}
801	}
802
803	return 0;
804}
805