1//
2// This file is part of the aMule Project.
3//
4// Copyright (c) 2006-2011 Mikkel Schubert ( xaignar@amule.org / http://www.amule.org )
5//
6// Any parts of this program derived from the xMule, lMule or eMule project,
7// or contributed by third-party developers are copyrighted by their
8// respective authors.
9//
10// This program is free software; you can redistribute it and/or modify
11// it under the terms of the GNU General Public License as published by
12// the Free Software Foundation; either version 2 of the License, or
13// (at your option) any later version.
14//
15// This program is distributed in the hope that it will be useful,
16// but WITHOUT ANY WARRANTY; without even the implied warranty of
17// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18// GNU General Public License for more details.
19//
20// You should have received a copy of the GNU General Public License
21// along with this program; if not, write to the Free Software
22// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301, USA
23//
24
25#include "ThreadScheduler.h"	// Interface declarations
26#include "Logger.h"				// Needed for Add(Debug)LogLine{C,N}
27#include <common/Format.h>		// Needed for CFormat
28#include "ScopedPtr.h"			// Needed for CScopedPtr
29
30#include <algorithm>			// Needed for std::sort		// Do_not_auto_remove (mingw-gcc-3.4.5)
31
32//! Global lock the scheduler and its thread.
33static wxMutex s_lock;
34//! Pointer to the global scheduler instance (automatically instantiated).
35static CThreadScheduler* s_scheduler = NULL;
36//! Specifies if the scheduler is running.
37static bool	s_running = false;
38//! Specifies if the gobal scheduler has been terminated.
39static bool s_terminated = false;
40
41/**
42 * This class is used in a custom implementation of wxThreadHelper.
43 *
44 * The reason for not using wxThreadHelper are as follows:
45 *  - wxThreadHelper makes use of wxThread:Kill, which is warned against
46 *    serveral times in the docs, and even calls it in its destructor.
47 *  - Managing the thread-object is difficult, since the only way to
48 *    destroy it is to create a new thread.
49 */
50class CTaskThread : public CMuleThread
51{
52public:
53	CTaskThread(CThreadScheduler* owner)
54		: CMuleThread(wxTHREAD_JOINABLE),
55		  m_owner(owner)
56	{
57	}
58
59	//! For simplicity's sake, all code is placed in CThreadScheduler::Entry
60	void* Entry() {
61		return m_owner->Entry();
62	}
63
64private:
65	//! The scheduler owning this thread.
66	CThreadScheduler* m_owner;
67};
68
69
70void CThreadScheduler::Start()
71{
72	wxMutexLocker lock(s_lock);
73
74	s_running = true;
75	s_terminated = false;
76
77	// Ensures that a thread is started if tasks are already waiting.
78	if (s_scheduler) {
79		AddDebugLogLineN(logThreads, wxT("Starting scheduler"));
80		s_scheduler->CreateSchedulerThread();
81	}
82}
83
84
85void CThreadScheduler::Terminate()
86{
87	AddDebugLogLineN(logThreads, wxT("Terminating scheduler"));
88	CThreadScheduler* ptr = NULL;
89
90	{
91		wxMutexLocker lock(s_lock);
92
93		// Safely unlink the scheduler, as to avoid race-conditions.
94		ptr = s_scheduler;
95		s_running = false;
96		s_terminated = true;
97		s_scheduler = NULL;
98	}
99
100	delete ptr;
101	AddDebugLogLineN(logThreads, wxT("Scheduler terminated"));
102}
103
104
105bool CThreadScheduler::AddTask(CThreadTask* task, bool overwrite)
106{
107	wxMutexLocker lock(s_lock);
108
109	// When terminated (on shutdown), all tasks are ignored.
110	if (s_terminated) {
111		AddDebugLogLineN(logThreads, wxT("Task discarded: ") + task->GetDesc());
112		delete task;
113		return false;
114	} else if (s_scheduler == NULL) {
115		s_scheduler = new CThreadScheduler();
116		AddDebugLogLineN(logThreads, wxT("Scheduler created."));
117	}
118
119	return s_scheduler->DoAddTask(task, overwrite);
120}
121
122
123/** Returns string representation of error code. */
124wxString GetErrMsg(wxThreadError err)
125{
126	switch (err) {
127		case wxTHREAD_NO_ERROR:		return wxT("wxTHREAD_NO_ERROR");
128		case wxTHREAD_NO_RESOURCE:	return wxT("wxTHREAD_NO_RESOURCE");
129		case wxTHREAD_RUNNING:		return wxT("wxTHREAD_RUNNING");
130		case wxTHREAD_NOT_RUNNING:	return wxT("wxTHREAD_NOT_RUNNING");
131		case wxTHREAD_KILLED:		return wxT("wxTHREAD_KILLED");
132		case wxTHREAD_MISC_ERROR:	return wxT("wxTHREAD_MISC_ERROR");
133		default:
134			return wxT("Unknown error");
135	}
136}
137
138
139void CThreadScheduler::CreateSchedulerThread()
140{
141	if ((m_thread && m_thread->IsAlive()) || m_tasks.empty()) {
142		return;
143	}
144
145	// A thread can only be run once, so the old one must be safely disposed of
146	if (m_thread) {
147		AddDebugLogLineN(logThreads, wxT("CreateSchedulerThread: Disposing of old thread."));
148		m_thread->Stop();
149		delete m_thread;
150	}
151
152	m_thread = new CTaskThread(this);
153
154	wxThreadError err = m_thread->Create();
155	if (err == wxTHREAD_NO_ERROR) {
156		// Try to avoid reducing the latency of the main thread
157		m_thread->SetPriority(WXTHREAD_MIN_PRIORITY);
158
159		err = m_thread->Run();
160		if (err == wxTHREAD_NO_ERROR) {
161			AddDebugLogLineN(logThreads, wxT("Scheduler thread started"));
162			return;
163		} else {
164			AddDebugLogLineC(logThreads, wxT("Error while starting scheduler thread: ") + GetErrMsg(err));
165		}
166	} else {
167		AddDebugLogLineC(logThreads, wxT("Error while creating scheduler thread: ") + GetErrMsg(err));
168	}
169
170	// Creation or running failed.
171	m_thread->Stop();
172	delete m_thread;
173	m_thread = NULL;
174}
175
176
177/** This is the sorter functor for the task-queue. */
178struct CTaskSorter
179{
180	bool operator()(const CThreadScheduler::CEntryPair& a, const CThreadScheduler::CEntryPair& b) {
181		if (a.first->GetPriority() != b.first->GetPriority()) {
182			return a.first->GetPriority() > b.first->GetPriority();
183		}
184
185		// Compare tasks numbers.
186		return a.second < b.second;
187	}
188};
189
190
191
192CThreadScheduler::CThreadScheduler()
193	: m_tasksDirty(false),
194	  m_thread(NULL),
195	  m_currentTask(NULL)
196{
197
198}
199
200
201CThreadScheduler::~CThreadScheduler()
202{
203	if (m_thread) {
204		m_thread->Stop();
205		delete m_thread;
206	}
207}
208
209
210size_t CThreadScheduler::GetTaskCount() const
211{
212	wxMutexLocker lock(s_lock);
213
214	return m_tasks.size();
215}
216
217
218bool CThreadScheduler::DoAddTask(CThreadTask* task, bool overwrite)
219{
220	// GetTick is too lowres, so we just use a counter to ensure that
221	// the sorted order will match the order in which the tasks were added.
222	static unsigned taskAge = 0;
223
224	// Get the map for this task type, implicitly creating it as needed.
225	CDescMap& map = m_taskDescs[task->GetType()];
226
227	CDescMap::value_type entry(task->GetDesc(), task);
228	if (map.insert(entry).second) {
229		AddDebugLogLineN(logThreads, wxT("Task scheduled: ") + task->GetType() + wxT(" - ") + task->GetDesc());
230		m_tasks.push_back(CEntryPair(task, taskAge++));
231		m_tasksDirty = true;
232	} else if (overwrite) {
233		AddDebugLogLineN(logThreads, wxT("Task overwritten: ") + task->GetType() + wxT(" - ") + task->GetDesc());
234
235		CThreadTask* existingTask = map[task->GetDesc()];
236		if (m_currentTask == existingTask) {
237			// The duplicate is already being executed, abort it.
238			m_currentTask->m_abort = true;
239		} else {
240			// Task not yet started, simply remove and delete.
241			wxCHECK2(map.erase(existingTask->GetDesc()), /* Do nothing. */);
242			delete existingTask;
243		}
244
245		m_tasks.push_back(CEntryPair(task, taskAge++));
246		map[task->GetDesc()] = task;
247		m_tasksDirty = true;
248	} else {
249		AddDebugLogLineN(logThreads, wxT("Duplicate task, discarding: ") + task->GetType() + wxT(" - ") + task->GetDesc());
250		delete task;
251		return false;
252	}
253
254	if (s_running) {
255		CreateSchedulerThread();
256	}
257
258	return true;
259}
260
261
262void* CThreadScheduler::Entry()
263{
264	AddDebugLogLineN(logThreads, wxT("Entering scheduling loop"));
265
266	while (!m_thread->TestDestroy()) {
267		CScopedPtr<CThreadTask> task(NULL);
268
269		{
270			wxMutexLocker lock(s_lock);
271
272			// Resort tasks by priority/age if list has been modified.
273			if (m_tasksDirty) {
274				AddDebugLogLineN(logThreads, wxT("Resorting tasks"));
275				std::sort(m_tasks.begin(), m_tasks.end(), CTaskSorter());
276				m_tasksDirty = false;
277			} else if (m_tasks.empty()) {
278				AddDebugLogLineN(logThreads, wxT("No more tasks, stopping"));
279				break;
280			}
281
282			// Select the next task
283			task.reset(m_tasks.front().first);
284			m_tasks.pop_front();
285			m_currentTask = task.get();
286		}
287
288		AddDebugLogLineN(logThreads, wxT("Current task: ") + task->GetType() + wxT(" - ") + task->GetDesc());
289		// Execute the task
290		task->m_owner = m_thread;
291		task->Entry();
292		task->OnExit();
293
294		// Check if this was the last task of this type
295		bool isLastTask = false;
296
297		{
298			wxMutexLocker lock(s_lock);
299
300			// If the task has been aborted, the entry now refers to
301			// a different task, so dont remove it. That also means
302			// that it cant be the last task of this type.
303			if (!task->m_abort) {
304				AddDebugLogLineN(logThreads,
305					CFormat(wxT("Completed task '%s%s', %u tasks remaining."))
306						% task->GetType()
307						% (task->GetDesc().IsEmpty() ? wxString() : (wxT(" - ") + task->GetDesc()))
308						% m_tasks.size() );
309
310				CDescMap& map = m_taskDescs[task->GetType()];
311				if (!map.erase(task->GetDesc())) {
312					wxFAIL;
313				} else if (map.empty()) {
314					m_taskDescs.erase(task->GetType());
315					isLastTask = true;
316				}
317			}
318
319			m_currentTask = NULL;
320		}
321
322		if (isLastTask) {
323			// Allow the task to signal that all sub-tasks have been completed
324			AddDebugLogLineN(logThreads, wxT("Last task, calling OnLastTask"));
325			task->OnLastTask();
326		}
327	}
328
329	AddDebugLogLineN(logThreads, wxT("Leaving scheduling loop"));
330
331	return 0;
332}
333
334
335
336CThreadTask::CThreadTask(const wxString& type, const wxString& desc, ETaskPriority priority)
337	: m_type(type),
338	  m_desc(desc),
339	  m_priority(priority),
340	  m_owner(NULL),
341	  m_abort(false)
342{
343}
344
345
346CThreadTask::~CThreadTask()
347{
348}
349
350
351void CThreadTask::OnLastTask()
352{
353	// Does nothing by default.
354}
355
356
357void CThreadTask::OnExit()
358{
359	// Does nothing by default.
360}
361
362
363bool CThreadTask::TestDestroy() const
364{
365	wxCHECK(m_owner, m_abort);
366
367	return m_abort || m_owner->TestDestroy();
368}
369
370
371const wxString& CThreadTask::GetType() const
372{
373	return m_type;
374}
375
376
377const wxString& CThreadTask::GetDesc() const
378{
379	return m_desc;
380}
381
382
383ETaskPriority CThreadTask::GetPriority() const
384{
385	return m_priority;
386}
387
388
389// File_checked_for_headers
390