1/* 2 * Copyright 2009-2011, Axel D��rfler, axeld@pinc-software.de. 3 * Copyright 2016, Rene Gollent, rene@gollent.com. 4 */ 5 6 7#include <SocketMessenger.h> 8 9#include <Message.h> 10#include <MessageQueue.h> 11#include <Messenger.h> 12 13#include <AutoDeleter.h> 14#include <AutoLocker.h> 15#include <HashMap.h> 16 17 18static const char* kReplySenderIDField = "socket_messenger:sender_reply_id"; 19static const char* kReplyReceiverIDField = "socket_messenger:reply_id"; 20 21 22// #pragma mark - BSocketMessenger::Private 23 24 25struct BSocketMessenger::Private { 26 typedef SynchronizedHashMap<HashKey64<int64>, 27 BMessage> ReplyMessageMap; 28 29 Private(); 30 virtual ~Private(); 31 32 void ClearMessages(); 33 34 sem_id fMessageWaiters; 35 thread_id fReplyReader; 36 ReplyMessageMap fReceivedReplies; 37 BMessageQueue fReceivedMessages; 38 int64 fReplyIDCounter; 39}; 40 41 42BSocketMessenger::Private::Private() 43 : 44 fMessageWaiters(-1), 45 fReplyReader(-1), 46 fReceivedReplies(), 47 fReceivedMessages(), 48 fReplyIDCounter(0) 49{ 50} 51 52 53BSocketMessenger::Private::~Private() 54{ 55 if (fMessageWaiters > 0) 56 delete_sem(fMessageWaiters); 57 if (fReplyReader > 0) 58 wait_for_thread(fReplyReader, NULL); 59 60 ClearMessages(); 61} 62 63 64void 65BSocketMessenger::Private::ClearMessages() 66{ 67 fReceivedReplies.Clear(); 68 AutoLocker<BMessageQueue> queueLocker(fReceivedMessages); 69 while (!fReceivedMessages.IsEmpty()) 70 delete fReceivedMessages.NextMessage(); 71} 72 73 74// #pragma mark - BSocketMessenger 75 76 77BSocketMessenger::BSocketMessenger() 78 : 79 fPrivateData(NULL), 80 fSocket(), 81 fInitStatus(B_NO_INIT) 82{ 83 _Init(); 84} 85 86 87BSocketMessenger::BSocketMessenger(const BNetworkAddress& address, 88 bigtime_t timeout) 89 : 90 fPrivateData(NULL), 91 fSocket(), 92 fInitStatus(B_NO_INIT) 93{ 94 _Init(); 95 SetTo(address, timeout); 96} 97 98 99BSocketMessenger::BSocketMessenger(const BSocket& socket) 100 : 101 fPrivateData(NULL), 102 fSocket(socket), 103 fInitStatus(B_NO_INIT) 104{ 105 _Init(); 106 if (fPrivateData == NULL) 107 return; 108 109 fInitStatus = socket.InitCheck(); 110 if (fInitStatus != B_OK) 111 return; 112 113 fPrivateData->fReplyReader = spawn_thread(&_MessageReader, 114 "Message Reader", B_NORMAL_PRIORITY, this); 115 if (fPrivateData->fReplyReader < 0) 116 fInitStatus = fPrivateData->fReplyReader; 117 if (fInitStatus != B_OK) { 118 exit_thread(fPrivateData->fReplyReader); 119 fPrivateData->fReplyReader = -1; 120 return; 121 } 122 123 fInitStatus = resume_thread(fPrivateData->fReplyReader); 124} 125 126 127BSocketMessenger::~BSocketMessenger() 128{ 129 Unset(); 130 131 delete fPrivateData; 132} 133 134 135void 136BSocketMessenger::Unset() 137{ 138 if (fPrivateData == NULL) 139 return; 140 141 fSocket.Disconnect(); 142 wait_for_thread(fPrivateData->fReplyReader, NULL); 143 fPrivateData->fReplyReader = -1; 144 fPrivateData->ClearMessages(); 145 146 release_sem_etc(fPrivateData->fMessageWaiters, 1, B_RELEASE_ALL); 147 148 fInitStatus = B_NO_INIT; 149} 150 151 152status_t 153BSocketMessenger::SetTo(const BNetworkAddress& address, bigtime_t timeout) 154{ 155 Unset(); 156 157 if (fPrivateData == NULL) 158 return B_NO_MEMORY; 159 160 fPrivateData->fReplyReader = spawn_thread(&_MessageReader, 161 "Message Reader", B_NORMAL_PRIORITY, this); 162 if (fPrivateData->fReplyReader < 0) 163 return fPrivateData->fReplyReader; 164 status_t error = fSocket.Connect(address, timeout); 165 if (error != B_OK) { 166 Unset(); 167 return error; 168 } 169 170 return fInitStatus = resume_thread(fPrivateData->fReplyReader); 171} 172 173 174status_t 175BSocketMessenger::SetTo(const BSocketMessenger& target, bigtime_t timeout) 176{ 177 return SetTo(target.Address(), timeout); 178} 179 180 181status_t 182BSocketMessenger::SendMessage(const BMessage& message) 183{ 184 return _SendMessage(message); 185} 186 187 188status_t 189BSocketMessenger::SendMessage(const BMessage& message, BMessage& _reply, 190 bigtime_t timeout) 191{ 192 int64 replyID = atomic_add64(&fPrivateData->fReplyIDCounter, 1); 193 BMessage temp(message); 194 temp.AddInt64(kReplySenderIDField, replyID); 195 status_t error = _SendMessage(temp); 196 if (error != B_OK) 197 return error; 198 199 return _ReadReply(replyID, _reply, timeout); 200} 201 202 203status_t 204BSocketMessenger::SendMessage(const BMessage& message, 205 BMessenger& replyTarget, bigtime_t timeout) 206{ 207 BMessage reply; 208 status_t error = SendMessage(message, reply, timeout); 209 if (error != B_OK) 210 return error; 211 212 return replyTarget.SendMessage(&reply); 213} 214 215 216status_t 217BSocketMessenger::SendReply(const BMessage& message, const BMessage& reply) 218{ 219 int64 replyID; 220 if (message.FindInt64(kReplySenderIDField, &replyID) != B_OK) 221 return B_NOT_ALLOWED; 222 223 BMessage replyMessage(reply); 224 replyMessage.AddInt64(kReplyReceiverIDField, replyID); 225 return SendMessage(replyMessage); 226} 227 228 229status_t 230BSocketMessenger::ReceiveMessage(BMessage& _message, bigtime_t timeout) 231{ 232 status_t error = B_OK; 233 AutoLocker<BMessageQueue> queueLocker(fPrivateData->fReceivedMessages); 234 for (;;) { 235 if (!fPrivateData->fReceivedMessages.IsEmpty()) { 236 BMessage* nextMessage 237 = fPrivateData->fReceivedMessages.NextMessage(); 238 _message = *nextMessage; 239 delete nextMessage; 240 break; 241 } 242 243 queueLocker.Unlock(); 244 error = _WaitForMessage(timeout); 245 if (error != B_OK) 246 break; 247 if (!fSocket.IsConnected()) { 248 error = B_CANCELED; 249 break; 250 } 251 queueLocker.Lock(); 252 } 253 254 return error; 255} 256 257 258void 259BSocketMessenger::_Init() 260{ 261 if (fPrivateData != NULL) 262 return; 263 264 BSocketMessenger::Private* data 265 = new(std::nothrow) BSocketMessenger::Private; 266 267 if (data == NULL) { 268 fInitStatus = B_NO_MEMORY; 269 return; 270 } 271 272 data->fMessageWaiters = create_sem(0, "message waiters"); 273 if (data->fMessageWaiters < 0) { 274 fInitStatus = data->fMessageWaiters; 275 delete data; 276 return; 277 } 278 279 fPrivateData = data; 280} 281 282 283status_t 284BSocketMessenger::_WaitForMessage(bigtime_t timeout) 285{ 286 for (;;) { 287 status_t error = acquire_sem_etc(fPrivateData->fMessageWaiters, 1, 288 B_RELATIVE_TIMEOUT, timeout); 289 if (error == B_INTERRUPTED) { 290 if (timeout != B_INFINITE_TIMEOUT) 291 timeout -= system_time(); 292 continue; 293 } 294 if (error != B_OK) 295 return error; 296 break; 297 } 298 299 return B_OK; 300} 301 302 303status_t 304BSocketMessenger::_SendMessage(const BMessage& message) 305{ 306 ssize_t flatSize = message.FlattenedSize(); 307 ssize_t totalSize = flatSize + sizeof(ssize_t); 308 309 char* buffer = new(std::nothrow) char[totalSize]; 310 if (buffer == NULL) 311 return B_NO_MEMORY; 312 313 ArrayDeleter<char> bufferDeleter(buffer); 314 *(ssize_t*)buffer = flatSize; 315 char* messageBuffer = buffer + sizeof(ssize_t); 316 status_t error = message.Flatten(messageBuffer, flatSize); 317 if (error != B_OK) 318 return error; 319 320 ssize_t size = fSocket.Write(buffer, totalSize); 321 if (size < 0) 322 return size; 323 324 return B_OK; 325} 326 327 328status_t 329BSocketMessenger::_ReadMessage(BMessage& _message) 330{ 331 status_t error = fSocket.WaitForReadable(B_INFINITE_TIMEOUT); 332 if (error != B_OK) 333 return error; 334 335 ssize_t size = 0; 336 ssize_t readSize = fSocket.Read(&size, sizeof(ssize_t)); 337 if (readSize < 0) 338 return readSize; 339 340 if (readSize != sizeof(ssize_t)) 341 return B_BAD_VALUE; 342 343 if (size <= 0) 344 return B_MISMATCHED_VALUES; 345 346 char* buffer = new(std::nothrow) char[size]; 347 if (buffer == NULL) 348 return B_NO_MEMORY; 349 350 ArrayDeleter<char> bufferDeleter(buffer); 351 readSize = fSocket.Read(buffer, size); 352 if (readSize < 0) 353 return readSize; 354 355 if (readSize != size) 356 return B_MISMATCHED_VALUES; 357 358 return _message.Unflatten(buffer); 359} 360 361 362status_t 363BSocketMessenger::_ReadReply(const int64 replyID, BMessage& reply, 364 bigtime_t timeout) 365{ 366 status_t error = B_OK; 367 for (;;) { 368 if (fPrivateData->fReceivedReplies.ContainsKey(replyID)) { 369 reply = fPrivateData->fReceivedReplies.Remove(replyID); 370 break; 371 } 372 373 error = _WaitForMessage(timeout); 374 if (error != B_OK) 375 break; 376 if (!fSocket.IsConnected()) { 377 error = B_CANCELED; 378 break; 379 } 380 } 381 382 return error; 383} 384 385 386status_t 387BSocketMessenger::_MessageReader(void* arg) 388{ 389 BSocketMessenger* messenger = (BSocketMessenger*)arg; 390 BSocketMessenger::Private* data = messenger->fPrivateData; 391 status_t error = B_OK; 392 393 for (;;) { 394 BMessage message; 395 error = messenger->_ReadMessage(message); 396 if (error != B_OK) 397 break; 398 399 int64 replyID; 400 if (message.FindInt64(kReplyReceiverIDField, &replyID) == B_OK) { 401 error = data->fReceivedReplies.Put(replyID, message); 402 if (error != B_OK) 403 break; 404 } else { 405 BMessage* queueMessage = new(std::nothrow) BMessage(message); 406 if (queueMessage == NULL) { 407 error = B_NO_MEMORY; 408 break; 409 } 410 411 AutoLocker<BMessageQueue> queueLocker( 412 data->fReceivedMessages); 413 data->fReceivedMessages.AddMessage(queueMessage); 414 } 415 416 417 release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL); 418 } 419 420 // if we exit our message loop, ensure everybody wakes up and knows 421 // we're no longer receiving messages. 422 messenger->fSocket.Disconnect(); 423 release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL); 424 return error; 425} 426