• Home
  • History
  • Annotate
  • Line#
  • Navigate
  • Raw
  • Download
  • only in /netgear-R7000-V1.0.7.12_1.2.5/ap/gpl/transmission/transmission-2.73/libtransmission/
1/*
2 * This file Copyright (C) Mnemosyne LLC
3 *
4 * This file is licensed by the GPL version 2. Works owned by the
5 * Transmission project are granted a special exemption to clause 2(b)
6 * so that the bulk of its code can remain under the MIT license.
7 * This exemption does not extend to derived works not owned by
8 * the Transmission project.
9 *
10 * $Id: peer-io.c 13464 2012-09-05 11:39:57Z livings124 $
11 */
12
13#include <assert.h>
14#include <errno.h>
15#include <string.h>
16
17#include <event2/event.h>
18#include <event2/buffer.h>
19#include <event2/bufferevent.h>
20
21#include <libutp/utp.h>
22
23#include "transmission.h"
24#include "session.h"
25#include "bandwidth.h"
26#include "crypto.h"
27#include "net.h"
28#include "peer-common.h" /* MAX_BLOCK_SIZE */
29#include "peer-io.h"
30#include "trevent.h" /* tr_runInEventThread() */
31#include "tr-utp.h"
32#include "utils.h"
33
34
35#ifdef WIN32
36 #define EAGAIN       WSAEWOULDBLOCK
37 #define EINTR        WSAEINTR
38 #define EINPROGRESS  WSAEINPROGRESS
39 #define EPIPE        WSAECONNRESET
40#endif
41
42/* The amount of read bufferring that we allow for uTP sockets. */
43
44#define UTP_READ_BUFFER_SIZE (256 * 1024)
45
/* Estimate the TCP/IP framing overhead, in bytes, for `payload_bytes`
 * of payload sent over a TCP socket. */
static size_t
guessPacketOverhead( size_t payload_bytes )
{
    /**
     * Figures from http://sd.wareonearth.com/~phil/net/overhead/
     *
     * TCP over Ethernet, assuming no header compression (e.g. not PPP):
     * add 20 bytes for an IPv4 header or 40 for IPv6 (no options),
     * 20 bytes of TCP header, and 12 bytes of optional TCP timestamps.
     * That bounds the max TCP payload data rate between roughly
     *  (1500-72)/(42+1500) = 92.6070 %  (802.1q, IPv6, TCP timestamps) and
     *  (1500-40)/(38+1500) = 94.9285 %  (IPv4, minimal headers),
     * so 94% is used here as a representative payload fraction.
     */
    const double payload_fraction = 94.0; /* percent */

    return (unsigned int)( payload_bytes * ( 100.0 / payload_fraction ) - payload_bytes );
}
69
70/**
71***
72**/
73
/* Deep-logging helper tagged with the peer's address string; wrapped in
 * do/while(0) so it behaves like a single statement after an `if`. */
#define dbgmsg( io, ... ) \
    do { \
        if( tr_deepLoggingIsActive( ) ) \
            tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
    } while( 0 )
79
80/**
81***
82**/
83
/* Bookkeeping node describing one chunk of data queued in a peer's
 * outbuf: its remaining length and whether it's piece (payload) data. */
struct tr_datatype
{
    struct tr_datatype * next;   /* singly-linked list link */
    size_t length;               /* bytes of this chunk not yet written */
    bool isPieceData;            /* true for torrent payload, false for protocol data */
};

/* free-list of recycled nodes.
 * NOTE(review): not lock-protected -- presumably only touched from the
 * libevent thread; verify before calling from elsewhere. */
static struct tr_datatype * datatype_pool = NULL;

static const struct tr_datatype TR_DATATYPE_INIT = { NULL, 0, false };
94
95static struct tr_datatype *
96datatype_new( void )
97{
98    struct tr_datatype * ret;
99
100    if( datatype_pool == NULL )
101        ret = tr_new( struct tr_datatype, 1 );
102    else {
103        ret = datatype_pool;
104        datatype_pool = datatype_pool->next;
105    }
106
107    *ret = TR_DATATYPE_INIT;
108    return ret;
109}
110
111static void
112datatype_free( struct tr_datatype * datatype )
113{
114    datatype->next = datatype_pool;
115    datatype_pool = datatype;
116}
117
118static void
119peer_io_pull_datatype( tr_peerIo * io )
120{
121    struct tr_datatype * tmp;
122
123    if(( tmp = io->outbuf_datatypes ))
124    {
125        io->outbuf_datatypes = tmp->next;
126        datatype_free( tmp );
127    }
128}
129
130static void
131peer_io_push_datatype( tr_peerIo * io, struct tr_datatype * datatype )
132{
133    struct tr_datatype * tmp;
134
135    if(( tmp = io->outbuf_datatypes )) {
136        while( tmp->next != NULL )
137            tmp = tmp->next;
138        tmp->next = datatype;
139    } else {
140        io->outbuf_datatypes = datatype;
141    }
142}
143
144/***
145****
146***/
147
/* Charge `bytes_transferred` freshly-sent bytes to the bandwidth
 * manager and notify the user's didWrite callback, walking the
 * outbuf_datatypes list so each chunk is billed as piece data or
 * protocol data as recorded when it was queued.
 * NOTE(review): assumes outbuf_datatypes is non-NULL whenever
 * bytes_transferred > 0 -- `next` is dereferenced unchecked. */
