1/* 2 * This file Copyright (C) Mnemosyne LLC 3 * 4 * This file is licensed by the GPL version 2. Works owned by the 5 * Transmission project are granted a special exemption to clause 2(b) 6 * so that the bulk of its code can remain under the MIT license. 7 * This exemption does not extend to derived works not owned by 8 * the Transmission project. 9 * 10 * $Id: peer-io.c 13464 2012-09-05 11:39:57Z livings124 $ 11 */ 12 13#include <assert.h> 14#include <errno.h> 15#include <string.h> 16 17#include <event2/event.h> 18#include <event2/buffer.h> 19#include <event2/bufferevent.h> 20 21#include <libutp/utp.h> 22 23#include "transmission.h" 24#include "session.h" 25#include "bandwidth.h" 26#include "crypto.h" 27#include "net.h" 28#include "peer-common.h" /* MAX_BLOCK_SIZE */ 29#include "peer-io.h" 30#include "trevent.h" /* tr_runInEventThread() */ 31#include "tr-utp.h" 32#include "utils.h" 33 34 35#ifdef WIN32 36 #define EAGAIN WSAEWOULDBLOCK 37 #define EINTR WSAEINTR 38 #define EINPROGRESS WSAEINPROGRESS 39 #define EPIPE WSAECONNRESET 40#endif 41 42/* The amount of read bufferring that we allow for uTP sockets. */ 43 44#define UTP_READ_BUFFER_SIZE (256 * 1024) 45 46static size_t 47guessPacketOverhead( size_t d ) 48{ 49 /** 50 * http://sd.wareonearth.com/~phil/net/overhead/ 51 * 52 * TCP over Ethernet: 53 * Assuming no header compression (e.g. not PPP) 54 * Add 20 IPv4 header or 40 IPv6 header (no options) 55 * Add 20 TCP header 56 * Add 12 bytes optional TCP timestamps 57 * Max TCP Payload data rates over ethernet are thus: 58 * (1500-40)/(38+1500) = 94.9285 % IPv4, minimal headers 59 * (1500-52)/(38+1500) = 94.1482 % IPv4, TCP timestamps 60 * (1500-52)/(42+1500) = 93.9040 % 802.1q, IPv4, TCP timestamps 61 * (1500-60)/(38+1500) = 93.6281 % IPv6, minimal headers 62 * (1500-72)/(38+1500) = 92.8479 % IPv6, TCP timestamps 63 * (1500-72)/(42+1500) = 92.6070 % 802.1q, IPv6, ICP timestamps 64 */ 65 const double assumed_payload_data_rate = 94.0; 66 67 return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d ); 68} 69 70/** 71*** 72**/ 73 74#define dbgmsg( io, ... ) \ 75 do { \ 76 if( tr_deepLoggingIsActive( ) ) \ 77 tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \ 78 } while( 0 ) 79 80/** 81*** 82**/ 83 84struct tr_datatype 85{ 86 struct tr_datatype * next; 87 size_t length; 88 bool isPieceData; 89}; 90 91static struct tr_datatype * datatype_pool = NULL; 92 93static const struct tr_datatype TR_DATATYPE_INIT = { NULL, 0, false }; 94 95static struct tr_datatype * 96datatype_new( void ) 97{ 98 struct tr_datatype * ret; 99 100 if( datatype_pool == NULL ) 101 ret = tr_new( struct tr_datatype, 1 ); 102 else { 103 ret = datatype_pool; 104 datatype_pool = datatype_pool->next; 105 } 106 107 *ret = TR_DATATYPE_INIT; 108 return ret; 109} 110 111static void 112datatype_free( struct tr_datatype * datatype ) 113{ 114 datatype->next = datatype_pool; 115 datatype_pool = datatype; 116} 117 118static void 119peer_io_pull_datatype( tr_peerIo * io ) 120{ 121 struct tr_datatype * tmp; 122 123 if(( tmp = io->outbuf_datatypes )) 124 { 125 io->outbuf_datatypes = tmp->next; 126 datatype_free( tmp ); 127 } 128} 129 130static void 131peer_io_push_datatype( tr_peerIo * io, struct tr_datatype * datatype ) 132{ 133 struct tr_datatype * tmp; 134 135 if(( tmp = io->outbuf_datatypes )) { 136 while( tmp->next != NULL ) 137 tmp = tmp->next; 138 tmp->next = datatype; 139 } else { 140 io->outbuf_datatypes = datatype; 141 } 142} 143 144/*** 145**** 146***/ 147 148static void 149didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred ) 150{ 151 while( bytes_transferred && tr_isPeerIo( io ) ) 152 { 153 struct tr_datatype * next = io->outbuf_datatypes; 154 155 const unsigned int payload = MIN( next->length, bytes_transferred ); 156 /* For uTP sockets, the overhead is computed in utp_on_overhead. */ 157 const unsigned int overhead = 158 io->socket ? guessPacketOverhead( payload ) : 0; 159 const uint64_t now = tr_time_msec( ); 160 161 tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now ); 162 163 if( overhead > 0 ) 164 tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now ); 165 166 if( io->didWrite ) 167 io->didWrite( io, payload, next->isPieceData, io->userData ); 168 169 if( tr_isPeerIo( io ) ) 170 { 171 bytes_transferred -= payload; 172 next->length -= payload; 173 if( !next->length ) 174 peer_io_pull_datatype( io ); 175 } 176 } 177} 178 179static void 180canReadWrapper( tr_peerIo * io ) 181{ 182 bool err = 0; 183 bool done = 0; 184 tr_session * session; 185 186 dbgmsg( io, "canRead" ); 187 188 tr_peerIoRef( io ); 189 190 session = io->session; 191 192 /* try to consume the input buffer */ 193 if( io->canRead ) 194 { 195 const uint64_t now = tr_time_msec( ); 196 197 tr_sessionLock( session ); 198 199 while( !done && !err ) 200 { 201 size_t piece = 0; 202 const size_t oldLen = evbuffer_get_length( io->inbuf ); 203 const int ret = io->canRead( io, io->userData, &piece ); 204 const size_t used = oldLen - evbuffer_get_length( io->inbuf ); 205 const unsigned int overhead = guessPacketOverhead( used ); 206 207 if( piece || (piece!=used) ) 208 { 209 if( piece ) 210 tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now ); 211 212 if( used != piece ) 213 tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now ); 214 } 215 216 if( overhead > 0 ) 217 tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now ); 218 219 switch( ret ) 220 { 221 case READ_NOW: 222 if( evbuffer_get_length( io->inbuf ) ) 223 continue; 224 done = 1; 225 break; 226 227 case READ_LATER: 228 done = 1; 229 break; 230 231 case READ_ERR: 232 err = 1; 233 break; 234 } 235 236 assert( tr_isPeerIo( io ) ); 237 } 238 239 tr_sessionUnlock( session ); 240 } 241 242 tr_peerIoUnref( io ); 243} 244 245static void 246event_read_cb( int fd, short event UNUSED, void * vio ) 247{ 248 int res; 249 int e; 250 tr_peerIo * io = vio; 251 252 /* Limit the input buffer to 256K, so it doesn't grow too large */ 253 unsigned int howmuch; 254 unsigned int curlen; 255 const tr_direction dir = TR_DOWN; 256 const unsigned int max = 256 * 1024; 257 258 assert( tr_isPeerIo( io ) ); 259 assert( io->socket >= 0 ); 260 261 io->pendingEvents &= ~EV_READ; 262 263 curlen = evbuffer_get_length( io->inbuf ); 264 howmuch = curlen >= max ? 0 : max - curlen; 265 howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch ); 266 267 dbgmsg( io, "libevent says this peer is ready to read" ); 268 269 /* if we don't have any bandwidth left, stop reading */ 270 if( howmuch < 1 ) { 271 tr_peerIoSetEnabled( io, dir, false ); 272 return; 273 } 274 275 EVUTIL_SET_SOCKET_ERROR( 0 ); 276 res = evbuffer_read( io->inbuf, fd, (int)howmuch ); 277 e = EVUTIL_SOCKET_ERROR( ); 278 279 if( res > 0 ) 280 { 281 tr_peerIoSetEnabled( io, dir, true ); 282 283 /* Invoke the user callback - must always be called last */ 284 canReadWrapper( io ); 285 } 286 else 287 { 288 char errstr[512]; 289 short what = BEV_EVENT_READING; 290 291 if( res == 0 ) /* EOF */ 292 what |= BEV_EVENT_EOF; 293 else if( res == -1 ) { 294 if( e == EAGAIN || e == EINTR ) { 295 tr_peerIoSetEnabled( io, dir, true ); 296 return; 297 } 298 what |= BEV_EVENT_ERROR; 299 } 300 301 dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", 302 res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) ); 303 304 if( io->gotError != NULL ) 305 io->gotError( io, what, io->userData ); 306 } 307} 308 309static int 310tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch ) 311{ 312 int e; 313 int n; 314 char errstr[256]; 315 316 EVUTIL_SET_SOCKET_ERROR( 0 ); 317 n = evbuffer_write_atmost( io->outbuf, fd, howmuch ); 318 e = EVUTIL_SOCKET_ERROR( ); 319 dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") ); 320 321 return n; 322} 323 324static void 325event_write_cb( int fd, short event UNUSED, void * vio ) 326{ 327 int res = 0; 328 int e; 329 short what = BEV_EVENT_WRITING; 330 tr_peerIo * io = vio; 331 size_t howmuch; 332 const tr_direction dir = TR_UP; 333 char errstr[1024]; 334 335 assert( tr_isPeerIo( io ) ); 336 assert( io->socket >= 0 ); 337 338 io->pendingEvents &= ~EV_WRITE; 339 340 dbgmsg( io, "libevent says this peer is ready to write" ); 341 342 /* Write as much as possible, since the socket is non-blocking, write() will 343 * return if it can't write any more data without blocking */ 344 howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) ); 345 346 /* if we don't have any bandwidth left, stop writing */ 347 if( howmuch < 1 ) { 348 tr_peerIoSetEnabled( io, dir, false ); 349 return; 350 } 351 352 EVUTIL_SET_SOCKET_ERROR( 0 ); 353 res = tr_evbuffer_write( io, fd, howmuch ); 354 e = EVUTIL_SOCKET_ERROR( ); 355 356 if (res == -1) { 357 if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS) 358 goto reschedule; 359 /* error case */ 360 what |= BEV_EVENT_ERROR; 361 } else if (res == 0) { 362 /* eof case */ 363 what |= BEV_EVENT_EOF; 364 } 365 if (res <= 0) 366 goto error; 367 368 if( evbuffer_get_length( io->outbuf ) ) 369 tr_peerIoSetEnabled( io, dir, true ); 370 371 didWriteWrapper( io, res ); 372 return; 373 374 reschedule: 375 if( evbuffer_get_length( io->outbuf ) ) 376 tr_peerIoSetEnabled( io, dir, true ); 377 return; 378 379 error: 380 381 tr_net_strerror( errstr, sizeof( errstr ), e ); 382 dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr ); 383 384 if( io->gotError != NULL ) 385 io->gotError( io, what, io->userData ); 386} 387 388/** 389*** 390**/ 391 392static void 393maybeSetCongestionAlgorithm( int socket, const char * algorithm ) 394{ 395 if( algorithm && *algorithm ) 396 { 397 const int rc = tr_netSetCongestionControl( socket, algorithm ); 398 399 if( rc < 0 ) 400 tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s", 401 algorithm, tr_strerror( errno )); 402 } 403} 404 405#ifdef WITH_UTP 406/* UTP callbacks */ 407 408static void 409utp_on_read(void *closure, const unsigned char *buf, size_t buflen) 410{ 411 int rc; 412 tr_peerIo *io = closure; 413 assert( tr_isPeerIo( io ) ); 414 415 rc = evbuffer_add( io->inbuf, buf, buflen ); 416 dbgmsg( io, "utp_on_read got %zu bytes", buflen ); 417 418 if( rc < 0 ) { 419 tr_nerr( "UTP", "On read evbuffer_add" ); 420 return; 421 } 422 423 tr_peerIoSetEnabled( io, TR_DOWN, true ); 424 canReadWrapper( io ); 425} 426 427static void 428utp_on_write(void *closure, unsigned char *buf, size_t buflen) 429{ 430 int rc; 431 tr_peerIo *io = closure; 432 assert( tr_isPeerIo( io ) ); 433 434 rc = evbuffer_remove( io->outbuf, buf, buflen ); 435 dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc ); 436 assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */ 437 if( rc < (long)buflen ) { 438 tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen); 439 } 440 441 didWriteWrapper( io, buflen ); 442} 443 444static size_t 445utp_get_rb_size(void *closure) 446{ 447 size_t bytes; 448 tr_peerIo *io = closure; 449 assert( tr_isPeerIo( io ) ); 450 451 bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE ); 452 453 dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes ); 454 return UTP_READ_BUFFER_SIZE - bytes; 455} 456 457static int tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch ); 458 459static void 460utp_on_writable( tr_peerIo *io ) 461{ 462 int n; 463 464 dbgmsg( io, "libutp says this peer is ready to write" ); 465 466 n = tr_peerIoTryWrite( io, SIZE_MAX ); 467 tr_peerIoSetEnabled( io, TR_UP, n && evbuffer_get_length( io->outbuf ) ); 468} 469 470static void 471utp_on_state_change(void *closure, int state) 472{ 473 tr_peerIo *io = closure; 474 assert( tr_isPeerIo( io ) ); 475 476 if( state == UTP_STATE_CONNECT ) { 477 dbgmsg( io, "utp_on_state_change -- changed to connected" ); 478 io->utpSupported = true; 479 } else if( state == UTP_STATE_WRITABLE ) { 480 dbgmsg( io, "utp_on_state_change -- changed to writable" ); 481 if ( io->pendingEvents & EV_WRITE ) 482 utp_on_writable( io ); 483 } else if( state == UTP_STATE_EOF ) { 484 if( io->gotError ) 485 io->gotError( io, BEV_EVENT_EOF, io->userData ); 486 } else if( state == UTP_STATE_DESTROYING ) { 487 tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" ); 488 return; 489 } else { 490 tr_nerr( "UTP", "Unknown state %d", state ); 491 } 492} 493 494static void 495utp_on_error(void *closure, int errcode) 496{ 497 tr_peerIo *io = closure; 498 assert( tr_isPeerIo( io ) ); 499 500 dbgmsg( io, "utp_on_error -- errcode is %d", errcode ); 501 502 if( io->gotError ) { 503 errno = errcode; 504 io->gotError( io, BEV_EVENT_ERROR, io->userData ); 505 } 506} 507 508static void 509utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED) 510{ 511 tr_peerIo *io = closure; 512 assert( tr_isPeerIo( io ) ); 513 514 dbgmsg( io, "utp_on_overhead -- count is %zu", count ); 515 516 tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN, 517 count, false, tr_time_msec() ); 518} 519 520static struct UTPFunctionTable utp_function_table = { 521 .on_read = utp_on_read, 522 .on_write = utp_on_write, 523 .get_rb_size = utp_get_rb_size, 524 .on_state = utp_on_state_change, 525 .on_error = utp_on_error, 526 .on_overhead = utp_on_overhead 527}; 528 529 530/* Dummy UTP callbacks. */ 531/* We switch a UTP socket to use these after the associated peerIo has been 532 destroyed -- see io_dtor. */ 533 534static void 535dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED ) 536{ 537 /* This cannot happen, as far as I'm aware. */ 538 tr_nerr( "UTP", "On_read called on closed socket" ); 539 540} 541 542static void 543dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen) 544{ 545 /* This can very well happen if we've shut down a peer connection that 546 had unflushed buffers. Complain and send zeroes. */ 547 tr_ndbg( "UTP", "On_write called on closed socket" ); 548 memset( buf, 0, buflen ); 549} 550 551static size_t 552dummy_get_rb_size( void * closure UNUSED ) 553{ 554 return 0; 555} 556 557static void 558dummy_on_state_change(void * closure UNUSED, int state UNUSED ) 559{ 560 return; 561} 562 563static void 564dummy_on_error( void * closure UNUSED, int errcode UNUSED ) 565{ 566 return; 567} 568 569static void 570dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED ) 571{ 572 return; 573} 574 575static struct UTPFunctionTable dummy_utp_function_table = { 576 .on_read = dummy_read, 577 .on_write = dummy_write, 578 .get_rb_size = dummy_get_rb_size, 579 .on_state = dummy_on_state_change, 580 .on_error = dummy_on_error, 581 .on_overhead = dummy_on_overhead 582}; 583 584#endif /* #ifdef WITH_UTP */ 585 586static tr_peerIo* 587tr_peerIoNew( tr_session * session, 588 tr_bandwidth * parent, 589 const tr_address * addr, 590 tr_port port, 591 const uint8_t * torrentHash, 592 bool isIncoming, 593 bool isSeed, 594 int socket, 595 struct UTPSocket * utp_socket) 596{ 597 tr_peerIo * io; 598 599 assert( session != NULL ); 600 assert( session->events != NULL ); 601 assert( tr_isBool( isIncoming ) ); 602 assert( tr_isBool( isSeed ) ); 603 assert( tr_amInEventThread( session ) ); 604 assert( (socket < 0) == (utp_socket != NULL) ); 605#ifndef WITH_UTP 606 assert( socket >= 0 ); 607#endif 608 609 if( socket >= 0 ) { 610 tr_netSetTOS( socket, session->peerSocketTOS ); 611 maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm ); 612 } 613 614 io = tr_new0( tr_peerIo, 1 ); 615 io->magicNumber = PEER_IO_MAGIC_NUMBER; 616 io->refCount = 1; 617 tr_cryptoConstruct( &io->crypto, torrentHash, isIncoming ); 618 io->session = session; 619 io->addr = *addr; 620 io->isSeed = isSeed; 621 io->port = port; 622 io->socket = socket; 623 io->utp_socket = utp_socket; 624 io->isIncoming = isIncoming != 0; 625 io->timeCreated = tr_time( ); 626 io->inbuf = evbuffer_new( ); 627 io->outbuf = evbuffer_new( ); 628 tr_bandwidthConstruct( &io->bandwidth, session, parent ); 629 tr_bandwidthSetPeer( &io->bandwidth, io ); 630 dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent ); 631 dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket ); 632 633 if( io->socket >= 0 ) { 634 io->event_read = event_new( session->event_base, 635 io->socket, EV_READ, event_read_cb, io ); 636 io->event_write = event_new( session->event_base, 637 io->socket, EV_WRITE, event_write_cb, io ); 638 } 639#ifdef WITH_UTP 640 else { 641 UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE ); 642 dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" ); 643 UTP_SetCallbacks( utp_socket, 644 &utp_function_table, 645 io ); 646 if( !isIncoming ) { 647 dbgmsg( io, "%s", "calling UTP_Connect" ); 648 UTP_Connect( utp_socket ); 649 } 650 } 651#endif 652 653 return io; 654} 655 656tr_peerIo* 657tr_peerIoNewIncoming( tr_session * session, 658 tr_bandwidth * parent, 659 const tr_address * addr, 660 tr_port port, 661 int fd, 662 struct UTPSocket * utp_socket ) 663{ 664 assert( session ); 665 assert( tr_address_is_valid( addr ) ); 666 667 return tr_peerIoNew( session, parent, addr, port, NULL, true, false, 668 fd, utp_socket ); 669} 670 671tr_peerIo* 672tr_peerIoNewOutgoing( tr_session * session, 673 tr_bandwidth * parent, 674 const tr_address * addr, 675 tr_port port, 676 const uint8_t * torrentHash, 677 bool isSeed, 678 bool utp ) 679{ 680 int fd = -1; 681 struct UTPSocket * utp_socket = NULL; 682 683 assert( session ); 684 assert( tr_address_is_valid( addr ) ); 685 assert( torrentHash ); 686 687 if( utp ) 688 utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed ); 689 690 if( !utp_socket ) { 691 fd = tr_netOpenPeerSocket( session, addr, port, isSeed ); 692 dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd ); 693 } 694 695 if( fd < 0 && utp_socket == NULL ) 696 return NULL; 697 698 return tr_peerIoNew( session, parent, addr, port, 699 torrentHash, false, isSeed, fd, utp_socket ); 700} 701 702/*** 703**** 704***/ 705 706static void 707event_enable( tr_peerIo * io, short event ) 708{ 709 assert( tr_amInEventThread( io->session ) ); 710 assert( io->session != NULL ); 711 assert( io->session->events != NULL ); 712 713 if( io->socket >= 0 ) 714 { 715 assert( event_initialized( io->event_read ) ); 716 assert( event_initialized( io->event_write ) ); 717 } 718 719 if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) ) 720 { 721 dbgmsg( io, "enabling ready-to-read polling" ); 722 if( io->socket >= 0 ) 723 event_add( io->event_read, NULL ); 724 io->pendingEvents |= EV_READ; 725 } 726 727 if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) ) 728 { 729 dbgmsg( io, "enabling ready-to-write polling" ); 730 if( io->socket >= 0 ) 731 event_add( io->event_write, NULL ); 732 io->pendingEvents |= EV_WRITE; 733 } 734} 735 736static void 737event_disable( struct tr_peerIo * io, short event ) 738{ 739 assert( tr_amInEventThread( io->session ) ); 740 assert( io->session != NULL ); 741 assert( io->session->events != NULL ); 742 743 if( io->socket >= 0 ) 744 { 745 assert( event_initialized( io->event_read ) ); 746 assert( event_initialized( io->event_write ) ); 747 } 748 749 if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) ) 750 { 751 dbgmsg( io, "disabling ready-to-read polling" ); 752 if( io->socket >= 0 ) 753 event_del( io->event_read ); 754 io->pendingEvents &= ~EV_READ; 755 } 756 757 if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) ) 758 { 759 dbgmsg( io, "disabling ready-to-write polling" ); 760 if( io->socket >= 0 ) 761 event_del( io->event_write ); 762 io->pendingEvents &= ~EV_WRITE; 763 } 764} 765 766void 767tr_peerIoSetEnabled( tr_peerIo * io, 768 tr_direction dir, 769 bool isEnabled ) 770{ 771 const short event = dir == TR_UP ? EV_WRITE : EV_READ; 772 773 assert( tr_isPeerIo( io ) ); 774 assert( tr_isDirection( dir ) ); 775 assert( tr_amInEventThread( io->session ) ); 776 assert( io->session->events != NULL ); 777 778 if( isEnabled ) 779 event_enable( io, event ); 780 else 781 event_disable( io, event ); 782} 783 784/*** 785**** 786***/ 787static void 788io_close_socket( tr_peerIo * io ) 789{ 790 if( io->socket >= 0 ) { 791 tr_netClose( io->session, io->socket ); 792 io->socket = -1; 793 } 794 795 if( io->event_read != NULL) { 796 event_free( io->event_read ); 797 io->event_read = NULL; 798 } 799 800 if( io->event_write != NULL) { 801 event_free( io->event_write ); 802 io->event_write = NULL; 803 } 804 805#ifdef WITH_UTP 806 if( io->utp_socket ) { 807 UTP_SetCallbacks( io->utp_socket, 808 &dummy_utp_function_table, 809 NULL ); 810 UTP_Close( io->utp_socket ); 811 812 io->utp_socket = NULL; 813 } 814#endif 815} 816 817static void 818io_dtor( void * vio ) 819{ 820 tr_peerIo * io = vio; 821 822 assert( tr_isPeerIo( io ) ); 823 assert( tr_amInEventThread( io->session ) ); 824 assert( io->session->events != NULL ); 825 826 dbgmsg( io, "in tr_peerIo destructor" ); 827 event_disable( io, EV_READ | EV_WRITE ); 828 tr_bandwidthDestruct( &io->bandwidth ); 829 evbuffer_free( io->outbuf ); 830 evbuffer_free( io->inbuf ); 831 io_close_socket( io ); 832 tr_cryptoDestruct( &io->crypto ); 833 834 while( io->outbuf_datatypes != NULL ) 835 peer_io_pull_datatype( io ); 836 837 memset( io, ~0, sizeof( tr_peerIo ) ); 838 tr_free( io ); 839} 840 841static void 842tr_peerIoFree( tr_peerIo * io ) 843{ 844 if( io ) 845 { 846 dbgmsg( io, "in tr_peerIoFree" ); 847 io->canRead = NULL; 848 io->didWrite = NULL; 849 io->gotError = NULL; 850 tr_runInEventThread( io->session, io_dtor, io ); 851 } 852} 853 854void 855tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io ) 856{ 857 assert( tr_isPeerIo( io ) ); 858 859 dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d", 860 file, line, io->refCount, io->refCount+1 ); 861 862 ++io->refCount; 863} 864 865void 866tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io ) 867{ 868 assert( tr_isPeerIo( io ) ); 869 870 dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d", 871 file, line, io->refCount, io->refCount-1 ); 872 873 if( !--io->refCount ) 874 tr_peerIoFree( io ); 875} 876 877const tr_address* 878tr_peerIoGetAddress( const tr_peerIo * io, tr_port * port ) 879{ 880 assert( tr_isPeerIo( io ) ); 881 882 if( port ) 883 *port = io->port; 884 885 return &io->addr; 886} 887 888const char* 889tr_peerIoAddrStr( const tr_address * addr, tr_port port ) 890{ 891 static char buf[512]; 892 tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) ); 893 return buf; 894} 895 896const char* tr_peerIoGetAddrStr( const tr_peerIo * io ) 897{ 898 return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error"; 899} 900 901void 902tr_peerIoSetIOFuncs( tr_peerIo * io, 903 tr_can_read_cb readcb, 904 tr_did_write_cb writecb, 905 tr_net_error_cb errcb, 906 void * userData ) 907{ 908 io->canRead = readcb; 909 io->didWrite = writecb; 910 io->gotError = errcb; 911 io->userData = userData; 912} 913 914void 915tr_peerIoClear( tr_peerIo * io ) 916{ 917 tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL ); 918 tr_peerIoSetEnabled( io, TR_UP, false ); 919 tr_peerIoSetEnabled( io, TR_DOWN, false ); 920} 921 922int 923tr_peerIoReconnect( tr_peerIo * io ) 924{ 925 short int pendingEvents; 926 tr_session * session; 927 928 assert( tr_isPeerIo( io ) ); 929 assert( !tr_peerIoIsIncoming( io ) ); 930 931 session = tr_peerIoGetSession( io ); 932 933 pendingEvents = io->pendingEvents; 934 event_disable( io, EV_READ | EV_WRITE ); 935 936 io_close_socket( io ); 937 938 io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed ); 939 io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io ); 940 io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io ); 941 942 if( io->socket >= 0 ) 943 { 944 event_enable( io, pendingEvents ); 945 tr_netSetTOS( io->socket, session->peerSocketTOS ); 946 maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm ); 947 return 0; 948 } 949 950 return -1; 951} 952 953/** 954*** 955**/ 956 957void 958tr_peerIoSetTorrentHash( tr_peerIo * io, 959 const uint8_t * hash ) 960{ 961 assert( tr_isPeerIo( io ) ); 962 963 tr_cryptoSetTorrentHash( &io->crypto, hash ); 964} 965 966const uint8_t* 967tr_peerIoGetTorrentHash( tr_peerIo * io ) 968{ 969 assert( tr_isPeerIo( io ) ); 970 971 return tr_cryptoGetTorrentHash( &io->crypto ); 972} 973 974int 975tr_peerIoHasTorrentHash( const tr_peerIo * io ) 976{ 977 assert( tr_isPeerIo( io ) ); 978 979 return tr_cryptoHasTorrentHash( &io->crypto ); 980} 981 982/** 983*** 984**/ 985 986void 987tr_peerIoSetPeersId( tr_peerIo * io, const uint8_t * peer_id ) 988{ 989 assert( tr_isPeerIo( io ) ); 990 991 if( ( io->peerIdIsSet = peer_id != NULL ) ) 992 memcpy( io->peerId, peer_id, 20 ); 993 else 994 memset( io->peerId, 0, 20 ); 995} 996 997/** 998*** 999**/ 1000 1001static unsigned int 1002getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now ) 1003{ 1004 /* this is all kind of arbitrary, but what seems to work well is 1005 * being large enough to hold the next 20 seconds' worth of input, 1006 * or a few blocks, whichever is bigger. 1007 * It's okay to tweak this as needed */ 1008 const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP ); 1009 const unsigned int period = 15u; /* arbitrary */ 1010 /* the 3 is arbitrary; the .5 is to leave room for messages */ 1011 static const unsigned int ceiling = (unsigned int)( MAX_BLOCK_SIZE * 3.5 ); 1012 return MAX( ceiling, currentSpeed_Bps*period ); 1013} 1014 1015size_t 1016tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now ) 1017{ 1018 const size_t desiredLen = getDesiredOutputBufferSize( io, now ); 1019 const size_t currentLen = evbuffer_get_length( io->outbuf ); 1020 size_t freeSpace = 0; 1021 1022 if( desiredLen > currentLen ) 1023 freeSpace = desiredLen - currentLen; 1024 1025 return freeSpace; 1026} 1027 1028/** 1029*** 1030**/ 1031 1032void 1033tr_peerIoSetEncryption( tr_peerIo * io, tr_encryption_type encryption_type ) 1034{ 1035 assert( tr_isPeerIo( io ) ); 1036 assert( encryption_type == PEER_ENCRYPTION_NONE 1037 || encryption_type == PEER_ENCRYPTION_RC4 ); 1038 1039 io->encryption_type = encryption_type; 1040} 1041 1042/** 1043*** 1044**/ 1045 1046static void 1047addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData ) 1048{ 1049 struct tr_datatype * d; 1050 d = datatype_new( ); 1051 d->isPieceData = isPieceData != 0; 1052 d->length = byteCount; 1053 peer_io_push_datatype( io, d ); 1054} 1055 1056static void 1057maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf ) 1058{ 1059 if( io->encryption_type == PEER_ENCRYPTION_RC4 ) 1060 { 1061 struct evbuffer_ptr pos; 1062 struct evbuffer_iovec iovec; 1063 evbuffer_ptr_set( buf, &pos, 0, EVBUFFER_PTR_SET ); 1064 do { 1065 evbuffer_peek( buf, -1, &pos, &iovec, 1 ); 1066 tr_cryptoEncrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base ); 1067 } while( !evbuffer_ptr_set( buf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) ); 1068 } 1069} 1070 1071void 1072tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData ) 1073{ 1074 const size_t byteCount = evbuffer_get_length( buf ); 1075 maybeEncryptBuffer( io, buf ); 1076 evbuffer_add_buffer( io->outbuf, buf ); 1077 addDatatype( io, byteCount, isPieceData ); 1078} 1079 1080void 1081tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData ) 1082{ 1083 struct evbuffer_iovec iovec; 1084 evbuffer_reserve_space( io->outbuf, byteCount, &iovec, 1 ); 1085 1086 iovec.iov_len = byteCount; 1087 if( io->encryption_type == PEER_ENCRYPTION_RC4 ) 1088 tr_cryptoEncrypt( &io->crypto, iovec.iov_len, bytes, iovec.iov_base ); 1089 else 1090 memcpy( iovec.iov_base, bytes, iovec.iov_len ); 1091 evbuffer_commit_space( io->outbuf, &iovec, 1 ); 1092 1093 addDatatype( io, byteCount, isPieceData ); 1094} 1095 1096/*** 1097**** 1098***/ 1099 1100void 1101evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte ) 1102{ 1103 evbuffer_add( outbuf, &byte, 1 ); 1104} 1105 1106void 1107evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs ) 1108{ 1109 const uint16_t ns = htons( addme_hs ); 1110 evbuffer_add( outbuf, &ns, sizeof( ns ) ); 1111} 1112 1113void 1114evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl ) 1115{ 1116 const uint32_t nl = htonl( addme_hl ); 1117 evbuffer_add( outbuf, &nl, sizeof( nl ) ); 1118} 1119 1120void 1121evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll ) 1122{ 1123 const uint64_t nll = tr_htonll( addme_hll ); 1124 evbuffer_add( outbuf, &nll, sizeof( nll ) ); 1125} 1126 1127/*** 1128**** 1129***/ 1130 1131void 1132tr_peerIoReadBytesToBuf( tr_peerIo * io, struct evbuffer * inbuf, struct evbuffer * outbuf, size_t byteCount ) 1133{ 1134 struct evbuffer * tmp; 1135 const size_t old_length = evbuffer_get_length( outbuf ); 1136 1137 assert( tr_isPeerIo( io ) ); 1138 assert( evbuffer_get_length( inbuf ) >= byteCount ); 1139 1140 /* append it to outbuf */ 1141 tmp = evbuffer_new( ); 1142 evbuffer_remove_buffer( inbuf, tmp, byteCount ); 1143 evbuffer_add_buffer( outbuf, tmp ); 1144 evbuffer_free( tmp ); 1145 1146 /* decrypt if needed */ 1147 if( io->encryption_type == PEER_ENCRYPTION_RC4 ) { 1148 struct evbuffer_ptr pos; 1149 struct evbuffer_iovec iovec; 1150 evbuffer_ptr_set( outbuf, &pos, old_length, EVBUFFER_PTR_SET ); 1151 do { 1152 evbuffer_peek( outbuf, byteCount, &pos, &iovec, 1 ); 1153 tr_cryptoDecrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base ); 1154 byteCount -= iovec.iov_len; 1155 } while( !evbuffer_ptr_set( outbuf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) ); 1156 } 1157} 1158 1159void 1160tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount ) 1161{ 1162 assert( tr_isPeerIo( io ) ); 1163 assert( evbuffer_get_length( inbuf ) >= byteCount ); 1164 1165 switch( io->encryption_type ) 1166 { 1167 case PEER_ENCRYPTION_NONE: 1168 evbuffer_remove( inbuf, bytes, byteCount ); 1169 break; 1170 1171 case PEER_ENCRYPTION_RC4: 1172 evbuffer_remove( inbuf, bytes, byteCount ); 1173 tr_cryptoDecrypt( &io->crypto, byteCount, bytes, bytes ); 1174 break; 1175 1176 default: 1177 assert( 0 ); 1178 } 1179} 1180 1181void 1182tr_peerIoReadUint16( tr_peerIo * io, 1183 struct evbuffer * inbuf, 1184 uint16_t * setme ) 1185{ 1186 uint16_t tmp; 1187 tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) ); 1188 *setme = ntohs( tmp ); 1189} 1190 1191void tr_peerIoReadUint32( tr_peerIo * io, 1192 struct evbuffer * inbuf, 1193 uint32_t * setme ) 1194{ 1195 uint32_t tmp; 1196 tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) ); 1197 *setme = ntohl( tmp ); 1198} 1199 1200void 1201tr_peerIoDrain( tr_peerIo * io, 1202 struct evbuffer * inbuf, 1203 size_t byteCount ) 1204{ 1205 char buf[4096]; 1206 const size_t buflen = sizeof( buf ); 1207 1208 while( byteCount > 0 ) 1209 { 1210 const size_t thisPass = MIN( byteCount, buflen ); 1211 tr_peerIoReadBytes( io, inbuf, buf, thisPass ); 1212 byteCount -= thisPass; 1213 } 1214} 1215 1216/*** 1217**** 1218***/ 1219 1220static int 1221tr_peerIoTryRead( tr_peerIo * io, size_t howmuch ) 1222{ 1223 int res = 0; 1224 1225 if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch ))) 1226 { 1227 if( io->utp_socket != NULL ) /* utp peer connection */ 1228 { 1229 /* UTP_RBDrained notifies libutp that your read buffer is emtpy. 1230 * It opens up the congestion window by sending an ACK (soonish) 1231 * if one was not going to be sent. */ 1232 if( evbuffer_get_length( io->inbuf ) == 0 ) 1233 UTP_RBDrained( io->utp_socket ); 1234 } 1235 else /* tcp peer connection */ 1236 { 1237 int e; 1238 1239 EVUTIL_SET_SOCKET_ERROR( 0 ); 1240 res = evbuffer_read( io->inbuf, io->socket, (int)howmuch ); 1241 e = EVUTIL_SOCKET_ERROR( ); 1242 1243 dbgmsg( io, "read %d from peer (%s)", res, (res==-1?tr_strerror(e):"") ); 1244 1245 if( evbuffer_get_length( io->inbuf ) ) 1246 canReadWrapper( io ); 1247 1248 if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) ) 1249 { 1250 char errstr[512]; 1251 short what = BEV_EVENT_READING | BEV_EVENT_ERROR; 1252 if( res == 0 ) 1253 what |= BEV_EVENT_EOF; 1254 dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", 1255 res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) ); 1256 io->gotError( io, what, io->userData ); 1257 } 1258 } 1259 } 1260 1261 return res; 1262} 1263 1264static int 1265tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch ) 1266{ 1267 int n = 0; 1268 const size_t old_len = evbuffer_get_length( io->outbuf ); 1269 dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch ); 1270 1271 if( howmuch > old_len ) 1272 howmuch = old_len; 1273 1274 if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch ))) 1275 { 1276 if( io->utp_socket != NULL ) /* utp peer connection */ 1277 { 1278 UTP_Write( io->utp_socket, howmuch ); 1279 n = old_len - evbuffer_get_length( io->outbuf ); 1280 } 1281 else 1282 { 1283 int e; 1284 1285 EVUTIL_SET_SOCKET_ERROR( 0 ); 1286 n = tr_evbuffer_write( io, io->socket, howmuch ); 1287 e = EVUTIL_SOCKET_ERROR( ); 1288 1289 if( n > 0 ) 1290 didWriteWrapper( io, n ); 1291 1292 if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) ) 1293 { 1294 char errstr[512]; 1295 const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR; 1296 1297 dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", 1298 n, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) ); 1299 1300 if( io->gotError != NULL ) 1301 io->gotError( io, what, io->userData ); 1302 } 1303 } 1304 } 1305 1306 return n; 1307} 1308 1309int 1310tr_peerIoFlush( tr_peerIo * io, tr_direction dir, size_t limit ) 1311{ 1312 int bytesUsed = 0; 1313 1314 assert( tr_isPeerIo( io ) ); 1315 assert( tr_isDirection( dir ) ); 1316 1317 if( dir == TR_DOWN ) 1318 bytesUsed = tr_peerIoTryRead( io, limit ); 1319 else 1320 bytesUsed = tr_peerIoTryWrite( io, limit ); 1321 1322 dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed ); 1323 return bytesUsed; 1324} 1325 1326int 1327tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io ) 1328{ 1329 size_t byteCount = 0; 1330 const struct tr_datatype * it; 1331 1332 /* count up how many bytes are used by non-piece-data messages 1333 at the front of our outbound queue */ 1334 for( it=io->outbuf_datatypes; it!=NULL; it=it->next ) 1335 if( it->isPieceData ) 1336 break; 1337 else 1338 byteCount += it->length; 1339 1340 return tr_peerIoFlush( io, TR_UP, byteCount ); 1341} 1342