/* * Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved. * Distributed under the terms of the MIT License. */ #include #include #include #include #include #include "Debug.h" #include "MessageDeliverer.h" #include "MessagingService.h" using std::map; using std::nothrow; // sService -- the singleton instance MessagingService *MessagingService::sService = NULL; /*! \class MessagingArea \brief Represents an area of the messaging service shared between kernel and registrar. The main purpose of the class is to retrieve (and remove) commands from the area. */ // constructor MessagingArea::MessagingArea() { } // destructor MessagingArea::~MessagingArea() { if (fID >= 0) delete_area(fID); } // Create status_t MessagingArea::Create(area_id kernelAreaID, sem_id lockSem, sem_id counterSem, MessagingArea *&_area) { // allocate the object on the heap MessagingArea *area = new(nothrow) MessagingArea; if (!area) return B_NO_MEMORY; // clone the kernel area area_id areaID = clone_area("messaging", (void**)&area->fHeader, B_ANY_ADDRESS, B_READ_AREA | B_WRITE_AREA, kernelAreaID); if (areaID < 0) { delete area; return areaID; } // finish the initialization of the object area->fID = areaID; area->fSize = area->fHeader->size; area->fLockSem = lockSem; area->fCounterSem = counterSem; area->fNextArea = NULL; _area = area; return B_OK; } // Lock bool MessagingArea::Lock() { // benaphore-like locking if (atomic_add(&fHeader->lock_counter, 1) == 0) return true; return (acquire_sem(fLockSem) == B_OK); } // Unlock void MessagingArea::Unlock() { if (atomic_add(&fHeader->lock_counter, -1) > 1) release_sem(fLockSem); } // ID area_id MessagingArea::ID() const { return fID; } // Size int32 MessagingArea::Size() const { return fSize; } // CountCommands int32 MessagingArea::CountCommands() const { return fHeader->command_count; } // PopCommand const messaging_command * MessagingArea::PopCommand() { if (fHeader->command_count == 0) return NULL; // get the command messaging_command *command = (messaging_command*)((char*)fHeader + fHeader->first_command); // remove it from the area // (as long as the area is still locked, noone will overwrite the contents) if (--fHeader->command_count == 0) fHeader->first_command = fHeader->last_command = 0; else fHeader->first_command = command->next_command; return command; } // Discard void MessagingArea::Discard() { fHeader->size = 0; } // NextKernelAreaID area_id MessagingArea::NextKernelAreaID() const { return fHeader->next_kernel_area; } // SetNextArea void MessagingArea::SetNextArea(MessagingArea *area) { fNextArea = area; } // NextArea MessagingArea * MessagingArea::NextArea() const { return fNextArea; } // #pragma mark - // constructor MessagingCommandHandler::MessagingCommandHandler() { } // destructor MessagingCommandHandler::~MessagingCommandHandler() { } // #pragma mark - // DefaultSendCommandHandler class MessagingService::DefaultSendCommandHandler : public MessagingCommandHandler { virtual void HandleMessagingCommand(uint32 _command, const void *data, int32 dataSize) { const messaging_command_send_message *sendData = (const messaging_command_send_message*)data; const void *messageData = (uint8*)data + sizeof(messaging_command_send_message) + sizeof(messaging_target) * sendData->target_count; DefaultMessagingTargetSet set(sendData->targets, sendData->target_count); MessageDeliverer::Default()->DeliverMessage(messageData, sendData->message_size, set); } }; // CommandHandlerMap struct MessagingService::CommandHandlerMap : map { }; /*! \class MessagingService \brief Userland implementation of the kernel -> userland messaging service. This service provides a way for the kernel to send BMessages (usually notification (e.g. node monitoring) messages) to userland applications. The kernel could write the messages directly to the respective target ports, but this has the disadvantage, that a message needs to be dropped, if the port is full at the moment of sending. By transferring the message to the registrar, it is possible to use the MessageDeliverer which retries sending messages on full ports. The message transfer is implemented via areas shared between kernel and registrar. By default one area is used as a ring buffer. The kernel adds messages to it, the registrar removes them. If the area is full, the kernel creates a new one and adds it to the area list. While the service is called `messaging service' and we were speaking of `messages' being passed through the areas, the service is actually more general. In fact `commands' are passed through the areas. Currently the only implemented command type is to send a message, but it is very easy to add further command types (e.g. one for alerting the user in case of errors). The MessagingService maintains a mapping of command types to command handlers (MessagingCommandHandler, which perform the actual processing of the commands), that can be altered via MessagingService::SetCommandHandler(). */ // constructor MessagingService::MessagingService() : fLock("messaging service"), fLockSem(-1), fCounterSem(-1), fFirstArea(NULL), fCommandHandlers(NULL), fCommandProcessor(-1), fTerminating(false) { } // destructor MessagingService::~MessagingService() { fTerminating = true; if (fLockSem >= 0) delete_sem(fLockSem); if (fCounterSem >= 0) delete_sem(fCounterSem); if (fCommandProcessor >= 0) { int32 result; wait_for_thread(fCommandProcessor, &result); } delete fCommandHandlers; delete fFirstArea; } // Init status_t MessagingService::Init() { // create the semaphores fLockSem = create_sem(0, "messaging lock"); if (fLockSem < 0) return fLockSem; fCounterSem = create_sem(0, "messaging counter"); if (fCounterSem < 0) return fCounterSem; // create the command handler map fCommandHandlers = new(nothrow) CommandHandlerMap; if (!fCommandHandlers) return B_NO_MEMORY; // spawn the command processor fCommandProcessor = spawn_thread(MessagingService::_CommandProcessorEntry, "messaging command processor", B_DISPLAY_PRIORITY, this); if (fCommandProcessor < 0) return fCommandProcessor; // register with the kernel area_id areaID = _kern_register_messaging_service(fLockSem, fCounterSem); if (areaID < 0) return areaID; // create the area status_t error = MessagingArea::Create(areaID, fLockSem, fCounterSem, fFirstArea); if (error != B_OK) { _kern_unregister_messaging_service(); return error; } // resume the command processor resume_thread(fCommandProcessor); // install the default send message command handler MessagingCommandHandler *handler = new(nothrow) DefaultSendCommandHandler; if (!handler) return B_NO_MEMORY; SetCommandHandler(MESSAGING_COMMAND_SEND_MESSAGE, handler); return B_OK; } // CreateDefault status_t MessagingService::CreateDefault() { if (sService) return B_OK; // create the service MessagingService *service = new(nothrow) MessagingService; if (!service) return B_NO_MEMORY; // init it status_t error = service->Init(); if (error != B_OK) { delete service; return error; } sService = service; return B_OK; } // DeleteDefault void MessagingService::DeleteDefault() { if (sService) { delete sService; sService = NULL; } } // Default MessagingService * MessagingService::Default() { return sService; } // SetCommandHandler void MessagingService::SetCommandHandler(uint32 command, MessagingCommandHandler *handler) { BAutolock _(fLock); if (handler) { (*fCommandHandlers)[command] = handler; } else { // no handler: remove and existing entry CommandHandlerMap::iterator it = fCommandHandlers->find(command); if (it != fCommandHandlers->end()) fCommandHandlers->erase(it); } } // _GetCommandHandler MessagingCommandHandler * MessagingService::_GetCommandHandler(uint32 command) const { BAutolock _(fLock); CommandHandlerMap::iterator it = fCommandHandlers->find(command); return (it != fCommandHandlers->end() ? it->second : NULL); } // _CommandProcessorEntry int32 MessagingService::_CommandProcessorEntry(void *data) { return ((MessagingService*)data)->_CommandProcessor(); } // _CommandProcessor int32 MessagingService::_CommandProcessor() { bool commandWaiting = false; while (!fTerminating) { // wait for the next command if (!commandWaiting) { status_t error = acquire_sem(fCounterSem); if (error != B_OK) continue; } else commandWaiting = false; // get it from the first area MessagingArea *area = fFirstArea; area->Lock(); while (area->CountCommands() > 0) { const messaging_command *command = area->PopCommand(); if (!command) { // something's seriously wrong ERROR("MessagingService::_CommandProcessor(): area %p (%" B_PRId32 ") has command count %" B_PRId32 ", but doesn't " "return any more commands.", area, area->ID(), area->CountCommands()); break; } PRINT("MessagingService::_CommandProcessor(): got command %" B_PRIu32 "\n", command->command); // dispatch the command MessagingCommandHandler *handler = _GetCommandHandler(command->command); if (handler) { handler->HandleMessagingCommand(command->command, command->data, command->size - sizeof(messaging_command)); } else { WARNING("MessagingService::_CommandProcessor(): No handler " "found for command %" B_PRIu32 "\n", command->command); } } // there is a new area we don't know yet if (!area->NextArea() && area->NextKernelAreaID() >= 0) { // create it MessagingArea *nextArea; status_t error = MessagingArea::Create(area->NextKernelAreaID(), fLockSem, fCounterSem, nextArea); if (error == B_OK) { area->SetNextArea(nextArea); commandWaiting = true; } else { // Bad, but what can we do? ERROR("MessagingService::_CommandProcessor(): Failed to clone " "kernel area %" B_PRId32 ": %s\n", area->NextKernelAreaID(), strerror(error)); } } // if the current area is empty and there is a next one, we discard the // current one if (area->NextArea() && area->CountCommands() == 0) { fFirstArea = area->NextArea(); area->Discard(); area->Unlock(); delete area; } else { area->Unlock(); } } return 0; }