/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: stable/11/sys/net/mp_ring.c 344093 2019-02-13 14:25:05Z marius $");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>

#if defined(__i386__)
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#endif

#include <net/mp_ring.h>

union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};
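
/*
 * The four 16-bit fields above alias the single 64-bit 'state', so the
 * entire ring state can be snapshotted, edited, and republished with one
 * 64-bit atomic op.  A minimal sketch of the idiom used throughout this
 * file (illustrative only; the byte order of the fields within 'state' is
 * endian-dependent and never relied upon directly):
 *
 *	union ring_state os, ns;
 *
 *	do {
 *		os.state = ns.state = r->state;	(snapshot all four fields)
 *		ns.cidx = new_cidx;		(edit the local copy)
 *	} while (atomic_cmpset_64(&r->state, os.state, ns.state) == 0);
 */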

enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped, despite pending work, because it
			   wants another thread to take over. */
};
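
/*
 * Flag transitions, as implemented below: the locked enqueue marks the
 * ring BUSY and may drain it itself; the lockless enqueue moves an IDLE
 * ring to ABDICATED and leaves consumption to a later
 * ifmp_ring_check_drainage call.  The drain loop exits to IDLE once it
 * catches up with pidx_tail, to STALLED when the drain callback makes no
 * progress, or to ABDICATED when it exhausts its budget with work still
 * pending; ifmp_ring_check_drainage moves STALLED/ABDICATED back to BUSY.
 */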
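/*
 * Worked example (illustrative values): with size = 8 (so x = 7),
 * cidx = 2, and pidx_head = 6, this returns 7 - 6 + 2 = 3.  Slots 6, 7,
 * and 0 may be filled; one slot is always left empty so that a full ring
 * can be told apart from an empty one (both would otherwise have
 * cidx == pidx_head).
 */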
static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}
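/*
 * Worked example (illustrative values): with size = 8, idx = 6, and
 * n = 4, x is 2; since x is not greater than n the index wraps and the
 * result is n - x = 2, i.e. 6 -> 7 -> 0 -> 1 -> 2.
 */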
static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

	if (s.cidx == s.pidx_tail)
		return (IDLE);
	else if (abdicate && s.pidx_tail != s.pidx_head)
		return (ABDICATED);

	return (BUSY);
}
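
/*
 * In brief, the mapping implemented above:
 *
 *	cidx == pidx_tail			-> IDLE (caught up)
 *	abdicate && pidx_tail != pidx_head	-> ABDICATED (hand off)
 *	otherwise				-> BUSY (keep consuming)
 */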

#ifdef MP_RING_NO_64BIT_ATOMICS
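/*
 * Caller passes in a state, with a guarantee that there is work to do and
 * that all items up to the pidx_tail in the state are visible.  Same
 * contract as drain_ring_lockless below, but the caller holds r->lock
 * throughout, so plain stores to r->state suffice.
 */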
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The mutex held by the caller (there is no atomic here, unlike
		 * in the lockless variant) guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			critical_enter();
			do {
				os.state = ns.state = r->state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_cmpset_64(&r->state, os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		do {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#endif
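
/*
 * A hypothetical drain callback, sketched here to illustrate the contract
 * (my_softc and my_tx_one are made-up names).  The callback may consume
 * any number of the items in [cidx, pidx) and must return how many it
 * consumed; returning 0 marks the ring STALLED until a later call to
 * ifmp_ring_check_drainage restarts it.
 *
 *	static u_int
 *	my_drain(struct ifmp_ring *r, u_int cidx, u_int pidx)
 *	{
 *		struct my_softc *sc = r->cookie;
 *		u_int n = 0;
 *
 *		while (cidx != pidx) {
 *			if (my_tx_one(sc, r->items[cidx]) != 0)
 *				break;	(out of hardware resources)
 *			cidx = cidx + 1 == r->size ? 0 : cidx + 1;
 *			n++;
 *		}
 *		return (n);
 *	}
 */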

int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct ifmp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		ifmp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
	return (0);
}
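
/*
 * Example allocation from a hypothetical driver attach routine (the
 * softc, callback names, and ring size are placeholders; M_DEVBUF is
 * just one possible malloc type):
 *
 *	struct ifmp_ring *txr;
 *	int error;
 *
 *	error = ifmp_ring_alloc(&txr, 1024, sc, my_drain, my_can_drain,
 *	    M_DEVBUF, M_WAITOK);
 *	if (error != 0)
 *		return (error);
 */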

void
ifmp_ring_free(struct ifmp_ring *r)
{

	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

	free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 * With r->lock held, producers are fully serialized, so this is
	 * expected to hold immediately.
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The mutex (rather than a release
	 * style atomic) guarantees that the items are visible to any thread
	 * that sees the updated pidx.
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	/*
	 * Turn into a consumer if some other thread isn't active as a consumer
	 * already.
	 */
	if (os.flags != BUSY)
		drain_ring_locked(r, ns, os.flags, budget);

	mtx_unlock(&r->lock);
	return (0);
}

#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	for (;;) {
		os.state = r->state;
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		critical_enter();
		if (atomic_cmpset_64(&r->state, os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	do {
		os.state = ns.state = r->state;
		ns.pidx_tail = pidx_stop;
		if (os.flags == IDLE)
			ns.flags = ABDICATED;
	} while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	return (0);
}
#endif
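
/*
 * Example call from a hypothetical if_transmit handler (txr, m, and the
 * budget value are placeholders).  Note that in the lockless version
 * above the enqueuing thread never drains the ring itself; an IDLE ring
 * is marked ABDICATED and the actual draining is left to a subsequent
 * ifmp_ring_check_drainage call:
 *
 *	error = ifmp_ring_enqueue(txr, (void **)&m, 1, 4096);
 *	if (error != 0) {
 *		(the item was not enqueued; the caller still owns it)
 *		m_freem(m);
 *	}
 */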

void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;

	/*
	 * Proceed only if the ring is STALLED or ABDICATED, all reserved
	 * items have been published (pidx_head == pidx_tail), and, unless
	 * the consumer abdicated, the drain callback can make progress.
	 */
	if ((os.flags != STALLED && os.flags != ABDICATED) ||
	    os.pidx_head != os.pidx_tail ||
	    (os.flags != ABDICATED && r->can_drain(r) == 0))
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED or ABDICATED */
	ns.state = os.state;
	ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}
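
/*
 * A STALLED ring does not restart on its own.  A driver is expected to
 * call ifmp_ring_check_drainage once the resource shortage that caused
 * the stall has cleared, e.g. (hypothetically) from a tx completion
 * handler after hardware descriptors have been reclaimed:
 *
 *	ifmp_ring_check_drainage(txr, budget);
 */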

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

	counter_u64_zero(r->enqueues);
	counter_u64_zero(r->drops);
	counter_u64_zero(r->starts);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->restarts);
	counter_u64_zero(r->abdications);
}

int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (1);

	return (0);
}

int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
		return (1);

	return (0);
}