• Home
  • History
  • Annotate
  • Line#
  • Navigate
  • Raw
  • Download
  • only in /netgear-R7000-V1.0.7.12_1.2.5/ap/gpl/transmission/transmission-2.73/libtransmission/
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-mgr.c 13549 2012-10-05 22:04:08Z jordan $
11 */
12
13#include <assert.h>
14#include <errno.h> /* error codes ERANGE, ... */
15#include <limits.h> /* INT_MAX */
16#include <string.h> /* memcpy, memcmp, strstr */
17#include <stdlib.h> /* qsort */
18
19#include <event2/event.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "announcer.h"
25#include "bandwidth.h"
26#include "blocklist.h"
27#include "cache.h"
28#include "clients.h"
29#include "completion.h"
30#include "crypto.h"
31#include "handshake.h"
32#include "net.h"
33#include "peer-io.h"
34#include "peer-mgr.h"
35#include "peer-msgs.h"
36#include "ptrarray.h"
37#include "session.h"
38#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
39#include "torrent.h"
40#include "tr-utp.h"
41#include "utils.h"
42#include "webseed.h"
43
/* Tunables and bit flags used by the peer manager in this file. */
enum
{
    /* how frequently to cull old atoms */
    ATOM_PERIOD_MSEC = ( 60 * 1000 ),

    /* how frequently to change which peers are choked */
    RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),

    /* an optimistically unchoked peer is immune from rechoking
       for this many calls to rechokeUploads(). */
    OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,

    /* how frequently to reallocate bandwidth */
    BANDWIDTH_PERIOD_MSEC = 500,

    /* how frequently to age out old piece request lists */
    REFILL_UPKEEP_PERIOD_MSEC = ( 10 * 1000 ),

    /* how frequently to decide which peers live and die */
    RECONNECT_PERIOD_MSEC = 500,

    /* when many peers are available, keep idle ones this long */
    MIN_UPLOAD_IDLE_SECS = ( 60 ),

    /* when few peers are available, keep idle ones this long */
    MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),

    /* max number of peers to ask for per second overall.
     * this throttle is to avoid overloading the router */
    MAX_CONNECTIONS_PER_SECOND = 12,

    /* the per-second limit scaled down to one reconnect pulse.
     * Computed with integer arithmetic only, so that the enumerator is a
     * strict integer constant expression; the quotient truncates toward
     * zero exactly as the previous cast-from-double form did. */
    MAX_CONNECTIONS_PER_PULSE = ( ( MAX_CONNECTIONS_PER_SECOND * RECONNECT_PERIOD_MSEC ) / 1000 ),

    /* number of bad pieces a peer is allowed to send before we ban them */
    MAX_BAD_PIECES_PER_PEER = 5,

    /* amount of time to keep a list of request pieces lying around
       before it's considered too old and needs to be rebuilt */
    PIECE_LIST_SHELF_LIFE_SECS = 60,

    /* use for bitwise operations w/peer_atom.flags2 */
    MYFLAG_BANNED = 1,

    /* use for bitwise operations w/peer_atom.flags2 */
    /* unreachable for now... but not banned.
     * if they try to connect to us it's okay */
    MYFLAG_UNREACHABLE = 2,

    /* the minimum we'll wait before attempting to reconnect to a peer */
    MINIMUM_RECONNECT_INTERVAL_SECS = 5,

    /** how long we'll let requests we've made linger before we cancel them */
    REQUEST_TTL_SECS = 120,

    /* NOTE(review): the two constants below are not referenced in the
     * visible part of this file; their names suggest they are durations in
     * seconds used by the request-cancelling code -- confirm at the use site */
    NO_BLOCKS_CANCEL_HISTORY = 120,

    CANCEL_HISTORY_SEC = 60
};
102
/* A tr_peer_event whose fields are all zero / NULL / false.
 * NOTE(review): the initializers are positional, so they have to stay in
 * step with the field order of tr_peer_event, which is declared outside
 * this file -- confirm when that struct changes. */
const tr_peer_event TR_PEER_EVENT_INIT = { 0, 0, NULL, 0, 0, 0, false, 0 };
104
105/**
106***
107**/
108
109/**
110 * Peer information that should be kept even before we've connected and
111 * after we've disconnected. These are kept in a pool of peer_atoms to decide
112 * which ones would make good candidates for connecting to, and to watch out
113 * for banned peers.
114 *
115 * @see tr_peer
116 * @see tr_peermsgs
117 */
struct peer_atom
{
    uint8_t     fromFirst;          /* where the peer was first found */
    uint8_t     fromBest;           /* the "best" value of where the peer has been found */
    uint8_t     flags;              /* these match the added_f flags */
    uint8_t     flags2;             /* flags that aren't defined in added_f (see MYFLAG_*) */
    int8_t      seedProbability;    /* how likely is this to be a seed... [0..100] or -1 for unknown;
                                       exactly 100 is what atomIsSeed() treats as "is a seed" */
    int8_t      blocklisted;        /* cached lookup: -1 for unknown, true for blocklisted,
                                       false for not blocklisted. Filled in lazily by
                                       isAtomBlocklisted() and reset to -1 by
                                       tr_peerMgrOnBlocklistChanged() */

    tr_port     port;               /* paired with addr when formatting the peer's address */
    bool        utp_failed;         /* We recently failed to connect over uTP */
    uint16_t    numFails;
    time_t      time;               /* when the peer's connection status last changed */
    time_t      piece_data_time;

    time_t      lastConnectionAttemptAt;
    time_t      lastConnectionAt;

    /* similar to a TTL field, but less rigid --
     * if the swarm is small, the atom will be kept past this date. */
    time_t      shelf_date;
    tr_peer   * peer;               /* will be NULL if not connected; set by peerNew()
                                       and cleared again by tr_peerDestruct() */
    tr_address  addr;               /* the sort key of Torrent.pool */
};
142
143#ifdef NDEBUG
144#define tr_isAtom(a) (TRUE)
145#else
146static bool
147tr_isAtom( const struct peer_atom * atom )
148{
149    return ( atom != NULL )
150        && ( atom->fromFirst < TR_PEER_FROM__MAX )
151        && ( atom->fromBest < TR_PEER_FROM__MAX )
152        && ( tr_address_is_valid( &atom->addr ) );
153}
154#endif
155
156static const char*
157tr_atomAddrStr( const struct peer_atom * atom )
158{
159    return atom ? tr_peerIoAddrStr( &atom->addr, atom->port ) : "[no atom]";
160}
161
/* One outstanding request for a block. Torrent.requests is an array of
 * these kept sorted by (block, peer) -- see compareReqByBlock(). */
struct block_request
{
    tr_block_index_t block;  /* primary sort key */
    tr_peer * peer;          /* secondary sort key: who the request was sent to */
    time_t sentAt;           /* tr_time() when the request was added to the list */
};
168
/* An entry in Torrent.pieces: a piece we still want, plus the extra data
 * that comparePieceByWeight() uses to rank it. */
struct weighted_piece
{
    tr_piece_index_t index;  /* which piece this entry describes */
    int16_t salt;            /* random value assigned in pieceListRebuild();
                                the last tie-breaker when sorting by weight */
    int16_t requestCount;    /* treated as the number of pending requests
                                when the piece's weight is computed */
};
175
/* Which ordering, if any, Torrent.pieces is currently in. */
enum piece_sort_state
{
    PIECES_UNSORTED         = 0, /* ordering unknown / invalidated */
    PIECES_SORTED_BY_INDEX  = 1, /* ordered with comparePieceByIndex() */
    PIECES_SORTED_BY_WEIGHT = 2  /* ordered with comparePieceByWeight() */
};
182
183/** @brief Opaque, per-torrent data structure for peer connection information */
typedef struct tr_torrent_peers
{
    tr_ptrArray                outgoingHandshakes; /* tr_handshake */
    tr_ptrArray                pool; /* struct peer_atom, searched by address */
    tr_ptrArray                peers; /* tr_peer, kept sorted with peerCompare() */
    tr_ptrArray                webseeds; /* tr_webseed */

    tr_torrent               * tor;     /* the torrent this object belongs to */
    struct tr_peerMgr        * manager; /* the owning manager; its session provides the lock */

    tr_peer                  * optimistic; /* the optimistic peer, or NULL if none */
    int                        optimisticUnchokeTimeScaler;

    bool                       isRunning;
    bool                       needsCompletenessCheck;

    /* sorted array of outstanding block requests; see requestListAdd() */
    struct block_request     * requests;
    int                        requestCount; /* number of entries in use */
    int                        requestAlloc; /* number of entries allocated */

    /* the pieces we still want; rebuilt by pieceListRebuild() */
    struct weighted_piece    * pieces;
    int                        pieceCount;
    enum piece_sort_state      pieceSortState;

    /* An array of pieceCount items stating how many peers have each piece.
       This is used to help us for downloading pieces "rarest first."
       This may be NULL if we don't have metainfo yet, or if we're not
       downloading and don't care about rarity */
    uint16_t                 * pieceReplication;
    size_t                     pieceReplicationSize;

    int                        interestedCount;
    int                        maxPeers;
    time_t                     lastCancel;

    /* Before the endgame this should be 0. In endgame, it contains the average
     * number of pending requests per peer. Only peers which have more pending
     * requests are considered 'fast' and are allowed to request a block that's
     * already been requested from another (slower?) peer. */
    int                        endgame;
}
Torrent;
226
/* Session-wide peer manager state. Created by tr_peerMgrNew() and
 * destroyed by tr_peerMgrFree(). */
struct tr_peerMgr
{
    tr_session    * session;            /* its lock is what managerLock() takes */
    tr_ptrArray     incomingHandshakes; /* tr_handshake */

    /* periodic timers; all four are released by deleteTimers() */
    struct event  * bandwidthTimer;
    struct event  * rechokeTimer;
    struct event  * refillUpkeepTimer;
    struct event  * atomTimer;
};
236
/* Deep-log a printf-style message tagged with the name of the torrent
 * that owns the Torrent `t'. Does nothing unless deep logging is active.
 * The argument is parenthesized so that any pointer-valued expression,
 * not just a plain identifier, can be passed as `t'. */
#define tordbg( t, ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, tr_torrentName( (t)->tor ), __VA_ARGS__ ); \
    } while( 0 )
242
/* Deep-log a printf-style message that isn't tied to any torrent (NULL is
 * passed as the name). Does nothing unless deep logging is active. */
#define dbgmsg( ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, NULL, __VA_ARGS__ ); \
    } while( 0 )
