1/*
2 * Copyright 2015, Axel Dörfler, axeld@pinc-software.de.
3 * Distributed under the terms of the MIT License.
4 */
5
6
7#include "Worker.h"
8
9
10static const bigtime_t kWorkerTimeout = 1000000;
11	// One second until a worker thread quits without a job
12
13static const int32 kWorkerCountPerCPU = 3;
14
15static int32 sWorkerCount;
16
17
18Worker::Worker(JobQueue& queue)
19	:
20	fThread(-1),
21	fJobQueue(queue)
22{
23}
24
25
26Worker::~Worker()
27{
28}
29
30
31status_t
32Worker::Init()
33{
34	fThread = spawn_thread(&Worker::_Process, Name(), B_NORMAL_PRIORITY,
35		this);
36	if (fThread < 0)
37		return fThread;
38
39	status_t status = resume_thread(fThread);
40	if (status == B_OK)
41		atomic_add(&sWorkerCount, 1);
42
43	return status;
44}
45
46
47status_t
48Worker::Process()
49{
50	while (true) {
51		BJob* job;
52		status_t status = fJobQueue.Pop(Timeout(), false, &job);
53		if (status != B_OK)
54			return status;
55
56		status = Run(job);
57		if (status != B_OK) {
58			// TODO: proper error reporting on failed job!
59			debug_printf("Launching %s failed: %s\n", job->Title().String(),
60				strerror(status));
61		}
62	}
63}
64
65
66bigtime_t
67Worker::Timeout() const
68{
69	return kWorkerTimeout;
70}
71
72
73const char*
74Worker::Name() const
75{
76	return "worker";
77}
78
79
80status_t
81Worker::Run(BJob* job)
82{
83	return job->Run();
84}
85
86
87/*static*/ status_t
88Worker::_Process(void* _self)
89{
90	Worker* self = (Worker*)_self;
91	status_t status = self->Process();
92	delete self;
93
94	return status;
95}
96
97
98// #pragma mark -
99
100
101MainWorker::MainWorker(JobQueue& queue)
102	:
103	Worker(queue),
104	fMaxWorkerCount(kWorkerCountPerCPU)
105{
106	// TODO: keep track of workers, and quit them on destruction
107	system_info info;
108	if (get_system_info(&info) == B_OK)
109		fMaxWorkerCount = info.cpu_count * kWorkerCountPerCPU;
110}
111
112
113bigtime_t
114MainWorker::Timeout() const
115{
116	return B_INFINITE_TIMEOUT;
117}
118
119
120const char*
121MainWorker::Name() const
122{
123	return "main worker";
124}
125
126
127status_t
128MainWorker::Run(BJob* job)
129{
130	int32 count = atomic_get(&sWorkerCount);
131
132	size_t jobCount = fJobQueue.CountJobs();
133	if (jobCount > INT_MAX)
134		jobCount = INT_MAX;
135
136	if ((int32)jobCount > count && count < fMaxWorkerCount) {
137		Worker* worker = new Worker(fJobQueue);
138		worker->Init();
139	}
140
141	return Worker::Run(job);
142}
143