1276485Snp/*-
2276485Snp * Copyright (c) 2014 Chelsio Communications, Inc.
3276485Snp * All rights reserved.
4276485Snp * Written by: Navdeep Parhar <np@FreeBSD.org>
5276485Snp *
6276485Snp * Redistribution and use in source and binary forms, with or without
7276485Snp * modification, are permitted provided that the following conditions
8276485Snp * are met:
9276485Snp * 1. Redistributions of source code must retain the above copyright
10276485Snp *    notice, this list of conditions and the following disclaimer.
11276485Snp * 2. Redistributions in binary form must reproduce the above copyright
12276485Snp *    notice, this list of conditions and the following disclaimer in the
13276485Snp *    documentation and/or other materials provided with the distribution.
14276485Snp *
15276485Snp * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16276485Snp * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17276485Snp * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18276485Snp * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19276485Snp * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20276485Snp * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21276485Snp * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22276485Snp * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23276485Snp * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24276485Snp * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25276485Snp * SUCH DAMAGE.
26276485Snp */
27276485Snp
28276485Snp#include <sys/cdefs.h>
29276485Snp__FBSDID("$FreeBSD: stable/11/sys/dev/cxgbe/t4_mp_ring.c 339400 2018-10-17 01:30:51Z np $");
30276485Snp
31276485Snp#include <sys/types.h>
32276485Snp#include <sys/param.h>
33276485Snp#include <sys/systm.h>
34276485Snp#include <sys/counter.h>
35276485Snp#include <sys/lock.h>
36276485Snp#include <sys/malloc.h>
37276485Snp#include <machine/cpu.h>
38276485Snp
39276485Snp#include "t4_mp_ring.h"
40276485Snp
/*
 * i386 compatibility aliases: the acquire and release flavours of the 64-bit
 * compare-and-set are mapped onto the plain atomic_cmpset_64.
 *
 * NOTE(review): this file also calls atomic_fcmpset_acq_64 and
 * atomic_fcmpset_rel_64, which are not aliased here -- confirm that the i386
 * atomic header supplies them.
 */
#if defined(__i386__)
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#endif
45277226Snp
/*
 * The ring's bookkeeping.  Three 16-bit indices and a 16-bit flags field are
 * overlaid on one 64-bit word so that all four can be read, and replaced with
 * a single compare-and-set, through 'state'.
 */