248
249/**
250***
251**/
252
253static inline void
254managerLock( const struct tr_peerMgr * manager )
255{
256    tr_sessionLock( manager->session );
257}
258
259static inline void
260managerUnlock( const struct tr_peerMgr * manager )
261{
262    tr_sessionUnlock( manager->session );
263}
264
265static inline void
266torrentLock( Torrent * torrent )
267{
268    managerLock( torrent->manager );
269}
270
271static inline void
272torrentUnlock( Torrent * torrent )
273{
274    managerUnlock( torrent->manager );
275}
276
277static inline int
278torrentIsLocked( const Torrent * t )
279{
280    return tr_sessionIsLocked( t->manager->session );
281}
282
283/**
284***
285**/
286
287static int
288handshakeCompareToAddr( const void * va, const void * vb )
289{
290    const tr_handshake * a = va;
291
292    return tr_address_compare( tr_handshakeGetAddr( a, NULL ), vb );
293}
294
295static int
296handshakeCompare( const void * a, const void * b )
297{
298    return handshakeCompareToAddr( a, tr_handshakeGetAddr( b, NULL ) );
299}
300
301static inline tr_handshake*
302getExistingHandshake( tr_ptrArray * handshakes, const tr_address * addr )
303{
304    if( tr_ptrArrayEmpty( handshakes ) )
305        return NULL;
306
307    return tr_ptrArrayFindSorted( handshakes, addr, handshakeCompareToAddr );
308}
309
310static int
311comparePeerAtomToAddress( const void * va, const void * vb )
312{
313    const struct peer_atom * a = va;
314
315    return tr_address_compare( &a->addr, vb );
316}
317
318static int
319compareAtomsByAddress( const void * va, const void * vb )
320{
321    const struct peer_atom * b = vb;
322
323    assert( tr_isAtom( b ) );
324
325    return comparePeerAtomToAddress( va, &b->addr );
326}
327
328/**
329***
330**/
331
332const tr_address *
333tr_peerAddress( const tr_peer * peer )
334{
335    return &peer->atom->addr;
336}
337
338static Torrent*
339getExistingTorrent( tr_peerMgr *    manager,
340                    const uint8_t * hash )
341{
342    tr_torrent * tor = tr_torrentFindFromHash( manager->session, hash );
343
344    return tor == NULL ? NULL : tor->torrentPeers;
345}
346
347static int
348peerCompare( const void * a, const void * b )
349{
350    return tr_address_compare( tr_peerAddress( a ), tr_peerAddress( b ) );
351}
352
353static struct peer_atom*
354getExistingAtom( const Torrent    * t,
355                 const tr_address * addr )
356{
357    Torrent * tt = (Torrent*)t;
358    assert( torrentIsLocked( t ) );
359    return tr_ptrArrayFindSorted( &tt->pool, addr, comparePeerAtomToAddress );
360}
361
362static bool
363peerIsInUse( const Torrent * ct, const struct peer_atom * atom )
364{
365    Torrent * t = (Torrent*) ct;
366
367    assert( torrentIsLocked ( t ) );
368
369    return ( atom->peer != NULL )
370        || getExistingHandshake( &t->outgoingHandshakes, &atom->addr )
371        || getExistingHandshake( &t->manager->incomingHandshakes, &atom->addr );
372}
373
374void
375tr_peerConstruct( tr_peer * peer )
376{
377    memset( peer, 0, sizeof( tr_peer ) );
378
379    peer->have = TR_BITFIELD_INIT;
380}
381
382static tr_peer*
383peerNew( struct peer_atom * atom )
384{
385    tr_peer * peer = tr_new( tr_peer, 1 );
386    tr_peerConstruct( peer );
387
388    peer->atom = atom;
389    atom->peer = peer;
390
391    return peer;
392}
393
394static tr_peer*
395getPeer( Torrent * torrent, struct peer_atom * atom )
396{
397    tr_peer * peer;
398
399    assert( torrentIsLocked( torrent ) );
400
401    peer = atom->peer;
402
403    if( peer == NULL )
404    {
405        peer = peerNew( atom );
406        tr_bitfieldConstruct( &peer->have, torrent->tor->info.pieceCount );
407        tr_bitfieldConstruct( &peer->blame, torrent->tor->blockCount );
408        tr_ptrArrayInsertSorted( &torrent->peers, peer, peerCompare );
409    }
410
411    return peer;
412}
413
414static void peerDeclinedAllRequests( Torrent *, const tr_peer * );
415
416void
417tr_peerDestruct( tr_torrent * tor, tr_peer * peer )
418{
419    assert( peer != NULL );
420
421    peerDeclinedAllRequests( tor->torrentPeers, peer );
422
423    if( peer->msgs != NULL )
424        tr_peerMsgsFree( peer->msgs );
425
426    if( peer->io ) {
427        tr_peerIoClear( peer->io );
428        tr_peerIoUnref( peer->io ); /* balanced by the ref in handshakeDoneCB() */
429    }
430
431    tr_bitfieldDestruct( &peer->have );
432    tr_bitfieldDestruct( &peer->blame );
433    tr_free( peer->client );
434
435    if( peer->atom )
436        peer->atom->peer = NULL;
437}
438
439static void
440peerDelete( Torrent * t, tr_peer * peer )
441{
442    tr_peerDestruct( t->tor, peer );
443    tr_free( peer );
444}
445
446static bool
447replicationExists( const Torrent * t )
448{
449    return t->pieceReplication != NULL;
450}
451
452static void
453replicationFree( Torrent * t )
454{
455    tr_free( t->pieceReplication );
456    t->pieceReplication = NULL;
457    t->pieceReplicationSize = 0;
458}
459
460static void
461replicationNew( Torrent * t )
462{
463    tr_piece_index_t piece_i;
464    const tr_piece_index_t piece_count = t->tor->info.pieceCount;
465    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
466    const int peer_count = tr_ptrArraySize( &t->peers );
467
468    assert( !replicationExists( t ) );
469
470    t->pieceReplicationSize = piece_count;
471    t->pieceReplication = tr_new0( uint16_t, piece_count );
472
473    for( piece_i=0; piece_i<piece_count; ++piece_i )
474    {
475        int peer_i;
476        uint16_t r = 0;
477
478        for( peer_i=0; peer_i<peer_count; ++peer_i )
479            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
480                ++r;
481
482        t->pieceReplication[piece_i] = r;
483    }
484}
485
486static void
487torrentFree( void * vt )
488{
489    Torrent * t = vt;
490
491    assert( t );
492    assert( !t->isRunning );
493    assert( torrentIsLocked( t ) );
494    assert( tr_ptrArrayEmpty( &t->outgoingHandshakes ) );
495    assert( tr_ptrArrayEmpty( &t->peers ) );
496
497    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
498    tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
499    tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
500    tr_ptrArrayDestruct( &t->peers, NULL );
501
502    replicationFree( t );
503
504    tr_free( t->requests );
505    tr_free( t->pieces );
506    tr_free( t );
507}
508
509static void peerCallbackFunc( tr_peer *, const tr_peer_event *, void * );
510
511static void
512rebuildWebseedArray( Torrent * t, tr_torrent * tor )
513{
514    int i;
515    const tr_info * inf = &tor->info;
516
517    /* clear the array */
518    tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
519    t->webseeds = TR_PTR_ARRAY_INIT;
520
521    /* repopulate it */
522    for( i = 0; i < inf->webseedCount; ++i )
523    {
524        tr_webseed * w = tr_webseedNew( tor, inf->webseeds[i], peerCallbackFunc, t );
525        tr_ptrArrayAppend( &t->webseeds, w );
526    }
527}
528
529static Torrent*
530torrentNew( tr_peerMgr * manager, tr_torrent * tor )
531{
532    Torrent * t;
533
534    t = tr_new0( Torrent, 1 );
535    t->manager = manager;
536    t->tor = tor;
537    t->pool = TR_PTR_ARRAY_INIT;
538    t->peers = TR_PTR_ARRAY_INIT;
539    t->webseeds = TR_PTR_ARRAY_INIT;
540    t->outgoingHandshakes = TR_PTR_ARRAY_INIT;
541
542    rebuildWebseedArray( t, tor );
543
544    return t;
545}
546
547static void ensureMgrTimersExist( struct tr_peerMgr * m );
548
549tr_peerMgr*
550tr_peerMgrNew( tr_session * session )
551{
552    tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
553    m->session = session;
554    m->incomingHandshakes = TR_PTR_ARRAY_INIT;
555    ensureMgrTimersExist( m );
556    return m;
557}
558
/* Free the event that *t points to, if any, and reset *t to NULL. */
static void
deleteTimer( struct event ** t )
{
    if( *t == NULL )
        return;

    event_free( *t );
    *t = NULL;
}
568
569static void
570deleteTimers( struct tr_peerMgr * m )
571{
572    deleteTimer( &m->atomTimer );
573    deleteTimer( &m->bandwidthTimer );
574    deleteTimer( &m->rechokeTimer );
575    deleteTimer( &m->refillUpkeepTimer );
576}
577
578void
579tr_peerMgrFree( tr_peerMgr * manager )
580{
581    managerLock( manager );
582
583    deleteTimers( manager );
584
585    /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
586     * the item from manager->handshakes, so this is a little roundabout... */
587    while( !tr_ptrArrayEmpty( &manager->incomingHandshakes ) )
588        tr_handshakeAbort( tr_ptrArrayNth( &manager->incomingHandshakes, 0 ) );
589
590    tr_ptrArrayDestruct( &manager->incomingHandshakes, NULL );
591
592    managerUnlock( manager );
593    tr_free( manager );
594}
595
596static int
597clientIsDownloadingFrom( const tr_torrent * tor, const tr_peer * peer )
598{
599    if( !tr_torrentHasMetadata( tor ) )
600        return true;
601
602    return peer->clientIsInterested && !peer->clientIsChoked;
603}
604
605static int
606clientIsUploadingTo( const tr_peer * peer )
607{
608    return peer->peerIsInterested && !peer->peerIsChoked;
609}
610
611/***
612****
613***/
614
615void
616tr_peerMgrOnBlocklistChanged( tr_peerMgr * mgr )
617{
618    tr_torrent * tor = NULL;
619    tr_session * session = mgr->session;
620
621    /* we cache whether or not a peer is blocklisted...
622       since the blocklist has changed, erase that cached value */
623    while(( tor = tr_torrentNext( session, tor )))
624    {
625        int i;
626        Torrent * t = tor->torrentPeers;
627        const int n = tr_ptrArraySize( &t->pool );
628        for( i=0; i<n; ++i ) {
629            struct peer_atom * atom = tr_ptrArrayNth( &t->pool, i );
630            atom->blocklisted = -1;
631        }
632    }
633}
634
635static bool
636isAtomBlocklisted( tr_session * session, struct peer_atom * atom )
637{
638    if( atom->blocklisted < 0 )
639        atom->blocklisted = tr_sessionIsAddressBlocked( session, &atom->addr );
640
641    assert( tr_isBool( atom->blocklisted ) );
642    return atom->blocklisted;
643}
644
645
646/***
647****
648***/
649
650static void
651atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
652{
653    assert( atom != NULL );
654    assert( -1<=seedProbability && seedProbability<=100 );
655
656    atom->seedProbability = seedProbability;
657
658    if( seedProbability == 100 )
659        atom->flags |= ADDED_F_SEED_FLAG;
660    else if( seedProbability != -1 )
661        atom->flags &= ~ADDED_F_SEED_FLAG;
662}
663
664static inline bool
665atomIsSeed( const struct peer_atom * atom )
666{
667    return atom->seedProbability == 100;
668}
669
670static void
671atomSetSeed( const Torrent * t, struct peer_atom * atom )
672{
673    if( !atomIsSeed( atom ) )
674    {
675        tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
676
677        atomSetSeedProbability( atom, 100 );
678    }
679}
680
681
682bool
683tr_peerMgrPeerIsSeed( const tr_torrent  * tor,
684                      const tr_address  * addr )
685{
686    bool isSeed = false;
687    const Torrent * t = tor->torrentPeers;
688    const struct peer_atom * atom = getExistingAtom( t, addr );
689
690    if( atom )
691        isSeed = atomIsSeed( atom );
692
693    return isSeed;
694}
695
696void
697tr_peerMgrSetUtpSupported( tr_torrent * tor, const tr_address * addr )
698{
699    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
700
701    if( atom )
702        atom->flags |= ADDED_F_UTP_FLAGS;
703}
704
705void
706tr_peerMgrSetUtpFailed( tr_torrent *tor, const tr_address *addr, bool failed )
707{
708    struct peer_atom * atom = getExistingAtom( tor->torrentPeers, addr );
709
710    if( atom )
711        atom->utp_failed = failed;
712}
713
714
715/**
716***  REQUESTS
717***
718*** There are two data structures associated with managing block requests:
719***
720*** 1. Torrent::requests, an array of "struct block_request" which keeps
721***    track of which blocks have been requested, and when, and by which peers.
722***    This list is used for (a) cancelling requests that have been pending
723***    for too long and (b) avoiding duplicate requests before endgame.
724***
725*** 2. Torrent::pieces, an array of "struct weighted_piece" which lists the
726***    pieces that we want to request. It's used to decide which blocks to
727***    return next when tr_peerMgrGetBlockRequests() is called.
728**/
729
730/**
731*** struct block_request
732**/
733
734static int
735compareReqByBlock( const void * va, const void * vb )
736{
737    const struct block_request * a = va;
738    const struct block_request * b = vb;
739
740    /* primary key: block */
741    if( a->block < b->block ) return -1;
742    if( a->block > b->block ) return 1;
743
744    /* secondary key: peer */
745    if( a->peer < b->peer ) return -1;
746    if( a->peer > b->peer ) return 1;
747
748    return 0;
749}
750
751static void
752requestListAdd( Torrent * t, tr_block_index_t block, tr_peer * peer )
753{
754    struct block_request key;
755
756    /* ensure enough room is available... */
757    if( t->requestCount + 1 >= t->requestAlloc )
758    {
759        const int CHUNK_SIZE = 128;
760        t->requestAlloc += CHUNK_SIZE;
761        t->requests = tr_renew( struct block_request,
762                                t->requests, t->requestAlloc );
763    }
764
765    /* populate the record we're inserting */
766    key.block = block;
767    key.peer = peer;
768    key.sentAt = tr_time( );
769
770    /* insert the request to our array... */
771    {
772        bool exact;
773        const int pos = tr_lowerBound( &key, t->requests, t->requestCount,
774                                       sizeof( struct block_request ),
775                                       compareReqByBlock, &exact );
776        assert( !exact );
777        memmove( t->requests + pos + 1,
778                 t->requests + pos,
779                 sizeof( struct block_request ) * ( t->requestCount++ - pos ) );
780        t->requests[pos] = key;
781    }
782
783    if( peer != NULL )
784    {
785        ++peer->pendingReqsToPeer;
786        assert( peer->pendingReqsToPeer >= 0 );
787    }
788
789    /*fprintf( stderr, "added request of block %lu from peer %s... "
790                       "there are now %d block\n",
791                       (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
792}
793
794static struct block_request *
795requestListLookup( Torrent * t, tr_block_index_t block, const tr_peer * peer )
796{
797    struct block_request key;
798    key.block = block;
799    key.peer = (tr_peer*) peer;
800
801    return bsearch( &key, t->requests, t->requestCount,
802                    sizeof( struct block_request ),
803                    compareReqByBlock );
804}
805
806/**
807 * Find the peers are we currently requesting the block
808 * with index @a block from and append them to @a peerArr.
809 */
810static void
811getBlockRequestPeers( Torrent * t, tr_block_index_t block,
812                      tr_ptrArray * peerArr )
813{
814    bool exact;
815    int i, pos;
816    struct block_request key;
817
818    key.block = block;
819    key.peer = NULL;
820    pos = tr_lowerBound( &key, t->requests, t->requestCount,
821                         sizeof( struct block_request ),
822                         compareReqByBlock, &exact );
823
824    assert( !exact ); /* shouldn't have a request with .peer == NULL */
825
826    for( i = pos; i < t->requestCount; ++i )
827    {
828        if( t->requests[i].block != block )
829            break;
830        tr_ptrArrayAppend( peerArr, t->requests[i].peer );
831    }
832}
833
834static void
835decrementPendingReqCount( const struct block_request * b )
836{
837    if( b->peer != NULL )
838        if( b->peer->pendingReqsToPeer > 0 )
839            --b->peer->pendingReqsToPeer;
840}
841
842static void
843requestListRemove( Torrent * t, tr_block_index_t block, const tr_peer * peer )
844{
845    const struct block_request * b = requestListLookup( t, block, peer );
846    if( b != NULL )
847    {
848        const int pos = b - t->requests;
849        assert( pos < t->requestCount );
850
851        decrementPendingReqCount( b );
852
853        tr_removeElementFromArray( t->requests,
854                                   pos,
855                                   sizeof( struct block_request ),
856                                   t->requestCount-- );
857
858        /*fprintf( stderr, "removing request of block %lu from peer %s... "
859                           "there are now %d block requests left\n",
860                           (unsigned long)block, tr_atomAddrStr( peer->atom ), t->requestCount );*/
861    }
862}
863
864static int
865countActiveWebseeds( const Torrent * t )
866{
867    int activeCount = 0;
868    const tr_webseed ** w = (const tr_webseed **) tr_ptrArrayBase( &t->webseeds );
869    const tr_webseed ** const wend = w + tr_ptrArraySize( &t->webseeds );
870
871    for( ; w!=wend; ++w )
872        if( tr_webseedIsActive( *w ) )
873            ++activeCount;
874
875    return activeCount;
876}
877
878static bool
879testForEndgame( const Torrent * t )
880{
881    /* we consider ourselves to be in endgame if the number of bytes
882       we've got requested is >= the number of bytes left to download */
883    return ( t->requestCount * t->tor->blockSize )
884               >= tr_cpLeftUntilDone( &t->tor->completion );
885}
886
887static void
888updateEndgame( Torrent * t )
889{
890    assert( t->requestCount >= 0 );
891
892    if( !testForEndgame( t ) )
893    {
894        /* not in endgame */
895        t->endgame = 0;
896    }
897    else if( !t->endgame ) /* only recalculate when endgame first begins */
898    {
899        int numDownloading = 0;
900        const tr_peer ** p = (const tr_peer **) tr_ptrArrayBase( &t->peers );
901        const tr_peer ** const pend = p + tr_ptrArraySize( &t->peers );
902
903        /* add the active bittorrent peers... */
904        for( ; p!=pend; ++p )
905            if( (*p)->pendingReqsToPeer > 0 )
906                ++numDownloading;
907
908        /* add the active webseeds... */
909        numDownloading += countActiveWebseeds( t );
910
911        /* average number of pending requests per downloading peer */
912        t->endgame = t->requestCount / MAX( numDownloading, 1 );
913    }
914}
915
916
917/****
918*****
919*****  Piece List Manipulation / Accessors
920*****
921****/
922
923static inline void
924invalidatePieceSorting( Torrent * t )
925{
926    t->pieceSortState = PIECES_UNSORTED;
927}
928
929static const tr_torrent * weightTorrent;
930
931static const uint16_t * weightReplication;
932
933static void
934setComparePieceByWeightTorrent( Torrent * t )
935{
936    if( !replicationExists( t ) )
937        replicationNew( t );
938
939    weightTorrent = t->tor;
940    weightReplication = t->pieceReplication;
941}
942
943/* we try to create a "weight" s.t. high-priority pieces come before others,
944 * and that partially-complete pieces come before empty ones. */
945static int
946comparePieceByWeight( const void * va, const void * vb )
947{
948    const struct weighted_piece * a = va;
949    const struct weighted_piece * b = vb;
950    int ia, ib, missing, pending;
951    const tr_torrent * tor = weightTorrent;
952    const uint16_t * rep = weightReplication;
953
954    /* primary key: weight */
955    missing = tr_cpMissingBlocksInPiece( &tor->completion, a->index );
956    pending = a->requestCount;
957    ia = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
958    missing = tr_cpMissingBlocksInPiece( &tor->completion, b->index );
959    pending = b->requestCount;
960    ib = missing > pending ? missing - pending : (tor->blockCountInPiece + pending);
961    if( ia < ib ) return -1;
962    if( ia > ib ) return 1;
963
964    /* secondary key: higher priorities go first */
965    ia = tor->info.pieces[a->index].priority;
966    ib = tor->info.pieces[b->index].priority;
967    if( ia > ib ) return -1;
968    if( ia < ib ) return 1;
969
970    /* tertiary key: rarest first. */
971    ia = rep[a->index];
972    ib = rep[b->index];
973    if( ia < ib ) return -1;
974    if( ia > ib ) return 1;
975
976    /* quaternary key: random */
977    if( a->salt < b->salt ) return -1;
978    if( a->salt > b->salt ) return 1;
979
980    /* okay, they're equal */
981    return 0;
982}
983
984static int
985comparePieceByIndex( const void * va, const void * vb )
986{
987    const struct weighted_piece * a = va;
988    const struct weighted_piece * b = vb;
989    if( a->index < b->index ) return -1;
990    if( a->index > b->index ) return 1;
991    return 0;
992}
993
994static void
995pieceListSort( Torrent * t, enum piece_sort_state state )
996{
997    assert( state==PIECES_SORTED_BY_INDEX
998         || state==PIECES_SORTED_BY_WEIGHT );
999
1000
1001    if( state == PIECES_SORTED_BY_WEIGHT )
1002    {
1003        setComparePieceByWeightTorrent( t );
1004        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByWeight );
1005    }
1006    else
1007        qsort( t->pieces, t->pieceCount, sizeof( struct weighted_piece ), comparePieceByIndex );
1008
1009    t->pieceSortState = state;
1010}
1011
/**
 * These functions are useful for testing, but too expensive for nightly builds.
 * Let's leave them disabled but keep an easy hook to compile them back in:
 * change the "#if 1" below to "#if 0".
 */
#if 1
#define assertWeightedPiecesAreSorted(t)
#define assertReplicationCountIsExact(t)
#else
/* Assert that t->pieces is in comparePieceByWeight() order.
 * The check is skipped during endgame. */
static void
assertWeightedPiecesAreSorted( Torrent * t )
{
    if( !t->endgame )
    {
        int i;
        setComparePieceByWeightTorrent( t );
        for( i=0; i<t->pieceCount-1; ++i )
            assert( comparePieceByWeight( &t->pieces[i], &t->pieces[i+1] ) <= 0 );
    }
}
/* Assert that every entry of t->pieceReplication equals the number of
 * connected peers whose `have' bitfield has that piece set. */
