ck_ring.h revision 309260
1/*
2 * Copyright 2009-2015 Samy Al Bahra.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27#ifndef CK_RING_H
28#define CK_RING_H
29
30#include <ck_cc.h>
31#include <ck_md.h>
32#include <ck_pr.h>
33#include <ck_stdbool.h>
34#include <ck_string.h>
35
36/*
37 * Concurrent ring buffer.
38 */
39
40struct ck_ring {
41	unsigned int c_head;
42	char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
43	unsigned int p_tail;
44	unsigned int p_head;
45	char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
46	unsigned int size;
47	unsigned int mask;
48};
49typedef struct ck_ring ck_ring_t;
50
/*
 * One slot of a pointer ring: the ck_ring_*_{spsc,spmc,mpsc,mpmc} functions
 * copy a single pointer value into or out of each slot.
 */
typedef struct ck_ring_buffer {
	void *value;	/* Pointer stored in this slot. */
} ck_ring_buffer_t;
55
56CK_CC_INLINE static unsigned int
57ck_ring_size(const struct ck_ring *ring)
58{
59	unsigned int c, p;
60
61	c = ck_pr_load_uint(&ring->c_head);
62	p = ck_pr_load_uint(&ring->p_tail);
63	return (p - c) & ring->mask;
64}
65
66CK_CC_INLINE static unsigned int
67ck_ring_capacity(const struct ck_ring *ring)
68{
69	return ring->size;
70}
71
72CK_CC_INLINE static void
73ck_ring_init(struct ck_ring *ring, unsigned int size)
74{
75
76	ring->size = size;
77	ring->mask = size - 1;
78	ring->p_tail = 0;
79	ring->p_head = 0;
80	ring->c_head = 0;
81	return;
82}
83
/*
 * The _ck_ring_* namespace is internal only and must not be used externally.
 */
