1//
2// This file is part of the aMule Project.
3//
4// Copyright (c) 2005-2011 aMule Team ( admin@amule.org / http://www.amule.org )
5// Copyright (c) 2002-2011 Merkur ( devs@emule-project.net / http://www.emule-project.net )
6//
7// Any parts of this program derived from the xMule, lMule or eMule project,
8// or contributed by third-party developers are copyrighted by their
9// respective authors.
10//
11// This program is free software; you can redistribute it and/or modify
12// it under the terms of the GNU General Public License as published by
13// the Free Software Foundation; either version 2 of the License, or
14// (at your option) any later version.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301, USA
24//
25
26#include "UploadBandwidthThrottler.h"
27
28#include <protocol/ed2k/Constants.h>
29#include <common/Macros.h>
30#include <common/Constants.h>
31
32#include <cmath>
33#include <limits> // Do_not_auto_remove (NetBSD)
34#include "OtherFunctions.h"
35#include "ThrottledSocket.h"
36#include "Logger.h"
37#include "Preferences.h"
38#include "Statistics.h"
39
40#ifndef _MSC_VER
41
42#ifdef _UI64_MAX
43#undef _UI64_MAX
44#endif
45
46#ifdef _I64_MAX
47#undef _I64_MAX
48#endif
49
50const uint32 _UI32_MAX = std::numeric_limits<uint32>::max();
51const sint32 _I32_MAX = std::numeric_limits<sint32>::max();
52const uint64 _UI64_MAX = std::numeric_limits<uint64>::max();
53const sint64 _I64_MAX = std::numeric_limits<sint64>::max();
54
55#endif
56
57/////////////////////////////////////
58
59
60/**
61 * The constructor starts the thread.
62 */
63UploadBandwidthThrottler::UploadBandwidthThrottler()
64		: wxThread( wxTHREAD_JOINABLE )
65{
66	m_SentBytesSinceLastCall = 0;
67	m_SentBytesSinceLastCallOverhead = 0;
68
69	m_doRun = true;
70
71	Create();
72	Run();
73}
74
75
76/**
77 * The destructor stops the thread. If the thread has already stoppped, destructor does nothing.
78 */
79UploadBandwidthThrottler::~UploadBandwidthThrottler()
80{
81	EndThread();
82}
83
84
85/**
86 * Find out how many bytes that has been put on the sockets since the last call to this
87 * method. Includes overhead of control packets.
88 *
89 * @return the number of bytes that has been put on the sockets since the last call
90 */
91uint64 UploadBandwidthThrottler::GetNumberOfSentBytesSinceLastCallAndReset()
92{
93	wxMutexLocker lock( m_sendLocker );
94
95	uint64 numberOfSentBytesSinceLastCall = m_SentBytesSinceLastCall;
96	m_SentBytesSinceLastCall = 0;
97
98	return numberOfSentBytesSinceLastCall;
99}
100
101/**
102 * Find out how many bytes that has been put on the sockets since the last call to this
103 * method. Excludes overhead of control packets.
104 *
105 * @return the number of bytes that has been put on the sockets since the last call
106 */
107uint64 UploadBandwidthThrottler::GetNumberOfSentBytesOverheadSinceLastCallAndReset()
108{
109	wxMutexLocker lock( m_sendLocker );
110
111	uint64 numberOfSentBytesSinceLastCall = m_SentBytesSinceLastCallOverhead;
112	m_SentBytesSinceLastCallOverhead = 0;
113
114	return numberOfSentBytesSinceLastCall;
115}
116
117
118/**
119 * Add a socket to the list of sockets that have upload slots. The main thread will
120 * continously call send on these sockets, to give them chance to work off their queues.
121 * The sockets are called in the order they exist in the list, so the top socket (index 0)
122 * will be given a chance first to use bandwidth, and then the next socket (index 1) etc.
123 *
124 * It is possible to add a socket several times to the list without removing it inbetween,
125 * but that should be avoided.
126 *
127 * @param index insert the socket at this place in the list. An index that is higher than the
128 *              current number of sockets in the list will mean that the socket should be inserted
129 *              last in the list.
130 *
131 * @param socket the address to the socket that should be added to the list. If the address is NULL,
132 *               this method will do nothing.
133 */
134void UploadBandwidthThrottler::AddToStandardList(uint32 index, ThrottledFileSocket* socket)
135{
136	if ( socket ) {
137		wxMutexLocker lock( m_sendLocker );
138
139		RemoveFromStandardListNoLock(socket);
140		if (index > (uint32)m_StandardOrder_list.size()) {
141			index = m_StandardOrder_list.size();
142		}
143
144		m_StandardOrder_list.insert(m_StandardOrder_list.begin() + index, socket);
145	}
146}
147
148
149/**
150 * Remove a socket from the list of sockets that have upload slots.
151 *
152 * If the socket has mistakenly been added several times to the list, this method
153 * will return all of the entries for the socket.
154 *
155 * @param socket the address of the socket that should be removed from the list. If this socket
156 *               does not exist in the list, this method will do nothing.
157 */
158bool UploadBandwidthThrottler::RemoveFromStandardList(ThrottledFileSocket* socket)
159{
160	wxMutexLocker lock( m_sendLocker );
161
162	return RemoveFromStandardListNoLock(socket);
163}
164
165
166/**
167 * Remove a socket from the list of sockets that have upload slots. NOT THREADSAFE!
168 * This is an internal method that doesn't take the necessary lock before it removes
169 * the socket. This method should only be called when the current thread already owns
170 * the m_sendLocker lock!
171 *
172 * @param socket address of the socket that should be removed from the list. If this socket
173 *               does not exist in the list, this method will do nothing.
174 */
175bool UploadBandwidthThrottler::RemoveFromStandardListNoLock(ThrottledFileSocket* socket)
176{
177	return (EraseFirstValue( m_StandardOrder_list, socket ) > 0);
178}
179
180
181/**
182* Notifies the send thread that it should try to call controlpacket send
183* for the supplied socket. It is allowed to call this method several times
184* for the same socket, without having controlpacket send called for the socket
185* first. The doublette entries are never filtered, since it is incurs less cpu
186* overhead to simply call Send() in the socket for each double. Send() will
187* already have done its work when the second Send() is called, and will just
188* return with little cpu overhead.
189*
190* @param socket address to the socket that requests to have controlpacket send
191*               to be called on it
192*/
193void UploadBandwidthThrottler::QueueForSendingControlPacket(ThrottledControlSocket* socket, bool hasSent)
194{
195	// Get critical section
196	wxMutexLocker lock( m_tempQueueLocker );
197
198	if ( m_doRun ) {
199		if( hasSent ) {
200			m_TempControlQueueFirst_list.push_back(socket);
201		} else {
202			m_TempControlQueue_list.push_back(socket);
203		}
204	}
205}
206
207
208
209/**
210 * Remove the socket from all lists and queues. This will make it safe to
211 * erase/delete the socket. It will also cause the main thread to stop calling
212 * send() for the socket.
213 *
214 * @param socket address to the socket that should be removed
215 */
216void UploadBandwidthThrottler::DoRemoveFromAllQueues(ThrottledControlSocket* socket)
217{
218	if ( m_doRun ) {
219		// Remove this socket from control packet queue
220		EraseValue( m_ControlQueue_list, socket );
221		EraseValue( m_ControlQueueFirst_list, socket );
222
223		wxMutexLocker lock( m_tempQueueLocker );
224		EraseValue( m_TempControlQueue_list, socket );
225		EraseValue( m_TempControlQueueFirst_list, socket );
226	}
227}
228
229
230void UploadBandwidthThrottler::RemoveFromAllQueues(ThrottledControlSocket* socket)
231{
232	wxMutexLocker lock( m_sendLocker );
233
234	DoRemoveFromAllQueues( socket );
235}
236
237
238void UploadBandwidthThrottler::RemoveFromAllQueues(ThrottledFileSocket* socket)
239{
240	wxMutexLocker lock( m_sendLocker );
241
242	if (m_doRun) {
243		DoRemoveFromAllQueues(socket);
244
245		// And remove it from upload slots
246		RemoveFromStandardListNoLock(socket);
247	}
248}
249
250
251/**
252 * Make the thread exit. This method will not return until the thread has stopped
253 * looping. This guarantees that the thread will not access the CEMSockets after this
254 * call has exited.
255 */
256void UploadBandwidthThrottler::EndThread()
257{
258	if (m_doRun) {	// do it only once
259		{
260			wxMutexLocker lock(m_sendLocker);
261
262			// signal the thread to stop looping and exit.
263			m_doRun = false;
264		}
265
266		Wait();
267	}
268}
269
270
271/**
272 * The thread method that handles calling send for the individual sockets.
273 *
274 * Control packets will always be tried to be sent first. If there is any bandwidth leftover
275 * after that, send() for the upload slot sockets will be called in priority order until we have run
276 * out of available bandwidth for this loop. Upload slots will not be allowed to go without having sent
277 * called for more than a defined amount of time (i.e. two seconds).
278 *
279 * @return always returns 0.
280 */
281void* UploadBandwidthThrottler::Entry()
282{
283	const uint32 TIME_BETWEEN_UPLOAD_LOOPS = 1;
284
285	uint32 lastLoopTick = GetTickCountFullRes();
286	// Bytes to spend in current cycle. If we spend more this becomes negative and causes a wait next time.
287	sint32 bytesToSpend = 0;
288	uint32 allowedDataRate = 0;
289	uint32 rememberedSlotCounter = 0;
290	uint32 extraSleepTime = TIME_BETWEEN_UPLOAD_LOOPS;
291
292	while (m_doRun && !TestDestroy()) {
293		uint32 timeSinceLastLoop = GetTickCountFullRes() - lastLoopTick;
294
295		// Calculate data rate
296		if (thePrefs::GetMaxUpload() == UNLIMITED) {
297			// Try to increase the upload rate from UploadSpeedSense
298			allowedDataRate = (uint32)theStats::GetUploadRate() + 5 * 1024;
299		} else {
300			allowedDataRate = thePrefs::GetMaxUpload() * 1024;
301		}
302
303		uint32 minFragSize = 1300;
304		uint32 doubleSendSize = minFragSize*2; // send two packages at a time so they can share an ACK
305		if (allowedDataRate < 6*1024) {
306			minFragSize = 536;
307			doubleSendSize = minFragSize; // don't send two packages at a time at very low speeds to give them a smoother load
308		}
309
310
311		uint32 sleepTime;
312		if (bytesToSpend < 1) {
313			// We have sent more than allowed in last cycle so we have to wait now
314			// until we can send at least 1 byte.
315			sleepTime = std::max((-bytesToSpend + 1) * 1000 / allowedDataRate + 2, // add 2 ms to allow for rounding inaccuracies
316									extraSleepTime);
317		} else {
318			// We could send at once, but sleep a while to not suck up all cpu
319			sleepTime = extraSleepTime;
320		}
321
322		if (timeSinceLastLoop < sleepTime) {
323			Sleep(sleepTime-timeSinceLastLoop);
324		}
325
326		// Check after sleep in case the thread has been signaled to end
327		if (!m_doRun || TestDestroy()) {
328			break;
329		}
330
331		const uint32 thisLoopTick = GetTickCountFullRes();
332		timeSinceLastLoop = thisLoopTick - lastLoopTick;
333		lastLoopTick = thisLoopTick;
334
335		if (timeSinceLastLoop > sleepTime + 2000) {
336			AddDebugLogLineN(logGeneral, CFormat(wxT("UploadBandwidthThrottler: Time since last loop too long. time: %ims wanted: %ims Max: %ims"))
337				% timeSinceLastLoop % sleepTime % (sleepTime + 2000));
338
339			timeSinceLastLoop = sleepTime + 2000;
340		}
341
342		// Calculate how many bytes we can spend
343
344		bytesToSpend += (sint32) (allowedDataRate / 1000.0 * timeSinceLastLoop);
345
346		if (bytesToSpend >= 1) {
347			sint32 spentBytes = 0;
348			sint32 spentOverhead = 0;
349
350			wxMutexLocker sendLock(m_sendLocker);
351
352			{
353				wxMutexLocker queueLock(m_tempQueueLocker);
354
355				// are there any sockets in m_TempControlQueue_list? Move them to normal m_ControlQueue_list;
356				m_ControlQueueFirst_list.insert(	m_ControlQueueFirst_list.end(),
357													m_TempControlQueueFirst_list.begin(),
358													m_TempControlQueueFirst_list.end() );
359
360				m_ControlQueue_list.insert( m_ControlQueue_list.end(),
361											m_TempControlQueue_list.begin(),
362											m_TempControlQueue_list.end() );
363
364				m_TempControlQueue_list.clear();
365				m_TempControlQueueFirst_list.clear();
366			}
367
368			// Send any queued up control packets first
369			while (spentBytes < bytesToSpend && (!m_ControlQueueFirst_list.empty() || !m_ControlQueue_list.empty())) {
370				ThrottledControlSocket* socket = NULL;
371
372				if (!m_ControlQueueFirst_list.empty()) {
373					socket = m_ControlQueueFirst_list.front();
374					m_ControlQueueFirst_list.pop_front();
375				} else if (!m_ControlQueue_list.empty()) {
376					socket = m_ControlQueue_list.front();
377					m_ControlQueue_list.pop_front();
378				}
379
380				if (socket != NULL) {
381					SocketSentBytes socketSentBytes = socket->SendControlData(bytesToSpend-spentBytes, minFragSize);
382					spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
383					spentOverhead += socketSentBytes.sentBytesControlPackets;
384				}
385			}
386
387			// Check if any sockets haven't gotten data for a long time. Then trickle them a package.
388			uint32 slots = m_StandardOrder_list.size();
389			for (uint32 slotCounter = 0; slotCounter < slots; slotCounter++) {
390				ThrottledFileSocket* socket = m_StandardOrder_list[ slotCounter ];
391
392				if (socket != NULL) {
393					if (thisLoopTick-socket->GetLastCalledSend() > SEC2MS(1)) {
394						// trickle
395						uint32 neededBytes = socket->GetNeededBytes();
396
397						if (neededBytes > 0) {
398							SocketSentBytes socketSentBytes = socket->SendFileAndControlData(neededBytes, minFragSize);
399							spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
400							spentOverhead += socketSentBytes.sentBytesControlPackets;
401						}
402					}
403				} else {
404					AddDebugLogLineN(logGeneral, CFormat( wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (trickle)! Prevented usage. Index: %i Size: %i"))
405						% slotCounter % m_StandardOrder_list.size());
406				}
407			}
408
409			// Give available bandwidth to slots, starting with the one we ended with last time.
410			// There are two passes. First pass gives packets of doubleSendSize, second pass
411			// gives as much as possible.
412			// Second pass starts with the last slot of the first pass actually.
413			for (uint32 slotCounter = 0; (slotCounter < slots * 2) && spentBytes < bytesToSpend; slotCounter++) {
414				if (rememberedSlotCounter >= slots) {	// wrap around pointer
415					rememberedSlotCounter = 0;
416				}
417
418				uint32 data = (slotCounter < slots - 1)	? doubleSendSize				// pass 1
419															: (bytesToSpend - spentBytes);	// pass 2
420
421				ThrottledFileSocket* socket = m_StandardOrder_list[ rememberedSlotCounter ];
422
423				if (socket != NULL) {
424					SocketSentBytes socketSentBytes = socket->SendFileAndControlData(data, doubleSendSize);
425					spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
426					spentOverhead += socketSentBytes.sentBytesControlPackets;
427				} else {
428					AddDebugLogLineN(logGeneral, CFormat(wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (equal-for-all)! Prevented usage. Index: %i Size: %i"))
429						% rememberedSlotCounter % m_StandardOrder_list.size());
430				}
431
432				rememberedSlotCounter++;
433			}
434
435			// Do some limiting of what we keep for the next loop.
436			bytesToSpend -= spentBytes;
437			sint32 minBytesToSpend = (slots + 1) * minFragSize;
438
439			if (bytesToSpend < - minBytesToSpend) {
440				bytesToSpend = - minBytesToSpend;
441			} else {
442				sint32 bandwidthSavedTolerance = slots * 512 + 1;
443				if (bytesToSpend > bandwidthSavedTolerance) {
444					bytesToSpend = bandwidthSavedTolerance;
445				}
446			}
447
448			m_SentBytesSinceLastCall += spentBytes;
449			m_SentBytesSinceLastCallOverhead += spentOverhead;
450
451			if (spentBytes == 0) {	// spentBytes includes the overhead
452				extraSleepTime = std::min<uint32>(extraSleepTime * 5, 1000); // 1s at most
453			} else {
454				extraSleepTime = TIME_BETWEEN_UPLOAD_LOOPS;
455			}
456		}
457	}
458
459	{
460		wxMutexLocker queueLock(m_tempQueueLocker);
461		m_TempControlQueue_list.clear();
462		m_TempControlQueueFirst_list.clear();
463	}
464
465	wxMutexLocker sendLock(m_sendLocker);
466	m_ControlQueue_list.clear();
467	m_StandardOrder_list.clear();
468
469	return 0;
470}
471// File_checked_for_headers
472