static void
assertReplicationCountIsExact( Torrent * t )
{
    /* This assert might fail due to errors of implementations in other
     * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
     * from a client. If such a behavior is noticed,
     * a bug report should be filed to the faulty client. */

    size_t piece_i;
    const uint16_t * rep = t->pieceReplication;
    const size_t piece_count = t->pieceReplicationSize;
    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
    const int peer_count = tr_ptrArraySize( &t->peers );

    assert( piece_count == t->tor->info.pieceCount );

    for( piece_i=0; piece_i<piece_count; ++piece_i )
    {
        int peer_i;
        uint16_t r = 0;

        /* same bitfield query that replicationNew() uses to build the counts */
        for( peer_i=0; peer_i<peer_count; ++peer_i )
            if( tr_bitfieldHas( &peers[peer_i]->have, piece_i ) )
                ++r;

        assert( rep[piece_i] == r );
    }
}
#endif
1060
1061static struct weighted_piece *
1062pieceListLookup( Torrent * t, tr_piece_index_t index )
1063{
1064    int i;
1065
1066    for( i=0; i<t->pieceCount; ++i )
1067        if( t->pieces[i].index == index )
1068            return &t->pieces[i];
1069
1070    return NULL;
1071}
1072
1073static void
1074pieceListRebuild( Torrent * t )
1075{
1076
1077    if( !tr_torrentIsSeed( t->tor ) )
1078    {
1079        tr_piece_index_t i;
1080        tr_piece_index_t * pool;
1081        tr_piece_index_t poolCount = 0;
1082        const tr_torrent * tor = t->tor;
1083        const tr_info * inf = tr_torrentInfo( tor );
1084        struct weighted_piece * pieces;
1085        int pieceCount;
1086
1087        /* build the new list */
1088        pool = tr_new( tr_piece_index_t, inf->pieceCount );
1089        for( i=0; i<inf->pieceCount; ++i )
1090            if( !inf->pieces[i].dnd )
1091                if( !tr_cpPieceIsComplete( &tor->completion, i ) )
1092                    pool[poolCount++] = i;
1093        pieceCount = poolCount;
1094        pieces = tr_new0( struct weighted_piece, pieceCount );
1095        for( i=0; i<poolCount; ++i ) {
1096            struct weighted_piece * piece = pieces + i;
1097            piece->index = pool[i];
1098            piece->requestCount = 0;
1099            piece->salt = tr_cryptoWeakRandInt( 4096 );
1100        }
1101
1102        /* if we already had a list of pieces, merge it into
1103         * the new list so we don't lose its requestCounts */
1104        if( t->pieces != NULL )
1105        {
1106            struct weighted_piece * o = t->pieces;
1107            struct weighted_piece * oend = o + t->pieceCount;
1108            struct weighted_piece * n = pieces;
1109            struct weighted_piece * nend = n + pieceCount;
1110
1111            pieceListSort( t, PIECES_SORTED_BY_INDEX );
1112
1113            while( o!=oend && n!=nend ) {
1114                if( o->index < n->index )
1115                    ++o;
1116                else if( o->index > n->index )
1117                    ++n;
1118                else
1119                    *n++ = *o++;
1120            }
1121
1122            tr_free( t->pieces );
1123        }
1124
1125        t->pieces = pieces;
1126        t->pieceCount = pieceCount;
1127
1128        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );
1129
1130        /* cleanup */
1131        tr_free( pool );
1132    }
1133}
1134
1135static void
1136pieceListRemovePiece( Torrent * t, tr_piece_index_t piece )
1137{
1138    struct weighted_piece * p;
1139
1140    if(( p = pieceListLookup( t, piece )))
1141    {
1142        const int pos = p - t->pieces;
1143
1144        tr_removeElementFromArray( t->pieces,
1145                                   pos,
1146                                   sizeof( struct weighted_piece ),
1147                                   t->pieceCount-- );
1148
1149        if( t->pieceCount == 0 )
1150        {
1151            tr_free( t->pieces );
1152            t->pieces = NULL;
1153        }
1154    }
1155}
1156
1157static void
1158pieceListResortPiece( Torrent * t, struct weighted_piece * p )
1159{
1160    int pos;
1161    bool isSorted = true;
1162
1163    if( p == NULL )
1164        return;
1165
1166    /* is the torrent already sorted? */
1167    pos = p - t->pieces;
1168    setComparePieceByWeightTorrent( t );
1169    if( isSorted && ( pos > 0 ) && ( comparePieceByWeight( p-1, p ) > 0 ) )
1170        isSorted = false;
1171    if( isSorted && ( pos < t->pieceCount - 1 ) && ( comparePieceByWeight( p, p+1 ) > 0 ) )
1172        isSorted = false;
1173
1174    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
1175    {
1176       pieceListSort( t, PIECES_SORTED_BY_WEIGHT);
1177       isSorted = true;
1178    }
1179
1180    /* if it's not sorted, move it around */
1181    if( !isSorted )
1182    {
1183        bool exact;
1184        const struct weighted_piece tmp = *p;
1185
1186        tr_removeElementFromArray( t->pieces,
1187                                   pos,
1188                                   sizeof( struct weighted_piece ),
1189                                   t->pieceCount-- );
1190
1191        pos = tr_lowerBound( &tmp, t->pieces, t->pieceCount,
1192                             sizeof( struct weighted_piece ),
1193                             comparePieceByWeight, &exact );
1194
1195        memmove( &t->pieces[pos + 1],
1196                 &t->pieces[pos],
1197                 sizeof( struct weighted_piece ) * ( t->pieceCount++ - pos ) );
1198
1199        t->pieces[pos] = tmp;
1200    }
1201
1202    assertWeightedPiecesAreSorted( t );
1203}
1204
1205static void
1206pieceListRemoveRequest( Torrent * t, tr_block_index_t block )
1207{
1208    struct weighted_piece * p;
1209    const tr_piece_index_t index = tr_torBlockPiece( t->tor, block );
1210
1211    if( ((p = pieceListLookup( t, index ))) && ( p->requestCount > 0 ) )
1212    {
1213        --p->requestCount;
1214        pieceListResortPiece( t, p );
1215    }
1216}
1217
1218
1219/****
1220*****
1221*****  Replication count ( for rarest first policy )
1222*****
1223****/
1224
1225/**
1226 * Increase the replication count of this piece and sort it if the
1227 * piece list is already sorted
1228 */
1229static void
1230tr_incrReplicationOfPiece( Torrent * t, const size_t index )
1231{
1232    assert( replicationExists( t ) );
1233    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1234
1235    /* One more replication of this piece is present in the swarm */
1236    ++t->pieceReplication[index];
1237
1238    /* we only resort the piece if the list is already sorted */
1239    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1240        pieceListResortPiece( t, pieceListLookup( t, index ) );
1241}
1242
1243/**
1244 * Increases the replication count of pieces present in the bitfield
1245 */
1246static void
1247tr_incrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1248{
1249    size_t i;
1250    uint16_t * rep = t->pieceReplication;
1251    const size_t n = t->tor->info.pieceCount;
1252
1253    assert( replicationExists( t ) );
1254
1255    for( i=0; i<n; ++i )
1256        if( tr_bitfieldHas( b, i ) )
1257            ++rep[i];
1258
1259    if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1260        invalidatePieceSorting( t );
1261}
1262
1263/**
1264 * Increase the replication count of every piece
1265 */
1266static void
1267tr_incrReplication( Torrent * t )
1268{
1269    int i;
1270    const int n = t->pieceReplicationSize;
1271
1272    assert( replicationExists( t ) );
1273    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1274
1275    for( i=0; i<n; ++i )
1276        ++t->pieceReplication[i];
1277}
1278
1279/**
1280 * Decrease the replication count of pieces present in the bitset.
1281 */
1282static void
1283tr_decrReplicationFromBitfield( Torrent * t, const tr_bitfield * b )
1284{
1285    int i;
1286    const int n = t->pieceReplicationSize;
1287
1288    assert( replicationExists( t ) );
1289    assert( t->pieceReplicationSize == t->tor->info.pieceCount );
1290
1291    if( tr_bitfieldHasAll( b ) )
1292    {
1293        for( i=0; i<n; ++i )
1294            --t->pieceReplication[i];
1295    }
1296    else if ( !tr_bitfieldHasNone( b ) )
1297    {
1298        for( i=0; i<n; ++i )
1299            if( tr_bitfieldHas( b, i ) )
1300                --t->pieceReplication[i];
1301
1302        if( t->pieceSortState == PIECES_SORTED_BY_WEIGHT )
1303            invalidatePieceSorting( t );
1304    }
1305}
1306
1307/**
1308***
1309**/
1310
1311void
1312tr_peerMgrRebuildRequests( tr_torrent * tor )
1313{
1314    assert( tr_isTorrent( tor ) );
1315
1316    pieceListRebuild( tor->torrentPeers );
1317}
1318
/**
 * Picks blocks to request from `peer` and records each pick in this
 * torrent's request tables.
 *
 * Pieces are visited in weight order and only pieces set in peer->have
 * are considered.
 *
 * @param tor            torrent to pick blocks for
 * @param peer           peer the requests are meant for; asserted to be
 *                       interesting to us and not choking us
 * @param numwant        how many entries the caller wants; asserted > 0
 * @param setme          output array. When get_intervals is false each
 *                       entry is one block index. When it is true,
 *                       entries [2*i] and [2*i+1] hold the first and last
 *                       block of interval i.
 *                       NOTE(review): the required capacity is not checked
 *                       here; presumably numwant entries, or 2*numwant in
 *                       interval mode -- confirm against callers.
 * @param numgot         set to the number of entries (blocks or intervals)
 *                       written to setme
 * @param get_intervals  selects block-interval output instead of
 *                       individual blocks
 */
void
tr_peerMgrGetNextRequests( tr_torrent           * tor,
                           tr_peer              * peer,
                           int                    numwant,
                           tr_block_index_t     * setme,
                           int                  * numgot,
                           bool                   get_intervals )
{
    int i;
    int got;
    Torrent * t;
    struct weighted_piece * pieces;
    const tr_bitfield * const have = &peer->have;

    /* sanity clause */
    assert( tr_isTorrent( tor ) );
    assert( peer->clientIsInterested );
    assert( !peer->clientIsChoked );
    assert( numwant > 0 );

    /* walk through the pieces and find blocks that should be requested */
    got = 0;
    t = tor->torrentPeers;

    /* prep the pieces list */
    if( t->pieces == NULL )
        pieceListRebuild( t );

    if( t->pieceSortState != PIECES_SORTED_BY_WEIGHT )
        pieceListSort( t, PIECES_SORTED_BY_WEIGHT );

    assertReplicationCountIsExact( t );
    assertWeightedPiecesAreSorted( t );

    updateEndgame( t );
    pieces = t->pieces;
    for( i=0; i<t->pieceCount && got<numwant; ++i )
    {
        struct weighted_piece * p = pieces + i;

        /* if the peer has this piece that we want... */
        if( tr_bitfieldHas( have, p->index ) )
        {
            tr_block_index_t b;
            tr_block_index_t first;
            tr_block_index_t last;
            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;

            tr_torGetPieceBlockRange( tor, p->index, &first, &last );

            /* in interval mode the loop may run past numwant for as long as
               block b directly follows the end of the last interval, so
               that the interval can still be extended */
            for( b=first; b<=last && (got<numwant || (get_intervals && setme[2*got-1] == b-1)); ++b )
            {
                int peerCount;
                tr_peer ** peers;

                /* don't request blocks we've already got */
                if( tr_cpBlockIsComplete( &tor->completion, b ) )
                    continue;

                /* always add peer if this block has no peers yet */
                tr_ptrArrayClear( &peerArr );
                getBlockRequestPeers( t, b, &peerArr );
                peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );
                if( peerCount != 0 )
                {
                    /* don't make a second block request until the endgame */
                    if( !t->endgame )
                        continue;

                    /* don't have more than two peers requesting this block */
                    if( peerCount > 1 )
                        continue;

                    /* don't send the same request to the same peer twice */
                    if( peer == peers[0] )
                        continue;

                    /* in the endgame allow an additional peer to download a
                       block but only if the peer seems to be handling requests
                       relatively fast */
                    if( peer->pendingReqsToPeer + numwant - got < t->endgame )
                        continue;
                }

                /* update the caller's table */
                if( !get_intervals ) {
                    setme[got++] = b;
                }
                /* if intervals are requested two array entries are necessary:
                   one for the interval's starting block and one for its end block */
                else if( got && setme[2 * got - 1] == b - 1 && b != first ) {
                    /* expand the last interval */
                    ++setme[2 * got - 1];
                }
                else {
                    /* begin a new interval */
                    setme[2 * got] = setme[2 * got + 1] = b;
                    ++got;
                }

                /* update our own tables */
                requestListAdd( t, b, peer );
                ++p->requestCount;
            }

            tr_ptrArrayDestruct( &peerArr, NULL );
        }
    }

    /* In most cases we've just changed the weights of a small number of pieces.
     * So rather than qsort()ing the entire array, it's faster to apply an
     * adaptive insertion sort algorithm. */
    if( got > 0 )
    {
        /* not enough requests || last piece modified */
        if ( i == t->pieceCount ) --i;

        setComparePieceByWeightTorrent( t );
        /* walk backwards over the visited pieces, re-inserting each one
           into the part of the array that follows it */
        while( --i >= 0 )
        {
            bool exact;

            /* relative position! */
            const int newpos = tr_lowerBound( &t->pieces[i], &t->pieces[i + 1],
                                              t->pieceCount - (i + 1),
                                              sizeof( struct weighted_piece ),
                                              comparePieceByWeight, &exact );
            if( newpos > 0 )
            {
                /* shift the next newpos entries down by one slot and
                   drop the saved piece in after them */
                const struct weighted_piece piece = t->pieces[i];
                memmove( &t->pieces[i],
                         &t->pieces[i + 1],
                         sizeof( struct weighted_piece ) * ( newpos ) );
                t->pieces[i + newpos] = piece;
            }
        }
    }

    assertWeightedPiecesAreSorted( t );
    *numgot = got;
}
1460
1461bool
1462tr_peerMgrDidPeerRequest( const tr_torrent  * tor,
1463                          const tr_peer     * peer,
1464                          tr_block_index_t    block )
1465{
1466    const Torrent * t = tor->torrentPeers;
1467    return requestListLookup( (Torrent*)t, block, peer ) != NULL;
1468}
1469
1470/* cancel requests that are too old */
1471static void
1472refillUpkeep( int foo UNUSED, short bar UNUSED, void * vmgr )
1473{
1474    time_t now;
1475    time_t too_old;
1476    tr_torrent * tor;
1477    int cancel_buflen = 0;
1478    struct block_request * cancel = NULL;
1479    tr_peerMgr * mgr = vmgr;
1480    managerLock( mgr );
1481
1482    now = tr_time( );
1483    too_old = now - REQUEST_TTL_SECS;
1484
1485    /* alloc the temporary "cancel" buffer */
1486    tor = NULL;
1487    while(( tor = tr_torrentNext( mgr->session, tor )))
1488        cancel_buflen = MAX( cancel_buflen, tor->torrentPeers->requestCount );
1489    if( cancel_buflen > 0 )
1490        cancel = tr_new( struct block_request, cancel_buflen );
1491
1492    /* prune requests that are too old */
1493    tor = NULL;
1494    while(( tor = tr_torrentNext( mgr->session, tor )))
1495    {
1496        Torrent * t = tor->torrentPeers;
1497        const int n = t->requestCount;
1498        if( n > 0 )
1499        {
1500            int keepCount = 0;
1501            int cancelCount = 0;
1502            const struct block_request * it;
1503            const struct block_request * end;
1504
1505            for( it=t->requests, end=it+n; it!=end; ++it )
1506            {
1507                if( ( it->sentAt <= too_old ) && it->peer->msgs && !tr_peerMsgsIsReadingBlock( it->peer->msgs, it->block ) )
1508                    cancel[cancelCount++] = *it;
1509                else
1510                {
1511                    if( it != &t->requests[keepCount] )
1512                        t->requests[keepCount] = *it;
1513                    keepCount++;
1514                }
1515            }
1516
1517            /* prune out the ones we aren't keeping */
1518            t->requestCount = keepCount;
1519
1520            /* send cancel messages for all the "cancel" ones */
1521            for( it=cancel, end=it+cancelCount; it!=end; ++it ) {
1522                if( ( it->peer != NULL ) && ( it->peer->msgs != NULL ) ) {
1523                    tr_historyAdd( &it->peer->cancelsSentToPeer, now, 1 );
1524                    tr_peerMsgsCancel( it->peer->msgs, it->block );
1525                    decrementPendingReqCount( it );
1526                }
1527            }
1528
1529            /* decrement the pending request counts for the timed-out blocks */
1530            for( it=cancel, end=it+cancelCount; it!=end; ++it )
1531                pieceListRemoveRequest( t, it->block );
1532        }
1533    }
1534
1535    tr_free( cancel );
1536    tr_timerAddMsec( mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC );
1537    managerUnlock( mgr );
1538}
1539
1540static void
1541addStrike( Torrent * t, tr_peer * peer )
1542{
1543    tordbg( t, "increasing peer %s strike count to %d",
1544            tr_atomAddrStr( peer->atom ), peer->strikes + 1 );
1545
1546    if( ++peer->strikes >= MAX_BAD_PIECES_PER_PEER )
1547    {
1548        struct peer_atom * atom = peer->atom;
1549        atom->flags2 |= MYFLAG_BANNED;
1550        peer->doPurge = 1;
1551        tordbg( t, "banning peer %s", tr_atomAddrStr( atom ) );
1552    }
1553}
1554
1555static void
1556gotBadPiece( Torrent * t, tr_piece_index_t pieceIndex )
1557{
1558    tr_torrent *   tor = t->tor;
1559    const uint32_t byteCount = tr_torPieceCountBytes( tor, pieceIndex );
1560
1561    tor->corruptCur += byteCount;
1562    tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
1563
1564    tr_announcerAddBytes( tor, TR_ANN_CORRUPT, byteCount );
1565}
1566
/**
 * Called when a peer suggests a piece or marks one as allowed-fast.
 *
 * Currently a no-op: the entire body is compiled out with "#if 0",
 * which is why every parameter is tagged UNUSED. The disabled code
 * would request the missing blocks of the suggested piece from the peer.
 */
static void
peerSuggestedPiece( Torrent            * t UNUSED,
                    tr_peer            * peer UNUSED,
                    tr_piece_index_t     pieceIndex UNUSED,
                    int                  isFastAllowed UNUSED )
{
#if 0
    assert( t );
    assert( peer );
    assert( peer->msgs );

    /* is this a valid piece? */
    if(  pieceIndex >= t->tor->info.pieceCount )
        return;

    /* don't ask for it if we've already got it */
    if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
        return;

    /* don't ask for it if they don't have it */
    if( !tr_bitfieldHas( peer->have, pieceIndex ) )
        return;

    /* don't ask for it if we're choked and it's not fast */
    if( !isFastAllowed && peer->clientIsChoked )
        return;

    /* request the blocks that we don't have in this piece */
    {
        tr_block_index_t b;
        tr_block_index_t first;
        tr_block_index_t last;
        const tr_torrent * tor = t->tor;

        tr_torGetPieceBlockRange( t->tor, pieceIndex, &first, &last );

        for( b=first; b<=last; ++b )
        {
            if( !tr_cpBlockIsComplete( tor->completion, b ) )
            {
                const uint32_t offset = getBlockOffsetInPiece( tor, b );
                const uint32_t length = tr_torBlockCountBytes( tor, b );
                tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length );
                incrementPieceRequests( t, pieceIndex );
            }
        }
    }
