/* mp_ring.c, FreeBSD stable/11, revision 344093 */
11541Srgrimes/*-
21541Srgrimes * Copyright (c) 2014 Chelsio Communications, Inc.
31541Srgrimes * All rights reserved.
41541Srgrimes * Written by: Navdeep Parhar <np@FreeBSD.org>
51541Srgrimes *
61541Srgrimes * Redistribution and use in source and binary forms, with or without
71541Srgrimes * modification, are permitted provided that the following conditions
81541Srgrimes * are met:
91541Srgrimes * 1. Redistributions of source code must retain the above copyright
101541Srgrimes *    notice, this list of conditions and the following disclaimer.
111541Srgrimes * 2. Redistributions in binary form must reproduce the above copyright
121541Srgrimes *    notice, this list of conditions and the following disclaimer in the
131541Srgrimes *    documentation and/or other materials provided with the distribution.
141541Srgrimes *
151541Srgrimes * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
161541Srgrimes * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
171541Srgrimes * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
181541Srgrimes * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
191541Srgrimes * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
201541Srgrimes * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
211541Srgrimes * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
221541Srgrimes * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
231541Srgrimes * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
241541Srgrimes * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
251541Srgrimes * SUCH DAMAGE.
261541Srgrimes */
271541Srgrimes
281541Srgrimes#include <sys/cdefs.h>
291541Srgrimes__FBSDID("$FreeBSD: stable/11/sys/net/mp_ring.c 344093 2019-02-13 14:25:05Z marius $");
3050477Speter
311541Srgrimes#include <sys/types.h>
321541Srgrimes#include <sys/param.h>
332165Spaul#include <sys/systm.h>
342165Spaul#include <sys/counter.h>
352165Spaul#include <sys/lock.h>
3623888Swollman#include <sys/mutex.h>
3712453Sbde#include <sys/malloc.h>
3883366Sjulian#include <machine/cpu.h>
3912453Sbde
4012453Sbde#if defined(__i386__)
4138482Swollman#define atomic_cmpset_acq_64 atomic_cmpset_64
4212453Sbde#define atomic_cmpset_rel_64 atomic_cmpset_64
4355205Speter#endif
441541Srgrimes
451541Srgrimes#include <net/mp_ring.h>
461541Srgrimes
/*
 * The ring's entire state packs into a single 64b word so that it can be
 * snapshotted, compared, and swapped as one unit.  Access the fields through
 * the anonymous struct, or the whole thing at once through 'state'.
 */
union ring_state {
	struct {
		uint16_t pidx_head;	/* producer: next slot to be reserved */
		uint16_t pidx_tail;	/* producer: items published up to here */
		uint16_t cidx;		/* consumer index */
		uint16_t flags;		/* IDLE/BUSY/STALLED/ABDICATED below */
	};
	uint64_t state;			/* all of the above as one 64b value */
};
561541Srgrimes
/* Consumer state, stored in ring_state.flags. */
enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};
6438482Swollman
651541Srgrimesstatic inline uint16_t
6681501Sjulianspace_available(struct ifmp_ring *r, union ring_state s)
6781501Sjulian{
6882824Sjulian	uint16_t x = r->size - 1;
6981501Sjulian
7081501Sjulian	if (s.cidx == s.pidx_head)
7182824Sjulian		return (x);
7281501Sjulian	else if (s.cidx > s.pidx_head)
7381501Sjulian		return (s.cidx - s.pidx_head - 1);
7481501Sjulian	else
7581501Sjulian		return (x - s.pidx_head + s.cidx);
7681501Sjulian}
7781501Sjulian
7881501Sjulianstatic inline uint16_t
7982824Sjulianincrement_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
8083366Sjulian{
8181501Sjulian	int x = r->size - idx;
821541Srgrimes
831541Srgrimes	MPASS(x > 0);
841541Srgrimes	return (x > n ? idx + n : n - x);
851541Srgrimes}
861541Srgrimes
871541Srgrimes/* Consumer is about to update the ring's state to s */
8881501Sjulianstatic inline uint16_t
8981501Sjulianstate_to_flags(union ring_state s, int abdicate)
9081501Sjulian{
9181501Sjulian
921541Srgrimes	if (s.cidx == s.pidx_tail)
9382824Sjulian		return (IDLE);
941541Srgrimes	else if (abdicate && s.pidx_tail != s.pidx_head)
9581501Sjulian		return (ABDICATED);
9681501Sjulian
9781501Sjulian	return (BUSY);
9881501Sjulian}
9981501Sjulian
10017047Swollman#ifdef MP_RING_NO_64BIT_ATOMICS
/*
 * Consume items from the ring until it goes idle, the drain callback stalls
 * (returns 0), or 'budget' items have been processed.  This is the
 * MP_RING_NO_64BIT_ATOMICS variant: the caller holds r->lock, so the 64b
 * state is read and written directly without atomics.
 *
 * 'os' is the state the caller already moved to BUSY, 'prev' is the flags
 * value before that transition (used for the stall/restart counters), and
 * 'budget' bounds the number of items consumed in this call.
 */
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;	/* items consumed but not yet reflected in r->state */
	total = 0;	/* items consumed in this call, vs. budget */

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/* Drain made no progress: record STALLED and stop. */
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				/* Restarted from a stall, then stalled again. */
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * r->lock serializes producers with this consumer, so the
		 * pidx_tail re-read here (and the items behind it) are
		 * coherent; this mirrors the lockless variant's acquire
		 * semantics.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
