Lines Matching refs:queue

12 #include "funnel-queue.h"
18 * This queue will attempt to handle requests in reasonably sized batches instead of reacting
22 * If the wait time becomes long enough, the queue will become dormant and must be explicitly
24 * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
28 * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
31 * queue's "next" field update isn't visible yet to make the entry accessible, its existence will
35 * the queue to awaken immediately.
50 /* Wait queue for synchronizing producers and consumer */
68 static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
72 entry = vdo_funnel_queue_poll(queue->retry_queue);
76 entry = vdo_funnel_queue_poll(queue->main_queue);
83 static inline bool are_queues_idle(struct uds_request_queue *queue)
85 return vdo_is_funnel_queue_idle(queue->retry_queue) &&
86 vdo_is_funnel_queue_idle(queue->main_queue);
94 static inline bool dequeue_request(struct uds_request_queue *queue,
97 struct uds_request *request = poll_queues(queue);
104 if (!READ_ONCE(queue->running)) {
115 static void wait_for_request(struct uds_request_queue *queue, bool dormant,
120 wait_event_interruptible(queue->wait_head,
121 (dequeue_request(queue, request, waited) ||
122 !are_queues_idle(queue)));
126 wait_event_interruptible_hrtimeout(queue->wait_head,
127 dequeue_request(queue, request, waited),
133 struct uds_request_queue *queue = arg;
136 bool dormant = atomic_read(&queue->dormant);
141 wait_for_request(queue, dormant, time_batch, &request, &waited);
144 queue->processor(request);
145 } else if (!READ_ONCE(queue->running)) {
151 * The queue has been roused from dormancy. Clear the flag so enqueuers can
154 atomic_set(&queue->dormant, false);
169 atomic_set(&queue->dormant, true);
190 while ((request = poll_queues(queue)) != NULL)
191 queue->processor(request);
199 struct uds_request_queue *queue;
201 result = vdo_allocate(1, struct uds_request_queue, __func__, &queue);
205 queue->processor = processor;
206 queue->running = true;
207 atomic_set(&queue->dormant, false);
208 init_waitqueue_head(&queue->wait_head);
210 result = vdo_make_funnel_queue(&queue->main_queue);
212 uds_request_queue_finish(queue);
216 result = vdo_make_funnel_queue(&queue->retry_queue);
218 uds_request_queue_finish(queue);
222 result = vdo_create_thread(request_queue_worker, queue, queue_name,
223 &queue->thread);
225 uds_request_queue_finish(queue);
229 queue->started = true;
230 *queue_ptr = queue;
234 static inline void wake_up_worker(struct uds_request_queue *queue)
236 if (wq_has_sleeper(&queue->wait_head))
237 wake_up(&queue->wait_head);
240 void uds_request_queue_enqueue(struct uds_request_queue *queue,
246 sub_queue = request->requeued ? queue->retry_queue : queue->main_queue;
251 * we know the queue operation acts as one.
253 if (atomic_read(&queue->dormant) || unbatched)
254 wake_up_worker(queue);
257 void uds_request_queue_finish(struct uds_request_queue *queue)
259 if (queue == NULL)
265 * able to see any change we made to a next field in the funnel queue entry. The
269 WRITE_ONCE(queue->running, false);
271 if (queue->started) {
272 wake_up_worker(queue);
273 vdo_join_threads(queue->thread);
276 vdo_free_funnel_queue(queue->main_queue);
277 vdo_free_funnel_queue(queue->retry_queue);
278 vdo_free(queue);