#endif
}
1616
1617static void
1618removeRequestFromTables( Torrent * t, tr_block_index_t block, const tr_peer * peer )
1619{
1620    requestListRemove( t, block, peer );
1621    pieceListRemoveRequest( t, block );
1622}
1623
1624/* peer choked us, or maybe it disconnected.
1625   either way we need to remove all its requests */
1626static void
1627peerDeclinedAllRequests( Torrent * t, const tr_peer * peer )
1628{
1629    int i, n;
1630    tr_block_index_t * blocks = tr_new( tr_block_index_t, t->requestCount );
1631
1632    for( i=n=0; i<t->requestCount; ++i )
1633        if( peer == t->requests[i].peer )
1634            blocks[n++] = t->requests[i].block;
1635
1636    for( i=0; i<n; ++i )
1637        removeRequestFromTables( t, blocks[i], peer );
1638
1639    tr_free( blocks );
1640}
1641
1642static void tr_peerMgrSetBlame( tr_torrent *, tr_piece_index_t, int );
1643
/**
 * Handles an event reported for a peer, dispatching on e->eventType.
 * The torrent lock is held for the duration of the call.
 *
 * @param peer  the peer the event is about; asserted non-NULL
 * @param e     the event; which of its fields are read depends on eventType
 * @param vt    the Torrent that the peer belongs to, passed as void*
 */
static void
peerCallbackFunc( tr_peer * peer, const tr_peer_event * e, void * vt )
{
    Torrent * t = vt;

    torrentLock( t );

    assert( peer != NULL );

    switch( e->eventType )
    {
        /* data went out to the peer: count piece data as upload */
        case TR_PEER_PEER_GOT_DATA:
        {
            const time_t now = tr_time( );
            tr_torrent * tor = t->tor;

            if( e->wasPieceData )
            {
                tor->uploadedCur += e->length;
                tr_announcerAddBytes( tor, TR_ANN_UP, e->length );
                tr_torrentSetActivityDate( tor, now );
                tr_torrentSetDirty( tor );
            }

            /* update the stats */
            if( e->wasPieceData )
                tr_statsAddUploaded( tor->session, e->length );

            /* update our atom */
            if( peer->atom && e->wasPieceData )
                peer->atom->piece_data_time = now;

            break;
        }

        /* the replication counts are only maintained once they exist */
        case TR_PEER_CLIENT_GOT_HAVE:
            if( replicationExists( t ) ) {
                tr_incrReplicationOfPiece( t, e->pieceIndex );
                assertReplicationCountIsExact( t );
            }
            break;

        case TR_PEER_CLIENT_GOT_HAVE_ALL:
            if( replicationExists( t ) ) {
                tr_incrReplication( t );
                assertReplicationCountIsExact( t );
            }
            break;

        case TR_PEER_CLIENT_GOT_HAVE_NONE:
            /* noop */
            break;

        case TR_PEER_CLIENT_GOT_BITFIELD:
            assert( e->bitfield != NULL );
            if( replicationExists( t ) ) {
                tr_incrReplicationFromBitfield( t, e->bitfield );
                assertReplicationCountIsExact( t );
            }
            break;

        /* the peer rejected a request: forget it, after validating
           the block index derived from the message */
        case TR_PEER_CLIENT_GOT_REJ: {
            tr_block_index_t b = _tr_block( t->tor, e->pieceIndex, e->offset );
            if( b < t->tor->blockCount )
                removeRequestFromTables( t, b, peer );
            else
                tordbg( t, "Peer %s sent an out-of-range reject message",
                           tr_atomAddrStr( peer->atom ) );
            break;
        }

        case TR_PEER_CLIENT_GOT_CHOKE:
            peerDeclinedAllRequests( t, peer );
            break;

        case TR_PEER_CLIENT_GOT_PORT:
            if( peer->atom )
                peer->atom->port = e->port;
            break;

        /* the last argument tells whether the piece is allowed-fast */
        case TR_PEER_CLIENT_GOT_SUGGEST:
            peerSuggestedPiece( t, peer, e->pieceIndex, false );
            break;

        case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
            peerSuggestedPiece( t, peer, e->pieceIndex, true );
            break;

        /* data came in from the peer: count piece data as download */
        case TR_PEER_CLIENT_GOT_DATA:
        {
            const time_t now = tr_time( );
            tr_torrent * tor = t->tor;

            if( e->wasPieceData )
            {
                tor->downloadedCur += e->length;
                tr_torrentSetActivityDate( tor, now );
                tr_torrentSetDirty( tor );
            }

            /* update the stats */
            if( e->wasPieceData )
                tr_statsAddDownloaded( tor->session, e->length );

            /* update our atom */
            if( peer->atom && e->wasPieceData )
                peer->atom->piece_data_time = now;

            break;
        }

        /* a whole block has arrived from the peer */
        case TR_PEER_CLIENT_GOT_BLOCK:
        {
            tr_torrent * tor = t->tor;
            tr_block_index_t block = _tr_block( tor, e->pieceIndex, e->offset );
            int i, peerCount;
            tr_peer ** peers;
            tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;

            /* drop this peer's request first, so that peerArr only
               holds the other peers that were asked for the block */
            removeRequestFromTables( t, block, peer );
            getBlockRequestPeers( t, block, &peerArr );
            peers = (tr_peer **) tr_ptrArrayPeek( &peerArr, &peerCount );

            /* remove additional block requests and send cancel to peers */
            for( i=0; i<peerCount; i++ ) {
                tr_peer * p = peers[i];
                assert( p != peer );
                if( p->msgs ) {
                    tr_historyAdd( &p->cancelsSentToPeer, tr_time( ), 1 );
                    tr_peerMsgsCancel( p->msgs, block );
                }
                removeRequestFromTables( t, block, p );
            }

            tr_ptrArrayDestruct( &peerArr, false );

            tr_historyAdd( &peer->blocksSentToClient, tr_time( ), 1 );

            if( tr_cpBlockIsComplete( &tor->completion, block ) )
            {
                /* we already have this block... */
                /* so take its bytes back out of the downloaded counter */
                const uint32_t n = tr_torBlockCountBytes( tor, block );
                tor->downloadedCur -= MIN( tor->downloadedCur, n );
                tordbg( t, "we have this block already..." );
            }
            else
            {
                tr_cpBlockAdd( &tor->completion, block );
                pieceListResortPiece( t, pieceListLookup( t, e->pieceIndex ) );
                tr_torrentSetDirty( tor );

                /* did this block complete its piece? then verify the piece */
                if( tr_cpPieceIsComplete( &tor->completion, e->pieceIndex ) )
                {
                    const tr_piece_index_t p = e->pieceIndex;
                    const bool ok = tr_torrentCheckPiece( tor, p );

                    tordbg( t, "[LAZY] checked just-completed piece %zu", (size_t)p );

                    if( !ok )
                    {
                        tr_torerr( tor, _( "Piece %lu, which was just downloaded, failed its checksum test" ),
                                   (unsigned long)p );
                    }

                    tr_peerMgrSetBlame( tor, p, ok );

                    if( !ok )
                    {
                        gotBadPiece( t, p );
                    }
                    else
                    {
                        /* NOTE: these three locals shadow the ones declared
                           at the top of this case */
                        int i;
                        int peerCount;
                        tr_peer ** peers;
                        tr_file_index_t fileIndex;

                        /* only add this to downloadedCur if we got it from a peer --
                         * webseeds shouldn't count against our ratio. As one tracker
                         * admin put it, "Those pieces are downloaded directly from the
                         * content distributor, not the peers, it is the tracker's job
                         * to manage the swarms, not the web server and does not fit
                         * into the jurisdiction of the tracker." */
                        if( peer->msgs != NULL ) {
                            const uint32_t n = tr_torPieceCountBytes( tor, p );
                            tr_announcerAddBytes( tor, TR_ANN_DOWN, n );
                        }

                        /* notify every peer of this torrent about the new piece */
                        peerCount = tr_ptrArraySize( &t->peers );
                        peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
                        for( i=0; i<peerCount; ++i )
                            tr_peerMsgsHave( peers[i]->msgs, p );

                        /* flush and finalize each file that overlaps this
                           piece and is now complete */
                        for( fileIndex=0; fileIndex<tor->info.fileCount; ++fileIndex ) {
                            const tr_file * file = &tor->info.files[fileIndex];
                            if( ( file->firstPiece <= p ) && ( p <= file->lastPiece ) ) {
                                if( tr_cpFileIsComplete( &tor->completion, fileIndex ) ) {
                                    tr_cacheFlushFile( tor->session->cache, tor, fileIndex );
                                    tr_torrentFileCompleted( tor, fileIndex );
                                }
                            }
                        }

                        /* a verified piece no longer needs requesting */
                        pieceListRemovePiece( t, p );
                    }
                }

                t->needsCompletenessCheck = true;
            }
            break;
        }

        case TR_PEER_ERROR:
            if( ( e->err == ERANGE ) || ( e->err == EMSGSIZE ) || ( e->err == ENOTCONN ) )
            {
                /* some protocol error from the peer */
                peer->doPurge = 1;
                tordbg( t, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
                        tr_atomAddrStr( peer->atom ) );
            }
            else
            {
                tordbg( t, "unhandled error: %s", tr_strerror( e->err ) );
            }
            break;

        /* every event type is expected to be handled above */
        default:
            assert( 0 );
    }

    torrentUnlock( t );
}
1876
1877static int
1878getDefaultShelfLife( uint8_t from )
1879{
1880    /* in general, peers obtained from firsthand contact
1881     * are better than those from secondhand, etc etc */
1882    switch( from )
1883    {
1884        case TR_PEER_FROM_INCOMING : return 60 * 60 * 6;
1885        case TR_PEER_FROM_LTEP     : return 60 * 60 * 6;
1886        case TR_PEER_FROM_TRACKER  : return 60 * 60 * 3;
1887        case TR_PEER_FROM_DHT      : return 60 * 60 * 3;
1888        case TR_PEER_FROM_PEX      : return 60 * 60 * 2;
1889        case TR_PEER_FROM_RESUME   : return 60 * 60;
1890        case TR_PEER_FROM_LPD      : return 10 * 60;
1891        default                    : return 60 * 60;
1892    }
1893}
1894
1895static void
1896ensureAtomExists( Torrent           * t,
1897                  const tr_address  * addr,
1898                  const tr_port       port,
1899                  const uint8_t       flags,
1900                  const int8_t        seedProbability,
1901                  const uint8_t       from )
1902{
1903    struct peer_atom * a;
1904
1905    assert( tr_address_is_valid( addr ) );
1906    assert( from < TR_PEER_FROM__MAX );
1907
1908    a = getExistingAtom( t, addr );
1909
1910    if( a == NULL )
1911    {
1912        const int jitter = tr_cryptoWeakRandInt( 60*10 );
1913        a = tr_new0( struct peer_atom, 1 );
1914        a->addr = *addr;
1915        a->port = port;
1916        a->flags = flags;
1917        a->fromFirst = from;
1918        a->fromBest = from;
1919        a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
1920        a->blocklisted = -1;
1921        atomSetSeedProbability( a, seedProbability );
1922        tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
1923
1924        tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
1925    }
1926    else
1927    {
1928        if( from < a->fromBest )
1929            a->fromBest = from;
1930
1931        if( a->seedProbability == -1 )
1932            atomSetSeedProbability( a, seedProbability );
1933
1934        a->flags |= flags;
1935    }
1936}
1937
1938static int
1939getMaxPeerCount( const tr_torrent * tor )
1940{
1941    return tor->maxConnectedPeers;
1942}
1943
1944static int
1945getPeerCount( const Torrent * t )
1946{
1947    return tr_ptrArraySize( &t->peers );/* + tr_ptrArraySize( &t->outgoingHandshakes ); */
1948}
1949
1950/* FIXME: this is kind of a mess. */
1951static bool
1952myHandshakeDoneCB( tr_handshake  * handshake,
1953                   tr_peerIo     * io,
1954                   bool            readAnythingFromPeer,
1955                   bool            isConnected,
1956                   const uint8_t * peer_id,
1957                   void          * vmanager )
1958{
1959    bool               ok = isConnected;
1960    bool               success = false;
1961    tr_port            port;
1962    const tr_address * addr;
1963    tr_peerMgr       * manager = vmanager;
1964    Torrent          * t;
1965    tr_handshake     * ours;
1966
1967    assert( io );
1968    assert( tr_isBool( ok ) );
1969
1970    t = tr_peerIoHasTorrentHash( io )
1971        ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
1972        : NULL;
1973
1974    if( tr_peerIoIsIncoming ( io ) )
1975        ours = tr_ptrArrayRemoveSorted( &manager->incomingHandshakes,
1976                                        handshake, handshakeCompare );
1977    else if( t )
1978        ours = tr_ptrArrayRemoveSorted( &t->outgoingHandshakes,
1979                                        handshake, handshakeCompare );
1980    else
1981        ours = handshake;
1982
1983    assert( ours );
1984    assert( ours == handshake );
1985
1986    if( t )
1987        torrentLock( t );
1988
1989    addr = tr_peerIoGetAddress( io, &port );
1990
1991    if( !ok || !t || !t->isRunning )
1992    {
1993        if( t )
1994        {
1995            struct peer_atom * atom = getExistingAtom( t, addr );
1996            if( atom )
1997            {
1998                ++atom->numFails;
1999
2000                if( !readAnythingFromPeer )
2001                {
2002                    tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
2003                    atom->flags2 |= MYFLAG_UNREACHABLE;
2004                }
2005            }
2006        }
2007    }
2008    else /* looking good */
2009    {
2010        struct peer_atom * atom;
2011
2012        ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
2013        atom = getExistingAtom( t, addr );
2014        atom->time = tr_time( );
2015        atom->piece_data_time = 0;
2016        atom->lastConnectionAt = tr_time( );
2017
2018        if( !tr_peerIoIsIncoming( io ) )
2019        {
2020            atom->flags |= ADDED_F_CONNECTABLE;
2021            atom->flags2 &= ~MYFLAG_UNREACHABLE;
2022        }
2023
2024        /* In principle, this flag specifies whether the peer groks uTP,
2025           not whether it's currently connected over uTP. */
2026        if( io->utp_socket )
2027            atom->flags |= ADDED_F_UTP_FLAGS;
2028
2029        if( atom->flags2 & MYFLAG_BANNED )
2030        {
2031            tordbg( t, "banned peer %s tried to reconnect",
2032                    tr_atomAddrStr( atom ) );
2033        }
2034        else if( tr_peerIoIsIncoming( io )
2035               && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
2036
2037        {
2038        }
2039        else
2040        {
2041            tr_peer * peer = atom->peer;
2042
2043            if( peer ) /* we already have this peer */
2044            {
2045            }
2046            else
2047            {
2048                peer = getPeer( t, atom );
2049                tr_free( peer->client );
2050
2051                if( !peer_id )
2052                    peer->client = NULL;
2053                else {
2054                    char client[128];
2055                    tr_clientForId( client, sizeof( client ), peer_id );
2056                    peer->client = tr_strdup( client );
2057                }
2058
2059                peer->io = tr_handshakeStealIO( handshake ); /* this steals its refcount too, which is
2060                                                                balanced by our unref in peerDelete()  */
2061                tr_peerIoSetParent( peer->io, &t->tor->bandwidth );
2062                tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t );
2063
2064                success = true;
2065            }
2066        }
2067    }
2068
2069    if( t )
2070        torrentUnlock( t );
2071
2072    return success;
2073}
2074
2075void
2076tr_peerMgrAddIncoming( tr_peerMgr * manager,
2077                       tr_address * addr,
2078                       tr_port      port,
2079                       int          socket,
2080                       struct UTPSocket * utp_socket )
2081{
2082    tr_session * session;
2083
2084    managerLock( manager );
2085
2086    assert( tr_isSession( manager->session ) );
2087    session = manager->session;
2088
2089    if( tr_sessionIsAddressBlocked( session, addr ) )
2090    {
2091        tr_dbg( "Banned IP address \"%s\" tried to connect to us", tr_address_to_string( addr ) );
2092        if(socket >= 0)
2093            tr_netClose( session, socket );
2094        else
2095            UTP_Close( utp_socket );
2096    }
2097    else if( getExistingHandshake( &manager->incomingHandshakes, addr ) )
2098    {
2099        if(socket >= 0)
2100            tr_netClose( session, socket );
2101        else
2102            UTP_Close( utp_socket );
2103    }
2104    else /* we don't have a connection to them yet... */
2105    {
2106        tr_peerIo *    io;
2107        tr_handshake * handshake;
2108
2109        io = tr_peerIoNewIncoming( session, &session->bandwidth, addr, port, socket, utp_socket );
2110
2111        handshake = tr_handshakeNew( io,
2112                                     session->encryptionMode,
2113                                     myHandshakeDoneCB,
2114                                     manager );
2115
2116        tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2117
2118        tr_ptrArrayInsertSorted( &manager->incomingHandshakes, handshake,
2119                                 handshakeCompare );
2120    }
2121
2122    managerUnlock( manager );
2123}
2124
2125void
2126tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
2127                  const tr_pex * pex, int8_t seedProbability )
2128{
2129    if( tr_isPex( pex ) ) /* safeguard against corrupt data */
2130    {
2131        Torrent * t = tor->torrentPeers;
2132        managerLock( t->manager );
2133
2134        if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
2135            if( tr_address_is_valid_for_peers( &pex->addr, pex->port ) )
2136                ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
2137
2138        managerUnlock( t->manager );
2139    }
2140}
2141
2142void
2143tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
2144{
2145    Torrent * t = tor->torrentPeers;
2146    const int n = tr_ptrArraySize( &t->pool );
2147    struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2148    struct peer_atom ** end = it + n;
2149
2150    while( it != end )
2151        atomSetSeed( t, *it++ );
2152}
2153
2154tr_pex *
2155tr_peerMgrCompactToPex( const void *    compact,
2156                        size_t          compactLen,
2157                        const uint8_t * added_f,
2158                        size_t          added_f_len,
2159                        size_t *        pexCount )
2160{
2161    size_t          i;
2162    size_t          n = compactLen / 6;
2163    const uint8_t * walk = compact;
2164    tr_pex *        pex = tr_new0( tr_pex, n );
2165
2166    for( i = 0; i < n; ++i )
2167    {
2168        pex[i].addr.type = TR_AF_INET;
2169        memcpy( &pex[i].addr.addr, walk, 4 ); walk += 4;
2170        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2171        if( added_f && ( n == added_f_len ) )
2172            pex[i].flags = added_f[i];
2173    }
2174
2175    *pexCount = n;
2176    return pex;
2177}
2178
2179tr_pex *
2180tr_peerMgrCompact6ToPex( const void    * compact,
2181                         size_t          compactLen,
2182                         const uint8_t * added_f,
2183                         size_t          added_f_len,
2184                         size_t        * pexCount )
2185{
2186    size_t          i;
2187    size_t          n = compactLen / 18;
2188    const uint8_t * walk = compact;
2189    tr_pex *        pex = tr_new0( tr_pex, n );
2190
2191    for( i = 0; i < n; ++i )
2192    {
2193        pex[i].addr.type = TR_AF_INET6;
2194        memcpy( &pex[i].addr.addr.addr6.s6_addr, walk, 16 ); walk += 16;
2195        memcpy( &pex[i].port, walk, 2 ); walk += 2;
2196        if( added_f && ( n == added_f_len ) )
2197            pex[i].flags = added_f[i];
2198    }
2199
2200    *pexCount = n;
2201    return pex;
2202}
2203
2204tr_pex *
2205tr_peerMgrArrayToPex( const void * array,
2206                      size_t       arrayLen,
2207                      size_t      * pexCount )
2208{
2209    size_t          i;
2210    size_t          n = arrayLen / ( sizeof( tr_address ) + 2 );
2211    /*size_t          n = arrayLen / sizeof( tr_peerArrayElement );*/
2212    const uint8_t * walk = array;
2213    tr_pex        * pex = tr_new0( tr_pex, n );
2214
2215    for( i = 0 ; i < n ; i++ ) {
2216        memcpy( &pex[i].addr, walk, sizeof( tr_address ) );
2217        memcpy( &pex[i].port, walk + sizeof( tr_address ), 2 );
2218        pex[i].flags = 0x00;
2219        walk += sizeof( tr_address ) + 2;
2220    }
2221
2222    *pexCount = n;
2223    return pex;
2224}
2225
2226/**
2227***
2228**/
2229
2230static void
2231tr_peerMgrSetBlame( tr_torrent     * tor,
2232                    tr_piece_index_t pieceIndex,
2233                    int              success )
2234{
2235    if( !success )
2236    {
2237        int        peerCount, i;
2238        Torrent *  t = tor->torrentPeers;
2239        tr_peer ** peers;
2240
2241        assert( torrentIsLocked( t ) );
2242
2243        peers = (tr_peer **) tr_ptrArrayPeek( &t->peers, &peerCount );
2244        for( i = 0; i < peerCount; ++i )
2245        {
2246            tr_peer * peer = peers[i];
2247            if( tr_bitfieldHas( &peer->blame, pieceIndex ) )
2248            {
2249                tordbg( t, "peer %s contributed to corrupt piece (%d); now has %d strikes",
2250                        tr_atomAddrStr( peer->atom ),
2251                        pieceIndex, (int)peer->strikes + 1 );
2252                addStrike( t, peer );
2253            }
2254        }
2255    }
2256}
2257
2258int
2259tr_pexCompare( const void * va, const void * vb )
2260{
2261    const tr_pex * a = va;
2262    const tr_pex * b = vb;
2263    int i;
2264
2265    assert( tr_isPex( a ) );
2266    assert( tr_isPex( b ) );
2267
2268    if(( i = tr_address_compare( &a->addr, &b->addr )))
2269        return i;
2270
2271    if( a->port != b->port )
2272        return a->port < b->port ? -1 : 1;
2273
2274    return 0;
2275}
2276
2277/* better goes first */
2278static int
2279compareAtomsByUsefulness( const void * va, const void *vb )
2280{
2281    const struct peer_atom * a = * (const struct peer_atom**) va;
2282    const struct peer_atom * b = * (const struct peer_atom**) vb;
2283
2284    assert( tr_isAtom( a ) );
2285    assert( tr_isAtom( b ) );
2286
2287    if( a->piece_data_time != b->piece_data_time )
2288        return a->piece_data_time > b->piece_data_time ? -1 : 1;
2289    if( a->fromBest != b->fromBest )
2290        return a->fromBest < b->fromBest ? -1 : 1;
2291    if( a->numFails != b->numFails )
2292        return a->numFails < b->numFails ? -1 : 1;
2293
2294    return 0;
2295}
2296
2297static bool
2298isAtomInteresting( const tr_torrent * tor, struct peer_atom * atom )
2299{
2300    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
2301        return false;
2302
2303    if( peerIsInUse( tor->torrentPeers, atom ) )
2304        return true;
2305
2306    if( isAtomBlocklisted( tor->session, atom ) )
2307        return false;
2308
2309    if( atom->flags2 & MYFLAG_BANNED )
2310        return false;
2311
2312    return true;
2313}
2314
2315int
2316tr_peerMgrGetPeers( tr_torrent   * tor,
2317                    tr_pex      ** setme_pex,
2318                    uint8_t        af,
2319                    uint8_t        list_mode,
2320                    int            maxCount )
2321{
2322    int i;
2323    int n;
2324    int count = 0;
2325    int atomCount = 0;
2326    const Torrent * t = tor->torrentPeers;
2327    struct peer_atom ** atoms = NULL;
2328    tr_pex * pex;
2329    tr_pex * walk;
2330
2331    assert( tr_isTorrent( tor ) );
2332    assert( setme_pex != NULL );
2333    assert( af==TR_AF_INET || af==TR_AF_INET6 );
2334    assert( list_mode==TR_PEERS_CONNECTED || list_mode==TR_PEERS_INTERESTING );
2335
2336    managerLock( t->manager );
2337
2338    /**
2339    ***  build a list of atoms
2340    **/
2341
2342    if( list_mode == TR_PEERS_CONNECTED ) /* connected peers only */
2343    {
2344        int i;
2345        const tr_peer ** peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2346        atomCount = tr_ptrArraySize( &t->peers );
2347        atoms = tr_new( struct peer_atom *, atomCount );
2348        for( i=0; i<atomCount; ++i )
2349            atoms[i] = peers[i]->atom;
2350    }
2351    else /* TR_PEERS_INTERESTING */
2352    {
2353        int i;
2354        struct peer_atom ** atomBase = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
2355        n = tr_ptrArraySize( &t->pool );
2356        atoms = tr_new( struct peer_atom *, n );
2357        for( i=0; i<n; ++i )
2358            if( isAtomInteresting( tor, atomBase[i] ) )
2359                atoms[atomCount++] = atomBase[i];
2360    }
2361
2362    qsort( atoms, atomCount, sizeof( struct peer_atom * ), compareAtomsByUsefulness );
2363
2364    /**
2365    ***  add the first N of them into our return list
2366    **/
2367
2368    n = MIN( atomCount, maxCount );
2369    pex = walk = tr_new0( tr_pex, n );
2370
2371    for( i=0; i<atomCount && count<n; ++i )
2372    {
2373        const struct peer_atom * atom = atoms[i];
2374        if( atom->addr.type == af )
2375        {
2376            assert( tr_address_is_valid( &atom->addr ) );
2377            walk->addr = atom->addr;
2378            walk->port = atom->port;
2379            walk->flags = atom->flags;
2380            ++count;
2381            ++walk;
2382        }
2383    }
2384
2385    qsort( pex, count, sizeof( tr_pex ), tr_pexCompare );
2386
2387    assert( ( walk - pex ) == count );
2388    *setme_pex = pex;
2389
2390    /* cleanup */
2391    tr_free( atoms );
2392    managerUnlock( t->manager );
2393    return count;
2394}
2395
/* Forward declarations of the pulse callbacks. They share the
   (int, short, void *) signature that createTimer() accepts. */
