1/* 2 * This file Copyright (C) Mnemosyne LLC 3 * 4 * This file is licensed by the GPL version 2. Works owned by the 5 * Transmission project are granted a special exemption to clause 2(b) 6 * so that the bulk of its code can remain under the MIT license. 7 * This exemption does not extend to derived works not owned by 8 * the Transmission project. 9 * 10 * $Id: peer-msgs.c 13504 2012-09-19 05:11:19Z jordan $ 11 */ 12 13#include <assert.h> 14#include <errno.h> 15#include <stdarg.h> 16#include <stdlib.h> 17#include <string.h> 18 19#include <event2/buffer.h> 20#include <event2/bufferevent.h> 21#include <event2/event.h> 22 23#include "transmission.h" 24#include "bencode.h" 25#include "cache.h" 26#include "completion.h" 27#include "crypto.h" /* tr_sha1() */ 28#include "peer-io.h" 29#include "peer-mgr.h" 30#include "peer-msgs.h" 31#include "session.h" 32#include "torrent.h" 33#include "torrent-magnet.h" 34#include "tr-dht.h" 35#include "utils.h" 36#include "version.h" 37 38/** 39*** 40**/ 41 42enum 43{ 44 BT_CHOKE = 0, 45 BT_UNCHOKE = 1, 46 BT_INTERESTED = 2, 47 BT_NOT_INTERESTED = 3, 48 BT_HAVE = 4, 49 BT_BITFIELD = 5, 50 BT_REQUEST = 6, 51 BT_PIECE = 7, 52 BT_CANCEL = 8, 53 BT_PORT = 9, 54 55 BT_FEXT_SUGGEST = 13, 56 BT_FEXT_HAVE_ALL = 14, 57 BT_FEXT_HAVE_NONE = 15, 58 BT_FEXT_REJECT = 16, 59 BT_FEXT_ALLOWED_FAST = 17, 60 61 BT_LTEP = 20, 62 63 LTEP_HANDSHAKE = 0, 64 65 UT_PEX_ID = 1, 66 UT_METADATA_ID = 3, 67 68 MAX_PEX_PEER_COUNT = 50, 69 70 MIN_CHOKE_PERIOD_SEC = 10, 71 72 /* idle seconds before we send a keepalive */ 73 KEEPALIVE_INTERVAL_SECS = 100, 74 75 PEX_INTERVAL_SECS = 90, /* sec between sendPex() calls */ 76 77 REQQ = 512, 78 79 METADATA_REQQ = 64, 80 81 /* used in lowering the outMessages queue period */ 82 IMMEDIATE_PRIORITY_INTERVAL_SECS = 0, 83 HIGH_PRIORITY_INTERVAL_SECS = 2, 84 LOW_PRIORITY_INTERVAL_SECS = 10, 85 86 /* number of pieces we'll allow in our fast set */ 87 MAX_FAST_SET_SIZE = 3, 88 89 /* defined in BEP #9 */ 90 METADATA_MSG_TYPE_REQUEST = 0, 91 METADATA_MSG_TYPE_DATA = 1, 92 METADATA_MSG_TYPE_REJECT = 2 93}; 94 95enum 96{ 97 AWAITING_BT_LENGTH, 98 AWAITING_BT_ID, 99 AWAITING_BT_MESSAGE, 100 AWAITING_BT_PIECE 101}; 102 103/** 104*** 105**/ 106 107struct peer_request 108{ 109 uint32_t index; 110 uint32_t offset; 111 uint32_t length; 112}; 113 114static void 115blockToReq( const tr_torrent * tor, 116 tr_block_index_t block, 117 struct peer_request * setme ) 118{ 119 tr_torrentGetBlockLocation( tor, block, &setme->index, 120 &setme->offset, 121 &setme->length ); 122} 123 124/** 125*** 126**/ 127 128/* this is raw, unchanged data from the peer regarding 129 * the current message that it's sending us. */ 130struct tr_incoming 131{ 132 uint8_t id; 133 uint32_t length; /* includes the +1 for id length */ 134 struct peer_request blockReq; /* metadata for incoming blocks */ 135 struct evbuffer * block; /* piece data for incoming blocks */ 136}; 137 138/** 139 * Low-level communication state information about a connected peer. 140 * 141 * This structure remembers the low-level protocol states that we're 142 * in with this peer, such as active requests, pex messages, and so on. 143 * Its fields are all private to peer-msgs.c. 144 * 145 * Data not directly involved with sending & receiving messages is 146 * stored in tr_peer, where it can be accessed by both peermsgs and 147 * the peer manager. 148 * 149 * @see struct peer_atom 150 * @see tr_peer 151 */ 152struct tr_peermsgs 153{ 154 bool peerSupportsPex; 155 bool peerSupportsMetadataXfer; 156 bool clientSentLtepHandshake; 157 bool peerSentLtepHandshake; 158 159 /*bool haveFastSet;*/ 160 161 int desiredRequestCount; 162 163 int prefetchCount; 164 165 /* how long the outMessages batch should be allowed to grow before 166 * it's flushed -- some messages (like requests >:) should be sent 167 * very quickly; others aren't as urgent. */ 168 int8_t outMessagesBatchPeriod; 169 170 uint8_t state; 171 uint8_t ut_pex_id; 172 uint8_t ut_metadata_id; 173 uint16_t pexCount; 174 uint16_t pexCount6; 175 176 size_t metadata_size_hint; 177#if 0 178 size_t fastsetSize; 179 tr_piece_index_t fastset[MAX_FAST_SET_SIZE]; 180#endif 181 182 tr_peer * peer; 183 184 tr_torrent * torrent; 185 186 tr_peer_callback * callback; 187 void * callbackData; 188 189 struct evbuffer * outMessages; /* all the non-piece messages */ 190 191 struct peer_request peerAskedFor[REQQ]; 192 193 int peerAskedForMetadata[METADATA_REQQ]; 194 int peerAskedForMetadataCount; 195 196 tr_pex * pex; 197 tr_pex * pex6; 198 199 /*time_t clientSentPexAt;*/ 200 time_t clientSentAnythingAt; 201 202 /* when we started batching the outMessages */ 203 time_t outMessagesBatchedAt; 204 205 struct tr_incoming incoming; 206 207 /* if the peer supports the Extension Protocol in BEP 10 and 208 supplied a reqq argument, it's stored here. Otherwise, the 209 value is zero and should be ignored. */ 210 int64_t reqq; 211 212 struct event * pexTimer; 213}; 214 215/** 216*** 217**/ 218 219static inline tr_session* 220getSession( struct tr_peermsgs * msgs ) 221{ 222 return msgs->torrent->session; 223} 224 225/** 226*** 227**/ 228 229static void 230myDebug( const char * file, int line, 231 const struct tr_peermsgs * msgs, 232 const char * fmt, ... ) 233{ 234 FILE * fp = tr_getLog( ); 235 236 if( fp ) 237 { 238 va_list args; 239 char timestr[64]; 240 struct evbuffer * buf = evbuffer_new( ); 241 char * base = tr_basename( file ); 242 char * message; 243 244 evbuffer_add_printf( buf, "[%s] %s - %s [%s]: ", 245 tr_getLogTimeStr( timestr, sizeof( timestr ) ), 246 tr_torrentName( msgs->torrent ), 247 tr_peerIoGetAddrStr( msgs->peer->io ), 248 msgs->peer->client ); 249 va_start( args, fmt ); 250 evbuffer_add_vprintf( buf, fmt, args ); 251 va_end( args ); 252 evbuffer_add_printf( buf, " (%s:%d)\n", base, line ); 253 254 message = evbuffer_free_to_str( buf ); 255 fputs( message, fp ); 256 257 tr_free( base ); 258 tr_free( message ); 259 } 260} 261 262#define dbgmsg( msgs, ... ) \ 263 do { \ 264 if( tr_deepLoggingIsActive( ) ) \ 265 myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \ 266 } while( 0 ) 267 268/** 269*** 270**/ 271 272static void 273pokeBatchPeriod( tr_peermsgs * msgs, int interval ) 274{ 275 if( msgs->outMessagesBatchPeriod > interval ) 276 { 277 msgs->outMessagesBatchPeriod = interval; 278 dbgmsg( msgs, "lowering batch interval to %d seconds", interval ); 279 } 280} 281 282static void 283dbgOutMessageLen( tr_peermsgs * msgs ) 284{ 285 dbgmsg( msgs, "outMessage size is now %zu", evbuffer_get_length( msgs->outMessages ) ); 286} 287 288static void 289protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req ) 290{ 291 struct evbuffer * out = msgs->outMessages; 292 293 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) ); 294 295 evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) ); 296 evbuffer_add_uint8 ( out, BT_FEXT_REJECT ); 297 evbuffer_add_uint32( out, req->index ); 298 evbuffer_add_uint32( out, req->offset ); 299 evbuffer_add_uint32( out, req->length ); 300 301 dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length ); 302 dbgOutMessageLen( msgs ); 303} 304 305static void 306protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req ) 307{ 308 struct evbuffer * out = msgs->outMessages; 309 310 evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) ); 311 evbuffer_add_uint8 ( out, BT_REQUEST ); 312 evbuffer_add_uint32( out, req->index ); 313 evbuffer_add_uint32( out, req->offset ); 314 evbuffer_add_uint32( out, req->length ); 315 316 dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length ); 317 dbgOutMessageLen( msgs ); 318 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 319} 320 321static void 322protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req ) 323{ 324 struct evbuffer * out = msgs->outMessages; 325 326 evbuffer_add_uint32( out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) ); 327 evbuffer_add_uint8 ( out, BT_CANCEL ); 328 evbuffer_add_uint32( out, req->index ); 329 evbuffer_add_uint32( out, req->offset ); 330 evbuffer_add_uint32( out, req->length ); 331 332 dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length ); 333 dbgOutMessageLen( msgs ); 334 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 335} 336 337static void 338protocolSendPort(tr_peermsgs *msgs, uint16_t port) 339{ 340 struct evbuffer * out = msgs->outMessages; 341 342 dbgmsg( msgs, "sending Port %u", port); 343 evbuffer_add_uint32( out, 3 ); 344 evbuffer_add_uint8 ( out, BT_PORT ); 345 evbuffer_add_uint16( out, port); 346} 347 348static void 349protocolSendHave( tr_peermsgs * msgs, uint32_t index ) 350{ 351 struct evbuffer * out = msgs->outMessages; 352 353 evbuffer_add_uint32( out, sizeof(uint8_t) + sizeof(uint32_t) ); 354 evbuffer_add_uint8 ( out, BT_HAVE ); 355 evbuffer_add_uint32( out, index ); 356 357 dbgmsg( msgs, "sending Have %u", index ); 358 dbgOutMessageLen( msgs ); 359 pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS ); 360} 361 362#if 0 363static void 364protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex ) 365{ 366 tr_peerIo * io = msgs->peer->io; 367 struct evbuffer * out = msgs->outMessages; 368 369 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) ); 370 371 evbuffer_add_uint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) ); 372 evbuffer_add_uint8 ( io, out, BT_FEXT_ALLOWED_FAST ); 373 evbuffer_add_uint32( io, out, pieceIndex ); 374 375 dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex ); 376 dbgOutMessageLen( msgs ); 377} 378#endif 379 380static void 381protocolSendChoke( tr_peermsgs * msgs, int choke ) 382{ 383 struct evbuffer * out = msgs->outMessages; 384 385 evbuffer_add_uint32( out, sizeof( uint8_t ) ); 386 evbuffer_add_uint8 ( out, choke ? BT_CHOKE : BT_UNCHOKE ); 387 388 dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" ); 389 dbgOutMessageLen( msgs ); 390 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 391} 392 393static void 394protocolSendHaveAll( tr_peermsgs * msgs ) 395{ 396 struct evbuffer * out = msgs->outMessages; 397 398 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) ); 399 400 evbuffer_add_uint32( out, sizeof( uint8_t ) ); 401 evbuffer_add_uint8 ( out, BT_FEXT_HAVE_ALL ); 402 403 dbgmsg( msgs, "sending HAVE_ALL..." ); 404 dbgOutMessageLen( msgs ); 405 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 406} 407 408static void 409protocolSendHaveNone( tr_peermsgs * msgs ) 410{ 411 struct evbuffer * out = msgs->outMessages; 412 413 assert( tr_peerIoSupportsFEXT( msgs->peer->io ) ); 414 415 evbuffer_add_uint32( out, sizeof( uint8_t ) ); 416 evbuffer_add_uint8 ( out, BT_FEXT_HAVE_NONE ); 417 418 dbgmsg( msgs, "sending HAVE_NONE..." ); 419 dbgOutMessageLen( msgs ); 420 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 421} 422 423/** 424*** EVENTS 425**/ 426 427static void 428publish( tr_peermsgs * msgs, tr_peer_event * e ) 429{ 430 assert( msgs->peer ); 431 assert( msgs->peer->msgs == msgs ); 432 433 if( msgs->callback != NULL ) 434 msgs->callback( msgs->peer, e, msgs->callbackData ); 435} 436 437static void 438fireError( tr_peermsgs * msgs, int err ) 439{ 440 tr_peer_event e = TR_PEER_EVENT_INIT; 441 e.eventType = TR_PEER_ERROR; 442 e.err = err; 443 publish( msgs, &e ); 444} 445 446static void 447fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req ) 448{ 449 tr_peer_event e = TR_PEER_EVENT_INIT; 450 e.eventType = TR_PEER_CLIENT_GOT_BLOCK; 451 e.pieceIndex = req->index; 452 e.offset = req->offset; 453 e.length = req->length; 454 publish( msgs, &e ); 455} 456 457static void 458fireGotRej( tr_peermsgs * msgs, const struct peer_request * req ) 459{ 460 tr_peer_event e = TR_PEER_EVENT_INIT; 461 e.eventType = TR_PEER_CLIENT_GOT_REJ; 462 e.pieceIndex = req->index; 463 e.offset = req->offset; 464 e.length = req->length; 465 publish( msgs, &e ); 466} 467 468static void 469fireGotChoke( tr_peermsgs * msgs ) 470{ 471 tr_peer_event e = TR_PEER_EVENT_INIT; 472 e.eventType = TR_PEER_CLIENT_GOT_CHOKE; 473 publish( msgs, &e ); 474} 475 476static void 477fireClientGotHaveAll( tr_peermsgs * msgs ) 478{ 479 tr_peer_event e = TR_PEER_EVENT_INIT; 480 e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL; 481 publish( msgs, &e ); 482} 483 484static void 485fireClientGotHaveNone( tr_peermsgs * msgs ) 486{ 487 tr_peer_event e = TR_PEER_EVENT_INIT; 488 e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE; 489 publish( msgs, &e ); 490} 491 492static void 493fireClientGotData( tr_peermsgs * msgs, uint32_t length, int wasPieceData ) 494{ 495 tr_peer_event e = TR_PEER_EVENT_INIT; 496 497 e.length = length; 498 e.eventType = TR_PEER_CLIENT_GOT_DATA; 499 e.wasPieceData = wasPieceData; 500 publish( msgs, &e ); 501} 502 503static void 504fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex ) 505{ 506 tr_peer_event e = TR_PEER_EVENT_INIT; 507 e.eventType = TR_PEER_CLIENT_GOT_SUGGEST; 508 e.pieceIndex = pieceIndex; 509 publish( msgs, &e ); 510} 511 512static void 513fireClientGotPort( tr_peermsgs * msgs, tr_port port ) 514{ 515 tr_peer_event e = TR_PEER_EVENT_INIT; 516 e.eventType = TR_PEER_CLIENT_GOT_PORT; 517 e.port = port; 518 publish( msgs, &e ); 519} 520 521static void 522fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex ) 523{ 524 tr_peer_event e = TR_PEER_EVENT_INIT; 525 e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST; 526 e.pieceIndex = pieceIndex; 527 publish( msgs, &e ); 528} 529 530static void 531fireClientGotBitfield( tr_peermsgs * msgs, tr_bitfield * bitfield ) 532{ 533 tr_peer_event e = TR_PEER_EVENT_INIT; 534 e.eventType = TR_PEER_CLIENT_GOT_BITFIELD; 535 e.bitfield = bitfield; 536 publish( msgs, &e ); 537} 538 539static void 540fireClientGotHave( tr_peermsgs * msgs, tr_piece_index_t index ) 541{ 542 tr_peer_event e = TR_PEER_EVENT_INIT; 543 e.eventType = TR_PEER_CLIENT_GOT_HAVE; 544 e.pieceIndex = index; 545 publish( msgs, &e ); 546} 547 548static void 549firePeerGotData( tr_peermsgs * msgs, uint32_t length, bool wasPieceData ) 550{ 551 tr_peer_event e = TR_PEER_EVENT_INIT; 552 553 e.length = length; 554 e.eventType = TR_PEER_PEER_GOT_DATA; 555 e.wasPieceData = wasPieceData; 556 557 publish( msgs, &e ); 558} 559 560/** 561*** ALLOWED FAST SET 562*** For explanation, see http://www.bittorrent.org/beps/bep_0006.html 563**/ 564 565#if 0 566size_t 567tr_generateAllowedSet( tr_piece_index_t * setmePieces, 568 size_t desiredSetSize, 569 size_t pieceCount, 570 const uint8_t * infohash, 571 const tr_address * addr ) 572{ 573 size_t setSize = 0; 574 575 assert( setmePieces ); 576 assert( desiredSetSize <= pieceCount ); 577 assert( desiredSetSize ); 578 assert( pieceCount ); 579 assert( infohash ); 580 assert( addr ); 581 582 if( addr->type == TR_AF_INET ) 583 { 584 uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w; 585 uint8_t x[SHA_DIGEST_LENGTH]; 586 587 uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 ); /* (1) */ 588 memcpy( w, &ui32, sizeof( uint32_t ) ); 589 walk += sizeof( uint32_t ); 590 memcpy( walk, infohash, SHA_DIGEST_LENGTH ); /* (2) */ 591 walk += SHA_DIGEST_LENGTH; 592 tr_sha1( x, w, walk-w, NULL ); /* (3) */ 593 assert( sizeof( w ) == walk-w ); 594 595 while( setSize<desiredSetSize ) 596 { 597 int i; 598 for( i=0; i<5 && setSize<desiredSetSize; ++i ) /* (4) */ 599 { 600 size_t k; 601 uint32_t j = i * 4; /* (5) */ 602 uint32_t y = ntohl( *( uint32_t* )( x + j ) ); /* (6) */ 603 uint32_t index = y % pieceCount; /* (7) */ 604 605 for( k=0; k<setSize; ++k ) /* (8) */ 606 if( setmePieces[k] == index ) 607 break; 608 609 if( k == setSize ) 610 setmePieces[setSize++] = index; /* (9) */ 611 } 612 613 tr_sha1( x, x, sizeof( x ), NULL ); /* (3) */ 614 } 615 } 616 617 return setSize; 618} 619 620static void 621updateFastSet( tr_peermsgs * msgs UNUSED ) 622{ 623 const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 624 const int peerIsNeedy = msgs->peer->progress < 0.10; 625 626 if( fext && peerIsNeedy && !msgs->haveFastSet ) 627 { 628 size_t i; 629 const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL ); 630 const tr_info * inf = &msgs->torrent->info; 631 const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount ); 632 633 /* build the fast set */ 634 msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr ); 635 msgs->haveFastSet = 1; 636 637 /* send it to the peer */ 638 for( i=0; i<msgs->fastsetSize; ++i ) 639 protocolSendAllowedFast( msgs, msgs->fastset[i] ); 640 } 641} 642#endif 643 644/** 645*** INTEREST 646**/ 647 648static void 649sendInterest( tr_peermsgs * msgs, bool clientIsInterested ) 650{ 651 struct evbuffer * out = msgs->outMessages; 652 653 assert( msgs ); 654 assert( tr_isBool( clientIsInterested ) ); 655 656 msgs->peer->clientIsInterested = clientIsInterested; 657 dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" ); 658 evbuffer_add_uint32( out, sizeof( uint8_t ) ); 659 evbuffer_add_uint8 ( out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED ); 660 661 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS ); 662 dbgOutMessageLen( msgs ); 663} 664 665static void 666updateInterest( tr_peermsgs * msgs UNUSED ) 667{ 668 /* FIXME -- might need to poke the mgr on startup */ 669} 670 671void 672tr_peerMsgsSetInterested( tr_peermsgs * msgs, bool clientIsInterested ) 673{ 674 assert( tr_isBool( clientIsInterested ) ); 675 676 if( clientIsInterested != msgs->peer->clientIsInterested ) 677 sendInterest( msgs, clientIsInterested ); 678} 679 680static bool 681popNextMetadataRequest( tr_peermsgs * msgs, int * piece ) 682{ 683 if( msgs->peerAskedForMetadataCount == 0 ) 684 return false; 685 686 *piece = msgs->peerAskedForMetadata[0]; 687 688 tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ), 689 msgs->peerAskedForMetadataCount-- ); 690 691 return true; 692} 693 694static bool 695popNextRequest( tr_peermsgs * msgs, struct peer_request * setme ) 696{ 697 if( msgs->peer->pendingReqsToClient == 0 ) 698 return false; 699 700 *setme = msgs->peerAskedFor[0]; 701 702 tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ), 703 msgs->peer->pendingReqsToClient-- ); 704 705 return true; 706} 707 708static void 709cancelAllRequestsToClient( tr_peermsgs * msgs ) 710{ 711 struct peer_request req; 712 const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io ); 713 714 while( popNextRequest( msgs, &req )) 715 if( mustSendCancel ) 716 protocolSendReject( msgs, &req ); 717} 718 719void 720tr_peerMsgsSetChoke( tr_peermsgs * msgs, bool peerIsChoked ) 721{ 722 const time_t now = tr_time( ); 723 const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC; 724 725 assert( msgs ); 726 assert( msgs->peer ); 727 assert( tr_isBool( peerIsChoked ) ); 728 729 if( msgs->peer->chokeChangedAt > fibrillationTime ) 730 { 731 dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", peerIsChoked ); 732 } 733 else if( msgs->peer->peerIsChoked != peerIsChoked ) 734 { 735 msgs->peer->peerIsChoked = peerIsChoked; 736 if( peerIsChoked ) 737 cancelAllRequestsToClient( msgs ); 738 protocolSendChoke( msgs, peerIsChoked ); 739 msgs->peer->chokeChangedAt = now; 740 } 741} 742 743/** 744*** 745**/ 746 747void 748tr_peerMsgsHave( tr_peermsgs * msgs, uint32_t index ) 749{ 750 protocolSendHave( msgs, index ); 751 752 /* since we have more pieces now, we might not be interested in this peer */ 753 updateInterest( msgs ); 754} 755 756/** 757*** 758**/ 759 760static bool 761reqIsValid( const tr_peermsgs * peer, 762 uint32_t index, 763 uint32_t offset, 764 uint32_t length ) 765{ 766 return tr_torrentReqIsValid( peer->torrent, index, offset, length ); 767} 768 769static bool 770requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req ) 771{ 772 return reqIsValid( msgs, req->index, req->offset, req->length ); 773} 774 775void 776tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block ) 777{ 778 struct peer_request req; 779/*fprintf( stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", (size_t)block, msgs->peer );*/ 780 blockToReq( msgs->torrent, block, &req ); 781 protocolSendCancel( msgs, &req ); 782} 783 784/** 785*** 786**/ 787 788static void 789sendLtepHandshake( tr_peermsgs * msgs ) 790{ 791 tr_benc val; 792 bool allow_pex; 793 bool allow_metadata_xfer; 794 struct evbuffer * payload; 795 struct evbuffer * out = msgs->outMessages; 796 const unsigned char * ipv6 = tr_globalIPv6(); 797 798 if( msgs->clientSentLtepHandshake ) 799 return; 800 801 dbgmsg( msgs, "sending an ltep handshake" ); 802 msgs->clientSentLtepHandshake = 1; 803 804 /* decide if we want to advertise metadata xfer support (BEP 9) */ 805 if( tr_torrentIsPrivate( msgs->torrent ) ) 806 allow_metadata_xfer = 0; 807 else 808 allow_metadata_xfer = 1; 809 810 /* decide if we want to advertise pex support */ 811 if( !tr_torrentAllowsPex( msgs->torrent ) ) 812 allow_pex = 0; 813 else if( msgs->peerSentLtepHandshake ) 814 allow_pex = msgs->peerSupportsPex ? 1 : 0; 815 else 816 allow_pex = 1; 817 818 tr_bencInitDict( &val, 8 ); 819 tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED ); 820 if( ipv6 != NULL ) 821 tr_bencDictAddRaw( &val, "ipv6", ipv6, 16 ); 822 if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent ) 823 && ( msgs->torrent->infoDictLength > 0 ) ) 824 tr_bencDictAddInt( &val, "metadata_size", msgs->torrent->infoDictLength ); 825 tr_bencDictAddInt( &val, "p", tr_sessionGetPublicPeerPort( getSession(msgs) ) ); 826 tr_bencDictAddInt( &val, "reqq", REQQ ); 827 tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) ); 828 tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX ); 829 if( allow_metadata_xfer || allow_pex ) { 830 tr_benc * m = tr_bencDictAddDict( &val, "m", 2 ); 831 if( allow_metadata_xfer ) 832 tr_bencDictAddInt( m, "ut_metadata", UT_METADATA_ID ); 833 if( allow_pex ) 834 tr_bencDictAddInt( m, "ut_pex", UT_PEX_ID ); 835 } 836 837 payload = tr_bencToBuf( &val, TR_FMT_BENC ); 838 839 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) ); 840 evbuffer_add_uint8 ( out, BT_LTEP ); 841 evbuffer_add_uint8 ( out, LTEP_HANDSHAKE ); 842 evbuffer_add_buffer( out, payload ); 843 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 844 dbgOutMessageLen( msgs ); 845 846 /* cleanup */ 847 evbuffer_free( payload ); 848 tr_bencFree( &val ); 849} 850 851static void 852parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf ) 853{ 854 int64_t i; 855 tr_benc val, * sub; 856 uint8_t * tmp = tr_new( uint8_t, len ); 857 const uint8_t *addr; 858 size_t addr_len; 859 tr_pex pex; 860 int8_t seedProbability = -1; 861 862 memset( &pex, 0, sizeof( tr_pex ) ); 863 864 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len ); 865 msgs->peerSentLtepHandshake = 1; 866 867 if( tr_bencLoad( tmp, len, &val, NULL ) || !tr_bencIsDict( &val ) ) 868 { 869 dbgmsg( msgs, "GET extended-handshake, couldn't get dictionary" ); 870 tr_free( tmp ); 871 return; 872 } 873 874 dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len, tmp ); 875 876 /* does the peer prefer encrypted connections? */ 877 if( tr_bencDictFindInt( &val, "e", &i ) ) { 878 msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES 879 : ENCRYPTION_PREFERENCE_NO; 880 if( i ) 881 pex.flags |= ADDED_F_ENCRYPTION_FLAG; 882 } 883 884 /* check supported messages for utorrent pex */ 885 msgs->peerSupportsPex = 0; 886 msgs->peerSupportsMetadataXfer = 0; 887 888 if( tr_bencDictFindDict( &val, "m", &sub ) ) { 889 if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) { 890 msgs->peerSupportsPex = i != 0; 891 msgs->ut_pex_id = (uint8_t) i; 892 dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id ); 893 } 894 if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) { 895 msgs->peerSupportsMetadataXfer = i != 0; 896 msgs->ut_metadata_id = (uint8_t) i; 897 dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id ); 898 } 899 if( tr_bencDictFindInt( sub, "ut_holepunch", &i ) ) { 900 /* Mysterious ��Torrent extension that we don't grok. However, 901 it implies support for ��TP, so use it to indicate that. */ 902 tr_peerMgrSetUtpFailed( msgs->torrent, 903 tr_peerIoGetAddress( msgs->peer->io, NULL ), 904 false ); 905 } 906 } 907 908 /* look for metainfo size (BEP 9) */ 909 if( tr_bencDictFindInt( &val, "metadata_size", &i ) ) { 910 tr_torrentSetMetadataSizeHint( msgs->torrent, i ); 911 msgs->metadata_size_hint = (size_t) i; 912 } 913 914 /* look for upload_only (BEP 21) */ 915 if( tr_bencDictFindInt( &val, "upload_only", &i ) ) 916 seedProbability = i==0 ? 0 : 100; 917 918 /* get peer's listening port */ 919 if( tr_bencDictFindInt( &val, "p", &i ) ) { 920 pex.port = htons( (uint16_t)i ); 921 fireClientGotPort( msgs, pex.port ); 922 dbgmsg( msgs, "peer's port is now %d", (int)i ); 923 } 924 925 if( tr_peerIoIsIncoming( msgs->peer->io ) 926 && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len ) 927 && ( addr_len == 4 ) ) 928 { 929 pex.addr.type = TR_AF_INET; 930 memcpy( &pex.addr.addr.addr4, addr, 4 ); 931 tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability ); 932 } 933 934 if( tr_peerIoIsIncoming( msgs->peer->io ) 935 && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len ) 936 && ( addr_len == 16 ) ) 937 { 938 pex.addr.type = TR_AF_INET6; 939 memcpy( &pex.addr.addr.addr6, addr, 16 ); 940 tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability ); 941 } 942 943 /* get peer's maximum request queue size */ 944 if( tr_bencDictFindInt( &val, "reqq", &i ) ) 945 msgs->reqq = i; 946 947 tr_bencFree( &val ); 948 tr_free( tmp ); 949} 950 951static void 952parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf ) 953{ 954 tr_benc dict; 955 char * msg_end; 956 char * benc_end; 957 int64_t msg_type = -1; 958 int64_t piece = -1; 959 int64_t total_size = 0; 960 uint8_t * tmp = tr_new( uint8_t, msglen ); 961 962 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen ); 963 msg_end = (char*)tmp + msglen; 964 965 if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) ) 966 { 967 tr_bencDictFindInt( &dict, "msg_type", &msg_type ); 968 tr_bencDictFindInt( &dict, "piece", &piece ); 969 tr_bencDictFindInt( &dict, "total_size", &total_size ); 970 tr_bencFree( &dict ); 971 } 972 973 dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d", 974 (int)msg_type, (int)piece, (int)total_size ); 975 976 if( msg_type == METADATA_MSG_TYPE_REJECT ) 977 { 978 /* NOOP */ 979 } 980 981 if( ( msg_type == METADATA_MSG_TYPE_DATA ) 982 && ( !tr_torrentHasMetadata( msgs->torrent ) ) 983 && ( msg_end - benc_end <= METADATA_PIECE_SIZE ) 984 && ( piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size ) ) 985 { 986 const int pieceLen = msg_end - benc_end; 987 tr_torrentSetMetadataPiece( msgs->torrent, piece, benc_end, pieceLen ); 988 } 989 990 if( msg_type == METADATA_MSG_TYPE_REQUEST ) 991 { 992 if( ( piece >= 0 ) 993 && tr_torrentHasMetadata( msgs->torrent ) 994 && !tr_torrentIsPrivate( msgs->torrent ) 995 && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) ) 996 { 997 msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece; 998 } 999 else 1000 { 1001 tr_benc tmp; 1002 struct evbuffer * payload; 1003 struct evbuffer * out = msgs->outMessages; 1004 1005 /* build the rejection message */ 1006 tr_bencInitDict( &tmp, 2 ); 1007 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT ); 1008 tr_bencDictAddInt( &tmp, "piece", piece ); 1009 payload = tr_bencToBuf( &tmp, TR_FMT_BENC ); 1010 1011 /* write it out as a LTEP message to our outMessages buffer */ 1012 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) ); 1013 evbuffer_add_uint8 ( out, BT_LTEP ); 1014 evbuffer_add_uint8 ( out, msgs->ut_metadata_id ); 1015 evbuffer_add_buffer( out, payload ); 1016 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS ); 1017 dbgOutMessageLen( msgs ); 1018 1019 /* cleanup */ 1020 evbuffer_free( payload ); 1021 tr_bencFree( &tmp ); 1022 } 1023 } 1024 1025 tr_free( tmp ); 1026} 1027 1028static void 1029parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf ) 1030{ 1031 int loaded = 0; 1032 uint8_t * tmp = tr_new( uint8_t, msglen ); 1033 tr_benc val; 1034 tr_torrent * tor = msgs->torrent; 1035 const uint8_t * added; 1036 size_t added_len; 1037 1038 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen ); 1039 1040 if( tr_torrentAllowsPex( tor ) 1041 && ( ( loaded = !tr_bencLoad( tmp, msglen, &val, NULL ) ) ) ) 1042 { 1043 if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) ) 1044 { 1045 tr_pex * pex; 1046 size_t i, n; 1047 size_t added_f_len = 0; 1048 const uint8_t * added_f = NULL; 1049 1050 tr_bencDictFindRaw( &val, "added.f", &added_f, &added_f_len ); 1051 pex = tr_peerMgrCompactToPex( added, added_len, added_f, added_f_len, &n ); 1052 1053 n = MIN( n, MAX_PEX_PEER_COUNT ); 1054 for( i=0; i<n; ++i ) 1055 { 1056 int seedProbability = -1; 1057 if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0; 1058 tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability ); 1059 } 1060 1061 tr_free( pex ); 1062 } 1063 1064 if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) ) 1065 { 1066 tr_pex * pex; 1067 size_t i, n; 1068 size_t added_f_len = 0; 1069 const uint8_t * added_f = NULL; 1070 1071 tr_bencDictFindRaw( &val, "added6.f", &added_f, &added_f_len ); 1072 pex = tr_peerMgrCompact6ToPex( added, added_len, added_f, added_f_len, &n ); 1073 1074 n = MIN( n, MAX_PEX_PEER_COUNT ); 1075 for( i=0; i<n; ++i ) 1076 { 1077 int seedProbability = -1; 1078 if( i < added_f_len ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0; 1079 tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability ); 1080 } 1081 1082 tr_free( pex ); 1083 } 1084 } 1085 1086 if( loaded ) 1087 tr_bencFree( &val ); 1088 tr_free( tmp ); 1089} 1090 1091static void sendPex( tr_peermsgs * msgs ); 1092 1093static void 1094parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf ) 1095{ 1096 uint8_t ltep_msgid; 1097 1098 tr_peerIoReadUint8( msgs->peer->io, inbuf, <ep_msgid ); 1099 msglen--; 1100 1101 if( ltep_msgid == LTEP_HANDSHAKE ) 1102 { 1103 dbgmsg( msgs, "got ltep handshake" ); 1104 parseLtepHandshake( msgs, msglen, inbuf ); 1105 if( tr_peerIoSupportsLTEP( msgs->peer->io ) ) 1106 { 1107 sendLtepHandshake( msgs ); 1108 sendPex( msgs ); 1109 } 1110 } 1111 else if( ltep_msgid == UT_PEX_ID ) 1112 { 1113 dbgmsg( msgs, "got ut pex" ); 1114 msgs->peerSupportsPex = 1; 1115 parseUtPex( msgs, msglen, inbuf ); 1116 } 1117 else if( ltep_msgid == UT_METADATA_ID ) 1118 { 1119 dbgmsg( msgs, "got ut metadata" ); 1120 msgs->peerSupportsMetadataXfer = 1; 1121 parseUtMetadata( msgs, msglen, inbuf ); 1122 } 1123 else 1124 { 1125 dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid ); 1126 evbuffer_drain( inbuf, msglen ); 1127 } 1128} 1129 1130static int 1131readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen ) 1132{ 1133 uint32_t len; 1134 1135 if( inlen < sizeof( len ) ) 1136 return READ_LATER; 1137 1138 tr_peerIoReadUint32( msgs->peer->io, inbuf, &len ); 1139 1140 if( len == 0 ) /* peer sent us a keepalive message */ 1141 dbgmsg( msgs, "got KeepAlive" ); 1142 else 1143 { 1144 msgs->incoming.length = len; 1145 msgs->state = AWAITING_BT_ID; 1146 } 1147 1148 return READ_NOW; 1149} 1150 1151static int readBtMessage( tr_peermsgs *, struct evbuffer *, size_t ); 1152 1153static int 1154readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen ) 1155{ 1156 uint8_t id; 1157 1158 if( inlen < sizeof( uint8_t ) ) 1159 return READ_LATER; 1160 1161 tr_peerIoReadUint8( msgs->peer->io, inbuf, &id ); 1162 msgs->incoming.id = id; 1163 dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length ); 1164 1165 if( id == BT_PIECE ) 1166 { 1167 msgs->state = AWAITING_BT_PIECE; 1168 return READ_NOW; 1169 } 1170 else if( msgs->incoming.length != 1 ) 1171 { 1172 msgs->state = AWAITING_BT_MESSAGE; 1173 return READ_NOW; 1174 } 1175 else return readBtMessage( msgs, inbuf, inlen - 1 ); 1176} 1177 1178static void 1179updatePeerProgress( tr_peermsgs * msgs ) 1180{ 1181 tr_peerUpdateProgress( msgs->torrent, msgs->peer ); 1182 1183 /*updateFastSet( msgs );*/ 1184 updateInterest( msgs ); 1185} 1186 1187static void 1188prefetchPieces( tr_peermsgs *msgs ) 1189{ 1190 int i; 1191 1192 if( !getSession(msgs)->isPrefetchEnabled ) 1193 return; 1194 1195 /* Maintain 12 prefetched blocks per unchoked peer */ 1196 for( i=msgs->prefetchCount; i<msgs->peer->pendingReqsToClient && i<12; ++i ) 1197 { 1198 const struct peer_request * req = msgs->peerAskedFor + i; 1199 if( requestIsValid( msgs, req ) ) 1200 { 1201 tr_cachePrefetchBlock( getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length ); 1202 ++msgs->prefetchCount; 1203 } 1204 } 1205} 1206 1207static void 1208peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req ) 1209{ 1210 const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 1211 const int reqIsValid = requestIsValid( msgs, req ); 1212 const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ); 1213 const int peerIsChoked = msgs->peer->peerIsChoked; 1214 1215 int allow = false; 1216 1217 if( !reqIsValid ) 1218 dbgmsg( msgs, "rejecting an invalid request." ); 1219 else if( !clientHasPiece ) 1220 dbgmsg( msgs, "rejecting request for a piece we don't have." ); 1221 else if( peerIsChoked ) 1222 dbgmsg( msgs, "rejecting request from choked peer" ); 1223 else if( msgs->peer->pendingReqsToClient + 1 >= REQQ ) 1224 dbgmsg( msgs, "rejecting request ... reqq is full" ); 1225 else 1226 allow = true; 1227 1228 if( allow ) { 1229 msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req; 1230 prefetchPieces( msgs ); 1231 } else if( fext ) { 1232 protocolSendReject( msgs, req ); 1233 } 1234} 1235 1236static bool 1237messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len ) 1238{ 1239 switch( id ) 1240 { 1241 case BT_CHOKE: 1242 case BT_UNCHOKE: 1243 case BT_INTERESTED: 1244 case BT_NOT_INTERESTED: 1245 case BT_FEXT_HAVE_ALL: 1246 case BT_FEXT_HAVE_NONE: 1247 return len == 1; 1248 1249 case BT_HAVE: 1250 case BT_FEXT_SUGGEST: 1251 case BT_FEXT_ALLOWED_FAST: 1252 return len == 5; 1253 1254 case BT_BITFIELD: 1255 if( tr_torrentHasMetadata( msg->torrent ) ) 1256 return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u; 1257 /* we don't know the piece count yet, 1258 so we can only guess whether to send true or false */ 1259 if( msg->metadata_size_hint > 0 ) 1260 return len <= msg->metadata_size_hint; 1261 return true; 1262 1263 case BT_REQUEST: 1264 case BT_CANCEL: 1265 case BT_FEXT_REJECT: 1266 return len == 13; 1267 1268 case BT_PIECE: 1269 return len > 9 && len <= 16393; 1270 1271 case BT_PORT: 1272 return len == 3; 1273 1274 case BT_LTEP: 1275 return len >= 2; 1276 1277 default: 1278 return false; 1279 } 1280} 1281 1282static int clientGotBlock( tr_peermsgs * msgs, 1283 struct evbuffer * block, 1284 const struct peer_request * req ); 1285 1286static int 1287readBtPiece( tr_peermsgs * msgs, 1288 struct evbuffer * inbuf, 1289 size_t inlen, 1290 size_t * setme_piece_bytes_read ) 1291{ 1292 struct peer_request * req = &msgs->incoming.blockReq; 1293 1294 assert( evbuffer_get_length( inbuf ) >= inlen ); 1295 dbgmsg( msgs, "In readBtPiece" ); 1296 1297 if( !req->length ) 1298 { 1299 if( inlen < 8 ) 1300 return READ_LATER; 1301 1302 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index ); 1303 tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset ); 1304 req->length = msgs->incoming.length - 9; 1305 dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length ); 1306 return READ_NOW; 1307 } 1308 else 1309 { 1310 int err; 1311 size_t n; 1312 size_t nLeft; 1313 struct evbuffer * block_buffer; 1314 1315 if( msgs->incoming.block == NULL ) 1316 msgs->incoming.block = evbuffer_new( ); 1317 block_buffer = msgs->incoming.block; 1318 1319 /* read in another chunk of data */ 1320 nLeft = req->length - evbuffer_get_length( block_buffer ); 1321 n = MIN( nLeft, inlen ); 1322 1323 tr_peerIoReadBytesToBuf( msgs->peer->io, inbuf, block_buffer, n ); 1324 1325 fireClientGotData( msgs, n, true ); 1326 *setme_piece_bytes_read += n; 1327 dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain", 1328 n, req->index, req->offset, req->length, 1329 (int)( req->length - evbuffer_get_length( block_buffer ) ) ); 1330 if( evbuffer_get_length( block_buffer ) < req->length ) 1331 return READ_LATER; 1332 1333 /* pass the block along... */ 1334 err = clientGotBlock( msgs, block_buffer, req ); 1335 evbuffer_drain( block_buffer, evbuffer_get_length( block_buffer ) ); 1336 1337 /* cleanup */ 1338 req->length = 0; 1339 msgs->state = AWAITING_BT_LENGTH; 1340 return err ? READ_ERR : READ_NOW; 1341 } 1342} 1343 1344static void updateDesiredRequestCount( tr_peermsgs * msgs ); 1345 1346static int 1347readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen ) 1348{ 1349 uint32_t ui32; 1350 uint32_t msglen = msgs->incoming.length; 1351 const uint8_t id = msgs->incoming.id; 1352#ifndef NDEBUG 1353 const size_t startBufLen = evbuffer_get_length( inbuf ); 1354#endif 1355 const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 1356 1357 --msglen; /* id length */ 1358 1359 dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen ); 1360 1361 if( inlen < msglen ) 1362 return READ_LATER; 1363 1364 if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) ) 1365 { 1366 dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen ); 1367 fireError( msgs, EMSGSIZE ); 1368 return READ_ERR; 1369 } 1370 1371 switch( id ) 1372 { 1373 case BT_CHOKE: 1374 dbgmsg( msgs, "got Choke" ); 1375 msgs->peer->clientIsChoked = 1; 1376 if( !fext ) 1377 fireGotChoke( msgs ); 1378 break; 1379 1380 case BT_UNCHOKE: 1381 dbgmsg( msgs, "got Unchoke" ); 1382 msgs->peer->clientIsChoked = 0; 1383 updateDesiredRequestCount( msgs ); 1384 break; 1385 1386 case BT_INTERESTED: 1387 dbgmsg( msgs, "got Interested" ); 1388 msgs->peer->peerIsInterested = 1; 1389 break; 1390 1391 case BT_NOT_INTERESTED: 1392 dbgmsg( msgs, "got Not Interested" ); 1393 msgs->peer->peerIsInterested = 0; 1394 break; 1395 1396 case BT_HAVE: 1397 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 ); 1398 dbgmsg( msgs, "got Have: %u", ui32 ); 1399 if( tr_torrentHasMetadata( msgs->torrent ) 1400 && ( ui32 >= msgs->torrent->info.pieceCount ) ) 1401 { 1402 fireError( msgs, ERANGE ); 1403 return READ_ERR; 1404 } 1405 1406 /* a peer can send the same HAVE message twice... */ 1407 if( !tr_bitfieldHas( &msgs->peer->have, ui32 ) ) { 1408 tr_bitfieldAdd( &msgs->peer->have, ui32 ); 1409 fireClientGotHave( msgs, ui32 ); 1410 } 1411 updatePeerProgress( msgs ); 1412 break; 1413 1414 case BT_BITFIELD: { 1415 uint8_t * tmp = tr_new( uint8_t, msglen ); 1416 dbgmsg( msgs, "got a bitfield" ); 1417 tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen ); 1418 tr_bitfieldSetRaw( &msgs->peer->have, tmp, msglen, tr_torrentHasMetadata( msgs->torrent ) ); 1419 fireClientGotBitfield( msgs, &msgs->peer->have ); 1420 updatePeerProgress( msgs ); 1421 tr_free( tmp ); 1422 break; 1423 } 1424 1425 case BT_REQUEST: 1426 { 1427 struct peer_request r; 1428 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index ); 1429 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset ); 1430 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length ); 1431 dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length ); 1432 peerMadeRequest( msgs, &r ); 1433 break; 1434 } 1435 1436 case BT_CANCEL: 1437 { 1438 int i; 1439 struct peer_request r; 1440 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index ); 1441 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset ); 1442 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length ); 1443 tr_historyAdd( &msgs->peer->cancelsSentToClient, tr_time( ), 1 ); 1444 dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length ); 1445 1446 for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) { 1447 const struct peer_request * req = msgs->peerAskedFor + i; 1448 if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) ) 1449 break; 1450 } 1451 1452 if( i < msgs->peer->pendingReqsToClient ) 1453 tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ), 1454 msgs->peer->pendingReqsToClient-- ); 1455 break; 1456 } 1457 1458 case BT_PIECE: 1459 assert( 0 ); /* handled elsewhere! */ 1460 break; 1461 1462 case BT_PORT: 1463 dbgmsg( msgs, "Got a BT_PORT" ); 1464 tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port ); 1465 if( msgs->peer->dht_port > 0 ) 1466 tr_dhtAddNode( getSession(msgs), 1467 tr_peerAddress( msgs->peer ), 1468 msgs->peer->dht_port, 0 ); 1469 break; 1470 1471 case BT_FEXT_SUGGEST: 1472 dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" ); 1473 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 ); 1474 if( fext ) 1475 fireClientGotSuggest( msgs, ui32 ); 1476 else { 1477 fireError( msgs, EMSGSIZE ); 1478 return READ_ERR; 1479 } 1480 break; 1481 1482 case BT_FEXT_ALLOWED_FAST: 1483 dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" ); 1484 tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 ); 1485 if( fext ) 1486 fireClientGotAllowedFast( msgs, ui32 ); 1487 else { 1488 fireError( msgs, EMSGSIZE ); 1489 return READ_ERR; 1490 } 1491 break; 1492 1493 case BT_FEXT_HAVE_ALL: 1494 dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" ); 1495 if( fext ) { 1496 tr_bitfieldSetHasAll( &msgs->peer->have ); 1497assert( tr_bitfieldHasAll( &msgs->peer->have ) ); 1498 fireClientGotHaveAll( msgs ); 1499 updatePeerProgress( msgs ); 1500 } else { 1501 fireError( msgs, EMSGSIZE ); 1502 return READ_ERR; 1503 } 1504 break; 1505 1506 case BT_FEXT_HAVE_NONE: 1507 dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" ); 1508 if( fext ) { 1509 tr_bitfieldSetHasNone( &msgs->peer->have ); 1510 fireClientGotHaveNone( msgs ); 1511 updatePeerProgress( msgs ); 1512 } else { 1513 fireError( msgs, EMSGSIZE ); 1514 return READ_ERR; 1515 } 1516 break; 1517 1518 case BT_FEXT_REJECT: 1519 { 1520 struct peer_request r; 1521 dbgmsg( msgs, "Got a BT_FEXT_REJECT" ); 1522 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index ); 1523 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset ); 1524 tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length ); 1525 if( fext ) 1526 fireGotRej( msgs, &r ); 1527 else { 1528 fireError( msgs, EMSGSIZE ); 1529 return READ_ERR; 1530 } 1531 break; 1532 } 1533 1534 case BT_LTEP: 1535 dbgmsg( msgs, "Got a BT_LTEP" ); 1536 parseLtep( msgs, msglen, inbuf ); 1537 break; 1538 1539 default: 1540 dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id ); 1541 tr_peerIoDrain( msgs->peer->io, inbuf, msglen ); 1542 break; 1543 } 1544 1545 assert( msglen + 1 == msgs->incoming.length ); 1546 assert( evbuffer_get_length( inbuf ) == startBufLen - msglen ); 1547 1548 msgs->state = AWAITING_BT_LENGTH; 1549 return READ_NOW; 1550} 1551 1552/* returns 0 on success, or an errno on failure */ 1553static int 1554clientGotBlock( tr_peermsgs * msgs, 1555 struct evbuffer * data, 1556 const struct peer_request * req ) 1557{ 1558 int err; 1559 tr_torrent * tor = msgs->torrent; 1560 const tr_block_index_t block = _tr_block( tor, req->index, req->offset ); 1561 1562 assert( msgs ); 1563 assert( req ); 1564 1565 if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) { 1566 dbgmsg( msgs, "wrong block size -- expected %u, got %d", 1567 tr_torBlockCountBytes( msgs->torrent, block ), req->length ); 1568 return EMSGSIZE; 1569 } 1570 1571 dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length ); 1572 1573 if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) { 1574 dbgmsg( msgs, "we didn't ask for this message..." ); 1575 return 0; 1576 } 1577 if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) { 1578 dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." ); 1579 return 0; 1580 } 1581 1582 /** 1583 *** Save the block 1584 **/ 1585 1586 if(( err = tr_cacheWriteBlock( getSession(msgs)->cache, tor, req->index, req->offset, req->length, data ))) 1587 return err; 1588 1589 tr_bitfieldAdd( &msgs->peer->blame, req->index ); 1590 fireGotBlock( msgs, req ); 1591 return 0; 1592} 1593 1594static int peerPulse( void * vmsgs ); 1595 1596static void 1597didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs ) 1598{ 1599 tr_peermsgs * msgs = vmsgs; 1600 firePeerGotData( msgs, bytesWritten, wasPieceData ); 1601 1602 if ( tr_isPeerIo( io ) && io->userData ) 1603 peerPulse( msgs ); 1604} 1605 1606static ReadState 1607canRead( tr_peerIo * io, void * vmsgs, size_t * piece ) 1608{ 1609 ReadState ret; 1610 tr_peermsgs * msgs = vmsgs; 1611 struct evbuffer * in = tr_peerIoGetReadBuffer( io ); 1612 const size_t inlen = evbuffer_get_length( in ); 1613 1614 dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state ); 1615 1616 if( !inlen ) 1617 { 1618 ret = READ_LATER; 1619 } 1620 else if( msgs->state == AWAITING_BT_PIECE ) 1621 { 1622 ret = readBtPiece( msgs, in, inlen, piece ); 1623 } 1624 else switch( msgs->state ) 1625 { 1626 case AWAITING_BT_LENGTH: 1627 ret = readBtLength ( msgs, in, inlen ); break; 1628 1629 case AWAITING_BT_ID: 1630 ret = readBtId ( msgs, in, inlen ); break; 1631 1632 case AWAITING_BT_MESSAGE: 1633 ret = readBtMessage( msgs, in, inlen ); break; 1634 1635 default: 1636 ret = READ_ERR; 1637 assert( 0 ); 1638 } 1639 1640 dbgmsg( msgs, "canRead: ret is %d", (int)ret ); 1641 1642 /* log the raw data that was read */ 1643 if( ( ret != READ_ERR ) && ( evbuffer_get_length( in ) != inlen ) ) 1644 fireClientGotData( msgs, inlen - evbuffer_get_length( in ), false ); 1645 1646 return ret; 1647} 1648 1649int 1650tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block ) 1651{ 1652 if( msgs->state != AWAITING_BT_PIECE ) 1653 return false; 1654 1655 return block == _tr_block( msgs->torrent, 1656 msgs->incoming.blockReq.index, 1657 msgs->incoming.blockReq.offset ); 1658} 1659 1660/** 1661*** 1662**/ 1663 1664static void 1665updateDesiredRequestCount( tr_peermsgs * msgs ) 1666{ 1667 const tr_torrent * const torrent = msgs->torrent; 1668 1669 /* there are lots of reasons we might not want to request any blocks... */ 1670 if( tr_torrentIsSeed( torrent ) || !tr_torrentHasMetadata( torrent ) 1671 || msgs->peer->clientIsChoked 1672 || !msgs->peer->clientIsInterested ) 1673 { 1674 msgs->desiredRequestCount = 0; 1675 } 1676 else 1677 { 1678 int estimatedBlocksInPeriod; 1679 unsigned int rate_Bps; 1680 unsigned int irate_Bps; 1681 const int floor = 4; 1682 const int seconds = REQUEST_BUF_SECS; 1683 const uint64_t now = tr_time_msec( ); 1684 1685 /* Get the rate limit we should use. 1686 * FIXME: this needs to consider all the other peers as well... */ 1687 rate_Bps = tr_peerGetPieceSpeed_Bps( msgs->peer, now, TR_PEER_TO_CLIENT ); 1688 if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) ) 1689 rate_Bps = MIN( rate_Bps, tr_torrentGetSpeedLimit_Bps( torrent, TR_PEER_TO_CLIENT ) ); 1690 1691 /* honor the session limits, if enabled */ 1692 if( tr_torrentUsesSessionLimits( torrent ) && 1693 tr_sessionGetActiveSpeedLimit_Bps( torrent->session, TR_PEER_TO_CLIENT, &irate_Bps ) ) 1694 rate_Bps = MIN( rate_Bps, irate_Bps ); 1695 1696 /* use this desired rate to figure out how 1697 * many requests we should send to this peer */ 1698 estimatedBlocksInPeriod = ( rate_Bps * seconds ) / torrent->blockSize; 1699 msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod ); 1700 1701 /* honor the peer's maximum request count, if specified */ 1702 if( msgs->reqq > 0 ) 1703 if( msgs->desiredRequestCount > msgs->reqq ) 1704 msgs->desiredRequestCount = msgs->reqq; 1705 } 1706} 1707 1708static void 1709updateMetadataRequests( tr_peermsgs * msgs, time_t now ) 1710{ 1711 int piece; 1712 1713 if( msgs->peerSupportsMetadataXfer 1714 && tr_torrentGetNextMetadataRequest( msgs->torrent, now, &piece ) ) 1715 { 1716 tr_benc tmp; 1717 struct evbuffer * payload; 1718 struct evbuffer * out = msgs->outMessages; 1719 1720 /* build the data message */ 1721 tr_bencInitDict( &tmp, 3 ); 1722 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REQUEST ); 1723 tr_bencDictAddInt( &tmp, "piece", piece ); 1724 payload = tr_bencToBuf( &tmp, TR_FMT_BENC ); 1725 1726 dbgmsg( msgs, "requesting metadata piece #%d", piece ); 1727 1728 /* write it out as a LTEP message to our outMessages buffer */ 1729 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) ); 1730 evbuffer_add_uint8 ( out, BT_LTEP ); 1731 evbuffer_add_uint8 ( out, msgs->ut_metadata_id ); 1732 evbuffer_add_buffer( out, payload ); 1733 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS ); 1734 dbgOutMessageLen( msgs ); 1735 1736 /* cleanup */ 1737 evbuffer_free( payload ); 1738 tr_bencFree( &tmp ); 1739 } 1740} 1741 1742static void 1743updateBlockRequests( tr_peermsgs * msgs ) 1744{ 1745 if( tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) 1746 && ( msgs->desiredRequestCount > 0 ) 1747 && ( msgs->peer->pendingReqsToPeer <= ( msgs->desiredRequestCount * 0.66 ) ) ) 1748 { 1749 int i; 1750 int n; 1751 const int numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer; 1752 tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant ); 1753 1754 tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n, false ); 1755 1756 for( i=0; i<n; ++i ) 1757 { 1758 struct peer_request req; 1759 blockToReq( msgs->torrent, blocks[i], &req ); 1760 protocolSendRequest( msgs, &req ); 1761 } 1762 1763 tr_free( blocks ); 1764 } 1765} 1766 1767static size_t 1768fillOutputBuffer( tr_peermsgs * msgs, time_t now ) 1769{ 1770 int piece; 1771 size_t bytesWritten = 0; 1772 struct peer_request req; 1773 const bool haveMessages = evbuffer_get_length( msgs->outMessages ) != 0; 1774 const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 1775 1776 /** 1777 *** Protocol messages 1778 **/ 1779 1780 if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */ 1781 { 1782 dbgmsg( msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length( msgs->outMessages ) ); 1783 msgs->outMessagesBatchedAt = now; 1784 } 1785 else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) ) 1786 { 1787 const size_t len = evbuffer_get_length( msgs->outMessages ); 1788 /* flush the protocol messages */ 1789 dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len ); 1790 tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, false ); 1791 msgs->clientSentAnythingAt = now; 1792 msgs->outMessagesBatchedAt = 0; 1793 msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 1794 bytesWritten += len; 1795 } 1796 1797 /** 1798 *** Metadata Pieces 1799 **/ 1800 1801 if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE ) 1802 && popNextMetadataRequest( msgs, &piece ) ) 1803 { 1804 char * data; 1805 int dataLen; 1806 bool ok = false; 1807 1808 data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen ); 1809 if( ( dataLen > 0 ) && ( data != NULL ) ) 1810 { 1811 tr_benc tmp; 1812 struct evbuffer * payload; 1813 struct evbuffer * out = msgs->outMessages; 1814 1815 /* build the data message */ 1816 tr_bencInitDict( &tmp, 3 ); 1817 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA ); 1818 tr_bencDictAddInt( &tmp, "piece", piece ); 1819 tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength ); 1820 payload = tr_bencToBuf( &tmp, TR_FMT_BENC ); 1821 1822 /* write it out as a LTEP message to our outMessages buffer */ 1823 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) + dataLen ); 1824 evbuffer_add_uint8 ( out, BT_LTEP ); 1825 evbuffer_add_uint8 ( out, msgs->ut_metadata_id ); 1826 evbuffer_add_buffer( out, payload ); 1827 evbuffer_add ( out, data, dataLen ); 1828 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS ); 1829 dbgOutMessageLen( msgs ); 1830 1831 evbuffer_free( payload ); 1832 tr_bencFree( &tmp ); 1833 tr_free( data ); 1834 1835 ok = true; 1836 } 1837 1838 if( !ok ) /* send a rejection message */ 1839 { 1840 tr_benc tmp; 1841 struct evbuffer * payload; 1842 struct evbuffer * out = msgs->outMessages; 1843 1844 /* build the rejection message */ 1845 tr_bencInitDict( &tmp, 2 ); 1846 tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT ); 1847 tr_bencDictAddInt( &tmp, "piece", piece ); 1848 payload = tr_bencToBuf( &tmp, TR_FMT_BENC ); 1849 1850 /* write it out as a LTEP message to our outMessages buffer */ 1851 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) ); 1852 evbuffer_add_uint8 ( out, BT_LTEP ); 1853 evbuffer_add_uint8 ( out, msgs->ut_metadata_id ); 1854 evbuffer_add_buffer( out, payload ); 1855 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS ); 1856 dbgOutMessageLen( msgs ); 1857 1858 evbuffer_free( payload ); 1859 tr_bencFree( &tmp ); 1860 } 1861 } 1862 1863 /** 1864 *** Data Blocks 1865 **/ 1866 1867 if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize ) 1868 && popNextRequest( msgs, &req ) ) 1869 { 1870 --msgs->prefetchCount; 1871 1872 if( requestIsValid( msgs, &req ) 1873 && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) ) 1874 { 1875 int err; 1876 const uint32_t msglen = 4 + 1 + 4 + 4 + req.length; 1877 struct evbuffer * out; 1878 struct evbuffer_iovec iovec[1]; 1879 1880 out = evbuffer_new( ); 1881 evbuffer_expand( out, msglen ); 1882 1883 evbuffer_add_uint32( out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length ); 1884 evbuffer_add_uint8 ( out, BT_PIECE ); 1885 evbuffer_add_uint32( out, req.index ); 1886 evbuffer_add_uint32( out, req.offset ); 1887 1888 evbuffer_reserve_space( out, req.length, iovec, 1 ); 1889 err = tr_cacheReadBlock( getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base ); 1890 iovec[0].iov_len = req.length; 1891 evbuffer_commit_space( out, iovec, 1 ); 1892 1893 /* check the piece if it needs checking... */ 1894 if( !err && tr_torrentPieceNeedsCheck( msgs->torrent, req.index ) ) 1895 if(( err = !tr_torrentCheckPiece( msgs->torrent, req.index ))) 1896 tr_torrentSetLocalError( msgs->torrent, _( "Please Verify Local Data! Piece #%zu is corrupt." ), (size_t)req.index ); 1897 1898 if( err ) 1899 { 1900 if( fext ) 1901 protocolSendReject( msgs, &req ); 1902 } 1903 else 1904 { 1905 const size_t n = evbuffer_get_length( out ); 1906 dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length ); 1907 assert( n == msglen ); 1908 tr_peerIoWriteBuf( msgs->peer->io, out, true ); 1909 bytesWritten += n; 1910 msgs->clientSentAnythingAt = now; 1911 tr_historyAdd( &msgs->peer->blocksSentToPeer, tr_time( ), 1 ); 1912 } 1913 1914 evbuffer_free( out ); 1915 1916 if( err ) 1917 { 1918 bytesWritten = 0; 1919 msgs = NULL; 1920 } 1921 } 1922 else if( fext ) /* peer needs a reject message */ 1923 { 1924 protocolSendReject( msgs, &req ); 1925 } 1926 1927 if( msgs != NULL ) 1928 prefetchPieces( msgs ); 1929 } 1930 1931 /** 1932 *** Keepalive 1933 **/ 1934 1935 if( ( msgs != NULL ) 1936 && ( msgs->clientSentAnythingAt != 0 ) 1937 && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) ) 1938 { 1939 dbgmsg( msgs, "sending a keepalive message" ); 1940 evbuffer_add_uint32( msgs->outMessages, 0 ); 1941 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 1942 } 1943 1944 return bytesWritten; 1945} 1946 1947static int 1948peerPulse( void * vmsgs ) 1949{ 1950 tr_peermsgs * msgs = vmsgs; 1951 const time_t now = tr_time( ); 1952 1953 if ( tr_isPeerIo( msgs->peer->io ) ) { 1954 updateDesiredRequestCount( msgs ); 1955 updateBlockRequests( msgs ); 1956 updateMetadataRequests( msgs, now ); 1957 } 1958 1959 for( ;; ) 1960 if( fillOutputBuffer( msgs, now ) < 1 ) 1961 break; 1962 1963 return true; /* loop forever */ 1964} 1965 1966void 1967tr_peerMsgsPulse( tr_peermsgs * msgs ) 1968{ 1969 if( msgs != NULL ) 1970 peerPulse( msgs ); 1971} 1972 1973static void 1974gotError( tr_peerIo * io UNUSED, short what, void * vmsgs ) 1975{ 1976 if( what & BEV_EVENT_TIMEOUT ) 1977 dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what ); 1978 if( what & ( BEV_EVENT_EOF | BEV_EVENT_ERROR ) ) 1979 dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)", 1980 what, errno, tr_strerror( errno ) ); 1981 fireError( vmsgs, ENOTCONN ); 1982} 1983 1984static void 1985sendBitfield( tr_peermsgs * msgs ) 1986{ 1987 void * bytes; 1988 size_t byte_count = 0; 1989 struct evbuffer * out = msgs->outMessages; 1990 1991 assert( tr_torrentHasMetadata( msgs->torrent ) ); 1992 1993 bytes = tr_cpCreatePieceBitfield( &msgs->torrent->completion, &byte_count ); 1994 evbuffer_add_uint32( out, sizeof( uint8_t ) + byte_count ); 1995 evbuffer_add_uint8 ( out, BT_BITFIELD ); 1996 evbuffer_add ( out, bytes, byte_count ); 1997 dbgmsg( msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length( out ) ); 1998 pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS ); 1999 2000 tr_free( bytes ); 2001} 2002 2003static void 2004tellPeerWhatWeHave( tr_peermsgs * msgs ) 2005{ 2006 const bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); 2007 2008 if( fext && tr_cpHasAll( &msgs->torrent->completion ) ) 2009 { 2010 protocolSendHaveAll( msgs ); 2011 } 2012 else if( fext && tr_cpHasNone( &msgs->torrent->completion ) ) 2013 { 2014 protocolSendHaveNone( msgs ); 2015 } 2016 else if( !tr_cpHasNone( &msgs->torrent->completion ) ) 2017 { 2018 sendBitfield( msgs ); 2019 } 2020} 2021 2022/** 2023*** 2024**/ 2025 2026/* some peers give us error messages if we send 2027 more than this many peers in a single pex message 2028 http://wiki.theory.org/BitTorrentPeerExchangeConventions */ 2029#define MAX_PEX_ADDED 50 2030#define MAX_PEX_DROPPED 50 2031 2032typedef struct 2033{ 2034 tr_pex * added; 2035 tr_pex * dropped; 2036 tr_pex * elements; 2037 int addedCount; 2038 int droppedCount; 2039 int elementCount; 2040} 2041PexDiffs; 2042 2043static void 2044pexAddedCb( void * vpex, void * userData ) 2045{ 2046 PexDiffs * diffs = userData; 2047 tr_pex * pex = vpex; 2048 2049 if( diffs->addedCount < MAX_PEX_ADDED ) 2050 { 2051 diffs->added[diffs->addedCount++] = *pex; 2052 diffs->elements[diffs->elementCount++] = *pex; 2053 } 2054} 2055 2056static inline void 2057pexDroppedCb( void * vpex, void * userData ) 2058{ 2059 PexDiffs * diffs = userData; 2060 tr_pex * pex = vpex; 2061 2062 if( diffs->droppedCount < MAX_PEX_DROPPED ) 2063 { 2064 diffs->dropped[diffs->droppedCount++] = *pex; 2065 } 2066} 2067 2068static inline void 2069pexElementCb( void * vpex, void * userData ) 2070{ 2071 PexDiffs * diffs = userData; 2072 tr_pex * pex = vpex; 2073 2074 diffs->elements[diffs->elementCount++] = *pex; 2075} 2076 2077typedef void ( tr_set_func )( void * element, void * userData ); 2078 2079/** 2080 * @brief find the differences and commonalities in two sorted sets 2081 * @param a the first set 2082 * @param aCount the number of elements in the set 'a' 2083 * @param b the second set 2084 * @param bCount the number of elements in the set 'b' 2085 * @param compare the sorting method for both sets 2086 * @param elementSize the sizeof the element in the two sorted sets 2087 * @param in_a called for items in set 'a' but not set 'b' 2088 * @param in_b called for items in set 'b' but not set 'a' 2089 * @param in_both called for items that are in both sets 2090 * @param userData user data passed along to in_a, in_b, and in_both 2091 */ 2092static void 2093tr_set_compare( const void * va, size_t aCount, 2094 const void * vb, size_t bCount, 2095 int compare( const void * a, const void * b ), 2096 size_t elementSize, 2097 tr_set_func in_a_cb, 2098 tr_set_func in_b_cb, 2099 tr_set_func in_both_cb, 2100 void * userData ) 2101{ 2102 const uint8_t * a = va; 2103 const uint8_t * b = vb; 2104 const uint8_t * aend = a + elementSize * aCount; 2105 const uint8_t * bend = b + elementSize * bCount; 2106 2107 while( a != aend || b != bend ) 2108 { 2109 if( a == aend ) 2110 { 2111 ( *in_b_cb )( (void*)b, userData ); 2112 b += elementSize; 2113 } 2114 else if( b == bend ) 2115 { 2116 ( *in_a_cb )( (void*)a, userData ); 2117 a += elementSize; 2118 } 2119 else 2120 { 2121 const int val = ( *compare )( a, b ); 2122 2123 if( !val ) 2124 { 2125 ( *in_both_cb )( (void*)a, userData ); 2126 a += elementSize; 2127 b += elementSize; 2128 } 2129 else if( val < 0 ) 2130 { 2131 ( *in_a_cb )( (void*)a, userData ); 2132 a += elementSize; 2133 } 2134 else if( val > 0 ) 2135 { 2136 ( *in_b_cb )( (void*)b, userData ); 2137 b += elementSize; 2138 } 2139 } 2140 } 2141} 2142 2143 2144static void 2145sendPex( tr_peermsgs * msgs ) 2146{ 2147 if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) ) 2148 { 2149 PexDiffs diffs; 2150 PexDiffs diffs6; 2151 tr_pex * newPex = NULL; 2152 tr_pex * newPex6 = NULL; 2153 const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT ); 2154 const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT ); 2155 2156 /* build the diffs */ 2157 diffs.added = tr_new( tr_pex, newCount ); 2158 diffs.addedCount = 0; 2159 diffs.dropped = tr_new( tr_pex, msgs->pexCount ); 2160 diffs.droppedCount = 0; 2161 diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount ); 2162 diffs.elementCount = 0; 2163 tr_set_compare( msgs->pex, msgs->pexCount, 2164 newPex, newCount, 2165 tr_pexCompare, sizeof( tr_pex ), 2166 pexDroppedCb, pexAddedCb, pexElementCb, &diffs ); 2167 diffs6.added = tr_new( tr_pex, newCount6 ); 2168 diffs6.addedCount = 0; 2169 diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 ); 2170 diffs6.droppedCount = 0; 2171 diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 ); 2172 diffs6.elementCount = 0; 2173 tr_set_compare( msgs->pex6, msgs->pexCount6, 2174 newPex6, newCount6, 2175 tr_pexCompare, sizeof( tr_pex ), 2176 pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 ); 2177 dbgmsg( 2178 msgs, 2179 "pex: old peer count %d+%d, new peer count %d+%d, " 2180 "added %d+%d, removed %d+%d", 2181 msgs->pexCount, msgs->pexCount6, newCount, newCount6, 2182 diffs.addedCount, diffs6.addedCount, 2183 diffs.droppedCount, diffs6.droppedCount ); 2184 2185 if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount && 2186 !diffs6.droppedCount ) 2187 { 2188 tr_free( diffs.elements ); 2189 tr_free( diffs6.elements ); 2190 } 2191 else 2192 { 2193 int i; 2194 tr_benc val; 2195 uint8_t * tmp, *walk; 2196 struct evbuffer * payload; 2197 struct evbuffer * out = msgs->outMessages; 2198 2199 /* update peer */ 2200 tr_free( msgs->pex ); 2201 msgs->pex = diffs.elements; 2202 msgs->pexCount = diffs.elementCount; 2203 tr_free( msgs->pex6 ); 2204 msgs->pex6 = diffs6.elements; 2205 msgs->pexCount6 = diffs6.elementCount; 2206 2207 /* build the pex payload */ 2208 tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3: 2209 * speed vs. likelihood? */ 2210 2211 if( diffs.addedCount > 0) 2212 { 2213 /* "added" */ 2214 tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 ); 2215 for( i = 0; i < diffs.addedCount; ++i ) { 2216 memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4; 2217 memcpy( walk, &diffs.added[i].port, 2 ); walk += 2; 2218 } 2219 assert( ( walk - tmp ) == diffs.addedCount * 6 ); 2220 tr_bencDictAddRaw( &val, "added", tmp, walk - tmp ); 2221 tr_free( tmp ); 2222 2223 /* "added.f" 2224 * unset each holepunch flag because we don't support it. */ 2225 tmp = walk = tr_new( uint8_t, diffs.addedCount ); 2226 for( i = 0; i < diffs.addedCount; ++i ) 2227 *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH; 2228 assert( ( walk - tmp ) == diffs.addedCount ); 2229 tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp ); 2230 tr_free( tmp ); 2231 } 2232 2233 if( diffs.droppedCount > 0 ) 2234 { 2235 /* "dropped" */ 2236 tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 ); 2237 for( i = 0; i < diffs.droppedCount; ++i ) { 2238 memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4; 2239 memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2; 2240 } 2241 assert( ( walk - tmp ) == diffs.droppedCount * 6 ); 2242 tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp ); 2243 tr_free( tmp ); 2244 } 2245 2246 if( diffs6.addedCount > 0 ) 2247 { 2248 /* "added6" */ 2249 tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 ); 2250 for( i = 0; i < diffs6.addedCount; ++i ) { 2251 memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 ); 2252 walk += 16; 2253 memcpy( walk, &diffs6.added[i].port, 2 ); 2254 walk += 2; 2255 } 2256 assert( ( walk - tmp ) == diffs6.addedCount * 18 ); 2257 tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp ); 2258 tr_free( tmp ); 2259 2260 /* "added6.f" 2261 * unset each holepunch flag because we don't support it. */ 2262 tmp = walk = tr_new( uint8_t, diffs6.addedCount ); 2263 for( i = 0; i < diffs6.addedCount; ++i ) 2264 *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH; 2265 assert( ( walk - tmp ) == diffs6.addedCount ); 2266 tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp ); 2267 tr_free( tmp ); 2268 } 2269 2270 if( diffs6.droppedCount > 0 ) 2271 { 2272 /* "dropped6" */ 2273 tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 ); 2274 for( i = 0; i < diffs6.droppedCount; ++i ) { 2275 memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 ); 2276 walk += 16; 2277 memcpy( walk, &diffs6.dropped[i].port, 2 ); 2278 walk += 2; 2279 } 2280 assert( ( walk - tmp ) == diffs6.droppedCount * 18); 2281 tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp ); 2282 tr_free( tmp ); 2283 } 2284 2285 /* write the pex message */ 2286 payload = tr_bencToBuf( &val, TR_FMT_BENC ); 2287 evbuffer_add_uint32( out, 2 * sizeof( uint8_t ) + evbuffer_get_length( payload ) ); 2288 evbuffer_add_uint8 ( out, BT_LTEP ); 2289 evbuffer_add_uint8 ( out, msgs->ut_pex_id ); 2290 evbuffer_add_buffer( out, payload ); 2291 pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS ); 2292 dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length( out ) ); 2293 dbgOutMessageLen( msgs ); 2294 2295 evbuffer_free( payload ); 2296 tr_bencFree( &val ); 2297 } 2298 2299 /* cleanup */ 2300 tr_free( diffs.added ); 2301 tr_free( diffs.dropped ); 2302 tr_free( newPex ); 2303 tr_free( diffs6.added ); 2304 tr_free( diffs6.dropped ); 2305 tr_free( newPex6 ); 2306 2307 /*msgs->clientSentPexAt = tr_time( );*/ 2308 } 2309} 2310 2311static void 2312pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs ) 2313{ 2314 struct tr_peermsgs * msgs = vmsgs; 2315 2316 sendPex( msgs ); 2317 2318 assert( msgs->pexTimer != NULL ); 2319 tr_timerAdd( msgs->pexTimer, PEX_INTERVAL_SECS, 0 ); 2320} 2321 2322/** 2323*** 2324**/ 2325 2326tr_peermsgs* 2327tr_peerMsgsNew( struct tr_torrent * torrent, 2328 struct tr_peer * peer, 2329 tr_peer_callback * callback, 2330 void * callbackData ) 2331{ 2332 tr_peermsgs * m; 2333 2334 assert( peer ); 2335 assert( peer->io ); 2336 2337 m = tr_new0( tr_peermsgs, 1 ); 2338 m->callback = callback; 2339 m->callbackData = callbackData; 2340 m->peer = peer; 2341 m->torrent = torrent; 2342 m->peer->clientIsChoked = 1; 2343 m->peer->peerIsChoked = 1; 2344 m->peer->clientIsInterested = 0; 2345 m->peer->peerIsInterested = 0; 2346 m->state = AWAITING_BT_LENGTH; 2347 m->outMessages = evbuffer_new( ); 2348 m->outMessagesBatchedAt = 0; 2349 m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; 2350 peer->msgs = m; 2351 2352 if( tr_torrentAllowsPex( torrent ) ) { 2353 m->pexTimer = evtimer_new( torrent->session->event_base, pexPulse, m ); 2354 tr_timerAdd( m->pexTimer, PEX_INTERVAL_SECS, 0 ); 2355 } 2356 2357 if( tr_peerIoSupportsUTP( peer->io ) ) { 2358 const tr_address * addr = tr_peerIoGetAddress( peer->io, NULL ); 2359 tr_peerMgrSetUtpSupported( torrent, addr ); 2360 tr_peerMgrSetUtpFailed( torrent, addr, false ); 2361 } 2362 2363 if( tr_peerIoSupportsLTEP( peer->io ) ) 2364 sendLtepHandshake( m ); 2365 2366 tellPeerWhatWeHave( m ); 2367 2368 if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io )) 2369 { 2370 /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */ 2371 const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL ); 2372 if( addr->type == TR_AF_INET || tr_globalIPv6() ) { 2373 protocolSendPort( m, tr_dhtPort( torrent->session ) ); 2374 } 2375 } 2376 2377 tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m ); 2378 updateDesiredRequestCount( m ); 2379 2380 return m; 2381} 2382 2383void 2384tr_peerMsgsFree( tr_peermsgs* msgs ) 2385{ 2386 if( msgs ) 2387 { 2388 if( msgs->pexTimer != NULL ) 2389 event_free( msgs->pexTimer ); 2390 2391 if( msgs->incoming.block != NULL ) 2392 evbuffer_free( msgs->incoming.block ); 2393 2394 evbuffer_free( msgs->outMessages ); 2395 tr_free( msgs->pex6 ); 2396 tr_free( msgs->pex ); 2397 2398 memset( msgs, ~0, sizeof( tr_peermsgs ) ); 2399 tr_free( msgs ); 2400 } 2401} 2402