1from PyObjCTools.TestSupport import *
2from CoreFoundation import *
3import errno, time, os, socket, sys
4import contextlib
5
6from .test_cfsocket import onTheNetwork
7
# Python 3 removed the ``long`` and ``unicode`` builtins.  Alias them to
# their Python 3 equivalents so the assertions below can use a single
# spelling on both major versions; on Python 2 the real builtins are used.
if sys.version_info[0] >= 3:
    long = int
    unicode = str
17
18
class TestStream (TestCase):
    """Tests for the CoreFoundation CFReadStream / CFWriteStream bindings.

    The tests check both the bridge metadata (argument/result annotations)
    and the runtime behaviour of in-memory, file-backed and socket-backed
    streams.
    """

    def _assertReadStreamStatus(self, stream, expected):
        """Assert that CFReadStreamGetStatus(stream) is the integer *expected*."""
        status = CFReadStreamGetStatus(stream)
        self.assertIsInstance(status, (int, long))
        self.assertEqual(status, expected)

    def _assertWriteStreamStatus(self, stream, expected):
        """Assert that CFWriteStreamGetStatus(stream) is the integer *expected*."""
        status = CFWriteStreamGetStatus(stream)
        self.assertIsInstance(status, (int, long))
        self.assertEqual(status, expected)

    def testTypes(self):
        # Both stream references must be exposed as CoreFoundation types.
        self.assertIsCFType(CFReadStreamRef)
        self.assertIsCFType(CFWriteStreamRef)

    def testConstants(self):
        # Stream status values.
        self.assertEqual(kCFStreamStatusNotOpen, 0)
        self.assertEqual(kCFStreamStatusOpening, 1)
        self.assertEqual(kCFStreamStatusOpen, 2)
        self.assertEqual(kCFStreamStatusReading, 3)
        self.assertEqual(kCFStreamStatusWriting, 4)
        self.assertEqual(kCFStreamStatusAtEnd, 5)
        self.assertEqual(kCFStreamStatusClosed, 6)
        self.assertEqual(kCFStreamStatusError, 7)

        # Stream event flags.
        self.assertEqual(kCFStreamEventNone, 0)
        self.assertEqual(kCFStreamEventOpenCompleted, 1)
        self.assertEqual(kCFStreamEventHasBytesAvailable, 2)
        self.assertEqual(kCFStreamEventCanAcceptBytes, 4)
        self.assertEqual(kCFStreamEventErrorOccurred, 8)
        self.assertEqual(kCFStreamEventEndEncountered, 16)

        # Error domains.
        self.assertEqual(kCFStreamErrorDomainCustom, -1)
        self.assertEqual(kCFStreamErrorDomainPOSIX, 1)
        self.assertEqual(kCFStreamErrorDomainMacOSStatus, 2)

        # Property keys are exposed as (unicode) strings.
        self.assertIsInstance(kCFStreamPropertyDataWritten, unicode)
        self.assertIsInstance(kCFStreamPropertyAppendToFile, unicode)
        self.assertIsInstance(kCFStreamPropertyFileCurrentOffset, unicode)
        self.assertIsInstance(kCFStreamPropertySocketNativeHandle, unicode)
        self.assertIsInstance(kCFStreamPropertySocketRemoteHostName, unicode)
        self.assertIsInstance(kCFStreamPropertySocketRemotePortNumber, unicode)

    def testStructs(self):
        o = CFStreamError()
        self.assertHasAttr(o, 'domain')
        self.assertHasAttr(o, 'error')

    def testGetTypeID(self):
        v = CFReadStreamGetTypeID()
        self.assertIsInstance(v, (int, long))
        v = CFWriteStreamGetTypeID()
        self.assertIsInstance(v, (int, long))

    def testReadStream(self):
        # --- Stream backed by an in-memory byte buffer ---
        strval = b"hello world"
        self.assertArgHasType(CFReadStreamCreateWithBytesNoCopy, 1, b'n^v')
        self.assertArgSizeInArg(CFReadStreamCreateWithBytesNoCopy, 1, 2)
        stream = CFReadStreamCreateWithBytesNoCopy(
            None, strval, len(strval), kCFAllocatorNull)
        self.assertIsInstance(stream, CFReadStreamRef)

        # The stream has not been opened yet: the read is expected to
        # report -1 and yield no data.
        r, buf = CFReadStreamRead(stream, None, 10)
        self.assertEqual(r, -1)
        self.assertEqual(buf, b'')

        self.assertResultIsCFRetained(CFReadStreamCopyError)
        err = CFReadStreamCopyError(stream)
        if err is not None:
            self.assertIsInstance(err, CFErrorRef)
        self._assertReadStreamStatus(stream, kCFStreamStatusNotOpen)

        self.assertResultIsBOOL(CFReadStreamOpen)
        r = CFReadStreamOpen(stream)
        self.assertIs(r, True)
        self._assertReadStreamStatus(stream, kCFStreamStatusOpen)

        self.assertResultIsBOOL(CFReadStreamHasBytesAvailable)
        r = CFReadStreamHasBytesAvailable(stream)
        self.assertIs(r, True)
        self.assertArgHasType(CFReadStreamRead, 1, b'o^v')
        self.assertArgSizeInArg(CFReadStreamRead, 1, 2)
        self.assertArgSizeInResult(CFReadStreamRead, 1)
        r, buf = CFReadStreamRead(stream, None, 5)
        self.assertEqual(r, 5)
        self.assertEqual(buf, b"hello")

        # Asking for more than what is left returns only the remainder.
        r, buf = CFReadStreamRead(stream, None, 10)
        self.assertEqual(r, 6)
        self.assertEqual(buf, b" world")

        r = CFReadStreamHasBytesAvailable(stream)
        self.assertIs(r, False)
        r = CFReadStreamClose(stream)
        self.assertIs(r, None)
        self._assertReadStreamStatus(stream, kCFStreamStatusClosed)

        del stream

        # --- Stream backed by a file ---
        self.assertResultIsCFRetained(CFReadStreamCreateWithFile)
        stream = CFReadStreamCreateWithFile(
            None,
            CFURLCreateWithString(
                None, b"file:///etc/shells".decode('ascii'), None))
        self.assertIsInstance(stream, CFReadStreamRef)
        r = CFReadStreamOpen(stream)
        self.assertIs(r, True)
        self._assertReadStreamStatus(stream, kCFStreamStatusOpen)

        r, buf = CFReadStreamRead(stream, None, 5)
        self.assertEqual(r, 5)
        self.assertIsInstance(buf, bytes)

        self.assertResultSizeInArg(CFReadStreamGetBuffer, 2)
        self.assertResultHasType(CFReadStreamGetBuffer, b'^v')
        self.assertArgIsOut(CFReadStreamGetBuffer, 2)
        buf, numBytes = CFReadStreamGetBuffer(stream, 20, None)
        if buf is objc.NULL:
            self.assertEqual(numBytes, 0)
        else:
            # Raw buffers are byte strings; ``bytes`` is ``str`` on
            # Python 2 and the correct type on Python 3.
            self.assertIsInstance(buf, bytes)
            self.assertEqual(numBytes, len(buf))

        val = CFReadStreamCopyProperty(
            stream, kCFStreamPropertyFileCurrentOffset)
        self.assertEqual(val, 5)

        r = CFReadStreamSetProperty(
            stream, kCFStreamPropertyFileCurrentOffset, 10)
        self.assertIs(r, True)
        val = CFReadStreamCopyProperty(
            stream, kCFStreamPropertyFileCurrentOffset)
        self.assertEqual(val, 10)

        err = CFReadStreamGetError(stream)
        self.assertIsInstance(err, CFStreamError)
        self.assertEqual(err.domain, 0)
        self.assertEqual(err.error, 0)

        # Release the file descriptor explicitly instead of relying on
        # garbage collection of the stream object.
        CFReadStreamClose(stream)

    def testWriteStream(self):
        import array
        a = array.array('b', b" "*20)

        # --- Stream writing into a fixed-size, caller supplied buffer ---
        # XXX: cannot express the actual type as metadata :-(
        self.assertArgHasType(CFWriteStreamCreateWithBuffer, 1, b'n^v')
        self.assertArgSizeInArg(CFWriteStreamCreateWithBuffer, 1, 2)
        stream = CFWriteStreamCreateWithBuffer(None, a, 20)
        self.assertIsInstance(stream, CFWriteStreamRef)
        self.assertResultIsBOOL(CFWriteStreamOpen)
        r = CFWriteStreamOpen(stream)
        self.assertIs(r, True)
        self._assertWriteStreamStatus(stream, kCFStreamStatusOpen)

        self.assertResultIsBOOL(CFWriteStreamCanAcceptBytes)
        b = CFWriteStreamCanAcceptBytes(stream)
        self.assertIs(b, True)
        self.assertArgHasType(CFWriteStreamWrite, 1, b'n^v')
        self.assertArgSizeInArg(CFWriteStreamWrite, 1, 2)
        n = CFWriteStreamWrite(stream, b"0123456789ABCDE", 15)
        self.assertEqual(n, 15)

        # Items of an array('b') are plain integers on Python 2 and 3, so
        # a single comparison works for both.
        self.assertEqual(a[0], ord('0'))
        self.assertEqual(a[1], ord('1'))
        self.assertEqual(a[9], ord('9'))

        # A second 15 byte write does not fit in the 20 byte buffer.
        n = CFWriteStreamWrite(stream, b"0123456789ABCDE", 15)
        self.assertEqual(n, -1)

        err = CFWriteStreamCopyError(stream)
        self.assertIsInstance(err, CFErrorRef)
        err = CFWriteStreamGetError(stream)
        self.assertIsInstance(err, CFStreamError)
        self.assertEqual(err.domain, kCFStreamErrorDomainPOSIX)
        self.assertEqual(err.error, errno.ENOMEM)
        self._assertWriteStreamStatus(stream, kCFStreamStatusError)

        del stream

        # --- Stream writing into buffers allocated by CoreFoundation ---
        self.assertResultIsCFRetained(CFWriteStreamCreateWithAllocatedBuffers)
        stream = CFWriteStreamCreateWithAllocatedBuffers(None, None)
        self.assertIsInstance(stream, CFWriteStreamRef)
        r = CFWriteStreamOpen(stream)
        self.assertIs(r, True)
        n = CFWriteStreamWrite(stream, b"0123456789ABCDE", 15)
        self.assertEqual(n, 15)

        self.assertResultIsCFRetained(CFWriteStreamCopyProperty)
        buf = CFWriteStreamCopyProperty(stream, kCFStreamPropertyDataWritten)
        self.assertIsInstance(buf, CFDataRef)
        buf = CFDataGetBytes(buf, (0, CFDataGetLength(buf)), None)
        self.assertIsInstance(buf, bytes)
        self.assertEqual(buf, b'0123456789ABCDE')

        CFWriteStreamClose(stream)
        self._assertWriteStreamStatus(stream, kCFStreamStatusClosed)

        del stream

        # --- Stream writing to a file ---
        path = '/tmp/pyobjc.test.txt'
        stream = CFWriteStreamCreateWithFile(
            None,
            CFURLCreateWithString(
                None, b"file:///tmp/pyobjc.test.txt".decode('ascii'), None))
        self.assertIsInstance(stream, CFWriteStreamRef)
        try:
            r = CFWriteStreamOpen(stream)
            self.assertIs(r, True)
            n = CFWriteStreamWrite(stream, b"0123456789ABCDE", 15)
            self.assertEqual(n, 15)

            # Metadata checks for the read-stream property functions.
            self.assertResultIsCFRetained(CFReadStreamCopyProperty)
            self.assertResultIsBOOL(CFReadStreamSetProperty)

            # This is a write stream, so query and update the file offset
            # through the write-stream API.
            val = CFWriteStreamCopyProperty(
                stream, kCFStreamPropertyFileCurrentOffset)
            self.assertEqual(val, 15)

            self.assertResultIsBOOL(CFWriteStreamSetProperty)
            r = CFWriteStreamSetProperty(
                stream, kCFStreamPropertyFileCurrentOffset, 10)
            self.assertIs(r, True)
            val = CFWriteStreamCopyProperty(
                stream, kCFStreamPropertyFileCurrentOffset)
            self.assertEqual(val, 10)

            CFWriteStreamClose(stream)
            self._assertWriteStreamStatus(stream, kCFStreamStatusClosed)

            # Only checks that the call is accepted on a closed stream; the
            # result is deliberately ignored.
            CFWriteStreamSetProperty(
                stream, kCFStreamPropertyFileCurrentOffset, 0)

            with open(path, 'rb') as fp:
                data = fp.read()
            self.assertEqual(data, b'0123456789ABCDE')
        finally:
            # Always remove the scratch file, also when an assertion above
            # fails, so that later runs start from a clean state.
            if os.path.exists(path):
                os.unlink(path)

    def testStreamPair(self):
        self.assertArgIsOut(CFStreamCreateBoundPair, 1)
        self.assertArgIsOut(CFStreamCreateBoundPair, 2)
        readStream, writeStream = CFStreamCreateBoundPair(
            None, None, None, 1024*1024)
        self.assertIsInstance(readStream, CFReadStreamRef)
        self.assertIsInstance(writeStream, CFWriteStreamRef)

        # Make sure we actually have streams instead of random pointers.
        self._assertReadStreamStatus(readStream, kCFStreamStatusNotOpen)
        self._assertWriteStreamStatus(writeStream, kCFStreamStatusNotOpen)

        del readStream, writeStream

    @onlyIf(onTheNetwork)
    def testSockets(self):
        # --- Pair wrapping an already connected native socket ---
        with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sd:
            sd.connect(('www.apple.com', 80))

            self.assertArgIsOut(CFStreamCreatePairWithSocket, 2)
            self.assertArgIsOut(CFStreamCreatePairWithSocket, 3)
            readStream, writeStream = CFStreamCreatePairWithSocket(
                None, sd.fileno(), None, None)

            self._assertReadStreamStatus(readStream, kCFStreamStatusNotOpen)
            self._assertWriteStreamStatus(writeStream, kCFStreamStatusNotOpen)

            # ``closing`` keeps its own reference to the socket, dropping
            # the local name does not prevent the close on exit.
            del readStream, writeStream, sd

        # --- Pair created from a host name and port ---
        self.assertArgIsOut(CFStreamCreatePairWithSocketToHost, 3)
        self.assertArgIsOut(CFStreamCreatePairWithSocketToHost, 4)
        readStream, writeStream = CFStreamCreatePairWithSocketToHost(
            None, "www.apple.com", 80, None, None)

        self._assertReadStreamStatus(readStream, kCFStreamStatusNotOpen)
        self._assertWriteStreamStatus(writeStream, kCFStreamStatusNotOpen)

        del readStream, writeStream

        # --- Pair created from a socket signature ---
        # Note: I don't expect anyone to actually use this api, building
        # struct sockaddr buffers by hand is madness in python.
        ip = socket.gethostbyname('www.apple.com')
        octets = [int(part) for part in ip.split('.')]

        import struct
        # struct sockaddr_in: sin_len, sin_family, sin_port, sin_addr and
        # 8 bytes of zero padding (sin_zero), 16 bytes in total -- matching
        # the length announced in the first field.
        sockaddr = struct.pack(
            '>BBHBBBB8x', 16, socket.AF_INET, 80, *octets)

        if sys.version_info[0] == 3:
            sockaddr_buffer = sockaddr
        else:
            sockaddr_buffer = buffer(sockaddr)

        signature = CFSocketSignature(
            protocolFamily=socket.AF_INET,
            socketType=socket.SOCK_STREAM,
            protocol=0,
            address=sockaddr_buffer)

        self.assertArgIsOut(CFStreamCreatePairWithPeerSocketSignature, 2)
        self.assertArgIsOut(CFStreamCreatePairWithPeerSocketSignature, 3)
        readStream, writeStream = CFStreamCreatePairWithPeerSocketSignature(
            None, signature, None, None)

        self.assertResultIsCFRetained(CFWriteStreamCopyError)
        self._assertReadStreamStatus(readStream, kCFStreamStatusNotOpen)
        self._assertWriteStreamStatus(writeStream, kCFStreamStatusNotOpen)

    def testReadSocketASync(self):
        rl = CFRunLoopGetCurrent()

        strval = b"hello world"
        readStream = CFReadStreamCreateWithBytesNoCopy(
            None, strval, len(strval), kCFAllocatorNull)
        self.assertIsInstance(readStream, CFReadStreamRef)
        data = {}

        # Every client callback invocation is recorded here.
        state = []

        def callback(stream, kind, info):
            state.append((stream, kind, info))

        self._assertReadStreamStatus(readStream, kCFStreamStatusNotOpen)

        CFReadStreamOpen(readStream)

        events = (kCFStreamEventHasBytesAvailable
                  | kCFStreamEventErrorOccurred
                  | kCFStreamEventEndEncountered)
        ok = CFReadStreamSetClient(readStream, events, callback, data)
        self.assertTrue(ok)

        CFReadStreamScheduleWithRunLoop(readStream, rl, kCFRunLoopDefaultMode)
        try:
            CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0, True)
            CFRunLoopWakeUp(rl)
        finally:
            CFReadStreamClose(readStream)
            CFReadStreamUnscheduleFromRunLoop(
                readStream, rl, kCFRunLoopDefaultMode)
            # Install the client again with a NULL info value.
            ok = CFReadStreamSetClient(
                readStream, events, callback, objc.NULL)
            self.assertTrue(ok)

        # Exactly one event is expected, carrying the stream and the info
        # object passed to CFReadStreamSetClient.
        self.assertEqual(len(state), 1)
        self.assertIs(state[0][0], readStream)
        self.assertIs(state[0][2], data)
        self.assertEqual(state[0][1], kCFStreamEventHasBytesAvailable)

    def testWriteSocketAsync(self):
        rl = CFRunLoopGetCurrent()

        import array
        a = array.array('b', b" "*20)

        writeStream = CFWriteStreamCreateWithBuffer(None, a, 20)
        self.assertIsInstance(writeStream, CFWriteStreamRef)
        r = CFWriteStreamOpen(writeStream)
        self.assertIs(r, True)
        data = {}

        # Every client callback invocation is recorded here.
        state = []

        def callback(stream, kind, info):
            state.append((stream, kind, info))

        events = (kCFStreamEventCanAcceptBytes
                  | kCFStreamEventErrorOccurred
                  | kCFStreamEventEndEncountered)
        ok = CFWriteStreamSetClient(writeStream, events, callback, data)
        self.assertTrue(ok)

        CFWriteStreamScheduleWithRunLoop(
            writeStream, rl, kCFRunLoopDefaultMode)
        try:
            CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0, True)
            CFRunLoopWakeUp(rl)
        finally:
            CFWriteStreamClose(writeStream)
            CFWriteStreamUnscheduleFromRunLoop(
                writeStream, rl, kCFRunLoopDefaultMode)
            # Install the client again with a NULL info value.
            ok = CFWriteStreamSetClient(
                writeStream, events, callback, objc.NULL)
            self.assertTrue(ok)

        # Exactly one event is expected, carrying the stream and the info
        # object passed to CFWriteStreamSetClient.
        self.assertEqual(len(state), 1)
        self.assertIs(state[0][0], writeStream)
        self.assertIs(state[0][2], data)
        self.assertEqual(state[0][1], kCFStreamEventCanAcceptBytes)
416
# Run the tests when this file is executed as a script.  ``main`` is not
# defined in this file; presumably it is provided by the star import from
# PyObjCTools.TestSupport -- TODO confirm.
if __name__ == "__main__":
    main()
419