/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>
#include <net/mp_ring.h>

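/*
 * The entire ring state fits in one 64-bit word and is always read and
 * written as a unit.  pidx_head is the producer index at the head of the
 * latest reservation, pidx_tail is the index up to which items have actually
 * been published, cidx is the consumer index, and flags holds the consumer's
 * state (one of the IDLE/BUSY/STALLED/ABDICATED values below).
 */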
union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};

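/*
 * Returns the number of free slots available for new items.  One slot is
 * always left unused so that a full ring can be distinguished from an empty
 * one.
 */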
static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}

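/*
 * Advances idx by n, wrapping around at the end of the ring.  The caller is
 * responsible for ensuring that n does not exceed the space available.
 */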
static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

	if (s.cidx == s.pidx_tail)
		return (IDLE);
	else if (abdicate && s.pidx_tail != s.pidx_head)
		return (ABDICATED);

	return (BUSY);
}

#ifdef MP_RING_NO_64BIT_ATOMICS
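/*
 * Lock-based variant of the consumer loop, used when 64-bit atomics are not
 * available.  The caller holds the ring lock and passes in a state with a
 * guarantee that there is work to do and that all items up to the pidx_tail
 * in the state are visible.
 */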
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {
		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The state update above, made while the ring lock is held,
		 * guarantees visibility of items associated with any pidx
		 * change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {
		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			critical_enter();
			os.state = r->state;
			do {
				ns.state = os.state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_fcmpset_64(&r->state, &os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		os.state = r->state;
		do {
			ns.state = os.state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
		    ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#endif

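/*
 * Allocates and initializes a ring of the given size (between 2 and 65536
 * items) with the given drain callbacks and statistics counters.  On success
 * *pr points at the new ring.
 */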
int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct ifmp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		ifmp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
	return (0);
}

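/*
 * Releases the ring and any statistics counters that were allocated for it.
 * It is safe to call this with a NULL pointer or with a partially constructed
 * ring.
 */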
void
ifmp_ring_free(struct ifmp_ring *r)
{

	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

	free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * If 'abdicate' is set the calling thread never drains the ring itself;
 * otherwise it becomes the consumer (unless another consumer is already
 * active) and drains up to 'budget' items before handing the ring off.
 *
 * Returns an errno.
 */
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The ring lock guarantees that the
	 * items are visible to any thread that sees the updated pidx after
	 * taking the lock.
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	if (abdicate) {
		if (os.flags == IDLE)
			ns.flags = ABDICATED;
	} else
		ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a
		 * consumer already.
		 */
		if (os.flags != BUSY)
			drain_ring_locked(r, ns, os.flags, budget);
	}

	mtx_unlock(&r->lock);
	return (0);
}
#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	for (;;) {
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
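		/*
		 * Remain in a critical section from the moment the
		 * reservation succeeds until pidx_tail is published below, so
		 * that this thread is not preempted while producers that
		 * reserved after it spin-wait for their turn.
		 */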
		critical_enter();
		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	os.state = r->state;
	do {
		ns.state = os.state;
		ns.pidx_tail = pidx_stop;
		if (abdicate) {
			if (os.flags == IDLE)
				ns.flags = ABDICATED;
		} else
			ns.flags = BUSY;
	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a
		 * consumer already.
		 */
		if (os.flags != BUSY)
			drain_ring_lockless(r, ns, os.flags, budget);
	}

	return (0);
}
#endif

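/*
 * Restarts a ring that has stalled or been abdicated: if there is work
 * pending and the ring can be drained, the calling thread takes over as the
 * consumer and drains it.
 */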
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
	    os.pidx_head != os.pidx_tail ||			// Require work to be available
	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

	counter_u64_zero(r->enqueues);
	counter_u64_zero(r->drops);
	counter_u64_zero(r->starts);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->restarts);
	counter_u64_zero(r->abdications);
}

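/*
 * Returns 1 if the ring is empty and no consumer is active, 0 otherwise.
 */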
int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (1);

	return (0);
}

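/*
 * Returns 1 if the consumer has stalled and no producer is in the middle of
 * an enqueue, 0 otherwise.
 */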
int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
		return (1);

	return (0);
}