/* mp_ring.c revision 344093 */
11541Srgrimes/*- 21541Srgrimes * Copyright (c) 2014 Chelsio Communications, Inc. 31541Srgrimes * All rights reserved. 41541Srgrimes * Written by: Navdeep Parhar <np@FreeBSD.org> 51541Srgrimes * 61541Srgrimes * Redistribution and use in source and binary forms, with or without 71541Srgrimes * modification, are permitted provided that the following conditions 81541Srgrimes * are met: 91541Srgrimes * 1. Redistributions of source code must retain the above copyright 101541Srgrimes * notice, this list of conditions and the following disclaimer. 111541Srgrimes * 2. Redistributions in binary form must reproduce the above copyright 121541Srgrimes * notice, this list of conditions and the following disclaimer in the 131541Srgrimes * documentation and/or other materials provided with the distribution. 141541Srgrimes * 151541Srgrimes * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 161541Srgrimes * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 171541Srgrimes * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 181541Srgrimes * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 191541Srgrimes * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 201541Srgrimes * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 211541Srgrimes * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 221541Srgrimes * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 231541Srgrimes * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 241541Srgrimes * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 251541Srgrimes * SUCH DAMAGE. 
261541Srgrimes */ 271541Srgrimes 281541Srgrimes#include <sys/cdefs.h> 291541Srgrimes__FBSDID("$FreeBSD: stable/11/sys/net/mp_ring.c 344093 2019-02-13 14:25:05Z marius $"); 3050477Speter 311541Srgrimes#include <sys/types.h> 321541Srgrimes#include <sys/param.h> 332165Spaul#include <sys/systm.h> 342165Spaul#include <sys/counter.h> 352165Spaul#include <sys/lock.h> 3623888Swollman#include <sys/mutex.h> 3712453Sbde#include <sys/malloc.h> 3883366Sjulian#include <machine/cpu.h> 3912453Sbde 4012453Sbde#if defined(__i386__) 4138482Swollman#define atomic_cmpset_acq_64 atomic_cmpset_64 4212453Sbde#define atomic_cmpset_rel_64 atomic_cmpset_64 4355205Speter#endif 441541Srgrimes 451541Srgrimes#include <net/mp_ring.h> 461541Srgrimes 471541Srgrimesunion ring_state { 481541Srgrimes struct { 491541Srgrimes uint16_t pidx_head; 501541Srgrimes uint16_t pidx_tail; 511541Srgrimes uint16_t cidx; 521541Srgrimes uint16_t flags; 531541Srgrimes }; 541541Srgrimes uint64_t state; 551541Srgrimes}; 561541Srgrimes 571541Srgrimesenum { 5838482Swollman IDLE = 0, /* consumer ran to completion, nothing more to do. */ 591541Srgrimes BUSY, /* consumer is running already, or will be shortly. */ 601541Srgrimes STALLED, /* consumer stopped due to lack of resources. */ 611541Srgrimes ABDICATED, /* consumer stopped even though there was work to be 621541Srgrimes done because it wants another thread to take over. 
*/ 6338482Swollman}; 6438482Swollman 651541Srgrimesstatic inline uint16_t 6681501Sjulianspace_available(struct ifmp_ring *r, union ring_state s) 6781501Sjulian{ 6882824Sjulian uint16_t x = r->size - 1; 6981501Sjulian 7081501Sjulian if (s.cidx == s.pidx_head) 7182824Sjulian return (x); 7281501Sjulian else if (s.cidx > s.pidx_head) 7381501Sjulian return (s.cidx - s.pidx_head - 1); 7481501Sjulian else 7581501Sjulian return (x - s.pidx_head + s.cidx); 7681501Sjulian} 7781501Sjulian 7881501Sjulianstatic inline uint16_t 7982824Sjulianincrement_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n) 8083366Sjulian{ 8181501Sjulian int x = r->size - idx; 821541Srgrimes 831541Srgrimes MPASS(x > 0); 841541Srgrimes return (x > n ? idx + n : n - x); 851541Srgrimes} 861541Srgrimes 871541Srgrimes/* Consumer is about to update the ring's state to s */ 8881501Sjulianstatic inline uint16_t 8981501Sjulianstate_to_flags(union ring_state s, int abdicate) 9081501Sjulian{ 9181501Sjulian 921541Srgrimes if (s.cidx == s.pidx_tail) 9382824Sjulian return (IDLE); 941541Srgrimes else if (abdicate && s.pidx_tail != s.pidx_head) 9581501Sjulian return (ABDICATED); 9681501Sjulian 9781501Sjulian return (BUSY); 9881501Sjulian} 9981501Sjulian 10017047Swollman#ifdef MP_RING_NO_64BIT_ATOMICS 1011541Srgrimesstatic void 10228270Swollmandrain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget) 1031541Srgrimes{ 1041541Srgrimes union ring_state ns; 1051541Srgrimes int n, pending, total; 1061541Srgrimes uint16_t cidx = os.cidx; 1071541Srgrimes uint16_t pidx = os.pidx_tail; 108136692Sandre 109136692Sandre MPASS(os.flags == BUSY); 110136692Sandre MPASS(cidx != pidx); 111136692Sandre 112136692Sandre if (prev == IDLE) 113136692Sandre counter_u64_add(r->starts, 1); 1141541Srgrimes pending = 0; 1151541Srgrimes total = 0; 1161541Srgrimes 1176223Swollman while (cidx != pidx) { 1186223Swollman 1196223Swollman /* Items from cidx to pidx are available for consumption. 
*/ 1206223Swollman n = r->drain(r, cidx, pidx); 1211541Srgrimes if (n == 0) { 1221541Srgrimes os.state = ns.state = r->state; 1231541Srgrimes ns.cidx = cidx; 1241541Srgrimes ns.flags = STALLED; 1251541Srgrimes r->state = ns.state; 1261541Srgrimes if (prev != STALLED) 1276223Swollman counter_u64_add(r->stalls, 1); 12878064Sume else if (total > 0) { 1291541Srgrimes counter_u64_add(r->restarts, 1); 1301541Srgrimes counter_u64_add(r->stalls, 1); 1311541Srgrimes } 1321541Srgrimes break; 1331541Srgrimes } 134108533Sschweikh cidx = increment_idx(r, cidx, n); 1351541Srgrimes pending += n; 1361541Srgrimes total += n; 1371541Srgrimes 1381541Srgrimes /* 1391541Srgrimes * We update the cidx only if we've caught up with the pidx, the 1401541Srgrimes * real cidx is getting too far ahead of the one visible to 1411541Srgrimes * everyone else, or we have exceeded our budget. 1421541Srgrimes */ 1431541Srgrimes if (cidx != pidx && pending < 64 && total < budget) 1441541Srgrimes continue; 1451541Srgrimes 1461541Srgrimes os.state = ns.state = r->state; 1471541Srgrimes ns.cidx = cidx; 1481541Srgrimes ns.flags = state_to_flags(ns, total >= budget); 1491541Srgrimes r->state = ns.state; 1501541Srgrimes 1511541Srgrimes if (ns.flags == ABDICATED) 1521541Srgrimes counter_u64_add(r->abdications, 1); 1531541Srgrimes if (ns.flags != BUSY) { 1541541Srgrimes /* Wrong loop exit if we're going to stall. */ 1551541Srgrimes MPASS(ns.flags != STALLED); 1561541Srgrimes if (prev == STALLED) { 1571541Srgrimes MPASS(total > 0); 1581541Srgrimes counter_u64_add(r->restarts, 1); 1591541Srgrimes } 1601541Srgrimes break; 1611541Srgrimes } 1621541Srgrimes 1631541Srgrimes /* 1641541Srgrimes * The acquire style atomic above guarantees visibility of items 16517047Swollman * associated with any pidx change that we notice here. 
1666223Swollman */ 1676223Swollman pidx = ns.pidx_tail; 1681541Srgrimes pending = 0; 1691541Srgrimes } 170101975Salfred} 1711541Srgrimes#else 1721541Srgrimes/* 1731541Srgrimes * Caller passes in a state, with a guarantee that there is work to do and that 1741541Srgrimes * all items up to the pidx_tail in the state are visible. 1751541Srgrimes */ 1761541Srgrimesstatic void 1776223Swollmandrain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget) 1781541Srgrimes{ 1791541Srgrimes union ring_state ns; 1801541Srgrimes int n, pending, total; 18155205Speter uint16_t cidx = os.cidx; 18217047Swollman uint16_t pidx = os.pidx_tail; 18332995Sbde 18432995Sbde MPASS(os.flags == BUSY); 18532995Sbde MPASS(cidx != pidx); 18632995Sbde 18732995Sbde if (prev == IDLE) 1881541Srgrimes counter_u64_add(r->starts, 1); 18917047Swollman pending = 0; 19025201Swollman total = 0; 19123888Swollman 19223888Swollman while (cidx != pidx) { 193137386Sphk 194137386Sphk /* Items from cidx to pidx are available for consumption. 
*/ 195137386Sphk n = r->drain(r, cidx, pidx); 19617047Swollman if (n == 0) { 19717047Swollman critical_enter(); 198137386Sphk do { 19992719Salfred os.state = ns.state = r->state; 20092719Salfred ns.cidx = cidx; 20192719Salfred ns.flags = STALLED; 20292719Salfred } while (atomic_cmpset_64(&r->state, os.state, 20393008Sbde ns.state) == 0); 20492719Salfred critical_exit(); 20593008Sbde if (prev != STALLED) 20692719Salfred counter_u64_add(r->stalls, 1); 20792719Salfred else if (total > 0) { 20893008Sbde counter_u64_add(r->restarts, 1); 20992719Salfred counter_u64_add(r->stalls, 1); 21092719Salfred } 21192719Salfred break; 21292719Salfred } 21392719Salfred cidx = increment_idx(r, cidx, n); 21492719Salfred pending += n; 21592719Salfred total += n; 21693008Sbde 21793008Sbde /* 21817047Swollman * We update the cidx only if we've caught up with the pidx, the 21917047Swollman * real cidx is getting too far ahead of the one visible to 22042902Sfenner * everyone else, or we have exceeded our budget. 22192719Salfred */ 22292719Salfred if (cidx != pidx && pending < 64 && total < budget) 22392719Salfred continue; 22423888Swollman critical_enter(); 22523888Swollman do { 22625201Swollman os.state = ns.state = r->state; 22723888Swollman ns.cidx = cidx; 22823888Swollman ns.flags = state_to_flags(ns, total >= budget); 22925201Swollman } while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0); 23025201Swollman critical_exit(); 23125201Swollman 23223888Swollman if (ns.flags == ABDICATED) 23392719Salfred counter_u64_add(r->abdications, 1); 23493008Sbde if (ns.flags != BUSY) { 23593008Sbde /* Wrong loop exit if we're going to stall. 
*/ 23693008Sbde MPASS(ns.flags != STALLED); 23793008Sbde if (prev == STALLED) { 23893008Sbde MPASS(total > 0); 23992719Salfred counter_u64_add(r->restarts, 1); 24093008Sbde } 241122875Srwatson break; 24217047Swollman } 24317047Swollman 244136692Sandre /* 245136692Sandre * The acquire style atomic above guarantees visibility of items 246136692Sandre * associated with any pidx change that we notice here. 247136692Sandre */ 248136692Sandre pidx = ns.pidx_tail; 249136692Sandre pending = 0; 250136692Sandre } 25192719Salfred} 252136692Sandre#endif 253136692Sandre 254136692Sandreint 25592719Salfredifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain, 25693008Sbde mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags) 25792719Salfred{ 25892719Salfred struct ifmp_ring *r; 25993008Sbde 260136692Sandre /* All idx are 16b so size can be 65536 at most */ 261136692Sandre if (pr == NULL || size < 2 || size > 65536 || drain == NULL || 26292719Salfred can_drain == NULL) 263136692Sandre return (EINVAL); 26492719Salfred *pr = NULL; 26592719Salfred flags &= M_NOWAIT | M_WAITOK; 266136692Sandre MPASS(flags != 0); 267136692Sandre 26892719Salfred r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO); 269136692Sandre if (r == NULL) 270136692Sandre return (ENOMEM); 271136692Sandre r->size = size; 272136692Sandre r->cookie = cookie; 273136692Sandre r->mt = mt; 274136692Sandre r->drain = drain; 275136692Sandre r->can_drain = can_drain; 276136692Sandre r->enqueues = counter_u64_alloc(flags); 277136692Sandre r->drops = counter_u64_alloc(flags); 278136692Sandre r->starts = counter_u64_alloc(flags); 279122875Srwatson r->stalls = counter_u64_alloc(flags); 28017096Swollman r->restarts = counter_u64_alloc(flags); 28155205Speter r->abdications = counter_u64_alloc(flags); 28217047Swollman if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL || 28317047Swollman r->stalls == NULL || r->restarts == NULL || 2841541Srgrimes 
r->abdications == NULL) { 2851541Srgrimes ifmp_ring_free(r); 2861541Srgrimes return (ENOMEM); 28712881Sbde } 2881541Srgrimes 2891541Srgrimes *pr = r; 2901541Srgrimes#ifdef MP_RING_NO_64BIT_ATOMICS 29122614Swollman mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF); 2921541Srgrimes#endif 2931541Srgrimes return (0); 2941541Srgrimes} 2951541Srgrimes 2961541Srgrimesvoid 2971541Srgrimesifmp_ring_free(struct ifmp_ring *r) 2981541Srgrimes{ 2991541Srgrimes 3001541Srgrimes if (r == NULL) 3011541Srgrimes return; 3021541Srgrimes 3031541Srgrimes if (r->enqueues != NULL) 3041541Srgrimes counter_u64_free(r->enqueues); 3051541Srgrimes if (r->drops != NULL) 3061541Srgrimes counter_u64_free(r->drops); 3071541Srgrimes if (r->starts != NULL) 3081541Srgrimes counter_u64_free(r->starts); 3091541Srgrimes if (r->stalls != NULL) 31072638Sphk counter_u64_free(r->stalls); 3111541Srgrimes if (r->restarts != NULL) 31272638Sphk counter_u64_free(r->restarts); 3131541Srgrimes if (r->abdications != NULL) 3141541Srgrimes counter_u64_free(r->abdications); 3151541Srgrimes 3161541Srgrimes free(r, r->mt); 3171541Srgrimes} 3181541Srgrimes 31922614Swollman/* 3201541Srgrimes * Enqueue n items and maybe drain the ring for some time. 3211541Srgrimes * 3221541Srgrimes * Returns an errno. 3231541Srgrimes */ 32472638Sphk#ifdef MP_RING_NO_64BIT_ATOMICS 3251541Srgrimesint 3261541Srgrimesifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget) 3271541Srgrimes{ 3281541Srgrimes union ring_state os, ns; 3291541Srgrimes uint16_t pidx_start, pidx_stop; 33025201Swollman int i; 3311541Srgrimes 3321541Srgrimes MPASS(items != NULL); 3331541Srgrimes MPASS(n > 0); 3341541Srgrimes 3351541Srgrimes mtx_lock(&r->lock); 3361541Srgrimes /* 3371541Srgrimes * Reserve room for the new items. Our reservation, if successful, is 3381541Srgrimes * from 'pidx_start' to 'pidx_stop'. 
3391541Srgrimes */ 3401541Srgrimes os.state = r->state; 3411541Srgrimes if (n >= space_available(r, os)) { 3421541Srgrimes counter_u64_add(r->drops, n); 3431541Srgrimes MPASS(os.flags != IDLE); 3441541Srgrimes mtx_unlock(&r->lock); 3451541Srgrimes if (os.flags == STALLED) 3461541Srgrimes ifmp_ring_check_drainage(r, 0); 3471541Srgrimes return (ENOBUFS); 3481541Srgrimes } 3491541Srgrimes ns.state = os.state; 3501541Srgrimes ns.pidx_head = increment_idx(r, os.pidx_head, n); 3511541Srgrimes r->state = ns.state; 35255205Speter pidx_start = os.pidx_head; 35392719Salfred pidx_stop = ns.pidx_head; 35492719Salfred 35592719Salfred /* 35692719Salfred * Wait for other producers who got in ahead of us to enqueue their 357136692Sandre * items, one producer at a time. It is our turn when the ring's 358136692Sandre * pidx_tail reaches the beginning of our reservation (pidx_start). 3591541Srgrimes */ 3602165Spaul while (ns.pidx_tail != pidx_start) { 3612165Spaul cpu_spinwait(); 362 ns.state = r->state; 363 } 364 365 /* Now it is our turn to fill up the area we reserved earlier. */ 366 i = pidx_start; 367 do { 368 r->items[i] = *items++; 369 if (__predict_false(++i == r->size)) 370 i = 0; 371 } while (i != pidx_stop); 372 373 /* 374 * Update the ring's pidx_tail. The release style atomic guarantees 375 * that the items are visible to any thread that sees the updated pidx. 376 */ 377 os.state = ns.state = r->state; 378 ns.pidx_tail = pidx_stop; 379 ns.flags = BUSY; 380 r->state = ns.state; 381 counter_u64_add(r->enqueues, n); 382 383 /* 384 * Turn into a consumer if some other thread isn't active as a consumer 385 * already. 
386 */ 387 if (os.flags != BUSY) 388 drain_ring_locked(r, ns, os.flags, budget); 389 390 mtx_unlock(&r->lock); 391 return (0); 392} 393 394#else 395int 396ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget) 397{ 398 union ring_state os, ns; 399 uint16_t pidx_start, pidx_stop; 400 int i; 401 402 MPASS(items != NULL); 403 MPASS(n > 0); 404 405 /* 406 * Reserve room for the new items. Our reservation, if successful, is 407 * from 'pidx_start' to 'pidx_stop'. 408 */ 409 for (;;) { 410 os.state = r->state; 411 if (n >= space_available(r, os)) { 412 counter_u64_add(r->drops, n); 413 MPASS(os.flags != IDLE); 414 if (os.flags == STALLED) 415 ifmp_ring_check_drainage(r, 0); 416 return (ENOBUFS); 417 } 418 ns.state = os.state; 419 ns.pidx_head = increment_idx(r, os.pidx_head, n); 420 critical_enter(); 421 if (atomic_cmpset_64(&r->state, os.state, ns.state)) 422 break; 423 critical_exit(); 424 cpu_spinwait(); 425 } 426 pidx_start = os.pidx_head; 427 pidx_stop = ns.pidx_head; 428 429 /* 430 * Wait for other producers who got in ahead of us to enqueue their 431 * items, one producer at a time. It is our turn when the ring's 432 * pidx_tail reaches the beginning of our reservation (pidx_start). 433 */ 434 while (ns.pidx_tail != pidx_start) { 435 cpu_spinwait(); 436 ns.state = r->state; 437 } 438 439 /* Now it is our turn to fill up the area we reserved earlier. */ 440 i = pidx_start; 441 do { 442 r->items[i] = *items++; 443 if (__predict_false(++i == r->size)) 444 i = 0; 445 } while (i != pidx_stop); 446 447 /* 448 * Update the ring's pidx_tail. The release style atomic guarantees 449 * that the items are visible to any thread that sees the updated pidx. 
450 */ 451 do { 452 os.state = ns.state = r->state; 453 ns.pidx_tail = pidx_stop; 454 if (os.flags == IDLE) 455 ns.flags = ABDICATED; 456 } while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0); 457 critical_exit(); 458 counter_u64_add(r->enqueues, n); 459 460 return (0); 461} 462#endif 463 464void 465ifmp_ring_check_drainage(struct ifmp_ring *r, int budget) 466{ 467 union ring_state os, ns; 468 469 os.state = r->state; 470 if ((os.flags != STALLED && os.flags != ABDICATED) || // Only continue in STALLED and ABDICATED 471 os.pidx_head != os.pidx_tail || // Require work to be available 472 (os.flags != ABDICATED && r->can_drain(r) == 0)) // Can either drain, or everyone left 473 return; 474 475 MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */ 476 ns.state = os.state; 477 ns.flags = BUSY; 478 479 480#ifdef MP_RING_NO_64BIT_ATOMICS 481 mtx_lock(&r->lock); 482 if (r->state != os.state) { 483 mtx_unlock(&r->lock); 484 return; 485 } 486 r->state = ns.state; 487 drain_ring_locked(r, ns, os.flags, budget); 488 mtx_unlock(&r->lock); 489#else 490 /* 491 * The acquire style atomic guarantees visibility of items associated 492 * with the pidx that we read here. 
493 */ 494 if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state)) 495 return; 496 497 498 drain_ring_lockless(r, ns, os.flags, budget); 499#endif 500} 501 502void 503ifmp_ring_reset_stats(struct ifmp_ring *r) 504{ 505 506 counter_u64_zero(r->enqueues); 507 counter_u64_zero(r->drops); 508 counter_u64_zero(r->starts); 509 counter_u64_zero(r->stalls); 510 counter_u64_zero(r->restarts); 511 counter_u64_zero(r->abdications); 512} 513 514int 515ifmp_ring_is_idle(struct ifmp_ring *r) 516{ 517 union ring_state s; 518 519 s.state = r->state; 520 if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx && 521 s.flags == IDLE) 522 return (1); 523 524 return (0); 525} 526 527int 528ifmp_ring_is_stalled(struct ifmp_ring *r) 529{ 530 union ring_state s; 531 532 s.state = r->state; 533 if (s.pidx_head == s.pidx_tail && s.flags == STALLED) 534 return (1); 535 536 return (0); 537} 538