/*
 * Copyright 2009-2015 Samy Al Bahra.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#ifndef CK_RING_H
#define CK_RING_H

#include <ck_cc.h>
#include <ck_md.h>
#include <ck_pr.h>
#include <ck_stdbool.h>
#include <ck_string.h>

/*
 * Concurrent ring buffer.
 */

struct ck_ring {
	unsigned int c_head;
	char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
	unsigned int p_tail;
	unsigned int p_head;
	char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
	unsigned int size;
	unsigned int mask;
};
typedef struct ck_ring ck_ring_t;

struct ck_ring_buffer {
	void *value;
};
typedef struct ck_ring_buffer ck_ring_buffer_t;

CK_CC_INLINE static unsigned int
ck_ring_size(const struct ck_ring *ring)
{
	unsigned int c, p;

	c = ck_pr_load_uint(&ring->c_head);
	p = ck_pr_load_uint(&ring->p_tail);
	return (p - c) & ring->mask;
}

CK_CC_INLINE static unsigned int
ck_ring_capacity(const struct ck_ring *ring)
{
	return ring->size;
}

CK_CC_INLINE static void
ck_ring_init(struct ck_ring *ring, unsigned int size)
{

	ring->size = size;
	ring->mask = size - 1;
	ring->p_tail = 0;
	ring->p_head = 0;
	ring->c_head = 0;
	return;
}
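
/*
 * Initialization sketch (illustrative only, not part of this header's API):
 * indexing is done modulo a power-of-two size via the mask, so the caller
 * must provide a backing buffer with a power-of-two number of slots and pass
 * the same count to ck_ring_init. The names ring, buffer and CK_RING_SLOTS
 * below are hypothetical.
 *
 *	#define CK_RING_SLOTS 1024
 *
 *	static ck_ring_t ring;
 *	static ck_ring_buffer_t buffer[CK_RING_SLOTS];
 *
 *	ck_ring_init(&ring, CK_RING_SLOTS);
 *
 * ck_ring_capacity(&ring) then returns 1024; at most 1023 entries can be in
 * the ring at once, since one slot is sacrificed to distinguish a full ring
 * from an empty one.
 */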

/*
 * The _ck_ring_* namespace is internal only and must not be used externally.
 */
CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp(struct ck_ring *ring,
    void *CK_CC_RESTRICT buffer,
    const void *CK_CC_RESTRICT entry,
    unsigned int ts,
    unsigned int *size)
{
	const unsigned int mask = ring->mask;
	unsigned int consumer, producer, delta;

	consumer = ck_pr_load_uint(&ring->c_head);
	producer = ring->p_tail;
	delta = producer + 1;
	if (size != NULL)
		*size = (producer - consumer) & mask;

	if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
		return false;

	buffer = (char *)buffer + ts * (producer & mask);
	memcpy(buffer, entry, ts);

	/*
	 * Make sure to update slot value before indicating
	 * that the slot is available for consumption.
	 */
	ck_pr_fence_store();
	ck_pr_store_uint(&ring->p_tail, delta);
	return true;
}

CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp_size(struct ck_ring *ring,
    void *CK_CC_RESTRICT buffer,
    const void *CK_CC_RESTRICT entry,
    unsigned int ts,
    unsigned int *size)
{
	unsigned int sz;
	bool r;

	r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
	*size = sz;
	return r;
}

CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_sc(struct ck_ring *ring,
    const void *CK_CC_RESTRICT buffer,
    void *CK_CC_RESTRICT target,
    unsigned int size)
{
	const unsigned int mask = ring->mask;
	unsigned int consumer, producer;

	consumer = ring->c_head;
	producer = ck_pr_load_uint(&ring->p_tail);

	if (CK_CC_UNLIKELY(consumer == producer))
		return false;

	/*
	 * Make sure to serialize with respect to our snapshot
	 * of the producer counter.
	 */
	ck_pr_fence_load();

	buffer = (const char *)buffer + size * (consumer & mask);
	memcpy(target, buffer, size);

	/*
	 * Make sure copy is completed with respect to consumer
	 * update.
	 */
	ck_pr_fence_store();
	ck_pr_store_uint(&ring->c_head, consumer + 1);
	return true;
}

CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp(struct ck_ring *ring,
    void *buffer,
    const void *entry,
    unsigned int ts,
    unsigned int *size)
{
	const unsigned int mask = ring->mask;
	unsigned int producer, consumer, delta;
	bool r = true;

	producer = ck_pr_load_uint(&ring->p_head);

	for (;;) {
		/*
		 * The snapshot of producer must be up to date with respect to
		 * consumer.
		 */
		ck_pr_fence_load();
		consumer = ck_pr_load_uint(&ring->c_head);

		delta = producer + 1;

		/*
		 * Only try to CAS if the producer is not clearly stale (not
		 * less than consumer) and the buffer is definitely not full.
		 */
		if (CK_CC_LIKELY((producer - consumer) < mask)) {
			if (ck_pr_cas_uint_value(&ring->p_head,
			    producer, delta, &producer) == true) {
				break;
			}
		} else {
			unsigned int new_producer;

			/*
			 * Slow path.  Either the buffer is full or we have a
			 * stale snapshot of p_head.  Execute a second read of
			 * p_head that must be ordered wrt the snapshot of
			 * c_head.
			 */
			ck_pr_fence_load();
			new_producer = ck_pr_load_uint(&ring->p_head);

			/*
			 * Only fail if we haven't made forward progress in
			 * production: the buffer must have been full when we
			 * read new_producer (or we wrapped around UINT_MAX
			 * during this iteration).
			 */
			if (producer == new_producer) {
				r = false;
				goto leave;
			}

			/*
			 * p_head advanced during this iteration. Try again.
			 */
			producer = new_producer;
		}
	}

	buffer = (char *)buffer + ts * (producer & mask);
	memcpy(buffer, entry, ts);

	/*
	 * Wait until all concurrent producers have completed writing
	 * their data into the ring buffer.
	 */
	while (ck_pr_load_uint(&ring->p_tail) != producer)
		ck_pr_stall();

	/*
	 * Ensure that copy is completed before updating shared producer
	 * counter.
	 */
	ck_pr_fence_store();
	ck_pr_store_uint(&ring->p_tail, delta);

leave:
	if (size != NULL)
		*size = (producer - consumer) & mask;

	return r;
}

CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp_size(struct ck_ring *ring,
    void *buffer,
    const void *entry,
    unsigned int ts,
    unsigned int *size)
{
	unsigned int sz;
	bool r;

	r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
	*size = sz;
	return r;
}

CK_CC_FORCE_INLINE static bool
_ck_ring_trydequeue_mc(struct ck_ring *ring,
    const void *buffer,
    void *data,
    unsigned int size)
{
	const unsigned int mask = ring->mask;
	unsigned int consumer, producer;

	consumer = ck_pr_load_uint(&ring->c_head);
	ck_pr_fence_load();
	producer = ck_pr_load_uint(&ring->p_tail);

	if (CK_CC_UNLIKELY(consumer == producer))
		return false;

	ck_pr_fence_load();

	buffer = (const char *)buffer + size * (consumer & mask);
	memcpy(data, buffer, size);

	ck_pr_fence_store_atomic();
	return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
}

CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_mc(struct ck_ring *ring,
    const void *buffer,
    void *data,
    unsigned int ts)
{
	const unsigned int mask = ring->mask;
	unsigned int consumer, producer;

	consumer = ck_pr_load_uint(&ring->c_head);

	do {
		const char *target;

		/*
		 * Producer counter must represent state relative to
		 * our latest consumer snapshot.
		 */
		ck_pr_fence_load();
		producer = ck_pr_load_uint(&ring->p_tail);

		if (CK_CC_UNLIKELY(consumer == producer))
			return false;

		ck_pr_fence_load();

		target = (const char *)buffer + ts * (consumer & mask);
		memcpy(data, target, ts);

		/* Serialize load with respect to head update. */
		ck_pr_fence_store_atomic();
	} while (ck_pr_cas_uint_value(&ring->c_head,
				      consumer,
				      consumer + 1,
				      &consumer) == false);

	return true;
}

/*
 * The ck_ring_*_spsc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is only provided if there is up
 * to one concurrent consumer and up to one concurrent producer.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_spsc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{

	return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
	    sizeof(entry), size);
}

CK_CC_INLINE static bool
ck_ring_enqueue_spsc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{

	return _ck_ring_enqueue_sp(ring, buffer,
	    &entry, sizeof(entry), NULL);
}

CK_CC_INLINE static bool
ck_ring_dequeue_spsc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_dequeue_sc(ring, buffer,
	    (void **)data, sizeof(void *));
}
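
/*
 * Usage sketch (names such as ring, buffer, msg and the helper functions are
 * hypothetical): with exactly one producer thread and one consumer thread,
 * the spsc variants may be used without any additional synchronization.
 *
 *	// Producer thread:
 *	struct message *msg = create_message();
 *	if (ck_ring_enqueue_spsc(&ring, buffer, msg) == false)
 *		handle_full_ring(msg);
 *
 *	// Consumer thread:
 *	struct message *out;
 *	if (ck_ring_dequeue_spsc(&ring, buffer, &out) == true)
 *		process_message(out);
 */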

/*
 * The ck_ring_*_mpmc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * producers and consumers.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_mpmc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{

	return _ck_ring_enqueue_mp(ring, buffer, &entry,
	    sizeof(entry), NULL);
}

CK_CC_INLINE static bool
ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{

	return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
	    sizeof(entry), size);
}

CK_CC_INLINE static bool
ck_ring_trydequeue_mpmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_trydequeue_mc(ring,
	    buffer, (void **)data, sizeof(void *));
}

CK_CC_INLINE static bool
ck_ring_dequeue_mpmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
	    sizeof(void *));
}
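
/*
 * Usage sketch (ring, buffer, msg and the helpers are hypothetical): any
 * thread may enqueue and any thread may dequeue. ck_ring_dequeue_mpmc retries
 * internally when it loses a race with another consumer, while
 * ck_ring_trydequeue_mpmc fails instead, which can be preferable under heavy
 * consumer contention.
 *
 *	// Any producer thread:
 *	if (ck_ring_enqueue_mpmc(&ring, buffer, msg) == false)
 *		handle_full_ring(msg);
 *
 *	// Any consumer thread:
 *	struct message *out;
 *	if (ck_ring_dequeue_mpmc(&ring, buffer, &out) == true)
 *		process_message(out);
 */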

/*
 * The ck_ring_*_spmc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * consumers with up to one concurrent producer.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_spmc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{

	return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
	    sizeof(entry), size);
}

CK_CC_INLINE static bool
ck_ring_enqueue_spmc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{

	return _ck_ring_enqueue_sp(ring, buffer, &entry,
	    sizeof(entry), NULL);
}

CK_CC_INLINE static bool
ck_ring_trydequeue_spmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *));
}

CK_CC_INLINE static bool
ck_ring_dequeue_spmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
}
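
/*
 * Usage sketch (ring, buffer, msg, WATERMARK and apply_backpressure are
 * hypothetical): the _size variant is shown because a single producer often
 * uses the occupancy observed at enqueue time for back-pressure.
 *
 *	// The single producer thread:
 *	unsigned int entries;
 *	if (ck_ring_enqueue_spmc_size(&ring, buffer, msg, &entries) == true &&
 *	    entries > WATERMARK)
 *		apply_backpressure();
 *
 *	// Any consumer thread:
 *	struct message *out;
 *	if (ck_ring_dequeue_spmc(&ring, buffer, &out) == true)
 *		process_message(out);
 */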

/*
 * The ck_ring_*_mpsc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * producers with up to one concurrent consumer.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_mpsc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{

	return _ck_ring_enqueue_mp(ring, buffer, &entry,
	    sizeof(entry), NULL);
}

CK_CC_INLINE static bool
ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{

	return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
	    sizeof(entry), size);
}

CK_CC_INLINE static bool
ck_ring_dequeue_mpsc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{

	return _ck_ring_dequeue_sc(ring, buffer, (void **)data,
	    sizeof(void *));
}
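
/*
 * Usage sketch (ring, buffer, msg and the helpers are hypothetical): many
 * producers funnel work to a single consumer, a common pattern for
 * per-thread work submission to a dedicated dispatcher thread.
 *
 *	// Any producer thread:
 *	if (ck_ring_enqueue_mpsc(&ring, buffer, msg) == false)
 *		handle_full_ring(msg);
 *
 *	// The single consumer thread:
 *	struct message *out;
 *	while (ck_ring_dequeue_mpsc(&ring, buffer, &out) == true)
 *		process_message(out);
 */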

/*
 * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining
 * values of a particular type in the ring buffer.
 */
#define CK_RING_PROTOTYPE(name, type)			\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spsc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_sp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_sp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_spsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_sc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spmc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_sp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_sp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_trydequeue_spmc_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_trydequeue_mc(a,		\
	    b, c, sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_spmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_mc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_mp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_mp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_mpsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_sc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_mp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_mp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_trydequeue_mpmc_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_trydequeue_mc(a,		\
	    b, c, sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_mpmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_mc(a, b, c,		\
	    sizeof(struct type));			\
}
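
/*
 * Usage sketch (the entry type and names below are hypothetical): declare a
 * typed interface once, then enqueue and dequeue whole structures by value.
 * The backing buffer must hold a power-of-two count of struct entry slots,
 * matching the size passed to ck_ring_init.
 *
 *	struct entry {
 *		uint64_t sequence;
 *		void *payload;
 *	};
 *	CK_RING_PROTOTYPE(entry, entry)
 *
 *	static struct entry slots[1024];
 *	static ck_ring_t ring;
 *
 *	ck_ring_init(&ring, 1024);
 *
 *	struct entry in = { .sequence = 1, .payload = NULL };
 *	ck_ring_enqueue_spsc_entry(&ring, slots, &in);
 *
 *	struct entry out;
 *	ck_ring_dequeue_spsc_entry(&ring, slots, &out);
 *
 * The CK_RING_* convenience macros below expand to the same typed functions,
 * e.g. CK_RING_ENQUEUE_SPSC(entry, &ring, slots, &in).
 */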

/*
 * A single producer with one concurrent consumer.
 */
#define CK_RING_ENQUEUE_SPSC(name, a, b, c)		\
	ck_ring_enqueue_spsc_##name(a, b, c)
#define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_spsc_size_##name(a, b, c, d)
#define CK_RING_DEQUEUE_SPSC(name, a, b, c)		\
	ck_ring_dequeue_spsc_##name(a, b, c)

/*
 * A single producer with any number of concurrent consumers.
 */
#define CK_RING_ENQUEUE_SPMC(name, a, b, c)		\
	ck_ring_enqueue_spmc_##name(a, b, c)
#define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_spmc_size_##name(a, b, c, d)
#define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c)		\
	ck_ring_trydequeue_spmc_##name(a, b, c)
#define CK_RING_DEQUEUE_SPMC(name, a, b, c)		\
	ck_ring_dequeue_spmc_##name(a, b, c)

/*
 * Any number of concurrent producers with up to one
 * concurrent consumer.
 */
#define CK_RING_ENQUEUE_MPSC(name, a, b, c)		\
	ck_ring_enqueue_mpsc_##name(a, b, c)
#define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_mpsc_size_##name(a, b, c, d)
#define CK_RING_DEQUEUE_MPSC(name, a, b, c)		\
	ck_ring_dequeue_mpsc_##name(a, b, c)

/*
 * Any number of concurrent producers and consumers.
 */
#define CK_RING_ENQUEUE_MPMC(name, a, b, c)		\
	ck_ring_enqueue_mpmc_##name(a, b, c)
#define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_mpmc_size_##name(a, b, c, d)
#define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c)		\
	ck_ring_trydequeue_mpmc_##name(a, b, c)
#define CK_RING_DEQUEUE_MPMC(name, a, b, c)		\
	ck_ring_dequeue_mpmc_##name(a, b, c)

#endif /* CK_RING_H */