1/* 2 * Copyright 2012-2013 Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Pawe�� Dziepak, pdziepak@quarnos.org 7 */ 8 9 10#include "Connection.h" 11 12#include <arpa/inet.h> 13#include <errno.h> 14#include <stdio.h> 15#include <stdlib.h> 16#include <string.h> 17#include <unistd.h> 18 19#include <AutoDeleter.h> 20#include <net/dns_resolver.h> 21#include <util/kernel_cpp.h> 22#include <util/Random.h> 23 24 25#define NFS4_PORT 2049 26 27#define LAST_FRAGMENT 0x80000000 28#define MAX_PACKET_SIZE 65535 29 30#define NFS_MIN_PORT 665 31 32 33bool 34PeerAddress::operator==(const PeerAddress& address) 35{ 36 return memcmp(&fAddress, &address.fAddress, sizeof(fAddress)) == 0 37 && fProtocol == address.fProtocol; 38} 39 40 41bool 42PeerAddress::operator<(const PeerAddress& address) 43{ 44 int compare = memcmp(&fAddress, &address.fAddress, sizeof(fAddress)); 45 return compare < 0 || (compare == 0 && fProtocol < address.fProtocol); 46} 47 48 49PeerAddress& 50PeerAddress::operator=(const PeerAddress& address) 51{ 52 fAddress = address.fAddress; 53 fProtocol = address.fProtocol; 54 return *this; 55} 56 57 58PeerAddress::PeerAddress() 59 : 60 fProtocol(0) 61{ 62 memset(&fAddress, 0, sizeof(fAddress)); 63} 64 65 66PeerAddress::PeerAddress(int networkFamily) 67 : 68 fProtocol(0) 69{ 70 ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6); 71 72 memset(&fAddress, 0, sizeof(fAddress)); 73 74 fAddress.ss_family = networkFamily; 75 switch (networkFamily) { 76 case AF_INET: 77 fAddress.ss_len = sizeof(sockaddr_in); 78 break; 79 case AF_INET6: 80 fAddress.ss_len = sizeof(sockaddr_in6); 81 break; 82 } 83} 84 85 86const char* 87PeerAddress::ProtocolString() const 88{ 89 static const char* tcpName = "tcp"; 90 static const char* udpName = "udp"; 91 static const char* unknown = ""; 92 93 switch (fProtocol) { 94 case IPPROTO_TCP: 95 return tcpName; 96 case IPPROTO_UDP: 97 return udpName; 98 default: 99 return unknown; 100 } 101} 102 103 104void 105PeerAddress::SetProtocol(const char* protocol) 106{ 107 ASSERT(protocol != NULL); 108 109 if (strcmp(protocol, "tcp") == 0) 110 fProtocol = IPPROTO_TCP; 111 else if (strcmp(protocol, "udp") == 0) 112 fProtocol = IPPROTO_UDP; 113} 114 115 116char* 117PeerAddress::UniversalAddress() const 118{ 119 char* uAddr = reinterpret_cast<char*>(malloc(INET6_ADDRSTRLEN + 16)); 120 if (uAddr == NULL) 121 return NULL; 122 123 if (inet_ntop(fAddress.ss_family, InAddr(), uAddr, AddressSize()) == NULL) 124 return NULL; 125 126 char port[16]; 127 sprintf(port, ".%d.%d", Port() >> 8, Port() & 0xff); 128 strcat(uAddr, port); 129 130 return uAddr; 131} 132 133 134socklen_t 135PeerAddress::AddressSize() const 136{ 137 switch (Family()) { 138 case AF_INET: 139 return sizeof(sockaddr_in); 140 case AF_INET6: 141 return sizeof(sockaddr_in6); 142 default: 143 return 0; 144 } 145} 146 147 148uint16 149PeerAddress::Port() const 150{ 151 uint16 port; 152 153 switch (Family()) { 154 case AF_INET: 155 port = reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_port; 156 break; 157 case AF_INET6: 158 port = reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_port; 159 break; 160 default: 161 port = 0; 162 } 163 164 return ntohs(port); 165} 166 167 168void 169PeerAddress::SetPort(uint16 port) 170{ 171 port = htons(port); 172 173 switch (Family()) { 174 case AF_INET: 175 reinterpret_cast<sockaddr_in*>(&fAddress)->sin_port = port; 176 break; 177 case AF_INET6: 178 reinterpret_cast<sockaddr_in6*>(&fAddress)->sin6_port = port; 179 break; 180 } 181} 182 183const void* 184PeerAddress::InAddr() const 185{ 186 switch (Family()) { 187 case AF_INET: 188 return &reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_addr; 189 case AF_INET6: 190 return &reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_addr; 191 default: 192 return NULL; 193 } 194} 195 196 197size_t 198PeerAddress::InAddrSize() const 199{ 200 switch (Family()) { 201 case AF_INET: 202 return sizeof(in_addr); 203 case AF_INET6: 204 return sizeof(in6_addr); 205 default: 206 return 0; 207 } 208} 209 210 211AddressResolver::AddressResolver(const char* name) 212 : 213 fHead(NULL), 214 fCurrent(NULL), 215 fForcedPort(htons(NFS4_PORT)), 216 fForcedProtocol(IPPROTO_TCP) 217{ 218 fStatus = ResolveAddress(name); 219} 220 221 222AddressResolver::~AddressResolver() 223{ 224 freeaddrinfo(fHead); 225} 226 227 228status_t 229AddressResolver::ResolveAddress(const char* name) 230{ 231 ASSERT(name != NULL); 232 233 if (fHead != NULL) { 234 freeaddrinfo(fHead); 235 fHead = NULL; 236 fCurrent = NULL; 237 } 238 239 // getaddrinfo() is very expensive when called from kernel, so we do not 240 // want to call it unless there is no other choice. 241 struct sockaddr_in addr; 242 memset(&addr, 0, sizeof(addr)); 243 if (inet_aton(name, &addr.sin_addr) == 1) { 244 addr.sin_len = sizeof(addr); 245 addr.sin_family = AF_INET; 246 addr.sin_port = htons(NFS4_PORT); 247 248 memcpy(&fAddress.fAddress, &addr, sizeof(addr)); 249 fAddress.fProtocol = IPPROTO_TCP; 250 return B_OK; 251 } 252 253 status_t result = getaddrinfo(name, NULL, NULL, &fHead); 254 fCurrent = fHead; 255 256 return result; 257} 258 259 260void 261AddressResolver::ForceProtocol(const char* protocol) 262{ 263 ASSERT(protocol != NULL); 264 265 if (strcmp(protocol, "tcp") == 0) 266 fForcedProtocol = IPPROTO_TCP; 267 else if (strcmp(protocol, "udp") == 0) 268 fForcedProtocol = IPPROTO_UDP; 269 270 fAddress.SetProtocol(protocol); 271} 272 273 274void 275AddressResolver::ForcePort(uint16 port) 276{ 277 fForcedPort = htons(port); 278 fAddress.SetPort(port); 279} 280 281 282status_t 283AddressResolver::GetNextAddress(PeerAddress* address) 284{ 285 ASSERT(address != NULL); 286 287 if (fStatus != B_OK) 288 return fStatus; 289 290 if (fHead == NULL) { 291 *address = fAddress; 292 fStatus = B_NAME_NOT_FOUND; 293 return B_OK; 294 } 295 296 address->fProtocol = fForcedProtocol; 297 298 while (fCurrent != NULL) { 299 if (fCurrent->ai_family == AF_INET) { 300 memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in)); 301 reinterpret_cast<sockaddr_in*>(&address->fAddress)->sin_port 302 = fForcedPort; 303 } else if (fCurrent->ai_family == AF_INET6) { 304 memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in6)); 305 reinterpret_cast<sockaddr_in6*>(&address->fAddress)->sin6_port 306 = fForcedPort; 307 } else { 308 fCurrent = fCurrent->ai_next; 309 continue; 310 } 311 312 fCurrent = fCurrent->ai_next; 313 return B_OK; 314 } 315 316 return B_NAME_NOT_FOUND; 317} 318 319 320Connection::Connection(const PeerAddress& address) 321 : 322 ConnectionBase(address) 323{ 324} 325 326 327ConnectionListener::ConnectionListener(const PeerAddress& address) 328 : 329 ConnectionBase(address) 330{ 331} 332 333 334ConnectionBase::ConnectionBase(const PeerAddress& address) 335 : 336 fWaitCancel(create_sem(0, NULL)), 337 fSocket(-1), 338 fPeerAddress(address) 339{ 340 mutex_init(&fSocketLock, NULL); 341} 342 343 344ConnectionStream::ConnectionStream(const PeerAddress& address) 345 : 346 Connection(address) 347{ 348} 349 350 351ConnectionPacket::ConnectionPacket(const PeerAddress& address) 352 : 353 Connection(address) 354{ 355} 356 357 358ConnectionBase::~ConnectionBase() 359{ 360 if (fSocket != -1) 361 close(fSocket); 362 mutex_destroy(&fSocketLock); 363 delete_sem(fWaitCancel); 364} 365 366 367status_t 368ConnectionBase::GetLocalAddress(PeerAddress* address) 369{ 370 ASSERT(address != NULL); 371 372 address->fProtocol = fPeerAddress.fProtocol; 373 374 socklen_t addressSize = sizeof(address->fAddress); 375 return getsockname(fSocket, (struct sockaddr*)&address->fAddress, 376 &addressSize); 377} 378 379 380status_t 381ConnectionStream::Send(const void* buffer, uint32 size) 382{ 383 ASSERT(buffer != NULL); 384 385 status_t result; 386 387 uint32* buf = reinterpret_cast<uint32*>(malloc(size + sizeof(uint32))); 388 if (buf == NULL) 389 return B_NO_MEMORY; 390 MemoryDeleter _(buf); 391 392 buf[0] = htonl(size | LAST_FRAGMENT); 393 memcpy(buf + 1, buffer, size); 394 395 // More than one threads may send data and ksend is allowed to send partial 396 // data. Need a lock here. 397 uint32 sent = 0; 398 mutex_lock(&fSocketLock); 399 do { 400 result = send(fSocket, buf + sent, size + sizeof(uint32) - sent, 0); 401 sent += result; 402 } while (result > 0 && sent < size + sizeof(uint32)); 403 mutex_unlock(&fSocketLock); 404 if (result < 0) { 405 result = errno; 406 return result; 407 } else if (result == 0) 408 return B_IO_ERROR; 409 410 return B_OK; 411} 412 413 414status_t 415ConnectionPacket::Send(const void* buffer, uint32 size) 416{ 417 ASSERT(buffer != NULL); 418 ASSERT(size < 65535); 419 420 // send on DGRAM sockets is atomic. No need to lock. 421 status_t result = send(fSocket, buffer, size, 0); 422 if (result < 0) 423 return errno; 424 return B_OK; 425} 426 427 428status_t 429ConnectionStream::Receive(void** _buffer, uint32* _size) 430{ 431 ASSERT(_buffer != NULL); 432 ASSERT(_size != NULL); 433 434 status_t result; 435 436 uint32 size = 0; 437 void* buffer = NULL; 438 439 uint32 record_size; 440 bool last_one = false; 441 442 object_wait_info object[2]; 443 object[0].object = fWaitCancel; 444 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 445 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 446 447 object[1].object = fSocket; 448 object[1].type = B_OBJECT_TYPE_FD; 449 object[1].events = B_EVENT_READ; 450 451 do { 452 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 453 object[1].events = B_EVENT_READ; 454 455 result = wait_for_objects(object, 2); 456 if (result < B_OK 457 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 458 free(buffer); 459 return ECONNABORTED; 460 } else if ((object[1].events & B_EVENT_READ) == 0) 461 continue; 462 463 // There is only one listener thread per connection. No need to lock. 464 uint32 received = 0; 465 do { 466 result = recv(fSocket, ((uint8*)&record_size) + received, 467 sizeof(record_size) - received, 0); 468 received += result; 469 } while (result > 0 && received < sizeof(record_size)); 470 if (result < 0) { 471 result = errno; 472 free(buffer); 473 return result; 474 } else if (result == 0) { 475 free(buffer); 476 return ECONNABORTED; 477 } 478 479 record_size = ntohl(record_size); 480 ASSERT(record_size > 0); 481 482 last_one = (record_size & LAST_FRAGMENT) != 0; 483 record_size &= LAST_FRAGMENT - 1; 484 485 void* ptr = realloc(buffer, size + record_size); 486 if (ptr == NULL) { 487 free(buffer); 488 return B_NO_MEMORY; 489 } else 490 buffer = ptr; 491 MemoryDeleter bufferDeleter(buffer); 492 493 received = 0; 494 do { 495 result = recv(fSocket, (uint8*)buffer + size + received, 496 record_size - received, 0); 497 received += result; 498 } while (result > 0 && received < record_size); 499 if (result < 0) 500 return errno; 501 else if (result == 0) 502 return ECONNABORTED; 503 504 bufferDeleter.Detach(); 505 size += record_size; 506 } while (!last_one); 507 508 509 *_buffer = buffer; 510 *_size = size; 511 512 return B_OK; 513} 514 515 516status_t 517ConnectionPacket::Receive(void** _buffer, uint32* _size) 518{ 519 ASSERT(_buffer != NULL); 520 ASSERT(_size != NULL); 521 522 status_t result; 523 int32 size = MAX_PACKET_SIZE; 524 void* buffer = malloc(size); 525 526 if (buffer == NULL) 527 return B_NO_MEMORY; 528 529 object_wait_info object[2]; 530 object[0].object = fWaitCancel; 531 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 532 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 533 534 object[1].object = fSocket; 535 object[1].type = B_OBJECT_TYPE_FD; 536 object[1].events = B_EVENT_READ; 537 538 do { 539 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 540 object[1].events = B_EVENT_READ; 541 542 result = wait_for_objects(object, 2); 543 if (result < B_OK 544 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 545 free(buffer); 546 return ECONNABORTED; 547 } else if ((object[1].events & B_EVENT_READ) == 0) 548 continue; 549 break; 550 } while (true); 551 552 // There is only one listener thread per connection. No need to lock. 553 size = recv(fSocket, buffer, size, 0); 554 if (size < 0) { 555 result = errno; 556 free(buffer); 557 return result; 558 } else if (size == 0) { 559 free(buffer); 560 return ECONNABORTED; 561 } 562 563 *_buffer = buffer; 564 *_size = size; 565 566 return B_OK; 567} 568 569 570Connection* 571Connection::CreateObject(const PeerAddress& address) 572{ 573 switch (address.fProtocol) { 574 case IPPROTO_TCP: 575 return new(std::nothrow) ConnectionStream(address); 576 case IPPROTO_UDP: 577 return new(std::nothrow) ConnectionPacket(address); 578 default: 579 return NULL; 580 } 581} 582 583 584status_t 585Connection::Connect(Connection **_connection, const PeerAddress& address) 586{ 587 ASSERT(_connection != NULL); 588 589 Connection* conn = CreateObject(address); 590 if (conn == NULL) 591 return B_NO_MEMORY; 592 593 status_t result; 594 if (conn->fWaitCancel < B_OK) { 595 result = conn->fWaitCancel; 596 delete conn; 597 return result; 598 } 599 600 result = conn->Connect(); 601 if (result != B_OK) { 602 delete conn; 603 return result; 604 } 605 606 *_connection = conn; 607 608 return B_OK; 609} 610 611 612status_t 613Connection::SetTo(Connection **_connection, int socket, 614 const PeerAddress& address) 615{ 616 ASSERT(_connection != NULL); 617 ASSERT(socket != -1); 618 619 Connection* conn = CreateObject(address); 620 if (conn == NULL) 621 return B_NO_MEMORY; 622 623 status_t result; 624 if (conn->fWaitCancel < B_OK) { 625 result = conn->fWaitCancel; 626 delete conn; 627 return result; 628 } 629 630 conn->fSocket = socket; 631 632 *_connection = conn; 633 634 return B_OK; 635} 636 637 638status_t 639Connection::Connect() 640{ 641 switch (fPeerAddress.fProtocol) { 642 case IPPROTO_TCP: 643 fSocket = socket(fPeerAddress.Family(), SOCK_STREAM, IPPROTO_TCP); 644 break; 645 case IPPROTO_UDP: 646 fSocket = socket(fPeerAddress.Family(), SOCK_DGRAM, IPPROTO_UDP); 647 break; 648 default: 649 return B_BAD_VALUE; 650 } 651 if (fSocket < 0) 652 return errno; 653 654 status_t result; 655 uint16 port, attempt = 0; 656 657 PeerAddress address(fPeerAddress.Family()); 658 659 do { 660 port = get_random<uint16>() % (IPPORT_RESERVED - NFS_MIN_PORT); 661 port += NFS_MIN_PORT; 662 663 if (attempt == 9) 664 port = 0; 665 attempt++; 666 667 address.SetPort(port); 668 result = bind(fSocket, (sockaddr*)&address.fAddress, 669 address.AddressSize()); 670 } while (attempt <= 10 && result != B_OK); 671 672 if (attempt > 10) { 673 close(fSocket); 674 return result; 675 } 676 677 result = connect(fSocket, (sockaddr*)&fPeerAddress.fAddress, 678 fPeerAddress.AddressSize()); 679 if (result != 0) { 680 result = errno; 681 close(fSocket); 682 return result; 683 } 684 685 return B_OK; 686} 687 688 689status_t 690Connection::Reconnect() 691{ 692 release_sem(fWaitCancel); 693 close(fSocket); 694 acquire_sem(fWaitCancel); 695 return Connect(); 696} 697 698 699void 700ConnectionBase::Disconnect() 701{ 702 release_sem(fWaitCancel); 703 704 close(fSocket); 705 fSocket = -1; 706} 707 708 709status_t 710ConnectionListener::Listen(ConnectionListener** listener, int networkFamily, 711 uint16 port) 712{ 713 ASSERT(listener != NULL); 714 ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6); 715 716 int sock = socket(networkFamily, SOCK_STREAM, IPPROTO_TCP); 717 if (sock < 0) 718 return errno; 719 720 PeerAddress address(networkFamily); 721 address.SetPort(port); 722 address.fProtocol = IPPROTO_TCP; 723 724 status_t result = bind(sock, (sockaddr*)&address.fAddress, 725 address.AddressSize()); 726 if (result != B_OK) { 727 close(sock); 728 return errno; 729 } 730 731 if (listen(sock, 5) != B_OK) { 732 close(sock); 733 return errno; 734 } 735 736 *listener = new(std::nothrow) ConnectionListener(address); 737 if (*listener == NULL) { 738 close(sock); 739 return B_NO_MEMORY; 740 } 741 742 if ((*listener)->fWaitCancel < B_OK) { 743 result = (*listener)->fWaitCancel; 744 close(sock); 745 delete *listener; 746 return result; 747 } 748 749 (*listener)->fSocket = sock; 750 751 return B_OK; 752} 753 754 755status_t 756ConnectionListener::AcceptConnection(Connection** connection) 757{ 758 ASSERT(connection != NULL); 759 760 object_wait_info object[2]; 761 object[0].object = fWaitCancel; 762 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 763 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 764 765 object[1].object = fSocket; 766 object[1].type = B_OBJECT_TYPE_FD; 767 object[1].events = B_EVENT_READ; 768 769 do { 770 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 771 object[1].events = B_EVENT_READ; 772 773 status_t result = wait_for_objects(object, 2); 774 if (result < B_OK 775 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 776 return ECONNABORTED; 777 } else if ((object[1].events & B_EVENT_READ) == 0) 778 continue; 779 break; 780 } while (true); 781 782 sockaddr_storage addr; 783 socklen_t length = sizeof(addr); 784 int sock = accept(fSocket, reinterpret_cast<sockaddr*>(&addr), &length); 785 if (sock < 0) 786 return errno; 787 788 PeerAddress address; 789 address.fProtocol = IPPROTO_TCP; 790 address.fAddress = addr; 791 792 status_t result = Connection::SetTo(connection, sock, address); 793 if (result != B_OK) { 794 close(sock); 795 return result; 796 } 797 798 return B_OK; 799} 800 801