static void
didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred )
{
     while( bytes_transferred && tr_isPeerIo( io ) )
     {
        struct tr_datatype * next = io->outbuf_datatypes;

        /* portion of this chunk covered by this write */
        const unsigned int payload = MIN( next->length, bytes_transferred );
        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
        const unsigned int overhead =
            io->socket ? guessPacketOverhead( payload ) : 0;
        const uint64_t now = tr_time_msec( );

        tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now );

        if( overhead > 0 )
            tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );

        if( io->didWrite )
            io->didWrite( io, payload, next->isPieceData, io->userData );

        /* the didWrite callback may have destroyed the io; re-check
         * the magic number before touching it again */
        if( tr_isPeerIo( io ) )
        {
            bytes_transferred -= payload;
            next->length -= payload;
            if( !next->length )
                peer_io_pull_datatype( io );
        }
    }
}
178
/* Drain io->inbuf by repeatedly invoking the user's canRead callback,
 * charging consumed bytes to the bandwidth manager as piece data,
 * protocol data, and estimated packet overhead.  Holds the session
 * lock while the callbacks run; a ref on `io` keeps it alive through
 * the loop. */
static void
canReadWrapper( tr_peerIo * io )
{
    bool err = 0;
    bool done = 0;
    tr_session * session;

    dbgmsg( io, "canRead" );

    tr_peerIoRef( io );

    session = io->session;

    /* try to consume the input buffer */
    if( io->canRead )
    {
        const uint64_t now = tr_time_msec( );

        tr_sessionLock( session );

        while( !done && !err )
        {
            size_t piece = 0;
            const size_t oldLen = evbuffer_get_length( io->inbuf );
            const int ret = io->canRead( io, io->userData, &piece );
            /* `used` = total bytes the callback consumed; `piece` of
             * those were piece data, the rest protocol chatter */
            const size_t used = oldLen - evbuffer_get_length( io->inbuf );
            const unsigned int overhead = guessPacketOverhead( used );

            if( piece || (piece!=used) )
            {
                if( piece )
                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now );

                if( used != piece )
                    tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now );
            }

            /* NOTE(review): overhead is billed to TR_UP although this is
             * the read path -- matches the original code, but verify */
            if( overhead > 0 )
                tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now );

            switch( ret )
            {
                case READ_NOW:
                    if( evbuffer_get_length( io->inbuf ) )
                        continue;   /* more buffered data; call canRead again */
                    done = 1;
                    break;

                case READ_LATER:
                    done = 1;
                    break;

                case READ_ERR:
                    err = 1;
                    break;
            }

            assert( tr_isPeerIo( io ) );
        }

        tr_sessionUnlock( session );
    }

    tr_peerIoUnref( io );
}
244
/* libevent read callback for TCP peers: pull bytes from the socket
 * into io->inbuf -- bounded by a 256K cap and the bandwidth limiter --
 * then hand them to canReadWrapper().  EAGAIN/EINTR just re-arm the
 * read poll; EOF and hard errors are forwarded to io->gotError. */
static void
event_read_cb( int fd, short event UNUSED, void * vio )
{
    int res;
    int e;
    tr_peerIo * io = vio;

    /* Limit the input buffer to 256K, so it doesn't grow too large */
    unsigned int howmuch;
    unsigned int curlen;
    const tr_direction dir = TR_DOWN;
    const unsigned int max = 256 * 1024;

    assert( tr_isPeerIo( io ) );
    assert( io->socket >= 0 );

    /* this one-shot event has fired; it must be re-added to poll again */
    io->pendingEvents &= ~EV_READ;

    curlen = evbuffer_get_length( io->inbuf );
    howmuch = curlen >= max ? 0 : max - curlen;
    howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch );

    dbgmsg( io, "libevent says this peer is ready to read" );

    /* if we don't have any bandwidth left, stop reading */
    if( howmuch < 1 ) {
        tr_peerIoSetEnabled( io, dir, false );
        return;
    }

    EVUTIL_SET_SOCKET_ERROR( 0 );
    res = evbuffer_read( io->inbuf, fd, (int)howmuch );
    e = EVUTIL_SOCKET_ERROR( );

    if( res > 0 )
    {
        tr_peerIoSetEnabled( io, dir, true );

        /* Invoke the user callback - must always be called last */
        canReadWrapper( io );
    }
    else
    {
        char errstr[512];
        short what = BEV_EVENT_READING;

        if( res == 0 ) /* EOF */
            what |= BEV_EVENT_EOF;
        else if( res == -1 ) {
            if( e == EAGAIN || e == EINTR ) {
                /* transient; keep polling and try again later */
                tr_peerIoSetEnabled( io, dir, true );
                return;
            }
            what |= BEV_EVENT_ERROR;
        }

        dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)",
                res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );

        if( io->gotError != NULL )
            io->gotError( io, what, io->userData );
    }
}
308
309static int
310tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
311{
312    int e;
313    int n;
314    char errstr[256];
315
316    EVUTIL_SET_SOCKET_ERROR( 0 );
317    n = evbuffer_write_atmost( io->outbuf, fd, howmuch );
318    e = EVUTIL_SOCKET_ERROR( );
319    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") );
320
321    return n;
322}
323
/* libevent write callback for TCP peers: flush as much of io->outbuf
 * as the bandwidth limiter allows.  EAGAIN/EINTR/EINPROGRESS just
 * re-arm the write poll; EOF and hard errors go to io->gotError. */
