1// 2// This file is part of the aMule Project. 3// 4// Copyright (c) 2005-2011 aMule Team ( admin@amule.org / http://www.amule.org ) 5// Copyright (c) 2002-2011 Merkur ( devs@emule-project.net / http://www.emule-project.net ) 6// 7// Any parts of this program derived from the xMule, lMule or eMule project, 8// or contributed by third-party developers are copyrighted by their 9// respective authors. 10// 11// This program is free software; you can redistribute it and/or modify 12// it under the terms of the GNU General Public License as published by 13// the Free Software Foundation; either version 2 of the License, or 14// (at your option) any later version. 15// 16// This program is distributed in the hope that it will be useful, 17// but WITHOUT ANY WARRANTY; without even the implied warranty of 18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 19// GNU General Public License for more details. 20// 21// You should have received a copy of the GNU General Public License 22// along with this program; if not, write to the Free Software 23// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA 24// 25 26#include "UploadBandwidthThrottler.h" 27 28#include <protocol/ed2k/Constants.h> 29#include <common/Macros.h> 30#include <common/Constants.h> 31 32#include <cmath> 33#include <limits> // Do_not_auto_remove (NetBSD) 34#include "OtherFunctions.h" 35#include "ThrottledSocket.h" 36#include "Logger.h" 37#include "Preferences.h" 38#include "Statistics.h" 39 40#ifndef _MSC_VER 41 42#ifdef _UI64_MAX 43#undef _UI64_MAX 44#endif 45 46#ifdef _I64_MAX 47#undef _I64_MAX 48#endif 49 50const uint32 _UI32_MAX = std::numeric_limits<uint32>::max(); 51const sint32 _I32_MAX = std::numeric_limits<sint32>::max(); 52const uint64 _UI64_MAX = std::numeric_limits<uint64>::max(); 53const sint64 _I64_MAX = std::numeric_limits<sint64>::max(); 54 55#endif 56 57///////////////////////////////////// 58 59 60/** 61 * The constructor starts the thread. 62 */ 63UploadBandwidthThrottler::UploadBandwidthThrottler() 64 : wxThread( wxTHREAD_JOINABLE ) 65{ 66 m_SentBytesSinceLastCall = 0; 67 m_SentBytesSinceLastCallOverhead = 0; 68 69 m_doRun = true; 70 71 Create(); 72 Run(); 73} 74 75 76/** 77 * The destructor stops the thread. If the thread has already stoppped, destructor does nothing. 78 */ 79UploadBandwidthThrottler::~UploadBandwidthThrottler() 80{ 81 EndThread(); 82} 83 84 85/** 86 * Find out how many bytes that has been put on the sockets since the last call to this 87 * method. Includes overhead of control packets. 88 * 89 * @return the number of bytes that has been put on the sockets since the last call 90 */ 91uint64 UploadBandwidthThrottler::GetNumberOfSentBytesSinceLastCallAndReset() 92{ 93 wxMutexLocker lock( m_sendLocker ); 94 95 uint64 numberOfSentBytesSinceLastCall = m_SentBytesSinceLastCall; 96 m_SentBytesSinceLastCall = 0; 97 98 return numberOfSentBytesSinceLastCall; 99} 100 101/** 102 * Find out how many bytes that has been put on the sockets since the last call to this 103 * method. Excludes overhead of control packets. 104 * 105 * @return the number of bytes that has been put on the sockets since the last call 106 */ 107uint64 UploadBandwidthThrottler::GetNumberOfSentBytesOverheadSinceLastCallAndReset() 108{ 109 wxMutexLocker lock( m_sendLocker ); 110 111 uint64 numberOfSentBytesSinceLastCall = m_SentBytesSinceLastCallOverhead; 112 m_SentBytesSinceLastCallOverhead = 0; 113 114 return numberOfSentBytesSinceLastCall; 115} 116 117 118/** 119 * Add a socket to the list of sockets that have upload slots. The main thread will 120 * continously call send on these sockets, to give them chance to work off their queues. 121 * The sockets are called in the order they exist in the list, so the top socket (index 0) 122 * will be given a chance first to use bandwidth, and then the next socket (index 1) etc. 123 * 124 * It is possible to add a socket several times to the list without removing it inbetween, 125 * but that should be avoided. 126 * 127 * @param index insert the socket at this place in the list. An index that is higher than the 128 * current number of sockets in the list will mean that the socket should be inserted 129 * last in the list. 130 * 131 * @param socket the address to the socket that should be added to the list. If the address is NULL, 132 * this method will do nothing. 133 */ 134void UploadBandwidthThrottler::AddToStandardList(uint32 index, ThrottledFileSocket* socket) 135{ 136 if ( socket ) { 137 wxMutexLocker lock( m_sendLocker ); 138 139 RemoveFromStandardListNoLock(socket); 140 if (index > (uint32)m_StandardOrder_list.size()) { 141 index = m_StandardOrder_list.size(); 142 } 143 144 m_StandardOrder_list.insert(m_StandardOrder_list.begin() + index, socket); 145 } 146} 147 148 149/** 150 * Remove a socket from the list of sockets that have upload slots. 151 * 152 * If the socket has mistakenly been added several times to the list, this method 153 * will return all of the entries for the socket. 154 * 155 * @param socket the address of the socket that should be removed from the list. If this socket 156 * does not exist in the list, this method will do nothing. 157 */ 158bool UploadBandwidthThrottler::RemoveFromStandardList(ThrottledFileSocket* socket) 159{ 160 wxMutexLocker lock( m_sendLocker ); 161 162 return RemoveFromStandardListNoLock(socket); 163} 164 165 166/** 167 * Remove a socket from the list of sockets that have upload slots. NOT THREADSAFE! 168 * This is an internal method that doesn't take the necessary lock before it removes 169 * the socket. This method should only be called when the current thread already owns 170 * the m_sendLocker lock! 171 * 172 * @param socket address of the socket that should be removed from the list. If this socket 173 * does not exist in the list, this method will do nothing. 174 */ 175bool UploadBandwidthThrottler::RemoveFromStandardListNoLock(ThrottledFileSocket* socket) 176{ 177 return (EraseFirstValue( m_StandardOrder_list, socket ) > 0); 178} 179 180 181/** 182* Notifies the send thread that it should try to call controlpacket send 183* for the supplied socket. It is allowed to call this method several times 184* for the same socket, without having controlpacket send called for the socket 185* first. The doublette entries are never filtered, since it is incurs less cpu 186* overhead to simply call Send() in the socket for each double. Send() will 187* already have done its work when the second Send() is called, and will just 188* return with little cpu overhead. 189* 190* @param socket address to the socket that requests to have controlpacket send 191* to be called on it 192*/ 193void UploadBandwidthThrottler::QueueForSendingControlPacket(ThrottledControlSocket* socket, bool hasSent) 194{ 195 // Get critical section 196 wxMutexLocker lock( m_tempQueueLocker ); 197 198 if ( m_doRun ) { 199 if( hasSent ) { 200 m_TempControlQueueFirst_list.push_back(socket); 201 } else { 202 m_TempControlQueue_list.push_back(socket); 203 } 204 } 205} 206 207 208 209/** 210 * Remove the socket from all lists and queues. This will make it safe to 211 * erase/delete the socket. It will also cause the main thread to stop calling 212 * send() for the socket. 213 * 214 * @param socket address to the socket that should be removed 215 */ 216void UploadBandwidthThrottler::DoRemoveFromAllQueues(ThrottledControlSocket* socket) 217{ 218 if ( m_doRun ) { 219 // Remove this socket from control packet queue 220 EraseValue( m_ControlQueue_list, socket ); 221 EraseValue( m_ControlQueueFirst_list, socket ); 222 223 wxMutexLocker lock( m_tempQueueLocker ); 224 EraseValue( m_TempControlQueue_list, socket ); 225 EraseValue( m_TempControlQueueFirst_list, socket ); 226 } 227} 228 229 230void UploadBandwidthThrottler::RemoveFromAllQueues(ThrottledControlSocket* socket) 231{ 232 wxMutexLocker lock( m_sendLocker ); 233 234 DoRemoveFromAllQueues( socket ); 235} 236 237 238void UploadBandwidthThrottler::RemoveFromAllQueues(ThrottledFileSocket* socket) 239{ 240 wxMutexLocker lock( m_sendLocker ); 241 242 if (m_doRun) { 243 DoRemoveFromAllQueues(socket); 244 245 // And remove it from upload slots 246 RemoveFromStandardListNoLock(socket); 247 } 248} 249 250 251/** 252 * Make the thread exit. This method will not return until the thread has stopped 253 * looping. This guarantees that the thread will not access the CEMSockets after this 254 * call has exited. 255 */ 256void UploadBandwidthThrottler::EndThread() 257{ 258 if (m_doRun) { // do it only once 259 { 260 wxMutexLocker lock(m_sendLocker); 261 262 // signal the thread to stop looping and exit. 263 m_doRun = false; 264 } 265 266 Wait(); 267 } 268} 269 270 271/** 272 * The thread method that handles calling send for the individual sockets. 273 * 274 * Control packets will always be tried to be sent first. If there is any bandwidth leftover 275 * after that, send() for the upload slot sockets will be called in priority order until we have run 276 * out of available bandwidth for this loop. Upload slots will not be allowed to go without having sent 277 * called for more than a defined amount of time (i.e. two seconds). 278 * 279 * @return always returns 0. 280 */ 281void* UploadBandwidthThrottler::Entry() 282{ 283 const uint32 TIME_BETWEEN_UPLOAD_LOOPS = 1; 284 285 uint32 lastLoopTick = GetTickCountFullRes(); 286 // Bytes to spend in current cycle. If we spend more this becomes negative and causes a wait next time. 287 sint32 bytesToSpend = 0; 288 uint32 allowedDataRate = 0; 289 uint32 rememberedSlotCounter = 0; 290 uint32 extraSleepTime = TIME_BETWEEN_UPLOAD_LOOPS; 291 292 while (m_doRun && !TestDestroy()) { 293 uint32 timeSinceLastLoop = GetTickCountFullRes() - lastLoopTick; 294 295 // Calculate data rate 296 if (thePrefs::GetMaxUpload() == UNLIMITED) { 297 // Try to increase the upload rate from UploadSpeedSense 298 allowedDataRate = (uint32)theStats::GetUploadRate() + 5 * 1024; 299 } else { 300 allowedDataRate = thePrefs::GetMaxUpload() * 1024; 301 } 302 303 uint32 minFragSize = 1300; 304 uint32 doubleSendSize = minFragSize*2; // send two packages at a time so they can share an ACK 305 if (allowedDataRate < 6*1024) { 306 minFragSize = 536; 307 doubleSendSize = minFragSize; // don't send two packages at a time at very low speeds to give them a smoother load 308 } 309 310 311 uint32 sleepTime; 312 if (bytesToSpend < 1) { 313 // We have sent more than allowed in last cycle so we have to wait now 314 // until we can send at least 1 byte. 315 sleepTime = std::max((-bytesToSpend + 1) * 1000 / allowedDataRate + 2, // add 2 ms to allow for rounding inaccuracies 316 extraSleepTime); 317 } else { 318 // We could send at once, but sleep a while to not suck up all cpu 319 sleepTime = extraSleepTime; 320 } 321 322 if (timeSinceLastLoop < sleepTime) { 323 Sleep(sleepTime-timeSinceLastLoop); 324 } 325 326 // Check after sleep in case the thread has been signaled to end 327 if (!m_doRun || TestDestroy()) { 328 break; 329 } 330 331 const uint32 thisLoopTick = GetTickCountFullRes(); 332 timeSinceLastLoop = thisLoopTick - lastLoopTick; 333 lastLoopTick = thisLoopTick; 334 335 if (timeSinceLastLoop > sleepTime + 2000) { 336 AddDebugLogLineN(logGeneral, CFormat(wxT("UploadBandwidthThrottler: Time since last loop too long. time: %ims wanted: %ims Max: %ims")) 337 % timeSinceLastLoop % sleepTime % (sleepTime + 2000)); 338 339 timeSinceLastLoop = sleepTime + 2000; 340 } 341 342 // Calculate how many bytes we can spend 343 344 bytesToSpend += (sint32) (allowedDataRate / 1000.0 * timeSinceLastLoop); 345 346 if (bytesToSpend >= 1) { 347 sint32 spentBytes = 0; 348 sint32 spentOverhead = 0; 349 350 wxMutexLocker sendLock(m_sendLocker); 351 352 { 353 wxMutexLocker queueLock(m_tempQueueLocker); 354 355 // are there any sockets in m_TempControlQueue_list? Move them to normal m_ControlQueue_list; 356 m_ControlQueueFirst_list.insert( m_ControlQueueFirst_list.end(), 357 m_TempControlQueueFirst_list.begin(), 358 m_TempControlQueueFirst_list.end() ); 359 360 m_ControlQueue_list.insert( m_ControlQueue_list.end(), 361 m_TempControlQueue_list.begin(), 362 m_TempControlQueue_list.end() ); 363 364 m_TempControlQueue_list.clear(); 365 m_TempControlQueueFirst_list.clear(); 366 } 367 368 // Send any queued up control packets first 369 while (spentBytes < bytesToSpend && (!m_ControlQueueFirst_list.empty() || !m_ControlQueue_list.empty())) { 370 ThrottledControlSocket* socket = NULL; 371 372 if (!m_ControlQueueFirst_list.empty()) { 373 socket = m_ControlQueueFirst_list.front(); 374 m_ControlQueueFirst_list.pop_front(); 375 } else if (!m_ControlQueue_list.empty()) { 376 socket = m_ControlQueue_list.front(); 377 m_ControlQueue_list.pop_front(); 378 } 379 380 if (socket != NULL) { 381 SocketSentBytes socketSentBytes = socket->SendControlData(bytesToSpend-spentBytes, minFragSize); 382 spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets; 383 spentOverhead += socketSentBytes.sentBytesControlPackets; 384 } 385 } 386 387 // Check if any sockets haven't gotten data for a long time. Then trickle them a package. 388 uint32 slots = m_StandardOrder_list.size(); 389 for (uint32 slotCounter = 0; slotCounter < slots; slotCounter++) { 390 ThrottledFileSocket* socket = m_StandardOrder_list[ slotCounter ]; 391 392 if (socket != NULL) { 393 if (thisLoopTick-socket->GetLastCalledSend() > SEC2MS(1)) { 394 // trickle 395 uint32 neededBytes = socket->GetNeededBytes(); 396 397 if (neededBytes > 0) { 398 SocketSentBytes socketSentBytes = socket->SendFileAndControlData(neededBytes, minFragSize); 399 spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets; 400 spentOverhead += socketSentBytes.sentBytesControlPackets; 401 } 402 } 403 } else { 404 AddDebugLogLineN(logGeneral, CFormat( wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (trickle)! Prevented usage. Index: %i Size: %i")) 405 % slotCounter % m_StandardOrder_list.size()); 406 } 407 } 408 409 // Give available bandwidth to slots, starting with the one we ended with last time. 410 // There are two passes. First pass gives packets of doubleSendSize, second pass 411 // gives as much as possible. 412 // Second pass starts with the last slot of the first pass actually. 413 for (uint32 slotCounter = 0; (slotCounter < slots * 2) && spentBytes < bytesToSpend; slotCounter++) { 414 if (rememberedSlotCounter >= slots) { // wrap around pointer 415 rememberedSlotCounter = 0; 416 } 417 418 uint32 data = (slotCounter < slots - 1) ? doubleSendSize // pass 1 419 : (bytesToSpend - spentBytes); // pass 2 420 421 ThrottledFileSocket* socket = m_StandardOrder_list[ rememberedSlotCounter ]; 422 423 if (socket != NULL) { 424 SocketSentBytes socketSentBytes = socket->SendFileAndControlData(data, doubleSendSize); 425 spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets; 426 spentOverhead += socketSentBytes.sentBytesControlPackets; 427 } else { 428 AddDebugLogLineN(logGeneral, CFormat(wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (equal-for-all)! Prevented usage. Index: %i Size: %i")) 429 % rememberedSlotCounter % m_StandardOrder_list.size()); 430 } 431 432 rememberedSlotCounter++; 433 } 434 435 // Do some limiting of what we keep for the next loop. 436 bytesToSpend -= spentBytes; 437 sint32 minBytesToSpend = (slots + 1) * minFragSize; 438 439 if (bytesToSpend < - minBytesToSpend) { 440 bytesToSpend = - minBytesToSpend; 441 } else { 442 sint32 bandwidthSavedTolerance = slots * 512 + 1; 443 if (bytesToSpend > bandwidthSavedTolerance) { 444 bytesToSpend = bandwidthSavedTolerance; 445 } 446 } 447 448 m_SentBytesSinceLastCall += spentBytes; 449 m_SentBytesSinceLastCallOverhead += spentOverhead; 450 451 if (spentBytes == 0) { // spentBytes includes the overhead 452 extraSleepTime = std::min<uint32>(extraSleepTime * 5, 1000); // 1s at most 453 } else { 454 extraSleepTime = TIME_BETWEEN_UPLOAD_LOOPS; 455 } 456 } 457 } 458 459 { 460 wxMutexLocker queueLock(m_tempQueueLocker); 461 m_TempControlQueue_list.clear(); 462 m_TempControlQueueFirst_list.clear(); 463 } 464 465 wxMutexLocker sendLock(m_sendLocker); 466 m_ControlQueue_list.clear(); 467 m_StandardOrder_list.clear(); 468 469 return 0; 470} 471// File_checked_for_headers 472