1711541Srgrimes#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 *
 * Consume items until the ring goes idle, the drain callback stalls (returns
 * 0), or 'budget' items have been processed.  All state transitions go
 * through 64b compare-and-swap loops; each CAS loop runs inside a critical
 * section so this thread is not preempted mid-transition while other
 * threads may be spin-waiting on the ring's state.
 *
 * 'prev' is the flags value before the caller's BUSY transition, used for
 * the stall/restart counters.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;	/* items consumed but not yet published in r->state */
	total = 0;	/* items consumed in this call, vs. budget */

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/* Drain made no progress: publish STALLED and stop. */
			critical_enter();
			do {
				os.state = ns.state = r->state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_cmpset_64(&r->state, os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				/* Restarted from a stall, then stalled again. */
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		do {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
252136692Sandre#endif
253136692Sandre
/*
 * Allocate and initialize an mp_ring with 'size' slots (2..65536 -- all
 * indices are 16b wide).  'drain' consumes items, 'can_drain' reports
 * whether consumption is currently possible, and 'cookie' is stashed in the
 * ring for the callbacks' use.  'flags' must contain M_NOWAIT or M_WAITOK.
 *
 * Returns 0 with *pr set on success, EINVAL on bad arguments, or ENOMEM.
 */
int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct ifmp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	/* 'items' is a flexible array member, hence the __offsetof sizing. */
	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		/* Partial allocation: free whatever counters did come back. */
		ifmp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
	return (0);
}
2951541Srgrimes
2961541Srgrimesvoid
2971541Srgrimesifmp_ring_free(struct ifmp_ring *r)
2981541Srgrimes{
2991541Srgrimes
3001541Srgrimes	if (r == NULL)
3011541Srgrimes		return;
3021541Srgrimes
3031541Srgrimes	if (r->enqueues != NULL)
3041541Srgrimes		counter_u64_free(r->enqueues);
3051541Srgrimes	if (r->drops != NULL)
3061541Srgrimes		counter_u64_free(r->drops);
3071541Srgrimes	if (r->starts != NULL)
3081541Srgrimes		counter_u64_free(r->starts);
3091541Srgrimes	if (r->stalls != NULL)
31072638Sphk		counter_u64_free(r->stalls);
3111541Srgrimes	if (r->restarts != NULL)
31272638Sphk		counter_u64_free(r->restarts);
3131541Srgrimes	if (r->abdications != NULL)
3141541Srgrimes		counter_u64_free(r->abdications);
3151541Srgrimes
3161541Srgrimes	free(r, r->mt);
3171541Srgrimes}
3181541Srgrimes
31922614Swollman/*
3201541Srgrimes * Enqueue n items and maybe drain the ring for some time.
3211541Srgrimes *
3221541Srgrimes * Returns an errno.
3231541Srgrimes */
32472638Sphk#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	/*
	 * MP_RING_NO_64BIT_ATOMICS variant: the whole enqueue (and any drain
	 * it triggers) runs under r->lock instead of using 64b atomics.
	 */
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		/* Not enough room: count the whole batch as dropped. */
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 * NOTE(review): with producers serialized by r->lock this loop
	 * appears vestigial (kept parallel to the lockless variant) -- it
	 * should exit immediately; confirm before relying on it.
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 * (In this locked variant the store is a plain write; r->lock
	 * provides the equivalent ordering.)
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	/*
	 * Turn into a consumer if some other thread isn't active as a consumer
	 * already.
	 */
	if (os.flags != BUSY)
		drain_ring_locked(r, ns, os.flags, budget);

	mtx_unlock(&r->lock);
	return (0);
}
393
394#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	/*
	 * Lockless variant: producers coordinate through 64b CAS on
	 * r->state.  This variant never drains: it marks the ring ABDICATED
	 * when it was IDLE, leaving consumption to
	 * ifmp_ring_check_drainage(), so 'budget' is unused here.
	 */
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	for (;;) {
		os.state = r->state;
		if (n >= space_available(r, os)) {
			/* Not enough room: the whole batch is dropped. */
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		/*
		 * Enter a critical section before claiming the reservation;
		 * we stay in it until pidx_tail is published below, so other
		 * producers spinning on our reservation are not left waiting
		 * on a preempted thread.
		 */
		critical_enter();
		if (atomic_cmpset_64(&r->state, os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	do {
		os.state = ns.state = r->state;
		ns.pidx_tail = pidx_stop;
		if (os.flags == IDLE)
			ns.flags = ABDICATED;
	} while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	return (0);
}
462#endif
463
/*
 * Take over as consumer if the ring is STALLED or ABDICATED and there is
 * work that can be drained.  Transitions the ring to BUSY and drains up to
 * 'budget' items; a no-op if another thread beats us to the transition or
 * the preconditions do not hold.
 */
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
	    os.pidx_head != os.pidx_tail ||			// Require work to be available
	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;


#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	/* Re-check under the lock: bail if the state changed under us. */
	if (r->state != os.state) {
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;


	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}
501
502void
503ifmp_ring_reset_stats(struct ifmp_ring *r)
504{
505
506	counter_u64_zero(r->enqueues);
507	counter_u64_zero(r->drops);
508	counter_u64_zero(r->starts);
509	counter_u64_zero(r->stalls);
510	counter_u64_zero(r->restarts);
511	counter_u64_zero(r->abdications);
512}
513
514int
515ifmp_ring_is_idle(struct ifmp_ring *r)
516{
517	union ring_state s;
518
519	s.state = r->state;
520	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
521	    s.flags == IDLE)
522		return (1);
523
524	return (0);
525}
526
527int
528ifmp_ring_is_stalled(struct ifmp_ring *r)
529{
530	union ring_state s;
531
532	s.state = r->state;
533	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
534		return (1);
535
536	return (0);
537}
538