static void
event_write_cb( int fd, short event UNUSED, void * vio )
{
    int res = 0;
    int e;
    short what = BEV_EVENT_WRITING;
    tr_peerIo * io = vio;
    size_t howmuch;
    const tr_direction dir = TR_UP;
    char errstr[1024];

    assert( tr_isPeerIo( io ) );
    assert( io->socket >= 0 );

    /* this one-shot event has fired; it must be re-added to poll again */
    io->pendingEvents &= ~EV_WRITE;

    dbgmsg( io, "libevent says this peer is ready to write" );

    /* Write as much as possible, since the socket is non-blocking, write() will
     * return if it can't write any more data without blocking */
    howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) );

    /* if we don't have any bandwidth left, stop writing */
    if( howmuch < 1 ) {
        tr_peerIoSetEnabled( io, dir, false );
        return;
    }

    EVUTIL_SET_SOCKET_ERROR( 0 );
    res = tr_evbuffer_write( io, fd, howmuch );
    e = EVUTIL_SOCKET_ERROR( );

    if (res == -1) {
        if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS)
            goto reschedule;
        /* error case */
        what |= BEV_EVENT_ERROR;
    } else if (res == 0) {
        /* eof case */
        what |= BEV_EVENT_EOF;
    }
    if (res <= 0)
        goto error;

    /* partial flush: keep polling until outbuf drains */
    if( evbuffer_get_length( io->outbuf ) )
        tr_peerIoSetEnabled( io, dir, true );

    didWriteWrapper( io, res );
    return;

 reschedule:
    if( evbuffer_get_length( io->outbuf ) )
        tr_peerIoSetEnabled( io, dir, true );
    return;

 error:

    tr_net_strerror( errstr, sizeof( errstr ), e );
    dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr );

    if( io->gotError != NULL )
        io->gotError( io, what, io->userData );
}
387
388/**
389***
390**/
391
392static void
393maybeSetCongestionAlgorithm( int socket, const char * algorithm )
394{
395    if( algorithm && *algorithm )
396    {
397        const int rc = tr_netSetCongestionControl( socket, algorithm );
398
399        if( rc < 0 )
400            tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s",
401                     algorithm, tr_strerror( errno ));
402    }
403}
404
405#ifdef WITH_UTP
406/* UTP callbacks */
407
408static void
409utp_on_read(void *closure, const unsigned char *buf, size_t buflen)
410{
411    int rc;
412    tr_peerIo *io = closure;
413    assert( tr_isPeerIo( io ) );
414
415    rc = evbuffer_add( io->inbuf, buf, buflen );
416    dbgmsg( io, "utp_on_read got %zu bytes", buflen );
417
418    if( rc < 0 ) {
419        tr_nerr( "UTP", "On read evbuffer_add" );
420        return;
421    }
422
423    tr_peerIoSetEnabled( io, TR_DOWN, true );
424    canReadWrapper( io );
425}
426
/* libutp callback: libutp wants `buflen` bytes to send; copy them out
 * of io->outbuf into `buf` and run the didWrite accounting. */
static void
utp_on_write(void *closure, unsigned char *buf, size_t buflen)
{
    int rc;
    tr_peerIo *io = closure;
    assert( tr_isPeerIo( io ) );

    rc = evbuffer_remove( io->outbuf, buf, buflen );
    dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc );
    assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */
    if( rc < (long)buflen ) {
        /* release-build fallback for the bookkeeping bug asserted above */
        tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen);
    }

    didWriteWrapper( io, buflen );
}
443
444static size_t
445utp_get_rb_size(void *closure)
446{
447    size_t bytes;
448    tr_peerIo *io = closure;
449    assert( tr_isPeerIo( io ) );
450
451    bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE );
452
453    dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes );
454    return UTP_READ_BUFFER_SIZE - bytes;
455}
456
457static int tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch );
458
459static void
460utp_on_writable( tr_peerIo *io )
461{
462    int n;
463
464    dbgmsg( io, "libutp says this peer is ready to write" );
465
466    n = tr_peerIoTryWrite( io, SIZE_MAX );
467    tr_peerIoSetEnabled( io, TR_UP, n && evbuffer_get_length( io->outbuf ) );
468}
469
470static void
471utp_on_state_change(void *closure, int state)
472{
473    tr_peerIo *io = closure;
474    assert( tr_isPeerIo( io ) );
475
476    if( state == UTP_STATE_CONNECT ) {
477        dbgmsg( io, "utp_on_state_change -- changed to connected" );
478        io->utpSupported = true;
479    } else if( state == UTP_STATE_WRITABLE ) {
480        dbgmsg( io, "utp_on_state_change -- changed to writable" );
481        if ( io->pendingEvents & EV_WRITE )
482            utp_on_writable( io );
483    } else if( state == UTP_STATE_EOF ) {
484        if( io->gotError )
485            io->gotError( io, BEV_EVENT_EOF, io->userData );
486    } else if( state == UTP_STATE_DESTROYING ) {
487        tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" );
488        return;
489    } else {
490        tr_nerr( "UTP", "Unknown state %d", state );
491    }
492}
493
494static void
495utp_on_error(void *closure, int errcode)
496{
497    tr_peerIo *io = closure;
498    assert( tr_isPeerIo( io ) );
499
500    dbgmsg( io, "utp_on_error -- errcode is %d", errcode );
501
502    if( io->gotError ) {
503        errno = errcode;
504        io->gotError( io, BEV_EVENT_ERROR, io->userData );
505    }
506}
507
508static void
509utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED)
510{
511    tr_peerIo *io = closure;
512    assert( tr_isPeerIo( io ) );
513
514    dbgmsg( io, "utp_on_overhead -- count is %zu", count );
515
516    tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN,
517                      count, false, tr_time_msec() );
518}
519
/* Callback table handed to libutp for live peer connections. */
static struct UTPFunctionTable utp_function_table = {
    .on_read = utp_on_read,
    .on_write = utp_on_write,
    .get_rb_size = utp_get_rb_size,
    .on_state = utp_on_state_change,
    .on_error = utp_on_error,
    .on_overhead = utp_on_overhead
};
528
529
530/* Dummy UTP callbacks. */
531/* We switch a UTP socket to use these after the associated peerIo has been
532   destroyed -- see io_dtor. */
533
534static void
535dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED )
536{
537    /* This cannot happen, as far as I'm aware. */
538    tr_nerr( "UTP", "On_read called on closed socket" );
539
540}
541
542static void
543dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen)
544{
545    /* This can very well happen if we've shut down a peer connection that
546       had unflushed buffers.  Complain and send zeroes. */
547    tr_ndbg( "UTP", "On_write called on closed socket" );
548    memset( buf, 0, buflen );
549}
550
551static size_t
552dummy_get_rb_size( void * closure UNUSED )
553{
554    return 0;
555}
556
557static void
558dummy_on_state_change(void * closure UNUSED, int state UNUSED )
559{
560    return;
561}
562
563static void
564dummy_on_error( void * closure UNUSED, int errcode UNUSED )
565{
566    return;
567}
568
569static void
570dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED )
571{
572    return;
573}
574
/* Callback table installed on a uTP socket after its peerIo has been
 * destroyed -- see io_close_socket() / io_dtor(). */
