1/*
2 * Copyright 2015, Haiku.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 *			Joseph Groover <looncraz@looncraz.net>
7*/
8
9
10#include "DelayedMessage.h"
11
12#include <stdio.h>
13#include <stdlib.h>
14#include <string.h>
15
16#include <Autolock.h>
17#include <String.h>
18
19#include <LinkSender.h>
20#include <ServerProtocol.h>
21
22
23// DelayedMessageSender constants
24static const int32 kWakeupMessage = AS_LAST_CODE + 2048;
25static const int32 kExitMessage = kWakeupMessage + 1;
26
27static const char* kName = "DMT is here for you, eventually...";
28static int32 kPriority = B_URGENT_DISPLAY_PRIORITY;
29static int32 kPortCapacity = 10;
30
31
32//! Data attachment structure.
33struct Attachment {
34								Attachment(const void* data, size_t size);
35								~Attachment();
36
37			const void*			constData;
38			void*				data;
39			size_t				size;
40};
41
42
43typedef BObjectList<Attachment> AttachmentList;
44
45
46/*!	\class ScheduledMessage
47	\brief Responsible for sending of delayed message.
48*/
49class ScheduledMessage {
50public:
51								ScheduledMessage(DelayedMessage& message);
52								~ScheduledMessage();
53
54			int32				CountTargets() const;
55
56			void				Finalize();
57			bigtime_t			ScheduledTime() const;
58			int32				SendMessage();
59			bool				IsValid() const;
60			bool				Merge(DelayedMessage& message);
61
62			status_t			SendMessageToPort(port_id port);
63			bool 				operator<(const ScheduledMessage& other) const;
64
65			DelayedMessageData*	fData;
66};
67
68
69/*!	\class DelayedMessageSender DelayedMessageSender.h
70	\brief Responsible for scheduling and sending of delayed messages
71*/
72class DelayedMessageSender {
73public:
74			explicit			DelayedMessageSender();
75								~DelayedMessageSender();
76
77			status_t			ScheduleMessage	(DelayedMessage& message);
78
79			int32				CountDelayedMessages() const;
80			int64				CountSentMessages() const;
81
82private:
83			void				_MessageLoop();
84			int32				_SendDelayedMessages();
85	static	int32				_thread_func(void* sender);
86			void				_Wakeup(bigtime_t whatTime);
87
88private:
89	typedef BObjectList<ScheduledMessage> ScheduledList;
90
91	mutable	BLocker				fLock;
92			ScheduledList		fMessages;
93
94			bigtime_t			fScheduledWakeup;
95
96			int32				fWakeupRetry;
97			thread_id			fThread;
98			port_id				fPort;
99
100	mutable	int64				fSentCount;
101};
102
103
104DelayedMessageSender gDelayedMessageSender;
105
106
107/*!	\class DelayedMessageData DelayedMessageSender.h
108	\brief Owns DelayedMessage data, allocates memory and copies data only
109			when needed,
110*/
111class DelayedMessageData {
112	typedef BObjectList<port_id> PortList;
113	typedef void(*FailureCallback)(int32 code, port_id port, void* data);
114public:
115								DelayedMessageData(int32 code, bigtime_t delay,
116									bool isSpecificTime);
117								~DelayedMessageData();
118
119			bool				AddTarget(port_id port);
120			void				RemoveTarget(port_id port);
121			int32				CountTargets() const;
122
123			void				MergeTargets(DelayedMessageData* other);
124
125			bool				CopyData();
126			bool				MergeData(DelayedMessageData* other);
127
128			bool				IsValid() const;
129				// Only valid after a successful CopyData().
130
131			status_t			Attach(const void* data, size_t size);
132
133			bool				Compare(Attachment* one, Attachment* two,
134									int32 index);
135
136			void				SetMerge(DMMergeMode mode, uint32 mask);
137			void				SendFailed(port_id port);
138
139			void				SetFailureCallback(FailureCallback callback,
140									void* data);
141
142			// Accessors.
143			int32&				Code() {return fCode;}
144			const int32&		Code() const {return fCode;}
145
146			bigtime_t&			ScheduledTime() {return fScheduledTime;}
147			const bigtime_t&	ScheduledTime() const {return fScheduledTime;}
148
149			AttachmentList&		Attachments() {return fAttachments;}
150			const AttachmentList&	Attachments() const {return fAttachments;}
151
152			PortList&			Targets() {return fTargets;}
153			const PortList&		Targets() const {return fTargets;}
154
155private:
156		// Data members.
157
158			int32				fCode;
159			bigtime_t			fScheduledTime;
160			bool				fValid;
161
162			AttachmentList		fAttachments;
163			PortList			fTargets;
164
165			DMMergeMode			fMergeMode;
166			uint32				fMergeMask;
167
168			FailureCallback		fFailureCallback;
169			void*				fFailureData;
170};
171
172
173// #pragma mark -
174
175
176
177DelayedMessage::DelayedMessage(int32 code, bigtime_t delay,
178		bool isSpecificTime)
179	:
180	fData(new(std::nothrow) DelayedMessageData(code, delay < DM_MINIMUM_DELAY
181		? DM_MINIMUM_DELAY : delay, isSpecificTime)),
182	fHandedOff(false)
183{
184}
185
186
187DelayedMessage::~DelayedMessage()
188{
189	// Message is canceled without a handoff.
190	if (!fHandedOff)
191		delete fData;
192}
193
194
195bool
196DelayedMessage::AddTarget(port_id port)
197{
198	if (fData == NULL || fHandedOff)
199		return false;
200
201	return fData->AddTarget(port);
202}
203
204
205void
206DelayedMessage::SetMerge(DMMergeMode mode, uint32 match)
207{
208	if (fData == NULL || fHandedOff)
209		return;
210
211	fData->SetMerge(mode, match);
212}
213
214
215void
216DelayedMessage::SetFailureCallback(void (*callback)(int32, port_id, void*),
217	void* data)
218{
219	if (fData == NULL || fHandedOff)
220		return;
221
222	fData->SetFailureCallback(callback, data);
223}
224
225
226//! Attach data to message. Memory is not allocated nor copied until handoff.
227status_t
228DelayedMessage::Attach(const void* data, size_t size)
229{
230	if (fData == NULL)
231		return B_NO_MEMORY;
232
233	if (fHandedOff)
234		return B_ERROR;
235
236	if (data == NULL || size == 0)
237		return B_BAD_VALUE;
238
239	return	fData->Attach(data, size);
240}
241
242
243status_t
244DelayedMessage::Flush()
245{
246	if (fData == NULL)
247		return B_NO_MEMORY;
248
249	if (fHandedOff)
250		return B_ERROR;
251
252	if (fData->CountTargets() == 0)
253		return B_BAD_VALUE;
254
255	return gDelayedMessageSender.ScheduleMessage(*this);
256}
257
258
259/*!	The data handoff occurs upon scheduling and reduces copies to only
260	when a message is actually scheduled. Canceled messages have low cost.
261*/
262DelayedMessageData*
263DelayedMessage::HandOff()
264{
265	if (fData == NULL || fHandedOff)
266		return NULL;
267
268	if (fData->CopyData()) {
269		fHandedOff = true;
270		return fData;
271	}
272
273	return NULL;
274}
275
276
277// #pragma mark -
278
279
280Attachment::Attachment(const void* _data, size_t _size)
281	:
282	constData(_data),
283	data(NULL),
284	size(_size)
285{
286}
287
288
289Attachment::~Attachment()
290{
291	free(data);
292}
293
294
295// #pragma mark -
296
297
298DelayedMessageData::DelayedMessageData(int32 code, bigtime_t delay,
299	bool isSpecificTime)
300	:
301	fCode(code),
302	fScheduledTime(delay + (isSpecificTime ? 0 : system_time())),
303	fValid(false),
304
305	fAttachments(3, true),
306	fTargets(4, true),
307
308	fMergeMode(DM_NO_MERGE),
309	fMergeMask(DM_DATA_DEFAULT),
310
311	fFailureCallback(NULL),
312	fFailureData(NULL)
313{
314}
315
316
317DelayedMessageData::~DelayedMessageData()
318{
319}
320
321
322bool
323DelayedMessageData::AddTarget(port_id port)
324{
325	if (port <= 0)
326		return false;
327
328	// check for duplicates:
329	for (int32 index = 0; index < fTargets.CountItems(); ++index) {
330		if (port == *fTargets.ItemAt(index))
331			return false;
332	}
333
334	return fTargets.AddItem(new(std::nothrow) port_id(port));
335}
336
337
338void
339DelayedMessageData::RemoveTarget(port_id port)
340{
341	if (port == B_BAD_PORT_ID)
342		return;
343
344	// Search for a match by value.
345	for (int32 index = 0; index < fTargets.CountItems(); ++index) {
346		port_id* target = fTargets.ItemAt(index);
347		if (port == *target) {
348			fTargets.RemoveItem(target, true);
349			return;
350		}
351	}
352}
353
354
355int32
356DelayedMessageData::CountTargets() const
357{
358	return fTargets.CountItems();
359}
360
361
362void
363DelayedMessageData::MergeTargets(DelayedMessageData* other)
364{
365	// Failure to add one target does not abort the loop!
366	// It could just mean we already have the target.
367	for (int32 index = 0; index < other->fTargets.CountItems(); ++index)
368		AddTarget(*(other->fTargets.ItemAt(index)));
369}
370
371
372//! Copy data from original location - merging failed
373bool
374DelayedMessageData::CopyData()
375{
376	Attachment* attached = NULL;
377
378	for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
379		attached = fAttachments.ItemAt(index);
380
381		if (attached == NULL || attached->data != NULL)
382			return false;
383
384		attached->data = malloc(attached->size);
385		if (attached->data == NULL)
386			return false;
387
388		memcpy(attached->data, attached->constData, attached->size);
389	}
390
391	fValid = true;
392	return true;
393}
394
395
396bool
397DelayedMessageData::MergeData(DelayedMessageData* other)
398{
399	if (!fValid
400		|| other == NULL
401		|| other->fCode != fCode
402		|| fMergeMode == DM_NO_MERGE
403		|| other->fMergeMode == DM_NO_MERGE
404		|| other->fMergeMode != fMergeMode
405		|| other->fAttachments.CountItems() != fAttachments.CountItems())
406		return false;
407
408	if (other->fMergeMode == DM_MERGE_CANCEL) {
409		MergeTargets(other);
410		return true;
411	}
412
413	// Compare data
414	Attachment* attached = NULL;
415	Attachment* otherAttached = NULL;
416
417	for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
418		attached = fAttachments.ItemAt(index);
419		otherAttached = other->fAttachments.ItemAt(index);
420
421		if (attached == NULL
422			|| otherAttached == NULL
423			|| attached->data == NULL
424			|| otherAttached->constData == NULL
425			|| attached->size != otherAttached->size)
426			return false;
427
428		// Compares depending upon mode & flags
429		if (!Compare(attached, otherAttached, index))
430			return false;
431	}
432
433	// add any targets not included in the existing message!
434	MergeTargets(other);
435
436	// since these are duplicates, we need not copy anything...
437	if (fMergeMode == DM_MERGE_DUPLICATES)
438		return true;
439
440	// DM_MERGE_REPLACE:
441
442	// Import the new data!
443	for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
444		attached = fAttachments.ItemAt(index);
445		otherAttached = other->fAttachments.ItemAt(index);
446
447		// We already have allocated our memory, but the other data
448		// has not.  So this reduces memory allocations.
449		memcpy(attached->data, otherAttached->constData, attached->size);
450	}
451
452	return true;
453}
454
455
456bool
457DelayedMessageData::IsValid() const
458{
459	return fValid;
460}
461
462
463status_t
464DelayedMessageData::Attach(const void* data, size_t size)
465{
466	// Sanity checking already performed
467	Attachment* attach = new(std::nothrow) Attachment(data, size);
468
469	if (attach == NULL)
470		return B_NO_MEMORY;
471
472	if (fAttachments.AddItem(attach) == false) {
473		delete attach;
474		return B_ERROR;
475	}
476
477	return B_OK;
478}
479
480
481bool
482DelayedMessageData::Compare(Attachment* one, Attachment* two, int32 index)
483{
484	if (fMergeMode == DM_MERGE_DUPLICATES) {
485
486		// Default-policy: all data must match
487		if (fMergeMask == DM_DATA_DEFAULT || (fMergeMask & 1 << index) != 0)
488			return memcmp(one->data, two->constData, one->size) == 0;
489
490	} else if (fMergeMode == DM_MERGE_REPLACE) {
491
492		// Default Policy: no data needs to match
493		if (fMergeMask != DM_DATA_DEFAULT && (fMergeMask & 1 << index) != 0)
494			return memcmp(one->data, two->constData, one->size) == 0;
495	}
496
497	return true;
498}
499
500
501void
502DelayedMessageData::SetMerge(DMMergeMode mode, uint32 mask)
503{
504	fMergeMode = mode;
505	fMergeMask = mask;
506}
507
508
509void
510DelayedMessageData::SendFailed(port_id port)
511{
512	if (fFailureCallback != NULL)
513		fFailureCallback(fCode, port, fFailureData);
514}
515
516
517void
518DelayedMessageData::SetFailureCallback(FailureCallback callback, void* data)
519{
520	fFailureCallback = callback;
521	fFailureData = data;
522}
523
524
525// #pragma mark -
526
527
528ScheduledMessage::ScheduledMessage(DelayedMessage& message)
529	:
530	fData(message.HandOff())
531{
532}
533
534
535ScheduledMessage::~ScheduledMessage()
536{
537	delete fData;
538}
539
540
541int32
542ScheduledMessage::CountTargets() const
543{
544	if (fData == NULL)
545		return 0;
546
547	return fData->CountTargets();
548}
549
550
551bigtime_t
552ScheduledMessage::ScheduledTime() const
553{
554	if (fData == NULL)
555		return 0;
556
557	return fData->ScheduledTime();
558}
559
560
561//! Send our message and data to their intended target(s)
562int32
563ScheduledMessage::SendMessage()
564{
565	if (fData == NULL || !fData->IsValid())
566		return 0;
567
568	int32 sent = 0;
569	for (int32 index = 0; index < fData->Targets().CountItems(); ++index) {
570		port_id port = *(fData->Targets().ItemAt(index));
571		status_t error = SendMessageToPort(port);
572
573		if (error == B_OK) {
574			++sent;
575			continue;
576		}
577
578		if (error != B_TIMED_OUT)
579			fData->SendFailed(port);
580	}
581
582	return sent;
583}
584
585
586status_t
587ScheduledMessage::SendMessageToPort(port_id port)
588{
589	if (fData == NULL || !fData->IsValid())
590		return B_BAD_DATA;
591
592	if (port == B_BAD_PORT_ID)
593		return B_BAD_VALUE;
594
595	BPrivate::LinkSender sender(port);
596	if (sender.StartMessage(fData->Code()) != B_OK)
597		return B_ERROR;
598
599	AttachmentList& list = fData->Attachments();
600	Attachment* attached = NULL;
601	status_t error = B_OK;
602
603	// The data has been checked already, so we assume it is all good
604	for (int32 index = 0; index < list.CountItems(); ++index) {
605		attached = list.ItemAt(index);
606
607		error = sender.Attach(attached->data, attached->size);
608		if (error != B_OK) {
609			sender.CancelMessage();
610			return error;
611		}
612	}
613
614	// We do not want to ever hold up the sender thread for too long, we
615	// set a 1 second sending delay, which should be more than enough for
616	// 99.992% of all cases.  Approximately.
617	error = sender.Flush(1000000);
618
619	if (error == B_OK || error == B_BAD_PORT_ID)
620		fData->RemoveTarget(port);
621
622	return error;
623}
624
625
626bool
627ScheduledMessage::IsValid() const
628{
629	return fData != NULL && fData->IsValid();
630}
631
632
633bool
634ScheduledMessage::Merge(DelayedMessage& other)
635{
636	if (!IsValid())
637		return false;
638
639	return fData->MergeData(other.Data());
640}
641
642
643bool
644ScheduledMessage::operator<(const ScheduledMessage& other) const
645{
646	if (!IsValid() || !other.IsValid())
647		return false;
648
649	return fData->ScheduledTime() < other.fData->ScheduledTime();
650}
651
652
653int
654CompareMessages(const ScheduledMessage* one, const ScheduledMessage* two)
655{
656	return *one < *two;
657}
658
659
660// #pragma mark -
661
662
663DelayedMessageSender::DelayedMessageSender()
664	:
665	fLock("DelayedMessageSender"),
666	fMessages(20, true),
667	fScheduledWakeup(B_INFINITE_TIMEOUT),
668	fWakeupRetry(0),
669	fThread(spawn_thread(&_thread_func, kName, kPriority, this)),
670	fPort(create_port(kPortCapacity, "DelayedMessageSender")),
671	fSentCount(0)
672{
673	resume_thread(fThread);
674}
675
676
677DelayedMessageSender::~DelayedMessageSender()
678{
679	// write the exit message to our port
680	write_port(fPort, kExitMessage, NULL, 0);
681
682	status_t status = B_OK;
683	while (wait_for_thread(fThread, &status) == B_OK);
684
685	// We now know the thread has exited, it is safe to cleanup
686	delete_port(fPort);
687}
688
689
690status_t
691DelayedMessageSender::ScheduleMessage(DelayedMessage& message)
692{
693	BAutolock _(fLock);
694
695	// Can we merge with a pending message?
696	ScheduledMessage* pending = NULL;
697	for (int32 index = 0; index < fMessages.CountItems(); ++index) {
698		pending = fMessages.ItemAt(index);
699		if (pending->Merge(message))
700			return B_OK;
701	}
702
703	// Guess not, add it to our list!
704	ScheduledMessage* scheduled = new(std::nothrow) ScheduledMessage(message);
705
706	if (scheduled == NULL)
707		return B_NO_MEMORY;
708
709	if (!scheduled->IsValid()) {
710		delete scheduled;
711		return B_BAD_DATA;
712	}
713
714	if (fMessages.AddItem(scheduled)) {
715		fMessages.SortItems(&CompareMessages);
716		_Wakeup(scheduled->ScheduledTime());
717		return B_OK;
718	}
719
720	return B_ERROR;
721}
722
723
724int32
725DelayedMessageSender::CountDelayedMessages() const
726{
727	BAutolock _(fLock);
728	return fMessages.CountItems();
729}
730
731
732int64
733DelayedMessageSender::CountSentMessages() const
734{
735	return atomic_get64(&fSentCount);
736}
737
738
739void
740DelayedMessageSender::_MessageLoop()
741{
742	int32 code = -1;
743	status_t status = B_TIMED_OUT;
744	bigtime_t timeout = B_INFINITE_TIMEOUT;
745
746	while (true) {
747		timeout = atomic_get64(&fScheduledWakeup) - (system_time()
748			+ (DM_MINIMUM_DELAY / 2));
749
750		if (timeout > DM_MINIMUM_DELAY / 4) {
751			status = read_port_etc(fPort, &code, NULL, 0, B_RELATIVE_TIMEOUT,
752				timeout);
753		} else
754			status = B_TIMED_OUT;
755
756		if (status == B_INTERRUPTED)
757			continue;
758
759		if (status == B_TIMED_OUT) {
760			_SendDelayedMessages();
761			continue;
762		}
763
764		if (status == B_OK) {
765			switch (code) {
766				case kWakeupMessage:
767					continue;
768
769				case kExitMessage:
770					return;
771
772				// TODO: trace unhandled messages
773				default:
774					continue;
775			}
776		}
777
778		// port deleted?
779		if (status < B_OK)
780			break;
781	}
782}
783
784
785int32
786DelayedMessageSender::_thread_func(void* sender)
787{
788	(static_cast<DelayedMessageSender*>(sender))->_MessageLoop();
789	return 0;
790}
791
792
793//! Sends pending messages, call ONLY from sender thread!
794int32
795DelayedMessageSender::_SendDelayedMessages()
796{
797	// avoid sending messages during times of contention
798	if (fLock.LockWithTimeout(30000) != B_OK) {
799		atomic_add64(&fScheduledWakeup, DM_MINIMUM_DELAY);
800		return 0;
801	}
802
803	atomic_set64(&fScheduledWakeup, B_INFINITE_TIMEOUT);
804
805	if (fMessages.CountItems() == 0) {
806		fLock.Unlock();
807		return 0;
808	}
809
810	int32 sent = 0;
811
812	bigtime_t time = system_time() + DM_MINIMUM_DELAY / 2;
813		// capture any that may be on the verge of being sent.
814
815	BObjectList<ScheduledMessage> remove;
816
817	ScheduledMessage* message = NULL;
818	for (int32 index = 0; index < fMessages.CountItems(); ++index) {
819		message = fMessages.ItemAt(index);
820
821		if (message->ScheduledTime() > time) {
822			atomic_set64(&fScheduledWakeup, message->ScheduledTime());
823			break;
824		}
825
826		int32 sendCount = message->SendMessage();
827		if (sendCount > 0)
828			sent += sendCount;
829
830		if (message->CountTargets() == 0)
831			remove.AddItem(message);
832	}
833
834	// remove serviced messages
835	for (int32 index = 0; index < remove.CountItems(); ++index)
836		fMessages.RemoveItem(remove.ItemAt(index));
837
838	atomic_add64(&fSentCount, sent);
839
840	// catch any partly-failed messages (possibly late):
841	if (fMessages.CountItems() > 0
842		&& atomic_get64(&fScheduledWakeup) == B_INFINITE_TIMEOUT) {
843
844		fMessages.SortItems(&CompareMessages);
845		message = fMessages.ItemAt(0);
846		bigtime_t timeout = message->ScheduledTime() - time;
847
848		if (timeout < 0)
849			timeout = DM_MINIMUM_DELAY;
850
851		atomic_set64(&fScheduledWakeup, timeout);
852	}
853
854	fLock.Unlock();
855	return sent;
856}
857
858
859void
860DelayedMessageSender::_Wakeup(bigtime_t when)
861{
862	if (atomic_get64(&fScheduledWakeup) < when
863		&& atomic_get(&fWakeupRetry) == 0)
864		return;
865
866	atomic_set64(&fScheduledWakeup, when);
867
868	BPrivate::LinkSender sender(fPort);
869	sender.StartMessage(kWakeupMessage);
870	status_t error = sender.Flush(30000);
871	atomic_set(&fWakeupRetry, (int32)error == B_TIMED_OUT);
872}
873
874