1/*
2 * Ring buffer I/O for sslThroughput test.
3 */
4
5#include "ringBufferIo.h"
6#include <strings.h>
7#include <stdio.h>
8#include <stdlib.h>
9#include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacErrors.h>
10
/*
 * Synchronizes multi-threaded access to printf(). The logging helpers in
 * this file hold it for the duration of each report so that lines from
 * different threads are not interleaved.
 * NOTE(review): not static, so presumably also taken by other files of the
 * test - confirm against ringBufferIo.h.
 */
pthread_mutex_t printfMutex = PTHREAD_MUTEX_INITIALIZER;
13
14/* initialize a RingBuffer */
15void ringBufSetup(
16	RingBuffer *ring,
17	const char *bufName,
18	size_t numElements,
19	size_t bufSize)
20{
21	unsigned dex;
22
23	memset(ring, 0, sizeof(*ring));
24	ring->numElements = numElements;
25	ring->elements = (RingElement *)malloc(sizeof(RingElement) * numElements);
26	memset(ring->elements, 0, sizeof(RingElement) * numElements);
27	for(dex=0; dex<numElements; dex++) {
28		RingElement *elt = &ring->elements[dex];
29		elt->buf = (unsigned char *)malloc(bufSize);
30		elt->capacity = bufSize;
31	}
32	ring->writerDex = 0;
33	ring->readerDex = 0;
34	ring->bufName = bufName;
35}
36
/* Set to 1 to compile in the per-operation trace functions below. */
#define LOG_RING		0
/* With LOG_RING on, set to 1 to also hex-dump the bytes read and written. */
#define LOG_RING_DUMP	0
39#if		LOG_RING
40
41static void logRingWrite(
42	RingBuffer *ring,
43	size_t written,
44	unsigned dex,
45	void *from)
46{
47	pthread_mutex_lock(&printfMutex);
48	printf("+++ wrote %4u bytes   to %s buf %2u\n",
49		(unsigned)written, ring->bufName, dex);
50	#if LOG_RING_DUMP
51	{
52		unsigned i;
53		unsigned char *cp = (unsigned char *)from;
54
55		for(i=0; i<written; i++) {
56			printf("%02X ", cp[i]);
57			if((i < (written - 1)) && ((i % 16) == 15)) {
58				printf("\n");
59			}
60		}
61		printf("\n");
62	}
63	#endif
64	pthread_mutex_unlock(&printfMutex);
65}
66
67static void logRingRead(
68	RingBuffer *ring,
69	size_t bytesRead,
70	unsigned dex,
71	void *to)
72{
73	pthread_mutex_lock(&printfMutex);
74	printf("--- read  %4u bytes from %s buf %2u\n",
75		(unsigned)bytesRead, ring->bufName, dex);
76	#if LOG_RING_DUMP
77	{
78		unsigned i;
79		unsigned char *cp = (unsigned char *)to;
80
81		for(i=0; i<bytesRead; i++) {
82			printf("%02X ", cp[i]);
83			if((i < (bytesRead - 1)) && ((i % 16) == 15)) {
84				printf("\n");
85			}
86		}
87		printf("\n");
88	}
89	#endif
90	pthread_mutex_unlock(&printfMutex);
91}
92
93static void logRingStall(
94	RingBuffer *ring,
95	char *readerOrWriter,
96	unsigned dex)
97{
98	pthread_mutex_lock(&printfMutex);
99	printf("=== %s stalled on %s buf %u\n",
100		readerOrWriter, ring->bufName, dex);
101	pthread_mutex_unlock(&printfMutex);
102}
103
104static void logRingClose(
105	RingBuffer *ring,
106	char *readerOrWriter)
107{
108	pthread_mutex_lock(&printfMutex);
109	printf("=== %s CLOSED by %s\n",
110		ring->bufName, readerOrWriter);
111	pthread_mutex_unlock(&printfMutex);
112}
113
114static void logRingReset(
115	RingBuffer *ring)
116{
117	pthread_mutex_lock(&printfMutex);
118	printf("=== %s RESET\n", ring->bufName);
119	pthread_mutex_unlock(&printfMutex);
120}
121
122#else	/* LOG_RING */
/*
 * Logging compiled out: each trace call expands to nothing, so its
 * arguments are not evaluated.
 */
