Lines Matching refs:queue

15 #include "funnel-queue.h"
28 * DOC: Work queue definition.
36 /* Name of just the work queue (e.g., "cpuQ12") */
73 static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
75 return ((queue == NULL) ?
76 NULL : container_of(queue, struct simple_work_queue, common));
79 static inline struct round_robin_work_queue *as_round_robin_work_queue(struct vdo_work_queue *queue)
81 return ((queue == NULL) ?
83 container_of(queue, struct round_robin_work_queue, common));
96 static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
100 for (i = queue->common.type->max_priority; i >= 0; i--) {
101 struct funnel_queue_entry *link = vdo_funnel_queue_poll(queue->priority_lists[i]);
110 static void enqueue_work_queue_completion(struct simple_work_queue *queue,
115 completion, completion->callback, queue, completion->my_queue);
117 completion->priority = queue->common.type->default_priority;
119 if (VDO_ASSERT(completion->priority <= queue->common.type->max_priority,
120 "priority is in range for queue") != VDO_SUCCESS)
123 completion->my_queue = &queue->common;
125 /* Funnel queue handles the synchronization for the put. */
126 vdo_funnel_queue_put(queue->priority_lists[completion->priority],
130 * Due to how funnel queue synchronization is handled (just atomic operations), the
132 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
133 * item to the queue, the consumer thread may not see this since it is not guaranteed to
134 * have the same view of the queue as a producer thread.
146 if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
150 wake_up(&queue->waiting_worker_threads);
153 static void run_start_hook(struct simple_work_queue *queue)
155 if (queue->common.type->start != NULL)
156 queue->common.type->start(queue->private);
159 static void run_finish_hook(struct simple_work_queue *queue)
161 if (queue->common.type->finish != NULL)
162 queue->common.type->finish(queue->private);
174 static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
180 prepare_to_wait(&queue->waiting_worker_threads, &wait,
186 * queue; the producer side will do them in the reverse order. (There's still a
190 atomic_set(&queue->idle, 1);
191 smp_mb(); /* store-load barrier between "idle" and funnel queue */
193 completion = poll_for_completion(queue);
211 completion = poll_for_completion(queue);
216 finish_wait(&queue->waiting_worker_threads, &wait);
217 atomic_set(&queue->idle, 0);
222 static void process_completion(struct simple_work_queue *queue,
225 if (VDO_ASSERT(completion->my_queue == &queue->common,
226 "completion %px from queue %px marked as being in this queue (%px)",
227 completion, queue, completion->my_queue) == VDO_SUCCESS)
233 static void service_work_queue(struct simple_work_queue *queue)
235 run_start_hook(queue);
238 struct vdo_completion *completion = poll_for_completion(queue);
241 completion = wait_for_next_completion(queue);
248 process_completion(queue, completion);
259 run_finish_hook(queue);
264 struct simple_work_queue *queue = ptr;
266 complete(queue->started);
267 service_work_queue(queue);
273 static void free_simple_work_queue(struct simple_work_queue *queue)
278 vdo_free_funnel_queue(queue->priority_lists[i]);
279 vdo_free(queue->common.name);
280 vdo_free(queue);
283 static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
285 struct simple_work_queue **queue_table = queue->service_queues;
286 unsigned int count = queue->num_service_queues;
289 queue->service_queues = NULL;
294 vdo_free(queue->common.name);
295 vdo_free(queue);
298 void vdo_free_work_queue(struct vdo_work_queue *queue)
300 if (queue == NULL)
303 vdo_finish_work_queue(queue);
305 if (queue->round_robin_mode)
306 free_round_robin_work_queue(as_round_robin_work_queue(queue));
308 free_simple_work_queue(as_simple_work_queue(queue));
317 struct simple_work_queue *queue;
323 "queue priority count %u within limit %u", type->max_priority,
326 result = vdo_allocate(1, struct simple_work_queue, "simple work queue", &queue);
330 queue->private = private;
331 queue->started = &started;
332 queue->common.type = type;
333 queue->common.owner = owner;
334 init_waitqueue_head(&queue->waiting_worker_threads);
336 result = vdo_duplicate_string(name, "queue name", &queue->common.name);
338 vdo_free(queue);
343 result = vdo_make_funnel_queue(&queue->priority_lists[i]);
345 free_simple_work_queue(queue);
350 thread = kthread_run(work_queue_runner, queue, "%s:%s", thread_name_prefix,
351 queue->common.name);
353 free_simple_work_queue(queue);
357 queue->thread = thread;
369 *queue_ptr = queue;
374 * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
377 * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
378 * of the actual number of queues and threads allocated here, code outside of the queue
386 struct round_robin_work_queue *queue;
402 result = vdo_allocate(1, struct round_robin_work_queue, "round-robin work queue",
403 &queue);
408 "subordinate work queues", &queue->service_queues);
410 vdo_free(queue);
414 queue->num_service_queues = thread_count;
415 queue->common.round_robin_mode = true;
416 queue->common.owner = owner;
418 result = vdo_duplicate_string(name, "queue name", &queue->common.name);
420 vdo_free(queue->service_queues);
421 vdo_free(queue);
425 *queue_ptr = &queue->common;
432 context, type, &queue->service_queues[i]);
434 queue->num_service_queues = i;
444 static void finish_simple_work_queue(struct simple_work_queue *queue)
446 if (queue->thread == NULL)
450 kthread_stop(queue->thread);
451 queue->thread = NULL;
454 static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
456 struct simple_work_queue **queue_table = queue->service_queues;
457 unsigned int count = queue->num_service_queues;
465 void vdo_finish_work_queue(struct vdo_work_queue *queue)
467 if (queue == NULL)
470 if (queue->round_robin_mode)
471 finish_round_robin_work_queue(as_round_robin_work_queue(queue));
473 finish_simple_work_queue(as_simple_work_queue(queue));
478 static void dump_simple_work_queue(struct simple_work_queue *queue)
483 if (queue->thread != NULL) {
484 task_state_report = task_state_to_char(queue->thread);
485 thread_status = atomic_read(&queue->idle) ? "idle" : "running";
488 vdo_log_info("workQ %px (%s) %s (%c)", &queue->common, queue->common.name,
491 /* ->waiting_worker_threads wait queue status? anyone waiting? */
499 void vdo_dump_work_queue(struct vdo_work_queue *queue)
501 if (queue->round_robin_mode) {
502 struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
508 dump_simple_work_queue(as_simple_work_queue(queue));
556 void vdo_enqueue_work_queue(struct vdo_work_queue *queue,
560 * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
565 if (!queue->round_robin_mode) {
566 simple_queue = as_simple_work_queue(queue);
568 struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
589 * Return the work queue pointer recorded at initialization time in the work-queue stack handle
596 * the queue for the thread which was interrupted. However, the interrupted thread may have
604 /* Not a VDO work queue thread. */
612 struct simple_work_queue *queue = get_current_thread_work_queue();
614 return (queue == NULL) ? NULL : &queue->common;
617 struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
619 return queue->owner;
624 * queue, or NULL if none or if the current thread is not a
625 * work queue thread.
629 struct simple_work_queue *queue = get_current_thread_work_queue();
631 return (queue != NULL) ? queue->private : NULL;
634 bool vdo_work_queue_type_is(struct vdo_work_queue *queue,
637 return (queue->type == type);