static void atomPulse      ( int fd, short events, void * arg );
static void bandwidthPulse ( int fd, short events, void * arg );
static void rechokePulse   ( int fd, short events, void * arg );
static void reconnectPulse ( int fd, short events, void * arg );
2400
2401static struct event *
2402createTimer( tr_session * session, int msec, void (*callback)(int, short, void *), void * cbdata )
2403{
2404    struct event * timer = evtimer_new( session->event_base, callback, cbdata );
2405    tr_timerAddMsec( timer, msec );
2406    return timer;
2407}
2408
2409static void
2410ensureMgrTimersExist( struct tr_peerMgr * m )
2411{
2412    if( m->atomTimer == NULL )
2413        m->atomTimer = createTimer( m->session, ATOM_PERIOD_MSEC, atomPulse, m );
2414
2415    if( m->bandwidthTimer == NULL )
2416        m->bandwidthTimer = createTimer( m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m );
2417
2418    if( m->rechokeTimer == NULL )
2419        m->rechokeTimer = createTimer( m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m );
2420
2421    if( m->refillUpkeepTimer == NULL )
2422        m->refillUpkeepTimer = createTimer( m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
2423}
2424
2425void
2426tr_peerMgrStartTorrent( tr_torrent * tor )
2427{
2428    Torrent * t = tor->torrentPeers;
2429
2430    assert( tr_isTorrent( tor ) );
2431    assert( tr_torrentIsLocked( tor ) );
2432
2433    ensureMgrTimersExist( t->manager );
2434
2435    t->isRunning = true;
2436    t->maxPeers = t->tor->maxConnectedPeers;
2437    t->pieceSortState = PIECES_UNSORTED;
2438
2439    rechokePulse( 0, 0, t->manager );
2440}
2441
2442static void
2443stopTorrent( Torrent * t )
2444{
2445    tr_peer * peer;
2446
2447    t->isRunning = false;
2448
2449    replicationFree( t );
2450    invalidatePieceSorting( t );
2451
2452    /* disconnect the peers. */
2453    while(( peer = tr_ptrArrayPop( &t->peers )))
2454        peerDelete( t, peer );
2455
2456    /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2457     * which removes the handshake from t->outgoingHandshakes... */
2458    while( !tr_ptrArrayEmpty( &t->outgoingHandshakes ) )
2459        tr_handshakeAbort( tr_ptrArrayNth( &t->outgoingHandshakes, 0 ) );
2460}
2461
2462void
2463tr_peerMgrStopTorrent( tr_torrent * tor )
2464{
2465    assert( tr_isTorrent( tor ) );
2466    assert( tr_torrentIsLocked( tor ) );
2467
2468    stopTorrent( tor->torrentPeers );
2469}
2470
2471void
2472tr_peerMgrAddTorrent( tr_peerMgr * manager, tr_torrent * tor )
2473{
2474    assert( tr_isTorrent( tor ) );
2475    assert( tr_torrentIsLocked( tor ) );
2476    assert( tor->torrentPeers == NULL );
2477
2478    tor->torrentPeers = torrentNew( manager, tor );
2479}
2480
2481void
2482tr_peerMgrRemoveTorrent( tr_torrent * tor )
2483{
2484    assert( tr_isTorrent( tor ) );
2485    assert( tr_torrentIsLocked( tor ) );
2486
2487    stopTorrent( tor->torrentPeers );
2488    torrentFree( tor->torrentPeers );
2489}
2490
2491void
2492tr_peerUpdateProgress( tr_torrent * tor, tr_peer * peer )
2493{
2494    const tr_bitfield * have = &peer->have;
2495
2496    if( tr_bitfieldHasAll( have ) )
2497    {
2498        peer->progress = 1.0;
2499    }
2500    else if( tr_bitfieldHasNone( have ) )
2501    {
2502        peer->progress = 0.0;
2503    }
2504    else
2505    {
2506        const float true_count = tr_bitfieldCountTrueBits( have );
2507
2508        if( tr_torrentHasMetadata( tor ) )
2509        {
2510            peer->progress = true_count / tor->info.pieceCount;
2511        }
2512        else /* without pieceCount, this result is only a best guess... */
2513        {
2514            peer->progress = true_count / ( have->bit_count + 1 );
2515        }
2516    }
2517
2518    /* clamp the progress range */
2519    if ( peer->progress < 0.0 )
2520        peer->progress = 0.0;
2521    if ( peer->progress > 1.0 )
2522        peer->progress = 1.0;
2523
2524    if( peer->atom && ( peer->progress >= 1.0 ) )
2525        atomSetSeed( tor->torrentPeers, peer->atom );
2526}
2527
2528void
2529tr_peerMgrOnTorrentGotMetainfo( tr_torrent * tor )
2530{
2531    int i;
2532    int peerCount;
2533    tr_peer ** peers;
2534
2535    /* the webseed list may have changed... */
2536    rebuildWebseedArray( tor->torrentPeers, tor );
2537
2538    /* some peer_msgs' progress fields may not be accurate if we
2539       didn't have the metadata before now... so refresh them all... */
2540    peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2541    peers = (tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2542    for( i=0; i<peerCount; ++i )
2543        tr_peerUpdateProgress( tor, peers[i] );
2544
2545}
2546
2547void
2548tr_peerMgrTorrentAvailability( const tr_torrent * tor, int8_t * tab, unsigned int tabCount )
2549{
2550    assert( tr_isTorrent( tor ) );
2551    assert( torrentIsLocked( tor->torrentPeers ) );
2552    assert( tab != NULL );
2553    assert( tabCount > 0 );
2554
2555    memset( tab, 0, tabCount );
2556
2557    if( tr_torrentHasMetadata( tor ) )
2558    {
2559        tr_piece_index_t i;
2560        const int peerCount = tr_ptrArraySize( &tor->torrentPeers->peers );
2561        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &tor->torrentPeers->peers );
2562        const float interval = tor->info.pieceCount / (float)tabCount;
2563        const bool isSeed = tr_cpGetStatus( &tor->completion ) == TR_SEED;
2564
2565        for( i=0; i<tabCount; ++i )
2566        {
2567            const int piece = i * interval;
2568
2569            if( isSeed || tr_cpPieceIsComplete( &tor->completion, piece ) )
2570                tab[i] = -1;
2571            else if( peerCount ) {
2572                int j;
2573                for( j=0; j<peerCount; ++j )
2574                    if( tr_bitfieldHas( &peers[j]->have, piece ) )
2575                        ++tab[i];
2576            }
2577        }
2578    }
2579}
2580
2581static bool
2582peerIsSeed( const tr_peer * peer )
2583{
2584    if( peer->progress >= 1.0 )
2585        return true;
2586
2587    if( peer->atom && atomIsSeed( peer->atom ) )
2588        return true;
2589
2590    return false;
2591}
2592
2593/* count how many bytes we want that connected peers have */
2594uint64_t
2595tr_peerMgrGetDesiredAvailable( const tr_torrent * tor )
2596{
2597    size_t i;
2598    size_t n;
2599    uint64_t desiredAvailable;
2600    const Torrent * t = tor->torrentPeers;
2601
2602    /* common shortcuts... */
2603
2604    if( tr_torrentIsSeed( t->tor ) )
2605        return 0;
2606
2607    if( !tr_torrentHasMetadata( tor ) )
2608        return 0;
2609
2610    n = tr_ptrArraySize( &t->peers );
2611    if( n == 0 )
2612        return 0;
2613    else {
2614        const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2615        for( i=0; i<n; ++i )
2616            if( peers[i]->atom && atomIsSeed( peers[i]->atom ) )
2617                return tr_cpLeftUntilDone( &tor->completion );
2618    }
2619
2620    if( !t->pieceReplication || !t->pieceReplicationSize )
2621        return 0;
2622
2623    /* do it the hard way */
2624
2625    desiredAvailable = 0;
2626    for( i=0, n=MIN(tor->info.pieceCount, t->pieceReplicationSize); i<n; ++i )
2627        if( !tor->info.pieces[i].dnd && ( t->pieceReplication[i] > 0 ) )
2628            desiredAvailable += tr_cpMissingBytesInPiece( &t->tor->completion, i );
2629
2630    assert( desiredAvailable <= tor->info.totalSize );
2631    return desiredAvailable;
2632}
2633
2634void
2635tr_peerMgrTorrentStats( tr_torrent  * tor,
2636                        int         * setmePeersConnected,
2637                        int         * setmeWebseedsSendingToUs,
2638                        int         * setmePeersSendingToUs,
2639                        int         * setmePeersGettingFromUs,
2640                        int         * setmePeersFrom )
2641{
2642    int i, size;
2643    const Torrent * t = tor->torrentPeers;
2644    const tr_peer ** peers;
2645
2646    assert( tr_torrentIsLocked( tor ) );
2647
2648    peers = (const tr_peer **) tr_ptrArrayBase( &t->peers );
2649    size = tr_ptrArraySize( &t->peers );
2650
2651    *setmePeersConnected       = 0;
2652    *setmePeersGettingFromUs   = 0;
2653    *setmePeersSendingToUs     = 0;
2654    *setmeWebseedsSendingToUs  = 0;
2655
2656    for( i=0; i<TR_PEER_FROM__MAX; ++i )
2657        setmePeersFrom[i] = 0;
2658
2659    for( i=0; i<size; ++i )
2660    {
2661        const tr_peer * peer = peers[i];
2662        const struct peer_atom * atom = peer->atom;
2663
2664        if( peer->io == NULL ) /* not connected */
2665            continue;
2666
2667        ++*setmePeersConnected;
2668
2669        ++setmePeersFrom[atom->fromFirst];
2670
2671        if( clientIsDownloadingFrom( tor, peer ) )
2672            ++*setmePeersSendingToUs;
2673
2674        if( clientIsUploadingTo( peer ) )
2675            ++*setmePeersGettingFromUs;
2676    }
2677
2678    *setmeWebseedsSendingToUs = countActiveWebseeds( t );
2679}
2680
2681double*
2682tr_peerMgrWebSpeeds_KBps( const tr_torrent * tor )
2683{
2684    int i;
2685    const Torrent * t = tor->torrentPeers;
2686    const int webseedCount = tr_ptrArraySize( &t->webseeds );
2687    const tr_webseed ** webseeds = (const tr_webseed**) tr_ptrArrayBase( &t->webseeds );
2688    const uint64_t now = tr_time_msec( );
2689    double * ret = tr_new0( double, webseedCount );
2690
2691    assert( tr_isTorrent( tor ) );
2692    assert( tr_torrentIsLocked( tor ) );
2693    assert( t->manager != NULL );
2694    assert( webseedCount == tor->info.webseedCount );
2695
2696    for( i=0; i<webseedCount; ++i ) {
2697        unsigned int Bps;
2698        if( tr_webseedGetSpeed_Bps( webseeds[i], now, &Bps ) )
2699            ret[i] = Bps / (double)tr_speed_K;
2700        else
2701            ret[i] = -1.0;
2702    }
2703
2704    return ret;
2705}
2706
2707unsigned int
2708tr_peerGetPieceSpeed_Bps( const tr_peer * peer, uint64_t now, tr_direction direction )
2709{
2710    return peer->io ? tr_peerIoGetPieceSpeed_Bps( peer->io, now, direction ) : 0.0;
2711}
2712
2713struct tr_peer_stat *
2714tr_peerMgrPeerStats( const tr_torrent * tor, int * setmeCount )
2715{
2716    int i;
2717    const Torrent * t = tor->torrentPeers;
2718    const int size = tr_ptrArraySize( &t->peers );
2719    const tr_peer ** peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
2720    const uint64_t now_msec = tr_time_msec( );
2721    const time_t now = tr_time();
2722    tr_peer_stat * ret = tr_new0( tr_peer_stat, size );
2723
2724    assert( tr_isTorrent( tor ) );
2725    assert( tr_torrentIsLocked( tor ) );
2726    assert( t->manager );
2727
2728    for( i=0; i<size; ++i )
2729    {
2730        char *                   pch;
2731        const tr_peer *          peer = peers[i];
2732        const struct peer_atom * atom = peer->atom;
2733        tr_peer_stat *           stat = ret + i;
2734
2735        tr_address_to_string_with_buf( &atom->addr, stat->addr, sizeof( stat->addr ) );
2736        tr_strlcpy( stat->client, ( peer->client ? peer->client : "" ),
2737                   sizeof( stat->client ) );
2738        stat->port                = ntohs( peer->atom->port );
2739        stat->from                = atom->fromFirst;
2740        stat->progress            = peer->progress;
2741        stat->isUTP               = peer->io->utp_socket != NULL;
2742        stat->isEncrypted         = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
2743        stat->rateToPeer_KBps     = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_CLIENT_TO_PEER ) );
2744        stat->rateToClient_KBps   = toSpeedKBps( tr_peerGetPieceSpeed_Bps( peer, now_msec, TR_PEER_TO_CLIENT ) );
2745        stat->peerIsChoked        = peer->peerIsChoked;
2746        stat->peerIsInterested    = peer->peerIsInterested;
2747        stat->clientIsChoked      = peer->clientIsChoked;
2748        stat->clientIsInterested  = peer->clientIsInterested;
2749        stat->isIncoming          = tr_peerIoIsIncoming( peer->io );
2750        stat->isDownloadingFrom   = clientIsDownloadingFrom( tor, peer );
2751        stat->isUploadingTo       = clientIsUploadingTo( peer );
2752        stat->isSeed              = peerIsSeed( peer );
2753
2754        stat->blocksToPeer        = tr_historyGet( &peer->blocksSentToPeer,    now, CANCEL_HISTORY_SEC );
2755        stat->blocksToClient      = tr_historyGet( &peer->blocksSentToClient,  now, CANCEL_HISTORY_SEC );
2756        stat->cancelsToPeer       = tr_historyGet( &peer->cancelsSentToPeer,   now, CANCEL_HISTORY_SEC );
2757        stat->cancelsToClient     = tr_historyGet( &peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC );
2758
2759        stat->pendingReqsToPeer   = peer->pendingReqsToPeer;
2760        stat->pendingReqsToClient = peer->pendingReqsToClient;
2761
2762        pch = stat->flagStr;
2763        if( stat->isUTP ) *pch++ = 'T';
2764        if( t->optimistic == peer ) *pch++ = 'O';
2765        if( stat->isDownloadingFrom ) *pch++ = 'D';
2766        else if( stat->clientIsInterested ) *pch++ = 'd';
2767        if( stat->isUploadingTo ) *pch++ = 'U';
2768        else if( stat->peerIsInterested ) *pch++ = 'u';
2769        if( !stat->clientIsChoked && !stat->clientIsInterested ) *pch++ = 'K';
2770        if( !stat->peerIsChoked && !stat->peerIsInterested ) *pch++ = '?';
2771        if( stat->isEncrypted ) *pch++ = 'E';
2772        if( stat->from == TR_PEER_FROM_DHT ) *pch++ = 'H';
2773        else if( stat->from == TR_PEER_FROM_PEX ) *pch++ = 'X';
2774        if( stat->isIncoming ) *pch++ = 'I';
2775        *pch = '\0';
2776    }
2777
2778    *setmeCount = size;
2779
2780    return ret;
2781}
2782
2783/***
2784****
2785****
2786***/
2787
2788void
2789tr_peerMgrClearInterest( tr_torrent * tor )
2790{
2791    int i;
2792    Torrent * t = tor->torrentPeers;
2793    const int peerCount = tr_ptrArraySize( &t->peers );
2794
2795    assert( tr_isTorrent( tor ) );
2796    assert( tr_torrentIsLocked( tor ) );
2797
2798    for( i=0; i<peerCount; ++i )
2799    {
2800        const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2801        tr_peerMsgsSetInterested( peer->msgs, false );
2802    }
2803}
2804
2805/* does this peer have any pieces that we want? */
2806static bool
2807isPeerInteresting( const tr_torrent  * const tor,
2808                   const bool        * const piece_is_interesting,
2809                   const tr_peer     * const peer )
2810{
2811    tr_piece_index_t i, n;
2812
2813    /* these cases should have already been handled by the calling code... */
2814    assert( !tr_torrentIsSeed( tor ) );
2815    assert( tr_torrentIsPieceTransferAllowed( tor, TR_PEER_TO_CLIENT ) );
2816
2817    if( peerIsSeed( peer ) )
2818        return true;
2819
2820    for( i=0, n=tor->info.pieceCount; i<n; ++i )
2821        if( piece_is_interesting[i] && tr_bitfieldHas( &peer->have, i ) )
2822            return true;
2823
2824    return false;
2825}
2826
/* How a peer has responded to our recent block requests, as classified in
   rechokeDownloads(). The numeric order is significant:
   compare_rechoke_info() sorts ascending on this value, so GOOD peers
   sort ahead of UNTESTED ones, which sort ahead of BAD ones. */