#define logRingWrite(r, w, d, t)
#define logRingRead(r, b, d, t)
#define logRingStall(r, row, d)
#define logRingClose(r, row)
#define logRingReset(r)
128#endif	/* LOG_RING */
129
130void ringBufferReset(
131	RingBuffer *ring)
132{
133	unsigned dex;
134	for(dex=0; dex<ring->numElements; dex++) {
135		RingElement *elt = &ring->elements[dex];
136		elt->validBytes = 0;
137		elt->readOffset = 0;
138	}
139	ring->writerDex = 0;
140	ring->readerDex = 0;
141	ring->closed = false;
142	logRingReset(ring);
143}
144
145/*
146 * The "I/O" callbacks for SecureTransport.
147 * The SSLConnectionRef is a RingBuffers *.
148 */
149OSStatus ringReadFunc(
150	SSLConnectionRef	connRef,
151	void				*data,
152	size_t				*dataLen)	/* IN/OUT */
153{
154	RingBuffer	*ring = ((RingBuffers *)connRef)->rdBuf;
155
156	if(ring->writerDex == ring->readerDex) {
157		if(ring->closed) {
158			/*
159			 * Handle race condition: we saw a stall, then writer filled a
160			 * RingElement and then set closed. Make sure we read the data before
161			 * handling the close event.
162			 */
163			if(ring->writerDex == ring->readerDex) {
164				/* writer closed: ECONNRESET */
165				*dataLen = 0;
166				return errSSLClosedAbort;
167			}
168			/* else proceed to read data */
169		}
170		else {
171			/* read stalled, writer thread is writing to our next element */
172			*dataLen = 0;
173			return errSSLWouldBlock;
174		}
175	}
176
177	unsigned char *outp = (unsigned char *)data;
178	size_t toMove = *dataLen;
179	size_t haveMoved = 0;
180
181	/* we own ring->elements[ring->readerDex] */
182	do {
183		/*
184		 * Read as much data as there is in the buffer, or
185		 * toMove, whichever is less
186		 */
187		RingElement *elt = &ring->elements[ring->readerDex];
188		size_t thisMove = elt->validBytes;
189		if(thisMove > toMove) {
190			thisMove = toMove;
191		}
192		memmove(outp, elt->buf + elt->readOffset, thisMove);
193		logRingRead(ring, thisMove, ring->readerDex, outp);
194		if(thisMove == 0) {
195			/* should never happen */
196			printf("***thisMove 0!\n");
197			return internalComponentErr;
198		}
199		elt->validBytes -= thisMove;
200		elt->readOffset += thisMove;
201		toMove          -= thisMove;
202		haveMoved       += thisMove;
203		outp			+= thisMove;
204
205		if(elt->validBytes == 0) {
206			/*
207			 * End of this buffer - advance to next one and keep going if it's
208			 * not in use
209			 */
210			unsigned nextDex;
211			elt->readOffset = 0;
212			/* increment and wrap must be atomic from the point of
213			 * view of readerDex */
214			nextDex = ring->readerDex + 1;
215			if(nextDex == ring->numElements) {
216				nextDex = 0;
217			}
218			ring->readerDex = nextDex;
219		}
220		if(toMove == 0) {
221			/* caller got what they want */
222			break;
223		}
224		if(ring->readerDex == ring->writerDex) {
225			logRingStall(ring, "reader ", ring->readerDex);
226			/* stalled */
227			break;
228		}
229	} while(toMove);
230
231	OSStatus ortn = noErr;
232	if(haveMoved != *dataLen) {
233		if((haveMoved == 0) && ring->closed) {
234			/* writer closed: ECONNRESET */
235			ortn = errSSLClosedAbort;
236		}
237		else {
238			ortn = errSSLWouldBlock;
239		}
240	}
241	*dataLen = haveMoved;
242	return ortn;
243}
244
245/*
246 * This never returns errSSLWouldBlock - we block (spinning) if
247 * we stall because we run into the reader's element.
248 * Also, each call to this function uses up at least one
249 * RingElement - we don't coalesce multiple writes into one
250 * RingElement.
251 *
252 * On entry, writerDex is the element we're going to write to.
253 * On exit, writerDex is the element we're going to write to next,
254 * and we might stall before we update it as such.
255 */
256OSStatus ringWriteFunc(
257	SSLConnectionRef	connRef,
258	const void			*data,
259	size_t				*dataLen)	/* IN/OUT */
260{
261	RingBuffer	*ring = ((RingBuffers *)connRef)->wrtBuf;
262	unsigned char *inp = (unsigned char *)data;
263	size_t toMove = *dataLen;
264	size_t haveMoved = 0;
265	unsigned nextDex;
266	OSStatus ortn = noErr;
267
268	/* we own ring->elements[ring->writerDex] */
269	do {
270		RingElement *elt = &ring->elements[ring->writerDex];
271		elt->validBytes = 0;
272
273		size_t thisMove = toMove;
274		if(thisMove > elt->capacity) {
275			thisMove = elt->capacity;
276		}
277		memmove(elt->buf, inp, thisMove);
278		logRingWrite(ring, thisMove, ring->writerDex, inp);
279
280		elt->validBytes  = thisMove;
281		toMove          -= thisMove;
282		haveMoved       += thisMove;
283		inp				+= thisMove;
284
285		/* move on to next element, when it becomes available */
286		nextDex = ring->writerDex + 1;
287		if(nextDex == ring->numElements) {
288			nextDex = 0;
289		}
290		if(nextDex == ring->readerDex) {
291			logRingStall(ring, "writer", nextDex);
292			while(nextDex == ring->readerDex) {
293				/* if(ring->closed) {
294					break;
295				} */
296				;
297				/* else stall */
298			}
299		}
300		/* we own nextDex */
301		ring->writerDex = nextDex;
302		if(ring->closed) {
303			break;
304		}
305	} while(toMove);
306	if(ring->closed && (haveMoved == 0)) {
307		/* reader closed socket: EPIPE */
308		ortn = errSSLClosedAbort;
309	}
310	*dataLen = haveMoved;
311	return ortn;
312}
313
314/* close both sides of a RingBuffers */
315void ringBuffersClose(
316	RingBuffers			*rbs)
317{
318	if(rbs == NULL) {
319		return;
320	}
321	if(rbs->rdBuf) {
322		logRingClose(rbs->rdBuf, "reader");
323		rbs->rdBuf->closed = true;
324	}
325	if(rbs->wrtBuf) {
326		logRingClose(rbs->wrtBuf, "writer");
327		rbs->wrtBuf->closed = true;
328	}
329}
330
331