1/* 2 * This file Copyright (C) Mnemosyne LLC 3 * 4 * This file is licensed by the GPL version 2. Works owned by the 5 * Transmission project are granted a special exemption to clause 2(b) 6 * so that the bulk of its code can remain under the MIT license. 7 * This exemption does not extend to derived works not owned by 8 * the Transmission project. 9 * 10 * $Id: peer-mgr.c 13549 2012-10-05 22:04:08Z jordan $ 11 */ 12 13#include <assert.h> 14#include <errno.h> /* error codes ERANGE, ... */ 15#include <limits.h> /* INT_MAX */ 16#include <string.h> /* memcpy, memcmp, strstr */ 17#include <stdlib.h> /* qsort */ 18 19#include <event2/event.h> 20 21#include <libutp/utp.h> 22 23#include "transmission.h" 24#include "announcer.h" 25#include "bandwidth.h" 26#include "blocklist.h" 27#include "cache.h" 28#include "clients.h" 29#include "completion.h" 30#include "crypto.h" 31#include "handshake.h" 32#include "net.h" 33#include "peer-io.h" 34#include "peer-mgr.h" 35#include "peer-msgs.h" 36#include "ptrarray.h" 37#include "session.h" 38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */ 39#include "torrent.h" 40#include "tr-utp.h" 41#include "utils.h" 42#include "webseed.h" 43 44enum 45{ 46 /* how frequently to cull old atoms */ 47 ATOM_PERIOD_MSEC = ( 60 * 1000 ), 48 49 /* how frequently to change which peers are choked */ 50 RECHOKE_PERIOD_MSEC = ( 10 * 1000 ), 51 52 /* an optimistically unchoked peer is immune from rechoking 53 for this many calls to rechokeUploads(). */ 54 OPTIMISTIC_UNCHOKE_MULTIPLIER = 4, 55 56 /* how frequently to reallocate bandwidth */ 57 BANDWIDTH_PERIOD_MSEC = 500, 58 59 /* how frequently to age out old piece request lists */ 60 REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ), 61 62 /* how frequently to decide which peers live and die */ 63 RECONNECT_PERIOD_MSEC = 500, 64 65 /* when many peers are available, keep idle ones this long */ 66 MIN_UPLOAD_IDLE_SECS = ( 60 ), 67 68 /* when few peers are available, keep idle ones this long */ 69 MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ), 70 71 /* max number of peers to ask for per second overall. 72 * this throttle is to avoid overloading the router */ 73 MAX_CONNECTIONS_PER_SECOND = 12, 74 75 MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)), 76 77 /* number of bad pieces a peer is allowed to send before we ban them */ 78 MAX_BAD_PIECES_PER_PEER = 5, 79 80 /* amount of time to keep a list of request pieces lying around 81 before it's considered too old and needs to be rebuilt */ 82 PIECE_LIST_SHELF_LIFE_SECS = 60, 83 84 /* use for bitwise operations w/peer_atom.flags2 */ 85 MYFLAG_BANNED = 1, 86 87 /* use for bitwise operations w/peer_atom.flags2 */ 88 /* unreachable for now... but not banned. 89 * if they try to connect to us it's okay */ 90 MYFLAG_UNREACHABLE = 2, 91 92 /* the minimum we'll wait before attempting to reconnect to a peer */ 93 MINIMUM_RECONNECT_INTERVAL_SECS = 5, 94 95 /** how long we'll let requests we've made linger before we cancel them */ 96 REQUEST_TTL_SECS = 120, 97 98 NO_BLOCKS_CANCEL_HISTORY = 120, 99 100 CANCEL_HISTORY_SEC = 60 101}; 102 103const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 }; 104 105/** 106*** 107**/ 108 109/** 110 * Peer information that should be kept even before we've connected and 111 * after we've disconnected. These are kept in a pool of peer_atoms to decide 112 * which ones would make good candidates for connecting to, and to watch out 113 * for banned peers. 114 * 115 * @see tr_peer 116 * @see tr_peermsgs 117 */ 118struct peer_atom 119{ 120 uint8_t fromFirst; /* where the peer was first found */ 121 uint8_t fromBest; /* the "best" value of where the peer has been found */ 122 uint8_t flags; /* these match the added_f flags */ 123 uint8_t flags2; /* flags that aren't defined in added_f */ 124 int8_t seedProbability; /* how likely is this to be a seed... [0..100] or -1 for unknown */ 125 int8_t blocklisted; /* -1 for unknown, true for blocklisted, false for not blocklisted */ 126 127 tr_port port; 128 bool utp_failed; /* We recently failed to connect over uTP */ 129 uint16_t numFails; 130 time_t time; /* when the peer's connection status last changed */ 131 time_t piece_data_time; 132 133 time_t lastConnectionAttemptAt; 134 time_t lastConnectionAt; 135 136 /* similar to a TTL field, but less rigid -- 137 * if the swarm is small, the atom will be kept past this date. */ 138 time_t shelf_date; 139 tr_peer * peer; /* will be NULL if not connected */ 140 tr_address addr; 141}; 142 143#ifdef NDEBUG 144#define tr_isAtom(a) (TRUE) 145#else 146static bool 147tr_isAtom( const struct peer_atom * atom ) 148{ 149 return ( atom != NULL ) 150 && ( atom->fromFirst < TR_PEER_FROM__MAX ) 151 && ( atom->fromBest < TR_PEER_FROM__MAX ) 152 && ( tr_address_is_valid( &atom->addr ) ); 153} 154#endif 155 156static const char* 157tr_atomAddrStr( const struct peer_atom * atom ) 158{ 159 return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]"; 160} 161 162struct block_request 163{ 164 tr_block_index_t block; 165 tr_peer * peer; 166 time_t sentAt; 167}; 168 169struct weighted_piece 170{ 171 tr_piece_index_t index; 172 int16_t salt; 173 int16_t requestCount; 174}; 175 176enum piece_sort_state 177{ 178 PIECES_UNSORTED, 179 PIECES_SORTED_BY_INDEX, 180 PIECES_SORTED_BY_WEIGHT 181}; 182 183/** @brief Opaque, per-torrent data structure for peer connection information */ 184typedef struct tr_torrent_peers 185{ 186 tr_ptrArray outgoingHandshakes; /* tr_handshake */ 187 tr_ptrArray pool; /* struct peer_atom */ 188 tr_ptrArray peers; /* tr_peer */ 189 tr_ptrArray webseeds; /* tr_webseed */ 190 191 tr_torrent * tor; 192 struct tr_peerMgr * manager; 193 194 tr_peer * optimistic; /* the optimistic peer, or NULL if none */ 195 int optimisticUnchokeTimeScaler; 196 197 bool isRunning; 198 bool needsCompletenessCheck; 199 200 struct block_request * requests; 201 int requestCount; 202 int requestAlloc; 203 204 struct weighted_piece * pieces; 205 int pieceCount; 206 enum piece_sort_state pieceSortState; 207 208 /* An array of pieceCount items stating how many peers have each piece. 209 This is used to help us for downloading pieces "rarest first." 210 This may be NULL if we don't have metainfo yet, or if we're not 211 downloading and don't care about rarity */ 212 uint16_t * pieceReplication; 213 size_t pieceReplicationSize; 214 215 int interestedCount; 216 int maxPeers; 217 time_t lastCancel; 218 219 /* Before the endgame this should be 0. In endgame, is contains the average 220 * number of pending requests per peer. Only peers which have more pending 221 * requests are considered 'fast' are allowed to request a block that's 222 * already been requested from another (slower?) peer. */ 223 int endgame; 224} 225Torrent; 226 227struct tr_peerMgr 228{ 229 tr_session * session; 230 tr_ptrArray incomingHandshakes; /* tr_handshake */ 231 struct event * bandwidthTimer; 232 struct event * rechokeTimer; 233 struct event * refillUpkeepTimer; 234 struct event * atomTimer; 235}; 236 237#define tordbg( t, ... ) \ 238 do { \ 239 if( tr_deepLoggingIsActive( ) ) \ 240 tr_deepLog( __FILE__, __LINE__, tr_torrentName( t->tor ), __VA_ARGS__ ); \ 241 } while( 0 ) 242 243#define dbgmsg( ... ) \ 244 do { \ 245 if( tr_deepLoggingIsActive( ) ) \ 246 tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \ 247 } while( 0 ) 248 249/** 250*** 251**/ 252 253static inline void 254managerLock( const struct tr_peerMgr * manager ) 255{ 256 tr_sessionLock( manager->session ); 257} 258 259static inline void 260managerUnlock( const struct tr_peerMgr * manager ) 261{ 262 tr_sessionUnlock( manager->session ); 263} 264 265static inline void 266torrentLock( Torrent * torrent ) 267{ 268 managerLock( torrent->manager ); 269} 270 271static inline void 272torrentUnlock( Torrent * torrent ) 273{ 274 managerUnlock( torrent->manager ); 275} 276 277static inline int 278torrentIsLocked( const Torrent * t ) 279{ 280 return tr_sessionIsLocked( t->manager->session ); 281} 282 283/** 284*** 285**/ 286 287static int 288handshakeCompareToAddr( const void * va, const void * vb ) 289{ 290 const tr_handshake * a = va; 291 292 return tr_address_compare( tr_handshakeGetAddr( a, NULL ), vb ); 293} 294 295static int 296handshakeCompare( const void * a, const void * b ) 297{ 298 return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) ); 299} 300 301static inline tr_handshake* 302getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr ) 303{ 304 if( tr_ptrArrayEmpty( handshakes ) ) 305 return NULL; 306 307 return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr ); 308} 309 310static int 311comparePeerAtomToAddress( const void * va, const void * vb ) 312{ 313 const struct peer_atom * a = va; 314 315 return tr_address_compare( &a->addr, vb ); 316} 317 318static int 319compareAtomsByAddress( const void * va, const void * vb ) 320{ 321 const struct peer_atom * b = vb; 322 323 assert( tr_isAtom( b ) ); 324 325 return comparePeerAtomToAddress( va, &b->addr ); 326} 327 328/** 329*** 330**/ 331 332const tr_address * 333tr_peerAddress( const tr_peer * peer ) 334{ 335 return &peer->atom->addr; 336} 337 338static Torrent* 339getExistingTorrent( tr_peerMgr * manager, 340 const uint8_t * hash ) 341{ 342 tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash ); 343 344 return tor == NULL ? NULL : tor->torrentPeers; 345} 346 347static int 348peerCompare( const void * a, const void * b ) 349{ 350 return tr_address_compare( tr_peerAddress( a ), tr_peerAddress( b ) ); 351} 352 353static struct peer_atom* 354getExistingAtom( const Torrent * t, 355 const tr_address * addr ) 356{ 357 Torrent * tt = (Torrent*)t; 358 assert( torrentIsLocked( t ) ); 359 return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress ); 360} 361 362static bool 363peerIsInUse( const Torrent * ct, const struct peer_atom * atom ) 364{ 365 Torrent * t = (Torrent*) ct; 366 367 assert( torrentIsLocked ( t ) ); 368 369 return ( atom->peer != NULL ) 370 || getExistingHandshake( &t->outgoingHandshakes, &atom->addr ) 371 || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr ); 372} 373 374void 375tr_peerConstruct( tr_peer * peer ) 376{ 377 memset( peer, 0, sizeof( tr_peer ) ); 378 379 peer->have = TR_BITFIELD_INIT; 380} 381 382static tr_peer* 383peerNew( struct peer_atom * atom ) 384{ 385 tr_peer * peer = tr_new( tr_peer, 1 ); 386 tr_peerConstruct( peer ); 387 388 peer->atom = atom; 389 atom->peer = peer; 390 391 return peer; 392} 393 394static tr_peer* 395getPeer( Torrent * torrent, struct peer_atom * atom ) 396{ 397 tr_peer * peer; 398 399 assert( torrentIsLocked( torrent ) ); 400 401 peer = atom->peer; 402 403 if( peer == NULL ) 404 { 405 peer = peerNew( atom ); 406 tr_bitfieldConstruct( &peer->have, torrent->tor->info.pieceCount ); 407 tr_bitfieldConstruct( &peer->blame, torrent->tor->blockCount ); 408 tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare ); 409 } 410 411 return peer; 412} 413 414static void peerDeclinedAllRequests( Torrent *, const tr_peer * ); 415 416void 417tr_peerDestruct( tr_torrent * tor, tr_peer * peer ) 418{ 419 assert( peer != NULL ); 420 421 peerDeclinedAllRequests( tor->torrentPeers, peer ); 422 423 if( peer->msgs != NULL ) 424 tr_peerMsgsFree( peer->msgs ); 425 426 if( peer->io ) { 427 tr_peerIoClear( peer->io ); 428 tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */ 429 } 430 431 tr_bitfieldDestruct( &peer->have ); 432 tr_bitfieldDestruct( &peer->blame ); 433 tr_free( peer->client ); 434 435 if( peer->atom ) 436 peer->atom->peer = NULL; 437} 438 439static void 440peerDelete( Torrent * t, tr_peer * peer ) 441{ 442 tr_peerDestruct( t->tor, peer ); 443 tr_free( peer ); 444} 445 446static bool 447replicationExists( const Torrent * t ) 448{ 449 return t->pieceReplication != NULL; 450} 451 452static void 453replicationFree( Torrent * t ) 454{ 455 tr_free( t->pieceReplication ); 456 t->pieceReplication = NULL; 457 t->pieceReplicationSize = 0; 458} 459 460static void 461replicationNew( Torrent * t ) 462{ 463 tr_piece_index_t piece_i; 464 const tr_piece_index_t piece_count = t->tor->info.pieceCount; 465 tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers ); 466 const int peer_count = tr_ptrArraySize( &t->peers ); 467 468 assert( !replicationExists( t ) ); 469 470 t->pieceReplicationSize = piece_count; 471 t->pieceReplication = tr_new0( uint16_t, piece_count ); 472 473 for( piece_i=0; piece_i<piece_count; ++piece_i ) 474 { 475 int peer_i; 476 uint16_t r = 0; 477 478 for( peer_i=0; peer_i<peer_count; ++peer_i ) 479 if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) ) 480 ++r; 481 482 t->pieceReplication[piece_i] = r; 483 } 484} 485 486static void 487torrentFree( void * vt ) 488{ 489 Torrent * t = vt; 490 491 assert( t ); 492 assert( !t->isRunning ); 493 assert( torrentIsLocked( t ) ); 494 assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) ); 495 assert( tr_ptrArrayEmpty( &t->peers ) ); 496 497 tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree ); 498 tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free ); 499 tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL ); 500 tr_ptrArrayDestruct( &t->peers, NULL ); 501 502 replicationFree( t ); 503 504 tr_free( t->requests ); 505 tr_free( t->pieces ); 506 tr_free( t ); 507} 508 509static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * ); 510 511static void 512rebuildWebseedArray( Torrent * t, tr_torrent * tor ) 513{ 514 int i; 515 const tr_info * inf = &tor->info; 516 517 /* clear the array */ 518 tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree ); 519 t->webseeds = TR_PTR_ARRAY_INIT; 520 521 /* repopulate it */ 522 for( i = 0; i < inf->webseedCount; ++i ) 523 { 524 tr_webseed * w = tr_webseedNew( tor, inf->webseeds[i], peerCallbackFunc, t ); 525 tr_ptrArrayAppend( &t->webseeds, w ); 526 } 527} 528 529static Torrent* 530torrentNew( tr_peerMgr * manager, tr_torrent * tor ) 531{ 532 Torrent * t; 533 534 t = tr_new0( Torrent, 1 ); 535 t->manager = manager; 536 t->tor = tor; 537 t->pool = TR_PTR_ARRAY_INIT; 538 t->peers = TR_PTR_ARRAY_INIT; 539 t->webseeds = TR_PTR_ARRAY_INIT; 540 t->outgoingHandshakes = TR_PTR_ARRAY_INIT; 541 542 rebuildWebseedArray( t, tor ); 543 544 return t; 545} 546 547static void ensureMgrTimersExist( struct tr_peerMgr * m ); 548 549tr_peerMgr* 550tr_peerMgrNew( tr_session * session ) 551{ 552 tr_peerMgr * m = tr_new0( tr_peerMgr, 1 ); 553 m->session = session; 554 m->incomingHandshakes = TR_PTR_ARRAY_INIT; 555 ensureMgrTimersExist( m ); 556 return m; 557} 558 559static void 560deleteTimer( struct event ** t ) 561{ 562 if( *t != NULL ) 563 { 564 event_free( *t ); 565 *t = NULL; 566 } 567} 568 569static void 570deleteTimers( struct tr_peerMgr * m ) 571{ 572 deleteTimer( &m->atomTimer ); 573 deleteTimer( &m->bandwidthTimer ); 574 deleteTimer( &m->rechokeTimer ); 575 deleteTimer( &m->refillUpkeepTimer ); 576} 577 578void 579tr_peerMgrFree( tr_peerMgr * manager ) 580{ 581 managerLock( manager ); 582 583 deleteTimers( manager ); 584 585 /* free the handshakes. Abort invokes handshakeDoneCB(), which removes 586 * the item from manager->handshakes, so this is a little roundabout... */ 587 while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) ) 588 tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) ); 589 590 tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL ); 591 592 managerUnlock( manager ); 593 tr_free( manager ); 594} 595 596static int 597clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer ) 598{ 599 if( !tr_torrentHasMetadata( tor ) ) 600 return true; 601 602 return peer->clientIsInterested && !peer->clientIsChoked; 603} 604 605static int 606clientIsUploadingTo( const tr_peer * peer ) 607{ 608 return peer->peerIsInterested && !peer->peerIsChoked; 609} 610 611/*** 612**** 613***/ 614 615void 616tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr ) 617{ 618 tr_torrent * tor = NULL; 619 tr_session * session = mgr->session; 620 621 /* we cache whether or not a peer is blocklisted... 622 since the blocklist has changed, erase that cached value */ 623 while(( tor = tr_torrentNext( session, tor ))) 624 { 625 int i; 626 Torrent * t = tor->torrentPeers; 627 const int n = tr_ptrArraySize( &t->pool ); 628 for( i=0; i<n; ++i ) { 629 struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i ); 630 atom->blocklisted = -1; 631 } 632 } 633} 634 635static bool 636isAtomBlocklisted( tr_session * session, struct peer_atom * atom ) 637{ 638 if( atom->blocklisted < 0 ) 639 atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr ); 640 641 assert( tr_isBool( atom->blocklisted ) ); 642 return atom->blocklisted; 643} 644 645 646/*** 647**** 648***/ 649 650static void 651atomSetSeedProbability( struct peer_atom * atom, int seedProbability ) 652{ 653 assert( atom != NULL ); 654 assert( -1<=seedProbability && seedProbability<=100 ); 655 656 atom->seedProbability = seedProbability; 657 658 if( seedProbability == 100 ) 659 atom->flags |= ADDED_F_SEED_FLAG; 660 else if( seedProbability != -1 ) 661 atom->flags &= ~ADDED_F_SEED_FLAG; 662} 663 664static inline bool 665atomIsSeed( const struct peer_atom * atom ) 666{ 667 return atom->seedProbability == 100; 668} 669 670static void 671atomSetSeed( const Torrent * t, struct peer_atom * atom ) 672{ 673 if( !atomIsSeed( atom ) ) 674 { 675 tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) ); 676 677 atomSetSeedProbability( atom, 100 ); 678 } 679} 680 681 682bool 683tr_peerMgrPeerIsSeed( const tr_torrent * tor, 684 const tr_address * addr ) 685{ 686 bool isSeed = false; 687 const Torrent * t = tor->torrentPeers; 688 const struct peer_atom * atom = getExistingAtom( t, addr ); 689 690 if( atom ) 691 isSeed = atomIsSeed( atom ); 692 693 return isSeed; 694} 695 696void 697tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr ) 698{ 699 struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr ); 700 701 if( atom ) 702 atom->flags |= ADDED_F_UTP_FLAGS; 703} 704 705void 706tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, bool failed ) 707{ 708 struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr ); 709 710 if( atom ) 711 atom->utp_failed = failed; 712} 713 714 715/** 716*** REQUESTS 717*** 718*** There are two data structures associated with managing block requests: 719*** 720*** 1. Torrent::requests, an array of "struct block_request" which keeps 721*** track of which blocks have been requested, and when, and by which peers. 722*** This is list is used for (a) cancelling requests that have been pending 723*** for too long and (b) avoiding duplicate requests before endgame. 724*** 725*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the 726*** pieces that we want to request. It's used to decide which blocks to 727*** return next when tr_peerMgrGetBlockRequests() is called. 728**/ 729 730/** 731*** struct block_request 732**/ 733 734static int 735compareReqByBlock( const void * va, const void * vb ) 736{ 737 const struct block_request * a = va; 738 const struct block_request * b = vb; 739 740 /* primary key: block */ 741 if( a->block < b->block ) return -1; 742 if( a->block > b->block ) return 1; 743 744 /* secondary key: peer */ 745 if( a->peer < b->peer ) return -1; 746 if( a->peer > b->peer ) return 1; 747 748 return 0; 749} 750 751static void 752requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer ) 753{ 754 struct block_request key; 755 756 /* ensure enough room is available... */ 757 if( t->requestCount + 1 >= t->requestAlloc ) 758 { 759 const int CHUNK_SIZE = 128; 760 t->requestAlloc += CHUNK_SIZE; 761 t->requests = tr_renew( struct block_request, 762 t->requests, t->requestAlloc ); 763 } 764 765 /* populate the record we're inserting */ 766 key.block = block; 767 key.peer = peer; 768 key.sentAt = tr_time( ); 769 770 /* insert the request to our array... */ 771 { 772 bool exact; 773 const int pos = tr_lowerBound( &key, t->requests, t->requestCount, 774 sizeof( struct block_request ), 775 compareReqByBlock, &exact ); 776 assert( !exact ); 777 memmove( t->requests + pos + 1, 778 t->requests + pos, 779 sizeof( struct block_request ) * ( t->requestCount++ - pos ) ); 780 t->requests[pos] = key; 781 } 782 783 if( peer != NULL ) 784 { 785 ++peer->pendingReqsToPeer; 786 assert( peer->pendingReqsToPeer >= 0 ); 787 } 788 789 /*fprintf( stderr, "added request of block %lu from peer %s... " 790 "there are now %d block\n", 791 (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/ 792} 793 794static struct block_request * 795requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer ) 796{ 797 struct block_request key; 798 key.block = block; 799 key.peer = (tr_peer*) peer; 800 801 return bsearch( &key, t->requests, t->requestCount, 802 sizeof( struct block_request ), 803 compareReqByBlock ); 804} 805 806/** 807 * Find the peers are we currently requesting the block 808 * with index @a block from and append them to @a peerArr. 809 */ 810static void 811getBlockRequestPeers( Torrent * t, tr_block_index_t block, 812 tr_ptrArray * peerArr ) 813{ 814 bool exact; 815 int i, pos; 816 struct block_request key; 817 818 key.block = block; 819 key.peer = NULL; 820 pos = tr_lowerBound( &key, t->requests, t->requestCount, 821 sizeof( struct block_request ), 822 compareReqByBlock, &exact ); 823 824 assert( !exact ); /* shouldn't have a request with .peer == NULL */ 825 826 for( i = pos; i < t->requestCount; ++i ) 827 { 828 if( t->requests[i].block != block ) 829 break; 830 tr_ptrArrayAppend( peerArr, t->requests[i].peer ); 831 } 832} 833 834static void 835decrementPendingReqCount( const struct block_request * b ) 836{ 837 if( b->peer != NULL ) 838 if( b->peer->pendingReqsToPeer > 0 ) 839 --b->peer->pendingReqsToPeer; 840} 841 842static void 843requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer ) 844{ 845 const struct block_request * b = requestListLookup( t, block, peer ); 846 if( b != NULL ) 847 { 848 const int pos = b - t->requests; 849 assert( pos < t->requestCount ); 850 851 decrementPendingReqCount( b ); 852 853 tr_removeElementFromArray( t->requests, 854 pos, 855 sizeof( struct block_request ), 856 t->requestCount-- ); 857 858 /*fprintf( stderr, "removing request of block %lu from peer %s... " 859 "there are now %d block requests left\n", 860 (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/ 861 } 862} 863 864static int 865countActiveWebseeds( const Torrent * t ) 866{ 867 int activeCount = 0; 868 const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds ); 869 const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds ); 870 871 for( ; w!=wend; ++w ) 872 if( tr_webseedIsActive( *w ) ) 873 ++activeCount; 874 875 return activeCount; 876} 877 878static bool 879testForEndgame( const Torrent * t ) 880{ 881 /* we consider ourselves to be in endgame if the number of bytes 882 we've got requested is >= the number of bytes left to download */ 883 return ( t->requestCount * t->tor->blockSize ) 884 >= tr_cpLeftUntilDone( &t->tor->completion ); 885} 886 887static void 888updateEndgame( Torrent * t ) 889{ 890 assert( t->requestCount >= 0 ); 891 892 if( !testForEndgame( t ) ) 893 { 894 /* not in endgame */ 895 t->endgame = 0; 896 } 897 else if( !t->endgame ) /* only recalculate when endgame first begins */ 898 { 899 int numDownloading = 0; 900 const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers ); 901 const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers ); 902 903 /* add the active bittorrent peers... */ 904 for( ; p!=pend; ++p ) 905 if( (*p)->pendingReqsToPeer > 0 ) 906 ++numDownloading; 907 908 /* add the active webseeds... */ 909 numDownloading += countActiveWebseeds( t ); 910 911 /* average number of pending requests per downloading peer */ 912 t->endgame = t->requestCount / MAX( numDownloading, 1 ); 913 } 914} 915 916 917/**** 918***** 919***** Piece List Manipulation / Accessors 920***** 921****/ 922 923static inline void 924invalidatePieceSorting( Torrent * t ) 925{ 926 t->pieceSortState = PIECES_UNSORTED; 927} 928 929static const tr_torrent * weightTorrent; 930 931static const uint16_t * weightReplication; 932 933static void 934setComparePieceByWeightTorrent( Torrent * t ) 935{ 936 if( !replicationExists( t ) ) 937 replicationNew( t ); 938 939 weightTorrent = t->tor; 940 weightReplication = t->pieceReplication; 941} 942 943/* we try to create a "weight" s.t. high-priority pieces come before others, 944 * and that partially-complete pieces come before empty ones. */ 945static int 946comparePieceByWeight( const void * va, const void * vb ) 947{ 948 const struct weighted_piece * a = va; 949 const struct weighted_piece * b = vb; 950 int ia, ib, missing, pending; 951 const tr_torrent * tor = weightTorrent; 952 const uint16_t * rep = weightReplication; 953 954 /* primary key: weight */ 955 missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index ); 956 pending = a->requestCount; 957 ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending); 958 missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index ); 959 pending = b->requestCount; 960 ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending); 961 if( ia < ib ) return -1; 962 if( ia > ib ) return 1; 963 964 /* secondary key: higher priorities go first */ 965 ia = tor->info.pieces[a->index].priority; 966 ib = tor->info.pieces[b->index].priority; 967 if( ia > ib ) return -1; 968 if( ia < ib ) return 1; 969 970 /* tertiary key: rarest first. */ 971 ia = rep[a->index]; 972 ib = rep[b->index]; 973 if( ia < ib ) return -1; 974 if( ia > ib ) return 1; 975 976 /* quaternary key: random */ 977 if( a->salt < b->salt ) return -1; 978 if( a->salt > b->salt ) return 1; 979 980 /* okay, they're equal */ 981 return 0; 982} 983 984static int 985comparePieceByIndex( const void * va, const void * vb ) 986{ 987 const struct weighted_piece * a = va; 988 const struct weighted_piece * b = vb; 989 if( a->index < b->index ) return -1; 990 if( a->index > b->index ) return 1; 991 return 0; 992} 993 994static void 995pieceListSort( Torrent * t, enum piece_sort_state state ) 996{ 997 assert( state==PIECES_SORTED_BY_INDEX 998 || state==PIECES_SORTED_BY_WEIGHT ); 999 1000 1001 if( state == PIECES_SORTED_BY_WEIGHT ) 1002 { 1003 setComparePieceByWeightTorrent( t ); 1004 qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight ); 1005 } 1006 else 1007 qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex ); 1008 1009 t->pieceSortState = state; 1010} 1011 1012/** 1013 * These functions are useful for testing, but too expensive for nightly builds. 1014 * let's leave it disabled but add an easy hook to compile it back in 1015 */ 1016#if 1 1017#define assertWeightedPiecesAreSorted(t) 1018#define assertReplicationCountIsExact(t) 1019#else 1020static void 1021assertWeightedPiecesAreSorted( Torrent * t ) 1022{ 1023 if( !t->endgame ) 1024 { 1025 int i; 1026 setComparePieceByWeightTorrent( t ); 1027 for( i=0; i<t->pieceCount-1; ++i ) 1028 assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 ); 1029 } 1030} 1031static void 1032assertReplicationCountIsExact( Torrent * t ) 1033{ 1034 /* This assert might fail due to errors of implementations in other 1035 * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone 1036 * from a client. If a such a behavior is noticed, 1037 * a bug report should be filled to the faulty client. */ 1038 1039 size_t piece_i; 1040 const uint16_t * rep = t->pieceReplication; 1041 const size_t piece_count = t->pieceReplicationSize; 1042 const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers ); 1043 const int peer_count = tr_ptrArraySize( &t->peers ); 1044 1045 assert( piece_count == t->tor->info.pieceCount ); 1046 1047 for( piece_i=0; piece_i<piece_count; ++piece_i ) 1048 { 1049 int peer_i; 1050 uint16_t r = 0; 1051 1052 for( peer_i=0; peer_i<peer_count; ++peer_i ) 1053 if( tr_bitsetHas( &peers[peer_i]->have, piece_i ) ) 1054 ++r; 1055 1056 assert( rep[piece_i] == r ); 1057 } 1058} 1059#endif 1060 1061static struct weighted_piece * 1062pieceListLookup( Torrent * t, tr_piece_index_t index ) 1063{ 1064 int i; 1065 1066 for( i=0; i<t->pieceCount; ++i ) 1067 if( t->pieces[i].index == index ) 1068 return &t->pieces[i]; 1069 1070 return NULL; 1071} 1072 1073static void 1074pieceListRebuild( Torrent * t ) 1075{ 1076 1077 if( !tr_torrentIsSeed( t->tor ) ) 1078 { 1079 tr_piece_index_t i; 1080 tr_piece_index_t * pool; 1081 tr_piece_index_t poolCount = 0; 1082 const tr_torrent * tor = t->tor; 1083 const tr_info * inf = tr_torrentInfo( tor ); 1084 struct weighted_piece * pieces; 1085 int pieceCount; 1086 1087 /* build the new list */ 1088 pool = tr_new( tr_piece_index_t, inf->pieceCount ); 1089 for( i=0; i<inf->pieceCount; ++i ) 1090 if( !inf->pieces[i].dnd ) 1091 if( !tr_cpPieceIsComplete( &tor->completion, i ) ) 1092 pool[poolCount++] = i; 1093 pieceCount = poolCount; 1094 pieces = tr_new0( struct weighted_piece, pieceCount ); 1095 for( i=0; i<poolCount; ++i ) { 1096 struct weighted_piece * piece = pieces + i; 1097 piece->index = pool[i]; 1098 piece->requestCount = 0; 1099 piece->salt = tr_cryptoWeakRandInt( 4096 ); 1100 } 1101 1102 /* if we already had a list of pieces, merge it into 1103 * the new list so we don't lose its requestCounts */ 1104 if( t->pieces != NULL ) 1105 { 1106 struct weighted_piece * o = t->pieces; 1107 struct weighted_piece * oend = o + t->pieceCount; 1108 struct weighted_piece * n = pieces; 1109 struct weighted_piece * nend = n + pieceCount; 1110 1111 pieceListSort( t, PIECES_SORTED_BY_INDEX ); 1112 1113 while( o!=oend && n!=nend ) { 1114 if( o->index < n->index ) 1115 ++o; 1116 else if( o->index > n->index ) 1117 ++n; 1118 else 1119 *n++ = *o++; 1120 } 1121 1122 tr_free( t->pieces ); 1123 } 1124 1125 t->pieces = pieces; 1126 t->pieceCount = pieceCount; 1127 1128 pieceListSort( t, PIECES_SORTED_BY_WEIGHT ); 1129 1130 /* cleanup */ 1131 tr_free( pool ); 1132 } 1133} 1134 1135static void 1136pieceListRemovePiece( Torrent * t, tr_piece_index_t piece ) 1137{ 1138 struct weighted_piece * p; 1139 1140 if(( p = pieceListLookup( t, piece ))) 1141 { 1142 const int pos = p - t->pieces; 1143 1144 tr_removeElementFromArray( t->pieces, 1145 pos, 1146 sizeof( struct weighted_piece ), 1147 t->pieceCount-- ); 1148 1149 if( t->pieceCount == 0 ) 1150 { 1151 tr_free( t->pieces ); 1152 t->pieces = NULL; 1153 } 1154 } 1155} 1156 1157static void 1158pieceListResortPiece( Torrent * t, struct weighted_piece * p ) 1159{ 1160 int pos; 1161 bool isSorted = true; 1162 1163 if( p == NULL ) 1164 return; 1165 1166 /* is the torrent already sorted? */ 1167 pos = p - t->pieces; 1168 setComparePieceByWeightTorrent( t ); 1169 if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) ) 1170 isSorted = false; 1171 if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) ) 1172 isSorted = false; 1173 1174 if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT ) 1175 { 1176 pieceListSort( t, PIECES_SORTED_BY_WEIGHT); 1177 isSorted = true; 1178 } 1179 1180 /* if it's not sorted, move it around */ 1181 if( !isSorted ) 1182 { 1183 bool exact; 1184 const struct weighted_piece tmp = *p; 1185 1186 tr_removeElementFromArray( t->pieces, 1187 pos, 1188 sizeof( struct weighted_piece ), 1189 t->pieceCount-- ); 1190 1191 pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount, 1192 sizeof( struct weighted_piece ), 1193 comparePieceByWeight, &exact ); 1194 1195 memmove( &t->pieces[pos + 1], 1196 &t->pieces[pos], 1197 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) ); 1198 1199 t->pieces[pos] = tmp; 1200 } 1201 1202 assertWeightedPiecesAreSorted( t ); 1203} 1204 1205static void 1206pieceListRemoveRequest( Torrent * t, tr_block_index_t block ) 1207{ 1208 struct weighted_piece * p; 1209 const tr_piece_index_t index = tr_torBlockPiece( t->tor, block ); 1210 1211 if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) ) 1212 { 1213 --p->requestCount; 1214 pieceListResortPiece( t, p ); 1215 } 1216} 1217 1218 1219/**** 1220***** 1221***** Replication count ( for rarest first policy ) 1222***** 1223****/ 1224 1225/** 1226 * Increase the replication count of this piece and sort it if the 1227 * piece list is already sorted 1228 */ 1229static void 1230tr_incrReplicationOfPiece( Torrent * t, const size_t index ) 1231{ 1232 assert( replicationExists( t ) ); 1233 assert( t->pieceReplicationSize == t->tor->info.pieceCount ); 1234 1235 /* One more replication of this piece is present in the swarm */ 1236 ++t->pieceReplication[index]; 1237 1238 /* we only resort the piece if the list is already sorted */ 1239 if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT ) 1240 pieceListResortPiece( t, pieceListLookup( t, index ) ); 1241} 1242 1243/** 1244 * Increases the replication count of pieces present in the bitfield 1245 */ 1246static void 1247tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b ) 1248{ 1249 size_t i; 1250 uint16_t * rep = t->pieceReplication; 1251 const size_t n = t->tor->info.pieceCount; 1252 1253 assert( replicationExists( t ) ); 1254 1255 for( i=0; i<n; ++i ) 1256 if( tr_bitfieldHas( b, i ) ) 1257 ++rep[i]; 1258 1259 if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT ) 1260 invalidatePieceSorting( t ); 1261} 1262 1263/** 1264 * Increase the replication count of every piece 1265 */ 1266static void 1267tr_incrReplication( Torrent * t ) 1268{ 1269 int i; 1270 const int n = t->pieceReplicationSize; 1271 1272 assert( replicationExists( t ) ); 1273 assert( t->pieceReplicationSize == t->tor->info.pieceCount ); 1274 1275 for( i=0; i<n; ++i ) 1276 ++t->pieceReplication[i]; 1277} 1278 1279/** 1280 * Decrease the replication count of pieces present in the bitset. 1281 */ 1282static void 1283tr_decrReplicationFromBitfield( Torrent * t, const tr_bitfield * b ) 1284{ 1285 int i; 1286 const int n = t->pieceReplicationSize; 1287 1288 assert( replicationExists( t ) ); 1289 assert( t->pieceReplicationSize == t->tor->info.pieceCount ); 1290 1291 if( tr_bitfieldHasAll( b ) ) 1292 { 1293 for( i=0; i<n; ++i ) 1294 --t->pieceReplication[i]; 1295 } 1296 else if ( !tr_bitfieldHasNone( b ) ) 1297 { 1298 for( i=0; i<n; ++i ) 1299 if( tr_bitfieldHas( b, i ) ) 1300 --t->pieceReplication[i]; 1301 1302 if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT ) 1303 invalidatePieceSorting( t ); 1304 } 1305} 1306 1307/** 1308*** 1309**/ 1310 1311void 1312tr_peerMgrRebuildRequests( tr_torrent * tor ) 1313{ 1314 assert( tr_isTorrent( tor ) ); 1315 1316 pieceListRebuild( tor->torrentPeers ); 1317} 1318 1319void 1320tr_peerMgrGetNextRequests( tr_torrent * tor, 1321 tr_peer * peer, 1322 int numwant, 1323 tr_block_index_t * setme, 1324 int * numgot, 1325 bool get_intervals ) 1326{ 1327 int i; 1328 int got; 1329 Torrent * t; 1330 struct weighted_piece * pieces; 1331 const tr_bitfield * const have = &peer->have; 1332 1333 /* sanity clause */ 1334 assert( tr_isTorrent( tor ) ); 1335 assert( peer->clientIsInterested ); 1336 assert( !peer->clientIsChoked ); 1337 assert( numwant > 0 ); 1338 1339 /* walk through the pieces and find blocks that should be requested */ 1340 got = 0; 1341 t = tor->torrentPeers; 1342 1343 /* prep the pieces list */ 1344 if( t->pieces == NULL ) 1345 pieceListRebuild( t ); 1346 1347 if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT ) 1348 pieceListSort( t, PIECES_SORTED_BY_WEIGHT ); 1349 1350 assertReplicationCountIsExact( t ); 1351 assertWeightedPiecesAreSorted( t ); 1352 1353 updateEndgame( t ); 1354 pieces = t->pieces; 1355 for( i=0; i<t->pieceCount && got<numwant; ++i ) 1356 { 1357 struct weighted_piece * p = pieces + i; 1358 1359 /* if the peer has this piece that we want... */ 1360 if( tr_bitfieldHas( have, p->index ) ) 1361 { 1362 tr_block_index_t b; 1363 tr_block_index_t first; 1364 tr_block_index_t last; 1365 tr_ptrArray peerArr = TR_PTR_ARRAY_INIT; 1366 1367 tr_torGetPieceBlockRange( tor, p->index, &first, &last ); 1368 1369 for( b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b ) 1370 { 1371 int peerCount; 1372 tr_peer ** peers; 1373 1374 /* don't request blocks we've already got */ 1375 if( tr_cpBlockIsComplete( &tor->completion, b ) ) 1376 continue; 1377 1378 /* always add peer if this block has no peers yet */ 1379 tr_ptrArrayClear( &peerArr ); 1380 getBlockRequestPeers( t, b, &peerArr ); 1381 peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount ); 1382 if( peerCount != 0 ) 1383 { 1384 /* don't make a second block request until the endgame */ 1385 if( !t->endgame ) 1386 continue; 1387 1388 /* don't have more than two peers requesting this block */ 1389 if( peerCount > 1 ) 1390 continue; 1391 1392 /* don't send the same request to the same peer twice */ 1393 if( peer == peers[0] ) 1394 continue; 1395 1396 /* in the endgame allow an additional peer to download a 1397 block but only if the peer seems to be handling requests 1398 relatively fast */ 1399 if( peer->pendingReqsToPeer + numwant - got < t->endgame ) 1400 continue; 1401 } 1402 1403 /* update the caller's table */ 1404 if( !get_intervals ) { 1405 setme[got++] = b; 1406 } 1407 /* if intervals are requested two array entries are necessarry: 1408 one for the interval's starting block and one for its end block */ 1409 else if( got && setme[2 * got - 1] == b - 1 && b != first ) { 1410 /* expand the last interval */ 1411 ++setme[2 * got - 1]; 1412 } 1413 else { 1414 /* begin a new interval */ 1415 setme[2 * got] = setme[2 * got + 1] = b; 1416 ++got; 1417 } 1418 1419 /* update our own tables */ 1420 requestListAdd( t, b, peer ); 1421 ++p->requestCount; 1422 } 1423 1424 tr_ptrArrayDestruct( &peerArr, NULL ); 1425 } 1426 } 1427 1428 /* In most cases we've just changed the weights of a small number of pieces. 1429 * So rather than qsort()ing the entire array, it's faster to apply an 1430 * adaptive insertion sort algorithm. */ 1431 if( got > 0 ) 1432 { 1433 /* not enough requests || last piece modified */ 1434 if ( i == t->pieceCount ) --i; 1435 1436 setComparePieceByWeightTorrent( t ); 1437 while( --i >= 0 ) 1438 { 1439 bool exact; 1440 1441 /* relative position! */ 1442 const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1], 1443 t->pieceCount - (i + 1), 1444 sizeof( struct weighted_piece ), 1445 comparePieceByWeight, &exact ); 1446 if( newpos > 0 ) 1447 { 1448 const struct weighted_piece piece = t->pieces[i]; 1449 memmove( &t->pieces[i], 1450 &t->pieces[i + 1], 1451 sizeof( struct weighted_piece ) * ( newpos ) ); 1452 t->pieces[i + newpos] = piece; 1453 } 1454 } 1455 } 1456 1457 assertWeightedPiecesAreSorted( t ); 1458 *numgot = got; 1459} 1460 1461bool 1462tr_peerMgrDidPeerRequest( const tr_torrent * tor, 1463 const tr_peer * peer, 1464 tr_block_index_t block ) 1465{ 1466 const Torrent * t = tor->torrentPeers; 1467 return requestListLookup( (Torrent*)t, block, peer ) != NULL; 1468} 1469 1470/* cancel requests that are too old */ 1471static void 1472refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr ) 1473{ 1474 time_t now; 1475 time_t too_old; 1476 tr_torrent * tor; 1477 int cancel_buflen = 0; 1478 struct block_request * cancel = NULL; 1479 tr_peerMgr * mgr = vmgr; 1480 managerLock( mgr ); 1481 1482 now = tr_time( ); 1483 too_old = now - REQUEST_TTL_SECS; 1484 1485 /* alloc the temporary "cancel" buffer */ 1486 tor = NULL; 1487 while(( tor = tr_torrentNext( mgr->session, tor ))) 1488 cancel_buflen = MAX( cancel_buflen, tor->torrentPeers->requestCount ); 1489 if( cancel_buflen > 0 ) 1490 cancel = tr_new( struct block_request, cancel_buflen ); 1491 1492 /* prune requests that are too old */ 1493 tor = NULL; 1494 while(( tor = tr_torrentNext( mgr->session, tor ))) 1495 { 1496 Torrent * t = tor->torrentPeers; 1497 const int n = t->requestCount; 1498 if( n > 0 ) 1499 { 1500 int keepCount = 0; 1501 int cancelCount = 0; 1502 const struct block_request * it; 1503 const struct block_request * end; 1504 1505 for( it=t->requests, end=it+n; it!=end; ++it ) 1506 { 1507 if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) ) 1508 cancel[cancelCount++] = *it; 1509 else 1510 { 1511 if( it != &t->requests[keepCount] ) 1512 t->requests[keepCount] = *it; 1513 keepCount++; 1514 } 1515 } 1516 1517 /* prune out the ones we aren't keeping */ 1518 t->requestCount = keepCount; 1519 1520 /* send cancel messages for all the "cancel" ones */ 1521 for( it=cancel, end=it+cancelCount; it!=end; ++it ) { 1522 if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) { 1523 tr_historyAdd( &it->peer->cancelsSentToPeer, now, 1 ); 1524 tr_peerMsgsCancel( it->peer->msgs, it->block ); 1525 decrementPendingReqCount( it ); 1526 } 1527 } 1528 1529 /* decrement the pending request counts for the timed-out blocks */ 1530 for( it=cancel, end=it+cancelCount; it!=end; ++it ) 1531 pieceListRemoveRequest( t, it->block ); 1532 } 1533 } 1534 1535 tr_free( cancel ); 1536 tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC ); 1537 managerUnlock( mgr ); 1538} 1539 1540static void 1541addStrike( Torrent * t, tr_peer * peer ) 1542{ 1543 tordbg( t, "increasing peer %s strike count to %d", 1544 tr_atomAddrStr( peer->atom ), peer->strikes + 1 ); 1545 1546 if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER ) 1547 { 1548 struct peer_atom * atom = peer->atom; 1549 atom->flags2 |= MYFLAG_BANNED; 1550 peer->doPurge = 1; 1551 tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) ); 1552 } 1553} 1554 1555static void 1556gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex ) 1557{ 1558 tr_torrent * tor = t->tor; 1559 const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex ); 1560 1561 tor->corruptCur += byteCount; 1562 tor->downloadedCur -= MIN( tor->downloadedCur, byteCount ); 1563 1564 tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount ); 1565} 1566 1567static void 1568peerSuggestedPiece( Torrent * t UNUSED, 1569 tr_peer * peer UNUSED, 1570 tr_piece_index_t pieceIndex UNUSED, 1571 int isFastAllowed UNUSED ) 1572{ 1573#if 0 1574 assert( t ); 1575 assert( peer ); 1576 assert( peer->msgs ); 1577 1578 /* is this a valid piece? */ 1579 if( pieceIndex >= t->tor->info.pieceCount ) 1580 return; 1581 1582 /* don't ask for it if we've already got it */ 1583 if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) ) 1584 return; 1585 1586 /* don't ask for it if they don't have it */ 1587 if( !tr_bitfieldHas( peer->have, pieceIndex ) ) 1588 return; 1589 1590 /* don't ask for it if we're choked and it's not fast */ 1591 if( !isFastAllowed && peer->clientIsChoked ) 1592 return; 1593 1594 /* request the blocks that we don't have in this piece */ 1595 { 1596 tr_block_index_t b; 1597 tr_block_index_t first; 1598 tr_block_index_t last; 1599 const tr_torrent * tor = t->tor; 1600 1601 tr_torGetPieceBlockRange( t->tor, pieceIndex, &first, &last ); 1602 1603 for( b=first; b<=last; ++b ) 1604 { 1605 if( !tr_cpBlockIsComplete( tor->completion, b ) ) 1606 { 1607 const uint32_t offset = getBlockOffsetInPiece( tor, b ); 1608 const uint32_t length = tr_torBlockCountBytes( tor, b ); 1609 tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length ); 1610 incrementPieceRequests( t, pieceIndex ); 1611 } 1612 } 1613 } 1614#endif 1615} 1616 1617static void 1618removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer ) 1619{ 1620 requestListRemove( t, block, peer ); 1621 pieceListRemoveRequest( t, block ); 1622} 1623 1624/* peer choked us, or maybe it disconnected. 1625 either way we need to remove all its requests */ 1626static void 1627peerDeclinedAllRequests( Torrent * t, const tr_peer * peer ) 1628{ 1629 int i, n; 1630 tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount ); 1631 1632 for( i=n=0; i<t->requestCount; ++i ) 1633 if( peer == t->requests[i].peer ) 1634 blocks[n++] = t->requests[i].block; 1635 1636 for( i=0; i<n; ++i ) 1637 removeRequestFromTables( t, blocks[i], peer ); 1638 1639 tr_free( blocks ); 1640} 1641 1642static void tr_peerMgrSetBlame( tr_torrent *, tr_piece_index_t, int ); 1643 1644static void 1645peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt ) 1646{ 1647 Torrent * t = vt; 1648 1649 torrentLock( t ); 1650 1651 assert( peer != NULL ); 1652 1653 switch( e->eventType ) 1654 { 1655 case TR_PEER_PEER_GOT_DATA: 1656 { 1657 const time_t now = tr_time( ); 1658 tr_torrent * tor = t->tor; 1659 1660 if( e->wasPieceData ) 1661 { 1662 tor->uploadedCur += e->length; 1663 tr_announcerAddBytes( tor, TR_ANN_UP, e->length ); 1664 tr_torrentSetActivityDate( tor, now ); 1665 tr_torrentSetDirty( tor ); 1666 } 1667 1668 /* update the stats */ 1669 if( e->wasPieceData ) 1670 tr_statsAddUploaded( tor->session, e->length ); 1671 1672 /* update our atom */ 1673 if( peer->atom && e->wasPieceData ) 1674 peer->atom->piece_data_time = now; 1675 1676 break; 1677 } 1678 1679 case TR_PEER_CLIENT_GOT_HAVE: 1680 if( replicationExists( t ) ) { 1681 tr_incrReplicationOfPiece( t, e->pieceIndex ); 1682 assertReplicationCountIsExact( t ); 1683 } 1684 break; 1685 1686 case TR_PEER_CLIENT_GOT_HAVE_ALL: 1687 if( replicationExists( t ) ) { 1688 tr_incrReplication( t ); 1689 assertReplicationCountIsExact( t ); 1690 } 1691 break; 1692 1693 case TR_PEER_CLIENT_GOT_HAVE_NONE: 1694 /* noop */ 1695 break; 1696 1697 case TR_PEER_CLIENT_GOT_BITFIELD: 1698 assert( e->bitfield != NULL ); 1699 if( replicationExists( t ) ) { 1700 tr_incrReplicationFromBitfield( t, e->bitfield ); 1701 assertReplicationCountIsExact( t ); 1702 } 1703 break; 1704 1705 case TR_PEER_CLIENT_GOT_REJ: { 1706 tr_block_index_t b = _tr_block( t->tor, e->pieceIndex, e->offset ); 1707 if( b < t->tor->blockCount ) 1708 removeRequestFromTables( t, b, peer ); 1709 else 1710 tordbg( t, "Peer %s sent an out-of-range reject message", 1711 tr_atomAddrStr( peer->atom ) ); 1712 break; 1713 } 1714 1715 case TR_PEER_CLIENT_GOT_CHOKE: 1716 peerDeclinedAllRequests( t, peer ); 1717 break; 1718 1719 case TR_PEER_CLIENT_GOT_PORT: 1720 if( peer->atom ) 1721 peer->atom->port = e->port; 1722 break; 1723 1724 case TR_PEER_CLIENT_GOT_SUGGEST: 1725 peerSuggestedPiece( t, peer, e->pieceIndex, false ); 1726 break; 1727 1728 case TR_PEER_CLIENT_GOT_ALLOWED_FAST: 1729 peerSuggestedPiece( t, peer, e->pieceIndex, true ); 1730 break; 1731 1732 case TR_PEER_CLIENT_GOT_DATA: 1733 { 1734 const time_t now = tr_time( ); 1735 tr_torrent * tor = t->tor; 1736 1737 if( e->wasPieceData ) 1738 { 1739 tor->downloadedCur += e->length; 1740 tr_torrentSetActivityDate( tor, now ); 1741 tr_torrentSetDirty( tor ); 1742 } 1743 1744 /* update the stats */ 1745 if( e->wasPieceData ) 1746 tr_statsAddDownloaded( tor->session, e->length ); 1747 1748 /* update our atom */ 1749 if( peer->atom && e->wasPieceData ) 1750 peer->atom->piece_data_time = now; 1751 1752 break; 1753 } 1754 1755 case TR_PEER_CLIENT_GOT_BLOCK: 1756 { 1757 tr_torrent * tor = t->tor; 1758 tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset ); 1759 int i, peerCount; 1760 tr_peer ** peers; 1761 tr_ptrArray peerArr = TR_PTR_ARRAY_INIT; 1762 1763 removeRequestFromTables( t, block, peer ); 1764 getBlockRequestPeers( t, block, &peerArr ); 1765 peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount ); 1766 1767 /* remove additional block requests and send cancel to peers */ 1768 for( i=0; i<peerCount; i++ ) { 1769 tr_peer * p = peers[i]; 1770 assert( p != peer ); 1771 if( p->msgs ) { 1772 tr_historyAdd( &p->cancelsSentToPeer, tr_time( ), 1 ); 1773 tr_peerMsgsCancel( p->msgs, block ); 1774 } 1775 removeRequestFromTables( t, block, p ); 1776 } 1777 1778 tr_ptrArrayDestruct( &peerArr, false ); 1779 1780 tr_historyAdd( &peer->blocksSentToClient, tr_time( ), 1 ); 1781 1782 if( tr_cpBlockIsComplete( &tor->completion, block ) ) 1783 { 1784 /* we already have this block... */ 1785 const uint32_t n = tr_torBlockCountBytes( tor, block ); 1786 tor->downloadedCur -= MIN( tor->downloadedCur, n ); 1787 tordbg( t, "we have this block already..." ); 1788 } 1789 else 1790 { 1791 tr_cpBlockAdd( &tor->completion, block ); 1792 pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) ); 1793 tr_torrentSetDirty( tor ); 1794 1795 if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) ) 1796 { 1797 const tr_piece_index_t p = e->pieceIndex; 1798 const bool ok = tr_torrentCheckPiece( tor, p ); 1799 1800 tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p ); 1801 1802 if( !ok ) 1803 { 1804 tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ), 1805 (unsigned long)p ); 1806 } 1807 1808 tr_peerMgrSetBlame( tor, p, ok ); 1809 1810 if( !ok ) 1811 { 1812 gotBadPiece( t, p ); 1813 } 1814 else 1815 { 1816 int i; 1817 int peerCount; 1818 tr_peer ** peers; 1819 tr_file_index_t fileIndex; 1820 1821 /* only add this to downloadedCur if we got it from a peer -- 1822 * webseeds shouldn't count against our ratio. As one tracker 1823 * admin put it, "Those pieces are downloaded directly from the 1824 * content distributor, not the peers, it is the tracker's job 1825 * to manage the swarms, not the web server and does not fit 1826 * into the jurisdiction of the tracker." */ 1827 if( peer->msgs != NULL ) { 1828 const uint32_t n = tr_torPieceCountBytes( tor, p ); 1829 tr_announcerAddBytes( tor, TR_ANN_DOWN, n ); 1830 } 1831 1832 peerCount = tr_ptrArraySize( &t->peers ); 1833 peers = (tr_peer**) tr_ptrArrayBase( &t->peers ); 1834 for( i=0; i<peerCount; ++i ) 1835 tr_peerMsgsHave( peers[i]->msgs, p ); 1836 1837 for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) { 1838 const tr_file * file = &tor->info.files[fileIndex]; 1839 if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) { 1840 if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) { 1841 tr_cacheFlushFile( tor->session->cache, tor, fileIndex ); 1842 tr_torrentFileCompleted( tor, fileIndex ); 1843 } 1844 } 1845 } 1846 1847 pieceListRemovePiece( t, p ); 1848 } 1849 } 1850 1851 t->needsCompletenessCheck = true; 1852 } 1853 break; 1854 } 1855 1856 case TR_PEER_ERROR: 1857 if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) ) 1858 { 1859 /* some protocol error from the peer */ 1860 peer->doPurge = 1; 1861 tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", 1862 tr_atomAddrStr( peer->atom ) ); 1863 } 1864 else 1865 { 1866 tordbg( t, "unhandled error: %s", tr_strerror( e->err ) ); 1867 } 1868 break; 1869 1870 default: 1871 assert( 0 ); 1872 } 1873 1874 torrentUnlock( t ); 1875} 1876 1877static int 1878getDefaultShelfLife( uint8_t from ) 1879{ 1880 /* in general, peers obtained from firsthand contact 1881 * are better than those from secondhand, etc etc */ 1882 switch( from ) 1883 { 1884 case TR_PEER_FROM_INCOMING : return 60 * 60 * 6; 1885 case TR_PEER_FROM_LTEP : return 60 * 60 * 6; 1886 case TR_PEER_FROM_TRACKER : return 60 * 60 * 3; 1887 case TR_PEER_FROM_DHT : return 60 * 60 * 3; 1888 case TR_PEER_FROM_PEX : return 60 * 60 * 2; 1889 case TR_PEER_FROM_RESUME : return 60 * 60; 1890 case TR_PEER_FROM_LPD : return 10 * 60; 1891 default : return 60 * 60; 1892 } 1893} 1894 1895static void 1896ensureAtomExists( Torrent * t, 1897 const tr_address * addr, 1898 const tr_port port, 1899 const uint8_t flags, 1900 const int8_t seedProbability, 1901 const uint8_t from ) 1902{ 1903 struct peer_atom * a; 1904 1905 assert( tr_address_is_valid( addr ) ); 1906 assert( from < TR_PEER_FROM__MAX ); 1907 1908 a = getExistingAtom( t, addr ); 1909 1910 if( a == NULL ) 1911 { 1912 const int jitter = tr_cryptoWeakRandInt( 60*10 ); 1913 a = tr_new0( struct peer_atom, 1 ); 1914 a->addr = *addr; 1915 a->port = port; 1916 a->flags = flags; 1917 a->fromFirst = from; 1918 a->fromBest = from; 1919 a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter; 1920 a->blocklisted = -1; 1921 atomSetSeedProbability( a, seedProbability ); 1922 tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress ); 1923 1924 tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) ); 1925 } 1926 else 1927 { 1928 if( from < a->fromBest ) 1929 a->fromBest = from; 1930 1931 if( a->seedProbability == -1 ) 1932 atomSetSeedProbability( a, seedProbability ); 1933 1934 a->flags |= flags; 1935 } 1936} 1937 1938static int 1939getMaxPeerCount( const tr_torrent * tor ) 1940{ 1941 return tor->maxConnectedPeers; 1942} 1943 1944static int 1945getPeerCount( const Torrent * t ) 1946{ 1947 return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */ 1948} 1949 1950/* FIXME: this is kind of a mess. */ 1951static bool 1952myHandshakeDoneCB( tr_handshake * handshake, 1953 tr_peerIo * io, 1954 bool readAnythingFromPeer, 1955 bool isConnected, 1956 const uint8_t * peer_id, 1957 void * vmanager ) 1958{ 1959 bool ok = isConnected; 1960 bool success = false; 1961 tr_port port; 1962 const tr_address * addr; 1963 tr_peerMgr * manager = vmanager; 1964 Torrent * t; 1965 tr_handshake * ours; 1966 1967 assert( io ); 1968 assert( tr_isBool( ok ) ); 1969 1970 t = tr_peerIoHasTorrentHash( io ) 1971 ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) ) 1972 : NULL; 1973 1974 if( tr_peerIoIsIncoming ( io ) ) 1975 ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes, 1976 handshake, handshakeCompare ); 1977 else if( t ) 1978 ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes, 1979 handshake, handshakeCompare ); 1980 else 1981 ours = handshake; 1982 1983 assert( ours ); 1984 assert( ours == handshake ); 1985 1986 if( t ) 1987 torrentLock( t ); 1988 1989 addr = tr_peerIoGetAddress( io, &port ); 1990 1991 if( !ok || !t || !t->isRunning ) 1992 { 1993 if( t ) 1994 { 1995 struct peer_atom * atom = getExistingAtom( t, addr ); 1996 if( atom ) 1997 { 1998 ++atom->numFails; 1999 2000 if( !readAnythingFromPeer ) 2001 { 2002 tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails ); 2003 atom->flags2 |= MYFLAG_UNREACHABLE; 2004 } 2005 } 2006 } 2007 } 2008 else /* looking good */ 2009 { 2010 struct peer_atom * atom; 2011 2012 ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING ); 2013 atom = getExistingAtom( t, addr ); 2014 atom->time = tr_time( ); 2015 atom->piece_data_time = 0; 2016 atom->lastConnectionAt = tr_time( ); 2017 2018 if( !tr_peerIoIsIncoming( io ) ) 2019 { 2020 atom->flags |= ADDED_F_CONNECTABLE; 2021 atom->flags2 &= ~MYFLAG_UNREACHABLE; 2022 } 2023 2024 /* In principle, this flag specifies whether the peer groks uTP, 2025 not whether it's currently connected over uTP. */ 2026 if( io->utp_socket ) 2027 atom->flags |= ADDED_F_UTP_FLAGS; 2028 2029 if( atom->flags2 & MYFLAG_BANNED ) 2030 { 2031 tordbg( t, "banned peer %s tried to reconnect", 2032 tr_atomAddrStr( atom ) ); 2033 } 2034 else if( tr_peerIoIsIncoming( io ) 2035 && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) ) 2036 2037 { 2038 } 2039 else 2040 { 2041 tr_peer * peer = atom->peer; 2042 2043 if( peer ) /* we already have this peer */ 2044 { 2045 } 2046 else 2047 { 2048 peer = getPeer( t, atom ); 2049 tr_free( peer->client ); 2050 2051 if( !peer_id ) 2052 peer->client = NULL; 2053 else { 2054 char client[128]; 2055 tr_clientForId( client, sizeof( client ), peer_id ); 2056 peer->client = tr_strdup( client ); 2057 } 2058 2059 peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is 2060 balanced by our unref in peerDelete() */ 2061 tr_peerIoSetParent( peer->io, &t->tor->bandwidth ); 2062 tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t ); 2063 2064 success = true; 2065 } 2066 } 2067 } 2068 2069 if( t ) 2070 torrentUnlock( t ); 2071 2072 return success; 2073} 2074 2075void 2076tr_peerMgrAddIncoming( tr_peerMgr * manager, 2077 tr_address * addr, 2078 tr_port port, 2079 int socket, 2080 struct UTPSocket * utp_socket ) 2081{ 2082 tr_session * session; 2083 2084 managerLock( manager ); 2085 2086 assert( tr_isSession( manager->session ) ); 2087 session = manager->session; 2088 2089 if( tr_sessionIsAddressBlocked( session, addr ) ) 2090 { 2091 tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_address_to_string( addr ) ); 2092 if(socket >= 0) 2093 tr_netClose( session, socket ); 2094 else 2095 UTP_Close( utp_socket ); 2096 } 2097 else if( getExistingHandshake( &manager->incomingHandshakes, addr ) ) 2098 { 2099 if(socket >= 0) 2100 tr_netClose( session, socket ); 2101 else 2102 UTP_Close( utp_socket ); 2103 } 2104 else /* we don't have a connection to them yet... */ 2105 { 2106 tr_peerIo * io; 2107 tr_handshake * handshake; 2108 2109 io = tr_peerIoNewIncoming( session, &session->bandwidth, addr, port, socket, utp_socket ); 2110 2111 handshake = tr_handshakeNew( io, 2112 session->encryptionMode, 2113 myHandshakeDoneCB, 2114 manager ); 2115 2116 tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */ 2117 2118 tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake, 2119 handshakeCompare ); 2120 } 2121 2122 managerUnlock( manager ); 2123} 2124 2125void 2126tr_peerMgrAddPex( tr_torrent * tor, uint8_t from, 2127 const tr_pex * pex, int8_t seedProbability ) 2128{ 2129 if( tr_isPex( pex ) ) /* safeguard against corrupt data */ 2130 { 2131 Torrent * t = tor->torrentPeers; 2132 managerLock( t->manager ); 2133 2134 if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) ) 2135 if( tr_address_is_valid_for_peers( &pex->addr, pex->port ) ) 2136 ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from ); 2137 2138 managerUnlock( t->manager ); 2139 } 2140} 2141 2142void 2143tr_peerMgrMarkAllAsSeeds( tr_torrent * tor ) 2144{ 2145 Torrent * t = tor->torrentPeers; 2146 const int n = tr_ptrArraySize( &t->pool ); 2147 struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool ); 2148 struct peer_atom ** end = it + n; 2149 2150 while( it != end ) 2151 atomSetSeed( t, *it++ ); 2152} 2153 2154tr_pex * 2155tr_peerMgrCompactToPex( const void * compact, 2156 size_t compactLen, 2157 const uint8_t * added_f, 2158 size_t added_f_len, 2159 size_t * pexCount ) 2160{ 2161 size_t i; 2162 size_t n = compactLen / 6; 2163 const uint8_t * walk = compact; 2164 tr_pex * pex = tr_new0( tr_pex, n ); 2165 2166 for( i = 0; i < n; ++i ) 2167 { 2168 pex[i].addr.type = TR_AF_INET; 2169 memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4; 2170 memcpy( &pex[i].port, walk, 2 ); walk += 2; 2171 if( added_f && ( n == added_f_len ) ) 2172 pex[i].flags = added_f[i]; 2173 } 2174 2175 *pexCount = n; 2176 return pex; 2177} 2178 2179tr_pex * 2180tr_peerMgrCompact6ToPex( const void * compact, 2181 size_t compactLen, 2182 const uint8_t * added_f, 2183 size_t added_f_len, 2184 size_t * pexCount ) 2185{ 2186 size_t i; 2187 size_t n = compactLen / 18; 2188 const uint8_t * walk = compact; 2189 tr_pex * pex = tr_new0( tr_pex, n ); 2190 2191 for( i = 0; i < n; ++i ) 2192 { 2193 pex[i].addr.type = TR_AF_INET6; 2194 memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16; 2195 memcpy( &pex[i].port, walk, 2 ); walk += 2; 2196 if( added_f && ( n == added_f_len ) ) 2197 pex[i].flags = added_f[i]; 2198 } 2199 2200 *pexCount = n; 2201 return pex; 2202} 2203 2204tr_pex * 2205tr_peerMgrArrayToPex( const void * array, 2206 size_t arrayLen, 2207 size_t * pexCount ) 2208{ 2209 size_t i; 2210 size_t n = arrayLen / ( sizeof( tr_address ) + 2 ); 2211 /*size_t n = arrayLen / sizeof( tr_peerArrayElement );*/ 2212 const uint8_t * walk = array; 2213 tr_pex * pex = tr_new0( tr_pex, n ); 2214 2215 for( i = 0 ; i < n ; i++ ) { 2216 memcpy( &pex[i].addr, walk, sizeof( tr_address ) ); 2217 memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 ); 2218 pex[i].flags = 0x00; 2219 walk += sizeof( tr_address ) + 2; 2220 } 2221 2222 *pexCount = n; 2223 return pex; 2224} 2225 2226/** 2227*** 2228**/ 2229 2230static void 2231tr_peerMgrSetBlame( tr_torrent * tor, 2232 tr_piece_index_t pieceIndex, 2233 int success ) 2234{ 2235 if( !success ) 2236 { 2237 int peerCount, i; 2238 Torrent * t = tor->torrentPeers; 2239 tr_peer ** peers; 2240 2241 assert( torrentIsLocked( t ) ); 2242 2243 peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount ); 2244 for( i = 0; i < peerCount; ++i ) 2245 { 2246 tr_peer * peer = peers[i]; 2247 if( tr_bitfieldHas( &peer->blame, pieceIndex ) ) 2248 { 2249 tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes", 2250 tr_atomAddrStr( peer->atom ), 2251 pieceIndex, (int)peer->strikes + 1 ); 2252 addStrike( t, peer ); 2253 } 2254 } 2255 } 2256} 2257 2258int 2259tr_pexCompare( const void * va, const void * vb ) 2260{ 2261 const tr_pex * a = va; 2262 const tr_pex * b = vb; 2263 int i; 2264 2265 assert( tr_isPex( a ) ); 2266 assert( tr_isPex( b ) ); 2267 2268 if(( i = tr_address_compare( &a->addr, &b->addr ))) 2269 return i; 2270 2271 if( a->port != b->port ) 2272 return a->port < b->port ? -1 : 1; 2273 2274 return 0; 2275} 2276 2277/* better goes first */ 2278static int 2279compareAtomsByUsefulness( const void * va, const void *vb ) 2280{ 2281 const struct peer_atom * a = * (const struct peer_atom**) va; 2282 const struct peer_atom * b = * (const struct peer_atom**) vb; 2283 2284 assert( tr_isAtom( a ) ); 2285 assert( tr_isAtom( b ) ); 2286 2287 if( a->piece_data_time != b->piece_data_time ) 2288 return a->piece_data_time > b->piece_data_time ? -1 : 1; 2289 if( a->fromBest != b->fromBest ) 2290 return a->fromBest < b->fromBest ? -1 : 1; 2291 if( a->numFails != b->numFails ) 2292 return a->numFails < b->numFails ? -1 : 1; 2293 2294 return 0; 2295} 2296 2297static bool 2298isAtomInteresting( const tr_torrent * tor, struct peer_atom * atom ) 2299{ 2300 if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) ) 2301 return false; 2302 2303 if( peerIsInUse( tor->torrentPeers, atom ) ) 2304 return true; 2305 2306 if( isAtomBlocklisted( tor->session, atom ) ) 2307 return false; 2308 2309 if( atom->flags2 & MYFLAG_BANNED ) 2310 return false; 2311 2312 return true; 2313} 2314 2315int 2316tr_peerMgrGetPeers( tr_torrent * tor, 2317 tr_pex ** setme_pex, 2318 uint8_t af, 2319 uint8_t list_mode, 2320 int maxCount ) 2321{ 2322 int i; 2323 int n; 2324 int count = 0; 2325 int atomCount = 0; 2326 const Torrent * t = tor->torrentPeers; 2327 struct peer_atom ** atoms = NULL; 2328 tr_pex * pex; 2329 tr_pex * walk; 2330 2331 assert( tr_isTorrent( tor ) ); 2332 assert( setme_pex != NULL ); 2333 assert( af==TR_AF_INET || af==TR_AF_INET6 ); 2334 assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING ); 2335 2336 managerLock( t->manager ); 2337 2338 /** 2339 *** build a list of atoms 2340 **/ 2341 2342 if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */ 2343 { 2344 int i; 2345 const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers ); 2346 atomCount = tr_ptrArraySize( &t->peers ); 2347 atoms = tr_new( struct peer_atom *, atomCount ); 2348 for( i=0; i<atomCount; ++i ) 2349 atoms[i] = peers[i]->atom; 2350 } 2351 else /* TR_PEERS_INTERESTING */ 2352 { 2353 int i; 2354 struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase( &t->pool ); 2355 n = tr_ptrArraySize( &t->pool ); 2356 atoms = tr_new( struct peer_atom *, n ); 2357 for( i=0; i<n; ++i ) 2358 if( isAtomInteresting( tor, atomBase[i] ) ) 2359 atoms[atomCount++] = atomBase[i]; 2360 } 2361 2362 qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness ); 2363 2364 /** 2365 *** add the first N of them into our return list 2366 **/ 2367 2368 n = MIN( atomCount, maxCount ); 2369 pex = walk = tr_new0( tr_pex, n ); 2370 2371 for( i=0; i<atomCount && count<n; ++i ) 2372 { 2373 const struct peer_atom * atom = atoms[i]; 2374 if( atom->addr.type == af ) 2375 { 2376 assert( tr_address_is_valid( &atom->addr ) ); 2377 walk->addr = atom->addr; 2378 walk->port = atom->port; 2379 walk->flags = atom->flags; 2380 ++count; 2381 ++walk; 2382 } 2383 } 2384 2385 qsort( pex, count, sizeof( tr_pex ), tr_pexCompare ); 2386 2387 assert( ( walk - pex ) == count ); 2388 *setme_pex = pex; 2389 2390 /* cleanup */ 2391 tr_free( atoms ); 2392 managerUnlock( t->manager ); 2393 return count; 2394} 2395 2396static void atomPulse ( int, short, void * ); 2397static void bandwidthPulse ( int, short, void * ); 2398static void rechokePulse ( int, short, void * ); 2399static void reconnectPulse ( int, short, void * ); 2400 2401static struct event * 2402createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata ) 2403{ 2404 struct event * timer = evtimer_new( session->event_base, callback, cbdata ); 2405 tr_timerAddMsec( timer, msec ); 2406 return timer; 2407} 2408 2409static void 2410ensureMgrTimersExist( struct tr_peerMgr * m ) 2411{ 2412 if( m->atomTimer == NULL ) 2413 m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m ); 2414 2415 if( m->bandwidthTimer == NULL ) 2416 m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m ); 2417 2418 if( m->rechokeTimer == NULL ) 2419 m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m ); 2420 2421 if( m->refillUpkeepTimer == NULL ) 2422 m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m ); 2423} 2424 2425void 2426tr_peerMgrStartTorrent( tr_torrent * tor ) 2427{ 2428 Torrent * t = tor->torrentPeers; 2429 2430 assert( tr_isTorrent( tor ) ); 2431 assert( tr_torrentIsLocked( tor ) ); 2432 2433 ensureMgrTimersExist( t->manager ); 2434 2435 t->isRunning = true; 2436 t->maxPeers = t->tor->maxConnectedPeers; 2437 t->pieceSortState = PIECES_UNSORTED; 2438 2439 rechokePulse( 0, 0, t->manager ); 2440} 2441 2442static void 2443stopTorrent( Torrent * t ) 2444{ 2445 tr_peer * peer; 2446 2447 t->isRunning = false; 2448 2449 replicationFree( t ); 2450 invalidatePieceSorting( t ); 2451 2452 /* disconnect the peers. */ 2453 while(( peer = tr_ptrArrayPop( &t->peers ))) 2454 peerDelete( t, peer ); 2455 2456 /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(), 2457 * which removes the handshake from t->outgoingHandshakes... */ 2458 while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) ) 2459 tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) ); 2460} 2461 2462void 2463tr_peerMgrStopTorrent( tr_torrent * tor ) 2464{ 2465 assert( tr_isTorrent( tor ) ); 2466 assert( tr_torrentIsLocked( tor ) ); 2467 2468 stopTorrent( tor->torrentPeers ); 2469} 2470 2471void 2472tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor ) 2473{ 2474 assert( tr_isTorrent( tor ) ); 2475 assert( tr_torrentIsLocked( tor ) ); 2476 assert( tor->torrentPeers == NULL ); 2477 2478 tor->torrentPeers = torrentNew( manager, tor ); 2479} 2480 2481void 2482tr_peerMgrRemoveTorrent( tr_torrent * tor ) 2483{ 2484 assert( tr_isTorrent( tor ) ); 2485 assert( tr_torrentIsLocked( tor ) ); 2486 2487 stopTorrent( tor->torrentPeers ); 2488 torrentFree( tor->torrentPeers ); 2489} 2490 2491void 2492tr_peerUpdateProgress( tr_torrent * tor, tr_peer * peer ) 2493{ 2494 const tr_bitfield * have = &peer->have; 2495 2496 if( tr_bitfieldHasAll( have ) ) 2497 { 2498 peer->progress = 1.0; 2499 } 2500 else if( tr_bitfieldHasNone( have ) ) 2501 { 2502 peer->progress = 0.0; 2503 } 2504 else 2505 { 2506 const float true_count = tr_bitfieldCountTrueBits( have ); 2507 2508 if( tr_torrentHasMetadata( tor ) ) 2509 { 2510 peer->progress = true_count / tor->info.pieceCount; 2511 } 2512 else /* without pieceCount, this result is only a best guess... */ 2513 { 2514 peer->progress = true_count / ( have->bit_count + 1 ); 2515 } 2516 } 2517 2518 /* clamp the progress range */ 2519 if ( peer->progress < 0.0 ) 2520 peer->progress = 0.0; 2521 if ( peer->progress > 1.0 ) 2522 peer->progress = 1.0; 2523 2524 if( peer->atom && ( peer->progress >= 1.0 ) ) 2525 atomSetSeed( tor->torrentPeers, peer->atom ); 2526} 2527 2528void 2529tr_peerMgrOnTorrentGotMetainfo( tr_torrent * tor ) 2530{ 2531 int i; 2532 int peerCount; 2533 tr_peer ** peers; 2534 2535 /* the webseed list may have changed... */ 2536 rebuildWebseedArray( tor->torrentPeers, tor ); 2537 2538 /* some peer_msgs' progress fields may not be accurate if we 2539 didn't have the metadata before now... so refresh them all... */ 2540 peerCount = tr_ptrArraySize( &tor->torrentPeers->peers ); 2541 peers = (tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers ); 2542 for( i=0; i<peerCount; ++i ) 2543 tr_peerUpdateProgress( tor, peers[i] ); 2544 2545} 2546 2547void 2548tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount ) 2549{ 2550 assert( tr_isTorrent( tor ) ); 2551 assert( torrentIsLocked( tor->torrentPeers ) ); 2552 assert( tab != NULL ); 2553 assert( tabCount > 0 ); 2554 2555 memset( tab, 0, tabCount ); 2556 2557 if( tr_torrentHasMetadata( tor ) ) 2558 { 2559 tr_piece_index_t i; 2560 const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers ); 2561 const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers ); 2562 const float interval = tor->info.pieceCount / (float)tabCount; 2563 const bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED; 2564 2565 for( i=0; i<tabCount; ++i ) 2566 { 2567 const int piece = i * interval; 2568 2569 if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) ) 2570 tab[i] = -1; 2571 else if( peerCount ) { 2572 int j; 2573 for( j=0; j<peerCount; ++j ) 2574 if( tr_bitfieldHas( &peers[j]->have, piece ) ) 2575 ++tab[i]; 2576 } 2577 } 2578 } 2579} 2580 2581static bool 2582peerIsSeed( const tr_peer * peer ) 2583{ 2584 if( peer->progress >= 1.0 ) 2585 return true; 2586 2587 if( peer->atom && atomIsSeed( peer->atom ) ) 2588 return true; 2589 2590 return false; 2591} 2592 2593/* count how many bytes we want that connected peers have */ 2594uint64_t 2595tr_peerMgrGetDesiredAvailable( const tr_torrent * tor ) 2596{ 2597 size_t i; 2598 size_t n; 2599 uint64_t desiredAvailable; 2600 const Torrent * t = tor->torrentPeers; 2601 2602 /* common shortcuts... */ 2603 2604 if( tr_torrentIsSeed( t->tor ) ) 2605 return 0; 2606 2607 if( !tr_torrentHasMetadata( tor ) ) 2608 return 0; 2609 2610 n = tr_ptrArraySize( &t->peers ); 2611 if( n == 0 ) 2612 return 0; 2613 else { 2614 const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers ); 2615 for( i=0; i<n; ++i ) 2616 if( peers[i]->atom && atomIsSeed( peers[i]->atom ) ) 2617 return tr_cpLeftUntilDone( &tor->completion ); 2618 } 2619 2620 if( !t->pieceReplication || !t->pieceReplicationSize ) 2621 return 0; 2622 2623 /* do it the hard way */ 2624 2625 desiredAvailable = 0; 2626 for( i=0, n=MIN(tor->info.pieceCount, t->pieceReplicationSize); i<n; ++i ) 2627 if( !tor->info.pieces[i].dnd && ( t->pieceReplication[i] > 0 ) ) 2628 desiredAvailable += tr_cpMissingBytesInPiece( &t->tor->completion, i ); 2629 2630 assert( desiredAvailable <= tor->info.totalSize ); 2631 return desiredAvailable; 2632} 2633 2634void 2635tr_peerMgrTorrentStats( tr_torrent * tor, 2636 int * setmePeersConnected, 2637 int * setmeWebseedsSendingToUs, 2638 int * setmePeersSendingToUs, 2639 int * setmePeersGettingFromUs, 2640 int * setmePeersFrom ) 2641{ 2642 int i, size; 2643 const Torrent * t = tor->torrentPeers; 2644 const tr_peer ** peers; 2645 2646 assert( tr_torrentIsLocked( tor ) ); 2647 2648 peers = (const tr_peer **) tr_ptrArrayBase( &t->peers ); 2649 size = tr_ptrArraySize( &t->peers ); 2650 2651 *setmePeersConnected = 0; 2652 *setmePeersGettingFromUs = 0; 2653 *setmePeersSendingToUs = 0; 2654 *setmeWebseedsSendingToUs = 0; 2655 2656 for( i=0; i<TR_PEER_FROM__MAX; ++i ) 2657 setmePeersFrom[i] = 0; 2658 2659 for( i=0; i<size; ++i ) 2660 { 2661 const tr_peer * peer = peers[i]; 2662 const struct peer_atom * atom = peer->atom; 2663 2664 if( peer->io == NULL ) /* not connected */ 2665 continue; 2666 2667 ++*setmePeersConnected; 2668 2669 ++setmePeersFrom[atom->fromFirst]; 2670 2671 if( clientIsDownloadingFrom( tor, peer ) ) 2672 ++*setmePeersSendingToUs; 2673 2674 if( clientIsUploadingTo( peer ) ) 2675 ++*setmePeersGettingFromUs; 2676 } 2677 2678 *setmeWebseedsSendingToUs = countActiveWebseeds( t ); 2679} 2680 2681double* 2682tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor ) 2683{ 2684 int i; 2685 const Torrent * t = tor->torrentPeers; 2686 const int webseedCount = tr_ptrArraySize( &t->webseeds ); 2687 const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds ); 2688 const uint64_t now = tr_time_msec( ); 2689 double * ret = tr_new0( double, webseedCount ); 2690 2691 assert( tr_isTorrent( tor ) ); 2692 assert( tr_torrentIsLocked( tor ) ); 2693 assert( t->manager != NULL ); 2694 assert( webseedCount == tor->info.webseedCount ); 2695 2696 for( i=0; i<webseedCount; ++i ) { 2697 unsigned int Bps; 2698 if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) ) 2699 ret[i] = Bps / (double)tr_speed_K; 2700 else 2701 ret[i] = -1.0; 2702 } 2703 2704 return ret; 2705} 2706 2707unsigned int 2708tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction ) 2709{ 2710 return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0; 2711} 2712 2713struct tr_peer_stat * 2714tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount ) 2715{ 2716 int i; 2717 const Torrent * t = tor->torrentPeers; 2718 const int size = tr_ptrArraySize( &t->peers ); 2719 const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers ); 2720 const uint64_t now_msec = tr_time_msec( ); 2721 const time_t now = tr_time(); 2722 tr_peer_stat * ret = tr_new0( tr_peer_stat, size ); 2723 2724 assert( tr_isTorrent( tor ) ); 2725 assert( tr_torrentIsLocked( tor ) ); 2726 assert( t->manager ); 2727 2728 for( i=0; i<size; ++i ) 2729 { 2730 char * pch; 2731 const tr_peer * peer = peers[i]; 2732 const struct peer_atom * atom = peer->atom; 2733 tr_peer_stat * stat = ret + i; 2734 2735 tr_address_to_string_with_buf( &atom->addr, stat->addr, sizeof( stat->addr ) ); 2736 tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ), 2737 sizeof( stat->client ) ); 2738 stat->port = ntohs( peer->atom->port ); 2739 stat->from = atom->fromFirst; 2740 stat->progress = peer->progress; 2741 stat->isUTP = peer->io->utp_socket != NULL; 2742 stat->isEncrypted = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0; 2743 stat->rateToPeer_KBps = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) ); 2744 stat->rateToClient_KBps = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) ); 2745 stat->peerIsChoked = peer->peerIsChoked; 2746 stat->peerIsInterested = peer->peerIsInterested; 2747 stat->clientIsChoked = peer->clientIsChoked; 2748 stat->clientIsInterested = peer->clientIsInterested; 2749 stat->isIncoming = tr_peerIoIsIncoming( peer->io ); 2750 stat->isDownloadingFrom = clientIsDownloadingFrom( tor, peer ); 2751 stat->isUploadingTo = clientIsUploadingTo( peer ); 2752 stat->isSeed = peerIsSeed( peer ); 2753 2754 stat->blocksToPeer = tr_historyGet( &peer->blocksSentToPeer, now, CANCEL_HISTORY_SEC ); 2755 stat->blocksToClient = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC ); 2756 stat->cancelsToPeer = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC ); 2757 stat->cancelsToClient = tr_historyGet( &peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC ); 2758 2759 stat->pendingReqsToPeer = peer->pendingReqsToPeer; 2760 stat->pendingReqsToClient = peer->pendingReqsToClient; 2761 2762 pch = stat->flagStr; 2763 if( stat->isUTP ) *pch++ = 'T'; 2764 if( t->optimistic == peer ) *pch++ = 'O'; 2765 if( stat->isDownloadingFrom ) *pch++ = 'D'; 2766 else if( stat->clientIsInterested ) *pch++ = 'd'; 2767 if( stat->isUploadingTo ) *pch++ = 'U'; 2768 else if( stat->peerIsInterested ) *pch++ = 'u'; 2769 if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K'; 2770 if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?'; 2771 if( stat->isEncrypted ) *pch++ = 'E'; 2772 if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H'; 2773 else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X'; 2774 if( stat->isIncoming ) *pch++ = 'I'; 2775 *pch = '\0'; 2776 } 2777 2778 *setmeCount = size; 2779 2780 return ret; 2781} 2782 2783/*** 2784**** 2785**** 2786***/ 2787 2788void 2789tr_peerMgrClearInterest( tr_torrent * tor ) 2790{ 2791 int i; 2792 Torrent * t = tor->torrentPeers; 2793 const int peerCount = tr_ptrArraySize( &t->peers ); 2794 2795 assert( tr_isTorrent( tor ) ); 2796 assert( tr_torrentIsLocked( tor ) ); 2797 2798 for( i=0; i<peerCount; ++i ) 2799 { 2800 const tr_peer * peer = tr_ptrArrayNth( &t->peers, i ); 2801 tr_peerMsgsSetInterested( peer->msgs, false ); 2802 } 2803} 2804 2805/* does this peer have any pieces that we want? */ 2806static bool 2807isPeerInteresting( const tr_torrent * const tor, 2808 const bool * const piece_is_interesting, 2809 const tr_peer * const peer ) 2810{ 2811 tr_piece_index_t i, n; 2812 2813 /* these cases should have already been handled by the calling code... */ 2814 assert( !tr_torrentIsSeed( tor ) ); 2815 assert( tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) ); 2816 2817 if( peerIsSeed( peer ) ) 2818 return true; 2819 2820 for( i=0, n=tor->info.pieceCount; i<n; ++i ) 2821 if( piece_is_interesting[i] && tr_bitfieldHas( &peer->have, i ) ) 2822 return true; 2823 2824 return false; 2825} 2826 2827typedef enum 2828{ 2829 RECHOKE_STATE_GOOD, 2830 RECHOKE_STATE_UNTESTED, 2831 RECHOKE_STATE_BAD 2832} 2833tr_rechoke_state; 2834 2835struct tr_rechoke_info 2836{ 2837 tr_peer * peer; 2838 int salt; 2839 int rechoke_state; 2840}; 2841 2842static int 2843compare_rechoke_info( const void * va, const void * vb ) 2844{ 2845 const struct tr_rechoke_info * a = va; 2846 const struct tr_rechoke_info * b = vb; 2847 2848 if( a->rechoke_state != b->rechoke_state ) 2849 return a->rechoke_state - b->rechoke_state; 2850 2851 return a->salt - b->salt; 2852} 2853 2854/* determines who we send "interested" messages to */ 2855static void 2856rechokeDownloads( Torrent * t ) 2857{ 2858 int i; 2859 int maxPeers = 0; 2860 int rechoke_count = 0; 2861 struct tr_rechoke_info * rechoke = NULL; 2862 const int MIN_INTERESTING_PEERS = 5; 2863 const int peerCount = tr_ptrArraySize( &t->peers ); 2864 const time_t now = tr_time( ); 2865 2866 /* some cases where this function isn't necessary */ 2867 if( tr_torrentIsSeed( t->tor ) ) 2868 return; 2869 if ( !tr_torrentIsPieceTransferAllowed( t->tor, TR_PEER_TO_CLIENT ) ) 2870 return; 2871 2872 /* decide HOW MANY peers to be interested in */ 2873 { 2874 int blocks = 0; 2875 int cancels = 0; 2876 time_t timeSinceCancel; 2877 2878 /* Count up how many blocks & cancels each peer has. 2879 * 2880 * There are two situations where we send out cancels -- 2881 * 2882 * 1. We've got unresponsive peers, which is handled by deciding 2883 * -which- peers to be interested in. 2884 * 2885 * 2. We've hit our bandwidth cap, which is handled by deciding 2886 * -how many- peers to be interested in. 2887 * 2888 * We're working on 2. here, so we need to ignore unresponsive 2889 * peers in our calculations lest they confuse Transmission into 2890 * thinking it's hit its bandwidth cap. 2891 */ 2892 for( i=0; i<peerCount; ++i ) 2893 { 2894 const tr_peer * peer = tr_ptrArrayNth( &t->peers, i ); 2895 const int b = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC ); 2896 const int c = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC ); 2897 2898 if( b == 0 ) /* ignore unresponsive peers, as described above */ 2899 continue; 2900 2901 blocks += b; 2902 cancels += c; 2903 } 2904 2905 if( cancels > 0 ) 2906 { 2907 /* cancelRate: of the block requests we've recently made, the percentage we cancelled. 2908 * higher values indicate more congestion. */ 2909 const double cancelRate = cancels / (double)(cancels + blocks); 2910 const double mult = 1 - MIN( cancelRate, 0.5 ); 2911 maxPeers = t->interestedCount * mult; 2912 tordbg( t, "cancel rate is %.3f -- reducing the " 2913 "number of peers we're interested in by %.0f percent", 2914 cancelRate, mult * 100 ); 2915 t->lastCancel = now; 2916 } 2917 2918 timeSinceCancel = now - t->lastCancel; 2919 if( timeSinceCancel ) 2920 { 2921 const int maxIncrease = 15; 2922 const time_t maxHistory = 2 * CANCEL_HISTORY_SEC; 2923 const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory; 2924 const int inc = maxIncrease * mult; 2925 maxPeers = t->maxPeers + inc; 2926 tordbg( t, "time since last cancel is %li -- increasing the " 2927 "number of peers we're interested in by %d", 2928 timeSinceCancel, inc ); 2929 } 2930 } 2931 2932 /* don't let the previous section's number tweaking go too far... */ 2933 if( maxPeers < MIN_INTERESTING_PEERS ) 2934 maxPeers = MIN_INTERESTING_PEERS; 2935 if( maxPeers > t->tor->maxConnectedPeers ) 2936 maxPeers = t->tor->maxConnectedPeers; 2937 2938 t->maxPeers = maxPeers; 2939 2940 if( peerCount > 0 ) 2941 { 2942 bool * piece_is_interesting; 2943 const tr_torrent * const tor = t->tor; 2944 const int n = tor->info.pieceCount; 2945 2946 /* build a bitfield of interesting pieces... */ 2947 piece_is_interesting = tr_new( bool, n ); 2948 for( i=0; i<n; i++ ) 2949 piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete( &tor->completion, i ); 2950 2951 /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */ 2952 for( i=0; i<peerCount; ++i ) 2953 { 2954 tr_peer * peer = tr_ptrArrayNth( &t->peers, i ); 2955 2956 if( !isPeerInteresting( t->tor, piece_is_interesting, peer ) ) 2957 { 2958 tr_peerMsgsSetInterested( peer->msgs, false ); 2959 } 2960 else 2961 { 2962 tr_rechoke_state rechoke_state; 2963 const int blocks = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC ); 2964 const int cancels = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC ); 2965 2966 if( !blocks && !cancels ) 2967 rechoke_state = RECHOKE_STATE_UNTESTED; 2968 else if( !cancels ) 2969 rechoke_state = RECHOKE_STATE_GOOD; 2970 else if( !blocks ) 2971 rechoke_state = RECHOKE_STATE_BAD; 2972 else if( ( cancels * 10 ) < blocks ) 2973 rechoke_state = RECHOKE_STATE_GOOD; 2974 else 2975 rechoke_state = RECHOKE_STATE_BAD; 2976 2977 if( rechoke == NULL ) 2978 rechoke = tr_new( struct tr_rechoke_info, peerCount ); 2979 2980 rechoke[rechoke_count].peer = peer; 2981 rechoke[rechoke_count].rechoke_state = rechoke_state; 2982 rechoke[rechoke_count].salt = tr_cryptoWeakRandInt( INT_MAX ); 2983 rechoke_count++; 2984 } 2985 2986 } 2987 2988 tr_free( piece_is_interesting ); 2989 } 2990 2991 /* now that we know which & how many peers to be interested in... update the peer interest */ 2992 qsort( rechoke, rechoke_count, sizeof( struct tr_rechoke_info ), compare_rechoke_info ); 2993 t->interestedCount = MIN( maxPeers, rechoke_count ); 2994 for( i=0; i<rechoke_count; ++i ) 2995 tr_peerMsgsSetInterested( rechoke[i].peer->msgs, i<t->interestedCount ); 2996 2997 /* cleanup */ 2998 tr_free( rechoke ); 2999} 3000 3001/** 3002*** 3003**/ 3004 3005struct ChokeData 3006{ 3007 bool isInterested; 3008 bool wasChoked; 3009 bool isChoked; 3010 int rate; 3011 int salt; 3012 tr_peer * peer; 3013}; 3014 3015static int 3016compareChoke( const void * va, const void * vb ) 3017{ 3018 const struct ChokeData * a = va; 3019 const struct ChokeData * b = vb; 3020 3021 if( a->rate != b->rate ) /* prefer higher overall speeds */ 3022 return a->rate > b->rate ? -1 : 1; 3023 3024 if( a->wasChoked != b->wasChoked ) /* prefer unchoked */ 3025 return a->wasChoked ? 1 : -1; 3026 3027 if( a->salt != b->salt ) /* random order */ 3028 return a->salt - b->salt; 3029 3030 return 0; 3031} 3032 3033/* is this a new connection? */ 3034static int 3035isNew( const tr_peer * peer ) 3036{ 3037 return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45; 3038} 3039 3040/* get a rate for deciding which peers to choke and unchoke. */ 3041static int 3042getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now ) 3043{ 3044 unsigned int Bps; 3045 3046 if( tr_torrentIsSeed( tor ) ) 3047 Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER ); 3048 3049 /* downloading a private torrent... take upload speed into account 3050 * because there may only be a small window of opportunity to share */ 3051 else if( tr_torrentIsPrivate( tor ) ) 3052 Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT ) 3053 + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER ); 3054 3055 /* downloading a public torrent */ 3056 else 3057 Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT ); 3058 3059 /* convert it to bytes per second */ 3060 return Bps; 3061} 3062 3063static inline bool 3064isBandwidthMaxedOut( const tr_bandwidth * b, 3065 const uint64_t now_msec, tr_direction dir ) 3066{ 3067 if( !tr_bandwidthIsLimited( b, dir ) ) 3068 return false; 3069 else { 3070 const unsigned int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir ); 3071 const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir ); 3072 return got >= want; 3073 } 3074} 3075 3076static void 3077rechokeUploads( Torrent * t, const uint64_t now ) 3078{ 3079 int i, size, unchokedInterested; 3080 const int peerCount = tr_ptrArraySize( &t->peers ); 3081 tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers ); 3082 struct ChokeData * choke = tr_new0( struct ChokeData, peerCount ); 3083 const tr_session * session = t->manager->session; 3084 const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER ); 3085 const bool isMaxedOut = isBandwidthMaxedOut( &t->tor->bandwidth, now, TR_UP ); 3086 3087 assert( torrentIsLocked( t ) ); 3088 3089 /* an optimistic unchoke peer's "optimistic" 3090 * state lasts for N calls to rechokeUploads(). */ 3091 if( t->optimisticUnchokeTimeScaler > 0 ) 3092 t->optimisticUnchokeTimeScaler--; 3093 else 3094 t->optimistic = NULL; 3095 3096 /* sort the peers by preference and rate */ 3097 for( i = 0, size = 0; i < peerCount; ++i ) 3098 { 3099 tr_peer * peer = peers[i]; 3100 struct peer_atom * atom = peer->atom; 3101 3102 if( peerIsSeed( peer ) ) /* choke seeds and partial seeds */ 3103 { 3104 tr_peerMsgsSetChoke( peer->msgs, true ); 3105 } 3106 else if( chokeAll ) /* choke everyone if we're not uploading */ 3107 { 3108 tr_peerMsgsSetChoke( peer->msgs, true ); 3109 } 3110 else if( peer != t->optimistic ) 3111 { 3112 struct ChokeData * n = &choke[size++]; 3113 n->peer = peer; 3114 n->isInterested = peer->peerIsInterested; 3115 n->wasChoked = peer->peerIsChoked; 3116 n->rate = getRate( t->tor, atom, now ); 3117 n->salt = tr_cryptoWeakRandInt( INT_MAX ); 3118 n->isChoked = true; 3119 } 3120 } 3121 3122 qsort( choke, size, sizeof( struct ChokeData ), compareChoke ); 3123 3124 /** 3125 * Reciprocation and number of uploads capping is managed by unchoking 3126 * the N peers which have the best upload rate and are interested. 3127 * This maximizes the client's download rate. These N peers are 3128 * referred to as downloaders, because they are interested in downloading 3129 * from the client. 3130 * 3131 * Peers which have a better upload rate (as compared to the downloaders) 3132 * but aren't interested get unchoked. If they become interested, the 3133 * downloader with the worst upload rate gets choked. If a client has 3134 * a complete file, it uses its upload rate rather than its download 3135 * rate to decide which peers to unchoke. 3136 * 3137 * If our bandwidth is maxed out, don't unchoke any more peers. 3138 */ 3139 unchokedInterested = 0; 3140 for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) { 3141 choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false; 3142 if( choke[i].isInterested ) 3143 ++unchokedInterested; 3144 } 3145 3146 /* optimistic unchoke */ 3147 if( !t->optimistic && !isMaxedOut && (i<size) ) 3148 { 3149 int n; 3150 struct ChokeData * c; 3151 tr_ptrArray randPool = TR_PTR_ARRAY_INIT; 3152 3153 for( ; i<size; ++i ) 3154 { 3155 if( choke[i].isInterested ) 3156 { 3157 const tr_peer * peer = choke[i].peer; 3158 int x = 1, y; 3159 if( isNew( peer ) ) x *= 3; 3160 for( y=0; y<x; ++y ) 3161 tr_ptrArrayAppend( &randPool, &choke[i] ); 3162 } 3163 } 3164 3165 if(( n = tr_ptrArraySize( &randPool ))) 3166 { 3167 c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n )); 3168 c->isChoked = false; 3169 t->optimistic = c->peer; 3170 t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER; 3171 } 3172 3173 tr_ptrArrayDestruct( &randPool, NULL ); 3174 } 3175 3176 for( i=0; i<size; ++i ) 3177 tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked ); 3178 3179 /* cleanup */ 3180 tr_free( choke ); 3181} 3182 3183static void 3184rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr ) 3185{ 3186 tr_torrent * tor = NULL; 3187 tr_peerMgr * mgr = vmgr; 3188 const uint64_t now = tr_time_msec( ); 3189 3190 managerLock( mgr ); 3191 3192 while(( tor = tr_torrentNext( mgr->session, tor ))) { 3193 if( tor->isRunning ) { 3194 Torrent * t = tor->torrentPeers; 3195 if( !tr_ptrArrayEmpty( &t->peers ) ) { 3196 rechokeUploads( t, now ); 3197 rechokeDownloads( t ); 3198 } 3199 } 3200 } 3201 3202 tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC ); 3203 managerUnlock( mgr ); 3204} 3205 3206/*** 3207**** 3208**** Life and Death 3209**** 3210***/ 3211 3212static bool 3213shouldPeerBeClosed( const Torrent * t, 3214 const tr_peer * peer, 3215 int peerCount, 3216 const time_t now ) 3217{ 3218 const tr_torrent * tor = t->tor; 3219 const struct peer_atom * atom = peer->atom; 3220 3221 /* if it's marked for purging, close it */ 3222 if( peer->doPurge ) 3223 { 3224 tordbg( t, "purging peer %s because its doPurge flag is set", 3225 tr_atomAddrStr( atom ) ); 3226 return true; 3227 } 3228 3229 /* disconnect if we're both seeds and enough time has passed for PEX */ 3230 if( tr_torrentIsSeed( tor ) && peerIsSeed( peer ) ) 3231 return !tr_torrentAllowsPex(tor) || (now-atom->time>=30); 3232 3233 /* disconnect if it's been too long since piece data has been transferred. 3234 * this is on a sliding scale based on number of available peers... */ 3235 { 3236 const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 ); 3237 /* if we have >= relaxIfFewerThan, strictness is 100%. 3238 * if we have zero connections, strictness is 0% */ 3239 const float strictness = peerCount >= relaxStrictnessIfFewerThanN 3240 ? 1.0 3241 : peerCount / (float)relaxStrictnessIfFewerThanN; 3242 const int lo = MIN_UPLOAD_IDLE_SECS; 3243 const int hi = MAX_UPLOAD_IDLE_SECS; 3244 const int limit = hi - ( ( hi - lo ) * strictness ); 3245 const int idleTime = now - MAX( atom->time, atom->piece_data_time ); 3246/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/ 3247 if( idleTime > limit ) { 3248 tordbg( t, "purging peer %s because it's been %d secs since we shared anything", 3249 tr_atomAddrStr( atom ), idleTime ); 3250 return true; 3251 } 3252 } 3253 3254 return false; 3255} 3256 3257static tr_peer ** 3258getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize ) 3259{ 3260 int i, peerCount, outsize; 3261 struct tr_peer ** ret = NULL; 3262 tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount ); 3263 3264 assert( torrentIsLocked( t ) ); 3265 3266 for( i = outsize = 0; i < peerCount; ++i ) { 3267 if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) ) { 3268 if( ret == NULL ) 3269 ret = tr_new( tr_peer *, peerCount ); 3270 ret[outsize++] = peers[i]; 3271 } 3272 } 3273 3274 *setmeSize = outsize; 3275 return ret; 3276} 3277 3278static int 3279getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now ) 3280{ 3281 int sec; 3282 3283 /* if we were recently connected to this peer and transferring piece 3284 * data, try to reconnect to them sooner rather that later -- we don't 3285 * want network troubles to get in the way of a good peer. */ 3286 if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) ) 3287 sec = MINIMUM_RECONNECT_INTERVAL_SECS; 3288 3289 /* don't allow reconnects more often than our minimum */ 3290 else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS ) 3291 sec = MINIMUM_RECONNECT_INTERVAL_SECS; 3292 3293 /* otherwise, the interval depends on how many times we've tried 3294 * and failed to connect to the peer */ 3295 else switch( atom->numFails ) { 3296 case 0: sec = 0; break; 3297 case 1: sec = 5; break; 3298 case 2: sec = 2 * 60; break; 3299 case 3: sec = 15 * 60; break; 3300 case 4: sec = 30 * 60; break; 3301 case 5: sec = 60 * 60; break; 3302 default: sec = 120 * 60; break; 3303 } 3304 3305 /* penalize peers that were unreachable the last time we tried */ 3306 if( atom->flags2 & MYFLAG_UNREACHABLE ) 3307 sec += sec; 3308 3309 dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec ); 3310 return sec; 3311} 3312 3313static void 3314removePeer( Torrent * t, tr_peer * peer ) 3315{ 3316 tr_peer * removed; 3317 struct peer_atom * atom = peer->atom; 3318 3319 assert( torrentIsLocked( t ) ); 3320 assert( atom ); 3321 3322 atom->time = tr_time( ); 3323 3324 removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare ); 3325 3326 if( replicationExists( t ) ) 3327 tr_decrReplicationFromBitfield( t, &peer->have ); 3328 3329 assert( removed == peer ); 3330 peerDelete( t, removed ); 3331} 3332 3333static void 3334closePeer( Torrent * t, tr_peer * peer ) 3335{ 3336 struct peer_atom * atom; 3337 3338 assert( t != NULL ); 3339 assert( peer != NULL ); 3340 3341 atom = peer->atom; 3342 3343 /* if we transferred piece data, then they might be good peers, 3344 so reset their `numFails' weight to zero. otherwise we connected 3345 to them fruitlessly, so mark it as another fail */ 3346 if( atom->piece_data_time ) { 3347 tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) ); 3348 atom->numFails = 0; 3349 } else { 3350 ++atom->numFails; 3351 tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails ); 3352 } 3353 3354 tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) ); 3355 removePeer( t, peer ); 3356} 3357 3358static void 3359removeAllPeers( Torrent * t ) 3360{ 3361 while( !tr_ptrArrayEmpty( &t->peers ) ) 3362 removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) ); 3363} 3364 3365static void 3366closeBadPeers( Torrent * t, const time_t now_sec ) 3367{ 3368 if( !tr_ptrArrayEmpty( &t->peers ) ) 3369 { 3370 int i; 3371 int peerCount; 3372 struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount ); 3373 for( i=0; i<peerCount; ++i ) 3374 closePeer( t, peers[i] ); 3375 tr_free( peers ); 3376 } 3377} 3378 3379struct peer_liveliness 3380{ 3381 tr_peer * peer; 3382 void * clientData; 3383 time_t pieceDataTime; 3384 time_t time; 3385 int speed; 3386 bool doPurge; 3387}; 3388 3389static int 3390comparePeerLiveliness( const void * va, const void * vb ) 3391{ 3392 const struct peer_liveliness * a = va; 3393 const struct peer_liveliness * b = vb; 3394 3395 if( a->doPurge != b->doPurge ) 3396 return a->doPurge ? 1 : -1; 3397 3398 if( a->speed != b->speed ) /* faster goes first */ 3399 return a->speed > b->speed ? -1 : 1; 3400 3401 /* the one to give us data more recently goes first */ 3402 if( a->pieceDataTime != b->pieceDataTime ) 3403 return a->pieceDataTime > b->pieceDataTime ? -1 : 1; 3404 3405 /* the one we connected to most recently goes first */ 3406 if( a->time != b->time ) 3407 return a->time > b->time ? -1 : 1; 3408 3409 return 0; 3410} 3411 3412static void 3413sortPeersByLivelinessImpl( tr_peer ** peers, 3414 void ** clientData, 3415 int n, 3416 uint64_t now, 3417 int (*compare) ( const void *va, const void *vb ) ) 3418{ 3419 int i; 3420 struct peer_liveliness *lives, *l; 3421 3422 /* build a sortable array of peer + extra info */ 3423 lives = l = tr_new0( struct peer_liveliness, n ); 3424 for( i=0; i<n; ++i, ++l ) 3425 { 3426 tr_peer * p = peers[i]; 3427 l->peer = p; 3428 l->doPurge = p->doPurge; 3429 l->pieceDataTime = p->atom->piece_data_time; 3430 l->time = p->atom->time; 3431 l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP ) 3432 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN ); 3433 if( clientData ) 3434 l->clientData = clientData[i]; 3435 } 3436 3437 /* sort 'em */ 3438 assert( n == ( l - lives ) ); 3439 qsort( lives, n, sizeof( struct peer_liveliness ), compare ); 3440 3441 /* build the peer array */ 3442 for( i=0, l=lives; i<n; ++i, ++l ) { 3443 peers[i] = l->peer; 3444 if( clientData ) 3445 clientData[i] = l->clientData; 3446 } 3447 assert( n == ( l - lives ) ); 3448 3449 /* cleanup */ 3450 tr_free( lives ); 3451} 3452 3453static void 3454sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now ) 3455{ 3456 sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness ); 3457} 3458 3459 3460static void 3461enforceTorrentPeerLimit( Torrent * t, uint64_t now ) 3462{ 3463 int n = tr_ptrArraySize( &t->peers ); 3464 const int max = tr_torrentGetPeerLimit( t->tor ); 3465 if( n > max ) 3466 { 3467 void * base = tr_ptrArrayBase( &t->peers ); 3468 tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) ); 3469 sortPeersByLiveliness( peers, NULL, n, now ); 3470 while( n > max ) 3471 closePeer( t, peers[--n] ); 3472 tr_free( peers ); 3473 } 3474} 3475 3476static void 3477enforceSessionPeerLimit( tr_session * session, uint64_t now ) 3478{ 3479 int n = 0; 3480 tr_torrent * tor = NULL; 3481 const int max = tr_sessionGetPeerLimit( session ); 3482 3483 /* count the total number of peers */ 3484 while(( tor = tr_torrentNext( session, tor ))) 3485 n += tr_ptrArraySize( &tor->torrentPeers->peers ); 3486 3487 /* if there are too many, prune out the worst */ 3488 if( n > max ) 3489 { 3490 tr_peer ** peers = tr_new( tr_peer*, n ); 3491 Torrent ** torrents = tr_new( Torrent*, n ); 3492 3493 /* populate the peer array */ 3494 n = 0; 3495 tor = NULL; 3496 while(( tor = tr_torrentNext( session, tor ))) { 3497 int i; 3498 Torrent * t = tor->torrentPeers; 3499 const int tn = tr_ptrArraySize( &t->peers ); 3500 for( i=0; i<tn; ++i, ++n ) { 3501 peers[n] = tr_ptrArrayNth( &t->peers, i ); 3502 torrents[n] = t; 3503 } 3504 } 3505 3506 /* sort 'em */ 3507 sortPeersByLiveliness( peers, (void**)torrents, n, now ); 3508 3509 /* cull out the crappiest */ 3510 while( n-- > max ) 3511 closePeer( torrents[n], peers[n] ); 3512 3513 /* cleanup */ 3514 tr_free( torrents ); 3515 tr_free( peers ); 3516 } 3517} 3518 3519static void makeNewPeerConnections( tr_peerMgr * mgr, const int max ); 3520 3521static void 3522reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr ) 3523{ 3524 tr_torrent * tor; 3525 tr_peerMgr * mgr = vmgr; 3526 const time_t now_sec = tr_time( ); 3527 const uint64_t now_msec = tr_time_msec( ); 3528 3529 /** 3530 *** enforce the per-session and per-torrent peer limits 3531 **/ 3532 3533 /* if we're over the per-torrent peer limits, cull some peers */ 3534 tor = NULL; 3535 while(( tor = tr_torrentNext( mgr->session, tor ))) 3536 if( tor->isRunning ) 3537 enforceTorrentPeerLimit( tor->torrentPeers, now_msec ); 3538 3539 /* if we're over the per-session peer limits, cull some peers */ 3540 enforceSessionPeerLimit( mgr->session, now_msec ); 3541 3542 /* remove crappy peers */ 3543 tor = NULL; 3544 while(( tor = tr_torrentNext( mgr->session, tor ))) 3545 if( !tor->torrentPeers->isRunning ) 3546 removeAllPeers( tor->torrentPeers ); 3547 else 3548 closeBadPeers( tor->torrentPeers, now_sec ); 3549 3550 /* try to make new peer connections */ 3551 makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE ); 3552} 3553 3554/**** 3555***** 3556***** BANDWIDTH ALLOCATION 3557***** 3558****/ 3559 3560static void 3561pumpAllPeers( tr_peerMgr * mgr ) 3562{ 3563 tr_torrent * tor = NULL; 3564 3565 while(( tor = tr_torrentNext( mgr->session, tor ))) 3566 { 3567 int j; 3568 Torrent * t = tor->torrentPeers; 3569 3570 for( j=0; j<tr_ptrArraySize( &t->peers ); ++j ) 3571 { 3572 tr_peer * peer = tr_ptrArrayNth( &t->peers, j ); 3573 tr_peerMsgsPulse( peer->msgs ); 3574 } 3575 } 3576} 3577 3578static void 3579queuePulse( tr_session * session, tr_direction dir ) 3580{ 3581 assert( tr_isSession( session ) ); 3582 assert( tr_isDirection( dir ) ); 3583 3584 if( tr_sessionGetQueueEnabled( session, dir ) ) 3585 { 3586 int i; 3587 const int n = tr_sessionCountQueueFreeSlots( session, dir ); 3588 for( i=0; i<n; i++ ) { 3589 tr_torrent * tor = tr_sessionGetNextQueuedTorrent( session, dir ); 3590 if( tor != NULL ) { 3591 tr_torrentStartNow( tor ); 3592 if( tor->queue_started_callback != NULL ) 3593 (*tor->queue_started_callback)( tor, tor->queue_started_user_data ); 3594 } 3595 } 3596 } 3597} 3598 3599static void 3600bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr ) 3601{ 3602 tr_torrent * tor; 3603 tr_peerMgr * mgr = vmgr; 3604 tr_session * session = mgr->session; 3605 managerLock( mgr ); 3606 3607 /* FIXME: this next line probably isn't necessary... */ 3608 pumpAllPeers( mgr ); 3609 3610 /* allocate bandwidth to the peers */ 3611 tr_bandwidthAllocate( &session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC ); 3612 tr_bandwidthAllocate( &session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC ); 3613 3614 /* torrent upkeep */ 3615 tor = NULL; 3616 while(( tor = tr_torrentNext( session, tor ))) 3617 { 3618 /* possibly stop torrents that have seeded enough */ 3619 tr_torrentCheckSeedLimit( tor ); 3620 3621 /* run the completeness check for any torrents that need it */ 3622 if( tor->torrentPeers->needsCompletenessCheck ) { 3623 tor->torrentPeers->needsCompletenessCheck = false; 3624 tr_torrentRecheckCompleteness( tor ); 3625 } 3626 3627 /* stop torrents that are ready to stop, but couldn't be stopped 3628 earlier during the peer-io callback call chain */ 3629 if( tor->isStopping ) 3630 tr_torrentStop( tor ); 3631 } 3632 3633 /* pump the queues */ 3634 queuePulse( session, TR_UP ); 3635 queuePulse( session, TR_DOWN ); 3636 3637 reconnectPulse( 0, 0, mgr ); 3638 3639 tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC ); 3640 managerUnlock( mgr ); 3641} 3642 3643/*** 3644**** 3645***/ 3646 3647static int 3648compareAtomPtrsByAddress( const void * va, const void *vb ) 3649{ 3650 const struct peer_atom * a = * (const struct peer_atom**) va; 3651 const struct peer_atom * b = * (const struct peer_atom**) vb; 3652 3653 assert( tr_isAtom( a ) ); 3654 assert( tr_isAtom( b ) ); 3655 3656 return tr_address_compare( &a->addr, &b->addr ); 3657} 3658 3659/* best come first, worst go last */ 3660static int 3661compareAtomPtrsByShelfDate( const void * va, const void *vb ) 3662{ 3663 time_t atime; 3664 time_t btime; 3665 const struct peer_atom * a = * (const struct peer_atom**) va; 3666 const struct peer_atom * b = * (const struct peer_atom**) vb; 3667 const int data_time_cutoff_secs = 60 * 60; 3668 const time_t tr_now = tr_time( ); 3669 3670 assert( tr_isAtom( a ) ); 3671 assert( tr_isAtom( b ) ); 3672 3673 /* primary key: the last piece data time *if* it was within the last hour */ 3674 atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0; 3675 btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0; 3676 if( atime != btime ) 3677 return atime > btime ? -1 : 1; 3678 3679 /* secondary key: shelf date. */ 3680 if( a->shelf_date != b->shelf_date ) 3681 return a->shelf_date > b->shelf_date ? -1 : 1; 3682 3683 return 0; 3684} 3685 3686static int 3687getMaxAtomCount( const tr_torrent * tor ) 3688{ 3689 const int n = tor->maxConnectedPeers; 3690 /* approximate fit of the old jump discontinuous function */ 3691 if( n >= 55 ) return n + 150; 3692 if( n >= 20 ) return 2 * n + 95; 3693 return 4 * n + 55; 3694} 3695 3696static void 3697atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr ) 3698{ 3699 tr_torrent * tor = NULL; 3700 tr_peerMgr * mgr = vmgr; 3701 managerLock( mgr ); 3702 3703 while(( tor = tr_torrentNext( mgr->session, tor ))) 3704 { 3705 int atomCount; 3706 Torrent * t = tor->torrentPeers; 3707 const int maxAtomCount = getMaxAtomCount( tor ); 3708 struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount ); 3709 3710 if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */ 3711 { 3712 int i; 3713 int keepCount = 0; 3714 int testCount = 0; 3715 struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount ); 3716 struct peer_atom ** test = tr_new( struct peer_atom*, atomCount ); 3717 3718 /* keep the ones that are in use */ 3719 for( i=0; i<atomCount; ++i ) { 3720 struct peer_atom * atom = atoms[i]; 3721 if( peerIsInUse( t, atom ) ) 3722 keep[keepCount++] = atom; 3723 else 3724 test[testCount++] = atom; 3725 } 3726 3727 /* if there's room, keep the best of what's left */ 3728 i = 0; 3729 if( keepCount < maxAtomCount ) { 3730 qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate ); 3731 while( i<testCount && keepCount<maxAtomCount ) 3732 keep[keepCount++] = test[i++]; 3733 } 3734 3735 /* free the culled atoms */ 3736 while( i<testCount ) 3737 tr_free( test[i++] ); 3738 3739 /* rebuild Torrent.pool with what's left */ 3740 tr_ptrArrayDestruct( &t->pool, NULL ); 3741 t->pool = TR_PTR_ARRAY_INIT; 3742 qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress ); 3743 for( i=0; i<keepCount; ++i ) 3744 tr_ptrArrayAppend( &t->pool, keep[i] ); 3745 3746 tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount ); 3747 3748 /* cleanup */ 3749 tr_free( test ); 3750 tr_free( keep ); 3751 } 3752 } 3753 3754 tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC ); 3755 managerUnlock( mgr ); 3756} 3757 3758/*** 3759**** 3760**** 3761**** 3762***/ 3763 3764/* is this atom someone that we'd want to initiate a connection to? */ 3765static bool 3766isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now ) 3767{ 3768 /* not if we're both seeds */ 3769 if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) ) 3770 return false; 3771 3772 /* not if we've already got a connection to them... */ 3773 if( peerIsInUse( tor->torrentPeers, atom ) ) 3774 return false; 3775 3776 /* not if we just tried them already */ 3777 if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) ) 3778 return false; 3779 3780 /* not if they're blocklisted */ 3781 if( isAtomBlocklisted( tor->session, atom ) ) 3782 return false; 3783 3784 /* not if they're banned... */ 3785 if( atom->flags2 & MYFLAG_BANNED ) 3786 return false; 3787 3788 return true; 3789} 3790 3791struct peer_candidate 3792{ 3793 uint64_t score; 3794 tr_torrent * tor; 3795 struct peer_atom * atom; 3796}; 3797 3798static bool 3799torrentWasRecentlyStarted( const tr_torrent * tor ) 3800{ 3801 return difftime( tr_time( ), tor->startDate ) < 120; 3802} 3803 3804static inline uint64_t 3805addValToKey( uint64_t value, int width, uint64_t addme ) 3806{ 3807 value = (value << (uint64_t)width); 3808 value |= addme; 3809 return value; 3810} 3811 3812/* smaller value is better */ 3813static uint64_t 3814getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt ) 3815{ 3816 uint64_t i; 3817 uint64_t score = 0; 3818 const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt; 3819 3820 /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */ 3821 i = failed ? 1 : 0; 3822 score = addValToKey( score, 1, i ); 3823 3824 /* prefer the one we attempted least recently (to cycle through all peers) */ 3825 i = atom->lastConnectionAttemptAt; 3826 score = addValToKey( score, 32, i ); 3827 3828 /* prefer peers belonging to a torrent of a higher priority */ 3829 switch( tr_torrentGetPriority( tor ) ) { 3830 case TR_PRI_HIGH: i = 0; break; 3831 case TR_PRI_NORMAL: i = 1; break; 3832 case TR_PRI_LOW: i = 2; break; 3833 } 3834 score = addValToKey( score, 4, i ); 3835 3836 /* prefer recently-started torrents */ 3837 i = torrentWasRecentlyStarted( tor ) ? 0 : 1; 3838 score = addValToKey( score, 1, i ); 3839 3840 /* prefer torrents we're downloading with */ 3841 i = tr_torrentIsSeed( tor ) ? 1 : 0; 3842 score = addValToKey( score, 1, i ); 3843 3844 /* prefer peers that are known to be connectible */ 3845 i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1; 3846 score = addValToKey( score, 1, i ); 3847 3848 /* prefer peers that we might have a chance of uploading to... 3849 so lower seed probability is better */ 3850 if( atom->seedProbability == 100 ) i = 101; 3851 else if( atom->seedProbability == -1 ) i = 100; 3852 else i = atom->seedProbability; 3853 score = addValToKey( score, 8, i ); 3854 3855 /* Prefer peers that we got from more trusted sources. 3856 * lower `fromBest' values indicate more trusted sources */ 3857 score = addValToKey( score, 4, atom->fromBest ); 3858 3859 /* salt */ 3860 score = addValToKey( score, 8, salt ); 3861 3862 return score; 3863} 3864 3865#ifndef NDEBUG 3866static int 3867checkPartition( const struct peer_candidate * candidates, int left, int right, uint64_t pivotScore, int storeIndex ) 3868{ 3869 int i; 3870 3871 assert( storeIndex >= left ); 3872 assert( storeIndex <= right ); 3873 assert( candidates[storeIndex].score == pivotScore ); 3874 3875 for( i=left; i<storeIndex; ++i ) 3876 assert( candidates[i].score < pivotScore ); 3877 for( i=storeIndex+1; i<=right; ++i ) 3878 assert( candidates[i].score >= pivotScore ); 3879 3880 return true; 3881} 3882#endif 3883 3884/* Helper to selectBestCandidates(). 3885 * Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */ 3886static int 3887partitionPeerCandidates( struct peer_candidate * candidates, int left, int right, int pivotIndex ) 3888{ 3889 int i; 3890 int storeIndex; 3891 struct peer_candidate tmp; 3892 const struct peer_candidate pivotValue = candidates[pivotIndex]; 3893 3894 /* move pivot to end */ 3895 tmp = candidates[right]; 3896 candidates[right] = pivotValue; 3897 candidates[pivotIndex] = tmp; 3898 3899 storeIndex = left; 3900 for( i=left; i<=right; ++i ) 3901 { 3902 if( candidates[i].score < pivotValue.score ) 3903 { 3904 tmp = candidates[storeIndex]; 3905 candidates[storeIndex] = candidates[i]; 3906 candidates[i] = tmp; 3907 storeIndex++; 3908 } 3909 } 3910 3911 /* move pivot to its final place */ 3912 tmp = candidates[right]; 3913 candidates[right] = candidates[storeIndex]; 3914 candidates[storeIndex] = tmp; 3915 3916 /* sanity check */ 3917 assert( checkPartition( candidates, left, right, pivotValue.score, storeIndex ) ); 3918 3919 return storeIndex; 3920} 3921 3922/* Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */ 3923static void 3924selectPeerCandidates( struct peer_candidate * candidates, int left, int right, int k ) 3925{ 3926 if( right > left ) 3927 { 3928 const int pivotIndex = left + (right-left)/2; 3929 3930 int pivotNewIndex = partitionPeerCandidates( candidates, left, right, pivotIndex ); 3931 3932 if( pivotNewIndex > left + k ) /* new condition */ 3933 selectPeerCandidates( candidates, left, pivotNewIndex-1, k ); 3934 else if( pivotNewIndex < left + k ) 3935 selectPeerCandidates( candidates, pivotNewIndex+1, right, k+left-pivotNewIndex-1 ); 3936 } 3937} 3938 3939#ifndef NDEBUG 3940static bool 3941checkBestScoresComeFirst( const struct peer_candidate * candidates, int n, int k ) 3942{ 3943 int i; 3944 uint64_t worstFirstScore = 0; 3945 const int x = MIN( n, k ) - 1; 3946 3947 for( i=0; i<x; i++ ) 3948 if( worstFirstScore < candidates[i].score ) 3949 worstFirstScore = candidates[i].score; 3950 3951 for( i=0; i<x; i++ ) 3952 assert( candidates[i].score <= worstFirstScore ); 3953 3954 for( i=x+1; i<n; i++ ) 3955 assert( candidates[i].score >= worstFirstScore ); 3956 3957 return true; 3958} 3959#endif /* NDEBUG */ 3960 3961/** @return an array of all the atoms we might want to connect to */ 3962static struct peer_candidate* 3963getPeerCandidates( tr_session * session, int * candidateCount, int max ) 3964{ 3965 int atomCount; 3966 int peerCount; 3967 tr_torrent * tor; 3968 struct peer_candidate * candidates; 3969 struct peer_candidate * walk; 3970 const time_t now = tr_time( ); 3971 const uint64_t now_msec = tr_time_msec( ); 3972 /* leave 5% of connection slots for incoming connections -- ticket #2609 */ 3973 const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95; 3974 3975 /* count how many peers and atoms we've got */ 3976 tor= NULL; 3977 atomCount = 0; 3978 peerCount = 0; 3979 while(( tor = tr_torrentNext( session, tor ))) { 3980 atomCount += tr_ptrArraySize( &tor->torrentPeers->pool ); 3981 peerCount += tr_ptrArraySize( &tor->torrentPeers->peers ); 3982 } 3983 3984 /* don't start any new handshakes if we're full up */ 3985 if( maxCandidates <= peerCount ) { 3986 *candidateCount = 0; 3987 return NULL; 3988 } 3989 3990 /* allocate an array of candidates */ 3991 walk = candidates = tr_new( struct peer_candidate, atomCount ); 3992 3993 /* populate the candidate array */ 3994 tor = NULL; 3995 while(( tor = tr_torrentNext( session, tor ))) 3996 { 3997 int i, nAtoms; 3998 struct peer_atom ** atoms; 3999 4000 if( !tor->torrentPeers->isRunning ) 4001 continue; 4002 4003 /* if we've already got enough peers in this torrent... */ 4004 if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) ) 4005 continue; 4006 4007 /* if we've already got enough speed in this torrent... */ 4008 if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( &tor->bandwidth, now_msec, TR_UP ) ) 4009 continue; 4010 4011 atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms ); 4012 for( i=0; i<nAtoms; ++i ) 4013 { 4014 struct peer_atom * atom = atoms[i]; 4015 4016 if( isPeerCandidate( tor, atom, now ) ) 4017 { 4018 const uint8_t salt = tr_cryptoWeakRandInt( 1024 ); 4019 walk->tor = tor; 4020 walk->atom = atom; 4021 walk->score = getPeerCandidateScore( tor, atom, salt ); 4022 ++walk; 4023 } 4024 } 4025 } 4026 4027 *candidateCount = walk - candidates; 4028 if( walk != candidates ) 4029 selectPeerCandidates( candidates, 0, (walk-candidates)-1, max ); 4030 4031 assert( checkBestScoresComeFirst( candidates, *candidateCount, max ) ); 4032 4033 return candidates; 4034} 4035 4036static void 4037initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom ) 4038{ 4039 tr_peerIo * io; 4040 const time_t now = tr_time( ); 4041 bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed; 4042 4043 if( atom->fromFirst == TR_PEER_FROM_PEX ) 4044 /* PEX has explicit signalling for uTP support. If an atom 4045 originally came from PEX and doesn't have the uTP flag, skip the 4046 uTP connection attempt. Are we being optimistic here? */ 4047 utp = utp && (atom->flags & ADDED_F_UTP_FLAGS); 4048 4049 tordbg( t, "Starting an OUTGOING%s connection with %s", 4050 utp ? " ��TP" : "", 4051 tr_atomAddrStr( atom ) ); 4052 4053 io = tr_peerIoNewOutgoing( mgr->session, 4054 &mgr->session->bandwidth, 4055 &atom->addr, 4056 atom->port, 4057 t->tor->info.hash, 4058 t->tor->completeness == TR_SEED, 4059 utp ); 4060 4061 if( io == NULL ) 4062 { 4063 tordbg( t, "peerIo not created; marking peer %s as unreachable", 4064 tr_atomAddrStr( atom ) ); 4065 atom->flags2 |= MYFLAG_UNREACHABLE; 4066 atom->numFails++; 4067 } 4068 else 4069 { 4070 tr_handshake * handshake = tr_handshakeNew( io, 4071 mgr->session->encryptionMode, 4072 myHandshakeDoneCB, 4073 mgr ); 4074 4075 assert( tr_peerIoGetTorrentHash( io ) ); 4076 4077 tr_peerIoUnref( io ); /* balanced by the initial ref 4078 in tr_peerIoNewOutgoing() */ 4079 4080 tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake, 4081 handshakeCompare ); 4082 } 4083 4084 atom->lastConnectionAttemptAt = now; 4085 atom->time = now; 4086} 4087 4088static void 4089initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c ) 4090{ 4091#if 0 4092 fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n", 4093 tr_atomAddrStr( c->atom ), 4094 tr_torrentName( c->tor ), 4095 (int)c->atom->seedProbability, 4096 tr_torrentIsPrivate( c->tor ) ? "private" : "public", 4097 tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" ); 4098#endif 4099 4100 initiateConnection( mgr, c->tor->torrentPeers, c->atom ); 4101} 4102 4103static void 4104makeNewPeerConnections( struct tr_peerMgr * mgr, const int max ) 4105{ 4106 int i, n; 4107 struct peer_candidate * candidates; 4108 4109 candidates = getPeerCandidates( mgr->session, &n, max ); 4110 4111 for( i=0; i<n && i<max; ++i ) 4112 initiateCandidateConnection( mgr, &candidates[i] ); 4113 4114 tr_free( candidates ); 4115} 4116