static struct UTPFunctionTable dummy_utp_function_table = {
    .on_read = dummy_read,
    .on_write = dummy_write,
    .get_rb_size = dummy_get_rb_size,
    .on_state = dummy_on_state_change,
    .on_error = dummy_on_error,
    .on_overhead = dummy_on_overhead
};
583
584#endif /* #ifdef WITH_UTP */
585
/* Shared constructor behind tr_peerIoNewIncoming()/tr_peerIoNewOutgoing().
 * Exactly one transport is supplied: `socket` >= 0 for TCP, or a
 * non-NULL `utp_socket` for uTP.  Must run in the libevent thread.
 * The returned io starts with a refcount of 1. */
static tr_peerIo*
tr_peerIoNew( tr_session       * session,
              tr_bandwidth     * parent,
              const tr_address * addr,
              tr_port            port,
              const uint8_t    * torrentHash,
              bool               isIncoming,
              bool               isSeed,
              int                socket,
              struct UTPSocket * utp_socket)
{
    tr_peerIo * io;

    assert( session != NULL );
    assert( session->events != NULL );
    assert( tr_isBool( isIncoming ) );
    assert( tr_isBool( isSeed ) );
    assert( tr_amInEventThread( session ) );
    assert( (socket < 0) == (utp_socket != NULL) );
#ifndef WITH_UTP
    assert( socket >= 0 );
#endif

    if( socket >= 0 ) {
        /* apply the session's per-socket TCP options */
        tr_netSetTOS( socket, session->peerSocketTOS );
        maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm );
    }

    io = tr_new0( tr_peerIo, 1 );
    io->magicNumber = PEER_IO_MAGIC_NUMBER;
    io->refCount = 1;
    tr_cryptoConstruct( &io->crypto, torrentHash, isIncoming );
    io->session = session;
    io->addr = *addr;
    io->isSeed = isSeed;
    io->port = port;
    io->socket = socket;
    io->utp_socket = utp_socket;
    io->isIncoming = isIncoming != 0;
    io->timeCreated = tr_time( );
    io->inbuf = evbuffer_new( );
    io->outbuf = evbuffer_new( );
    tr_bandwidthConstruct( &io->bandwidth, session, parent );
    tr_bandwidthSetPeer( &io->bandwidth, io );
    dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent );
    dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket );

    if( io->socket >= 0 ) {
        /* TCP transport: poll the fd with libevent */
        io->event_read = event_new( session->event_base,
                                    io->socket, EV_READ, event_read_cb, io );
        io->event_write = event_new( session->event_base,
                                     io->socket, EV_WRITE, event_write_cb, io );
    }
#ifdef WITH_UTP
    else {
        /* uTP transport: register the live callback table; outgoing
           connections also need the connect kicked off here */
        UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE );
        dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" );
        UTP_SetCallbacks( utp_socket,
                          &utp_function_table,
                          io );
        if( !isIncoming ) {
            dbgmsg( io, "%s", "calling UTP_Connect" );
            UTP_Connect( utp_socket );
        }
    }