87CK_CC_FORCE_INLINE static bool
88_ck_ring_enqueue_sp(struct ck_ring *ring,
89    void *CK_CC_RESTRICT buffer,
90    const void *CK_CC_RESTRICT entry,
91    unsigned int ts,
92    unsigned int *size)
93{
94	const unsigned int mask = ring->mask;
95	unsigned int consumer, producer, delta;
96
97	consumer = ck_pr_load_uint(&ring->c_head);
98	producer = ring->p_tail;
99	delta = producer + 1;
100	if (size != NULL)
101		*size = (producer - consumer) & mask;
102
103	if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
104		return false;
105
106	buffer = (char *)buffer + ts * (producer & mask);
107	memcpy(buffer, entry, ts);
108
109	/*
110	 * Make sure to update slot value before indicating
111	 * that the slot is available for consumption.
112	 */
113	ck_pr_fence_store();
114	ck_pr_store_uint(&ring->p_tail, delta);
115	return true;
116}
117
118CK_CC_FORCE_INLINE static bool
119_ck_ring_enqueue_sp_size(struct ck_ring *ring,
120    void *CK_CC_RESTRICT buffer,
121    const void *CK_CC_RESTRICT entry,
122    unsigned int ts,
123    unsigned int *size)
124{
125	unsigned int sz;
126	bool r;
127
128	r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
129	*size = sz;
130	return r;
131}
132
133CK_CC_FORCE_INLINE static bool
134_ck_ring_dequeue_sc(struct ck_ring *ring,
135    const void *CK_CC_RESTRICT buffer,
136    void *CK_CC_RESTRICT target,
137    unsigned int size)
138{
139	const unsigned int mask = ring->mask;
140	unsigned int consumer, producer;
141
142	consumer = ring->c_head;
143	producer = ck_pr_load_uint(&ring->p_tail);
144
145	if (CK_CC_UNLIKELY(consumer == producer))
146		return false;
147
148	/*
149	 * Make sure to serialize with respect to our snapshot
150	 * of the producer counter.
151	 */
152	ck_pr_fence_load();
153
154	buffer = (const char *)buffer + size * (consumer & mask);
155	memcpy(target, buffer, size);
156
157	/*
158	 * Make sure copy is completed with respect to consumer
159	 * update.
160	 */
161	ck_pr_fence_store();
162	ck_pr_store_uint(&ring->c_head, consumer + 1);
163	return true;
164}
165
166CK_CC_FORCE_INLINE static bool
167_ck_ring_enqueue_mp(struct ck_ring *ring,
168    void *buffer,
169    const void *entry,
170    unsigned int ts,
171    unsigned int *size)
172{
173	const unsigned int mask = ring->mask;
174	unsigned int producer, consumer, delta;
175	bool r = true;
176
177	producer = ck_pr_load_uint(&ring->p_head);
178
179	do {
180		/*
181		 * The snapshot of producer must be up to date with
182		 * respect to consumer.
183		 */
184		ck_pr_fence_load();
185		consumer = ck_pr_load_uint(&ring->c_head);
186
187		delta = producer + 1;
188		if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) {
189			r = false;
190			goto leave;
191		}
192	} while (ck_pr_cas_uint_value(&ring->p_head,
193				      producer,
194				      delta,
195				      &producer) == false);
196
197	buffer = (char *)buffer + ts * (producer & mask);
198	memcpy(buffer, entry, ts);
199
200	/*
201	 * Wait until all concurrent producers have completed writing
202	 * their data into the ring buffer.
203	 */
204	while (ck_pr_load_uint(&ring->p_tail) != producer)
205		ck_pr_stall();
206
207	/*
208	 * Ensure that copy is completed before updating shared producer
209	 * counter.
210	 */
211	ck_pr_fence_store();
212	ck_pr_store_uint(&ring->p_tail, delta);
213
214leave:
215	if (size != NULL)
216		*size = (producer - consumer) & mask;
217
218	return r;
219}
220
221CK_CC_FORCE_INLINE static bool
222_ck_ring_enqueue_mp_size(struct ck_ring *ring,
223    void *buffer,
224    const void *entry,
225    unsigned int ts,
226    unsigned int *size)
227{
228	unsigned int sz;
229	bool r;
230
231	r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
232	*size = sz;
233	return r;
234}
235
236CK_CC_FORCE_INLINE static bool
237_ck_ring_trydequeue_mc(struct ck_ring *ring,
238    const void *buffer,
239    void *data,
240    unsigned int size)
241{
242	const unsigned int mask = ring->mask;
243	unsigned int consumer, producer;
244
245	consumer = ck_pr_load_uint(&ring->c_head);
246	ck_pr_fence_load();
247	producer = ck_pr_load_uint(&ring->p_tail);
248
249	if (CK_CC_UNLIKELY(consumer == producer))
250		return false;
251
252	ck_pr_fence_load();
253
254	buffer = (const char *)buffer + size * (consumer & mask);
255	memcpy(data, buffer, size);
256
257	ck_pr_fence_store_atomic();
258	return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
259}
260
261CK_CC_FORCE_INLINE static bool
262_ck_ring_dequeue_mc(struct ck_ring *ring,
263    const void *buffer,
264    void *data,
265    unsigned int ts)
266{
267	const unsigned int mask = ring->mask;
268	unsigned int consumer, producer;
269
270	consumer = ck_pr_load_uint(&ring->c_head);
271
272	do {
273		const char *target;
274
275		/*
276		 * Producer counter must represent state relative to
277		 * our latest consumer snapshot.
278		 */
279		ck_pr_fence_load();
280		producer = ck_pr_load_uint(&ring->p_tail);
281
282		if (CK_CC_UNLIKELY(consumer == producer))
283			return false;
284
285		ck_pr_fence_load();
286
287		target = (const char *)buffer + ts * (consumer & mask);
288		memcpy(data, target, ts);
289
290		/* Serialize load with respect to head update. */
291		ck_pr_fence_store_atomic();
292	} while (ck_pr_cas_uint_value(&ring->c_head,
293				      consumer,
294				      consumer + 1,
295				      &consumer) == false);
296
297	return true;
298}
299
300/*
301 * The ck_ring_*_spsc namespace is the public interface for interacting with a
302 * ring buffer containing pointers. Correctness is only provided if there is up
303 * to one concurrent consumer and up to one concurrent producer.
304 */
305CK_CC_INLINE static bool
306ck_ring_enqueue_spsc_size(struct ck_ring *ring,
307    struct ck_ring_buffer *buffer,
308    const void *entry,
309    unsigned int *size)
310{
311
312	return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
313	    sizeof(entry), size);
314}
315
316CK_CC_INLINE static bool
317ck_ring_enqueue_spsc(struct ck_ring *ring,
318    struct ck_ring_buffer *buffer,
319    const void *entry)
320{
321
322	return _ck_ring_enqueue_sp(ring, buffer,
323	    &entry, sizeof(entry), NULL);
324}
325
326CK_CC_INLINE static bool
327ck_ring_dequeue_spsc(struct ck_ring *ring,
328    const struct ck_ring_buffer *buffer,
329    void *data)
330{
331
332	return _ck_ring_dequeue_sc(ring, buffer,
333	    (void **)data, sizeof(void *));
334}
335
336/*
337 * The ck_ring_*_mpmc namespace is the public interface for interacting with a
338 * ring buffer containing pointers. Correctness is provided for any number of
339 * producers and consumers.
340 */
341CK_CC_INLINE static bool
342ck_ring_enqueue_mpmc(struct ck_ring *ring,
343    struct ck_ring_buffer *buffer,
344    const void *entry)
345{
346
347	return _ck_ring_enqueue_mp(ring, buffer, &entry,
348	    sizeof(entry), NULL);
349}
350
351CK_CC_INLINE static bool
352ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
353    struct ck_ring_buffer *buffer,
354    const void *entry,
355    unsigned int *size)
356{
357
358	return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
359	    sizeof(entry), size);
360}
361
362CK_CC_INLINE static bool
363ck_ring_trydequeue_mpmc(struct ck_ring *ring,
364    const struct ck_ring_buffer *buffer,
365    void *data)
366{
367
368	return _ck_ring_trydequeue_mc(ring,
369	    buffer, (void **)data, sizeof(void *));
370}
371
372CK_CC_INLINE static bool
373ck_ring_dequeue_mpmc(struct ck_ring *ring,
374    const struct ck_ring_buffer *buffer,
375    void *data)
376{
377
378	return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
379	    sizeof(void *));
380}
381
382/*
383 * The ck_ring_*_spmc namespace is the public interface for interacting with a
384 * ring buffer containing pointers. Correctness is provided for any number of
385 * consumers with up to one concurrent producer.
386 */
387CK_CC_INLINE static bool
388ck_ring_enqueue_spmc_size(struct ck_ring *ring,
389    struct ck_ring_buffer *buffer,
390    const void *entry,
391    unsigned int *size)
392{
393
394	return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
395	    sizeof(entry), size);
396}
397
398CK_CC_INLINE static bool
399ck_ring_enqueue_spmc(struct ck_ring *ring,
400    struct ck_ring_buffer *buffer,
401    const void *entry)
402{
403
404	return _ck_ring_enqueue_sp(ring, buffer, &entry,
405	    sizeof(entry), NULL);
406}
407
408CK_CC_INLINE static bool
409ck_ring_trydequeue_spmc(struct ck_ring *ring,
410    const struct ck_ring_buffer *buffer,
411    void *data)
412{
413
414	return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *));
415}
416
417CK_CC_INLINE static bool
418ck_ring_dequeue_spmc(struct ck_ring *ring,
419    const struct ck_ring_buffer *buffer,
420    void *data)
421{
422
423	return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
424}
425
/*
 * The ck_ring_*_mpsc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * producers with up to one concurrent consumer.
 */