typedef enum
{
    /* sent us blocks with no cancels, or cancels * 10 < blocks */
    RECHOKE_STATE_GOOD = 0,

    /* no recent blocks and no recent cancels */
    RECHOKE_STATE_UNTESTED = 1,

    /* cancels with no blocks, or cancels * 10 >= blocks */
    RECHOKE_STATE_BAD = 2
}
tr_rechoke_state;
2834
2835struct tr_rechoke_info
2836{
2837    tr_peer * peer;
2838    int salt;
2839    int rechoke_state;
2840};
2841
2842static int
2843compare_rechoke_info( const void * va, const void * vb )
2844{
2845    const struct tr_rechoke_info * a = va;
2846    const struct tr_rechoke_info * b = vb;
2847
2848    if( a->rechoke_state != b->rechoke_state )
2849        return a->rechoke_state - b->rechoke_state;
2850
2851    return a->salt - b->salt;
2852}
2853
2854/* determines who we send "interested" messages to */
2855static void
2856rechokeDownloads( Torrent * t )
2857{
2858    int i;
2859    int maxPeers = 0;
2860    int rechoke_count = 0;
2861    struct tr_rechoke_info * rechoke = NULL;
2862    const int MIN_INTERESTING_PEERS = 5;
2863    const int peerCount = tr_ptrArraySize( &t->peers );
2864    const time_t now = tr_time( );
2865
2866    /* some cases where this function isn't necessary */
2867    if( tr_torrentIsSeed( t->tor ) )
2868        return;
2869    if ( !tr_torrentIsPieceTransferAllowed( t->tor, TR_PEER_TO_CLIENT ) )
2870        return;
2871
2872    /* decide HOW MANY peers to be interested in */
2873    {
2874        int blocks = 0;
2875        int cancels = 0;
2876        time_t timeSinceCancel;
2877
2878        /* Count up how many blocks & cancels each peer has.
2879         *
2880         * There are two situations where we send out cancels --
2881         *
2882         * 1. We've got unresponsive peers, which is handled by deciding
2883         *    -which- peers to be interested in.
2884         *
2885         * 2. We've hit our bandwidth cap, which is handled by deciding
2886         *    -how many- peers to be interested in.
2887         *
2888         * We're working on 2. here, so we need to ignore unresponsive
2889         * peers in our calculations lest they confuse Transmission into
2890         * thinking it's hit its bandwidth cap.
2891         */
2892        for( i=0; i<peerCount; ++i )
2893        {
2894            const tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2895            const int b = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2896            const int c = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2897
2898            if( b == 0 ) /* ignore unresponsive peers, as described above */
2899                continue;
2900
2901            blocks += b;
2902            cancels += c;
2903        }
2904
2905        if( cancels > 0 )
2906        {
2907            /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
2908             * higher values indicate more congestion. */
2909            const double cancelRate = cancels / (double)(cancels + blocks);
2910            const double mult = 1 - MIN( cancelRate, 0.5 );
2911            maxPeers = t->interestedCount * mult;
2912            tordbg( t, "cancel rate is %.3f -- reducing the "
2913                       "number of peers we're interested in by %.0f percent",
2914                       cancelRate, mult * 100 );
2915            t->lastCancel = now;
2916        }
2917
2918        timeSinceCancel = now - t->lastCancel;
2919        if( timeSinceCancel )
2920        {
2921            const int maxIncrease = 15;
2922            const time_t maxHistory = 2 * CANCEL_HISTORY_SEC;
2923            const double mult = MIN( timeSinceCancel, maxHistory ) / (double) maxHistory;
2924            const int inc = maxIncrease * mult;
2925            maxPeers = t->maxPeers + inc;
2926            tordbg( t, "time since last cancel is %li -- increasing the "
2927                       "number of peers we're interested in by %d",
2928                       timeSinceCancel, inc );
2929        }
2930    }
2931
2932    /* don't let the previous section's number tweaking go too far... */
2933    if( maxPeers < MIN_INTERESTING_PEERS )
2934        maxPeers = MIN_INTERESTING_PEERS;
2935    if( maxPeers > t->tor->maxConnectedPeers )
2936        maxPeers = t->tor->maxConnectedPeers;
2937
2938    t->maxPeers = maxPeers;
2939
2940    if( peerCount > 0 )
2941    {
2942        bool * piece_is_interesting;
2943        const tr_torrent * const tor = t->tor;
2944        const int n = tor->info.pieceCount;
2945
2946        /* build a bitfield of interesting pieces... */
2947        piece_is_interesting = tr_new( bool, n );
2948        for( i=0; i<n; i++ )
2949            piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_cpPieceIsComplete( &tor->completion, i );
2950
2951        /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
2952        for( i=0; i<peerCount; ++i )
2953        {
2954            tr_peer * peer = tr_ptrArrayNth( &t->peers, i );
2955
2956            if( !isPeerInteresting( t->tor, piece_is_interesting, peer ) )
2957            {
2958                tr_peerMsgsSetInterested( peer->msgs, false );
2959            }
2960            else
2961            {
2962                tr_rechoke_state rechoke_state;
2963                const int blocks = tr_historyGet( &peer->blocksSentToClient, now, CANCEL_HISTORY_SEC );
2964                const int cancels = tr_historyGet( &peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC );
2965
2966                if( !blocks && !cancels )
2967                    rechoke_state = RECHOKE_STATE_UNTESTED;
2968                else if( !cancels )
2969                    rechoke_state = RECHOKE_STATE_GOOD;
2970                else if( !blocks )
2971                    rechoke_state = RECHOKE_STATE_BAD;
2972                else if( ( cancels * 10 ) < blocks )
2973                    rechoke_state = RECHOKE_STATE_GOOD;
2974                else
2975                    rechoke_state = RECHOKE_STATE_BAD;
2976
2977                if( rechoke == NULL )
2978                    rechoke = tr_new( struct tr_rechoke_info, peerCount );
2979
2980                 rechoke[rechoke_count].peer = peer;
2981                 rechoke[rechoke_count].rechoke_state = rechoke_state;
2982                 rechoke[rechoke_count].salt = tr_cryptoWeakRandInt( INT_MAX );
2983                 rechoke_count++;
2984            }
2985
2986        }
2987
2988        tr_free( piece_is_interesting );
2989    }
2990
2991    /* now that we know which & how many peers to be interested in... update the peer interest */
2992    qsort( rechoke, rechoke_count, sizeof( struct tr_rechoke_info ), compare_rechoke_info );
2993    t->interestedCount = MIN( maxPeers, rechoke_count );
2994    for( i=0; i<rechoke_count; ++i )
2995        tr_peerMsgsSetInterested( rechoke[i].peer->msgs, i<t->interestedCount );
2996
2997    /* cleanup */
2998    tr_free( rechoke );
2999}
3000
3001/**
3002***
3003**/
3004
3005struct ChokeData
3006{
3007    bool            isInterested;
3008    bool            wasChoked;
3009    bool            isChoked;
3010    int             rate;
3011    int             salt;
3012    tr_peer *       peer;
3013};
3014
3015static int
3016compareChoke( const void * va, const void * vb )
3017{
3018    const struct ChokeData * a = va;
3019    const struct ChokeData * b = vb;
3020
3021    if( a->rate != b->rate ) /* prefer higher overall speeds */
3022        return a->rate > b->rate ? -1 : 1;
3023
3024    if( a->wasChoked != b->wasChoked ) /* prefer unchoked */
3025        return a->wasChoked ? 1 : -1;
3026
3027    if( a->salt != b->salt ) /* random order */
3028        return a->salt - b->salt;
3029
3030    return 0;
3031}
3032
3033/* is this a new connection? */
3034static int
3035isNew( const tr_peer * peer )
3036{
3037    return peer && peer->io && tr_peerIoGetAge( peer->io ) < 45;
3038}
3039
3040/* get a rate for deciding which peers to choke and unchoke. */
3041static int
3042getRate( const tr_torrent * tor, struct peer_atom * atom, uint64_t now )
3043{
3044    unsigned int Bps;
3045
3046    if( tr_torrentIsSeed( tor ) )
3047        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3048
3049    /* downloading a private torrent... take upload speed into account
3050     * because there may only be a small window of opportunity to share */
3051    else if( tr_torrentIsPrivate( tor ) )
3052        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT )
3053            + tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_CLIENT_TO_PEER );
3054
3055    /* downloading a public torrent */
3056    else
3057        Bps = tr_peerGetPieceSpeed_Bps( atom->peer, now, TR_PEER_TO_CLIENT );
3058
3059    /* convert it to bytes per second */
3060    return Bps;
3061}
3062
3063static inline bool
3064isBandwidthMaxedOut( const tr_bandwidth * b,
3065                     const uint64_t now_msec, tr_direction dir )
3066{
3067    if( !tr_bandwidthIsLimited( b, dir ) )
3068        return false;
3069    else {
3070        const unsigned int got = tr_bandwidthGetPieceSpeed_Bps( b, now_msec, dir );
3071        const unsigned int want = tr_bandwidthGetDesiredSpeed_Bps( b, dir );
3072        return got >= want;
3073    }
3074}
3075
3076static void
3077rechokeUploads( Torrent * t, const uint64_t now )
3078{
3079    int i, size, unchokedInterested;
3080    const int peerCount = tr_ptrArraySize( &t->peers );
3081    tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( &t->peers );
3082    struct ChokeData * choke = tr_new0( struct ChokeData, peerCount );
3083    const tr_session * session = t->manager->session;
3084    const int chokeAll = !tr_torrentIsPieceTransferAllowed( t->tor, TR_CLIENT_TO_PEER );
3085    const bool isMaxedOut = isBandwidthMaxedOut( &t->tor->bandwidth, now, TR_UP );
3086
3087    assert( torrentIsLocked( t ) );
3088
3089    /* an optimistic unchoke peer's "optimistic"
3090     * state lasts for N calls to rechokeUploads(). */
3091    if( t->optimisticUnchokeTimeScaler > 0 )
3092        t->optimisticUnchokeTimeScaler--;
3093    else
3094        t->optimistic = NULL;
3095
3096    /* sort the peers by preference and rate */
3097    for( i = 0, size = 0; i < peerCount; ++i )
3098    {
3099        tr_peer * peer = peers[i];
3100        struct peer_atom * atom = peer->atom;
3101
3102        if( peerIsSeed( peer ) ) /* choke seeds and partial seeds */
3103        {
3104            tr_peerMsgsSetChoke( peer->msgs, true );
3105        }
3106        else if( chokeAll ) /* choke everyone if we're not uploading */
3107        {
3108            tr_peerMsgsSetChoke( peer->msgs, true );
3109        }
3110        else if( peer != t->optimistic )
3111        {
3112            struct ChokeData * n = &choke[size++];
3113            n->peer         = peer;
3114            n->isInterested = peer->peerIsInterested;
3115            n->wasChoked    = peer->peerIsChoked;
3116            n->rate         = getRate( t->tor, atom, now );
3117            n->salt         = tr_cryptoWeakRandInt( INT_MAX );
3118            n->isChoked     = true;
3119        }
3120    }
3121
3122    qsort( choke, size, sizeof( struct ChokeData ), compareChoke );
3123
3124    /**
3125     * Reciprocation and number of uploads capping is managed by unchoking
3126     * the N peers which have the best upload rate and are interested.
3127     * This maximizes the client's download rate. These N peers are
3128     * referred to as downloaders, because they are interested in downloading
3129     * from the client.
3130     *
3131     * Peers which have a better upload rate (as compared to the downloaders)
3132     * but aren't interested get unchoked. If they become interested, the
3133     * downloader with the worst upload rate gets choked. If a client has
3134     * a complete file, it uses its upload rate rather than its download
3135     * rate to decide which peers to unchoke.
3136     *
3137     * If our bandwidth is maxed out, don't unchoke any more peers.
3138     */
3139    unchokedInterested = 0;
3140    for( i=0; i<size && unchokedInterested<session->uploadSlotsPerTorrent; ++i ) {
3141        choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3142        if( choke[i].isInterested )
3143            ++unchokedInterested;
3144    }
3145
3146    /* optimistic unchoke */
3147    if( !t->optimistic && !isMaxedOut && (i<size) )
3148    {
3149        int n;
3150        struct ChokeData * c;
3151        tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3152
3153        for( ; i<size; ++i )
3154        {
3155            if( choke[i].isInterested )
3156            {
3157                const tr_peer * peer = choke[i].peer;
3158                int x = 1, y;
3159                if( isNew( peer ) ) x *= 3;
3160                for( y=0; y<x; ++y )
3161                    tr_ptrArrayAppend( &randPool, &choke[i] );
3162            }
3163        }
3164
3165        if(( n = tr_ptrArraySize( &randPool )))
3166        {
3167            c = tr_ptrArrayNth( &randPool, tr_cryptoWeakRandInt( n ));
3168            c->isChoked = false;
3169            t->optimistic = c->peer;
3170            t->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3171        }
3172
3173        tr_ptrArrayDestruct( &randPool, NULL );
3174    }
3175
3176    for( i=0; i<size; ++i )
3177        tr_peerMsgsSetChoke( choke[i].peer->msgs, choke[i].isChoked );
3178
3179    /* cleanup */
3180    tr_free( choke );
3181}
3182
3183static void
3184rechokePulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3185{
3186    tr_torrent * tor = NULL;
3187    tr_peerMgr * mgr = vmgr;
3188    const uint64_t now = tr_time_msec( );
3189
3190    managerLock( mgr );
3191
3192    while(( tor = tr_torrentNext( mgr->session, tor ))) {
3193        if( tor->isRunning ) {
3194            Torrent * t = tor->torrentPeers;
3195            if( !tr_ptrArrayEmpty( &t->peers ) ) {
3196                rechokeUploads( t, now );
3197                rechokeDownloads( t );
3198            }
3199        }
3200    }
3201
3202    tr_timerAddMsec( mgr->rechokeTimer, RECHOKE_PERIOD_MSEC );
3203    managerUnlock( mgr );
3204}
3205
3206/***
3207****
3208****  Life and Death
3209****
3210***/
3211
3212static bool
3213shouldPeerBeClosed( const Torrent    * t,
3214                    const tr_peer    * peer,
3215                    int                peerCount,
3216                    const time_t       now )
3217{
3218    const tr_torrent *       tor = t->tor;
3219    const struct peer_atom * atom = peer->atom;
3220
3221    /* if it's marked for purging, close it */
3222    if( peer->doPurge )
3223    {
3224        tordbg( t, "purging peer %s because its doPurge flag is set",
3225                tr_atomAddrStr( atom ) );
3226        return true;
3227    }
3228
3229    /* disconnect if we're both seeds and enough time has passed for PEX */
3230    if( tr_torrentIsSeed( tor ) && peerIsSeed( peer ) )
3231        return !tr_torrentAllowsPex(tor) || (now-atom->time>=30);
3232
3233    /* disconnect if it's been too long since piece data has been transferred.
3234     * this is on a sliding scale based on number of available peers... */
3235    {
3236        const int relaxStrictnessIfFewerThanN = (int)( ( getMaxPeerCount( tor ) * 0.9 ) + 0.5 );
3237        /* if we have >= relaxIfFewerThan, strictness is 100%.
3238         * if we have zero connections, strictness is 0% */
3239        const float strictness = peerCount >= relaxStrictnessIfFewerThanN
3240                               ? 1.0
3241                               : peerCount / (float)relaxStrictnessIfFewerThanN;
3242        const int lo = MIN_UPLOAD_IDLE_SECS;
3243        const int hi = MAX_UPLOAD_IDLE_SECS;
3244        const int limit = hi - ( ( hi - lo ) * strictness );
3245        const int idleTime = now - MAX( atom->time, atom->piece_data_time );
3246/*fprintf( stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time), (int)(now - atom->piece_data_time), idleTime, idleTime > limit );*/
3247        if( idleTime > limit ) {
3248            tordbg( t, "purging peer %s because it's been %d secs since we shared anything",
3249                       tr_atomAddrStr( atom ), idleTime );
3250            return true;
3251        }
3252    }
3253
3254    return false;
3255}
3256
3257static tr_peer **
3258getPeersToClose( Torrent * t, const time_t now_sec, int * setmeSize )
3259{
3260    int i, peerCount, outsize;
3261    struct tr_peer ** ret = NULL;
3262    tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( &t->peers, &peerCount );
3263
3264    assert( torrentIsLocked( t ) );
3265
3266    for( i = outsize = 0; i < peerCount; ++i ) {
3267        if( shouldPeerBeClosed( t, peers[i], peerCount, now_sec ) ) {
3268            if( ret == NULL )
3269                ret = tr_new( tr_peer *, peerCount );
3270            ret[outsize++] = peers[i];
3271        }
3272    }
3273
3274    *setmeSize = outsize;
3275    return ret;
3276}
3277
3278static int
3279getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
3280{
3281    int sec;
3282
3283    /* if we were recently connected to this peer and transferring piece
3284     * data, try to reconnect to them sooner rather that later -- we don't
3285     * want network troubles to get in the way of a good peer. */
3286    if( ( now - atom->piece_data_time ) <= ( MINIMUM_RECONNECT_INTERVAL_SECS * 2 ) )
3287        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3288
3289    /* don't allow reconnects more often than our minimum */
3290    else if( ( now - atom->time ) < MINIMUM_RECONNECT_INTERVAL_SECS )
3291        sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3292
3293    /* otherwise, the interval depends on how many times we've tried
3294     * and failed to connect to the peer */
3295    else switch( atom->numFails ) {
3296        case 0: sec = 0; break;
3297        case 1: sec = 5; break;
3298        case 2: sec = 2 * 60; break;
3299        case 3: sec = 15 * 60; break;
3300        case 4: sec = 30 * 60; break;
3301        case 5: sec = 60 * 60; break;
3302        default: sec = 120 * 60; break;
3303    }
3304
3305    /* penalize peers that were unreachable the last time we tried */
3306    if( atom->flags2 & MYFLAG_UNREACHABLE )
3307        sec += sec;
3308
3309    dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
3310    return sec;
3311}
3312
3313static void
3314removePeer( Torrent * t, tr_peer * peer )
3315{
3316    tr_peer * removed;
3317    struct peer_atom * atom = peer->atom;
3318
3319    assert( torrentIsLocked( t ) );
3320    assert( atom );
3321
3322    atom->time = tr_time( );
3323
3324    removed = tr_ptrArrayRemoveSorted( &t->peers, peer, peerCompare );
3325
3326    if( replicationExists( t ) )
3327        tr_decrReplicationFromBitfield( t, &peer->have );
3328
3329    assert( removed == peer );
3330    peerDelete( t, removed );
3331}
3332
3333static void
3334closePeer( Torrent * t, tr_peer * peer )
3335{
3336    struct peer_atom * atom;
3337
3338    assert( t != NULL );
3339    assert( peer != NULL );
3340
3341    atom = peer->atom;
3342
3343    /* if we transferred piece data, then they might be good peers,
3344       so reset their `numFails' weight to zero. otherwise we connected
3345       to them fruitlessly, so mark it as another fail */
3346    if( atom->piece_data_time ) {
3347        tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
3348        atom->numFails = 0;
3349    } else {
3350        ++atom->numFails;
3351        tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
3352    }
3353
3354    tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
3355    removePeer( t, peer );
3356}
3357
3358static void
3359removeAllPeers( Torrent * t )
3360{
3361    while( !tr_ptrArrayEmpty( &t->peers ) )
3362        removePeer( t, tr_ptrArrayNth( &t->peers, 0 ) );
3363}
3364
3365static void
3366closeBadPeers( Torrent * t, const time_t now_sec )
3367{
3368    if( !tr_ptrArrayEmpty( &t->peers ) )
3369    {
3370        int i;
3371        int peerCount;
3372        struct tr_peer ** peers = getPeersToClose( t, now_sec, &peerCount );
3373        for( i=0; i<peerCount; ++i )
3374            closePeer( t, peers[i] );
3375        tr_free( peers );
3376    }
3377}
3378
3379struct peer_liveliness
3380{
3381    tr_peer * peer;
3382    void * clientData;
3383    time_t pieceDataTime;
3384    time_t time;
3385    int speed;
3386    bool doPurge;
3387};
3388
3389static int
3390comparePeerLiveliness( const void * va, const void * vb )
3391{
3392    const struct peer_liveliness * a = va;
3393    const struct peer_liveliness * b = vb;
3394
3395    if( a->doPurge != b->doPurge )
3396        return a->doPurge ? 1 : -1;
3397
3398    if( a->speed != b->speed ) /* faster goes first */
3399        return a->speed > b->speed ? -1 : 1;
3400
3401    /* the one to give us data more recently goes first */
3402    if( a->pieceDataTime != b->pieceDataTime )
3403        return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3404
3405    /* the one we connected to most recently goes first */
3406    if( a->time != b->time )
3407        return a->time > b->time ? -1 : 1;
3408
3409    return 0;
3410}
3411
3412static void
3413sortPeersByLivelinessImpl( tr_peer  ** peers,
3414                           void     ** clientData,
3415                           int         n,
3416                           uint64_t    now,
3417                           int (*compare) ( const void *va, const void *vb ) )
3418{
3419    int i;
3420    struct peer_liveliness *lives, *l;
3421
3422    /* build a sortable array of peer + extra info */
3423    lives = l = tr_new0( struct peer_liveliness, n );
3424    for( i=0; i<n; ++i, ++l )
3425    {
3426        tr_peer * p = peers[i];
3427        l->peer = p;
3428        l->doPurge = p->doPurge;
3429        l->pieceDataTime = p->atom->piece_data_time;
3430        l->time = p->atom->time;
3431        l->speed = tr_peerGetPieceSpeed_Bps( p, now, TR_UP )
3432                 + tr_peerGetPieceSpeed_Bps( p, now, TR_DOWN );
3433        if( clientData )
3434            l->clientData = clientData[i];
3435    }
3436
3437    /* sort 'em */
3438    assert( n == ( l - lives ) );
3439    qsort( lives, n, sizeof( struct peer_liveliness ), compare );
3440
3441    /* build the peer array */
3442    for( i=0, l=lives; i<n; ++i, ++l ) {
3443        peers[i] = l->peer;
3444        if( clientData )
3445            clientData[i] = l->clientData;
3446    }
3447    assert( n == ( l - lives ) );
3448
3449    /* cleanup */
3450    tr_free( lives );
3451}
3452
3453static void
3454sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now )
3455{
3456    sortPeersByLivelinessImpl( peers, clientData, n, now, comparePeerLiveliness );
3457}
3458
3459
3460static void
3461enforceTorrentPeerLimit( Torrent * t, uint64_t now )
3462{
3463    int n = tr_ptrArraySize( &t->peers );
3464    const int max = tr_torrentGetPeerLimit( t->tor );
3465    if( n > max )
3466    {
3467        void * base = tr_ptrArrayBase( &t->peers );
3468        tr_peer ** peers = tr_memdup( base, n*sizeof( tr_peer* ) );
3469        sortPeersByLiveliness( peers, NULL, n, now );
3470        while( n > max )
3471            closePeer( t, peers[--n] );
3472        tr_free( peers );
3473    }
3474}
3475
3476static void
3477enforceSessionPeerLimit( tr_session * session, uint64_t now )
3478{
3479    int n = 0;
3480    tr_torrent * tor = NULL;
3481    const int max = tr_sessionGetPeerLimit( session );
3482
3483    /* count the total number of peers */
3484    while(( tor = tr_torrentNext( session, tor )))
3485        n += tr_ptrArraySize( &tor->torrentPeers->peers );
3486
3487    /* if there are too many, prune out the worst */
3488    if( n > max )
3489    {
3490        tr_peer ** peers = tr_new( tr_peer*, n );
3491        Torrent ** torrents = tr_new( Torrent*, n );
3492
3493        /* populate the peer array */
3494        n = 0;
3495        tor = NULL;
3496        while(( tor = tr_torrentNext( session, tor ))) {
3497            int i;
3498            Torrent * t = tor->torrentPeers;
3499            const int tn = tr_ptrArraySize( &t->peers );
3500            for( i=0; i<tn; ++i, ++n ) {
3501                peers[n] = tr_ptrArrayNth( &t->peers, i );
3502                torrents[n] = t;
3503            }
3504        }
3505
3506        /* sort 'em */
3507        sortPeersByLiveliness( peers, (void**)torrents, n, now );
3508
3509        /* cull out the crappiest */
3510        while( n-- > max )
3511            closePeer( torrents[n], peers[n] );
3512
3513        /* cleanup */
3514        tr_free( torrents );
3515        tr_free( peers );
3516    }
3517}
3518
3519static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
3520
3521static void
3522reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3523{
3524    tr_torrent * tor;
3525    tr_peerMgr * mgr = vmgr;
3526    const time_t now_sec = tr_time( );
3527    const uint64_t now_msec = tr_time_msec( );
3528
3529    /**
3530    ***  enforce the per-session and per-torrent peer limits
3531    **/
3532
3533    /* if we're over the per-torrent peer limits, cull some peers */
3534    tor = NULL;
3535    while(( tor = tr_torrentNext( mgr->session, tor )))
3536        if( tor->isRunning )
3537            enforceTorrentPeerLimit( tor->torrentPeers, now_msec );
3538
3539    /* if we're over the per-session peer limits, cull some peers */
3540    enforceSessionPeerLimit( mgr->session, now_msec );
3541
3542    /* remove crappy peers */
3543    tor = NULL;
3544    while(( tor = tr_torrentNext( mgr->session, tor )))
3545        if( !tor->torrentPeers->isRunning )
3546            removeAllPeers( tor->torrentPeers );
3547        else
3548            closeBadPeers( tor->torrentPeers, now_sec );
3549
3550    /* try to make new peer connections */
3551    makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
3552}
3553
3554/****
3555*****
3556*****  BANDWIDTH ALLOCATION
3557*****
3558****/
3559
3560static void
3561pumpAllPeers( tr_peerMgr * mgr )
3562{
3563    tr_torrent * tor = NULL;
3564
3565    while(( tor = tr_torrentNext( mgr->session, tor )))
3566    {
3567        int j;
3568        Torrent * t = tor->torrentPeers;
3569
3570        for( j=0; j<tr_ptrArraySize( &t->peers ); ++j )
3571        {
3572            tr_peer * peer = tr_ptrArrayNth( &t->peers, j );
3573            tr_peerMsgsPulse( peer->msgs );
3574        }
3575    }
3576}
3577
3578static void
3579queuePulse( tr_session * session, tr_direction dir )
3580{
3581    assert( tr_isSession( session ) );
3582    assert( tr_isDirection( dir ) );
3583
3584    if( tr_sessionGetQueueEnabled( session, dir ) )
3585    {
3586        int i;
3587        const int n = tr_sessionCountQueueFreeSlots( session, dir );
3588        for( i=0; i<n; i++ ) {
3589            tr_torrent * tor = tr_sessionGetNextQueuedTorrent( session, dir );
3590            if( tor != NULL ) {
3591                tr_torrentStartNow( tor );
3592                if( tor->queue_started_callback != NULL )
3593                    (*tor->queue_started_callback)( tor, tor->queue_started_user_data );
3594            }
3595        }
3596    }
3597}
3598
3599static void
3600bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3601{
3602    tr_torrent * tor;
3603    tr_peerMgr * mgr = vmgr;
3604    tr_session * session = mgr->session;
3605    managerLock( mgr );
3606
3607    /* FIXME: this next line probably isn't necessary... */
3608    pumpAllPeers( mgr );
3609
3610    /* allocate bandwidth to the peers */
3611    tr_bandwidthAllocate( &session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
3612    tr_bandwidthAllocate( &session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
3613
3614    /* torrent upkeep */
3615    tor = NULL;
3616    while(( tor = tr_torrentNext( session, tor )))
3617    {
3618        /* possibly stop torrents that have seeded enough */
3619        tr_torrentCheckSeedLimit( tor );
3620
3621        /* run the completeness check for any torrents that need it */
3622        if( tor->torrentPeers->needsCompletenessCheck ) {
3623            tor->torrentPeers->needsCompletenessCheck  = false;
3624            tr_torrentRecheckCompleteness( tor );
3625        }
3626
3627        /* stop torrents that are ready to stop, but couldn't be stopped
3628           earlier during the peer-io callback call chain */
3629        if( tor->isStopping )
3630            tr_torrentStop( tor );
3631    }
3632
3633    /* pump the queues */
3634    queuePulse( session, TR_UP );
3635    queuePulse( session, TR_DOWN );
3636
3637    reconnectPulse( 0, 0, mgr );
3638
3639    tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
3640    managerUnlock( mgr );
3641}
3642
3643/***
3644****
3645***/
3646
3647static int
3648compareAtomPtrsByAddress( const void * va, const void *vb )
3649{
3650    const struct peer_atom * a = * (const struct peer_atom**) va;
3651    const struct peer_atom * b = * (const struct peer_atom**) vb;
3652
3653    assert( tr_isAtom( a ) );
3654    assert( tr_isAtom( b ) );
3655
3656    return tr_address_compare( &a->addr, &b->addr );
3657}
3658
3659/* best come first, worst go last */
3660static int
3661compareAtomPtrsByShelfDate( const void * va, const void *vb )
3662{
3663    time_t atime;
3664    time_t btime;
3665    const struct peer_atom * a = * (const struct peer_atom**) va;
3666    const struct peer_atom * b = * (const struct peer_atom**) vb;
3667    const int data_time_cutoff_secs = 60 * 60;
3668    const time_t tr_now = tr_time( );
3669
3670    assert( tr_isAtom( a ) );
3671    assert( tr_isAtom( b ) );
3672
3673    /* primary key: the last piece data time *if* it was within the last hour */
3674    atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
3675    btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
3676    if( atime != btime )
3677        return atime > btime ? -1 : 1;
3678
3679    /* secondary key: shelf date. */
3680    if( a->shelf_date != b->shelf_date )
3681        return a->shelf_date > b->shelf_date ? -1 : 1;
3682
3683    return 0;
3684}
3685
3686static int
3687getMaxAtomCount( const tr_torrent * tor )
3688{
3689    const int n = tor->maxConnectedPeers;
3690    /* approximate fit of the old jump discontinuous function */
3691    if( n >= 55 ) return     n + 150;
3692    if( n >= 20 ) return 2 * n + 95;
3693    return               4 * n + 55;
3694}
3695
3696static void
3697atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
3698{
3699    tr_torrent * tor = NULL;
3700    tr_peerMgr * mgr = vmgr;
3701    managerLock( mgr );
3702
3703    while(( tor = tr_torrentNext( mgr->session, tor )))
3704    {
3705        int atomCount;
3706        Torrent * t = tor->torrentPeers;
3707        const int maxAtomCount = getMaxAtomCount( tor );
3708        struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
3709
3710        if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
3711        {
3712            int i;
3713            int keepCount = 0;
3714            int testCount = 0;
3715            struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
3716            struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
3717
3718            /* keep the ones that are in use */
3719            for( i=0; i<atomCount; ++i ) {
3720                struct peer_atom * atom = atoms[i];
3721                if( peerIsInUse( t, atom ) )
3722                    keep[keepCount++] = atom;
3723                else
3724                    test[testCount++] = atom;
3725            }
3726
3727            /* if there's room, keep the best of what's left */
3728            i = 0;
3729            if( keepCount < maxAtomCount ) {
3730                qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
3731                while( i<testCount && keepCount<maxAtomCount )
3732                    keep[keepCount++] = test[i++];
3733            }
3734
3735            /* free the culled atoms */
3736            while( i<testCount )
3737                tr_free( test[i++] );
3738
3739            /* rebuild Torrent.pool with what's left */
3740            tr_ptrArrayDestruct( &t->pool, NULL );
3741            t->pool = TR_PTR_ARRAY_INIT;
3742            qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
3743            for( i=0; i<keepCount; ++i )
3744                tr_ptrArrayAppend( &t->pool, keep[i] );
3745
3746            tordbg( t, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount );
3747
3748            /* cleanup */
3749            tr_free( test );
3750            tr_free( keep );
3751        }
3752    }
3753
3754    tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
3755    managerUnlock( mgr );
3756}
3757
3758/***
3759****
3760****
3761****
3762***/
3763
3764/* is this atom someone that we'd want to initiate a connection to? */
3765static bool
3766isPeerCandidate( const tr_torrent * tor, struct peer_atom * atom, const time_t now )
3767{
3768    /* not if we're both seeds */
3769    if( tr_torrentIsSeed( tor ) && atomIsSeed( atom ) )
3770        return false;
3771
3772    /* not if we've already got a connection to them... */
3773    if( peerIsInUse( tor->torrentPeers, atom ) )
3774        return false;
3775
3776    /* not if we just tried them already */
3777    if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
3778        return false;
3779
3780    /* not if they're blocklisted */
3781    if( isAtomBlocklisted( tor->session, atom ) )
3782        return false;
3783
3784    /* not if they're banned... */
3785    if( atom->flags2 & MYFLAG_BANNED )
3786        return false;
3787
3788    return true;
3789}
3790
3791struct peer_candidate
3792{
3793    uint64_t score;
3794    tr_torrent * tor;
3795    struct peer_atom * atom;
3796};
3797
3798static bool
3799torrentWasRecentlyStarted( const tr_torrent * tor )
3800{
3801    return difftime( tr_time( ), tor->startDate ) < 120;
3802}
3803
/* Append a `width'-bit field holding `addme' to the low end of the packed
 * key `value' and return the new key.
 *
 * Only the low `width' bits of `addme' are stored, so an out-of-range
 * field value can no longer overwrite the (more significant) fields that
 * were packed before it. A width of zero or less adds nothing; a width of
 * 64 or more replaces the whole key, since every earlier bit would be
 * shifted out (and shifting a 64-bit value by 64 or more is undefined).
 *
 * @param value the key built so far
 * @param width number of bits reserved for the new field
 * @param addme the field's value
 * @return the key with the new field appended
 */
static inline uint64_t
addValToKey( uint64_t value, int width, uint64_t addme )
{
    uint64_t mask;

    if( width <= 0 )
        return value;

    if( width >= 64 )
        return addme;

    mask = ( (uint64_t)1 << width ) - 1;
    return ( value << width ) | ( addme & mask );
}
3811
3812/* smaller value is better */
3813static uint64_t
3814getPeerCandidateScore( const tr_torrent * tor, const struct peer_atom * atom, uint8_t salt  )
3815{
3816    uint64_t i;
3817    uint64_t score = 0;
3818    const bool failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
3819
3820    /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
3821    i = failed ? 1 : 0;
3822    score = addValToKey( score, 1, i );
3823
3824    /* prefer the one we attempted least recently (to cycle through all peers) */
3825    i = atom->lastConnectionAttemptAt;
3826    score = addValToKey( score, 32, i );
3827
3828    /* prefer peers belonging to a torrent of a higher priority */
3829    switch( tr_torrentGetPriority( tor ) ) {
3830        case TR_PRI_HIGH:    i = 0; break;
3831        case TR_PRI_NORMAL:  i = 1; break;
3832        case TR_PRI_LOW:     i = 2; break;
3833    }
3834    score = addValToKey( score, 4, i );
3835
3836    /* prefer recently-started torrents */
3837    i = torrentWasRecentlyStarted( tor ) ? 0 : 1;
3838    score = addValToKey( score, 1, i );
3839
3840    /* prefer torrents we're downloading with */
3841    i = tr_torrentIsSeed( tor ) ? 1 : 0;
3842    score = addValToKey( score, 1, i );
3843
3844    /* prefer peers that are known to be connectible */
3845    i = ( atom->flags & ADDED_F_CONNECTABLE ) ? 0 : 1;
3846    score = addValToKey( score, 1, i );
3847
3848    /* prefer peers that we might have a chance of uploading to...
3849       so lower seed probability is better */
3850    if( atom->seedProbability == 100 ) i = 101;
3851    else if( atom->seedProbability == -1 ) i = 100;
3852    else i = atom->seedProbability;
3853    score = addValToKey( score, 8, i );
3854
3855    /* Prefer peers that we got from more trusted sources.
3856     * lower `fromBest' values indicate more trusted sources */
3857    score = addValToKey( score, 4, atom->fromBest );
3858
3859    /* salt */
3860    score = addValToKey( score, 8, salt );
3861
3862    return score;
3863}
3864
3865#ifndef NDEBUG
3866static int
3867checkPartition( const struct peer_candidate * candidates, int left, int right, uint64_t pivotScore, int storeIndex )
3868{
3869    int i;
3870
3871    assert( storeIndex >= left );
3872    assert( storeIndex <= right );
3873    assert( candidates[storeIndex].score == pivotScore );
3874
3875    for( i=left; i<storeIndex; ++i )
3876        assert( candidates[i].score < pivotScore );
3877    for( i=storeIndex+1; i<=right; ++i )
3878        assert( candidates[i].score >= pivotScore );
3879
3880    return true;
3881}
3882#endif
3883
3884/* Helper to selectBestCandidates().
3885 * Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
3886static int
3887partitionPeerCandidates( struct peer_candidate * candidates, int left, int right, int pivotIndex )
3888{
3889    int i;
3890    int storeIndex;
3891    struct peer_candidate tmp;
3892    const struct peer_candidate pivotValue = candidates[pivotIndex];
3893
3894    /* move pivot to end */
3895    tmp = candidates[right];
3896    candidates[right] = pivotValue;
3897    candidates[pivotIndex] = tmp;
3898
3899    storeIndex = left;
3900    for( i=left; i<=right; ++i )
3901    {
3902        if( candidates[i].score < pivotValue.score )
3903        {
3904            tmp = candidates[storeIndex];
3905            candidates[storeIndex] = candidates[i];
3906            candidates[i] = tmp;
3907            storeIndex++;
3908        }
3909    }
3910
3911    /* move pivot to its final place */
3912    tmp = candidates[right];
3913    candidates[right] = candidates[storeIndex];
3914    candidates[storeIndex] = tmp;
3915
3916    /* sanity check */
3917    assert( checkPartition( candidates, left, right, pivotValue.score, storeIndex ) );
3918
3919    return storeIndex;
3920}
3921
/* Adapted from http://en.wikipedia.org/wiki/Selection_algorithm
 *
 * Repeatedly partitions candidates[left...right] around its middle element,
 * narrowing the range to the side that contains offset `k' from `left',
 * until a pivot lands exactly on that offset or the range has fewer than
 * two elements. */
static void
selectPeerCandidates( struct peer_candidate * candidates, int left, int right, int k )
{
    while( left < right )
    {
        const int middle = left + ( right - left ) / 2;
        const int pivotPos = partitionPeerCandidates( candidates, left, right, middle );
        const int wanted = left + k;

        if( pivotPos == wanted )
            break;

        if( pivotPos > wanted )
        {
            /* the wanted offset is left of the pivot */
            right = pivotPos - 1;
        }
        else
        {
            /* the wanted offset is right of the pivot;
               rebase k onto the new left edge */
            k = wanted - pivotPos - 1;
            left = pivotPos + 1;
        }
    }
}
3938
3939#ifndef NDEBUG
3940static bool
3941checkBestScoresComeFirst( const struct peer_candidate * candidates, int n, int k )
3942{
3943    int i;
3944    uint64_t worstFirstScore = 0;
3945    const int x = MIN( n, k ) - 1;
3946
3947    for( i=0; i<x; i++ )
3948        if( worstFirstScore < candidates[i].score )
3949            worstFirstScore = candidates[i].score;
3950
3951    for( i=0; i<x; i++ )
3952        assert( candidates[i].score <= worstFirstScore );
3953
3954    for( i=x+1; i<n; i++ )
3955        assert( candidates[i].score >= worstFirstScore );
3956
3957    return true;
3958}
3959#endif /* NDEBUG */
3960
3961/** @return an array of all the atoms we might want to connect to */
3962static struct peer_candidate*
3963getPeerCandidates( tr_session * session, int * candidateCount, int max )
3964{
3965    int atomCount;
3966    int peerCount;
3967    tr_torrent * tor;
3968    struct peer_candidate * candidates;
3969    struct peer_candidate * walk;
3970    const time_t now = tr_time( );
3971    const uint64_t now_msec = tr_time_msec( );
3972    /* leave 5% of connection slots for incoming connections -- ticket #2609 */
3973    const int maxCandidates = tr_sessionGetPeerLimit( session ) * 0.95;
3974
3975    /* count how many peers and atoms we've got */
3976    tor= NULL;
3977    atomCount = 0;
3978    peerCount = 0;
3979    while(( tor = tr_torrentNext( session, tor ))) {
3980        atomCount += tr_ptrArraySize( &tor->torrentPeers->pool );
3981        peerCount += tr_ptrArraySize( &tor->torrentPeers->peers );
3982    }
3983
3984    /* don't start any new handshakes if we're full up */
3985    if( maxCandidates <= peerCount ) {
3986        *candidateCount = 0;
3987        return NULL;
3988    }
3989
3990    /* allocate an array of candidates */
3991    walk = candidates = tr_new( struct peer_candidate, atomCount );
3992
3993    /* populate the candidate array */
3994    tor = NULL;
3995    while(( tor = tr_torrentNext( session, tor )))
3996    {
3997        int i, nAtoms;
3998        struct peer_atom ** atoms;
3999
4000        if( !tor->torrentPeers->isRunning )
4001            continue;
4002
4003        /* if we've already got enough peers in this torrent... */
4004        if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
4005            continue;
4006
4007        /* if we've already got enough speed in this torrent... */
4008        if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( &tor->bandwidth, now_msec, TR_UP ) )
4009            continue;
4010
4011        atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
4012        for( i=0; i<nAtoms; ++i )
4013        {
4014            struct peer_atom * atom = atoms[i];
4015
4016            if( isPeerCandidate( tor, atom, now ) )
4017            {
4018                const uint8_t salt = tr_cryptoWeakRandInt( 1024 );
4019                walk->tor = tor;
4020                walk->atom = atom;
4021                walk->score = getPeerCandidateScore( tor, atom, salt );
4022                ++walk;
4023            }
4024        }
4025    }
4026
4027    *candidateCount = walk - candidates;
4028    if( walk != candidates )
4029        selectPeerCandidates( candidates, 0, (walk-candidates)-1, max );
4030
4031    assert( checkBestScoresComeFirst( candidates, *candidateCount, max ) );
4032
4033    return candidates;
4034}
4035
4036static void
4037initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
4038{
4039    tr_peerIo * io;
4040    const time_t now = tr_time( );
4041    bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
4042
4043    if( atom->fromFirst == TR_PEER_FROM_PEX )
4044        /* PEX has explicit signalling for uTP support.  If an atom
4045           originally came from PEX and doesn't have the uTP flag, skip the
4046           uTP connection attempt.  Are we being optimistic here? */
4047        utp = utp && (atom->flags & ADDED_F_UTP_FLAGS);
4048
4049    tordbg( t, "Starting an OUTGOING%s connection with %s",
4050            utp ? " ��TP" : "",
4051            tr_atomAddrStr( atom ) );
4052
4053    io = tr_peerIoNewOutgoing( mgr->session,
4054                               &mgr->session->bandwidth,
4055                               &atom->addr,
4056                               atom->port,
4057                               t->tor->info.hash,
4058                               t->tor->completeness == TR_SEED,
4059                               utp );
4060
4061    if( io == NULL )
4062    {
4063        tordbg( t, "peerIo not created; marking peer %s as unreachable",
4064                tr_atomAddrStr( atom ) );
4065        atom->flags2 |= MYFLAG_UNREACHABLE;
4066        atom->numFails++;
4067    }
4068    else
4069    {
4070        tr_handshake * handshake = tr_handshakeNew( io,
4071                                                    mgr->session->encryptionMode,
4072                                                    myHandshakeDoneCB,
4073                                                    mgr );
4074
4075        assert( tr_peerIoGetTorrentHash( io ) );
4076
4077        tr_peerIoUnref( io ); /* balanced by the initial ref
4078                                 in tr_peerIoNewOutgoing() */
4079
4080        tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
4081                                 handshakeCompare );
4082    }
4083
4084    atom->lastConnectionAttemptAt = now;
4085    atom->time = now;
4086}
4087
4088static void
4089initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
4090{
4091#if 0
4092    fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
4093             tr_atomAddrStr( c->atom ),
4094             tr_torrentName( c->tor ),
4095             (int)c->atom->seedProbability,
4096             tr_torrentIsPrivate( c->tor ) ? "private" : "public",
4097             tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
4098#endif
4099
4100    initiateConnection( mgr, c->tor->torrentPeers, c->atom );
4101}
4102
4103static void
4104makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
4105{
4106    int i, n;
4107    struct peer_candidate * candidates;
4108
4109    candidates = getPeerCandidates( mgr->session, &n, max );
4110
4111    for( i=0; i<n && i<max; ++i )
4112        initiateCandidateConnection( mgr, &candidates[i] );
4113
4114    tr_free( candidates );
4115}
4116