1from PyObjCTools.TestSupport import *
2import socket, time, struct
3from CoreFoundation import *
4import CoreFoundation
5import sys
6
def onTheNetwork():
    """Report whether the host name 'www.apple.com' can be resolved.

    Returns True when the lookup succeeds and False when it fails with
    ``socket.gaierror``; any other exception propagates to the caller.
    """
    try:
        socket.gethostbyname('www.apple.com')
    except socket.gaierror:
        reachable = False
    else:
        reachable = True
    return reachable
15
16
class TestSocket (TestCase):
    """Tests for the CFSocket API as exposed through CoreFoundation."""

    @staticmethod
    def _sockaddr_in(port, octets):
        # Hand-build a 16-byte IPv4 socket address: a length byte (16), the
        # address family, the port in big-endian order, the four address
        # octets, and finally 8 bytes of zero padding.
        packed = struct.pack('>BBHBBBB', 16, socket.AF_INET, port, *octets)
        return packed + b'\0' * 8

    @staticmethod
    def _as_cfdata(raw):
        # On Python 2 the raw bytes are wrapped in a ``buffer`` before being
        # handed to the CF functions. Python 3 has no ``buffer`` builtin, so
        # the bytes object is passed through unchanged.
        if sys.version_info[0] == 2:
            return buffer(raw)
        return raw

    def testTypes(self):
        self.assertIsCFType(CFSocketRef)

    def testTypeID(self):
        self.assertIsInstance(CFSocketGetTypeID(), (int, long))

    def testConstants(self):
        # CFSocketError result codes
        self.assertEqual(kCFSocketSuccess , 0)
        self.assertEqual(kCFSocketError , -1)
        self.assertEqual(kCFSocketTimeout , -2)

        # Callback type bits
        self.assertEqual(kCFSocketNoCallBack , 0)
        self.assertEqual(kCFSocketReadCallBack , 1)
        self.assertEqual(kCFSocketAcceptCallBack , 2)
        self.assertEqual(kCFSocketDataCallBack , 3)
        self.assertEqual(kCFSocketConnectCallBack , 4)
        self.assertEqual(kCFSocketWriteCallBack , 8)

        # Socket flags
        self.assertEqual(kCFSocketAutomaticallyReenableReadCallBack , 1)
        self.assertEqual(kCFSocketAutomaticallyReenableAcceptCallBack , 2)
        self.assertEqual(kCFSocketAutomaticallyReenableDataCallBack , 3)
        self.assertEqual(kCFSocketAutomaticallyReenableWriteCallBack , 8)
        self.assertEqual(kCFSocketCloseOnInvalidate , 128)

        # String-valued keys and commands
        self.assertIsInstance(kCFSocketCommandKey, unicode)
        self.assertIsInstance(kCFSocketNameKey, unicode)
        self.assertIsInstance(kCFSocketValueKey, unicode)
        self.assertIsInstance(kCFSocketResultKey, unicode)
        self.assertIsInstance(kCFSocketErrorKey, unicode)
        self.assertIsInstance(kCFSocketRegisterCommand, unicode)
        self.assertIsInstance(kCFSocketRetrieveCommand, unicode)
        self.assertEqual(kCFSocketLeaveErrors, 64)

    def testStructs(self):
        o = CFSocketSignature()
        self.assertHasAttr(o, 'protocolFamily')
        self.assertHasAttr(o, 'socketType')
        self.assertHasAttr(o, 'protocol')
        self.assertHasAttr(o, 'address')

    def testNameRegistry(self):
        p1 = CFSocketGetDefaultNameRegistryPortNumber()
        self.assertIsInstance(p1, (int, long))
        try:
            CFSocketSetDefaultNameRegistryPortNumber(p1+1)
            p2 = CFSocketGetDefaultNameRegistryPortNumber()
            self.assertIsInstance(p2, (int, long))
            self.assertEqual(p2, p1+1)
        finally:
            # Always put the original port back, even when an assertion
            # above fails, so the changed setting does not leak into
            # other tests.
            CFSocketSetDefaultNameRegistryPortNumber(p1)

    @onlyIf(onTheNetwork(), "cannot test without internet connection")
    def testSocketFunctions(self):
        data = {}
        state = []
        def callback(sock, kind, address, data, info):
            state.append((sock, kind, address, data, info))

        # --- CFSocketCreate + CFSocketSetAddress on a loopback port ---
        sock = CFSocketCreate(None, socket.AF_INET, socket.SOCK_STREAM, 0,
                kCFSocketReadCallBack|kCFSocketWriteCallBack,
                callback, data)
        self.assertIsInstance(sock, CFSocketRef)
        localaddr = self._as_cfdata(self._sockaddr_in(9425, (127, 0, 0, 1)))
        err = CFSocketSetAddress(sock, localaddr)
        self.assertEqual(err, kCFSocketSuccess)

        # --- Wrap an existing native socket ---
        sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        sock = CFSocketCreateWithNative(None, sd.fileno(),
                kCFSocketReadCallBack|kCFSocketWriteCallBack,
                callback, data)
        self.assertIsInstance(sock, CFSocketRef)
        n = CFSocketGetNative(sock)
        self.assertIsInstance(n, (int, long))
        self.assertEqual(n, sd.fileno())

        # The context must be the very object that was passed at creation.
        ctx = CFSocketGetContext(sock, None)
        self.assertIs(ctx, data)

        # --- Socket flags round-trip ---
        flags = CFSocketGetSocketFlags(sock)
        self.assertIsInstance(flags, (int, long))
        CFSocketSetSocketFlags(sock, kCFSocketAutomaticallyReenableReadCallBack|kCFSocketAutomaticallyReenableAcceptCallBack)
        flags2 = CFSocketGetSocketFlags(sock)
        self.assertIsInstance(flags2, (int, long))
        self.assertEqual(flags2, kCFSocketAutomaticallyReenableReadCallBack|kCFSocketAutomaticallyReenableAcceptCallBack)

        # --- Connect to a remote host ---
        # Note: I don't expect anyone to actually use this api, building
        # struct sockaddr buffers by hand is madness in python.
        ip = socket.gethostbyname('www.apple.com')
        octets = [int(part) for part in ip.split('.')]

        # Keep the raw bytes around so they can be wrapped again for the
        # socket signature further down.
        remote_raw = self._sockaddr_in(80, octets)
        sockaddr = self._as_cfdata(remote_raw)

        e = CFSocketConnectToAddress(sock, sockaddr, 1.0)
        self.assertIsInstance(e, (int, long))
        self.assertEqual(e, kCFSocketSuccess)

        self.assertResultIsCFRetained(CFSocketCopyPeerAddress)
        addr = CFSocketCopyPeerAddress(sock)
        self.assertIsInstance(addr, CFDataRef)
        self.assertResultIsCFRetained(CFSocketCopyAddress)
        addr = CFSocketCopyAddress(sock)
        self.assertIsInstance(addr, CFDataRef)
        CFSocketDisableCallBacks(sock, kCFSocketReadCallBack|kCFSocketAcceptCallBack)
        CFSocketEnableCallBacks(sock, kCFSocketReadCallBack|kCFSocketAcceptCallBack)

        # The payload is a bytes literal wrapped like the addresses above,
        # instead of an unconditional ``buffer(...)`` call that fails on
        # Python 3.
        err = CFSocketSendData(sock, None, self._as_cfdata(b"GET / HTTP/1.0"), 1.0)
        self.assertEqual(err, kCFSocketSuccess)

        # --- Validity before and after invalidation ---
        ok = CFSocketIsValid(sock)
        self.assertIs(ok, True)
        CFSocketInvalidate(sock)
        self.assertResultIsBOOL(CFSocketIsValid)
        ok = CFSocketIsValid(sock)
        self.assertIs(ok, False)

        # --- Creation from socket signatures ---
        signature = CFSocketSignature(
                socket.AF_INET,
                socket.SOCK_STREAM,
                0,
                self._as_cfdata(self._sockaddr_in(9424, (127, 0, 0, 1))))

        sock = CFSocketCreateWithSocketSignature(None, signature,
                kCFSocketReadCallBack|kCFSocketWriteCallBack,
                callback, data)
        self.assertIsInstance(sock, CFSocketRef)
        signature = CFSocketSignature(
                socket.AF_INET,
                socket.SOCK_STREAM,
                0,
                self._as_cfdata(remote_raw))
        sock = CFSocketCreateConnectedToSocketSignature(None, signature,
                kCFSocketReadCallBack|kCFSocketWriteCallBack,
                callback, data, 1.0)
        self.assertIsInstance(sock, CFSocketRef)

        # --- Run loop source ---
        self.assertResultIsCFRetained(CFSocketCreateRunLoopSource)
        src = CFSocketCreateRunLoopSource(None, sock, 0)
        self.assertIsInstance(src, CFRunLoopSourceRef)

    def testSocketNameServer(self):
        # The documentation says:
        #   Name server functionality is currently inoperable in Mac OS X.
        #
        # Therefore these functions are not available from Python
        self.assertNotHasAttr(CoreFoundation, 'CFSocketCopyRegisteredSocketSignature')
        self.assertNotHasAttr(CoreFoundation, 'CFSocketCopyRegisteredValue')
        self.assertNotHasAttr(CoreFoundation, 'CFSocketRegisterSocketSignature')
        self.assertNotHasAttr(CoreFoundation, 'CFSocketRegisterValue')
        self.assertNotHasAttr(CoreFoundation, 'CFSocketUnregister')
174
# Run the tests when this file is executed directly as a script.
# NOTE(review): ``main`` is not defined in this file; presumably it comes
# from one of the star imports at the top -- confirm.
if __name__ == '__main__':
    main()
177