/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/sysctl.h>
#include <machine/cpu.h>

#include "t4_mp_ring.h"

#if defined(__i386__)
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#define atomic_fcmpset_acq_64 atomic_fcmpset_64
#define atomic_fcmpset_rel_64 atomic_fcmpset_64
#endif

/*
 * mp_ring handles multiple threads (producers) enqueueing data to a tx queue.
 * The thread that is writing the hardware descriptors is the consumer and it
 * runs with the consumer lock held.  A producer becomes the consumer if there
 * isn't one already.  The consumer runs with the flags set to BUSY and
 * consumes everything (IDLE or COALESCING) or gets STALLED.  If it is running
 * over its budget it sets the flags to TOO_BUSY.  A producer that observes a
 * TOO_BUSY consumer will become the new consumer by setting the flags to
 * TAKING_OVER.  The original consumer stops and sets the flags back to BUSY
 * for the new consumer.
 *
 * COALESCING is the same as IDLE except there are items being held in the
 * hope that they can be coalesced with items that follow.  The driver must
 * arrange for a tx update or some other event that transmits all the held
 * items in a timely manner if nothing else is enqueued.
 */

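/*
 * The entire ring state fits in a single 64-bit word so that it can be read
 * and updated atomically: pidx_head is the frontier of producer reservations,
 * pidx_tail marks items that are fully written and visible to the consumer,
 * and cidx is the consumer's position.
 */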
union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* tx is all caught up, nothing to do. */
	COALESCING,	/* IDLE, but tx frames are being held for coalescing */
	BUSY,		/* consumer is running already, or will be shortly. */
	TOO_BUSY,	/* consumer is running and is beyond its budget */
	TAKING_OVER,	/* new consumer taking over from a TOO_BUSY consumer */
	STALLED,	/* consumer stopped due to lack of resources. */
};

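/*
 * Indexes into the consumer[] counters, recording how a producer became the
 * consumer: via the fast path (ring was idle and empty), one of the two
 * slower paths, or by taking over from a TOO_BUSY consumer.
 */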
enum {
	C_FAST = 0,
	C_2,
	C_3,
	C_TAKEOVER,
};

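/*
 * Space available for new items.  One slot is always left empty so that a
 * full ring (cidx one ahead of pidx_head) is distinguishable from an empty
 * one (cidx == pidx_head).
 */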
static inline uint16_t
space_available(struct mp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}

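/* Advance idx by n, wrapping around at the end of the ring. */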
static inline uint16_t
increment_idx(struct mp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}

/*
 * Consumer.  Called with the consumer lock held and a guarantee that there is
 * work to do.
 */
static void
drain_ring(struct mp_ring *r, int budget)
{
	union ring_state os, ns;
	int n, pending, total;
	uint16_t cidx;
	uint16_t pidx;
	bool coalescing;

	mtx_assert(r->cons_lock, MA_OWNED);

	os.state = atomic_load_acq_64(&r->state);
	MPASS(os.flags == BUSY);

	cidx = os.cidx;
	pidx = os.pidx_tail;
	MPASS(cidx != pidx);

	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx, &coalescing);
		if (n == 0) {
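			/*
			 * The drain callback made no progress.  Record the
			 * updated cidx and either hand the ring to the thread
			 * taking over or mark the ring STALLED.
			 */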
			critical_enter();
			os.state = atomic_load_64(&r->state);
			do {
				ns.state = os.state;
				ns.cidx = cidx;

				MPASS(os.flags == BUSY ||
				    os.flags == TOO_BUSY ||
				    os.flags == TAKING_OVER);

				if (os.flags == TAKING_OVER)
					ns.flags = BUSY;
				else
					ns.flags = STALLED;
			} while (atomic_fcmpset_64(&r->state, &os.state,
			    ns.state) == 0);
			critical_exit();
			if (os.flags == TAKING_OVER)
				counter_u64_add(r->abdications, 1);
			else if (ns.flags == STALLED)
				counter_u64_add(r->stalls, 1);
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;
		counter_u64_add(r->consumed, n);

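		/*
		 * Pick the next state: hand the ring back to a thread that is
		 * TAKING_OVER, go to IDLE/COALESCING if everything has been
		 * consumed, or become TOO_BUSY once past the budget.
		 * Otherwise keep consuming; the updated cidx is published
		 * only once at least 32 items are pending, to reduce traffic
		 * on the shared state.
		 */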
		os.state = atomic_load_64(&r->state);
		do {
			MPASS(os.flags == BUSY || os.flags == TOO_BUSY ||
			    os.flags == TAKING_OVER);

			ns.state = os.state;
			ns.cidx = cidx;
			if (__predict_false(os.flags == TAKING_OVER)) {
				MPASS(total >= budget);
				ns.flags = BUSY;
				continue;
			}
			if (cidx == os.pidx_tail) {
				ns.flags = coalescing ? COALESCING : IDLE;
				continue;
			}
			if (total >= budget) {
				ns.flags = TOO_BUSY;
				continue;
			}
			MPASS(os.flags == BUSY);
			if (pending < 32)
				break;
		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
		    ns.state) == 0);

		if (__predict_false(os.flags == TAKING_OVER)) {
			MPASS(ns.flags == BUSY);
			counter_u64_add(r->abdications, 1);
			break;
		}

		if (ns.flags == IDLE || ns.flags == COALESCING) {
			MPASS(ns.pidx_tail == cidx);
			if (ns.pidx_head != ns.pidx_tail)
				counter_u64_add(r->cons_idle2, 1);
			else
				counter_u64_add(r->cons_idle, 1);
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}

#ifdef INVARIANTS
	if (os.flags == TAKING_OVER)
		MPASS(ns.flags == BUSY);
	else {
		MPASS(ns.flags == IDLE || ns.flags == COALESCING ||
		    ns.flags == STALLED);
	}
#endif
}

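/*
 * Flush items that were being held in the hope of coalescing.  Called with
 * the consumer lock held and the ring BUSY with cidx == pidx_tail.
 */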
static void
drain_txpkts(struct mp_ring *r, union ring_state os, int budget)
{
	union ring_state ns;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;
	bool coalescing;

	mtx_assert(r->cons_lock, MA_OWNED);
	MPASS(os.flags == BUSY);
	MPASS(cidx == pidx);

	r->drain(r, cidx, pidx, &coalescing);
	MPASS(coalescing == false);
	critical_enter();
	os.state = atomic_load_64(&r->state);
	do {
		ns.state = os.state;
		MPASS(os.flags == BUSY);
		MPASS(os.cidx == cidx);
		if (ns.cidx == ns.pidx_tail)
			ns.flags = IDLE;
		else
			ns.flags = BUSY;
	} while (atomic_fcmpset_acq_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();

	if (ns.flags == BUSY)
		drain_ring(r, budget);
}

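/*
 * Allocate a ring with 'size' slots (2 to 65536; all indexes are 16 bits)
 * along with its statistics counters.  Returns 0 on success with the new
 * ring in *pr, or an errno.
 */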
int
mp_ring_alloc(struct mp_ring **pr, int size, void *cookie, ring_drain_t drain,
    ring_can_drain_t can_drain, struct malloc_type *mt, struct mtx *lck,
    int flags)
{
	struct mp_ring *r;
	int i;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct mp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->cons_lock = lck;
	if ((r->dropped = counter_u64_alloc(flags)) == NULL)
		goto failed;
	for (i = 0; i < nitems(r->consumer); i++) {
		if ((r->consumer[i] = counter_u64_alloc(flags)) == NULL)
			goto failed;
	}
	if ((r->not_consumer = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->abdications = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->stalls = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->consumed = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->cons_idle = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->cons_idle2 = counter_u64_alloc(flags)) == NULL)
		goto failed;
	*pr = r;
	return (0);
failed:
	mp_ring_free(r);
	return (ENOMEM);
}

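/*
 * Free the ring and any counters that were allocated.  Safe to call on a
 * partially constructed ring, as the failure path of mp_ring_alloc does.
 */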
void
mp_ring_free(struct mp_ring *r)
{
	int i;

	if (r == NULL)
		return;

	if (r->dropped != NULL)
		counter_u64_free(r->dropped);
	for (i = 0; i < nitems(r->consumer); i++) {
		if (r->consumer[i] != NULL)
			counter_u64_free(r->consumer[i]);
	}
	if (r->not_consumer != NULL)
		counter_u64_free(r->not_consumer);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->consumed != NULL)
		counter_u64_free(r->consumed);
	if (r->cons_idle != NULL)
		counter_u64_free(r->cons_idle);
	if (r->cons_idle2 != NULL)
		counter_u64_free(r->cons_idle2);

	free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
int
mp_ring_enqueue(struct mp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i, nospc, cons;
	bool consumer;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	nospc = 0;
	os.state = atomic_load_64(&r->state);
	for (;;) {
		for (;;) {
			if (__predict_true(space_available(r, os) >= n))
				break;

			/* Not enough room in the ring. */

			MPASS(os.flags != IDLE);
			MPASS(os.flags != COALESCING);
			if (__predict_false(++nospc > 100)) {
				counter_u64_add(r->dropped, n);
				return (ENOBUFS);
			}
			if (os.flags == STALLED)
				mp_ring_check_drainage(r, 64);
			else
				cpu_spinwait();
			os.state = atomic_load_64(&r->state);
		}

		/* There is room in the ring. */

		cons = -1;
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		if (os.flags == IDLE || os.flags == COALESCING) {
			MPASS(os.pidx_tail == os.cidx);
			if (os.pidx_head == os.pidx_tail) {
				cons = C_FAST;
				ns.pidx_tail = increment_idx(r, os.pidx_tail, n);
			} else
				cons = C_2;
			ns.flags = BUSY;
		} else if (os.flags == TOO_BUSY) {
			cons = C_TAKEOVER;
			ns.flags = TAKING_OVER;
		}
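		/*
		 * The critical section begins before the reservation is
		 * published and ends only after pidx_tail has been updated,
		 * so that producers spinning on pidx_tail do not end up
		 * waiting behind a preempted thread.
		 */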
		critical_enter();
		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}

	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

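	/*
	 * Fast path: the ring was idle and empty, so the reserved slots can
	 * be filled and consumed immediately without waiting on pidx_tail.
	 */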
	if (cons == C_FAST) {
		i = pidx_start;
		do {
			r->items[i] = *items++;
			if (__predict_false(++i == r->size))
				i = 0;
		} while (i != pidx_stop);
		critical_exit();
		counter_u64_add(r->consumer[C_FAST], 1);
		mtx_lock(r->cons_lock);
		drain_ring(r, budget);
		mtx_unlock(r->cons_lock);
		return (0);
	}

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = atomic_load_64(&r->state);
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	os.state = atomic_load_64(&r->state);
	do {
		consumer = false;
		ns.state = os.state;
		ns.pidx_tail = pidx_stop;
		if (os.flags == IDLE || os.flags == COALESCING ||
		    (os.flags == STALLED && r->can_drain(r))) {
			MPASS(cons == -1);
			consumer = true;
			ns.flags = BUSY;
		}
	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();

	if (cons == -1) {
		if (consumer)
			cons = C_3;
		else {
			counter_u64_add(r->not_consumer, 1);
			return (0);
		}
	}
	MPASS(cons > C_FAST && cons < nitems(r->consumer));
	counter_u64_add(r->consumer[cons], 1);
	mtx_lock(r->cons_lock);
	drain_ring(r, budget);
	mtx_unlock(r->cons_lock);

	return (0);
}

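/*
 * Restart a STALLED consumer if it can make progress again (r->can_drain),
 * or flush items that are being held for coalescing.  The driver must
 * arrange for this to run in a timely manner (from a tx update, for example)
 * when nothing else is being enqueued.
 */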
void
mp_ring_check_drainage(struct mp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = atomic_load_64(&r->state);
	if (os.flags == STALLED && r->can_drain(r)) {
		MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
		ns.state = os.state;
		ns.flags = BUSY;
		if (atomic_cmpset_acq_64(&r->state, os.state, ns.state)) {
			mtx_lock(r->cons_lock);
			drain_ring(r, budget);
			mtx_unlock(r->cons_lock);
		}
	} else if (os.flags == COALESCING) {
		MPASS(os.cidx == os.pidx_tail);
		ns.state = os.state;
		ns.flags = BUSY;
		if (atomic_cmpset_acq_64(&r->state, os.state, ns.state)) {
			mtx_lock(r->cons_lock);
			drain_txpkts(r, ns, budget);
			mtx_unlock(r->cons_lock);
		}
	}
}

void
mp_ring_reset_stats(struct mp_ring *r)
{
	int i;

	counter_u64_zero(r->dropped);
	for (i = 0; i < nitems(r->consumer); i++)
		counter_u64_zero(r->consumer[i]);
	counter_u64_zero(r->not_consumer);
	counter_u64_zero(r->abdications);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->consumed);
	counter_u64_zero(r->cons_idle);
	counter_u64_zero(r->cons_idle2);
}

bool
mp_ring_is_idle(struct mp_ring *r)
{
	union ring_state s;

	s.state = atomic_load_64(&r->state);
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (true);

	return (false);
}

void
mp_ring_sysctls(struct mp_ring *r, struct sysctl_ctx_list *ctx,
    struct sysctl_oid_list *children)
{
	struct sysctl_oid *oid;

	oid = SYSCTL_ADD_NODE(ctx, children, OID_AUTO, "mp_ring", CTLFLAG_RD |
	    CTLFLAG_MPSAFE, NULL, "mp_ring statistics");
	children = SYSCTL_CHILDREN(oid);

	SYSCTL_ADD_U64(ctx, children, OID_AUTO, "state", CTLFLAG_RD,
	    __DEVOLATILE(uint64_t *, &r->state), 0, "ring state");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "dropped", CTLFLAG_RD,
	    &r->dropped, "# of items dropped");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "consumed",
	    CTLFLAG_RD, &r->consumed, "# of items consumed");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "fast_consumer",
	    CTLFLAG_RD, &r->consumer[C_FAST],
	    "# of times producer became consumer (fast)");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "consumer2",
	    CTLFLAG_RD, &r->consumer[C_2],
	    "# of times producer became consumer (2)");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "consumer3",
	    CTLFLAG_RD, &r->consumer[C_3],
	    "# of times producer became consumer (3)");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "takeovers",
	    CTLFLAG_RD, &r->consumer[C_TAKEOVER],
	    "# of times producer took over from another consumer.");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "not_consumer",
	    CTLFLAG_RD, &r->not_consumer,
	    "# of times producer did not become consumer");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "abdications",
	    CTLFLAG_RD, &r->abdications, "# of consumer abdications");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "stalls",
	    CTLFLAG_RD, &r->stalls, "# of consumer stalls");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "cons_idle",
	    CTLFLAG_RD, &r->cons_idle,
	    "# of times consumer ran fully to completion");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "cons_idle2",
	    CTLFLAG_RD, &r->cons_idle2,
	    "# of times consumer idled when another enqueue was in progress");
}