1/*
 * Copyright 2009, Colin Günther, coling@gmx.de
3 * Copyright 2007, Hugo Santos. All Rights Reserved.
4 * Distributed under the terms of the MIT License.
5 *
6 * Authors:
7 *      Hugo Santos, hugosantos@gmail.com
8 */
9
10
11#include "device.h"
12
13#include <stdio.h>
14
15#include <compat/sys/taskqueue.h>
16#include <compat/sys/haiku-module.h>
17
18
/* Bit flags stored in taskqueue::tq_flags. */
/* set when the queue is created, cleared by taskqueue_free() */
#define TQ_FLAGS_ACTIVE		0x01
/* set by taskqueue_block(), cleared by taskqueue_unblock() */
#define TQ_FLAGS_BLOCKED	0x02
/* a task was queued while the queue was blocked */
#define TQ_FLAGS_PENDING	0x04
22
23
24struct taskqueue {
25	char tq_name[64];
26	mutex tq_mutex;
27	struct list tq_list;
28	taskqueue_enqueue_fn tq_enqueue;
29	void *tq_arg;
30	int tq_fast;
31	spinlock tq_spinlock;
32	sem_id tq_sem;
33	thread_id *tq_threads;
34	thread_id tq_thread_storage;
35	int tq_threadcount;
36	int tq_flags;
37};
38
/*
 * Shared queues, created by init_taskqueues() when the driver requests
 * them and released by uninit_taskqueues().
 */
struct taskqueue *taskqueue_fast = NULL;
struct taskqueue *taskqueue_swi = NULL;
41
42
43static struct taskqueue *
44_taskqueue_create(const char *name, int mflags, int fast,
45	taskqueue_enqueue_fn enqueueFunction, void *context)
46{
47	struct taskqueue *tq = malloc(sizeof(struct taskqueue));
48	if (tq == NULL)
49		return NULL;
50
51	tq->tq_fast = fast;
52
53	if (fast) {
54		B_INITIALIZE_SPINLOCK(&tq->tq_spinlock);
55	} else {
56		mutex_init_etc(&tq->tq_mutex, name, MUTEX_FLAG_CLONE_NAME);
57	}
58
59	strlcpy(tq->tq_name, name, sizeof(tq->tq_name));
60	list_init_etc(&tq->tq_list, offsetof(struct task, ta_link));
61	tq->tq_enqueue = enqueueFunction;
62	tq->tq_arg = context;
63
64	tq->tq_sem = -1;
65	tq->tq_threads = NULL;
66	tq->tq_threadcount = 0;
67	tq->tq_flags = TQ_FLAGS_ACTIVE;
68
69	return tq;
70}
71
72
73static void
74tq_lock(struct taskqueue *taskQueue, cpu_status *status)
75{
76	if (taskQueue->tq_fast) {
77		*status = disable_interrupts();
78		acquire_spinlock(&taskQueue->tq_spinlock);
79	} else {
80		mutex_lock(&taskQueue->tq_mutex);
81	}
82}
83
84
85static void
86tq_unlock(struct taskqueue *taskQueue, cpu_status status)
87{
88	if (taskQueue->tq_fast) {
89		release_spinlock(&taskQueue->tq_spinlock);
90		restore_interrupts(status);
91	} else {
92		mutex_unlock(&taskQueue->tq_mutex);
93	}
94}
95
96
97struct taskqueue *
98taskqueue_create(const char *name, int mflags,
99	taskqueue_enqueue_fn enqueueFunction, void *context)
100{
101	return _taskqueue_create(name, mflags, 0, enqueueFunction, context);
102}
103
104
105static int32
106tq_handle_thread(void *data)
107{
108	struct taskqueue *tq = data;
109	cpu_status cpu_state;
110	struct task *t;
111	int pending;
112	sem_id sem;
113
114	/* just a synchronization point */
115	tq_lock(tq, &cpu_state);
116	sem = tq->tq_sem;
117	tq_unlock(tq, cpu_state);
118
119	while (acquire_sem(sem) == B_NO_ERROR) {
120		tq_lock(tq, &cpu_state);
121		t = list_remove_head_item(&tq->tq_list);
122		tq_unlock(tq, cpu_state);
123		if (t == NULL)
124			continue;
125		pending = t->ta_pending;
126		t->ta_pending = 0;
127
128		t->ta_handler(t->ta_argument, pending);
129	}
130
131	return 0;
132}
133
134
135static int
136_taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
137	const char *name)
138{
139	struct taskqueue *tq = (*taskQueue);
140	int i, j;
141
142	if (count == 0)
143		return -1;
144
145	if (tq->tq_threads != NULL)
146		return -1;
147
148	if (count == 1) {
149		tq->tq_threads = &tq->tq_thread_storage;
150	} else {
151		tq->tq_threads = malloc(sizeof(thread_id) * count);
152		if (tq->tq_threads == NULL)
153			return B_NO_MEMORY;
154	}
155
156	tq->tq_sem = create_sem(0, tq->tq_name);
157	if (tq->tq_sem < B_OK) {
158		if (count > 1)
159			free(tq->tq_threads);
160		tq->tq_threads = NULL;
161		return tq->tq_sem;
162	}
163
164	for (i = 0; i < count; i++) {
165		tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name,
166			priority, tq);
167		if (tq->tq_threads[i] < B_OK) {
168			status_t status = tq->tq_threads[i];
169			for (j = 0; j < i; j++)
170				kill_thread(tq->tq_threads[j]);
171			if (count > 1)
172				free(tq->tq_threads);
173			tq->tq_threads = NULL;
174			delete_sem(tq->tq_sem);
175			return status;
176		}
177	}
178
179	tq->tq_threadcount = count;
180
181	for (i = 0; i < count; i++)
182		resume_thread(tq->tq_threads[i]);
183
184	return 0;
185}
186
187
/*
 * Starts `count` worker threads for *taskQueue at the given priority.
 * The printf-style `format` and its arguments are rendered into a name
 * (truncated to 63 characters) that is handed to
 * _taskqueue_start_threads(), whose result is returned.
 */
