1// 2// This file is part of the aMule Project. 3// 4// Copyright (c) 2006-2011 Mikkel Schubert ( xaignar@amule.org / http://www.amule.org ) 5// 6// Any parts of this program derived from the xMule, lMule or eMule project, 7// or contributed by third-party developers are copyrighted by their 8// respective authors. 9// 10// This program is free software; you can redistribute it and/or modify 11// it under the terms of the GNU General Public License as published by 12// the Free Software Foundation; either version 2 of the License, or 13// (at your option) any later version. 14// 15// This program is distributed in the hope that it will be useful, 16// but WITHOUT ANY WARRANTY; without even the implied warranty of 17// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18// GNU General Public License for more details. 19// 20// You should have received a copy of the GNU General Public License 21// along with this program; if not, write to the Free Software 22// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA 23// 24 25#include "ThreadScheduler.h" // Interface declarations 26#include "Logger.h" // Needed for Add(Debug)LogLine{C,N} 27#include <common/Format.h> // Needed for CFormat 28#include "ScopedPtr.h" // Needed for CScopedPtr 29 30#include <algorithm> // Needed for std::sort // Do_not_auto_remove (mingw-gcc-3.4.5) 31 32//! Global lock the scheduler and its thread. 33static wxMutex s_lock; 34//! Pointer to the global scheduler instance (automatically instantiated). 35static CThreadScheduler* s_scheduler = NULL; 36//! Specifies if the scheduler is running. 37static bool s_running = false; 38//! Specifies if the gobal scheduler has been terminated. 39static bool s_terminated = false; 40 41/** 42 * This class is used in a custom implementation of wxThreadHelper. 43 * 44 * The reason for not using wxThreadHelper are as follows: 45 * - wxThreadHelper makes use of wxThread:Kill, which is warned against 46 * serveral times in the docs, and even calls it in its destructor. 47 * - Managing the thread-object is difficult, since the only way to 48 * destroy it is to create a new thread. 49 */ 50class CTaskThread : public CMuleThread 51{ 52public: 53 CTaskThread(CThreadScheduler* owner) 54 : CMuleThread(wxTHREAD_JOINABLE), 55 m_owner(owner) 56 { 57 } 58 59 //! For simplicity's sake, all code is placed in CThreadScheduler::Entry 60 void* Entry() { 61 return m_owner->Entry(); 62 } 63 64private: 65 //! The scheduler owning this thread. 66 CThreadScheduler* m_owner; 67}; 68 69 70void CThreadScheduler::Start() 71{ 72 wxMutexLocker lock(s_lock); 73 74 s_running = true; 75 s_terminated = false; 76 77 // Ensures that a thread is started if tasks are already waiting. 78 if (s_scheduler) { 79 AddDebugLogLineN(logThreads, wxT("Starting scheduler")); 80 s_scheduler->CreateSchedulerThread(); 81 } 82} 83 84 85void CThreadScheduler::Terminate() 86{ 87 AddDebugLogLineN(logThreads, wxT("Terminating scheduler")); 88 CThreadScheduler* ptr = NULL; 89 90 { 91 wxMutexLocker lock(s_lock); 92 93 // Safely unlink the scheduler, as to avoid race-conditions. 94 ptr = s_scheduler; 95 s_running = false; 96 s_terminated = true; 97 s_scheduler = NULL; 98 } 99 100 delete ptr; 101 AddDebugLogLineN(logThreads, wxT("Scheduler terminated")); 102} 103 104 105bool CThreadScheduler::AddTask(CThreadTask* task, bool overwrite) 106{ 107 wxMutexLocker lock(s_lock); 108 109 // When terminated (on shutdown), all tasks are ignored. 110 if (s_terminated) { 111 AddDebugLogLineN(logThreads, wxT("Task discarded: ") + task->GetDesc()); 112 delete task; 113 return false; 114 } else if (s_scheduler == NULL) { 115 s_scheduler = new CThreadScheduler(); 116 AddDebugLogLineN(logThreads, wxT("Scheduler created.")); 117 } 118 119 return s_scheduler->DoAddTask(task, overwrite); 120} 121 122 123/** Returns string representation of error code. */ 124wxString GetErrMsg(wxThreadError err) 125{ 126 switch (err) { 127 case wxTHREAD_NO_ERROR: return wxT("wxTHREAD_NO_ERROR"); 128 case wxTHREAD_NO_RESOURCE: return wxT("wxTHREAD_NO_RESOURCE"); 129 case wxTHREAD_RUNNING: return wxT("wxTHREAD_RUNNING"); 130 case wxTHREAD_NOT_RUNNING: return wxT("wxTHREAD_NOT_RUNNING"); 131 case wxTHREAD_KILLED: return wxT("wxTHREAD_KILLED"); 132 case wxTHREAD_MISC_ERROR: return wxT("wxTHREAD_MISC_ERROR"); 133 default: 134 return wxT("Unknown error"); 135 } 136} 137 138 139void CThreadScheduler::CreateSchedulerThread() 140{ 141 if ((m_thread && m_thread->IsAlive()) || m_tasks.empty()) { 142 return; 143 } 144 145 // A thread can only be run once, so the old one must be safely disposed of 146 if (m_thread) { 147 AddDebugLogLineN(logThreads, wxT("CreateSchedulerThread: Disposing of old thread.")); 148 m_thread->Stop(); 149 delete m_thread; 150 } 151 152 m_thread = new CTaskThread(this); 153 154 wxThreadError err = m_thread->Create(); 155 if (err == wxTHREAD_NO_ERROR) { 156 // Try to avoid reducing the latency of the main thread 157 m_thread->SetPriority(WXTHREAD_MIN_PRIORITY); 158 159 err = m_thread->Run(); 160 if (err == wxTHREAD_NO_ERROR) { 161 AddDebugLogLineN(logThreads, wxT("Scheduler thread started")); 162 return; 163 } else { 164 AddDebugLogLineC(logThreads, wxT("Error while starting scheduler thread: ") + GetErrMsg(err)); 165 } 166 } else { 167 AddDebugLogLineC(logThreads, wxT("Error while creating scheduler thread: ") + GetErrMsg(err)); 168 } 169 170 // Creation or running failed. 171 m_thread->Stop(); 172 delete m_thread; 173 m_thread = NULL; 174} 175 176 177/** This is the sorter functor for the task-queue. */ 178struct CTaskSorter 179{ 180 bool operator()(const CThreadScheduler::CEntryPair& a, const CThreadScheduler::CEntryPair& b) { 181 if (a.first->GetPriority() != b.first->GetPriority()) { 182 return a.first->GetPriority() > b.first->GetPriority(); 183 } 184 185 // Compare tasks numbers. 186 return a.second < b.second; 187 } 188}; 189 190 191 192CThreadScheduler::CThreadScheduler() 193 : m_tasksDirty(false), 194 m_thread(NULL), 195 m_currentTask(NULL) 196{ 197 198} 199 200 201CThreadScheduler::~CThreadScheduler() 202{ 203 if (m_thread) { 204 m_thread->Stop(); 205 delete m_thread; 206 } 207} 208 209 210size_t CThreadScheduler::GetTaskCount() const 211{ 212 wxMutexLocker lock(s_lock); 213 214 return m_tasks.size(); 215} 216 217 218bool CThreadScheduler::DoAddTask(CThreadTask* task, bool overwrite) 219{ 220 // GetTick is too lowres, so we just use a counter to ensure that 221 // the sorted order will match the order in which the tasks were added. 222 static unsigned taskAge = 0; 223 224 // Get the map for this task type, implicitly creating it as needed. 225 CDescMap& map = m_taskDescs[task->GetType()]; 226 227 CDescMap::value_type entry(task->GetDesc(), task); 228 if (map.insert(entry).second) { 229 AddDebugLogLineN(logThreads, wxT("Task scheduled: ") + task->GetType() + wxT(" - ") + task->GetDesc()); 230 m_tasks.push_back(CEntryPair(task, taskAge++)); 231 m_tasksDirty = true; 232 } else if (overwrite) { 233 AddDebugLogLineN(logThreads, wxT("Task overwritten: ") + task->GetType() + wxT(" - ") + task->GetDesc()); 234 235 CThreadTask* existingTask = map[task->GetDesc()]; 236 if (m_currentTask == existingTask) { 237 // The duplicate is already being executed, abort it. 238 m_currentTask->m_abort = true; 239 } else { 240 // Task not yet started, simply remove and delete. 241 wxCHECK2(map.erase(existingTask->GetDesc()), /* Do nothing. */); 242 delete existingTask; 243 } 244 245 m_tasks.push_back(CEntryPair(task, taskAge++)); 246 map[task->GetDesc()] = task; 247 m_tasksDirty = true; 248 } else { 249 AddDebugLogLineN(logThreads, wxT("Duplicate task, discarding: ") + task->GetType() + wxT(" - ") + task->GetDesc()); 250 delete task; 251 return false; 252 } 253 254 if (s_running) { 255 CreateSchedulerThread(); 256 } 257 258 return true; 259} 260 261 262void* CThreadScheduler::Entry() 263{ 264 AddDebugLogLineN(logThreads, wxT("Entering scheduling loop")); 265 266 while (!m_thread->TestDestroy()) { 267 CScopedPtr<CThreadTask> task(NULL); 268 269 { 270 wxMutexLocker lock(s_lock); 271 272 // Resort tasks by priority/age if list has been modified. 273 if (m_tasksDirty) { 274 AddDebugLogLineN(logThreads, wxT("Resorting tasks")); 275 std::sort(m_tasks.begin(), m_tasks.end(), CTaskSorter()); 276 m_tasksDirty = false; 277 } else if (m_tasks.empty()) { 278 AddDebugLogLineN(logThreads, wxT("No more tasks, stopping")); 279 break; 280 } 281 282 // Select the next task 283 task.reset(m_tasks.front().first); 284 m_tasks.pop_front(); 285 m_currentTask = task.get(); 286 } 287 288 AddDebugLogLineN(logThreads, wxT("Current task: ") + task->GetType() + wxT(" - ") + task->GetDesc()); 289 // Execute the task 290 task->m_owner = m_thread; 291 task->Entry(); 292 task->OnExit(); 293 294 // Check if this was the last task of this type 295 bool isLastTask = false; 296 297 { 298 wxMutexLocker lock(s_lock); 299 300 // If the task has been aborted, the entry now refers to 301 // a different task, so dont remove it. That also means 302 // that it cant be the last task of this type. 303 if (!task->m_abort) { 304 AddDebugLogLineN(logThreads, 305 CFormat(wxT("Completed task '%s%s', %u tasks remaining.")) 306 % task->GetType() 307 % (task->GetDesc().IsEmpty() ? wxString() : (wxT(" - ") + task->GetDesc())) 308 % m_tasks.size() ); 309 310 CDescMap& map = m_taskDescs[task->GetType()]; 311 if (!map.erase(task->GetDesc())) { 312 wxFAIL; 313 } else if (map.empty()) { 314 m_taskDescs.erase(task->GetType()); 315 isLastTask = true; 316 } 317 } 318 319 m_currentTask = NULL; 320 } 321 322 if (isLastTask) { 323 // Allow the task to signal that all sub-tasks have been completed 324 AddDebugLogLineN(logThreads, wxT("Last task, calling OnLastTask")); 325 task->OnLastTask(); 326 } 327 } 328 329 AddDebugLogLineN(logThreads, wxT("Leaving scheduling loop")); 330 331 return 0; 332} 333 334 335 336CThreadTask::CThreadTask(const wxString& type, const wxString& desc, ETaskPriority priority) 337 : m_type(type), 338 m_desc(desc), 339 m_priority(priority), 340 m_owner(NULL), 341 m_abort(false) 342{ 343} 344 345 346CThreadTask::~CThreadTask() 347{ 348} 349 350 351void CThreadTask::OnLastTask() 352{ 353 // Does nothing by default. 354} 355 356 357void CThreadTask::OnExit() 358{ 359 // Does nothing by default. 360} 361 362 363bool CThreadTask::TestDestroy() const 364{ 365 wxCHECK(m_owner, m_abort); 366 367 return m_abort || m_owner->TestDestroy(); 368} 369 370 371const wxString& CThreadTask::GetType() const 372{ 373 return m_type; 374} 375 376 377const wxString& CThreadTask::GetDesc() const 378{ 379 return m_desc; 380} 381 382 383ETaskPriority CThreadTask::GetPriority() const 384{ 385 return m_priority; 386} 387 388 389// File_checked_for_headers 390