1/* 2 * Copyright 2015, Haiku. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Joseph Groover <looncraz@looncraz.net> 7*/ 8 9 10#include "DelayedMessage.h" 11 12#include <stdio.h> 13#include <stdlib.h> 14#include <string.h> 15 16#include <Autolock.h> 17#include <String.h> 18 19#include <LinkSender.h> 20#include <ServerProtocol.h> 21 22 23// DelayedMessageSender constants 24static const int32 kWakeupMessage = AS_LAST_CODE + 2048; 25static const int32 kExitMessage = kWakeupMessage + 1; 26 27static const char* kName = "DMT is here for you, eventually..."; 28static int32 kPriority = B_URGENT_DISPLAY_PRIORITY; 29static int32 kPortCapacity = 10; 30 31 32//! Data attachment structure. 33struct Attachment { 34 Attachment(const void* data, size_t size); 35 ~Attachment(); 36 37 const void* constData; 38 void* data; 39 size_t size; 40}; 41 42 43typedef BObjectList<Attachment> AttachmentList; 44 45 46/*! \class ScheduledMessage 47 \brief Responsible for sending of delayed message. 48*/ 49class ScheduledMessage { 50public: 51 ScheduledMessage(DelayedMessage& message); 52 ~ScheduledMessage(); 53 54 int32 CountTargets() const; 55 56 void Finalize(); 57 bigtime_t ScheduledTime() const; 58 int32 SendMessage(); 59 bool IsValid() const; 60 bool Merge(DelayedMessage& message); 61 62 status_t SendMessageToPort(port_id port); 63 bool operator<(const ScheduledMessage& other) const; 64 65 DelayedMessageData* fData; 66}; 67 68 69/*! \class DelayedMessageSender DelayedMessageSender.h 70 \brief Responsible for scheduling and sending of delayed messages 71*/ 72class DelayedMessageSender { 73public: 74 explicit DelayedMessageSender(); 75 ~DelayedMessageSender(); 76 77 status_t ScheduleMessage (DelayedMessage& message); 78 79 int32 CountDelayedMessages() const; 80 int64 CountSentMessages() const; 81 82private: 83 void _MessageLoop(); 84 int32 _SendDelayedMessages(); 85 static int32 _thread_func(void* sender); 86 void _Wakeup(bigtime_t whatTime); 87 88private: 89 typedef BObjectList<ScheduledMessage> ScheduledList; 90 91 mutable BLocker fLock; 92 ScheduledList fMessages; 93 94 bigtime_t fScheduledWakeup; 95 96 int32 fWakeupRetry; 97 thread_id fThread; 98 port_id fPort; 99 100 mutable int64 fSentCount; 101}; 102 103 104DelayedMessageSender gDelayedMessageSender; 105 106 107/*! \class DelayedMessageData DelayedMessageSender.h 108 \brief Owns DelayedMessage data, allocates memory and copies data only 109 when needed, 110*/ 111class DelayedMessageData { 112 typedef BObjectList<port_id> PortList; 113 typedef void(*FailureCallback)(int32 code, port_id port, void* data); 114public: 115 DelayedMessageData(int32 code, bigtime_t delay, 116 bool isSpecificTime); 117 ~DelayedMessageData(); 118 119 bool AddTarget(port_id port); 120 void RemoveTarget(port_id port); 121 int32 CountTargets() const; 122 123 void MergeTargets(DelayedMessageData* other); 124 125 bool CopyData(); 126 bool MergeData(DelayedMessageData* other); 127 128 bool IsValid() const; 129 // Only valid after a successful CopyData(). 130 131 status_t Attach(const void* data, size_t size); 132 133 bool Compare(Attachment* one, Attachment* two, 134 int32 index); 135 136 void SetMerge(DMMergeMode mode, uint32 mask); 137 void SendFailed(port_id port); 138 139 void SetFailureCallback(FailureCallback callback, 140 void* data); 141 142 // Accessors. 143 int32& Code() {return fCode;} 144 const int32& Code() const {return fCode;} 145 146 bigtime_t& ScheduledTime() {return fScheduledTime;} 147 const bigtime_t& ScheduledTime() const {return fScheduledTime;} 148 149 AttachmentList& Attachments() {return fAttachments;} 150 const AttachmentList& Attachments() const {return fAttachments;} 151 152 PortList& Targets() {return fTargets;} 153 const PortList& Targets() const {return fTargets;} 154 155private: 156 // Data members. 157 158 int32 fCode; 159 bigtime_t fScheduledTime; 160 bool fValid; 161 162 AttachmentList fAttachments; 163 PortList fTargets; 164 165 DMMergeMode fMergeMode; 166 uint32 fMergeMask; 167 168 FailureCallback fFailureCallback; 169 void* fFailureData; 170}; 171 172 173// #pragma mark - 174 175 176 177DelayedMessage::DelayedMessage(int32 code, bigtime_t delay, 178 bool isSpecificTime) 179 : 180 fData(new(std::nothrow) DelayedMessageData(code, delay < DM_MINIMUM_DELAY 181 ? DM_MINIMUM_DELAY : delay, isSpecificTime)), 182 fHandedOff(false) 183{ 184} 185 186 187DelayedMessage::~DelayedMessage() 188{ 189 // Message is canceled without a handoff. 190 if (!fHandedOff) 191 delete fData; 192} 193 194 195bool 196DelayedMessage::AddTarget(port_id port) 197{ 198 if (fData == NULL || fHandedOff) 199 return false; 200 201 return fData->AddTarget(port); 202} 203 204 205void 206DelayedMessage::SetMerge(DMMergeMode mode, uint32 match) 207{ 208 if (fData == NULL || fHandedOff) 209 return; 210 211 fData->SetMerge(mode, match); 212} 213 214 215void 216DelayedMessage::SetFailureCallback(void (*callback)(int32, port_id, void*), 217 void* data) 218{ 219 if (fData == NULL || fHandedOff) 220 return; 221 222 fData->SetFailureCallback(callback, data); 223} 224 225 226//! Attach data to message. Memory is not allocated nor copied until handoff. 227status_t 228DelayedMessage::Attach(const void* data, size_t size) 229{ 230 if (fData == NULL) 231 return B_NO_MEMORY; 232 233 if (fHandedOff) 234 return B_ERROR; 235 236 if (data == NULL || size == 0) 237 return B_BAD_VALUE; 238 239 return fData->Attach(data, size); 240} 241 242 243status_t 244DelayedMessage::Flush() 245{ 246 if (fData == NULL) 247 return B_NO_MEMORY; 248 249 if (fHandedOff) 250 return B_ERROR; 251 252 if (fData->CountTargets() == 0) 253 return B_BAD_VALUE; 254 255 return gDelayedMessageSender.ScheduleMessage(*this); 256} 257 258 259/*! The data handoff occurs upon scheduling and reduces copies to only 260 when a message is actually scheduled. Canceled messages have low cost. 261*/ 262DelayedMessageData* 263DelayedMessage::HandOff() 264{ 265 if (fData == NULL || fHandedOff) 266 return NULL; 267 268 if (fData->CopyData()) { 269 fHandedOff = true; 270 return fData; 271 } 272 273 return NULL; 274} 275 276 277// #pragma mark - 278 279 280Attachment::Attachment(const void* _data, size_t _size) 281 : 282 constData(_data), 283 data(NULL), 284 size(_size) 285{ 286} 287 288 289Attachment::~Attachment() 290{ 291 free(data); 292} 293 294 295// #pragma mark - 296 297 298DelayedMessageData::DelayedMessageData(int32 code, bigtime_t delay, 299 bool isSpecificTime) 300 : 301 fCode(code), 302 fScheduledTime(delay + (isSpecificTime ? 0 : system_time())), 303 fValid(false), 304 305 fAttachments(3, true), 306 fTargets(4, true), 307 308 fMergeMode(DM_NO_MERGE), 309 fMergeMask(DM_DATA_DEFAULT), 310 311 fFailureCallback(NULL), 312 fFailureData(NULL) 313{ 314} 315 316 317DelayedMessageData::~DelayedMessageData() 318{ 319} 320 321 322bool 323DelayedMessageData::AddTarget(port_id port) 324{ 325 if (port <= 0) 326 return false; 327 328 // check for duplicates: 329 for (int32 index = 0; index < fTargets.CountItems(); ++index) { 330 if (port == *fTargets.ItemAt(index)) 331 return false; 332 } 333 334 return fTargets.AddItem(new(std::nothrow) port_id(port)); 335} 336 337 338void 339DelayedMessageData::RemoveTarget(port_id port) 340{ 341 if (port == B_BAD_PORT_ID) 342 return; 343 344 // Search for a match by value. 345 for (int32 index = 0; index < fTargets.CountItems(); ++index) { 346 port_id* target = fTargets.ItemAt(index); 347 if (port == *target) { 348 fTargets.RemoveItem(target, true); 349 return; 350 } 351 } 352} 353 354 355int32 356DelayedMessageData::CountTargets() const 357{ 358 return fTargets.CountItems(); 359} 360 361 362void 363DelayedMessageData::MergeTargets(DelayedMessageData* other) 364{ 365 // Failure to add one target does not abort the loop! 366 // It could just mean we already have the target. 367 for (int32 index = 0; index < other->fTargets.CountItems(); ++index) 368 AddTarget(*(other->fTargets.ItemAt(index))); 369} 370 371 372//! Copy data from original location - merging failed 373bool 374DelayedMessageData::CopyData() 375{ 376 Attachment* attached = NULL; 377 378 for (int32 index = 0; index < fAttachments.CountItems(); ++index) { 379 attached = fAttachments.ItemAt(index); 380 381 if (attached == NULL || attached->data != NULL) 382 return false; 383 384 attached->data = malloc(attached->size); 385 if (attached->data == NULL) 386 return false; 387 388 memcpy(attached->data, attached->constData, attached->size); 389 } 390 391 fValid = true; 392 return true; 393} 394 395 396bool 397DelayedMessageData::MergeData(DelayedMessageData* other) 398{ 399 if (!fValid 400 || other == NULL 401 || other->fCode != fCode 402 || fMergeMode == DM_NO_MERGE 403 || other->fMergeMode == DM_NO_MERGE 404 || other->fMergeMode != fMergeMode 405 || other->fAttachments.CountItems() != fAttachments.CountItems()) 406 return false; 407 408 if (other->fMergeMode == DM_MERGE_CANCEL) { 409 MergeTargets(other); 410 return true; 411 } 412 413 // Compare data 414 Attachment* attached = NULL; 415 Attachment* otherAttached = NULL; 416 417 for (int32 index = 0; index < fAttachments.CountItems(); ++index) { 418 attached = fAttachments.ItemAt(index); 419 otherAttached = other->fAttachments.ItemAt(index); 420 421 if (attached == NULL 422 || otherAttached == NULL 423 || attached->data == NULL 424 || otherAttached->constData == NULL 425 || attached->size != otherAttached->size) 426 return false; 427 428 // Compares depending upon mode & flags 429 if (!Compare(attached, otherAttached, index)) 430 return false; 431 } 432 433 // add any targets not included in the existing message! 434 MergeTargets(other); 435 436 // since these are duplicates, we need not copy anything... 437 if (fMergeMode == DM_MERGE_DUPLICATES) 438 return true; 439 440 // DM_MERGE_REPLACE: 441 442 // Import the new data! 443 for (int32 index = 0; index < fAttachments.CountItems(); ++index) { 444 attached = fAttachments.ItemAt(index); 445 otherAttached = other->fAttachments.ItemAt(index); 446 447 // We already have allocated our memory, but the other data 448 // has not. So this reduces memory allocations. 449 memcpy(attached->data, otherAttached->constData, attached->size); 450 } 451 452 return true; 453} 454 455 456bool 457DelayedMessageData::IsValid() const 458{ 459 return fValid; 460} 461 462 463status_t 464DelayedMessageData::Attach(const void* data, size_t size) 465{ 466 // Sanity checking already performed 467 Attachment* attach = new(std::nothrow) Attachment(data, size); 468 469 if (attach == NULL) 470 return B_NO_MEMORY; 471 472 if (fAttachments.AddItem(attach) == false) { 473 delete attach; 474 return B_ERROR; 475 } 476 477 return B_OK; 478} 479 480 481bool 482DelayedMessageData::Compare(Attachment* one, Attachment* two, int32 index) 483{ 484 if (fMergeMode == DM_MERGE_DUPLICATES) { 485 486 // Default-policy: all data must match 487 if (fMergeMask == DM_DATA_DEFAULT || (fMergeMask & 1 << index) != 0) 488 return memcmp(one->data, two->constData, one->size) == 0; 489 490 } else if (fMergeMode == DM_MERGE_REPLACE) { 491 492 // Default Policy: no data needs to match 493 if (fMergeMask != DM_DATA_DEFAULT && (fMergeMask & 1 << index) != 0) 494 return memcmp(one->data, two->constData, one->size) == 0; 495 } 496 497 return true; 498} 499 500 501void 502DelayedMessageData::SetMerge(DMMergeMode mode, uint32 mask) 503{ 504 fMergeMode = mode; 505 fMergeMask = mask; 506} 507 508 509void 510DelayedMessageData::SendFailed(port_id port) 511{ 512 if (fFailureCallback != NULL) 513 fFailureCallback(fCode, port, fFailureData); 514} 515 516 517void 518DelayedMessageData::SetFailureCallback(FailureCallback callback, void* data) 519{ 520 fFailureCallback = callback; 521 fFailureData = data; 522} 523 524 525// #pragma mark - 526 527 528ScheduledMessage::ScheduledMessage(DelayedMessage& message) 529 : 530 fData(message.HandOff()) 531{ 532} 533 534 535ScheduledMessage::~ScheduledMessage() 536{ 537 delete fData; 538} 539 540 541int32 542ScheduledMessage::CountTargets() const 543{ 544 if (fData == NULL) 545 return 0; 546 547 return fData->CountTargets(); 548} 549 550 551bigtime_t 552ScheduledMessage::ScheduledTime() const 553{ 554 if (fData == NULL) 555 return 0; 556 557 return fData->ScheduledTime(); 558} 559 560 561//! Send our message and data to their intended target(s) 562int32 563ScheduledMessage::SendMessage() 564{ 565 if (fData == NULL || !fData->IsValid()) 566 return 0; 567 568 int32 sent = 0; 569 for (int32 index = 0; index < fData->Targets().CountItems(); ++index) { 570 port_id port = *(fData->Targets().ItemAt(index)); 571 status_t error = SendMessageToPort(port); 572 573 if (error == B_OK) { 574 ++sent; 575 continue; 576 } 577 578 if (error != B_TIMED_OUT) 579 fData->SendFailed(port); 580 } 581 582 return sent; 583} 584 585 586status_t 587ScheduledMessage::SendMessageToPort(port_id port) 588{ 589 if (fData == NULL || !fData->IsValid()) 590 return B_BAD_DATA; 591 592 if (port == B_BAD_PORT_ID) 593 return B_BAD_VALUE; 594 595 BPrivate::LinkSender sender(port); 596 if (sender.StartMessage(fData->Code()) != B_OK) 597 return B_ERROR; 598 599 AttachmentList& list = fData->Attachments(); 600 Attachment* attached = NULL; 601 status_t error = B_OK; 602 603 // The data has been checked already, so we assume it is all good 604 for (int32 index = 0; index < list.CountItems(); ++index) { 605 attached = list.ItemAt(index); 606 607 error = sender.Attach(attached->data, attached->size); 608 if (error != B_OK) { 609 sender.CancelMessage(); 610 return error; 611 } 612 } 613 614 // We do not want to ever hold up the sender thread for too long, we 615 // set a 1 second sending delay, which should be more than enough for 616 // 99.992% of all cases. Approximately. 617 error = sender.Flush(1000000); 618 619 if (error == B_OK || error == B_BAD_PORT_ID) 620 fData->RemoveTarget(port); 621 622 return error; 623} 624 625 626bool 627ScheduledMessage::IsValid() const 628{ 629 return fData != NULL && fData->IsValid(); 630} 631 632 633bool 634ScheduledMessage::Merge(DelayedMessage& other) 635{ 636 if (!IsValid()) 637 return false; 638 639 return fData->MergeData(other.Data()); 640} 641 642 643bool 644ScheduledMessage::operator<(const ScheduledMessage& other) const 645{ 646 if (!IsValid() || !other.IsValid()) 647 return false; 648 649 return fData->ScheduledTime() < other.fData->ScheduledTime(); 650} 651 652 653int 654CompareMessages(const ScheduledMessage* one, const ScheduledMessage* two) 655{ 656 return *one < *two; 657} 658 659 660// #pragma mark - 661 662 663DelayedMessageSender::DelayedMessageSender() 664 : 665 fLock("DelayedMessageSender"), 666 fMessages(20, true), 667 fScheduledWakeup(B_INFINITE_TIMEOUT), 668 fWakeupRetry(0), 669 fThread(spawn_thread(&_thread_func, kName, kPriority, this)), 670 fPort(create_port(kPortCapacity, "DelayedMessageSender")), 671 fSentCount(0) 672{ 673 resume_thread(fThread); 674} 675 676 677DelayedMessageSender::~DelayedMessageSender() 678{ 679 // write the exit message to our port 680 write_port(fPort, kExitMessage, NULL, 0); 681 682 status_t status = B_OK; 683 while (wait_for_thread(fThread, &status) == B_OK); 684 685 // We now know the thread has exited, it is safe to cleanup 686 delete_port(fPort); 687} 688 689 690status_t 691DelayedMessageSender::ScheduleMessage(DelayedMessage& message) 692{ 693 BAutolock _(fLock); 694 695 // Can we merge with a pending message? 696 ScheduledMessage* pending = NULL; 697 for (int32 index = 0; index < fMessages.CountItems(); ++index) { 698 pending = fMessages.ItemAt(index); 699 if (pending->Merge(message)) 700 return B_OK; 701 } 702 703 // Guess not, add it to our list! 704 ScheduledMessage* scheduled = new(std::nothrow) ScheduledMessage(message); 705 706 if (scheduled == NULL) 707 return B_NO_MEMORY; 708 709 if (!scheduled->IsValid()) { 710 delete scheduled; 711 return B_BAD_DATA; 712 } 713 714 if (fMessages.AddItem(scheduled)) { 715 fMessages.SortItems(&CompareMessages); 716 _Wakeup(scheduled->ScheduledTime()); 717 return B_OK; 718 } 719 720 return B_ERROR; 721} 722 723 724int32 725DelayedMessageSender::CountDelayedMessages() const 726{ 727 BAutolock _(fLock); 728 return fMessages.CountItems(); 729} 730 731 732int64 733DelayedMessageSender::CountSentMessages() const 734{ 735 return atomic_get64(&fSentCount); 736} 737 738 739void 740DelayedMessageSender::_MessageLoop() 741{ 742 int32 code = -1; 743 status_t status = B_TIMED_OUT; 744 bigtime_t timeout = B_INFINITE_TIMEOUT; 745 746 while (true) { 747 timeout = atomic_get64(&fScheduledWakeup) - (system_time() 748 + (DM_MINIMUM_DELAY / 2)); 749 750 if (timeout > DM_MINIMUM_DELAY / 4) { 751 status = read_port_etc(fPort, &code, NULL, 0, B_RELATIVE_TIMEOUT, 752 timeout); 753 } else 754 status = B_TIMED_OUT; 755 756 if (status == B_INTERRUPTED) 757 continue; 758 759 if (status == B_TIMED_OUT) { 760 _SendDelayedMessages(); 761 continue; 762 } 763 764 if (status == B_OK) { 765 switch (code) { 766 case kWakeupMessage: 767 continue; 768 769 case kExitMessage: 770 return; 771 772 // TODO: trace unhandled messages 773 default: 774 continue; 775 } 776 } 777 778 // port deleted? 779 if (status < B_OK) 780 break; 781 } 782} 783 784 785int32 786DelayedMessageSender::_thread_func(void* sender) 787{ 788 (static_cast<DelayedMessageSender*>(sender))->_MessageLoop(); 789 return 0; 790} 791 792 793//! Sends pending messages, call ONLY from sender thread! 794int32 795DelayedMessageSender::_SendDelayedMessages() 796{ 797 // avoid sending messages during times of contention 798 if (fLock.LockWithTimeout(30000) != B_OK) { 799 atomic_add64(&fScheduledWakeup, DM_MINIMUM_DELAY); 800 return 0; 801 } 802 803 atomic_set64(&fScheduledWakeup, B_INFINITE_TIMEOUT); 804 805 if (fMessages.CountItems() == 0) { 806 fLock.Unlock(); 807 return 0; 808 } 809 810 int32 sent = 0; 811 812 bigtime_t time = system_time() + DM_MINIMUM_DELAY / 2; 813 // capture any that may be on the verge of being sent. 814 815 BObjectList<ScheduledMessage> remove; 816 817 ScheduledMessage* message = NULL; 818 for (int32 index = 0; index < fMessages.CountItems(); ++index) { 819 message = fMessages.ItemAt(index); 820 821 if (message->ScheduledTime() > time) { 822 atomic_set64(&fScheduledWakeup, message->ScheduledTime()); 823 break; 824 } 825 826 int32 sendCount = message->SendMessage(); 827 if (sendCount > 0) 828 sent += sendCount; 829 830 if (message->CountTargets() == 0) 831 remove.AddItem(message); 832 } 833 834 // remove serviced messages 835 for (int32 index = 0; index < remove.CountItems(); ++index) 836 fMessages.RemoveItem(remove.ItemAt(index)); 837 838 atomic_add64(&fSentCount, sent); 839 840 // catch any partly-failed messages (possibly late): 841 if (fMessages.CountItems() > 0 842 && atomic_get64(&fScheduledWakeup) == B_INFINITE_TIMEOUT) { 843 844 fMessages.SortItems(&CompareMessages); 845 message = fMessages.ItemAt(0); 846 bigtime_t timeout = message->ScheduledTime() - time; 847 848 if (timeout < 0) 849 timeout = DM_MINIMUM_DELAY; 850 851 atomic_set64(&fScheduledWakeup, timeout); 852 } 853 854 fLock.Unlock(); 855 return sent; 856} 857 858 859void 860DelayedMessageSender::_Wakeup(bigtime_t when) 861{ 862 if (atomic_get64(&fScheduledWakeup) < when 863 && atomic_get(&fWakeupRetry) == 0) 864 return; 865 866 atomic_set64(&fScheduledWakeup, when); 867 868 BPrivate::LinkSender sender(fPort); 869 sender.StartMessage(kWakeupMessage); 870 status_t error = sender.Flush(30000); 871 atomic_set(&fWakeupRetry, (int32)error == B_TIMED_OUT); 872} 873 874