#endif

    return io;
}
655
656tr_peerIo*
657tr_peerIoNewIncoming( tr_session        * session,
658                      tr_bandwidth      * parent,
659                      const tr_address  * addr,
660                      tr_port             port,
661                      int                 fd,
662                      struct UTPSocket  * utp_socket )
663{
664    assert( session );
665    assert( tr_address_is_valid( addr ) );
666
667    return tr_peerIoNew( session, parent, addr, port, NULL, true, false,
668                         fd, utp_socket );
669}
670
671tr_peerIo*
672tr_peerIoNewOutgoing( tr_session        * session,
673                      tr_bandwidth      * parent,
674                      const tr_address  * addr,
675                      tr_port             port,
676                      const uint8_t     * torrentHash,
677                      bool                isSeed,
678                      bool                utp )
679{
680    int fd = -1;
681    struct UTPSocket * utp_socket = NULL;
682
683    assert( session );
684    assert( tr_address_is_valid( addr ) );
685    assert( torrentHash );
686
687    if( utp )
688        utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed );
689
690    if( !utp_socket ) {
691        fd = tr_netOpenPeerSocket( session, addr, port, isSeed );
692        dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd );
693    }
694
695    if( fd < 0 && utp_socket == NULL )
696        return NULL;
697
698    return tr_peerIoNew( session, parent, addr, port,
699                         torrentHash, false, isSeed, fd, utp_socket );
700}
701
702/***
703****
704***/
705
706static void
707event_enable( tr_peerIo * io, short event )
708{
709    assert( tr_amInEventThread( io->session ) );
710    assert( io->session != NULL );
711    assert( io->session->events != NULL );
712
713    if( io->socket >= 0 )
714    {
715        assert( event_initialized( io->event_read ) );
716        assert( event_initialized( io->event_write ) );
717    }
718
719    if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) )
720    {
721        dbgmsg( io, "enabling ready-to-read polling" );
722        if( io->socket >= 0 )
723            event_add( io->event_read, NULL );
724        io->pendingEvents |= EV_READ;
725    }
726
727    if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) )
728    {
729        dbgmsg( io, "enabling ready-to-write polling" );
730        if( io->socket >= 0 )
731            event_add( io->event_write, NULL );
732        io->pendingEvents |= EV_WRITE;
733    }
734}
735
736static void
737event_disable( struct tr_peerIo * io, short event )
738{
739    assert( tr_amInEventThread( io->session ) );
740    assert( io->session != NULL );
741    assert( io->session->events != NULL );
742
743    if( io->socket >= 0 )
744    {
745        assert( event_initialized( io->event_read ) );
746        assert( event_initialized( io->event_write ) );
747    }
748
749    if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) )
750    {
751        dbgmsg( io, "disabling ready-to-read polling" );
752        if( io->socket >= 0 )
753            event_del( io->event_read );
754        io->pendingEvents &= ~EV_READ;
755    }
756
757    if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) )
758    {
759        dbgmsg( io, "disabling ready-to-write polling" );
760        if( io->socket >= 0 )
761            event_del( io->event_write );
762        io->pendingEvents &= ~EV_WRITE;
763    }
764}
765
766void
767tr_peerIoSetEnabled( tr_peerIo    * io,
768                     tr_direction   dir,
769                     bool           isEnabled )
770{
771    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
772
773    assert( tr_isPeerIo( io ) );
774    assert( tr_isDirection( dir ) );
775    assert( tr_amInEventThread( io->session ) );
776    assert( io->session->events != NULL );
777
778    if( isEnabled )
779        event_enable( io, event );
780    else
781        event_disable( io, event );
782}
783
784/***
785****
786***/
/* Release the io's transport resources: close the TCP socket and free
 * its libevent handles, and/or close the uTP socket.  The uTP socket
 * is pointed at the dummy callback table first, because libutp may
 * still fire callbacks until the close completes. */
static void
io_close_socket( tr_peerIo * io )
{
    if( io->socket >= 0 ) {
        tr_netClose( io->session, io->socket );
        io->socket = -1;
    }

    if( io->event_read != NULL) {
        event_free( io->event_read );
        io->event_read = NULL;
    }

    if( io->event_write != NULL) {
        event_free( io->event_write );
        io->event_write = NULL;
    }

#ifdef WITH_UTP
    if( io->utp_socket ) {
        /* detach before closing so no callback sees a dead peerIo */
        UTP_SetCallbacks( io->utp_socket,
                          &dummy_utp_function_table,
                          NULL );
        UTP_Close( io->utp_socket );

        io->utp_socket = NULL;
    }
#endif
}
816
/* Destructor proper; runs in the libevent thread (scheduled from
 * tr_peerIoFree).  Frees buffers, transport, crypto state and any
 * queued datatype bookkeeping, then poisons the memory so stale
 * pointers fail the tr_isPeerIo() magic-number check. */
static void
io_dtor( void * vio )
{
    tr_peerIo * io = vio;

    assert( tr_isPeerIo( io ) );
    assert( tr_amInEventThread( io->session ) );
    assert( io->session->events != NULL );

    dbgmsg( io, "in tr_peerIo destructor" );
    event_disable( io, EV_READ | EV_WRITE );
    tr_bandwidthDestruct( &io->bandwidth );
    evbuffer_free( io->outbuf );
    evbuffer_free( io->inbuf );
    io_close_socket( io );
    tr_cryptoDestruct( &io->crypto );

    /* recycle any unsent-chunk bookkeeping back into the pool */
    while( io->outbuf_datatypes != NULL )
        peer_io_pull_datatype( io );

    memset( io, ~0, sizeof( tr_peerIo ) );
    tr_free( io );
}
840
841static void
842tr_peerIoFree( tr_peerIo * io )
843{
844    if( io )
845    {
846        dbgmsg( io, "in tr_peerIoFree" );
847        io->canRead = NULL;
848        io->didWrite = NULL;
849        io->gotError = NULL;
850        tr_runInEventThread( io->session, io_dtor, io );
851    }
852}
853
854void
855tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io )
856{
857    assert( tr_isPeerIo( io ) );
858
859    dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d",
860                file, line, io->refCount, io->refCount+1 );
861
862    ++io->refCount;
863}
864
865void
866tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io )
867{
868    assert( tr_isPeerIo( io ) );
869
870    dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d",
871                file, line, io->refCount, io->refCount-1 );
872
873    if( !--io->refCount )
874        tr_peerIoFree( io );
875}
876
877const tr_address*
878tr_peerIoGetAddress( const tr_peerIo * io, tr_port   * port )
879{
880    assert( tr_isPeerIo( io ) );
881
882    if( port )
883        *port = io->port;
884
885    return &io->addr;
886}
887
/* Format "[address]:port" for logging.
 * NOTE(review): returns a static buffer, so this is non-reentrant and
 * not thread-safe -- callers must consume the string before the next
 * call; verify all call sites are on one thread. */
