/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>
#include <net/mp_ring.h>

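/*
 * The entire state of the ring is packed into a single 64-bit word: the
 * producer's head and tail indexes, the consumer's index, and the consumer
 * flags.  This lets the lockless code read and update all of it with a single
 * 64-bit atomic operation.
 */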
union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};

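/*
 * Number of items that can still be enqueued, given the ring state s.  One
 * slot is always left unused so that an empty ring (cidx == pidx_head) can be
 * distinguished from a full one.
 */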
static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}

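/* Advance idx by n slots, wrapping around the end of the ring if needed. */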
static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

	if (s.cidx == s.pidx_tail)
		return (IDLE);
	else if (abdicate && s.pidx_tail != s.pidx_head)
		return (ABDICATED);

	return (BUSY);
}

#ifdef MP_RING_NO_64BIT_ATOMICS
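/*
 * Consumer loop used when the platform lacks 64-bit atomics.  The caller holds
 * r->lock, so plain loads and stores of r->state are safe here.
 */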
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The ring lock, held by both producers and the consumer,
		 * guarantees that items behind any pidx change we notice here
		 * are already visible.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			critical_enter();
			os.state = r->state;
			do {
				ns.state = os.state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_fcmpset_64(&r->state, &os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		os.state = r->state;
		do {
			ns.state = os.state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
		    ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#endif

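/*
 * Allocate and initialize a ring with 'size' slots (one slot is always left
 * unused) and its statistics counters.  Returns 0 on success or an errno.
 */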
int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct ifmp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		ifmp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
	return (0);
}

void
ifmp_ring_free(struct ifmp_ring *r)
{

	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

	free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The ring lock, held by producers and
	 * the consumer alike, guarantees that the items are visible to any
	 * thread that sees the updated pidx.
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	if (abdicate) {
		if (os.flags == IDLE)
			ns.flags = ABDICATED;
	} else
		ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a
		 * consumer already.
		 */
		if (os.flags != BUSY)
			drain_ring_locked(r, ns, os.flags, budget);
	}

	mtx_unlock(&r->lock);
	return (0);
}
#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	for (;;) {
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		critical_enter();
		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	os.state = r->state;
	do {
		ns.state = os.state;
		ns.pidx_tail = pidx_stop;
		if (abdicate) {
			if (os.flags == IDLE)
				ns.flags = ABDICATED;
		} else
			ns.flags = BUSY;
	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a
		 * consumer already.
		 */
		if (os.flags != BUSY)
			drain_ring_lockless(r, ns, os.flags, budget);
	}

	return (0);
}
#endif

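/*
 * Take over as the consumer if the ring is stalled or was abdicated and there
 * is still work queued that can be drained.
 */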
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
	    os.pidx_head != os.pidx_tail ||			// Require work to be available
	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

	counter_u64_zero(r->enqueues);
	counter_u64_zero(r->drops);
	counter_u64_zero(r->starts);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->restarts);
	counter_u64_zero(r->abdications);
}

int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (1);

	return (0);
}

int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
		return (1);

	return (0);
}