1/* 2 * Copyright 2012, Rene Gollent, rene@gollent.com. 3 * Copyright 2009, Ingo Weinhold, ingo_weinhold@gmx.de. 4 * Distributed under the terms of the MIT License. 5 */ 6 7#include "Worker.h" 8 9#include <AutoDeleter.h> 10#include <AutoLocker.h> 11 12 13// pragma mark - JobKey 14 15 16JobKey::~JobKey() 17{ 18} 19 20 21// pragma mark - SimpleJobKey 22 23 24SimpleJobKey::SimpleJobKey(void* object, uint32 type) 25 : 26 object(object), 27 type(type) 28{ 29} 30 31 32SimpleJobKey::SimpleJobKey(const SimpleJobKey& other) 33 : 34 object(other.object), 35 type(other.type) 36{ 37} 38 39 40size_t 41SimpleJobKey::HashValue() const 42{ 43 return (size_t)(addr_t)object ^ (size_t)type; 44} 45 46 47bool 48SimpleJobKey::operator==(const JobKey& other) const 49{ 50 const SimpleJobKey* otherKey = dynamic_cast<const SimpleJobKey*>(&other); 51 return otherKey != NULL && object == otherKey->object 52 && type == otherKey->type; 53} 54 55 56SimpleJobKey& 57SimpleJobKey::operator=(const SimpleJobKey& other) 58{ 59 object = other.object; 60 type = other.type; 61 return *this; 62} 63 64 65// #pragma mark - JobListener 66 67 68JobListener::~JobListener() 69{ 70} 71 72 73void 74JobListener::JobDone(Job* job) 75{ 76} 77 78 79void 80JobListener::JobFailed(Job* job) 81{ 82} 83 84 85void 86JobListener::JobAborted(Job* job) 87{ 88} 89 90 91// #pragma mark - Job 92 93 94Job::Job() 95 : 96 fWorker(NULL), 97 fState(JOB_STATE_UNSCHEDULED), 98 fDependency(NULL), 99 fWaitStatus(JOB_DEPENDENCY_NOT_FOUND), 100 fListeners(10) 101{ 102} 103 104 105Job::~Job() 106{ 107} 108 109 110job_wait_status 111Job::WaitFor(const JobKey& key) 112{ 113 return fWorker->WaitForJob(this, key); 114} 115 116 117void 118Job::SetWorker(Worker* worker) 119{ 120 fWorker = worker; 121} 122 123 124void 125Job::SetState(job_state state) 126{ 127 fState = state; 128} 129 130 131void 132Job::SetDependency(Job* job) 133{ 134 fDependency = job; 135} 136 137 138void 139Job::SetWaitStatus(job_wait_status status) 140{ 141 fWaitStatus = status; 142 switch (fWaitStatus) { 143 case JOB_DEPENDENCY_ACTIVE: 144 fState = JOB_STATE_WAITING; 145 break; 146 default: 147 fState = JOB_STATE_ACTIVE; 148 break; 149 } 150} 151 152 153status_t 154Job::AddListener(JobListener* listener) 155{ 156 return fListeners.AddItem(listener) ? B_OK : B_NO_MEMORY; 157} 158 159 160void 161Job::RemoveListener(JobListener* listener) 162{ 163 fListeners.RemoveItem(listener); 164} 165 166 167void 168Job::NotifyListeners() 169{ 170 int32 count = fListeners.CountItems(); 171 for (int32 i = count - 1; i >= 0; i--) { 172 JobListener* listener = fListeners.ItemAt(i); 173 switch (fState) { 174 case JOB_STATE_SUCCEEDED: 175 listener->JobDone(this); 176 break; 177 case JOB_STATE_FAILED: 178 listener->JobFailed(this); 179 break; 180 case JOB_STATE_ABORTED: 181 default: 182 listener->JobAborted(this); 183 break; 184 } 185 } 186} 187 188 189// #pragma mark - Worker 190 191 192Worker::Worker() 193 : 194 fLock("worker"), 195 fWorkerThread(-1), 196 fTerminating(false) 197{ 198} 199 200 201Worker::~Worker() 202{ 203 ShutDown(); 204 205 if (fWorkerThread >= 0) 206 wait_for_thread(fWorkerThread, NULL); 207} 208 209 210status_t 211Worker::Init() 212{ 213 // check lock 214 status_t error = fLock.InitCheck(); 215 if (error != B_OK) 216 return error; 217 218 // init jobs table 219 error = fJobs.Init(); 220 if (error != B_OK) 221 return error; 222 223 // create semaphore for the worker 224 fWorkToDoSem = create_sem(0, "work to do"); 225 if (fWorkToDoSem < 0) 226 return fWorkToDoSem; 227 228 // spawn worker thread 229 fWorkerThread = spawn_thread(_WorkerLoopEntry, "worker", B_NORMAL_PRIORITY, 230 this); 231 if (fWorkerThread < 0) 232 return fWorkerThread; 233 234 resume_thread(fWorkerThread); 235 236 return B_OK; 237} 238 239 240void 241Worker::ShutDown() 242{ 243 AutoLocker<Worker> locker(this); 244 245 if (fTerminating) 246 return; 247 248 fTerminating = true; 249 250 // abort all jobs 251 Job* job = fJobs.Clear(true); 252 while (job != NULL) { 253 Job* nextJob = job->fNext; 254 _AbortJob(job, false); 255 job = nextJob; 256 257 } 258 259 // let the work thread terminate 260 delete_sem(fWorkToDoSem); 261 fWorkToDoSem = -1; 262} 263 264 265status_t 266Worker::ScheduleJob(Job* job, JobListener* listener) 267{ 268 if (job == NULL) 269 return B_NO_MEMORY; 270 271 BReference<Job> jobReference(job, true); 272 AutoLocker<Worker> locker(this); 273 274 if (fTerminating) 275 return B_ERROR; 276 277 if (listener != NULL) { 278 status_t error = job->AddListener(listener); 279 if (error != B_OK) 280 return error; 281 } 282 283 bool notify = fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty(); 284 285 job->SetWorker(this); 286 job->SetState(JOB_STATE_UNSCHEDULED); 287 fJobs.Insert(job); 288 fUnscheduledJobs.Add(jobReference.Detach()); 289 290 if (notify) 291 release_sem(fWorkToDoSem); 292 293 return B_OK; 294} 295 296 297void 298Worker::AbortJob(const JobKey& key) 299{ 300 AutoLocker<Worker> locker(this); 301 302 Job* job = fJobs.Lookup(key); 303 if (job == NULL) 304 return; 305 306 _AbortJob(job, true); 307} 308 309 310Job* 311Worker::GetJob(const JobKey& key) 312{ 313 AutoLocker<Worker> locker(this); 314 return fJobs.Lookup(key); 315} 316 317 318status_t 319Worker::AddListener(const JobKey& key, JobListener* listener) 320{ 321 AutoLocker<Worker> locker(this); 322 323 Job* job = fJobs.Lookup(key); 324 if (job == NULL) 325 return B_ENTRY_NOT_FOUND; 326 327 return job->AddListener(listener); 328} 329 330 331void 332Worker::RemoveListener(const JobKey& key, JobListener* listener) 333{ 334 AutoLocker<Worker> locker(this); 335 336 if (Job* job = fJobs.Lookup(key)) 337 job->RemoveListener(listener); 338} 339 340 341job_wait_status 342Worker::WaitForJob(Job* waitingJob, const JobKey& key) 343{ 344 AutoLocker<Worker> locker(this); 345 346 // don't wait when the game is over anyway 347 if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED) 348 return JOB_DEPENDENCY_ABORTED; 349 350 Job* job = fJobs.Lookup(key); 351 if (job == NULL) 352 return JOB_DEPENDENCY_NOT_FOUND; 353 354 waitingJob->SetWaitStatus(JOB_DEPENDENCY_ACTIVE); 355 waitingJob->SetDependency(job); 356 job->DependentJobs().Add(waitingJob); 357 358 return waitingJob->WaitStatus(); 359} 360 361 362/*static*/ status_t 363Worker::_WorkerLoopEntry(void* data) 364{ 365 return ((Worker*)data)->_WorkerLoop(); 366} 367 368 369status_t 370Worker::_WorkerLoop() 371{ 372 _ProcessJobs(); 373 374 // clean up aborted jobs 375 AutoLocker<Worker> locker(this); 376 while (Job* job = fAbortedJobs.RemoveHead()) 377 _FinishJob(job); 378 379 return B_OK; 380} 381 382 383void 384Worker::_ProcessJobs() 385{ 386 while (true) { 387 AutoLocker<Worker> locker(this); 388 389 // wait for next job 390 if (fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty()) { 391 locker.Unlock(); 392 393 status_t error = acquire_sem(fWorkToDoSem); 394 if (error != B_OK) { 395 if (error == B_INTERRUPTED) { 396 locker.Lock(); 397 continue; 398 } 399 break; 400 } 401 402 locker.Lock(); 403 } 404 405 // clean up aborted jobs 406 while (Job* job = fAbortedJobs.RemoveHead()) 407 _FinishJob(job); 408 409 // process the next job 410 if (Job* job = fUnscheduledJobs.RemoveHead()) { 411 job->SetState(JOB_STATE_ACTIVE); 412 413 locker.Unlock(); 414 status_t error = job->Do(); 415 locker.Lock(); 416 417 if (job->State() == JOB_STATE_ACTIVE) { 418 job->SetState( 419 error == B_OK ? JOB_STATE_SUCCEEDED : JOB_STATE_FAILED); 420 } else if (job->State() == JOB_STATE_WAITING) 421 continue; 422 423 _FinishJob(job); 424 } 425 } 426} 427 428 429void 430Worker::_AbortJob(Job* job, bool removeFromTable) 431{ 432 switch (job->State()) { 433 case JOB_STATE_ABORTED: 434 return; 435 436 case JOB_STATE_UNSCHEDULED: 437 fUnscheduledJobs.Remove(job); 438 fAbortedJobs.Add(job); 439 break; 440 441 case JOB_STATE_WAITING: 442 job->Dependency()->DependentJobs().Remove(job); 443 job->SetDependency(NULL); 444 break; 445 446 case JOB_STATE_ACTIVE: 447 case JOB_STATE_FAILED: 448 case JOB_STATE_SUCCEEDED: 449 default: 450 break; 451 } 452 453 job->SetState(JOB_STATE_ABORTED); 454 if (removeFromTable) 455 fJobs.Remove(job); 456} 457 458 459void 460Worker::_FinishJob(Job* job) 461{ 462 // wake up dependent jobs 463 if (!job->DependentJobs().IsEmpty()) { 464 job_wait_status waitStatus; 465 switch (job->State()) { 466 case JOB_STATE_ABORTED: 467 waitStatus = JOB_DEPENDENCY_ABORTED; 468 break; 469 case JOB_STATE_FAILED: 470 waitStatus = JOB_DEPENDENCY_FAILED; 471 break; 472 case JOB_STATE_SUCCEEDED: 473 waitStatus = JOB_DEPENDENCY_SUCCEEDED; 474 break; 475 476 case JOB_STATE_UNSCHEDULED: 477 case JOB_STATE_WAITING: 478 case JOB_STATE_ACTIVE: 479 default: 480 // should never happen 481 waitStatus = JOB_DEPENDENCY_NOT_FOUND; 482 break; 483 } 484 485 while (Job* dependentJob = job->DependentJobs().RemoveHead()) { 486 dependentJob->SetDependency(NULL); 487 dependentJob->SetWaitStatus(waitStatus); 488 fUnscheduledJobs.Add(dependentJob); 489 } 490 491 release_sem(fWorkToDoSem); 492 } 493 494 if (job->State() != JOB_STATE_ABORTED) 495 fJobs.Remove(job); 496 job->NotifyListeners(); 497 job->ReleaseReference(); 498} 499