1#include "betalk.h" 2#include "sysdepdefs.h" 3#include "beserved_rpc.h" 4 5#include "netdb.h" 6 7#define BT_RPC_THREAD_NAME "BeServed RPC Marshaller" 8#define BT_MAX_TOTAL_ATTEMPTS 4 9#define BT_ATTEMPTS_BEFORE_RESTART 2 10 11bool btReconnect(bt_rpcinfo *info); 12int32 btRPCReceive(void *data); 13 14int btRecv(int sock, void *data, int dataLen, int flags); 15int btSend(int sock, void *data, int dataLen, int flags); 16 17void btRPCRecordCall(bt_rpccall *call); 18void btRPCFreeCall(bt_rpccall *call); 19void btCreateInPacket(bt_rpccall *call, char *buffer, unsigned int length); 20void btDestroyInPacket(bt_inPacket *packet); 21 22bt_rpccall *rootCall; 23sem_id callSem; 24sem_id connectSem; 25int32 nextXID = 1; 26 27 28void btRPCInit(bt_rpcinfo *info) 29{ 30 info->s = INVALID_SOCKET; 31 info->rpcThread = 0; 32 info->quitXID = 0; 33 34 callSem = create_sem(1, "rpcCall"); 35// set_sem_owner(callSem, B_SYSTEM_TEAM); 36 37 connectSem = create_sem(1, "rpcConnection"); 38// set_sem_owner(connectSem, B_SYSTEM_TEAM); 39 40 info->rpcThread = spawn_thread(btRPCReceive, BT_RPC_THREAD_NAME, B_NORMAL_PRIORITY, info); 41 resume_thread(info->rpcThread); 42} 43 44void btRPCClose(bt_rpcinfo *info) 45{ 46 if (info->rpcThread > 0) 47 { 48 status_t exitVal; 49 wait_for_thread(info->rpcThread, &exitVal); 50 } 51 52 // Close the socket used for all file system RPC communications, 53 // now that we know the RPC recipient thread is dead. 54 closesocket(info->s); 55 56 delete_sem(connectSem); 57 delete_sem(callSem); 58} 59 60int btRecvMsg(int sock, void *data, int dataLen, int flags) 61{ 62 int bytesRead = 0; 63 do 64 { 65 int bytes = btRecv(sock, (char *) data + bytesRead, dataLen - bytesRead, flags); 66 if (bytes == -1) 67 return -1; 68 69 bytesRead += bytes; 70 } while (bytesRead < dataLen); 71 72 return bytesRead; 73} 74 75int btRecv(int sock, void *data, int dataLen, int flags) 76{ 77 int bytes; 78 79 for (;;) 80 { 81 bytes = recv(sock, data, dataLen, flags); 82 if (bytes == 0) 83 return -1; 84 else if (bytes == -1) 85 if (errno == EINTR) 86 continue; 87 else 88 return -1; 89 else 90 break; 91 } 92 93 return bytes; 94} 95 96int btSendMsg(int sock, void *data, int dataLen, int flags) 97{ 98 int bytesSent = 0; 99 do 100 { 101 int bytes = btSend(sock, (char *) data + bytesSent, dataLen - bytesSent, flags); 102 if (bytes == -1) 103 return -1; 104 105 bytesSent += bytes; 106 } while (bytesSent < dataLen); 107 108 return bytesSent; 109} 110 111int btSend(int sock, void *data, int dataLen, int flags) 112{ 113 int bytes; 114 115 for (;;) 116 { 117 bytes = send(sock, data, dataLen, flags); 118 if (bytes == -1) 119 if (errno == EINTR) 120 continue; 121 else 122 return -1; 123 else 124 break; 125 } 126 127 return bytes; 128} 129 130int32 btRPCReceive(void *data) 131{ 132 bt_rpcinfo *info = (bt_rpcinfo *) data; 133 bt_rpccall *call; 134 char signature[20], *buffer; 135 int32 xid, length, sigLen; 136 int bytes; 137 bool failure = false; 138 139 while (info->s == INVALID_SOCKET) 140 snooze(100); 141 142 int sock = info->s; 143 144#ifdef BONE_VERSION 145 fd_set sockSet; 146 struct timeval timeout; 147 148 FD_ZERO(&sockSet); 149 timeout.tv_sec = 30; 150 timeout.tv_usec = 0; 151#endif 152 153 while (!failure) 154 { 155#ifdef BONE_VERSION 156 FD_SET(sock, &sockSet); 157 select(sock + 1, &sockSet, NULL, NULL, &timeout); 158 159 if (FD_ISSET(sock, &sockSet)) 160 { 161#endif 162 163 // Receive the signature. If a socket error occurs, break out of the loop and 164 // effectively exit this thread because the socket is closed. 165 sigLen = strlen(BT_RPC_SIGNATURE); 166 memset(signature, 0, sigLen); 167 if (btRecvMsg(sock, signature, sigLen, 0) == -1) 168 break; 169 170 // Make sure the signature is correct. Otherwise, ignore it and start over. 171 signature[sigLen] = 0; 172 if (strcmp(signature, BT_RPC_SIGNATURE)) 173 continue; 174 175 // Now read the transaction id (XID) and the length of the packet body. 176 bytes = btRecvMsg(sock, &xid, sizeof(int32), 0); 177 if (bytes > 0) 178 bytes = btRecvMsg(sock, &length, sizeof(int32), 0); 179 180 xid = B_LENDIAN_TO_HOST_INT32(xid); 181 length = B_LENDIAN_TO_HOST_INT32(length); 182 if (length <= 0 || length >= BT_RPC_MAX_PACKET_SIZE) 183 continue; 184 185 buffer = (char *) malloc(length + 1); 186 if (!buffer) 187 continue; 188 189 // Read the remainder of the packet. 190 if (btRecvMsg(sock, buffer, length, 0) == -1) 191 failure = true; 192 193 buffer[length] = 0; 194 195 while (acquire_sem(callSem) == B_INTERRUPTED); 196 197 call = rootCall; 198 while (call) 199 { 200 if (call->xid == xid) 201 { 202 btCreateInPacket(call, buffer, length); 203 release_sem(call->sem); 204 break; 205 } 206 207 call = call->next; 208 } 209 210 release_sem(callSem); 211 212 // The originating RPC call was not found. This is probably not a very 213 // good sign. 214 if (!call) 215 free(buffer); 216 217 // If a valid quit XID has been defined, and it's equal to the current 218 // XID, quit. 219 if (info->quitXID) 220 if (info->quitXID == xid) 221 break; 222 223#ifdef BONE_VERSION 224 } 225#endif 226 227 } 228} 229 230void btCreateInPacket(bt_rpccall *call, char *buffer, unsigned int length) 231{ 232 bt_inPacket *packet; 233 234 packet = (bt_inPacket *) malloc(sizeof(bt_inPacket)); 235 if (!packet) 236 return; 237 238 packet->buffer = buffer; 239 packet->length = length; 240 packet->offset = 0; 241 242 call->inPacket = packet; 243} 244 245void btDestroyInPacket(bt_inPacket *packet) 246{ 247 if (packet) 248 { 249 if (packet->buffer) 250 free(packet->buffer); 251 252 free(packet); 253 } 254} 255 256void btRPCRecordCall(bt_rpccall *call) 257{ 258 bt_rpccall *curCall, *lastCall; 259 260 while (acquire_sem(callSem) == B_INTERRUPTED); 261 262 curCall = lastCall = rootCall; 263 while (curCall) 264 { 265 lastCall = curCall; 266 curCall = curCall->next; 267 } 268 269 call->next = NULL; 270 call->prev = lastCall; 271 272 if (lastCall == NULL) 273 rootCall = call; 274 else 275 lastCall->next = call; 276 277 release_sem(callSem); 278} 279 280// btRPCRemoveCall() 281// 282void btRPCRemoveCall(bt_rpccall *call) 283{ 284 if (call) 285 { 286 if (call->sem > 0) 287 delete_sem(call->sem); 288 289 while (acquire_sem(callSem) == B_INTERRUPTED); 290 291 // Make this entry's predecessor point to its successor. 292 if (call->prev) 293 call->prev->next = call->next; 294 295 // Make this entry's successor point to its predecessor. 296 if (call->next) 297 call->next->prev = call->prev; 298 299 // If we just deleted the root node of the list, then the next node 300 // has become the root. 301 if (call == rootCall) 302 rootCall = call->next; 303 304 release_sem(callSem); 305 306 // Now we can release the memory allocated for this packet. 307 btDestroyInPacket(call->inPacket); 308 free(call); 309 } 310} 311 312// btConnect() 313// 314int btConnect(bt_rpcinfo *info, unsigned int serverIP, int port) 315{ 316 struct sockaddr_in serverAddr; 317 int session, addrLen; 318#ifdef BONE_VERSION 319 int flags; 320#endif 321 322 info->serverIP = serverIP; 323 info->serverPort = port; 324 325 // Store the length of the socket addressing structure for accept(). 326 addrLen = sizeof(struct sockaddr_in); 327 328 // Initialize the server address structure. 329 memset(&serverAddr, 0, sizeof(serverAddr)); 330 serverAddr.sin_port = htons(port); 331 serverAddr.sin_family = AF_INET; 332 serverAddr.sin_addr.s_addr = htonl(serverIP); 333 334 // Create a new socket to receive incoming requests. 335 session = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 336 if (session == INVALID_SOCKET) 337 return INVALID_SOCKET; 338 339 // Bind that socket to the address constructed above. 340 if (connect(session, (struct sockaddr *) &serverAddr, sizeof(serverAddr))) 341 return INVALID_SOCKET; 342 343 // Enabled periodic keep-alive messages, so we stay connected during times of 344 // no activity. This isn't supported by ksocketd, only in BONE. :-( 345#ifdef BONE_VERSION 346 flags = 1; 347 setsockopt(session, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)); 348#endif 349 350 return session; 351} 352 353void btDisconnect(bt_rpcinfo *info) 354{ 355 bt_outPacket *packet; 356 bt_rpccall *call; 357 358 packet = btRPCPutHeader(BT_CMD_QUIT, 0, 0); 359 call = btRPCInvoke(info, packet, true); 360 if (call) 361 btRPCRemoveCall(call); 362} 363 364bt_outPacket *btRPCPutHeader(unsigned char command, unsigned char argc, int32 length) 365{ 366 bt_outPacket *packet; 367 368 packet = (bt_outPacket *) malloc(sizeof(bt_outPacket)); 369 if (!packet) 370 return NULL; 371 372 packet->size = BT_RPC_MIN_PACKET_SIZE; 373 packet->buffer = (char *) malloc(packet->size); 374 packet->length = 0; 375 376 if (!packet->buffer) 377 { 378 free(packet); 379 return NULL; 380 } 381 382 strcpy(packet->buffer, BT_RPC_SIGNATURE); 383 packet->length += strlen(BT_RPC_SIGNATURE); 384 385// btRPCPutChar(packet, BT_RPC_VERSION_HI); 386// btRPCPutChar(packet, BT_RPC_VERSION_LO); 387 btRPCPutInt32(packet, 7 + (8 * argc) + length); 388 btRPCPutChar(packet, command); 389 btRPCPutChar(packet, argc); 390 391 return packet; 392} 393 394void btRPCPutArg(bt_outPacket *packet, unsigned int type, void *data, int length) 395{ 396 btRPCPutInt32(packet, type); 397 btRPCPutInt32(packet, length); 398 btRPCPutBinary(packet, data, length); 399} 400 401bt_rpccall *btRPCInvoke(bt_rpcinfo *info, bt_outPacket *packet, bool lastPkt) 402{ 403 status_t result; 404 bt_rpccall *call; 405 int attempts; 406 bool failure; 407 408 call = (bt_rpccall *) malloc(sizeof(bt_rpccall)); 409 if (!call) 410 return NULL; 411 412 attempts = 0; 413 414 call->inPacket = NULL; 415 call->xid = atomic_add(&nextXID, 1); 416 if ((call->sem = create_sem(0, "rpc call")) < B_OK) 417 { 418 free(call); 419 return NULL; 420 } 421 422 btRPCRecordCall(call); 423 424 btRPCPutInt32(packet, call->xid); 425 btRPCPutChar(packet, BT_CMD_TERMINATOR); 426 427 // If this is the last RPC packet that will be transmitted, store 428 // its XID so the RPC recipient thread will know when to quit. 429 if (lastPkt) 430 info->quitXID = call->xid; 431 432doSend: 433 failure = false; 434 if (btSendMsg(info->s, packet->buffer, packet->length, 0) == -1) 435 failure = true; 436 437 if (!failure) 438 do 439 { 440 result = acquire_sem_etc(call->sem, 1, B_RELATIVE_TIMEOUT, 2500000); 441 } 442 while (result == B_INTERRUPTED); 443 444 if (failure || result == B_TIMED_OUT) 445 { 446 attempts++; 447 if (attempts >= BT_MAX_TOTAL_ATTEMPTS) 448 { 449 free(packet->buffer); 450 free(packet); 451 btRPCRemoveCall(call); 452 return NULL; 453 } 454 else if (attempts == BT_ATTEMPTS_BEFORE_RESTART) 455 btReconnect(info); 456 457 goto doSend; 458 } 459 460 free(packet->buffer); 461 free(packet); 462 return call; 463} 464 465bool btReconnect(bt_rpcinfo *info) 466{ 467 static int counter = 0; 468 int curCount = counter; 469 bool connected = true; 470 471 while (acquire_sem(connectSem) == B_INTERRUPTED); 472 473 if (curCount == counter) 474 { 475 connected = false; 476 477 closesocket(info->s); 478 if (info->rpcThread > 0) 479 kill_thread(info->rpcThread); 480 481 info->s = btConnect(info, info->serverIP, info->serverPort); 482 if (info->s != INVALID_SOCKET) 483 { 484 info->rpcThread = spawn_thread(btRPCReceive, BT_RPC_THREAD_NAME, B_NORMAL_PRIORITY, info); 485 resume_thread(info->rpcThread); 486 connected = true; 487 } 488 489 counter++; 490 } 491 492 release_sem(connectSem); 493 return connected; 494} 495 496void btRPCGrowPacket(bt_outPacket *packet, int bytes) 497{ 498 if (packet->length + bytes > packet->size) 499 { 500 int growth = ((bytes / BT_RPC_MIN_PACKET_SIZE) + 1) * BT_RPC_MIN_PACKET_SIZE; 501 packet->buffer = (char *) realloc(packet->buffer, packet->size + growth); 502 packet->size += growth; 503 } 504} 505 506unsigned int btRPCGetInt32(bt_inPacket *packet) 507{ 508 int32 value; 509 510 if (packet->offset < packet->length) 511 value = B_LENDIAN_TO_HOST_INT32(*((int32 *) &packet->buffer[packet->offset])); 512 else 513 value = 0; 514 515 packet->offset += sizeof(value); 516 return value; 517} 518 519int64 btRPCGetInt64(bt_inPacket *packet) 520{ 521 int64 value; 522 523 if (packet->offset < packet->length) 524 value = B_LENDIAN_TO_HOST_INT64(*((int64 *) &packet->buffer[packet->offset])); 525 else 526 value = 0; 527 528 packet->offset += sizeof(value); 529 return value; 530} 531 532char *btRPCGetNewString(bt_inPacket *packet) 533{ 534 char *str; 535 unsigned int bytes; 536 537 if (packet->offset >= packet->length) 538 return NULL; 539 540 bytes = B_LENDIAN_TO_HOST_INT32(*((int32 *) &packet->buffer[packet->offset])); 541 packet->offset += sizeof(bytes); 542 if (!bytes) 543 return NULL; 544 545 str = (char *) malloc(bytes + 1); 546 if (!str) 547 return NULL; 548 549 memcpy(str, &packet->buffer[packet->offset], bytes); 550 str[bytes] = 0; 551 552 packet->offset += bytes; 553 554 return str; 555} 556 557int btRPCGetString(bt_inPacket *packet, char *buffer, int length) 558{ 559 unsigned int bytes; 560 561 if (packet->offset >= packet->length) 562 return 0; 563 564 bytes = B_LENDIAN_TO_HOST_INT64(*((int32 *) &packet->buffer[packet->offset])); 565 packet->offset += sizeof(bytes); 566 if (!bytes) 567 return 0L; 568 569 if (length < bytes) 570 return ERANGE; 571 572 memcpy(buffer, &packet->buffer[packet->offset], bytes); 573 packet->offset += bytes; 574 return bytes; 575} 576 577void btRPCPutChar(bt_outPacket *packet, char value) 578{ 579 btRPCGrowPacket(packet, sizeof(value)); 580 packet->buffer[packet->length] = value; 581 packet->length += sizeof(value); 582} 583 584void btRPCPutInt32(bt_outPacket *packet, int32 value) 585{ 586 btRPCGrowPacket(packet, sizeof(value)); 587 *(int32 *)(&packet->buffer[packet->length]) = B_HOST_TO_LENDIAN_INT32(value); 588 packet->length += sizeof(value); 589} 590 591void btRPCPutInt64(bt_outPacket *packet, int64 value) 592{ 593 btRPCGrowPacket(packet, sizeof(value)); 594 *(int64 *)(&packet->buffer[packet->length]) = B_HOST_TO_LENDIAN_INT64(value); 595 packet->length += sizeof(value); 596} 597 598void btRPCPutString(bt_outPacket *packet, char *buffer, int length) 599{ 600 if (packet && buffer) 601 { 602 btRPCGrowPacket(packet, sizeof(length) + length); 603 btRPCPutInt32(packet, length); 604 memcpy(&packet->buffer[packet->length], buffer, length); 605 packet->length += length; 606 } 607} 608 609void btRPCPutBinary(bt_outPacket *packet, void *buffer, int length) 610{ 611 if (packet && buffer) 612 { 613 btRPCGrowPacket(packet, length); 614 memcpy(&packet->buffer[packet->length], buffer, length); 615 packet->length += length; 616 } 617} 618 619int btRPCGetStat(bt_inPacket *packet, struct stat *st) 620{ 621 st->st_dev = 0; 622 st->st_nlink = btRPCGetInt32(packet); 623 st->st_uid = btRPCGetInt32(packet); 624 st->st_gid = btRPCGetInt32(packet); 625 st->st_size = btRPCGetInt64(packet); 626 st->st_blksize = btRPCGetInt32(packet); 627 st->st_rdev = btRPCGetInt32(packet); 628 st->st_ino = btRPCGetInt64(packet); 629 st->st_mode = btRPCGetInt32(packet); 630 st->st_atime = btRPCGetInt32(packet); 631 st->st_mtime = btRPCGetInt32(packet); 632 st->st_ctime = btRPCGetInt32(packet); 633} 634 635void btRPCPutStat(bt_outPacket *packet, struct stat *st) 636{ 637 if (packet && st) 638 { 639 btRPCPutInt32(packet, (int) st->st_nlink); 640 btRPCPutInt32(packet, (int) st->st_uid); 641 btRPCPutInt32(packet, (int) st->st_gid); 642 btRPCPutInt64(packet, (int64) st->st_size); 643 btRPCPutInt32(packet, (int) st->st_blksize); 644 btRPCPutInt32(packet, (int) st->st_rdev); 645 btRPCPutInt64(packet, (int64) st->st_ino); 646 btRPCPutInt32(packet, (int) st->st_mode); 647 btRPCPutInt32(packet, (int) st->st_atime); 648 btRPCPutInt32(packet, (int) st->st_mtime); 649 btRPCPutInt32(packet, (int) st->st_ctime); 650 } 651} 652