union ring_state {
	uint64_t state;			/* all four fields as one word */
	struct {
		uint16_t pidx_head;	/* end of the slots reserved by producers */
		uint16_t pidx_tail;	/* end of the items published so far */
		uint16_t cidx;		/* next item the consumer will take */
		uint16_t flags;		/* IDLE, BUSY, STALLED or ABDICATED */
	};
};
55276485Snp
/* Values stored in the 'flags' field of the ring state. */
enum {
	IDLE = 0,	/* The consumer finished everything; no work is left. */
	BUSY = 1,	/* A consumer is running now or is about to start. */
	STALLED = 2,	/* The consumer quit because it ran out of resources. */
	ABDICATED = 3,	/* The consumer quit with work still outstanding so
			   that a different thread would take over. */
};
63276485Snp
64276485Snpstatic inline uint16_t
65276485Snpspace_available(struct mp_ring *r, union ring_state s)
66276485Snp{
67276485Snp	uint16_t x = r->size - 1;
68276485Snp
69276485Snp	if (s.cidx == s.pidx_head)
70276485Snp		return (x);
71276485Snp	else if (s.cidx > s.pidx_head)
72276485Snp		return (s.cidx - s.pidx_head - 1);
73276485Snp	else
74276485Snp		return (x - s.pidx_head + s.cidx);
75276485Snp}
76276485Snp
77276485Snpstatic inline uint16_t
78276485Snpincrement_idx(struct mp_ring *r, uint16_t idx, uint16_t n)
79276485Snp{
80276485Snp	int x = r->size - idx;
81276485Snp
82276485Snp	MPASS(x > 0);
83276485Snp	return (x > n ? idx + n : n - x);
84276485Snp}
85276485Snp
86276485Snp/* Consumer is about to update the ring's state to s */
87276485Snpstatic inline uint16_t
88276485Snpstate_to_flags(union ring_state s, int abdicate)
89276485Snp{
90276485Snp
91276485Snp	if (s.cidx == s.pidx_tail)
92276485Snp		return (IDLE);
93276485Snp	else if (abdicate && s.pidx_tail != s.pidx_head)
94276485Snp		return (ABDICATED);
95276485Snp
96276485Snp	return (BUSY);
97276485Snp}
98276485Snp
99276485Snp/*
100276485Snp * Caller passes in a state, with a guarantee that there is work to do and that
101276485Snp * all items up to the pidx_tail in the state are visible.
102276485Snp */
103276485Snpstatic void
104276485Snpdrain_ring(struct mp_ring *r, union ring_state os, uint16_t prev, int budget)
105276485Snp{
106276485Snp	union ring_state ns;
107276485Snp	int n, pending, total;
108276485Snp	uint16_t cidx = os.cidx;
109276485Snp	uint16_t pidx = os.pidx_tail;
110276485Snp
111276485Snp	MPASS(os.flags == BUSY);
112276485Snp	MPASS(cidx != pidx);
113276485Snp
114276485Snp	if (prev == IDLE)
115276485Snp		counter_u64_add(r->starts, 1);
116276485Snp	pending = 0;
117276485Snp	total = 0;
118276485Snp
119276485Snp	while (cidx != pidx) {
120276485Snp
121276485Snp		/* Items from cidx to pidx are available for consumption. */
122276485Snp		n = r->drain(r, cidx, pidx);
123276485Snp		if (n == 0) {
124276485Snp			critical_enter();
125339400Snp			os.state = r->state;
126276485Snp			do {
127339400Snp				ns.state = os.state;
128276485Snp				ns.cidx = cidx;
129276485Snp				ns.flags = STALLED;
130339400Snp			} while (atomic_fcmpset_64(&r->state, &os.state,
131276485Snp			    ns.state) == 0);
132276485Snp			critical_exit();
133276485Snp			if (prev != STALLED)
134276485Snp				counter_u64_add(r->stalls, 1);
135276485Snp			else if (total > 0) {
136276485Snp				counter_u64_add(r->restarts, 1);
137276485Snp				counter_u64_add(r->stalls, 1);
138276485Snp			}
139276485Snp			break;
140276485Snp		}
141276485Snp		cidx = increment_idx(r, cidx, n);
142276485Snp		pending += n;
143276485Snp		total += n;
144276485Snp
145276485Snp		/*
146276485Snp		 * We update the cidx only if we've caught up with the pidx, the
147276485Snp		 * real cidx is getting too far ahead of the one visible to
148276485Snp		 * everyone else, or we have exceeded our budget.
149276485Snp		 */
150276485Snp		if (cidx != pidx && pending < 64 && total < budget)
151276485Snp			continue;
152276485Snp		critical_enter();
153339400Snp		os.state = r->state;
154276485Snp		do {
155339400Snp			ns.state = os.state;
156276485Snp			ns.cidx = cidx;
157276485Snp			ns.flags = state_to_flags(ns, total >= budget);
158339400Snp		} while (atomic_fcmpset_acq_64(&r->state, &os.state, ns.state) == 0);
159276485Snp		critical_exit();
160276485Snp
161276485Snp		if (ns.flags == ABDICATED)
162276485Snp			counter_u64_add(r->abdications, 1);
163276485Snp		if (ns.flags != BUSY) {
164276485Snp			/* Wrong loop exit if we're going to stall. */
165276485Snp			MPASS(ns.flags != STALLED);
166276485Snp			if (prev == STALLED) {
167276485Snp				MPASS(total > 0);
168276485Snp				counter_u64_add(r->restarts, 1);
169276485Snp			}
170276485Snp			break;
171276485Snp		}
172276485Snp
173276485Snp		/*
174276485Snp		 * The acquire style atomic above guarantees visibility of items
175276485Snp		 * associated with any pidx change that we notice here.
176276485Snp		 */
177276485Snp		pidx = ns.pidx_tail;
178276485Snp		pending = 0;
179276485Snp	}
180276485Snp}
181276485Snp
182276485Snpint
183276485Snpmp_ring_alloc(struct mp_ring **pr, int size, void *cookie, ring_drain_t drain,
184276485Snp    ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
185276485Snp{
186276485Snp	struct mp_ring *r;
187276485Snp
188276485Snp	/* All idx are 16b so size can be 65536 at most */
189276485Snp	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
190276485Snp	    can_drain == NULL)
191276485Snp		return (EINVAL);
192276485Snp	*pr = NULL;
193276485Snp	flags &= M_NOWAIT | M_WAITOK;
194276485Snp	MPASS(flags != 0);
195276485Snp
196276485Snp	r = malloc(__offsetof(struct mp_ring, items[size]), mt, flags | M_ZERO);
197276485Snp	if (r == NULL)
198276485Snp		return (ENOMEM);
199276485Snp	r->size = size;
200276485Snp	r->cookie = cookie;
201276485Snp	r->mt = mt;
202276485Snp	r->drain = drain;
203276485Snp	r->can_drain = can_drain;
204276485Snp	r->enqueues = counter_u64_alloc(flags);
205276485Snp	r->drops = counter_u64_alloc(flags);
206276485Snp	r->starts = counter_u64_alloc(flags);
207276485Snp	r->stalls = counter_u64_alloc(flags);
208276485Snp	r->restarts = counter_u64_alloc(flags);
209276485Snp	r->abdications = counter_u64_alloc(flags);
210276485Snp	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
211276485Snp	    r->stalls == NULL || r->restarts == NULL ||
212276485Snp	    r->abdications == NULL) {
213276485Snp		mp_ring_free(r);
214276485Snp		return (ENOMEM);
215276485Snp	}
216276485Snp
217276485Snp	*pr = r;
218276485Snp	return (0);
219276485Snp}
220276485Snp
221276485Snpvoid
222276485Snp
223276485Snpmp_ring_free(struct mp_ring *r)
224276485Snp{
225276485Snp
226276485Snp	if (r == NULL)
227276485Snp		return;
228276485Snp
229276485Snp	if (r->enqueues != NULL)
230276485Snp		counter_u64_free(r->enqueues);
231276485Snp	if (r->drops != NULL)
232276485Snp		counter_u64_free(r->drops);
233276485Snp	if (r->starts != NULL)
234276485Snp		counter_u64_free(r->starts);
235276485Snp	if (r->stalls != NULL)
236276485Snp		counter_u64_free(r->stalls);
237276485Snp	if (r->restarts != NULL)
238276485Snp		counter_u64_free(r->restarts);
239276485Snp	if (r->abdications != NULL)
240276485Snp		counter_u64_free(r->abdications);
241276485Snp
242276485Snp	free(r, r->mt);
243276485Snp}
244276485Snp
245276485Snp/*
246276485Snp * Enqueue n items and maybe drain the ring for some time.
247276485Snp *
248276485Snp * Returns an errno.
249276485Snp */
250276485Snpint
251276485Snpmp_ring_enqueue(struct mp_ring *r, void **items, int n, int budget)
252276485Snp{
253276485Snp	union ring_state os, ns;
254276485Snp	uint16_t pidx_start, pidx_stop;
255276485Snp	int i;
256276485Snp
257276485Snp	MPASS(items != NULL);
258276485Snp	MPASS(n > 0);
259276485Snp
260276485Snp	/*
261276485Snp	 * Reserve room for the new items.  Our reservation, if successful, is
262276485Snp	 * from 'pidx_start' to 'pidx_stop'.
263276485Snp	 */
264339400Snp	os.state = r->state;
265276485Snp	for (;;) {
266276485Snp		if (n >= space_available(r, os)) {
267276485Snp			counter_u64_add(r->drops, n);
268276485Snp			MPASS(os.flags != IDLE);
269276485Snp			if (os.flags == STALLED)
270276485Snp				mp_ring_check_drainage(r, 0);
271276485Snp			return (ENOBUFS);
272276485Snp		}
273276485Snp		ns.state = os.state;
274276485Snp		ns.pidx_head = increment_idx(r, os.pidx_head, n);
275276485Snp		critical_enter();
276339400Snp		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
277276485Snp			break;
278276485Snp		critical_exit();
279276485Snp		cpu_spinwait();
280276485Snp	}
281276485Snp	pidx_start = os.pidx_head;
282276485Snp	pidx_stop = ns.pidx_head;
283276485Snp
284276485Snp	/*
285276485Snp	 * Wait for other producers who got in ahead of us to enqueue their
286276485Snp	 * items, one producer at a time.  It is our turn when the ring's
287298955Spfg	 * pidx_tail reaches the beginning of our reservation (pidx_start).
288276485Snp	 */
289276485Snp	while (ns.pidx_tail != pidx_start) {
290276485Snp		cpu_spinwait();
291276485Snp		ns.state = r->state;
292276485Snp	}
293276485Snp
294276485Snp	/* Now it is our turn to fill up the area we reserved earlier. */
295276485Snp	i = pidx_start;
296276485Snp	do {
297276485Snp		r->items[i] = *items++;
298276485Snp		if (__predict_false(++i == r->size))
299276485Snp			i = 0;
300276485Snp	} while (i != pidx_stop);
301276485Snp
302276485Snp	/*
303276485Snp	 * Update the ring's pidx_tail.  The release style atomic guarantees
304276485Snp	 * that the items are visible to any thread that sees the updated pidx.
305276485Snp	 */
306339400Snp	os.state = r->state;
307276485Snp	do {
308339400Snp		ns.state = os.state;
309276485Snp		ns.pidx_tail = pidx_stop;
310276485Snp		ns.flags = BUSY;
311339400Snp	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
312276485Snp	critical_exit();
313276485Snp	counter_u64_add(r->enqueues, n);
314276485Snp
315276485Snp	/*
316276485Snp	 * Turn into a consumer if some other thread isn't active as a consumer
317276485Snp	 * already.
318276485Snp	 */
319276485Snp	if (os.flags != BUSY)
320276485Snp		drain_ring(r, ns, os.flags, budget);
321276485Snp
322276485Snp	return (0);
323276485Snp}
324276485Snp
325276485Snpvoid
326276485Snpmp_ring_check_drainage(struct mp_ring *r, int budget)
327276485Snp{
328276485Snp	union ring_state os, ns;
329276485Snp
330276485Snp	os.state = r->state;
331276485Snp	if (os.flags != STALLED || os.pidx_head != os.pidx_tail ||
332276485Snp	    r->can_drain(r) == 0)
333276485Snp		return;
334276485Snp
335276485Snp	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
336276485Snp	ns.state = os.state;
337276485Snp	ns.flags = BUSY;
338276485Snp
339276485Snp	/*
340276485Snp	 * The acquire style atomic guarantees visibility of items associated
341276485Snp	 * with the pidx that we read here.
342276485Snp	 */
343276485Snp	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
344276485Snp		return;
345276485Snp
346276485Snp	drain_ring(r, ns, os.flags, budget);
347276485Snp}
348276485Snp
349276485Snpvoid
350276485Snpmp_ring_reset_stats(struct mp_ring *r)
351276485Snp{
352276485Snp
353276485Snp	counter_u64_zero(r->enqueues);
354276485Snp	counter_u64_zero(r->drops);
355276485Snp	counter_u64_zero(r->starts);
356276485Snp	counter_u64_zero(r->stalls);
357276485Snp	counter_u64_zero(r->restarts);
358276485Snp	counter_u64_zero(r->abdications);
359276485Snp}
360276485Snp
361276485Snpint
362276485Snpmp_ring_is_idle(struct mp_ring *r)
363276485Snp{
364276485Snp	union ring_state s;
365276485Snp
366276485Snp	s.state = r->state;
367276485Snp	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
368276485Snp	    s.flags == IDLE)
369276485Snp		return (1);
370276485Snp
371276485Snp	return (0);
372276485Snp}
373