1/*
2 * Copyright 2012, Rene Gollent, rene@gollent.com.
3 * Copyright 2009, Ingo Weinhold, ingo_weinhold@gmx.de.
4 * Distributed under the terms of the MIT License.
5 */
6
7#include "Worker.h"
8
9#include <AutoDeleter.h>
10#include <AutoLocker.h>
11
12
13// pragma mark - JobKey
14
15
16JobKey::~JobKey()
17{
18}
19
20
21// pragma mark - SimpleJobKey
22
23
24SimpleJobKey::SimpleJobKey(void* object, uint32 type)
25	:
26	object(object),
27	type(type)
28{
29}
30
31
32SimpleJobKey::SimpleJobKey(const SimpleJobKey& other)
33	:
34	object(other.object),
35	type(other.type)
36{
37}
38
39
40size_t
41SimpleJobKey::HashValue() const
42{
43	return (size_t)(addr_t)object ^ (size_t)type;
44}
45
46
47bool
48SimpleJobKey::operator==(const JobKey& other) const
49{
50	const SimpleJobKey* otherKey = dynamic_cast<const SimpleJobKey*>(&other);
51	return otherKey != NULL && object == otherKey->object
52		&& type == otherKey->type;
53}
54
55
56SimpleJobKey&
57SimpleJobKey::operator=(const SimpleJobKey& other)
58{
59	object = other.object;
60	type = other.type;
61	return *this;
62}
63
64
65// #pragma mark - JobListener
66
67
68JobListener::~JobListener()
69{
70}
71
72
73void
74JobListener::JobDone(Job* job)
75{
76}
77
78
79void
80JobListener::JobFailed(Job* job)
81{
82}
83
84
85void
86JobListener::JobAborted(Job* job)
87{
88}
89
90
91// #pragma mark - Job
92
93
94Job::Job()
95	:
96	fWorker(NULL),
97	fState(JOB_STATE_UNSCHEDULED),
98	fDependency(NULL),
99	fWaitStatus(JOB_DEPENDENCY_NOT_FOUND),
100	fListeners(10)
101{
102}
103
104
105Job::~Job()
106{
107}
108
109
110job_wait_status
111Job::WaitFor(const JobKey& key)
112{
113	return fWorker->WaitForJob(this, key);
114}
115
116
117void
118Job::SetWorker(Worker* worker)
119{
120	fWorker = worker;
121}
122
123
124void
125Job::SetState(job_state state)
126{
127	fState = state;
128}
129
130
131void
132Job::SetDependency(Job* job)
133{
134	fDependency = job;
135}
136
137
138void
139Job::SetWaitStatus(job_wait_status status)
140{
141	fWaitStatus = status;
142	switch (fWaitStatus) {
143		case JOB_DEPENDENCY_ACTIVE:
144			fState = JOB_STATE_WAITING;
145			break;
146		default:
147			fState = JOB_STATE_ACTIVE;
148			break;
149	}
150}
151
152
153status_t
154Job::AddListener(JobListener* listener)
155{
156	return fListeners.AddItem(listener) ? B_OK : B_NO_MEMORY;
157}
158
159
160void
161Job::RemoveListener(JobListener* listener)
162{
163	fListeners.RemoveItem(listener);
164}
165
166
167void
168Job::NotifyListeners()
169{
170	int32 count = fListeners.CountItems();
171	for (int32 i = count - 1; i >= 0; i--) {
172		JobListener* listener = fListeners.ItemAt(i);
173		switch (fState) {
174			case JOB_STATE_SUCCEEDED:
175				listener->JobDone(this);
176				break;
177			case JOB_STATE_FAILED:
178				listener->JobFailed(this);
179				break;
180			case JOB_STATE_ABORTED:
181			default:
182				listener->JobAborted(this);
183				break;
184		}
185	}
186}
187
188
189// #pragma mark - Worker
190
191
192Worker::Worker()
193	:
194	fLock("worker"),
195	fWorkerThread(-1),
196	fTerminating(false)
197{
198}
199
200
201Worker::~Worker()
202{
203	ShutDown();
204
205	if (fWorkerThread >= 0)
206		wait_for_thread(fWorkerThread, NULL);
207}
208
209
210status_t
211Worker::Init()
212{
213	// check lock
214	status_t error = fLock.InitCheck();
215	if (error != B_OK)
216		return error;
217
218	// init jobs table
219	error = fJobs.Init();
220	if (error != B_OK)
221		return error;
222
223	// create semaphore for the worker
224	fWorkToDoSem = create_sem(0, "work to do");
225	if (fWorkToDoSem < 0)
226		return fWorkToDoSem;
227
228	// spawn worker thread
229	fWorkerThread = spawn_thread(_WorkerLoopEntry, "worker", B_NORMAL_PRIORITY,
230		this);
231	if (fWorkerThread < 0)
232		return fWorkerThread;
233
234	resume_thread(fWorkerThread);
235
236	return B_OK;
237}
238
239
240void
241Worker::ShutDown()
242{
243	AutoLocker<Worker> locker(this);
244
245	if (fTerminating)
246		return;
247
248	fTerminating = true;
249
250	// abort all jobs
251	Job* job = fJobs.Clear(true);
252	while (job != NULL) {
253		Job* nextJob = job->fNext;
254		_AbortJob(job, false);
255		job = nextJob;
256
257	}
258
259	// let the work thread terminate
260	delete_sem(fWorkToDoSem);
261	fWorkToDoSem = -1;
262}
263
264
265status_t
266Worker::ScheduleJob(Job* job, JobListener* listener)
267{
268	if (job == NULL)
269		return B_NO_MEMORY;
270
271	BReference<Job> jobReference(job, true);
272	AutoLocker<Worker> locker(this);
273
274	if (fTerminating)
275		return B_ERROR;
276
277	if (listener != NULL) {
278		status_t error = job->AddListener(listener);
279		if (error != B_OK)
280			return error;
281	}
282
283	bool notify = fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty();
284
285	job->SetWorker(this);
286	job->SetState(JOB_STATE_UNSCHEDULED);
287	fJobs.Insert(job);
288	fUnscheduledJobs.Add(jobReference.Detach());
289
290	if (notify)
291		release_sem(fWorkToDoSem);
292
293	return B_OK;
294}
295
296
297void
298Worker::AbortJob(const JobKey& key)
299{
300	AutoLocker<Worker> locker(this);
301
302	Job* job = fJobs.Lookup(key);
303	if (job == NULL)
304		return;
305
306	_AbortJob(job, true);
307}
308
309
310Job*
311Worker::GetJob(const JobKey& key)
312{
313	AutoLocker<Worker> locker(this);
314	return fJobs.Lookup(key);
315}
316
317
318status_t
319Worker::AddListener(const JobKey& key, JobListener* listener)
320{
321	AutoLocker<Worker> locker(this);
322
323	Job* job = fJobs.Lookup(key);
324	if (job == NULL)
325		return B_ENTRY_NOT_FOUND;
326
327	return job->AddListener(listener);
328}
329
330
331void
332Worker::RemoveListener(const JobKey& key, JobListener* listener)
333{
334	AutoLocker<Worker> locker(this);
335
336	if (Job* job = fJobs.Lookup(key))
337		job->RemoveListener(listener);
338}
339
340
341job_wait_status
342Worker::WaitForJob(Job* waitingJob, const JobKey& key)
343{
344	AutoLocker<Worker> locker(this);
345
346	// don't wait when the game is over anyway
347	if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED)
348		return JOB_DEPENDENCY_ABORTED;
349
350	Job* job = fJobs.Lookup(key);
351	if (job == NULL)
352		return JOB_DEPENDENCY_NOT_FOUND;
353
354	waitingJob->SetWaitStatus(JOB_DEPENDENCY_ACTIVE);
355	waitingJob->SetDependency(job);
356	job->DependentJobs().Add(waitingJob);
357
358	return waitingJob->WaitStatus();
359}
360
361
362/*static*/ status_t
363Worker::_WorkerLoopEntry(void* data)
364{
365	return ((Worker*)data)->_WorkerLoop();
366}
367
368
369status_t
370Worker::_WorkerLoop()
371{
372	_ProcessJobs();
373
374	// clean up aborted jobs
375	AutoLocker<Worker> locker(this);
376	while (Job* job = fAbortedJobs.RemoveHead())
377		_FinishJob(job);
378
379	return B_OK;
380}
381
382
383void
384Worker::_ProcessJobs()
385{
386	while (true) {
387		AutoLocker<Worker> locker(this);
388
389		// wait for next job
390		if (fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty()) {
391			locker.Unlock();
392
393			status_t error = acquire_sem(fWorkToDoSem);
394			if (error != B_OK) {
395				if (error == B_INTERRUPTED) {
396					locker.Lock();
397					continue;
398				}
399				break;
400			}
401
402			locker.Lock();
403		}
404
405		// clean up aborted jobs
406		while (Job* job = fAbortedJobs.RemoveHead())
407			_FinishJob(job);
408
409		// process the next job
410		if (Job* job = fUnscheduledJobs.RemoveHead()) {
411			job->SetState(JOB_STATE_ACTIVE);
412
413			locker.Unlock();
414			status_t error = job->Do();
415			locker.Lock();
416
417			if (job->State() == JOB_STATE_ACTIVE) {
418				job->SetState(
419					error == B_OK ? JOB_STATE_SUCCEEDED : JOB_STATE_FAILED);
420			} else if (job->State() == JOB_STATE_WAITING)
421				continue;
422
423			_FinishJob(job);
424		}
425	}
426}
427
428
429void
430Worker::_AbortJob(Job* job, bool removeFromTable)
431{
432	switch (job->State()) {
433		case JOB_STATE_ABORTED:
434			return;
435
436		case JOB_STATE_UNSCHEDULED:
437			fUnscheduledJobs.Remove(job);
438			fAbortedJobs.Add(job);
439			break;
440
441		case JOB_STATE_WAITING:
442			job->Dependency()->DependentJobs().Remove(job);
443			job->SetDependency(NULL);
444			break;
445
446		case JOB_STATE_ACTIVE:
447		case JOB_STATE_FAILED:
448		case JOB_STATE_SUCCEEDED:
449		default:
450			break;
451	}
452
453	job->SetState(JOB_STATE_ABORTED);
454	if (removeFromTable)
455		fJobs.Remove(job);
456}
457
458
459void
460Worker::_FinishJob(Job* job)
461{
462	// wake up dependent jobs
463	if (!job->DependentJobs().IsEmpty()) {
464		job_wait_status waitStatus;
465		switch (job->State()) {
466			case JOB_STATE_ABORTED:
467				waitStatus = JOB_DEPENDENCY_ABORTED;
468				break;
469			case JOB_STATE_FAILED:
470				waitStatus = JOB_DEPENDENCY_FAILED;
471				break;
472			case JOB_STATE_SUCCEEDED:
473				waitStatus = JOB_DEPENDENCY_SUCCEEDED;
474				break;
475
476			case JOB_STATE_UNSCHEDULED:
477			case JOB_STATE_WAITING:
478			case JOB_STATE_ACTIVE:
479			default:
480				// should never happen
481				waitStatus = JOB_DEPENDENCY_NOT_FOUND;
482				break;
483		}
484
485		while (Job* dependentJob = job->DependentJobs().RemoveHead()) {
486			dependentJob->SetDependency(NULL);
487			dependentJob->SetWaitStatus(waitStatus);
488			fUnscheduledJobs.Add(dependentJob);
489		}
490
491		release_sem(fWorkToDoSem);
492	}
493
494	if (job->State() != JOB_STATE_ABORTED)
495		fJobs.Remove(job);
496	job->NotifyListeners();
497	job->ReleaseReference();
498}
499