1/* 2 * Ring buffer I/O for sslThroughput test. 3 */ 4 5#include "ringBufferIo.h" 6#include <strings.h> 7#include <stdio.h> 8#include <stdlib.h> 9#include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacErrors.h> 10 11/* synchronizes multi-threaded access to printf() */ 12pthread_mutex_t printfMutex = PTHREAD_MUTEX_INITIALIZER; 13 14/* initialize a RingBuffer */ 15void ringBufSetup( 16 RingBuffer *ring, 17 const char *bufName, 18 size_t numElements, 19 size_t bufSize) 20{ 21 unsigned dex; 22 23 memset(ring, 0, sizeof(*ring)); 24 ring->numElements = numElements; 25 ring->elements = (RingElement *)malloc(sizeof(RingElement) * numElements); 26 memset(ring->elements, 0, sizeof(RingElement) * numElements); 27 for(dex=0; dex<numElements; dex++) { 28 RingElement *elt = &ring->elements[dex]; 29 elt->buf = (unsigned char *)malloc(bufSize); 30 elt->capacity = bufSize; 31 } 32 ring->writerDex = 0; 33 ring->readerDex = 0; 34 ring->bufName = bufName; 35} 36 37#define LOG_RING 0 38#define LOG_RING_DUMP 0 39#if LOG_RING 40 41static void logRingWrite( 42 RingBuffer *ring, 43 size_t written, 44 unsigned dex, 45 void *from) 46{ 47 pthread_mutex_lock(&printfMutex); 48 printf("+++ wrote %4u bytes to %s buf %2u\n", 49 (unsigned)written, ring->bufName, dex); 50 #if LOG_RING_DUMP 51 { 52 unsigned i; 53 unsigned char *cp = (unsigned char *)from; 54 55 for(i=0; i<written; i++) { 56 printf("%02X ", cp[i]); 57 if((i < (written - 1)) && ((i % 16) == 15)) { 58 printf("\n"); 59 } 60 } 61 printf("\n"); 62 } 63 #endif 64 pthread_mutex_unlock(&printfMutex); 65} 66 67static void logRingRead( 68 RingBuffer *ring, 69 size_t bytesRead, 70 unsigned dex, 71 void *to) 72{ 73 pthread_mutex_lock(&printfMutex); 74 printf("--- read %4u bytes from %s buf %2u\n", 75 (unsigned)bytesRead, ring->bufName, dex); 76 #if LOG_RING_DUMP 77 { 78 unsigned i; 79 unsigned char *cp = (unsigned char *)to; 80 81 for(i=0; i<bytesRead; i++) { 82 printf("%02X ", cp[i]); 83 if((i < (bytesRead - 1)) && ((i % 16) == 15)) { 84 printf("\n"); 85 } 86 } 87 printf("\n"); 88 } 89 #endif 90 pthread_mutex_unlock(&printfMutex); 91} 92 93static void logRingStall( 94 RingBuffer *ring, 95 char *readerOrWriter, 96 unsigned dex) 97{ 98 pthread_mutex_lock(&printfMutex); 99 printf("=== %s stalled on %s buf %u\n", 100 readerOrWriter, ring->bufName, dex); 101 pthread_mutex_unlock(&printfMutex); 102} 103 104static void logRingClose( 105 RingBuffer *ring, 106 char *readerOrWriter) 107{ 108 pthread_mutex_lock(&printfMutex); 109 printf("=== %s CLOSED by %s\n", 110 ring->bufName, readerOrWriter); 111 pthread_mutex_unlock(&printfMutex); 112} 113 114static void logRingReset( 115 RingBuffer *ring) 116{ 117 pthread_mutex_lock(&printfMutex); 118 printf("=== %s RESET\n", ring->bufName); 119 pthread_mutex_unlock(&printfMutex); 120} 121 122#else /* LOG_RING */ 123#define logRingWrite(r, w, d, t) 124#define logRingRead(r, b, d, t) 125#define logRingStall(r, row, d) 126#define logRingClose(r, row) 127#define logRingReset(r) 128#endif /* LOG_RING */ 129 130void ringBufferReset( 131 RingBuffer *ring) 132{ 133 unsigned dex; 134 for(dex=0; dex<ring->numElements; dex++) { 135 RingElement *elt = &ring->elements[dex]; 136 elt->validBytes = 0; 137 elt->readOffset = 0; 138 } 139 ring->writerDex = 0; 140 ring->readerDex = 0; 141 ring->closed = false; 142 logRingReset(ring); 143} 144 145/* 146 * The "I/O" callbacks for SecureTransport. 147 * The SSLConnectionRef is a RingBuffers *. 148 */ 149OSStatus ringReadFunc( 150 SSLConnectionRef connRef, 151 void *data, 152 size_t *dataLen) /* IN/OUT */ 153{ 154 RingBuffer *ring = ((RingBuffers *)connRef)->rdBuf; 155 156 if(ring->writerDex == ring->readerDex) { 157 if(ring->closed) { 158 /* 159 * Handle race condition: we saw a stall, then writer filled a 160 * RingElement and then set closed. Make sure we read the data before 161 * handling the close event. 162 */ 163 if(ring->writerDex == ring->readerDex) { 164 /* writer closed: ECONNRESET */ 165 *dataLen = 0; 166 return errSSLClosedAbort; 167 } 168 /* else proceed to read data */ 169 } 170 else { 171 /* read stalled, writer thread is writing to our next element */ 172 *dataLen = 0; 173 return errSSLWouldBlock; 174 } 175 } 176 177 unsigned char *outp = (unsigned char *)data; 178 size_t toMove = *dataLen; 179 size_t haveMoved = 0; 180 181 /* we own ring->elements[ring->readerDex] */ 182 do { 183 /* 184 * Read as much data as there is in the buffer, or 185 * toMove, whichever is less 186 */ 187 RingElement *elt = &ring->elements[ring->readerDex]; 188 size_t thisMove = elt->validBytes; 189 if(thisMove > toMove) { 190 thisMove = toMove; 191 } 192 memmove(outp, elt->buf + elt->readOffset, thisMove); 193 logRingRead(ring, thisMove, ring->readerDex, outp); 194 if(thisMove == 0) { 195 /* should never happen */ 196 printf("***thisMove 0!\n"); 197 return internalComponentErr; 198 } 199 elt->validBytes -= thisMove; 200 elt->readOffset += thisMove; 201 toMove -= thisMove; 202 haveMoved += thisMove; 203 outp += thisMove; 204 205 if(elt->validBytes == 0) { 206 /* 207 * End of this buffer - advance to next one and keep going if it's 208 * not in use 209 */ 210 unsigned nextDex; 211 elt->readOffset = 0; 212 /* increment and wrap must be atomic from the point of 213 * view of readerDex */ 214 nextDex = ring->readerDex + 1; 215 if(nextDex == ring->numElements) { 216 nextDex = 0; 217 } 218 ring->readerDex = nextDex; 219 } 220 if(toMove == 0) { 221 /* caller got what they want */ 222 break; 223 } 224 if(ring->readerDex == ring->writerDex) { 225 logRingStall(ring, "reader ", ring->readerDex); 226 /* stalled */ 227 break; 228 } 229 } while(toMove); 230 231 OSStatus ortn = noErr; 232 if(haveMoved != *dataLen) { 233 if((haveMoved == 0) && ring->closed) { 234 /* writer closed: ECONNRESET */ 235 ortn = errSSLClosedAbort; 236 } 237 else { 238 ortn = errSSLWouldBlock; 239 } 240 } 241 *dataLen = haveMoved; 242 return ortn; 243} 244 245/* 246 * This never returns errSSLWouldBlock - we block (spinning) if 247 * we stall because we run into the reader's element. 248 * Also, each call to this function uses up at least one 249 * RingElement - we don't coalesce multiple writes into one 250 * RingElement. 251 * 252 * On entry, writerDex is the element we're going to write to. 253 * On exit, writerDex is the element we're going to write to next, 254 * and we might stall before we update it as such. 255 */ 256OSStatus ringWriteFunc( 257 SSLConnectionRef connRef, 258 const void *data, 259 size_t *dataLen) /* IN/OUT */ 260{ 261 RingBuffer *ring = ((RingBuffers *)connRef)->wrtBuf; 262 unsigned char *inp = (unsigned char *)data; 263 size_t toMove = *dataLen; 264 size_t haveMoved = 0; 265 unsigned nextDex; 266 OSStatus ortn = noErr; 267 268 /* we own ring->elements[ring->writerDex] */ 269 do { 270 RingElement *elt = &ring->elements[ring->writerDex]; 271 elt->validBytes = 0; 272 273 size_t thisMove = toMove; 274 if(thisMove > elt->capacity) { 275 thisMove = elt->capacity; 276 } 277 memmove(elt->buf, inp, thisMove); 278 logRingWrite(ring, thisMove, ring->writerDex, inp); 279 280 elt->validBytes = thisMove; 281 toMove -= thisMove; 282 haveMoved += thisMove; 283 inp += thisMove; 284 285 /* move on to next element, when it becomes available */ 286 nextDex = ring->writerDex + 1; 287 if(nextDex == ring->numElements) { 288 nextDex = 0; 289 } 290 if(nextDex == ring->readerDex) { 291 logRingStall(ring, "writer", nextDex); 292 while(nextDex == ring->readerDex) { 293 /* if(ring->closed) { 294 break; 295 } */ 296 ; 297 /* else stall */ 298 } 299 } 300 /* we own nextDex */ 301 ring->writerDex = nextDex; 302 if(ring->closed) { 303 break; 304 } 305 } while(toMove); 306 if(ring->closed && (haveMoved == 0)) { 307 /* reader closed socket: EPIPE */ 308 ortn = errSSLClosedAbort; 309 } 310 *dataLen = haveMoved; 311 return ortn; 312} 313 314/* close both sides of a RingBuffers */ 315void ringBuffersClose( 316 RingBuffers *rbs) 317{ 318 if(rbs == NULL) { 319 return; 320 } 321 if(rbs->rdBuf) { 322 logRingClose(rbs->rdBuf, "reader"); 323 rbs->rdBuf->closed = true; 324 } 325 if(rbs->wrtBuf) { 326 logRingClose(rbs->wrtBuf, "writer"); 327 rbs->wrtBuf->closed = true; 328 } 329} 330 331