const char*
tr_peerIoAddrStr( const tr_address * addr, tr_port port )
{
    static char buf[512];
    tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) );
    return buf;
}
895
896const char* tr_peerIoGetAddrStr( const tr_peerIo * io )
897{
898    return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error";
899}
900
901void
902tr_peerIoSetIOFuncs( tr_peerIo        * io,
903                     tr_can_read_cb     readcb,
904                     tr_did_write_cb    writecb,
905                     tr_net_error_cb    errcb,
906                     void             * userData )
907{
908    io->canRead = readcb;
909    io->didWrite = writecb;
910    io->gotError = errcb;
911    io->userData = userData;
912}
913
914void
915tr_peerIoClear( tr_peerIo * io )
916{
917    tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL );
918    tr_peerIoSetEnabled( io, TR_UP, false );
919    tr_peerIoSetEnabled( io, TR_DOWN, false );
920}
921
922int
923tr_peerIoReconnect( tr_peerIo * io )
924{
925    short int pendingEvents;
926    tr_session * session;
927
928    assert( tr_isPeerIo( io ) );
929    assert( !tr_peerIoIsIncoming( io ) );
930
931    session = tr_peerIoGetSession( io );
932
933    pendingEvents = io->pendingEvents;
934    event_disable( io, EV_READ | EV_WRITE );
935
936    io_close_socket( io );
937
938    io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed );
939    io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io );
940    io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io );
941
942    if( io->socket >= 0 )
943    {
944        event_enable( io, pendingEvents );
945        tr_netSetTOS( io->socket, session->peerSocketTOS );
946        maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm );
947        return 0;
948    }
949
950    return -1;
951}
952
953/**
954***
955**/
956
/* Set the torrent hash; delegates to the crypto layer, which owns it. */
void
tr_peerIoSetTorrentHash( tr_peerIo *     io,
                         const uint8_t * hash )
{
    assert( tr_isPeerIo( io ) );

    tr_cryptoSetTorrentHash( &io->crypto, hash );
}
965
/* Get the torrent hash; delegates to the crypto layer, which owns it. */
const uint8_t*
tr_peerIoGetTorrentHash( tr_peerIo * io )
{
    assert( tr_isPeerIo( io ) );

    return tr_cryptoGetTorrentHash( &io->crypto );
}
973
/* Nonzero if a torrent hash has been set on this io's crypto state. */
int
tr_peerIoHasTorrentHash( const tr_peerIo * io )
{
    assert( tr_isPeerIo( io ) );

    return tr_cryptoHasTorrentHash( &io->crypto );
}
981
982/**
983***
984**/
985
986void
987tr_peerIoSetPeersId( tr_peerIo * io, const uint8_t * peer_id )
988{
989    assert( tr_isPeerIo( io ) );
990
991    if( ( io->peerIdIsSet = peer_id != NULL ) )
992        memcpy( io->peerId, peer_id, 20 );
993    else
994        memset( io->peerId, 0, 20 );
995}
996
997/**
998***
999**/
1000
/* Target size for io->outbuf: large enough to hold the next `period`
 * (15) seconds' worth of output at the current piece speed, or a few
 * blocks, whichever is bigger. */
static unsigned int
getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now )
{
    /* this is all kind of arbitrary, but what seems to work well is
     * being large enough to hold the next 15 seconds' worth of input,
     * or a few blocks, whichever is bigger.
     * It's okay to tweak this as needed */
    const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP );
    const unsigned int period = 15u; /* arbitrary */
    /* the 3 is arbitrary; the .5 is to leave room for messages */
    static const unsigned int ceiling =  (unsigned int)( MAX_BLOCK_SIZE * 3.5 );
    return MAX( ceiling, currentSpeed_Bps*period );
}
1014
1015size_t
1016tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now )
1017{
1018    const size_t desiredLen = getDesiredOutputBufferSize( io, now );
1019    const size_t currentLen = evbuffer_get_length( io->outbuf );
1020    size_t freeSpace = 0;
1021
1022    if( desiredLen > currentLen )
1023        freeSpace = desiredLen - currentLen;
1024
1025    return freeSpace;
1026}
1027
1028/**
1029***
1030**/
1031
1032void
1033tr_peerIoSetEncryption( tr_peerIo * io, tr_encryption_type encryption_type )
1034{
1035    assert( tr_isPeerIo( io ) );
1036    assert( encryption_type == PEER_ENCRYPTION_NONE
1037         || encryption_type == PEER_ENCRYPTION_RC4 );
1038
1039    io->encryption_type = encryption_type;
1040}
1041
1042/**
1043***
1044**/
1045
1046static void
1047addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData )
1048{
1049    struct tr_datatype * d;
1050    d = datatype_new( );
1051    d->isPieceData = isPieceData != 0;
1052    d->length = byteCount;
1053    peer_io_push_datatype( io, d );
1054}
1055
/* If RC4 encryption is active, encrypt `buf` in place by walking its
 * (possibly non-contiguous) extents one iovec at a time. */
