1// 2// This file is part of the aMule Project. 3// 4// Copyright (c) 2003-2011 aMule Team ( admin@amule.org / http://www.amule.org ) 5// Copyright (c) 2002-2011 Merkur ( devs@emule-project.net / http://www.emule-project.net ) 6// 7// Any parts of this program derived from the xMule, lMule or eMule project, 8// or contributed by third-party developers are copyrighted by their 9// respective authors. 10// 11// This program is free software; you can redistribute it and/or modify 12// it under the terms of the GNU General Public License as published by 13// the Free Software Foundation; either version 2 of the License, or 14// (at your option) any later version. 15// 16// This program is distributed in the hope that it will be useful, 17// but WITHOUT ANY WARRANTY; without even the implied warranty of 18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 19// GNU General Public License for more details. 20// 21// You should have received a copy of the GNU General Public License 22// along with this program; if not, write to the Free Software 23// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA 24// 25 26 27#include "EMSocket.h" // Interface declarations. 28 29#include <protocol/Protocols.h> 30#include <protocol/ed2k/Constants.h> 31 32#include "Packet.h" // Needed for CPacket 33#include "amule.h" 34#include "GetTickCount.h" 35#include "UploadBandwidthThrottler.h" 36#include "Logger.h" 37#include "Preferences.h" 38#include "ScopedPtr.h" 39 40 41const uint32 MAX_PACKET_SIZE = 2000000; 42 43CEMSocket::CEMSocket(const CProxyData *ProxyData) 44 : CEncryptedStreamSocket(wxSOCKET_NOWAIT, ProxyData) 45{ 46 // If an interface has been specified, 47 // then we need to bind to it. 48 if (!thePrefs::GetAddress().IsEmpty()) { 49 amuleIPV4Address host; 50 51 // No need to warn here, in case of failure to 52 // assign the hostname. That is already done 53 // in amule.cpp when starting ... 54 if (host.Hostname(thePrefs::GetAddress())) { 55 SetLocal(host); 56 } 57 } 58 59 byConnected = ES_NOTCONNECTED; 60 m_uTimeOut = CONNECTION_TIMEOUT; // default timeout for ed2k sockets 61 62 // Download (pseudo) rate control 63 downloadLimit = 0; 64 downloadLimitEnable = false; 65 pendingOnReceive = false; 66 67 // Download partial header 68 pendingHeaderSize = 0; 69 70 // Download partial packet 71 pendingPacket = NULL; 72 pendingPacketSize = 0; 73 74 // Upload control 75 sendbuffer = NULL; 76 sendblen = 0; 77 sent = 0; 78 79 m_currentPacket_is_controlpacket = false; 80 m_currentPackageIsFromPartFile = false; 81 82 m_numberOfSentBytesCompleteFile = 0; 83 m_numberOfSentBytesPartFile = 0; 84 m_numberOfSentBytesControlPacket = 0; 85 86 lastCalledSend = ::GetTickCount(); 87 lastSent = ::GetTickCount()-1000; 88 89 m_bAccelerateUpload = false; 90 91 m_actualPayloadSize = 0; 92 m_actualPayloadSizeSent = 0; 93 94 m_bBusy = false; 95 m_hasSent = false; 96 97 lastFinishedStandard = 0; 98 99 DoingDestroy = false; 100 101} 102 103CEMSocket::~CEMSocket() 104{ 105 // need to be locked here to know that the other methods 106 // won't be in the middle of things 107 { 108 wxMutexLocker lock(m_sendLocker); 109 byConnected = ES_DISCONNECTED; 110 } 111 112 // now that we know no other method will keep adding to the queue 113 // we can remove ourself from the queue 114 if (theApp->uploadBandwidthThrottler) { 115 theApp->uploadBandwidthThrottler->RemoveFromAllQueues(this); 116 } 117 118 ClearQueues(); 119 120 SetNotify(0); 121 Notify(FALSE); 122} 123 124void CEMSocket::Destroy() { 125 if (!DoingDestroy) { 126 DoingDestroy = true; 127 wxSocketClient::Destroy(); 128 } 129} 130 131 132void CEMSocket::ClearQueues() 133{ 134 wxMutexLocker lock(m_sendLocker); 135 136 DeleteContents(m_control_queue); 137 138 { 139 CStdPacketQueue::iterator it = m_standard_queue.begin(); 140 for (; it != m_standard_queue.end(); ++it) { 141 delete it->packet; 142 } 143 m_standard_queue.clear(); 144 } 145 146 // Download (pseudo) rate control 147 downloadLimit = 0; 148 downloadLimitEnable = false; 149 pendingOnReceive = false; 150 151 // Download partial header 152 pendingHeaderSize = 0; 153 154 // Download partial packet 155 delete[] pendingPacket; 156 pendingPacket = NULL; 157 pendingPacketSize = 0; 158 159 // Upload control 160 delete[] sendbuffer; 161 sendbuffer = NULL; 162 sendblen = 0; 163 sent = 0; 164} 165 166 167void CEMSocket::OnClose(int WXUNUSED(nErrorCode)) 168{ 169 // need to be locked here to know that the other methods 170 // won't be in the middle of things 171 { 172 wxMutexLocker lock(m_sendLocker); 173 byConnected = ES_DISCONNECTED; 174 } 175 176 // now that we know no other method will keep adding to the queue 177 // we can remove ourself from the queue 178 theApp->uploadBandwidthThrottler->RemoveFromAllQueues(this); 179 180 ClearQueues(); 181} 182 183 184void CEMSocket::OnReceive(int nErrorCode) 185{ 186 if(nErrorCode) { 187 uint32 error = LastError(); 188 if (error != wxSOCKET_WOULDBLOCK) { 189 OnError(nErrorCode); 190 return; 191 } 192 } 193 194 // Check current connection state 195 if (byConnected == ES_DISCONNECTED) { 196 return; 197 } else { 198 byConnected = ES_CONNECTED; // ES_DISCONNECTED, ES_NOTCONNECTED, ES_CONNECTED 199 } 200 201 uint32 ret; 202 do { 203 // CPU load improvement 204 if (downloadLimitEnable && downloadLimit == 0){ 205 pendingOnReceive = true; 206 return; 207 } 208 209 uint32 readMax; 210 byte *buf; 211 if (pendingHeaderSize < PACKET_HEADER_SIZE) { 212 delete[] pendingPacket; 213 pendingPacket = NULL; 214 buf = pendingHeader + pendingHeaderSize; 215 readMax = PACKET_HEADER_SIZE - pendingHeaderSize; 216 } else if (pendingPacket == NULL) { 217 pendingPacketSize = 0; 218 readMax = CPacket::GetPacketSizeFromHeader(pendingHeader); 219 if (readMax > MAX_PACKET_SIZE) { 220 pendingHeaderSize = 0; 221 OnError(ERR_TOOBIG); 222 return; 223 } 224 pendingPacket = new byte[readMax + 1]; 225 buf = pendingPacket; 226 } else { 227 buf = pendingPacket + pendingPacketSize; 228 readMax = CPacket::GetPacketSizeFromHeader(pendingHeader) - pendingPacketSize; 229 } 230 231 if (downloadLimitEnable && readMax > downloadLimit) { 232 readMax = downloadLimit; 233 } 234 235 ret = 0; 236 if (readMax) { 237 wxMutexLocker lock(m_sendLocker); 238 ret = Read(buf, readMax); 239 if (Error() || (ret == 0)) { 240 if (LastError() == wxSOCKET_WOULDBLOCK) { 241 pendingOnReceive = true; 242 } 243 return; 244 } 245 } 246 247 // Bandwidth control 248 if (downloadLimitEnable) { 249 // Update limit 250 if (ret >= downloadLimit) { 251 downloadLimit = 0; 252 } else { 253 downloadLimit -= ret; 254 } 255 } 256 257 // CPU load improvement 258 // Detect if the socket's buffer is empty (or the size did match...) 259 pendingOnReceive = (ret == readMax); 260 261 if (pendingHeaderSize >= PACKET_HEADER_SIZE) { 262 pendingPacketSize += ret; 263 if (pendingPacketSize >= CPacket::GetPacketSizeFromHeader(pendingHeader)) { 264 CScopedPtr<CPacket> packet(new CPacket(pendingHeader, pendingPacket)); 265 pendingPacket = NULL; 266 pendingPacketSize = 0; 267 pendingHeaderSize = 0; 268 269 // Bugfix We still need to check for a valid protocol 270 // Remark: the default eMule v0.26b had removed this test...... 271 switch (packet->GetProtocol()){ 272 case OP_EDONKEYPROT: 273 case OP_PACKEDPROT: 274 case OP_EMULEPROT: 275 case OP_ED2KV2HEADER: 276 case OP_ED2KV2PACKEDPROT: 277 break; 278 default: 279 OnError(ERR_WRONGHEADER); 280 return; 281 } 282 283 // Process packet 284 PacketReceived(packet.get()); 285 } 286 } else { 287 pendingHeaderSize += ret; 288 } 289 } while (ret && pendingHeaderSize >= PACKET_HEADER_SIZE); 290} 291 292 293void CEMSocket::SetDownloadLimit(uint32 limit) 294{ 295 downloadLimit = limit; 296 downloadLimitEnable = true; 297 298 // CPU load improvement 299 if(limit > 0 && pendingOnReceive == true){ 300 OnReceive(0); 301 } 302} 303 304 305void CEMSocket::DisableDownloadLimit() 306{ 307 downloadLimitEnable = false; 308 309 // CPU load improvement 310 if (pendingOnReceive == true){ 311 OnReceive(0); 312 } 313} 314 315 316/** 317 * Queues up the packet to be sent. Another thread will actually send the packet. 318 * 319 * If the packet is not a control packet, and if the socket decides that its queue is 320 * full and forceAdd is false, then the socket is allowed to refuse to add the packet 321 * to its queue. It will then return false and it is up to the calling thread to try 322 * to call SendPacket for that packet again at a later time. 323 * 324 * @param packet address to the packet that should be added to the queue 325 * 326 * @param delpacket if true, the responsibility for deleting the packet after it has been sent 327 * has been transferred to this object. If false, don't delete the packet after it 328 * has been sent. 329 * 330 * @param controlpacket the packet is a controlpacket 331 * 332 * @param forceAdd this packet must be added to the queue, even if it is full. If this flag is true 333 * then the method can not refuse to add the packet, and therefore not return false. 334 * 335 * @return true if the packet was added to the queue, false otherwise 336 */ 337void CEMSocket::SendPacket(CPacket* packet, bool delpacket, bool controlpacket, uint32 actualPayloadSize) 338{ 339 //printf("* SendPacket called on socket %p\n", this); 340 wxMutexLocker lock(m_sendLocker); 341 342 if (byConnected == ES_DISCONNECTED) { 343 //printf("* Disconnected, drop packet\n"); 344 if(delpacket) { 345 delete packet; 346 } 347 } else { 348 if (!delpacket){ 349 packet = new CPacket(*packet); 350 } 351 352 if (controlpacket) { 353 //printf("* Adding a control packet\n"); 354 m_control_queue.push_back(packet); 355 356 // queue up for controlpacket 357 theApp->uploadBandwidthThrottler->QueueForSendingControlPacket(this, HasSent()); 358 } else { 359 //printf("* Adding a normal packet to the queue\n"); 360 bool first = !((sendbuffer && !m_currentPacket_is_controlpacket) || !m_standard_queue.empty()); 361 StandardPacketQueueEntry queueEntry = { actualPayloadSize, packet }; 362 m_standard_queue.push_back(queueEntry); 363 364 // reset timeout for the first time 365 if (first) { 366 lastFinishedStandard = ::GetTickCount(); 367 m_bAccelerateUpload = true; // Always accelerate first packet in a block 368 } 369 } 370 } 371} 372 373 374uint64 CEMSocket::GetSentBytesCompleteFileSinceLastCallAndReset() 375{ 376 wxMutexLocker lock( m_sendLocker ); 377 378 uint64 sentBytes = m_numberOfSentBytesCompleteFile; 379 m_numberOfSentBytesCompleteFile = 0; 380 381 return sentBytes; 382} 383 384 385uint64 CEMSocket::GetSentBytesPartFileSinceLastCallAndReset() 386{ 387 wxMutexLocker lock( m_sendLocker ); 388 389 uint64 sentBytes = m_numberOfSentBytesPartFile; 390 m_numberOfSentBytesPartFile = 0; 391 392 return sentBytes; 393} 394 395uint64 CEMSocket::GetSentBytesControlPacketSinceLastCallAndReset() 396{ 397 wxMutexLocker lock( m_sendLocker ); 398 399 uint64 sentBytes = m_numberOfSentBytesControlPacket; 400 m_numberOfSentBytesControlPacket = 0; 401 402 return sentBytes; 403} 404 405uint64 CEMSocket::GetSentPayloadSinceLastCallAndReset() 406{ 407 wxMutexLocker lock( m_sendLocker ); 408 409 uint64 sentBytes = m_actualPayloadSizeSent; 410 m_actualPayloadSizeSent = 0; 411 412 return sentBytes; 413} 414 415 416void CEMSocket::OnSend(int nErrorCode) 417{ 418 if (nErrorCode){ 419 OnError(nErrorCode); 420 return; 421 } 422 423 CEncryptedStreamSocket::OnSend(0); 424 425 wxMutexLocker lock( m_sendLocker ); 426 m_bBusy = false; 427 428 if (byConnected != ES_DISCONNECTED) { 429 byConnected = ES_CONNECTED; 430 431 if (m_currentPacket_is_controlpacket) { 432 // queue up for control packet 433 theApp->uploadBandwidthThrottler->QueueForSendingControlPacket(this, HasSent()); 434 } 435 } 436} 437 438 439/** 440 * Try to put queued up data on the socket. 441 * 442 * Control packets have higher priority, and will be sent first, if possible. 443 * Standard packets can be split up in several package containers. In that case 444 * all the parts of a split package must be sent in a row, without any control packet 445 * in between. 446 * 447 * @param maxNumberOfBytesToSend This is the maximum number of bytes that is allowed to be put on the socket 448 * this call. The actual number of sent bytes will be returned from the method. 449 * 450 * @param onlyAllowedToSendControlPacket This call we only try to put control packets on the sockets. 451 * If there's a standard packet "in the way", and we think that this socket 452 * is no longer an upload slot, then it is ok to send the standard packet to 453 * get it out of the way. But it is not allowed to pick a new standard packet 454 * from the queue during this call. Several split packets are counted as one 455 * standard packet though, so it is ok to finish them all off if necessary. 456 * 457 * @return the actual number of bytes that were put on the socket. 458 */ 459SocketSentBytes CEMSocket::Send(uint32 maxNumberOfBytesToSend, uint32 minFragSize, bool onlyAllowedToSendControlPacket) 460{ 461 wxMutexLocker lock(m_sendLocker); 462 463 //printf("* Attempt to send a packet on socket %p\n", this); 464 465 if (byConnected == ES_DISCONNECTED) { 466 //printf("* Disconnected socket %p\n", this); 467 SocketSentBytes returnVal = { false, 0, 0 }; 468 return returnVal; 469 } else if (m_bBusy && onlyAllowedToSendControlPacket) { 470 //printf("* Busy socket %p\n", this); 471 SocketSentBytes returnVal = { true, 0, 0 }; 472 return returnVal; 473 } 474 475 bool anErrorHasOccured = false; 476 uint32 sentStandardPacketBytesThisCall = 0; 477 uint32 sentControlPacketBytesThisCall = 0; 478 479 if (byConnected == ES_CONNECTED && IsEncryptionLayerReady() && (!m_bBusy || onlyAllowedToSendControlPacket)) { 480 481 //printf("* Internal attemptto send on %p\n", this); 482 483 if(minFragSize < 1) { 484 minFragSize = 1; 485 } 486 487 maxNumberOfBytesToSend = GetNextFragSize(maxNumberOfBytesToSend, minFragSize); 488 489 bool bWasLongTimeSinceSend = (::GetTickCount() - lastSent) > 1000; 490 491 lastCalledSend = ::GetTickCount(); 492 493 494 while(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall < maxNumberOfBytesToSend && anErrorHasOccured == false && // don't send more than allowed. Also, there should have been no error in earlier loop 495 (!m_control_queue.empty() || !m_standard_queue.empty() || sendbuffer != NULL) && // there must exist something to send 496 (onlyAllowedToSendControlPacket == false || // this means we are allowed to send both types of packets, so proceed 497 (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall > 0 && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) % minFragSize != 0) || 498 (sendbuffer == NULL && !m_control_queue.empty()) || // There's a control packet in queue, and we are not currently sending anything, so we will handle the control packet next 499 (sendbuffer != NULL && m_currentPacket_is_controlpacket == true) || // We are in the progress of sending a control packet. We are always allowed to send those 500 (sendbuffer != NULL && m_currentPacket_is_controlpacket == false && bWasLongTimeSinceSend && !m_control_queue.empty() && m_standard_queue.empty() && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) < minFragSize) // We have waited to long to clean the current packet (which may be a standard packet that is in the way). Proceed no matter what the value of onlyAllowedToSendControlPacket. 501 ) 502 ) { 503 504 // If we are currently not in the progress of sending a packet, we will need to find the next one to send 505 if(sendbuffer == NULL) { 506 CPacket* curPacket = NULL; 507 if(!m_control_queue.empty()) { 508 // There's a control packet to send 509 m_currentPacket_is_controlpacket = true; 510 curPacket = m_control_queue.front(); 511 m_control_queue.pop_front(); 512 } else if(!m_standard_queue.empty() /*&& onlyAllowedToSendControlPacket == false*/) { 513 // There's a standard packet to send 514 m_currentPacket_is_controlpacket = false; 515 StandardPacketQueueEntry queueEntry = m_standard_queue.front(); 516 m_standard_queue.pop_front(); 517 curPacket = queueEntry.packet; 518 m_actualPayloadSize = queueEntry.actualPayloadSize; 519 520 // remember this for statistics purposes. 521 m_currentPackageIsFromPartFile = curPacket->IsFromPF(); 522 } else { 523 // Just to be safe. Shouldn't happen? 524 // if we reach this point, then there's something wrong with the while condition above! 525 wxFAIL; 526 AddDebugLogLineC(logGeneral, wxT("EMSocket: Couldn't get a new packet! There's an error in the first while condition in EMSocket::Send()")); 527 528 SocketSentBytes returnVal = { true, sentStandardPacketBytesThisCall, sentControlPacketBytesThisCall }; 529 return returnVal; 530 } 531 532 // We found a packet to send. Get the data to send from the 533 // package container and dispose of the container. 534 sendblen = curPacket->GetRealPacketSize(); 535 sendbuffer = curPacket->DetachPacket(); 536 sent = 0; 537 delete curPacket; 538 539 CryptPrepareSendData((byte*)sendbuffer, sendblen); 540 } 541 542 // At this point we've got a packet to send in sendbuffer. Try to send it. Loop until entire packet 543 // is sent, or until we reach maximum bytes to send for this call, or until we get an error. 544 // NOTE! If send would block (returns WOULDBLOCK), we will return from this method INSIDE this loop. 545 while (sent < sendblen && 546 sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall < maxNumberOfBytesToSend && 547 ( 548 onlyAllowedToSendControlPacket == false || // this means we are allowed to send both types of packets, so proceed 549 m_currentPacket_is_controlpacket || 550 (bWasLongTimeSinceSend && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) < minFragSize) || 551 (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) % minFragSize != 0 552 ) && 553 anErrorHasOccured == false) { 554 uint32 tosend = sendblen-sent; 555 if(!onlyAllowedToSendControlPacket || m_currentPacket_is_controlpacket) { 556 if (maxNumberOfBytesToSend >= sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall && tosend > maxNumberOfBytesToSend-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall)) 557 tosend = maxNumberOfBytesToSend-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall); 558 } else if(bWasLongTimeSinceSend && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) < minFragSize) { 559 if (minFragSize >= sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall && tosend > minFragSize-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall)) 560 tosend = minFragSize-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall); 561 } else { 562 uint32 nextFragMaxBytesToSent = GetNextFragSize(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall, minFragSize); 563 if (nextFragMaxBytesToSent >= sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall && tosend > nextFragMaxBytesToSent-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall)) 564 tosend = nextFragMaxBytesToSent-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall); 565 } 566 wxASSERT(tosend != 0 && tosend <= sendblen-sent); 567 568 //DWORD tempStartSendTick = ::GetTickCount(); 569 570 lastSent = ::GetTickCount(); 571 572 uint32 result = CEncryptedStreamSocket::Write(sendbuffer+sent,tosend); 573 574 if (Error()){ 575 576 uint32 error = LastError(); 577 if (error == wxSOCKET_WOULDBLOCK){ 578 m_bBusy = true; 579 580 SocketSentBytes returnVal = { true, sentStandardPacketBytesThisCall, sentControlPacketBytesThisCall }; 581 582 return returnVal; // Send() blocked, onsend will be called when ready to send again 583 } else{ 584 // Send() gave an error 585 anErrorHasOccured = true; 586 } 587 } else { 588 // we managed to send some bytes. Perform bookkeeping. 589 m_bBusy = false; 590 m_hasSent = true; 591 592 sent += result; 593 594 // Log send bytes in correct class 595 if(m_currentPacket_is_controlpacket == false) { 596 sentStandardPacketBytesThisCall += result; 597 598 if(m_currentPackageIsFromPartFile == true) { 599 m_numberOfSentBytesPartFile += result; 600 } else { 601 m_numberOfSentBytesCompleteFile += result; 602 } 603 } else { 604 sentControlPacketBytesThisCall += result; 605 m_numberOfSentBytesControlPacket += result; 606 } 607 } 608 } 609 610 if (sent == sendblen){ 611 // we are done sending the current packet. Delete it and set 612 // sendbuffer to NULL so a new packet can be fetched. 613 delete[] sendbuffer; 614 sendbuffer = NULL; 615 sendblen = 0; 616 617 if(!m_currentPacket_is_controlpacket) { 618 m_actualPayloadSizeSent += m_actualPayloadSize; 619 m_actualPayloadSize = 0; 620 621 lastFinishedStandard = ::GetTickCount(); // reset timeout 622 m_bAccelerateUpload = false; // Safe until told otherwise 623 } 624 625 sent = 0; 626 } 627 } 628 } 629 630 if(onlyAllowedToSendControlPacket && (!m_control_queue.empty() || (sendbuffer != NULL && m_currentPacket_is_controlpacket))) { 631 // enter control packet send queue 632 // we might enter control packet queue several times for the same package, 633 // but that costs very little overhead. Less overhead than trying to make sure 634 // that we only enter the queue once. 635 //printf("* Requeueing control packet on %p\n", this); 636 theApp->uploadBandwidthThrottler->QueueForSendingControlPacket(this, HasSent()); 637 } 638 639 //printf("* Finishing send debug on %p\n",this); 640 641 SocketSentBytes returnVal = { !anErrorHasOccured, sentStandardPacketBytesThisCall, sentControlPacketBytesThisCall }; 642 643 return returnVal; 644} 645 646 647uint32 CEMSocket::GetNextFragSize(uint32 current, uint32 minFragSize) 648{ 649 if(current % minFragSize == 0) { 650 return current; 651 } else { 652 return minFragSize*(current/minFragSize+1); 653 } 654} 655 656 657/** 658 * Decides the (minimum) amount the socket needs to send to prevent timeout. 659 * 660 * @author SlugFiller 661 */ 662uint32 CEMSocket::GetNeededBytes() 663{ 664 uint32 sendgap; 665 666 uint64 timetotal; 667 uint64 timeleft; 668 uint64 sizeleft, sizetotal; 669 670 { 671 wxMutexLocker lock(m_sendLocker); 672 673 if (byConnected == ES_DISCONNECTED) { 674 return 0; 675 } 676 677 if (!((sendbuffer && !m_currentPacket_is_controlpacket) || !m_standard_queue.empty())) { 678 // No standard packet to send. Even if data needs to be sent to prevent timout, there's nothing to send. 679 return 0; 680 } 681 682 if (((sendbuffer && !m_currentPacket_is_controlpacket)) && !m_control_queue.empty()) 683 m_bAccelerateUpload = true; // We might be trying to send a block request, accelerate packet 684 685 sendgap = ::GetTickCount() - lastCalledSend; 686 687 timetotal = m_bAccelerateUpload?45000:90000; 688 timeleft = ::GetTickCount() - lastFinishedStandard; 689 if (sendbuffer && !m_currentPacket_is_controlpacket) { 690 sizeleft = sendblen-sent; 691 sizetotal = sendblen; 692 } else { 693 sizeleft = sizetotal = m_standard_queue.front().packet->GetRealPacketSize(); 694 } 695 } 696 697 if (timeleft >= timetotal) 698 return sizeleft; 699 timeleft = timetotal-timeleft; 700 if (timeleft*sizetotal >= timetotal*sizeleft) { 701 // don't use 'GetTimeOut' here in case the timeout value is high, 702 if (sendgap > SEC2MS(20)) 703 return 1; // Don't let the socket itself time out - Might happen when switching from spread(non-focus) slot to trickle slot 704 return 0; 705 } 706 uint64 decval = timeleft*sizetotal/timetotal; 707 if (!decval) 708 return sizeleft; 709 if (decval < sizeleft) 710 return sizeleft-decval+1; // Round up 711 else 712 return 1; 713} 714 715 716/** 717 * Removes all packets from the standard queue that don't have to be sent for the socket to be able to send a control packet. 718 * 719 * Before a socket can send a new packet, the current packet has to be finished. If the current packet is part of 720 * a split packet, then all parts of that split packet must be sent before the socket can send a control packet. 721 * 722 * This method keeps in standard queue only those packets that must be sent (rest of split packet), and removes everything 723 * after it. The method doesn't touch the control packet queue. 724 */ 725void CEMSocket::TruncateQueues() 726{ 727 wxMutexLocker lock(m_sendLocker); 728 729 // Clear the standard queue totally 730 // Please note! There may still be a standardpacket in the sendbuffer variable! 731 CStdPacketQueue::iterator it = m_standard_queue.begin(); 732 for (; it != m_standard_queue.end(); ++it) { 733 delete it->packet; 734 } 735 736 m_standard_queue.clear(); 737} 738 739 740uint32 CEMSocket::GetTimeOut() const 741{ 742 return m_uTimeOut; 743} 744 745 746void CEMSocket::SetTimeOut(uint32 uTimeOut) 747{ 748 m_uTimeOut = uTimeOut; 749} 750// File_checked_for_headers 751