/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: releng/10.2/sys/dev/cxgbe/t4_mp_ring.c 284052 2015-06-06 09:28:40Z np $");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <machine/cpu.h>

#include "t4_mp_ring.h"

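/*
 * i386 lacks the acquire/release variants of the 64-bit cmpset, so both
 * are mapped to the plain variant below.  This is believed safe, if
 * conservative: the plain op is a locked cmpxchg8b, which acts as a full
 * barrier on x86.
 */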
#if defined(__i386__)
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#endif

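/*
 * The entire ring state (producer head and tail, consumer index, and the
 * consumer's flags) is packed into a single 64-bit word so that it can be
 * read and updated with one atomic cmpset; no lock protects the ring.  A
 * producer reserves space by advancing pidx_head, fills its slots, and
 * then publishes them by advancing pidx_tail, each step via a 64-bit CAS.
 */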
union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};

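/*
 * One slot is always left unused so that a full ring is distinguishable
 * from an empty one (cidx == pidx_head).  A ring of 'size' slots thus
 * holds at most size - 1 items.
 * Example: size = 8, cidx = 5, pidx_head = 2 -> slots 2 and 3 are free.
 */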
static inline uint16_t
space_available(struct mp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}

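/*
 * Advance idx by n, wrapping around at r->size without a modulo.
 * Example: size = 8, idx = 6, n = 3 -> 6 + 3 wraps around to index 1.
 */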
static inline uint16_t
increment_idx(struct mp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

	if (s.cidx == s.pidx_tail)
		return (IDLE);
	else if (abdicate && s.pidx_tail != s.pidx_head)
		return (ABDICATED);

	return (BUSY);
}

/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring(struct mp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			critical_enter();
			do {
				os.state = ns.state = r->state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_cmpset_64(&r->state, os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		do {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}

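/*
 * Usage sketch (hypothetical: 'txq', 'my_drain', and 'my_can_drain' are a
 * caller's own, with the callback types declared in t4_mp_ring.h):
 *
 *	rc = mp_ring_alloc(&txq->ring, 1024, txq, my_drain, my_can_drain,
 *	    M_DEVBUF, M_WAITOK);
 *	if (rc == 0)
 *		rc = mp_ring_enqueue(txq->ring, items, nitems, 64);
 */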
int
mp_ring_alloc(struct mp_ring **pr, int size, void *cookie, ring_drain_t drain,
    ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct mp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct mp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		mp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
	return (0);
}

void
mp_ring_free(struct mp_ring *r)
{

	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

	free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
int
mp_ring_enqueue(struct mp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	for (;;) {
		os.state = r->state;
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				mp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
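		/*
		 * Stay in a critical section from the moment the reservation
		 * succeeds until pidx_tail is published below: later
		 * producers spin-wait for this reservation to be filled, so
		 * this thread must not be preempted in between.
		 */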
		critical_enter();
		if (atomic_cmpset_64(&r->state, os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	do {
		os.state = ns.state = r->state;
		ns.pidx_tail = pidx_stop;
		ns.flags = BUSY;
	} while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	/*
	 * Turn into a consumer if some other thread isn't active as a consumer
	 * already.
	 */
	if (os.flags != BUSY)
		drain_ring(r, ns, os.flags, budget);

	return (0);
}

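/*
 * Kick-start a consumer that stalled for lack of resources: if the ring
 * is STALLED, no producer is mid-reservation, and can_drain reports that
 * the blockage has cleared, take over as the consumer.
 */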
void
mp_ring_check_drainage(struct mp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if (os.flags != STALLED || os.pidx_head != os.pidx_tail ||
	    r->can_drain(r) == 0)
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;

	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring(r, ns, os.flags, budget);
}

void
mp_ring_reset_stats(struct mp_ring *r)
{

	counter_u64_zero(r->enqueues);
	counter_u64_zero(r->drops);
	counter_u64_zero(r->starts);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->restarts);
	counter_u64_zero(r->abdications);
}

int
mp_ring_is_idle(struct mp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (1);

	return (0);
}