1/*
2 * Copyright 2011-2015, Haiku, Inc. All Rights Reserved.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
 *		Axel Dörfler <axeld@pinc-software.de>
7 *		Oliver Tappe <zooey@hirschkaefer.de>
8 */
9
10
11#include <JobQueue.h>
12
13#include <set>
14
15#include <Autolock.h>
16#include <Job.h>
17
18#include <JobPrivate.h>
19
20
21namespace BSupportKit {
22
23namespace BPrivate {
24
25
26struct JobQueue::JobPriorityLess {
27	bool operator()(const BJob* left, const BJob* right) const;
28};
29
30
31/*!	Sort jobs by:
32		1. descending count of dependencies (only jobs without dependencies are
33		   runnable)
34		2. job ticket number (order in which jobs were added to the queue)
35*/
36bool
37JobQueue::JobPriorityLess::operator()(const BJob* left, const BJob* right) const
38{
39	int32 difference = left->CountDependencies() - right->CountDependencies();
40	if (difference < 0)
41		return true;
42	if (difference > 0)
43		return false;
44
45	return left->TicketNumber() < right->TicketNumber();
46};
47
48
49class JobQueue::JobPriorityQueue
50	: public std::set<BJob*, JobPriorityLess> {
51};
52
53
54// #pragma mark -
55
56
57JobQueue::JobQueue()
58	:
59	fLock("job queue"),
60	fNextTicketNumber(1)
61{
62	fInitStatus = _Init();
63}
64
65
66JobQueue::~JobQueue()
67{
68	Close();
69	delete fQueuedJobs;
70}
71
72
73status_t
74JobQueue::InitCheck() const
75{
76	return fInitStatus;
77}
78
79
80status_t
81JobQueue::AddJob(BJob* job)
82{
83	if (fQueuedJobs == NULL)
84		return B_NO_INIT;
85
86	BAutolock lock(&fLock);
87	if (!lock.IsLocked())
88		return B_ERROR;
89
90	try {
91		if (!fQueuedJobs->insert(job).second)
92			return B_NAME_IN_USE;
93	} catch (const std::bad_alloc& e) {
94		return B_NO_MEMORY;
95	} catch (...) {
96		return B_ERROR;
97	}
98	BJob::Private(*job).SetTicketNumber(fNextTicketNumber++);
99	job->AddStateListener(this);
100	if (job->IsRunnable())
101		release_sem(fHaveRunnableJobSem);
102
103	return B_OK;
104}
105
106
107status_t
108JobQueue::RemoveJob(BJob* job)
109{
110	if (fQueuedJobs == NULL)
111		return B_NO_INIT;
112
113	BAutolock lock(&fLock);
114	if (lock.IsLocked()) {
115		try {
116			if (fQueuedJobs->erase(job) == 0)
117				return B_NAME_NOT_FOUND;
118		} catch (...) {
119			return B_ERROR;
120		}
121		BJob::Private(*job).ClearTicketNumber();
122		job->RemoveStateListener(this);
123	}
124
125	return B_OK;
126}
127
128
129void
130JobQueue::JobSucceeded(BJob* job)
131{
132	BAutolock lock(&fLock);
133	if (lock.IsLocked())
134		_RequeueDependantJobsOf(job);
135}
136
137
138void
139JobQueue::JobFailed(BJob* job)
140{
141	BAutolock lock(&fLock);
142	if (lock.IsLocked())
143		_RemoveDependantJobsOf(job);
144}
145
146
147BJob*
148JobQueue::Pop()
149{
150	BJob* job;
151	if (Pop(B_INFINITE_TIMEOUT, true, &job) == B_OK)
152		return job;
153
154	return NULL;
155}
156
157
158status_t
159JobQueue::Pop(bigtime_t timeout, bool returnWhenEmpty, BJob** _job)
160{
161	BAutolock lock(&fLock);
162	if (lock.IsLocked()) {
163		while (true) {
164			JobPriorityQueue::iterator head = fQueuedJobs->begin();
165			if (head != fQueuedJobs->end()) {
166				if ((*head)->IsRunnable()) {
167					*_job = *head;
168					fQueuedJobs->erase(head);
169					return B_OK;
170				}
171			} else if (returnWhenEmpty)
172				return B_ENTRY_NOT_FOUND;
173
174			// we need to wait until a job becomes available/runnable
175			status_t result;
176			do {
177				lock.Unlock();
178				result = acquire_sem_etc(fHaveRunnableJobSem, 1,
179					B_RELATIVE_TIMEOUT, timeout);
180				if (!lock.Lock())
181					return B_ERROR;
182			} while (result == B_INTERRUPTED);
183			if (result != B_OK)
184				return result;
185		}
186	}
187
188	return B_ERROR;
189}
190
191
192size_t
193JobQueue::CountJobs() const
194{
195	BAutolock locker(fLock);
196	return fQueuedJobs->size();
197}
198
199
200void
201JobQueue::Close()
202{
203	if (fHaveRunnableJobSem < 0)
204		return;
205
206	BAutolock lock(&fLock);
207	if (lock.IsLocked()) {
208		delete_sem(fHaveRunnableJobSem);
209		fHaveRunnableJobSem = -1;
210
211		if (fQueuedJobs != NULL) {
212			// get rid of all jobs
213			for (JobPriorityQueue::iterator iter = fQueuedJobs->begin();
214				iter != fQueuedJobs->end(); ++iter) {
215				delete (*iter);
216			}
217			fQueuedJobs->clear();
218		}
219	}
220}
221
222
223status_t
224JobQueue::_Init()
225{
226	status_t result = fLock.InitCheck();
227	if (result != B_OK)
228		return result;
229
230	fQueuedJobs = new (std::nothrow) JobPriorityQueue();
231	if (fQueuedJobs == NULL)
232		return B_NO_MEMORY;
233
234	fHaveRunnableJobSem = create_sem(0, "have runnable job");
235	if (fHaveRunnableJobSem < 0)
236		return fHaveRunnableJobSem;
237
238	return B_OK;
239}
240
241
242void
243JobQueue::_RequeueDependantJobsOf(BJob* job)
244{
245	while (BJob* dependantJob = job->DependantJobAt(0)) {
246		JobPriorityQueue::iterator found = fQueuedJobs->find(dependantJob);
247		bool removed = false;
248		if (found != fQueuedJobs->end()) {
249			try {
250				fQueuedJobs->erase(dependantJob);
251				removed = true;
252			} catch (...) {
253			}
254		}
255		dependantJob->RemoveDependency(job);
256		if (removed) {
257			// Only insert a job if it was in our queue before
258			try {
259				fQueuedJobs->insert(dependantJob);
260				if (dependantJob->IsRunnable())
261					release_sem(fHaveRunnableJobSem);
262			} catch (...) {
263			}
264		}
265	}
266}
267
268
269void
270JobQueue::_RemoveDependantJobsOf(BJob* job)
271{
272	while (BJob* dependantJob = job->DependantJobAt(0)) {
273		try {
274			fQueuedJobs->erase(dependantJob);
275		} catch (...) {
276		}
277
278		if (dependantJob->State() != B_JOB_STATE_ABORTED) {
279			BJob::Private(*dependantJob).SetState(B_JOB_STATE_ABORTED);
280			BJob::Private(*dependantJob).NotifyStateListeners();
281		}
282
283		_RemoveDependantJobsOf(dependantJob);
284		dependantJob->RemoveDependency(job);
285		// TODO: we need some sort of ownership management
286		delete dependantJob;
287	}
288}
289
290
291}	// namespace BPrivate
292
}	// namespace BSupportKit
294