static void
maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf )
{
    if( io->encryption_type == PEER_ENCRYPTION_RC4 )
    {
        struct evbuffer_ptr pos;
        struct evbuffer_iovec iovec;
        evbuffer_ptr_set( buf, &pos, 0, EVBUFFER_PTR_SET );
        do {
            /* peek at the extent under `pos` and encrypt it in place */
            evbuffer_peek( buf, -1, &pos, &iovec, 1 );
            tr_cryptoEncrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
        } while( !evbuffer_ptr_set( buf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
    }
}
1070
1071void
1072tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData )
1073{
1074    const size_t byteCount = evbuffer_get_length( buf );
1075    maybeEncryptBuffer( io, buf );
1076    evbuffer_add_buffer( io->outbuf, buf );
1077    addDatatype( io, byteCount, isPieceData );
1078}
1079
/* Append `byteCount` raw bytes to io->outbuf, encrypting on the fly
 * when RC4 is active, and log the chunk for bandwidth accounting.
 * NOTE(review): the return value of evbuffer_reserve_space() is not
 * checked; a failed reservation would leave iovec.iov_base invalid. */
void
tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData )
{
    struct evbuffer_iovec iovec;
    evbuffer_reserve_space( io->outbuf, byteCount, &iovec, 1 );

    iovec.iov_len = byteCount;
    if( io->encryption_type == PEER_ENCRYPTION_RC4 )
        tr_cryptoEncrypt( &io->crypto, iovec.iov_len, bytes, iovec.iov_base );
    else
        memcpy( iovec.iov_base, bytes, iovec.iov_len );
    evbuffer_commit_space( io->outbuf, &iovec, 1 );

    addDatatype( io, byteCount, isPieceData );
}
1095
1096/***
1097****
1098***/
1099
void
evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte )
{
    /* append a single octet; no byte-order conversion needed */
    evbuffer_add( outbuf, &byte, sizeof( byte ) );
}
1105
void
evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs )
{
    /* convert host -> network (big-endian) byte order, then append */
    const uint16_t network_order = htons( addme_hs );

    evbuffer_add( outbuf, &network_order, sizeof( network_order ) );
}
1112
void
evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl )
{
    /* convert host -> network (big-endian) byte order, then append */
    const uint32_t network_order = htonl( addme_hl );

    evbuffer_add( outbuf, &network_order, sizeof( network_order ) );
}
1119
void
evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll )
{
    /* convert host -> network (big-endian) byte order, then append */
    const uint64_t network_order = tr_htonll( addme_hll );

    evbuffer_add( outbuf, &network_order, sizeof( network_order ) );
}
1126
1127/***
1128****
1129***/
1130
/* Move byteCount bytes from "inbuf" to the tail of "outbuf",
 * decrypting them in place when the connection is RC4-encrypted.
 * The caller must ensure inbuf holds at least byteCount bytes. */
void
tr_peerIoReadBytesToBuf( tr_peerIo * io, struct evbuffer * inbuf, struct evbuffer * outbuf, size_t byteCount )
{
    struct evbuffer * tmp;
    /* remember where the newly-appended bytes will begin in outbuf,
     * so the decrypt pass below can start there */
    const size_t old_length = evbuffer_get_length( outbuf );

    assert( tr_isPeerIo( io ) );
    assert( evbuffer_get_length( inbuf ) >= byteCount );

    /* append it to outbuf */
    tmp = evbuffer_new( );
    evbuffer_remove_buffer( inbuf, tmp, byteCount );
    evbuffer_add_buffer( outbuf, tmp );
    evbuffer_free( tmp );

    /* decrypt if needed */
    if( io->encryption_type == PEER_ENCRYPTION_RC4 ) {
        struct evbuffer_ptr pos;
        struct evbuffer_iovec iovec;
        /* walk the appended region chunk by chunk, decrypting in place;
         * byteCount tracks how much of the region is still undecrypted
         * and limits each peek to the bytes we actually moved */
        evbuffer_ptr_set( outbuf, &pos, old_length, EVBUFFER_PTR_SET );
        do {
            evbuffer_peek( outbuf, byteCount, &pos, &iovec, 1 );
            tr_cryptoDecrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base );
            byteCount -= iovec.iov_len;
            /* evbuffer_ptr_set() returns nonzero when advancing runs
             * past the end of the buffer, ending the loop */
        } while( !evbuffer_ptr_set( outbuf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) );
    }
}
1158
1159void
1160tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount )
1161{
1162    assert( tr_isPeerIo( io ) );
1163    assert( evbuffer_get_length( inbuf )  >= byteCount );
1164
1165    switch( io->encryption_type )
1166    {
1167        case PEER_ENCRYPTION_NONE:
1168            evbuffer_remove( inbuf, bytes, byteCount );
1169            break;
1170
1171        case PEER_ENCRYPTION_RC4:
1172            evbuffer_remove( inbuf, bytes, byteCount );
1173            tr_cryptoDecrypt( &io->crypto, byteCount, bytes, bytes );
1174            break;
1175
1176        default:
1177            assert( 0 );
1178    }
1179}
1180
1181void
1182tr_peerIoReadUint16( tr_peerIo        * io,
1183                     struct evbuffer  * inbuf,
1184                     uint16_t         * setme )
1185{
1186    uint16_t tmp;
1187    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) );
1188    *setme = ntohs( tmp );
1189}
1190
1191void tr_peerIoReadUint32( tr_peerIo        * io,
1192                          struct evbuffer  * inbuf,
1193                          uint32_t         * setme )
1194{
1195    uint32_t tmp;
1196    tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) );
1197    *setme = ntohl( tmp );
1198}
1199
1200void
1201tr_peerIoDrain( tr_peerIo       * io,
1202                struct evbuffer * inbuf,
1203                size_t            byteCount )
1204{
1205    char buf[4096];
1206    const size_t buflen = sizeof( buf );
1207
1208    while( byteCount > 0 )
1209    {
1210        const size_t thisPass = MIN( byteCount, buflen );
1211        tr_peerIoReadBytes( io, inbuf, buf, thisPass );
1212        byteCount -= thisPass;
1213    }
1214}
1215
1216/***
1217****
1218***/
1219
/* Try to read up to "howmuch" bytes from the peer into io->inbuf,
 * honoring the bandwidth allocator's per-direction clamp.
 * For TCP, returns evbuffer_read()'s result (bytes read, 0 on EOF,
 * or -1 on error); for uTP it returns 0 since reads arrive via
 * libutp callbacks rather than here. */
