1/*
2 * Copyright 2012-2014, Rene Gollent, rene@gollent.com.
3 * Copyright 2009, Ingo Weinhold, ingo_weinhold@gmx.de.
4 * Distributed under the terms of the MIT License.
5 */
6
7#include "Worker.h"
8
9#include <AutoDeleter.h>
10#include <AutoLocker.h>
11
12
// #pragma mark - JobKey
14
15
16JobKey::~JobKey()
17{
18}
19
20
// #pragma mark - SimpleJobKey
22
23
24SimpleJobKey::SimpleJobKey(const void* object, uint32 type)
25	:
26	object(object),
27	type(type)
28{
29}
30
31
32SimpleJobKey::SimpleJobKey(const SimpleJobKey& other)
33	:
34	object(other.object),
35	type(other.type)
36{
37}
38
39
40size_t
41SimpleJobKey::HashValue() const
42{
43	return (size_t)(addr_t)object ^ (size_t)type;
44}
45
46
47bool
48SimpleJobKey::operator==(const JobKey& other) const
49{
50	const SimpleJobKey* otherKey = dynamic_cast<const SimpleJobKey*>(&other);
51	return otherKey != NULL && object == otherKey->object
52		&& type == otherKey->type;
53}
54
55
56SimpleJobKey&
57SimpleJobKey::operator=(const SimpleJobKey& other)
58{
59	object = other.object;
60	type = other.type;
61	return *this;
62}
63
64
65// #pragma mark - JobListener
66
67
68JobListener::~JobListener()
69{
70}
71
72
73void
74JobListener::JobStarted(Job* job)
75{
76}
77
78
79void
80JobListener::JobDone(Job* job)
81{
82}
83
84
85void
86JobListener::JobWaitingForInput(Job* job)
87{
88}
89
90
91void
92JobListener::JobFailed(Job* job)
93{
94}
95
96
97void
98JobListener::JobAborted(Job* job)
99{
100}
101
102
103// #pragma mark - Job
104
105
106Job::Job()
107	:
108	fWorker(NULL),
109	fState(JOB_STATE_UNSCHEDULED),
110	fDependency(NULL),
111	fWaitStatus(JOB_DEPENDENCY_NOT_FOUND),
112	fListeners(10)
113{
114}
115
116
117Job::~Job()
118{
119}
120
121
122job_wait_status
123Job::WaitFor(const JobKey& key)
124{
125	return fWorker->WaitForJob(this, key);
126}
127
128
129status_t
130Job::WaitForUserInput()
131{
132	return fWorker->WaitForUserInput(this);
133}
134
135
136void
137Job::SetDescription(const char* format, ...)
138{
139	va_list args;
140	va_start(args, format);
141	fDescription.SetToFormatVarArgs(format, args);
142	va_end(args);
143}
144
145
146void
147Job::SetWorker(Worker* worker)
148{
149	fWorker = worker;
150}
151
152
153void
154Job::SetState(job_state state)
155{
156	fState = state;
157}
158
159
160void
161Job::SetDependency(Job* job)
162{
163	fDependency = job;
164}
165
166
167void
168Job::SetWaitStatus(job_wait_status status)
169{
170	fWaitStatus = status;
171	switch (fWaitStatus) {
172		case JOB_DEPENDENCY_ACTIVE:
173		case JOB_USER_INPUT_WAITING:
174			fState = JOB_STATE_WAITING;
175			break;
176		default:
177			fState = JOB_STATE_ACTIVE;
178			break;
179	}
180}
181
182
183status_t
184Job::AddListener(JobListener* listener)
185{
186	return fListeners.AddItem(listener) ? B_OK : B_NO_MEMORY;
187}
188
189
190void
191Job::RemoveListener(JobListener* listener)
192{
193	fListeners.RemoveItem(listener);
194}
195
196
197void
198Job::NotifyListeners()
199{
200	int32 count = fListeners.CountItems();
201	for (int32 i = count - 1; i >= 0; i--) {
202		JobListener* listener = fListeners.ItemAt(i);
203		switch (fState) {
204			case JOB_STATE_ACTIVE:
205				listener->JobStarted(this);
206				break;
207			case JOB_STATE_WAITING:
208				if (fWaitStatus == JOB_USER_INPUT_WAITING)
209					listener->JobWaitingForInput(this);
210				break;
211			case JOB_STATE_SUCCEEDED:
212				listener->JobDone(this);
213				break;
214			case JOB_STATE_FAILED:
215				listener->JobFailed(this);
216				break;
217			case JOB_STATE_ABORTED:
218			default:
219				listener->JobAborted(this);
220				break;
221		}
222	}
223}
224
225
226// #pragma mark - Worker
227
228
229Worker::Worker()
230	:
231	fLock("worker"),
232	fWorkerThread(-1),
233	fTerminating(false)
234{
235}
236
237
238Worker::~Worker()
239{
240	ShutDown();
241
242	if (fWorkerThread >= 0)
243		wait_for_thread(fWorkerThread, NULL);
244}
245
246
247status_t
248Worker::Init()
249{
250	// check lock
251	status_t error = fLock.InitCheck();
252	if (error != B_OK)
253		return error;
254
255	// init jobs table
256	error = fJobs.Init();
257	if (error != B_OK)
258		return error;
259
260	// create semaphore for the worker
261	fWorkToDoSem = create_sem(0, "work to do");
262	if (fWorkToDoSem < 0)
263		return fWorkToDoSem;
264
265	// spawn worker thread
266	fWorkerThread = spawn_thread(_WorkerLoopEntry, "worker", B_NORMAL_PRIORITY,
267		this);
268	if (fWorkerThread < 0)
269		return fWorkerThread;
270
271	resume_thread(fWorkerThread);
272
273	return B_OK;
274}
275
276
277void
278Worker::ShutDown()
279{
280	AutoLocker<Worker> locker(this);
281
282	if (fTerminating)
283		return;
284
285	fTerminating = true;
286
287	// abort all jobs
288	Job* job = fJobs.Clear(true);
289	while (job != NULL) {
290		Job* nextJob = job->fNext;
291		_AbortJob(job, false);
292		job = nextJob;
293
294	}
295
296	// let the work thread terminate
297	delete_sem(fWorkToDoSem);
298	fWorkToDoSem = -1;
299}
300
301
302status_t
303Worker::ScheduleJob(Job* job, JobListener* listener)
304{
305	if (job == NULL)
306		return B_NO_MEMORY;
307
308	BReference<Job> jobReference(job, true);
309	AutoLocker<Worker> locker(this);
310
311	if (fTerminating)
312		return B_ERROR;
313
314	if (listener != NULL) {
315		status_t error = job->AddListener(listener);
316		if (error != B_OK)
317			return error;
318	}
319
320	bool notify = fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty();
321
322	job->SetWorker(this);
323	job->SetState(JOB_STATE_UNSCHEDULED);
324	fJobs.Insert(job);
325	fUnscheduledJobs.Add(jobReference.Detach());
326
327	if (notify)
328		release_sem(fWorkToDoSem);
329
330	return B_OK;
331}
332
333
334void
335Worker::AbortJob(const JobKey& key)
336{
337	AutoLocker<Worker> locker(this);
338
339	Job* job = fJobs.Lookup(key);
340	if (job == NULL)
341		return;
342
343	_AbortJob(job, true);
344}
345
346
347Job*
348Worker::GetJob(const JobKey& key)
349{
350	AutoLocker<Worker> locker(this);
351	return fJobs.Lookup(key);
352}
353
354
355status_t
356Worker::ResumeJob(Job* job)
357{
358	AutoLocker<Worker> locker(this);
359
360	for (JobList::Iterator it = fSuspendedJobs.GetIterator(); it.Next();) {
361		if (it.Current() == job) {
362			it.Remove();
363			job->SetState(JOB_STATE_UNSCHEDULED);
364			fUnscheduledJobs.Add(job);
365			release_sem(fWorkToDoSem);
366			return B_OK;
367		}
368	}
369
370	return B_ENTRY_NOT_FOUND;
371}
372
373
374bool
375Worker::HasPendingJobs()
376{
377	AutoLocker<Worker> locker(this);
378	return !fJobs.IsEmpty();
379}
380
381
382status_t
383Worker::AddListener(const JobKey& key, JobListener* listener)
384{
385	AutoLocker<Worker> locker(this);
386
387	Job* job = fJobs.Lookup(key);
388	if (job == NULL)
389		return B_ENTRY_NOT_FOUND;
390
391	return job->AddListener(listener);
392}
393
394
395void
396Worker::RemoveListener(const JobKey& key, JobListener* listener)
397{
398	AutoLocker<Worker> locker(this);
399
400	if (Job* job = fJobs.Lookup(key))
401		job->RemoveListener(listener);
402}
403
404
405job_wait_status
406Worker::WaitForJob(Job* waitingJob, const JobKey& key)
407{
408	AutoLocker<Worker> locker(this);
409
410	// don't wait when the game is over anyway
411	if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED)
412		return JOB_DEPENDENCY_ABORTED;
413
414	Job* job = fJobs.Lookup(key);
415	if (job == NULL)
416		return JOB_DEPENDENCY_NOT_FOUND;
417
418	waitingJob->SetWaitStatus(JOB_DEPENDENCY_ACTIVE);
419	waitingJob->SetDependency(job);
420	job->DependentJobs().Add(waitingJob);
421
422	return waitingJob->WaitStatus();
423}
424
425
426status_t
427Worker::WaitForUserInput(Job* waitingJob)
428{
429	AutoLocker<Worker> locker(this);
430
431	if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED)
432		return B_INTERRUPTED;
433
434	waitingJob->SetWaitStatus(JOB_USER_INPUT_WAITING);
435	waitingJob->NotifyListeners();
436	fSuspendedJobs.Add(waitingJob);
437
438	return B_OK;
439}
440
441
442/*static*/ status_t
443Worker::_WorkerLoopEntry(void* data)
444{
445	return ((Worker*)data)->_WorkerLoop();
446}
447
448
449status_t
450Worker::_WorkerLoop()
451{
452	_ProcessJobs();
453
454	// clean up aborted jobs
455	AutoLocker<Worker> locker(this);
456	while (Job* job = fAbortedJobs.RemoveHead())
457		_FinishJob(job);
458
459	return B_OK;
460}
461
462
463void
464Worker::_ProcessJobs()
465{
466	while (true) {
467		AutoLocker<Worker> locker(this);
468
469		// wait for next job
470		if (fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty()) {
471			locker.Unlock();
472
473			status_t error = acquire_sem(fWorkToDoSem);
474			if (error != B_OK) {
475				if (error == B_INTERRUPTED) {
476					locker.Lock();
477					continue;
478				}
479				break;
480			}
481
482			locker.Lock();
483		}
484
485		// clean up aborted jobs
486		while (Job* job = fAbortedJobs.RemoveHead())
487			_FinishJob(job);
488
489		// process the next job
490		if (Job* job = fUnscheduledJobs.RemoveHead()) {
491			job->SetState(JOB_STATE_ACTIVE);
492			job->NotifyListeners();
493
494			locker.Unlock();
495			status_t error = job->Do();
496			locker.Lock();
497
498			if (job->State() == JOB_STATE_ACTIVE) {
499				job->SetState(
500					error == B_OK ? JOB_STATE_SUCCEEDED : JOB_STATE_FAILED);
501			} else if (job->State() == JOB_STATE_WAITING)
502				continue;
503
504			_FinishJob(job);
505		}
506	}
507}
508
509
510void
511Worker::_AbortJob(Job* job, bool removeFromTable)
512{
513	switch (job->State()) {
514		case JOB_STATE_ABORTED:
515			return;
516
517		case JOB_STATE_UNSCHEDULED:
518			fUnscheduledJobs.Remove(job);
519			fAbortedJobs.Add(job);
520			break;
521
522		case JOB_STATE_WAITING:
523		{
524			Job* dependency = job->Dependency();
525			if (dependency != NULL)
526				dependency->DependentJobs().Remove(job);
527			job->SetDependency(NULL);
528			break;
529		}
530		case JOB_STATE_ACTIVE:
531		case JOB_STATE_FAILED:
532		case JOB_STATE_SUCCEEDED:
533		default:
534			break;
535	}
536
537	job->SetState(JOB_STATE_ABORTED);
538	if (removeFromTable)
539		fJobs.Remove(job);
540}
541
542
543void
544Worker::_FinishJob(Job* job)
545{
546	// wake up dependent jobs
547	if (!job->DependentJobs().IsEmpty()) {
548		job_wait_status waitStatus;
549		switch (job->State()) {
550			case JOB_STATE_ABORTED:
551				waitStatus = JOB_DEPENDENCY_ABORTED;
552				break;
553			case JOB_STATE_FAILED:
554				waitStatus = JOB_DEPENDENCY_FAILED;
555				break;
556			case JOB_STATE_SUCCEEDED:
557				waitStatus = JOB_DEPENDENCY_SUCCEEDED;
558				break;
559
560			case JOB_STATE_UNSCHEDULED:
561			case JOB_STATE_WAITING:
562			case JOB_STATE_ACTIVE:
563			default:
564				// should never happen
565				waitStatus = JOB_DEPENDENCY_NOT_FOUND;
566				break;
567		}
568
569		while (Job* dependentJob = job->DependentJobs().RemoveHead()) {
570			dependentJob->SetDependency(NULL);
571			dependentJob->SetWaitStatus(waitStatus);
572			fUnscheduledJobs.Add(dependentJob);
573		}
574
575		release_sem(fWorkToDoSem);
576	}
577
578	if (job->State() != JOB_STATE_ABORTED)
579		fJobs.Remove(job);
580	job->NotifyListeners();
581	job->ReleaseReference();
582}
583