/*	$NetBSD: subr_pcq.c,v 1.20 2023/02/24 11:02:27 riastradh Exp $	*/

/*-
 * Copyright (c) 2009, 2019 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Andrew Doran.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Lockless producer/consumer queue.
 *
 * Summary of the producer algorithm in pcq_put (may run many in
 * parallel with each other and with a consumer):
 *
 *	P1. initialize an item
 *
 *	P2. atomic_cas(&pcq->pcq_pc) loop to advance the producer
 *	    pointer, reserving a space at c (fails if not enough space)
 *
 *	P3. atomic_store_release(&pcq->pcq_items[c], item) to publish
 *	    the item in the space it reserved
 *
 * Summary of the consumer algorithm in pcq_get (must be serialized by
 * caller with other consumers, may run in parallel with any number of
 * producers):
 *
 *	C1. atomic_load_relaxed(&pcq->pcq_pc) to get the consumer
 *	    pointer and a snapshot of the producer pointer, which may
 *	    point to null items or point to initialized items (fails if
 *	    no space reserved for published items yet)
 *
 *	C2. atomic_load_consume(&pcq->pcq_items[c]) to get the next
 *	    unconsumed but potentially published item (fails if item
 *	    not published yet)
 *
 *	C3. pcq->pcq_items[c] = NULL to consume the next unconsumed but
 *	    published item
 *
 *	C4. membar_producer
 *
 *	C5. atomic_cas(&pcq->pcq_pc) loop to advance the consumer
 *	    pointer
 *
 *	C6. use the item
 *
 * Note that there is a weird bare membar_producer which is not matched
 * by membar_consumer.  This is one of the rare cases of a memory
 * barrier on one side that is not matched by a memory barrier on
 * another side, but the ordering works out, with a somewhat more
 * involved proof.
 *
 * Some properties that need to be proved:
 *
 *	Theorem 1.  For a pcq_put call that leads into pcq_get:
 *	Initializing the item at P1 is dependency-ordered before usage
 *	of the item at C6, so items placed by pcq_put can be safely
 *	used by the caller of pcq_get.
 *
 *	Proof sketch.
 *
 *		Assume load/store P2 synchronizes with load/store C1
 *		(if not, pcq_get fails in `if (p == c) return NULL').
 *
 *		Assume store-release P3 synchronizes with load-consume
 *		C2 (if not, pcq_get fails in `if (item == NULL) return
 *		NULL').
 *
 *		Then:
 *
 *		- P1 is sequenced before store-release P3
 *		- store-release P3 synchronizes with load-consume C2
 *		- load-consume C2 is dependency-ordered before C6
 *
 *		Hence transitively, P1 is dependency-ordered before C6,
 *		QED.
 *
 *	Theorem 2.  For a pcq_get call followed by pcq_put: Nulling out
 *	the location at store C3 happens before placing a new item in
 *	the same location at store P3, so items are not lost.
 *
 *	Proof sketch.
 *
 *		Assume load/store C5 synchronizes with load/store P2
 *		(otherwise pcq_put starts over the CAS loop or fails).
 *
 *		Then:
 *
 *		- store C3 is sequenced before membar_producer C4
 *		- membar_producer C4 is sequenced before load/store C5
 *		- load/store C5 synchronizes with load/store P2 at &pcq->pcq_pc
 *		- P2 is sequenced before store-release P3
 *
 *		Hence transitively, store C3 happens before
 *		store-release P3, QED.
 */

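/*
 * Illustrative usage sketch, in comment form only.  The names sc,
 * sc_pcq, sc_sih, pkt, and process() are hypothetical and stand in
 * for a driver's softc, its queue, a softint cookie established with
 * softint_establish(), a queued work item, and its handler.
 * Producers may run concurrently, e.g. from interrupt handlers; the
 * consumer side must be serialized, which falls out naturally if all
 * gets happen in a single softint handler:
 *
 *	// producer context (possibly many in parallel)
 *	if (pcq_put(sc->sc_pcq, pkt))
 *		softint_schedule(sc->sc_sih);	// notify the consumer
 *	else
 *		...;				// queue full: drop or defer
 *
 *	// consumer context (the softint handler; sole consumer)
 *	while ((pkt = pcq_get(sc->sc_pcq)) != NULL)
 *		process(pkt);
 *
 * Note the softint is scheduled only after pcq_put() returns, so the
 * notification follows the store-release at P3 above.
 */
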
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.20 2023/02/24 11:02:27 riastradh Exp $");

#include <sys/param.h>
#include <sys/types.h>
#include <sys/atomic.h>
#include <sys/kmem.h>

#include <sys/pcq.h>

/*
 * Internal producer-consumer queue structure.  Note: providing a separate
 * cache-line both for pcq_t::pcq_pc and pcq_t::pcq_items.
 */
struct pcq {
	u_int			pcq_nitems;
	uint8_t			pcq_pad1[COHERENCY_UNIT - sizeof(u_int)];
	volatile uint32_t	pcq_pc;
	uint8_t			pcq_pad2[COHERENCY_UNIT - sizeof(uint32_t)];
	void * volatile		pcq_items[];
};

/*
 * Producer (p) - stored in the lower 16 bits of pcq_t::pcq_pc.
 * Consumer (c) - in the higher 16 bits.
 *
 * We have a limitation of 16 bits, i.e. 0xffff items in the queue.
 * The PCQ_MAXLEN constant is set accordingly.
 */

static inline void
pcq_split(uint32_t v, u_int *p, u_int *c)
{

	*p = v & 0xffff;
	*c = v >> 16;
}

static inline uint32_t
pcq_combine(u_int p, u_int c)
{

	return p | (c << 16);
}

static inline u_int
pcq_advance(pcq_t *pcq, u_int pc)
{

	if (__predict_false(++pc == pcq->pcq_nitems)) {
		return 0;
	}
	return pc;
}

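/*
 * A worked example of the encoding, with illustrative values: for a
 * queue with pcq_nitems = 8, a producer index of 2 and a consumer
 * index of 5 are packed as
 *
 *	pcq_combine(2, 5) == 0x00050002
 *
 * and pcq_split() recovers p = 2 and c = 5 from that word.  Advancing
 * past the last slot wraps to the start: pcq_advance(pcq, 7) == 0.
 * Packing both indices into one 32-bit word is what lets pcq_put()
 * and pcq_get() below update the pair atomically with a single
 * atomic_cas_32().
 */
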
/*
 * pcq_put: place an item at the end of the queue.
 */
bool
pcq_put(pcq_t *pcq, void *item)
{
	uint32_t v, nv;
	u_int op, p, c;

	KASSERT(item != NULL);

	do {
		v = atomic_load_relaxed(&pcq->pcq_pc);
		pcq_split(v, &op, &c);
		p = pcq_advance(pcq, op);
		if (p == c) {
			/* Queue is full. */
			return false;
		}
		nv = pcq_combine(p, c);
	} while (atomic_cas_32(&pcq->pcq_pc, v, nv) != v);

	/*
	 * Ensure that the update to pcq_pc is globally visible before the
	 * data item.  See pcq_get().  This also ensures that any changes
	 * that the caller made to the data item are globally visible
	 * before we put it onto the list.
	 */
	atomic_store_release(&pcq->pcq_items[op], item);

	/*
	 * Synchronization activity to wake up the consumer will ensure
	 * that the update to pcq_items[] is visible before the wakeup
	 * arrives.  So, we do not need an additional memory barrier here.
	 */
	return true;
}

/*
 * pcq_peek: return the next item from the queue without removal.
 */
void *
pcq_peek(pcq_t *pcq)
{
	const uint32_t v = atomic_load_relaxed(&pcq->pcq_pc);
	u_int p, c;

	pcq_split(v, &p, &c);

	/* See comment on race below in pcq_get(). */
	return (p == c) ? NULL : atomic_load_consume(&pcq->pcq_items[c]);
}

/*
 * pcq_get: remove and return the next item for consumption, or NULL if empty.
 *
 * => The caller must prevent concurrent gets from occurring.
 */
void *
pcq_get(pcq_t *pcq)
{
	uint32_t v, nv;
	u_int p, c;
	void *item;

	v = atomic_load_relaxed(&pcq->pcq_pc);
	pcq_split(v, &p, &c);
	if (p == c) {
		/* Queue is empty: nothing to return. */
		return NULL;
	}
	item = atomic_load_consume(&pcq->pcq_items[c]);
	if (item == NULL) {
		/*
		 * Raced with sender: we rely on a notification (e.g. softint
		 * or wakeup) being generated after the producer's pcq_put(),
		 * causing us to retry pcq_get() later.
		 */
		return NULL;
	}
	/*
	 * We have exclusive access to this slot, so no need for
	 * atomic_store_*.
	 */
	pcq->pcq_items[c] = NULL;
	c = pcq_advance(pcq, c);
	nv = pcq_combine(p, c);

	/*
	 * Ensure that the update to pcq_items[c] becomes globally visible
	 * before the update to pcq_pc.  If it were reordered to occur
	 * after it, we could in theory wipe out a modification made
	 * to pcq_items[c] by pcq_put().
	 *
	 * No need for load-before-store ordering of membar_release
	 * because the only load we need to ensure happens first is the
	 * load of pcq->pcq_items[c], but that necessarily happens
	 * before the store to pcq->pcq_items[c] to null it out because
	 * it is at the same memory location.  Yes, this is a bare
	 * membar_producer with no matching membar_consumer.
	 */
	membar_producer();
	while (__predict_false(atomic_cas_32(&pcq->pcq_pc, v, nv) != v)) {
		v = atomic_load_relaxed(&pcq->pcq_pc);
		pcq_split(v, &p, &c);
		c = pcq_advance(pcq, c);
		nv = pcq_combine(p, c);
	}
	return item;
}

pcq_t *
pcq_create(size_t nitems, km_flag_t kmflags)
{
	pcq_t *pcq;

	KASSERT(nitems > 0);
	KASSERT(nitems <= PCQ_MAXLEN);

	pcq = kmem_zalloc(offsetof(pcq_t, pcq_items[nitems]), kmflags);
	if (pcq != NULL) {
		pcq->pcq_nitems = nitems;
	}
	return pcq;
}

void
pcq_destroy(pcq_t *pcq)
{

	kmem_free(pcq, offsetof(pcq_t, pcq_items[pcq->pcq_nitems]));
}

size_t
pcq_maxitems(pcq_t *pcq)
{

	return pcq->pcq_nitems;
}

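/*
 * A note on the NULL race in pcq_get(), with a hypothetical consumer
 * as illustration: because a producer advances pcq_pc (P2) before
 * publishing the item (P3), a consumer can observe p != c and still
 * load a NULL slot.  NULL from pcq_get() therefore means "retry on
 * the next notification", not "nothing will ever arrive".  Assuming
 * the softint arrangement sketched at the top of this file:
 *
 *	// softint handler: drain what is visible now.  Any item whose
 *	// publication we raced with is followed by another
 *	// softint_schedule() from its producer, re-running us.
 *	while ((pkt = pcq_get(sc->sc_pcq)) != NULL)
 *		process(pkt);
 */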