431CK_CC_INLINE static bool
432ck_ring_enqueue_mpsc(struct ck_ring *ring,
433    struct ck_ring_buffer *buffer,
434    const void *entry)
435{
436
437	return _ck_ring_enqueue_mp(ring, buffer, &entry,
438	    sizeof(entry), NULL);
439}
440
441CK_CC_INLINE static bool
442ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
443    struct ck_ring_buffer *buffer,
444    const void *entry,
445    unsigned int *size)
446{
447
448	return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
449	    sizeof(entry), size);
450}
451
452CK_CC_INLINE static bool
453ck_ring_dequeue_mpsc(struct ck_ring *ring,
454    const struct ck_ring_buffer *buffer,
455    void *data)
456{
457
458	return _ck_ring_dequeue_sc(ring, buffer, (void **)data,
459	    sizeof(void *));
460}
461
/*
 * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining
 * values of a particular type in the ring buffer.
 */
/*
 * CK_RING_PROTOTYPE(name, type) expands to typed wrappers around the
 * internal _ck_ring_* routines for rings whose slots are `struct type`
 * objects stored by value (sizeof(struct type) bytes per slot):
 *
 *   ck_ring_enqueue_{spsc,spmc,mpsc,mpmc}[_size]_<name>(ring, buffer, entry[, size])
 *   ck_ring_dequeue_{spsc,spmc,mpsc,mpmc}_<name>(ring, buffer, result)
 *   ck_ring_trydequeue_{spmc,mpmc}_<name>(ring, buffer, result)
 *
 * In every wrapper `a` is the ring, `b` the slot array, `c` the entry to
 * copy in (enqueue) or the object to copy out to (dequeue), and `d` the
 * mandatory size output of the _size variants. Pointers that are only read
 * (the enqueued entry, the buffer on dequeue) are const-qualified, matching
 * the internal routines they forward to.
 */