static int
tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
{
    int res = 0;

    /* skip everything if the bandwidth allocator gives us nothing */
    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch )))
    {
        if( io->utp_socket != NULL ) /* utp peer connection */
        {
            /* UTP_RBDrained notifies libutp that your read buffer is empty.
             * It opens up the congestion window by sending an ACK (soonish)
             * if one was not going to be sent. */
            if( evbuffer_get_length( io->inbuf ) == 0 )
                UTP_RBDrained( io->utp_socket );
        }
        else /* tcp peer connection */
        {
            int e;

            /* clear any stale error so "e" reflects this read only */
            EVUTIL_SET_SOCKET_ERROR( 0 );
            res = evbuffer_read( io->inbuf, io->socket, (int)howmuch );
            e = EVUTIL_SOCKET_ERROR( );

            dbgmsg( io, "read %d from peer (%s)", res, (res==-1?tr_strerror(e):"") );

            /* hand any buffered data to the read callback before
             * reporting errors, so partial reads aren't lost */
            if( evbuffer_get_length( io->inbuf ) )
                canReadWrapper( io );

            /* report hard failures; EAGAIN/EINTR/EINPROGRESS are benign.
             * res==0 with e==0 means EOF, which also lands here. */
            if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
            {
                char errstr[512];
                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
                if( res == 0 )
                    what |= BEV_EVENT_EOF;
                dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)",
                        res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
                io->gotError( io, what, io->userData );
            }
        }
    }

    return res;
}
1263
1264static int
1265tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
1266{
1267    int n = 0;
1268    const size_t old_len = evbuffer_get_length( io->outbuf );
1269    dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch );
1270
1271    if( howmuch > old_len )
1272        howmuch = old_len;
1273
1274    if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch )))
1275    {
1276        if( io->utp_socket != NULL ) /* utp peer connection */
1277        {
1278            UTP_Write( io->utp_socket, howmuch );
1279            n = old_len - evbuffer_get_length( io->outbuf );
1280        }
1281        else
1282        {
1283            int e;
1284
1285            EVUTIL_SET_SOCKET_ERROR( 0 );
1286            n = tr_evbuffer_write( io, io->socket, howmuch );
1287            e = EVUTIL_SOCKET_ERROR( );
1288
1289            if( n > 0 )
1290                didWriteWrapper( io, n );
1291
1292            if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) )
1293            {
1294                char errstr[512];
1295                const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
1296
1297                dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)",
1298                        n, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) );
1299
1300                if( io->gotError != NULL )
1301                    io->gotError( io, what, io->userData );
1302            }
1303        }
1304    }
1305
1306    return n;
1307}
1308
1309int
1310tr_peerIoFlush( tr_peerIo  * io, tr_direction dir, size_t limit )
1311{
1312    int bytesUsed = 0;
1313
1314    assert( tr_isPeerIo( io ) );
1315    assert( tr_isDirection( dir ) );
1316
1317    if( dir == TR_DOWN )
1318        bytesUsed = tr_peerIoTryRead( io, limit );
1319    else
1320        bytesUsed = tr_peerIoTryWrite( io, limit );
1321
1322    dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed );
1323    return bytesUsed;
1324}
1325
1326int
1327tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io )
1328{
1329    size_t byteCount = 0;
1330    const struct tr_datatype * it;
1331
1332    /* count up how many bytes are used by non-piece-data messages
1333       at the front of our outbound queue */
1334    for( it=io->outbuf_datatypes; it!=NULL; it=it->next )
1335        if( it->isPieceData )
1336            break;
1337        else
1338            byteCount += it->length;
1339
1340    return tr_peerIoFlush( io, TR_UP, byteCount );
1341}
1342