int
taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
	const char *format, ...)
{
	char threadName[64];
	va_list args;

	va_start(args, format);
	vsnprintf(threadName, sizeof(threadName), format, args);
	va_end(args);

	/* The queue is deliberately not locked here; callers are expected to
	 * invoke this from a sane place. Locking a 'fast' queue disables
	 * interrupts, and semaphores and threads cannot be created in that
	 * state. */
	return _taskqueue_start_threads(taskQueue, count, priority, threadName);
}
212
213
214void
215taskqueue_free(struct taskqueue *taskQueue)
216{
217	/* lock and  drain list? */
218	taskQueue->tq_flags &= ~TQ_FLAGS_ACTIVE;
219	if (!taskQueue->tq_fast)
220		mutex_destroy(&taskQueue->tq_mutex);
221	if (taskQueue->tq_sem != -1) {
222		int i;
223
224		delete_sem(taskQueue->tq_sem);
225
226		for (i = 0; i < taskQueue->tq_threadcount; i++) {
227			status_t status;
228			wait_for_thread(taskQueue->tq_threads[i], &status);
229		}
230
231		if (taskQueue->tq_threadcount > 1)
232			free(taskQueue->tq_threads);
233	}
234
235	free(taskQueue);
236}
237
238
239void
240taskqueue_drain(struct taskqueue *taskQueue, struct task *task)
241{
242	cpu_status status;
243
244	tq_lock(taskQueue, &status);
245	while (task->ta_pending != 0) {
246		tq_unlock(taskQueue, status);
247		snooze(0);
248		tq_lock(taskQueue, &status);
249	}
250	tq_unlock(taskQueue, status);
251}
252
253
254int
255taskqueue_enqueue(struct taskqueue *taskQueue, struct task *task)
256{
257	cpu_status status;
258	tq_lock(taskQueue, &status);
259	/* we don't really support priorities */
260	if (task->ta_pending) {
261		task->ta_pending++;
262	} else {
263		list_add_item(&taskQueue->tq_list, task);
264		task->ta_pending = 1;
265		if ((taskQueue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
266			taskQueue->tq_enqueue(taskQueue->tq_arg);
267		else
268			taskQueue->tq_flags |= TQ_FLAGS_PENDING;
269	}
270	tq_unlock(taskQueue, status);
271	return 0;
272}
273
274
275void
276taskqueue_thread_enqueue(void *context)
277{
278	struct taskqueue **tqp = context;
279	release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE);
280}
281
282
/*
 * Same as taskqueue_enqueue(); the queue itself knows whether it is fast.
 */
int
taskqueue_enqueue_fast(struct taskqueue *taskQueue, struct task *task)
{
	int result = taskqueue_enqueue(taskQueue, task);

	return result;
}
288
289
290struct taskqueue *
291taskqueue_create_fast(const char *name, int mflags,
292	taskqueue_enqueue_fn enqueueFunction, void *context)
293{
294	return _taskqueue_create(name, mflags, 1, enqueueFunction, context);
295}
296
297
298void
299task_init(struct task *task, int prio, task_handler_t handler, void *context)
300{
301	task->ta_priority = prio;
302	task->ta_handler = handler;
303	task->ta_argument = context;
304	task->ta_pending = 0;
305}
306
307
308status_t
309init_taskqueues()
310{
311	status_t status = B_NO_MEMORY;
312
313	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) {
314		taskqueue_fast = taskqueue_create_fast("fast taskq", 0,
315			taskqueue_thread_enqueue, &taskqueue_fast);
316		if (taskqueue_fast == NULL)
317			return B_NO_MEMORY;
318
319		status = taskqueue_start_threads(&taskqueue_fast, 1,
320			B_REAL_TIME_PRIORITY, "fast taskq thread");
321		if (status < B_OK)
322			goto err_1;
323	}
324
325	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) {
326		taskqueue_swi = taskqueue_create_fast("swi taskq", 0,
327			taskqueue_thread_enqueue, &taskqueue_swi);
328		if (taskqueue_swi == NULL) {
329			status = B_NO_MEMORY;
330			goto err_1;
331		}
332
333		status = taskqueue_start_threads(&taskqueue_swi, 1,
334			B_REAL_TIME_PRIORITY, "swi taskq");
335		if (status < B_OK)
336			goto err_2;
337	}
338
339	return B_OK;
340
341err_2:
342	if (taskqueue_swi)
343		taskqueue_free(taskqueue_swi);
344
345err_1:
346	if (taskqueue_fast)
347		taskqueue_free(taskqueue_fast);
348
349	return status;
350}
351
352
353void
354uninit_taskqueues()
355{
356	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE))
357		taskqueue_free(taskqueue_swi);
358
359	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE))
360		taskqueue_free(taskqueue_fast);
361}
362
363
364void
365taskqueue_block(struct taskqueue *taskQueue)
366{
367	cpu_status status;
368
369	tq_lock(taskQueue, &status);
370	taskQueue->tq_flags |= TQ_FLAGS_BLOCKED;
371	tq_unlock(taskQueue, status);
372}
373
374
375void
376taskqueue_unblock(struct taskqueue *taskQueue)
377{
378	cpu_status status;
379
380	tq_lock(taskQueue, &status);
381	taskQueue->tq_flags &= ~TQ_FLAGS_BLOCKED;
382	if (taskQueue->tq_flags & TQ_FLAGS_PENDING) {
383		taskQueue->tq_flags &= ~TQ_FLAGS_PENDING;
384		taskQueue->tq_enqueue(taskQueue->tq_arg);
385	}
386	tq_unlock(taskQueue, status);
387}
388