/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: stable/11/sys/net/mp_ring.c 344093 2019-02-13 14:25:05Z marius $");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>

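/*
 * i386 does not provide acquire/release variants of the 64-bit cmpset; the
 * plain variant is assumed to be strong enough to stand in for both here.
 */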
#if defined(__i386__)
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#endif

#include <net/mp_ring.h>

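/*
 * The entire ring state -- producer head/tail indexes, consumer index, and
 * flags -- is packed into a single 64-bit word so that it can be snapshotted
 * and updated with a single compare-and-set (or a single store under the ring
 * lock on platforms without usable 64-bit atomics).
 */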
union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};

static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}
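/*
 * Worked example (illustration only): with size = 8, cidx = 2, and
 * pidx_head = 6 the last branch applies and 7 - 6 + 2 = 3 slots are free.
 * At most size - 1 slots are ever reported free, so a full ring is always
 * distinguishable from an empty one.
 */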

static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}
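/*
 * Worked example (illustration only): with size = 8, idx = 6, and n = 5,
 * x = 2 and the index wraps around to n - x = 3.
 */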

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

	if (s.cidx == s.pidx_tail)
		return (IDLE);
	else if (abdicate && s.pidx_tail != s.pidx_head)
		return (ABDICATED);

	return (BUSY);
}
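/*
 * Both call sites pass "total >= budget" as abdicate: the consumer reports
 * IDLE once it has caught up with pidx_tail, ABDICATED if its budget is spent
 * while another producer still has an unpublished reservation
 * (pidx_tail != pidx_head), and BUSY otherwise.
 */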

#ifdef MP_RING_NO_64BIT_ATOMICS
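/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.  The caller is
 * expected to hold the ring lock (both callers in this file do).
 */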
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The ring lock, held across both the producers' pidx updates
		 * and this function, guarantees visibility of items associated
		 * with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			critical_enter();
			do {
				os.state = ns.state = r->state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_cmpset_64(&r->state, os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		do {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#endif

int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct ifmp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		ifmp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
	return (0);
}
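/*
 * Usage sketch (illustration only; the "xx_" names are made up and the
 * callback signatures are assumed to match those declared in net/mp_ring.h):
 * a driver supplies a drain callback that consumes items in [cidx, pidx) and
 * returns how many it handled, plus a can_drain callback that reports whether
 * draining is currently possible, then feeds batches in from its tx path.
 * Returning 0 from the drain callback stalls the ring until
 * ifmp_ring_check_drainage() finds that it can make progress again.
 *
 *	static u_int
 *	xx_drain(struct ifmp_ring *r, u_int cidx, u_int pidx)
 *	{
 *		u_int n = 0;
 *
 *		while (cidx != pidx && xx_tx_one(r->cookie, r->items[cidx])) {
 *			cidx = cidx + 1 == r->size ? 0 : cidx + 1;
 *			n++;
 *		}
 *		return (n);
 *	}
 *
 *	static u_int
 *	xx_can_drain(struct ifmp_ring *r)
 *	{
 *		return (xx_tx_has_room(r->cookie));
 *	}
 *
 *	error = ifmp_ring_alloc(&sc->txr, 1024, sc, xx_drain, xx_can_drain,
 *	    M_DEVBUF, M_WAITOK);
 *	...
 *	error = ifmp_ring_enqueue(sc->txr, items, nitems, budget);
 */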

void
ifmp_ring_free(struct ifmp_ring *r)
{

	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

	free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The ring lock guarantees that the
	 * items are visible to any thread that sees the updated pidx.
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	/*
	 * Turn into a consumer if some other thread isn't active as a consumer
	 * already.
	 */
	if (os.flags != BUSY)
		drain_ring_locked(r, ns, os.flags, budget);

	mtx_unlock(&r->lock);
	return (0);
}

#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	for (;;) {
		os.state = r->state;
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		critical_enter();
		if (atomic_cmpset_64(&r->state, os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
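	/*
	 * Note that a successful reservation leaves us inside the critical
	 * section entered above; we stay in it until pidx_tail is published
	 * below, so that we cannot be preempted while other producers spin
	 * waiting for our update.
	 */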
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	do {
		os.state = ns.state = r->state;
		ns.pidx_tail = pidx_stop;
		if (os.flags == IDLE)
			ns.flags = ABDICATED;
	} while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	return (0);
}
#endif

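/*
 * Attempt to restart consumption on a ring whose consumer gave up.  Safe to
 * call at any time; it bails out unless the ring is STALLED or ABDICATED with
 * no producer mid-enqueue, and, for a stalled ring, unless can_drain() reports
 * that the resource shortage has cleared.
 */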
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
	    os.pidx_head != os.pidx_tail ||			// Require work to be available
	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

	counter_u64_zero(r->enqueues);
	counter_u64_zero(r->drops);
	counter_u64_zero(r->starts);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->restarts);
	counter_u64_zero(r->abdications);
}

int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (1);

	return (0);
}

int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
		return (1);

	return (0);
}