buf_ring.h revision 300060
1/*- 2 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD: stable/10/sys/sys/buf_ring.h 300060 2016-05-17 15:18:01Z pfg $ 27 * 28 */ 29 30#ifndef _SYS_BUF_RING_H_ 31#define _SYS_BUF_RING_H_ 32 33#include <machine/cpu.h> 34 35#if defined(INVARIANTS) && !defined(DEBUG_BUFRING) 36#define DEBUG_BUFRING 1 37#endif 38 39#ifdef DEBUG_BUFRING 40#include <sys/lock.h> 41#include <sys/mutex.h> 42#endif 43 44struct buf_ring { 45 volatile uint32_t br_prod_head; 46 volatile uint32_t br_prod_tail; 47 int br_prod_size; 48 int br_prod_mask; 49 uint64_t br_drops; 50 volatile uint32_t br_cons_head __aligned(CACHE_LINE_SIZE); 51 volatile uint32_t br_cons_tail; 52 int br_cons_size; 53 int br_cons_mask; 54#ifdef DEBUG_BUFRING 55 struct mtx *br_lock; 56#endif 57 void *br_ring[0] __aligned(CACHE_LINE_SIZE); 58}; 59 60/* 61 * multi-producer safe lock-free ring buffer enqueue 62 * 63 */ 64static __inline int 65buf_ring_enqueue(struct buf_ring *br, void *buf) 66{ 67 uint32_t prod_head, prod_next, cons_tail; 68#ifdef DEBUG_BUFRING 69 int i; 70 for (i = br->br_cons_head; i != br->br_prod_head; 71 i = ((i + 1) & br->br_cons_mask)) 72 if(br->br_ring[i] == buf) 73 panic("buf=%p already enqueue at %d prod=%d cons=%d", 74 buf, i, br->br_prod_tail, br->br_cons_tail); 75#endif 76 critical_enter(); 77 do { 78 prod_head = br->br_prod_head; 79 prod_next = (prod_head + 1) & br->br_prod_mask; 80 cons_tail = br->br_cons_tail; 81 82 if (prod_next == cons_tail) { 83 rmb(); 84 if (prod_head == br->br_prod_head && 85 cons_tail == br->br_cons_tail) { 86 br->br_drops++; 87 critical_exit(); 88 return (ENOBUFS); 89 } 90 continue; 91 } 92 } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next)); 93#ifdef DEBUG_BUFRING 94 if (br->br_ring[prod_head] != NULL) 95 panic("dangling value in enqueue"); 96#endif 97 br->br_ring[prod_head] = buf; 98 99 /* 100 * If there are other enqueues in progress 101 * that preceded us, we need to wait for them 102 * to complete 103 */ 104 while (br->br_prod_tail != prod_head) 105 cpu_spinwait(); 106 atomic_store_rel_int(&br->br_prod_tail, prod_next); 107 critical_exit(); 108 return (0); 109} 110 111/* 112 * multi-consumer safe dequeue 113 * 114 */ 115static __inline void * 116buf_ring_dequeue_mc(struct buf_ring *br) 117{ 118 uint32_t cons_head, cons_next; 119 void *buf; 120 121 critical_enter(); 122 do { 123 cons_head = br->br_cons_head; 124 cons_next = (cons_head + 1) & br->br_cons_mask; 125 126 if (cons_head == br->br_prod_tail) { 127 critical_exit(); 128 return (NULL); 129 } 130 } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next)); 131 132 buf = br->br_ring[cons_head]; 133#ifdef DEBUG_BUFRING 134 br->br_ring[cons_head] = NULL; 135#endif 136 /* 137 * If there are other dequeues in progress 138 * that preceded us, we need to wait for them 139 * to complete 140 */ 141 while (br->br_cons_tail != cons_head) 142 cpu_spinwait(); 143 144 atomic_store_rel_int(&br->br_cons_tail, cons_next); 145 critical_exit(); 146 147 return (buf); 148} 149 150/* 151 * single-consumer dequeue 152 * use where dequeue is protected by a lock 153 * e.g. a network driver's tx queue lock 154 */ 155static __inline void * 156buf_ring_dequeue_sc(struct buf_ring *br) 157{ 158 uint32_t cons_head, cons_next, cons_next_next; 159 uint32_t prod_tail; 160 void *buf; 161 162 cons_head = br->br_cons_head; 163 prod_tail = br->br_prod_tail; 164 165 cons_next = (cons_head + 1) & br->br_cons_mask; 166 cons_next_next = (cons_head + 2) & br->br_cons_mask; 167 168 if (cons_head == prod_tail) 169 return (NULL); 170 171#ifdef PREFETCH_DEFINED 172 if (cons_next != prod_tail) { 173 prefetch(br->br_ring[cons_next]); 174 if (cons_next_next != prod_tail) 175 prefetch(br->br_ring[cons_next_next]); 176 } 177#endif 178 br->br_cons_head = cons_next; 179 buf = br->br_ring[cons_head]; 180 181#ifdef DEBUG_BUFRING 182 br->br_ring[cons_head] = NULL; 183 if (!mtx_owned(br->br_lock)) 184 panic("lock not held on single consumer dequeue"); 185 if (br->br_cons_tail != cons_head) 186 panic("inconsistent list cons_tail=%d cons_head=%d", 187 br->br_cons_tail, cons_head); 188#endif 189 br->br_cons_tail = cons_next; 190 return (buf); 191} 192 193/* 194 * single-consumer advance after a peek 195 * use where it is protected by a lock 196 * e.g. a network driver's tx queue lock 197 */ 198static __inline void 199buf_ring_advance_sc(struct buf_ring *br) 200{ 201 uint32_t cons_head, cons_next; 202 uint32_t prod_tail; 203 204 cons_head = br->br_cons_head; 205 prod_tail = br->br_prod_tail; 206 207 cons_next = (cons_head + 1) & br->br_cons_mask; 208 if (cons_head == prod_tail) 209 return; 210 br->br_cons_head = cons_next; 211#ifdef DEBUG_BUFRING 212 br->br_ring[cons_head] = NULL; 213#endif 214 br->br_cons_tail = cons_next; 215} 216 217/* 218 * Used to return a buffer (most likely already there) 219 * to the top od the ring. The caller should *not* 220 * have used any dequeue to pull it out of the ring 221 * but instead should have used the peek() function. 222 * This is normally used where the transmit queue 223 * of a driver is full, and an mubf must be returned. 224 * Most likely whats in the ring-buffer is what 225 * is being put back (since it was not removed), but 226 * sometimes the lower transmit function may have 227 * done a pullup or other function that will have 228 * changed it. As an optimzation we always put it 229 * back (since jhb says the store is probably cheaper), 230 * if we have to do a multi-queue version we will need 231 * the compare and an atomic. 232 */ 233static __inline void 234buf_ring_putback_sc(struct buf_ring *br, void *new) 235{ 236 KASSERT(br->br_cons_head != br->br_prod_tail, 237 ("Buf-Ring has none in putback")) ; 238 br->br_ring[br->br_cons_head] = new; 239} 240 241/* 242 * return a pointer to the first entry in the ring 243 * without modifying it, or NULL if the ring is empty 244 * race-prone if not protected by a lock 245 */ 246static __inline void * 247buf_ring_peek(struct buf_ring *br) 248{ 249 250#ifdef DEBUG_BUFRING 251 if ((br->br_lock != NULL) && !mtx_owned(br->br_lock)) 252 panic("lock not held on single consumer dequeue"); 253#endif 254 /* 255 * I believe it is safe to not have a memory barrier 256 * here because we control cons and tail is worst case 257 * a lagging indicator so we worst case we might 258 * return NULL immediately after a buffer has been enqueued 259 */ 260 if (br->br_cons_head == br->br_prod_tail) 261 return (NULL); 262 263 return (br->br_ring[br->br_cons_head]); 264} 265 266static __inline int 267buf_ring_full(struct buf_ring *br) 268{ 269 270 return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail); 271} 272 273static __inline int 274buf_ring_empty(struct buf_ring *br) 275{ 276 277 return (br->br_cons_head == br->br_prod_tail); 278} 279 280static __inline int 281buf_ring_count(struct buf_ring *br) 282{ 283 284 return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail) 285 & br->br_prod_mask); 286} 287 288struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags, 289 struct mtx *); 290void buf_ring_free(struct buf_ring *br, struct malloc_type *type); 291 292 293 294#endif 295