#define CK_RING_PROTOTYPE(name, type)			\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spsc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    const struct type *c,				\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_sp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    const struct type *c)				\
{							\
							\
	return _ck_ring_enqueue_sp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_spsc_##name(struct ck_ring *a,		\
    const struct type *b,				\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_sc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spmc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    const struct type *c,				\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_sp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    const struct type *c)				\
{							\
							\
	return _ck_ring_enqueue_sp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_trydequeue_spmc_##name(struct ck_ring *a,	\
    const struct type *b,				\
    struct type *c)					\
{							\
							\
	return _ck_ring_trydequeue_mc(a,		\
	    b, c, sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_spmc_##name(struct ck_ring *a,		\
    const struct type *b,				\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_mc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    const struct type *c)				\
{							\
							\
	return _ck_ring_enqueue_mp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    const struct type *c,				\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_mp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_mpsc_##name(struct ck_ring *a,		\
    const struct type *b,				\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_sc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    const struct type *c,				\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_mp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    const struct type *c)				\
{							\
							\
	return _ck_ring_enqueue_mp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_trydequeue_mpmc_##name(struct ck_ring *a,	\
    const struct type *b,				\
    struct type *c)					\
{							\
							\
	return _ck_ring_trydequeue_mc(a,		\
	    b, c, sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_mpmc_##name(struct ck_ring *a,		\
    const struct type *b,				\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_mc(a, b, c,		\
	    sizeof(struct type));			\
}
610
611/*
612 * A single producer with one concurrent consumer.
613 */
/* Enqueue (a: ring, b: buffer, c: entry) via ck_ring_enqueue_spsc_<name>. */
#define CK_RING_ENQUEUE_SPSC(name, a, b, c) ck_ring_enqueue_spsc_##name(a, b, c)

/* As above, with d receiving the observed entry count. */
#define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d) ck_ring_enqueue_spsc_size_##name(a, b, c, d)

/* Dequeue (a: ring, b: buffer, c: result) via ck_ring_dequeue_spsc_<name>. */
#define CK_RING_DEQUEUE_SPSC(name, a, b, c) ck_ring_dequeue_spsc_##name(a, b, c)
620
621/*
622 * A single producer with any number of concurrent consumers.
623 */
/* Enqueue (a: ring, b: buffer, c: entry) via ck_ring_enqueue_spmc_<name>. */
#define CK_RING_ENQUEUE_SPMC(name, a, b, c) ck_ring_enqueue_spmc_##name(a, b, c)

/* As above, with d receiving the observed entry count. */
#define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d) ck_ring_enqueue_spmc_size_##name(a, b, c, d)

/* Single dequeue attempt via ck_ring_trydequeue_spmc_<name>. */
#define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c) ck_ring_trydequeue_spmc_##name(a, b, c)

/* Dequeue (a: ring, b: buffer, c: result) via ck_ring_dequeue_spmc_<name>. */
#define CK_RING_DEQUEUE_SPMC(name, a, b, c) ck_ring_dequeue_spmc_##name(a, b, c)
632
633/*
634 * Any number of concurrent producers with up to one
635 * concurrent consumer.
636 */
/* Enqueue (a: ring, b: buffer, c: entry) via ck_ring_enqueue_mpsc_<name>. */
#define CK_RING_ENQUEUE_MPSC(name, a, b, c) ck_ring_enqueue_mpsc_##name(a, b, c)

/* As above, with d receiving the observed entry count. */
#define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d) ck_ring_enqueue_mpsc_size_##name(a, b, c, d)

/* Dequeue (a: ring, b: buffer, c: result) via ck_ring_dequeue_mpsc_<name>. */
#define CK_RING_DEQUEUE_MPSC(name, a, b, c) ck_ring_dequeue_mpsc_##name(a, b, c)
643
644/*
645 * Any number of concurrent producers and consumers.
646 */
/* Enqueue (a: ring, b: buffer, c: entry) via ck_ring_enqueue_mpmc_<name>. */
#define CK_RING_ENQUEUE_MPMC(name, a, b, c) ck_ring_enqueue_mpmc_##name(a, b, c)

/* As above, with d receiving the observed entry count. */
#define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d) ck_ring_enqueue_mpmc_size_##name(a, b, c, d)

/* Single dequeue attempt via ck_ring_trydequeue_mpmc_<name>. */
#define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c) ck_ring_trydequeue_mpmc_##name(a, b, c)

/* Dequeue (a: ring, b: buffer, c: result) via ck_ring_dequeue_mpmc_<name>. */
#define CK_RING_DEQUEUE_MPMC(name, a, b, c) ck_ring_dequeue_mpmc_##name(a, b, c)
655
656#endif /* CK_RING_H */
657