1from PyObjCTools.TestSupport import *
2import socket, time, struct
3from CoreFoundation import *
4import CoreFoundation
5
6
class TestSocket (TestCase):
    """Tests for the PyObjC bindings of the CoreFoundation CFSocket API."""

    @staticmethod
    def _sockaddr_in(port, ip):
        """
        Return a hand-packed 16 byte IPv4 socket address.

        ``port`` is the port number and ``ip`` a sequence with the four
        octets of the address. The packed layout is: a length byte (16),
        the address family, the port as a big-endian 16-bit value, the
        four address octets and finally 8 zero bytes of padding.
        """
        return struct.pack('>BBHBBBB', 16, socket.AF_INET, port, *ip) + '\0' * 8

    def testTypes(self):
        self.failUnlessIsCFType(CFSocketRef)

    def testTypeID(self):
        self.failUnless(isinstance(CFSocketGetTypeID(), (int, long)))

    def testConstants(self):
        # assertEquals is used instead of failUnless(a == b) so that a
        # failure reports both the actual and the expected value.

        # Result codes
        self.assertEquals(kCFSocketSuccess, 0)
        self.assertEquals(kCFSocketError, -1)
        self.assertEquals(kCFSocketTimeout, -2)

        # Callback types
        self.assertEquals(kCFSocketNoCallBack, 0)
        self.assertEquals(kCFSocketReadCallBack, 1)
        self.assertEquals(kCFSocketAcceptCallBack, 2)
        self.assertEquals(kCFSocketDataCallBack, 3)
        self.assertEquals(kCFSocketConnectCallBack, 4)
        self.assertEquals(kCFSocketWriteCallBack, 8)

        # Socket flags
        self.assertEquals(kCFSocketAutomaticallyReenableReadCallBack, 1)
        self.assertEquals(kCFSocketAutomaticallyReenableAcceptCallBack, 2)
        self.assertEquals(kCFSocketAutomaticallyReenableDataCallBack, 3)
        self.assertEquals(kCFSocketAutomaticallyReenableWriteCallBack, 8)
        self.assertEquals(kCFSocketCloseOnInvalidate, 128)

        # String constants
        self.failUnless(isinstance(kCFSocketCommandKey, unicode))
        self.failUnless(isinstance(kCFSocketNameKey, unicode))
        self.failUnless(isinstance(kCFSocketValueKey, unicode))
        self.failUnless(isinstance(kCFSocketResultKey, unicode))
        self.failUnless(isinstance(kCFSocketErrorKey, unicode))
        self.failUnless(isinstance(kCFSocketRegisterCommand, unicode))
        self.failUnless(isinstance(kCFSocketRetrieveCommand, unicode))

    def testStructs(self):
        # A default-constructed CFSocketSignature must expose all fields.
        o = CFSocketSignature()
        self.failUnless(hasattr(o, 'protocolFamily'))
        self.failUnless(hasattr(o, 'socketType'))
        self.failUnless(hasattr(o, 'protocol'))
        self.failUnless(hasattr(o, 'address'))

    def testNameRegistry(self):
        p1 = CFSocketGetDefaultNameRegistryPortNumber()
        self.failUnless(isinstance(p1, (int, long)))

        try:
            CFSocketSetDefaultNameRegistryPortNumber(p1+1)
            p2 = CFSocketGetDefaultNameRegistryPortNumber()
            self.failUnless(isinstance(p2, (int, long)))
            self.assertEquals(p2, p1+1)

        finally:
            # Always put the original port number back, even when one of
            # the assertions above fails, to avoid leaking the changed
            # setting into other tests running in the same process.
            CFSocketSetDefaultNameRegistryPortNumber(p1)

    def testSocketFunctions(self):
        # 'data' is passed as the context/info value when creating sockets,
        # 'state' records every invocation of the callback.
        data = {}
        state = []
        def callback(sock, kind, address, data, info):
            state.append((sock, kind, address, data, info))

        # Create a fresh socket and set its address to 127.0.0.1:9425
        sock = CFSocketCreate(None, socket.AF_INET, socket.SOCK_STREAM, 0,
                kCFSocketReadCallBack|kCFSocketWriteCallBack,
                callback, data)
        self.failUnless(isinstance(sock, CFSocketRef))

        localaddr = self._sockaddr_in(9425, (127, 0, 0, 1))
        err = CFSocketSetAddress(sock, buffer(localaddr))
        self.assertEquals(err, kCFSocketSuccess)

        # Wrap an existing Python socket, the CFSocket should report
        # the same file descriptor.
        sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        sock = CFSocketCreateWithNative(None, sd.fileno(),
                kCFSocketReadCallBack|kCFSocketWriteCallBack,
                callback, data)
        self.failUnless(isinstance(sock, CFSocketRef))

        n = CFSocketGetNative(sock)
        self.failUnless(isinstance(n, (int, long)))
        self.assertEquals(n, sd.fileno())

        # The context is the object that was passed in at creation time
        ctx = CFSocketGetContext(sock, None)
        self.failUnless(ctx is data)

        # Socket flags must round-trip through the setter and getter
        flags = CFSocketGetSocketFlags(sock)
        self.failUnless(isinstance(flags, (int, long)))

        CFSocketSetSocketFlags(sock, kCFSocketAutomaticallyReenableReadCallBack|kCFSocketAutomaticallyReenableAcceptCallBack)
        flags2 = CFSocketGetSocketFlags(sock)
        self.failUnless(isinstance(flags2, (int, long)))
        self.assertEquals(flags2, kCFSocketAutomaticallyReenableReadCallBack|kCFSocketAutomaticallyReenableAcceptCallBack)

        # Note: I don't expect anyone to actually use this api, building
        # struct sockaddr buffers by hand is madness in python.
        #
        # NOTE: this part of the test resolves and connects to
        # www.apple.com on port 80 and hence needs a network connection.
        ip = socket.gethostbyname('www.apple.com')
        ip = map(int, ip.split('.'))

        sockaddr = self._sockaddr_in(80, ip)

        e = CFSocketConnectToAddress(sock, buffer(sockaddr), 1.0)
        self.failUnless(isinstance(e, (int, long)))
        self.assertEquals(e, kCFSocketSuccess)

        # Both address accessors should return a CFData object
        self.failUnlessResultIsCFRetained(CFSocketCopyPeerAddress)
        addr = CFSocketCopyPeerAddress(sock)
        self.failUnless( isinstance(addr, CFDataRef))

        self.failUnlessResultIsCFRetained(CFSocketCopyAddress)
        addr = CFSocketCopyAddress(sock)
        self.failUnless( isinstance(addr, CFDataRef))

        # These only check that the functions can be called
        CFSocketDisableCallBacks(sock, kCFSocketReadCallBack|kCFSocketAcceptCallBack)
        CFSocketEnableCallBacks(sock, kCFSocketReadCallBack|kCFSocketAcceptCallBack)

        err = CFSocketSendData(sock, None, buffer("GET / HTTP/1.0"), 1.0)
        self.assertEquals(err, kCFSocketSuccess)

        # The socket is valid until it is explicitly invalidated
        ok = CFSocketIsValid(sock)
        self.failUnless(ok is True)

        CFSocketInvalidate(sock)
        self.failUnlessResultIsBOOL(CFSocketIsValid)
        ok = CFSocketIsValid(sock)
        self.failUnless(ok is False)

        # Create a socket from a signature for 127.0.0.1:9424
        localaddr = self._sockaddr_in(9424, (127, 0, 0, 1))
        signature = CFSocketSignature(
                socket.AF_INET,
                socket.SOCK_STREAM,
                0,
                buffer(localaddr))

        sock = CFSocketCreateWithSocketSignature(None, signature,
                kCFSocketReadCallBack|kCFSocketWriteCallBack,
                callback, data)
        self.failUnless(isinstance(sock, CFSocketRef))

        # Create a connected socket from a signature for the remote
        # address that was used earlier.
        signature = CFSocketSignature(
                socket.AF_INET,
                socket.SOCK_STREAM,
                0,
                buffer(sockaddr))
        sock = CFSocketCreateConnectedToSocketSignature(None, signature,
                kCFSocketReadCallBack|kCFSocketWriteCallBack,
                callback, data, 1.0)
        self.failUnless(isinstance(sock, CFSocketRef))

        # And finally create a runloop source for the socket
        self.failUnlessResultIsCFRetained(CFSocketCreateRunLoopSource)
        src = CFSocketCreateRunLoopSource(None, sock, 0)
        self.failUnless(isinstance(src, CFRunLoopSourceRef))

    def testSocketNameServer(self):
        # The documentation says:
        #   Name server functionality is currently inoperable in Mac OS X.
        #
        # Therefore these functions are not available from Python
        self.failIf( hasattr(CoreFoundation, 'CFSocketCopyRegisteredSocketSignature') )
        self.failIf( hasattr(CoreFoundation, 'CFSocketCopyRegisteredValue') )
        self.failIf( hasattr(CoreFoundation, 'CFSocketRegisterSocketSignature') )
        self.failIf( hasattr(CoreFoundation, 'CFSocketRegisterValue') )
        self.failIf( hasattr(CoreFoundation, 'CFSocketUnregister') )
177
178
# Run the tests when this file is executed as a script rather than imported.
# NOTE(review): main() is not defined in this file; presumably it is provided
# by one of the star imports at the top of the file -- confirm.
if __name__ == "__main__":
    main()
181