/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/sysctl.h>
#include <machine/cpu.h>

#include "t4_mp_ring.h"

#if defined(__i386__)
/* i386 lacks acq/rel variants of the 64-bit cmpset/fcmpset operations. */
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#define atomic_fcmpset_acq_64 atomic_fcmpset_64
#define atomic_fcmpset_rel_64 atomic_fcmpset_64
#endif

/*
 * mp_ring handles multiple threads (producers) enqueueing data to a tx queue.
 * The thread that is writing the hardware descriptors is the consumer and it
 * runs with the consumer lock held.  A producer becomes the consumer if there
 * isn't one already.  The consumer runs with the flags set to BUSY and
 * consumes everything (ending at IDLE or COALESCING) or gets STALLED.  If it
 * is running over its budget it sets flags to TOO_BUSY.  A producer that
 * observes a TOO_BUSY consumer will become the new consumer by setting flags
 * to TAKING_OVER.  The original consumer stops and sets the flags back to
 * BUSY for the new consumer.
 *
 * COALESCING is the same as IDLE except there are items being held in the
 * hope that they can be coalesced with items that follow.  The driver must
 * arrange for a tx update or some other event that transmits all the held
 * items in a timely manner if nothing else is enqueued.
 */

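/*
 * All of the ring's state fits in a single 64-bit word so that producers and
 * consumers can move it from one consistent snapshot to another with a single
 * atomic compare-and-swap.
 */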
union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* tx is all caught up, nothing to do. */
	COALESCING,	/* IDLE, but tx frames are being held for coalescing */
	BUSY,		/* consumer is running already, or will be shortly. */
	TOO_BUSY,	/* consumer is running and is beyond its budget */
	TAKING_OVER,	/* new consumer taking over from a TOO_BUSY consumer */
	STALLED,	/* consumer stopped due to lack of resources. */
};

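/*
 * Indices into r->consumer[]: statistics on the different ways in which a
 * producer ends up becoming (or taking over as) the consumer.
 */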
enum {
	C_FAST = 0,
	C_2,
	C_3,
	C_TAKEOVER,
};

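/*
 * One slot is always left empty so that a full ring can be distinguished from
 * an empty one: the ring is empty when cidx == pidx_head, and full when cidx
 * is one slot ahead of pidx_head.  Usable capacity is therefore size - 1.
 */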
static inline uint16_t
space_available(struct mp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}

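/*
 * Return idx advanced by n slots, wrapping around at the end of the ring.
 */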
static inline uint16_t
increment_idx(struct mp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}

/*
 * Consumer.  Called with the consumer lock held and a guarantee that there is
 * work to do.
 */
static void
drain_ring(struct mp_ring *r, int budget)
{
	union ring_state os, ns;
	int n, pending, total;
	uint16_t cidx;
	uint16_t pidx;
	bool coalescing;

	mtx_assert(r->cons_lock, MA_OWNED);

	os.state = atomic_load_acq_64(&r->state);
	MPASS(os.flags == BUSY);

	cidx = os.cidx;
	pidx = os.pidx_tail;
	MPASS(cidx != pidx);

	pending = 0;
	total = 0;

	while (cidx != pidx) {
		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx, &coalescing);
		if (n == 0) {
			critical_enter();
			os.state = atomic_load_64(&r->state);
			do {
				ns.state = os.state;
				ns.cidx = cidx;

				MPASS(os.flags == BUSY ||
				    os.flags == TOO_BUSY ||
				    os.flags == TAKING_OVER);

				if (os.flags == TAKING_OVER)
					ns.flags = BUSY;
				else
					ns.flags = STALLED;
			} while (atomic_fcmpset_64(&r->state, &os.state,
			    ns.state) == 0);
			critical_exit();
			if (os.flags == TAKING_OVER)
				counter_u64_add(r->abdications, 1);
			else if (ns.flags == STALLED)
				counter_u64_add(r->stalls, 1);
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;
		counter_u64_add(r->consumed, n);

		os.state = atomic_load_64(&r->state);
		do {
			MPASS(os.flags == BUSY || os.flags == TOO_BUSY ||
			    os.flags == TAKING_OVER);

			ns.state = os.state;
			ns.cidx = cidx;
			if (__predict_false(os.flags == TAKING_OVER)) {
				MPASS(total >= budget);
				ns.flags = BUSY;
				continue;
			}
			if (cidx == os.pidx_tail) {
				ns.flags = coalescing ? COALESCING : IDLE;
				continue;
			}
			if (total >= budget) {
				ns.flags = TOO_BUSY;
				continue;
			}
			MPASS(os.flags == BUSY);
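			/*
			 * Batch up cidx updates: don't bother the other
			 * threads with a state change for fewer than 32
			 * consumed items.
			 */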
			if (pending < 32)
				break;
		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
		    ns.state) == 0);

		if (__predict_false(os.flags == TAKING_OVER)) {
			MPASS(ns.flags == BUSY);
			counter_u64_add(r->abdications, 1);
			break;
		}

		if (ns.flags == IDLE || ns.flags == COALESCING) {
			MPASS(ns.pidx_tail == cidx);
			if (ns.pidx_head != ns.pidx_tail)
				counter_u64_add(r->cons_idle2, 1);
			else
				counter_u64_add(r->cons_idle, 1);
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}

#ifdef INVARIANTS
	if (os.flags == TAKING_OVER)
		MPASS(ns.flags == BUSY);
	else {
		MPASS(ns.flags == IDLE || ns.flags == COALESCING ||
		    ns.flags == STALLED);
	}
#endif
}

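/*
 * Flush frames that the drain callback was holding back for coalescing.
 * Called with the consumer lock held, on a ring that has been marked BUSY and
 * has no new work queued (cidx == pidx_tail).
 */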
static void
drain_txpkts(struct mp_ring *r, union ring_state os, int budget)
{
	union ring_state ns;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;
	bool coalescing;

	mtx_assert(r->cons_lock, MA_OWNED);
	MPASS(os.flags == BUSY);
	MPASS(cidx == pidx);

	r->drain(r, cidx, pidx, &coalescing);
	MPASS(coalescing == false);
	critical_enter();
	os.state = atomic_load_64(&r->state);
	do {
		ns.state = os.state;
		MPASS(os.flags == BUSY);
		MPASS(os.cidx == cidx);
		if (ns.cidx == ns.pidx_tail)
			ns.flags = IDLE;
		else
			ns.flags = BUSY;
	} while (atomic_fcmpset_acq_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();

	if (ns.flags == BUSY)
		drain_ring(r, budget);
}

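/*
 * Allocate a ring with 'size' slots (usable capacity size - 1) and all of its
 * statistics counters.  'lck' is the mutex that will serialize the consumer.
 */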
int
mp_ring_alloc(struct mp_ring **pr, int size, void *cookie, ring_drain_t drain,
    ring_can_drain_t can_drain, struct malloc_type *mt, struct mtx *lck,
    int flags)
{
	struct mp_ring *r;
	int i;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct mp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->cons_lock = lck;
	if ((r->dropped = counter_u64_alloc(flags)) == NULL)
		goto failed;
	for (i = 0; i < nitems(r->consumer); i++) {
		if ((r->consumer[i] = counter_u64_alloc(flags)) == NULL)
			goto failed;
	}
	if ((r->not_consumer = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->abdications = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->stalls = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->consumed = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->cons_idle = counter_u64_alloc(flags)) == NULL)
		goto failed;
	if ((r->cons_idle2 = counter_u64_alloc(flags)) == NULL)
		goto failed;
	*pr = r;
	return (0);
failed:
	mp_ring_free(r);
	return (ENOMEM);
}


void
mp_ring_free(struct mp_ring *r)
{
	int i;

	if (r == NULL)
		return;

	if (r->dropped != NULL)
		counter_u64_free(r->dropped);
	for (i = 0; i < nitems(r->consumer); i++) {
		if (r->consumer[i] != NULL)
			counter_u64_free(r->consumer[i]);
	}
	if (r->not_consumer != NULL)
		counter_u64_free(r->not_consumer);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->consumed != NULL)
		counter_u64_free(r->consumed);
	if (r->cons_idle != NULL)
		counter_u64_free(r->cons_idle);
	if (r->cons_idle2 != NULL)
		counter_u64_free(r->cons_idle2);

	free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
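/*
 * A hypothetical caller, for illustration only (the real callers live
 * elsewhere in the driver, and 4096 is an arbitrary budget):
 *
 *	void *items[1] = {m};	(* 'm' is an mbuf chain ready for tx *)
 *	if (mp_ring_enqueue(txq->r, items, 1, 4096) != 0)
 *		m_freem(m);	(* no room in the ring for too long *)
 */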
int
mp_ring_enqueue(struct mp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i, nospc, cons;
	bool consumer;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	nospc = 0;
	os.state = atomic_load_64(&r->state);
	for (;;) {
		for (;;) {
			if (__predict_true(space_available(r, os) >= n))
				break;

			/* Not enough room in the ring. */

			MPASS(os.flags != IDLE);
			MPASS(os.flags != COALESCING);
			if (__predict_false(++nospc > 100)) {
				counter_u64_add(r->dropped, n);
				return (ENOBUFS);
			}
			if (os.flags == STALLED)
				mp_ring_check_drainage(r, 64);
			else
				cpu_spinwait();
			os.state = atomic_load_64(&r->state);
		}

		/* There is room in the ring. */

		cons = -1;
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		if (os.flags == IDLE || os.flags == COALESCING) {
			MPASS(os.pidx_tail == os.cidx);
			if (os.pidx_head == os.pidx_tail) {
				cons = C_FAST;
				ns.pidx_tail = increment_idx(r, os.pidx_tail, n);
			} else
				cons = C_2;
			ns.flags = BUSY;
		} else if (os.flags == TOO_BUSY) {
			cons = C_TAKEOVER;
			ns.flags = TAKING_OVER;
		}
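		/*
		 * Stay in a critical section from a successful reservation to
		 * the pidx_tail update: a producer that reserved after us will
		 * spin until our update is visible, so we must not be
		 * preempted in between.
		 */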
		critical_enter();
		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}

	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	if (cons == C_FAST) {
		i = pidx_start;
		do {
			r->items[i] = *items++;
			if (__predict_false(++i == r->size))
				i = 0;
		} while (i != pidx_stop);
		critical_exit();
		counter_u64_add(r->consumer[C_FAST], 1);
		mtx_lock(r->cons_lock);
		drain_ring(r, budget);
		mtx_unlock(r->cons_lock);
		return (0);
	}

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = atomic_load_64(&r->state);
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	os.state = atomic_load_64(&r->state);
	do {
		consumer = false;
		ns.state = os.state;
		ns.pidx_tail = pidx_stop;
		if (os.flags == IDLE || os.flags == COALESCING ||
		    (os.flags == STALLED && r->can_drain(r))) {
			MPASS(cons == -1);
			consumer = true;
			ns.flags = BUSY;
		}
	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();

	if (cons == -1) {
		if (consumer)
			cons = C_3;
		else {
			counter_u64_add(r->not_consumer, 1);
			return (0);
		}
	}
	MPASS(cons > C_FAST && cons < nitems(r->consumer));
	counter_u64_add(r->consumer[cons], 1);
	mtx_lock(r->cons_lock);
	drain_ring(r, budget);
	mtx_unlock(r->cons_lock);

	return (0);
}

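/*
 * Restart a STALLED ring if the resource shortage has cleared, or flush
 * frames held for coalescing if the ring is COALESCING.  The driver is
 * expected to call this periodically, e.g. from its tx update handling.
 */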
void
mp_ring_check_drainage(struct mp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = atomic_load_64(&r->state);
	if (os.flags == STALLED && r->can_drain(r)) {
		MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
		ns.state = os.state;
		ns.flags = BUSY;
		if (atomic_cmpset_acq_64(&r->state, os.state, ns.state)) {
			mtx_lock(r->cons_lock);
			drain_ring(r, budget);
			mtx_unlock(r->cons_lock);
		}
	} else if (os.flags == COALESCING) {
		MPASS(os.cidx == os.pidx_tail);
		ns.state = os.state;
		ns.flags = BUSY;
		if (atomic_cmpset_acq_64(&r->state, os.state, ns.state)) {
			mtx_lock(r->cons_lock);
			drain_txpkts(r, ns, budget);
			mtx_unlock(r->cons_lock);
		}
	}
}

void
mp_ring_reset_stats(struct mp_ring *r)
{
	int i;

	counter_u64_zero(r->dropped);
	for (i = 0; i < nitems(r->consumer); i++)
		counter_u64_zero(r->consumer[i]);
	counter_u64_zero(r->not_consumer);
	counter_u64_zero(r->abdications);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->consumed);
	counter_u64_zero(r->cons_idle);
	counter_u64_zero(r->cons_idle2);
}

bool
mp_ring_is_idle(struct mp_ring *r)
{
	union ring_state s;

	s.state = atomic_load_64(&r->state);
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (true);

	return (false);
}

void
mp_ring_sysctls(struct mp_ring *r, struct sysctl_ctx_list *ctx,
    struct sysctl_oid_list *children)
{
	struct sysctl_oid *oid;

	oid = SYSCTL_ADD_NODE(ctx, children, OID_AUTO, "mp_ring", CTLFLAG_RD |
	    CTLFLAG_MPSAFE, NULL, "mp_ring statistics");
	children = SYSCTL_CHILDREN(oid);

	SYSCTL_ADD_U64(ctx, children, OID_AUTO, "state", CTLFLAG_RD,
	    __DEVOLATILE(uint64_t *, &r->state), 0, "ring state");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "dropped", CTLFLAG_RD,
	    &r->dropped, "# of items dropped");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "consumed",
	    CTLFLAG_RD, &r->consumed, "# of items consumed");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "fast_consumer",
	    CTLFLAG_RD, &r->consumer[C_FAST],
	    "# of times producer became consumer (fast)");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "consumer2",
	    CTLFLAG_RD, &r->consumer[C_2],
	    "# of times producer became consumer (2)");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "consumer3",
	    CTLFLAG_RD, &r->consumer[C_3],
	    "# of times producer became consumer (3)");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "takeovers",
	    CTLFLAG_RD, &r->consumer[C_TAKEOVER],
	    "# of times producer took over from another consumer.");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "not_consumer",
	    CTLFLAG_RD, &r->not_consumer,
	    "# of times producer did not become consumer");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "abdications",
	    CTLFLAG_RD, &r->abdications, "# of consumer abdications");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "stalls",
	    CTLFLAG_RD, &r->stalls, "# of consumer stalls");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "cons_idle",
	    CTLFLAG_RD, &r->cons_idle,
	    "# of times consumer ran fully to completion");
	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO, "cons_idle2",
	    CTLFLAG_RD, &r->cons_idle2,
	    "# of times consumer idled when another enqueue was in progress");
}