1300113Sscottl/*- 2300113Sscottl * Copyright (c) 2014 Chelsio Communications, Inc. 3300113Sscottl * All rights reserved. 4300113Sscottl * Written by: Navdeep Parhar <np@FreeBSD.org> 5300113Sscottl * 6300113Sscottl * Redistribution and use in source and binary forms, with or without 7300113Sscottl * modification, are permitted provided that the following conditions 8300113Sscottl * are met: 9300113Sscottl * 1. Redistributions of source code must retain the above copyright 10300113Sscottl * notice, this list of conditions and the following disclaimer. 11300113Sscottl * 2. Redistributions in binary form must reproduce the above copyright 12300113Sscottl * notice, this list of conditions and the following disclaimer in the 13300113Sscottl * documentation and/or other materials provided with the distribution. 14300113Sscottl * 15300113Sscottl * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16300113Sscottl * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17300113Sscottl * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18300113Sscottl * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19300113Sscottl * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20300113Sscottl * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21300113Sscottl * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22300113Sscottl * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23300113Sscottl * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24300113Sscottl * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25300113Sscottl * SUCH DAMAGE. 26300113Sscottl */ 27300113Sscottl 28300113Sscottl#include <sys/cdefs.h> 29300113Sscottl__FBSDID("$FreeBSD: stable/11/sys/net/mp_ring.c 344093 2019-02-13 14:25:05Z marius $"); 30300113Sscottl 31300113Sscottl#include <sys/types.h> 32300113Sscottl#include <sys/param.h> 33300113Sscottl#include <sys/systm.h> 34300113Sscottl#include <sys/counter.h> 35300113Sscottl#include <sys/lock.h> 36300113Sscottl#include <sys/mutex.h> 37300113Sscottl#include <sys/malloc.h> 38300113Sscottl#include <machine/cpu.h> 39300113Sscottl 40300113Sscottl#if defined(__i386__) 41300113Sscottl#define atomic_cmpset_acq_64 atomic_cmpset_64 42300113Sscottl#define atomic_cmpset_rel_64 atomic_cmpset_64 43300113Sscottl#endif 44300113Sscottl 45300154Sscottl#include <net/mp_ring.h> 46300154Sscottl 47300113Sscottlunion ring_state { 48300113Sscottl struct { 49300113Sscottl uint16_t pidx_head; 50300113Sscottl uint16_t pidx_tail; 51300113Sscottl uint16_t cidx; 52300113Sscottl uint16_t flags; 53300113Sscottl }; 54300113Sscottl uint64_t state; 55300113Sscottl}; 56300113Sscottl 57300113Sscottlenum { 58300113Sscottl IDLE = 0, /* consumer ran to completion, nothing more to do. */ 59300113Sscottl BUSY, /* consumer is running already, or will be shortly. */ 60300113Sscottl STALLED, /* consumer stopped due to lack of resources. */ 61300113Sscottl ABDICATED, /* consumer stopped even though there was work to be 62300113Sscottl done because it wants another thread to take over. */ 63300113Sscottl}; 64300113Sscottl 65300113Sscottlstatic inline uint16_t 66300113Sscottlspace_available(struct ifmp_ring *r, union ring_state s) 67300113Sscottl{ 68300113Sscottl uint16_t x = r->size - 1; 69300113Sscottl 70300113Sscottl if (s.cidx == s.pidx_head) 71300113Sscottl return (x); 72300113Sscottl else if (s.cidx > s.pidx_head) 73300113Sscottl return (s.cidx - s.pidx_head - 1); 74300113Sscottl else 75300113Sscottl return (x - s.pidx_head + s.cidx); 76300113Sscottl} 77300113Sscottl 78300113Sscottlstatic inline uint16_t 79300113Sscottlincrement_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n) 80300113Sscottl{ 81300113Sscottl int x = r->size - idx; 82300113Sscottl 83300113Sscottl MPASS(x > 0); 84300113Sscottl return (x > n ? idx + n : n - x); 85300113Sscottl} 86300113Sscottl 87300113Sscottl/* Consumer is about to update the ring's state to s */ 88300113Sscottlstatic inline uint16_t 89300113Sscottlstate_to_flags(union ring_state s, int abdicate) 90300113Sscottl{ 91300113Sscottl 92300113Sscottl if (s.cidx == s.pidx_tail) 93300113Sscottl return (IDLE); 94300113Sscottl else if (abdicate && s.pidx_tail != s.pidx_head) 95300113Sscottl return (ABDICATED); 96300113Sscottl 97300113Sscottl return (BUSY); 98300113Sscottl} 99300113Sscottl 100344093Smarius#ifdef MP_RING_NO_64BIT_ATOMICS 101300113Sscottlstatic void 102300113Sscottldrain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget) 103300113Sscottl{ 104300113Sscottl union ring_state ns; 105300113Sscottl int n, pending, total; 106300113Sscottl uint16_t cidx = os.cidx; 107300113Sscottl uint16_t pidx = os.pidx_tail; 108300113Sscottl 109300113Sscottl MPASS(os.flags == BUSY); 110300113Sscottl MPASS(cidx != pidx); 111300113Sscottl 112300113Sscottl if (prev == IDLE) 113300113Sscottl counter_u64_add(r->starts, 1); 114300113Sscottl pending = 0; 115300113Sscottl total = 0; 116300113Sscottl 117300113Sscottl while (cidx != pidx) { 118300113Sscottl 119300113Sscottl /* Items from cidx to pidx are available for consumption. */ 120300113Sscottl n = r->drain(r, cidx, pidx); 121300113Sscottl if (n == 0) { 122300113Sscottl os.state = ns.state = r->state; 123300113Sscottl ns.cidx = cidx; 124300113Sscottl ns.flags = STALLED; 125300113Sscottl r->state = ns.state; 126300113Sscottl if (prev != STALLED) 127300113Sscottl counter_u64_add(r->stalls, 1); 128300113Sscottl else if (total > 0) { 129300113Sscottl counter_u64_add(r->restarts, 1); 130300113Sscottl counter_u64_add(r->stalls, 1); 131300113Sscottl } 132300113Sscottl break; 133300113Sscottl } 134300113Sscottl cidx = increment_idx(r, cidx, n); 135300113Sscottl pending += n; 136300113Sscottl total += n; 137300113Sscottl 138300113Sscottl /* 139300113Sscottl * We update the cidx only if we've caught up with the pidx, the 140300113Sscottl * real cidx is getting too far ahead of the one visible to 141300113Sscottl * everyone else, or we have exceeded our budget. 142300113Sscottl */ 143300113Sscottl if (cidx != pidx && pending < 64 && total < budget) 144300113Sscottl continue; 145300113Sscottl 146300113Sscottl os.state = ns.state = r->state; 147300113Sscottl ns.cidx = cidx; 148300113Sscottl ns.flags = state_to_flags(ns, total >= budget); 149300113Sscottl r->state = ns.state; 150300113Sscottl 151300113Sscottl if (ns.flags == ABDICATED) 152300113Sscottl counter_u64_add(r->abdications, 1); 153300113Sscottl if (ns.flags != BUSY) { 154300113Sscottl /* Wrong loop exit if we're going to stall. */ 155300113Sscottl MPASS(ns.flags != STALLED); 156300113Sscottl if (prev == STALLED) { 157300113Sscottl MPASS(total > 0); 158300113Sscottl counter_u64_add(r->restarts, 1); 159300113Sscottl } 160300113Sscottl break; 161300113Sscottl } 162300113Sscottl 163300113Sscottl /* 164300113Sscottl * The acquire style atomic above guarantees visibility of items 165300113Sscottl * associated with any pidx change that we notice here. 166300113Sscottl */ 167300113Sscottl pidx = ns.pidx_tail; 168300113Sscottl pending = 0; 169300113Sscottl } 170300113Sscottl} 171300113Sscottl#else 172300113Sscottl/* 173300113Sscottl * Caller passes in a state, with a guarantee that there is work to do and that 174300113Sscottl * all items up to the pidx_tail in the state are visible. 175300113Sscottl */ 176300113Sscottlstatic void 177300113Sscottldrain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget) 178300113Sscottl{ 179300113Sscottl union ring_state ns; 180300113Sscottl int n, pending, total; 181300113Sscottl uint16_t cidx = os.cidx; 182300113Sscottl uint16_t pidx = os.pidx_tail; 183300113Sscottl 184300113Sscottl MPASS(os.flags == BUSY); 185300113Sscottl MPASS(cidx != pidx); 186300113Sscottl 187300113Sscottl if (prev == IDLE) 188300113Sscottl counter_u64_add(r->starts, 1); 189300113Sscottl pending = 0; 190300113Sscottl total = 0; 191300113Sscottl 192300113Sscottl while (cidx != pidx) { 193300113Sscottl 194300113Sscottl /* Items from cidx to pidx are available for consumption. */ 195300113Sscottl n = r->drain(r, cidx, pidx); 196300113Sscottl if (n == 0) { 197300113Sscottl critical_enter(); 198300113Sscottl do { 199300113Sscottl os.state = ns.state = r->state; 200300113Sscottl ns.cidx = cidx; 201300113Sscottl ns.flags = STALLED; 202300113Sscottl } while (atomic_cmpset_64(&r->state, os.state, 203300113Sscottl ns.state) == 0); 204300113Sscottl critical_exit(); 205300113Sscottl if (prev != STALLED) 206300113Sscottl counter_u64_add(r->stalls, 1); 207300113Sscottl else if (total > 0) { 208300113Sscottl counter_u64_add(r->restarts, 1); 209300113Sscottl counter_u64_add(r->stalls, 1); 210300113Sscottl } 211300113Sscottl break; 212300113Sscottl } 213300113Sscottl cidx = increment_idx(r, cidx, n); 214300113Sscottl pending += n; 215300113Sscottl total += n; 216300113Sscottl 217300113Sscottl /* 218300113Sscottl * We update the cidx only if we've caught up with the pidx, the 219300113Sscottl * real cidx is getting too far ahead of the one visible to 220300113Sscottl * everyone else, or we have exceeded our budget. 221300113Sscottl */ 222300113Sscottl if (cidx != pidx && pending < 64 && total < budget) 223300113Sscottl continue; 224300113Sscottl critical_enter(); 225300113Sscottl do { 226300113Sscottl os.state = ns.state = r->state; 227300113Sscottl ns.cidx = cidx; 228300113Sscottl ns.flags = state_to_flags(ns, total >= budget); 229300113Sscottl } while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0); 230300113Sscottl critical_exit(); 231300113Sscottl 232300113Sscottl if (ns.flags == ABDICATED) 233300113Sscottl counter_u64_add(r->abdications, 1); 234300113Sscottl if (ns.flags != BUSY) { 235300113Sscottl /* Wrong loop exit if we're going to stall. */ 236300113Sscottl MPASS(ns.flags != STALLED); 237300113Sscottl if (prev == STALLED) { 238300113Sscottl MPASS(total > 0); 239300113Sscottl counter_u64_add(r->restarts, 1); 240300113Sscottl } 241300113Sscottl break; 242300113Sscottl } 243300113Sscottl 244300113Sscottl /* 245300113Sscottl * The acquire style atomic above guarantees visibility of items 246300113Sscottl * associated with any pidx change that we notice here. 247300113Sscottl */ 248300113Sscottl pidx = ns.pidx_tail; 249300113Sscottl pending = 0; 250300113Sscottl } 251300113Sscottl} 252300113Sscottl#endif 253300113Sscottl 254300113Sscottlint 255300113Sscottlifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain, 256300113Sscottl mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags) 257300113Sscottl{ 258300113Sscottl struct ifmp_ring *r; 259300113Sscottl 260300113Sscottl /* All idx are 16b so size can be 65536 at most */ 261300113Sscottl if (pr == NULL || size < 2 || size > 65536 || drain == NULL || 262300113Sscottl can_drain == NULL) 263300113Sscottl return (EINVAL); 264300113Sscottl *pr = NULL; 265300113Sscottl flags &= M_NOWAIT | M_WAITOK; 266300113Sscottl MPASS(flags != 0); 267300113Sscottl 268300113Sscottl r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO); 269300113Sscottl if (r == NULL) 270300113Sscottl return (ENOMEM); 271300113Sscottl r->size = size; 272300113Sscottl r->cookie = cookie; 273300113Sscottl r->mt = mt; 274300113Sscottl r->drain = drain; 275300113Sscottl r->can_drain = can_drain; 276300113Sscottl r->enqueues = counter_u64_alloc(flags); 277300113Sscottl r->drops = counter_u64_alloc(flags); 278300113Sscottl r->starts = counter_u64_alloc(flags); 279300113Sscottl r->stalls = counter_u64_alloc(flags); 280300113Sscottl r->restarts = counter_u64_alloc(flags); 281300113Sscottl r->abdications = counter_u64_alloc(flags); 282300113Sscottl if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL || 283300113Sscottl r->stalls == NULL || r->restarts == NULL || 284300113Sscottl r->abdications == NULL) { 285300113Sscottl ifmp_ring_free(r); 286300113Sscottl return (ENOMEM); 287300113Sscottl } 288300113Sscottl 289300113Sscottl *pr = r; 290344093Smarius#ifdef MP_RING_NO_64BIT_ATOMICS 291300113Sscottl mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF); 292300113Sscottl#endif 293300113Sscottl return (0); 294300113Sscottl} 295300113Sscottl 296300113Sscottlvoid 297300113Sscottlifmp_ring_free(struct ifmp_ring *r) 298300113Sscottl{ 299300113Sscottl 300300113Sscottl if (r == NULL) 301300113Sscottl return; 302300113Sscottl 303300113Sscottl if (r->enqueues != NULL) 304300113Sscottl counter_u64_free(r->enqueues); 305300113Sscottl if (r->drops != NULL) 306300113Sscottl counter_u64_free(r->drops); 307300113Sscottl if (r->starts != NULL) 308300113Sscottl counter_u64_free(r->starts); 309300113Sscottl if (r->stalls != NULL) 310300113Sscottl counter_u64_free(r->stalls); 311300113Sscottl if (r->restarts != NULL) 312300113Sscottl counter_u64_free(r->restarts); 313300113Sscottl if (r->abdications != NULL) 314300113Sscottl counter_u64_free(r->abdications); 315300113Sscottl 316300113Sscottl free(r, r->mt); 317300113Sscottl} 318300113Sscottl 319300113Sscottl/* 320300113Sscottl * Enqueue n items and maybe drain the ring for some time. 321300113Sscottl * 322300113Sscottl * Returns an errno. 323300113Sscottl */ 324344093Smarius#ifdef MP_RING_NO_64BIT_ATOMICS 325300113Sscottlint 326300113Sscottlifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget) 327300113Sscottl{ 328300113Sscottl union ring_state os, ns; 329300113Sscottl uint16_t pidx_start, pidx_stop; 330300113Sscottl int i; 331300113Sscottl 332300113Sscottl MPASS(items != NULL); 333300113Sscottl MPASS(n > 0); 334300113Sscottl 335300113Sscottl mtx_lock(&r->lock); 336300113Sscottl /* 337300113Sscottl * Reserve room for the new items. Our reservation, if successful, is 338300113Sscottl * from 'pidx_start' to 'pidx_stop'. 339300113Sscottl */ 340300113Sscottl os.state = r->state; 341300113Sscottl if (n >= space_available(r, os)) { 342300113Sscottl counter_u64_add(r->drops, n); 343300113Sscottl MPASS(os.flags != IDLE); 344344093Smarius mtx_unlock(&r->lock); 345300113Sscottl if (os.flags == STALLED) 346300113Sscottl ifmp_ring_check_drainage(r, 0); 347300113Sscottl return (ENOBUFS); 348300113Sscottl } 349300113Sscottl ns.state = os.state; 350300113Sscottl ns.pidx_head = increment_idx(r, os.pidx_head, n); 351300113Sscottl r->state = ns.state; 352300113Sscottl pidx_start = os.pidx_head; 353300113Sscottl pidx_stop = ns.pidx_head; 354300113Sscottl 355300113Sscottl /* 356300113Sscottl * Wait for other producers who got in ahead of us to enqueue their 357300113Sscottl * items, one producer at a time. It is our turn when the ring's 358300215Spfg * pidx_tail reaches the beginning of our reservation (pidx_start). 359300113Sscottl */ 360300113Sscottl while (ns.pidx_tail != pidx_start) { 361300113Sscottl cpu_spinwait(); 362300113Sscottl ns.state = r->state; 363300113Sscottl } 364300113Sscottl 365300113Sscottl /* Now it is our turn to fill up the area we reserved earlier. */ 366300113Sscottl i = pidx_start; 367300113Sscottl do { 368300113Sscottl r->items[i] = *items++; 369300113Sscottl if (__predict_false(++i == r->size)) 370300113Sscottl i = 0; 371300113Sscottl } while (i != pidx_stop); 372300113Sscottl 373300113Sscottl /* 374300113Sscottl * Update the ring's pidx_tail. The release style atomic guarantees 375300113Sscottl * that the items are visible to any thread that sees the updated pidx. 376300113Sscottl */ 377300113Sscottl os.state = ns.state = r->state; 378300113Sscottl ns.pidx_tail = pidx_stop; 379300113Sscottl ns.flags = BUSY; 380300113Sscottl r->state = ns.state; 381300113Sscottl counter_u64_add(r->enqueues, n); 382300113Sscottl 383300113Sscottl /* 384300113Sscottl * Turn into a consumer if some other thread isn't active as a consumer 385300113Sscottl * already. 386300113Sscottl */ 387300113Sscottl if (os.flags != BUSY) 388300113Sscottl drain_ring_locked(r, ns, os.flags, budget); 389300113Sscottl 390300113Sscottl mtx_unlock(&r->lock); 391300113Sscottl return (0); 392300113Sscottl} 393300113Sscottl 394300113Sscottl#else 395300113Sscottlint 396300113Sscottlifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget) 397300113Sscottl{ 398300113Sscottl union ring_state os, ns; 399300113Sscottl uint16_t pidx_start, pidx_stop; 400300113Sscottl int i; 401300113Sscottl 402300113Sscottl MPASS(items != NULL); 403300113Sscottl MPASS(n > 0); 404300113Sscottl 405300113Sscottl /* 406300113Sscottl * Reserve room for the new items. Our reservation, if successful, is 407300113Sscottl * from 'pidx_start' to 'pidx_stop'. 408300113Sscottl */ 409300113Sscottl for (;;) { 410300113Sscottl os.state = r->state; 411300113Sscottl if (n >= space_available(r, os)) { 412300113Sscottl counter_u64_add(r->drops, n); 413300113Sscottl MPASS(os.flags != IDLE); 414300113Sscottl if (os.flags == STALLED) 415300113Sscottl ifmp_ring_check_drainage(r, 0); 416300113Sscottl return (ENOBUFS); 417300113Sscottl } 418300113Sscottl ns.state = os.state; 419300113Sscottl ns.pidx_head = increment_idx(r, os.pidx_head, n); 420300113Sscottl critical_enter(); 421300113Sscottl if (atomic_cmpset_64(&r->state, os.state, ns.state)) 422300113Sscottl break; 423300113Sscottl critical_exit(); 424300113Sscottl cpu_spinwait(); 425300113Sscottl } 426300113Sscottl pidx_start = os.pidx_head; 427300113Sscottl pidx_stop = ns.pidx_head; 428300113Sscottl 429300113Sscottl /* 430300113Sscottl * Wait for other producers who got in ahead of us to enqueue their 431300113Sscottl * items, one producer at a time. It is our turn when the ring's 432300215Spfg * pidx_tail reaches the beginning of our reservation (pidx_start). 433300113Sscottl */ 434300113Sscottl while (ns.pidx_tail != pidx_start) { 435300113Sscottl cpu_spinwait(); 436300113Sscottl ns.state = r->state; 437300113Sscottl } 438300113Sscottl 439300113Sscottl /* Now it is our turn to fill up the area we reserved earlier. */ 440300113Sscottl i = pidx_start; 441300113Sscottl do { 442300113Sscottl r->items[i] = *items++; 443300113Sscottl if (__predict_false(++i == r->size)) 444300113Sscottl i = 0; 445300113Sscottl } while (i != pidx_stop); 446300113Sscottl 447300113Sscottl /* 448300113Sscottl * Update the ring's pidx_tail. The release style atomic guarantees 449300113Sscottl * that the items are visible to any thread that sees the updated pidx. 450300113Sscottl */ 451300113Sscottl do { 452300113Sscottl os.state = ns.state = r->state; 453300113Sscottl ns.pidx_tail = pidx_stop; 454333338Sshurd if (os.flags == IDLE) 455333338Sshurd ns.flags = ABDICATED; 456300113Sscottl } while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0); 457300113Sscottl critical_exit(); 458300113Sscottl counter_u64_add(r->enqueues, n); 459300113Sscottl 460300113Sscottl return (0); 461300113Sscottl} 462300113Sscottl#endif 463300113Sscottl 464300113Sscottlvoid 465300113Sscottlifmp_ring_check_drainage(struct ifmp_ring *r, int budget) 466300113Sscottl{ 467300113Sscottl union ring_state os, ns; 468300113Sscottl 469300113Sscottl os.state = r->state; 470333338Sshurd if ((os.flags != STALLED && os.flags != ABDICATED) || // Only continue in STALLED and ABDICATED 471333338Sshurd os.pidx_head != os.pidx_tail || // Require work to be available 472333338Sshurd (os.flags != ABDICATED && r->can_drain(r) == 0)) // Can either drain, or everyone left 473300113Sscottl return; 474300113Sscottl 475300113Sscottl MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */ 476300113Sscottl ns.state = os.state; 477300113Sscottl ns.flags = BUSY; 478300113Sscottl 479300113Sscottl 480344093Smarius#ifdef MP_RING_NO_64BIT_ATOMICS 481300113Sscottl mtx_lock(&r->lock); 482300113Sscottl if (r->state != os.state) { 483300113Sscottl mtx_unlock(&r->lock); 484300113Sscottl return; 485300113Sscottl } 486300113Sscottl r->state = ns.state; 487300113Sscottl drain_ring_locked(r, ns, os.flags, budget); 488300113Sscottl mtx_unlock(&r->lock); 489300113Sscottl#else 490300113Sscottl /* 491300113Sscottl * The acquire style atomic guarantees visibility of items associated 492300113Sscottl * with the pidx that we read here. 493300113Sscottl */ 494300113Sscottl if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state)) 495300113Sscottl return; 496300113Sscottl 497300113Sscottl 498300113Sscottl drain_ring_lockless(r, ns, os.flags, budget); 499300113Sscottl#endif 500300113Sscottl} 501300113Sscottl 502300113Sscottlvoid 503300113Sscottlifmp_ring_reset_stats(struct ifmp_ring *r) 504300113Sscottl{ 505300113Sscottl 506300113Sscottl counter_u64_zero(r->enqueues); 507300113Sscottl counter_u64_zero(r->drops); 508300113Sscottl counter_u64_zero(r->starts); 509300113Sscottl counter_u64_zero(r->stalls); 510300113Sscottl counter_u64_zero(r->restarts); 511300113Sscottl counter_u64_zero(r->abdications); 512300113Sscottl} 513300113Sscottl 514300113Sscottlint 515300113Sscottlifmp_ring_is_idle(struct ifmp_ring *r) 516300113Sscottl{ 517300113Sscottl union ring_state s; 518300113Sscottl 519300113Sscottl s.state = r->state; 520300113Sscottl if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx && 521300113Sscottl s.flags == IDLE) 522300113Sscottl return (1); 523300113Sscottl 524300113Sscottl return (0); 525300113Sscottl} 526300113Sscottl 527300113Sscottlint 528300113Sscottlifmp_ring_is_stalled(struct ifmp_ring *r) 529300113Sscottl{ 530300113Sscottl union ring_state s; 531300113Sscottl 532300113Sscottl s.state = r->state; 533300113Sscottl if (s.pidx_head == s.pidx_tail && s.flags == STALLED) 534300113Sscottl return (1); 535300113Sscottl 536300113Sscottl return (0); 537300113Sscottl} 538