1from PyObjCTools.TestSupport import *
2from CoreFoundation import *
3import CoreFoundation
4
# Python 2/3 compatibility: probe for the Python 2 builtin name ``unicode``
# and, when the lookup raises NameError, alias it to ``str`` so the
# isinstance checks below can use a single name.
try:
    unicode
except NameError:
    unicode = str


# Same approach for ``long``: when the name is not defined, alias it to
# ``int`` so ``(int, long)`` is a valid isinstance() tuple.
try:
    long
except NameError:
    long = int
15
16class TestRunLoop (TestCase):
17
18    def testTypes(self):
19        self.assertIsCFType(CFRunLoopRef)
20        self.assertIsCFType(CFRunLoopSourceRef)
21        self.assertIsCFType(CFRunLoopObserverRef)
22        self.assertIsCFType(CFRunLoopTimerRef)
23
24    def testConstants(self):
25        self.assertEqual(kCFRunLoopRunFinished , 1)
26        self.assertEqual(kCFRunLoopRunStopped , 2)
27        self.assertEqual(kCFRunLoopRunTimedOut , 3)
28        self.assertEqual(kCFRunLoopRunHandledSource , 4)
29        self.assertEqual(kCFRunLoopEntry , (1 << 0))
30        self.assertEqual(kCFRunLoopBeforeTimers , (1 << 1))
31        self.assertEqual(kCFRunLoopBeforeSources , (1 << 2))
32        self.assertEqual(kCFRunLoopBeforeWaiting , (1 << 5))
33        self.assertEqual(kCFRunLoopAfterWaiting , (1 << 6))
34        self.assertEqual(kCFRunLoopExit , (1 << 7))
35        self.assertEqual(kCFRunLoopAllActivities , 0x0FFFFFFF)
36        self.assertIsInstance(kCFRunLoopDefaultMode, unicode)
37        self.assertIsInstance(kCFRunLoopCommonModes, unicode)
38
39    def testGetTypeID(self):
40        self.assertIsInstance(CFRunLoopGetTypeID(), (int, long))
41        self.assertIsInstance(CFRunLoopSourceGetTypeID(), (int, long))
42        self.assertIsInstance(CFRunLoopObserverGetTypeID(), (int, long))
43        self.assertIsInstance(CFRunLoopTimerGetTypeID(), (int, long))
44
45    def testRunloop(self):
46        runloop_mode = kCFRunLoopDefaultMode
47        runloop_mode = "pyobjctest.cfrunloop"
48
49        loop = CFRunLoopGetCurrent()
50        self.assertIsInstance(loop, CFRunLoopRef)
51        loop = CFRunLoopGetMain()
52        self.assertIsInstance(loop, CFRunLoopRef)
53        mode = CFRunLoopCopyCurrentMode(loop)
54        if mode is not None:
55            self.assertIsInstance(mode, unicode)
56        self.assertResultIsCFRetained(CFRunLoopCopyAllModes)
57        allmodes = CFRunLoopCopyAllModes(loop)
58        self.assertIsInstance(allmodes, CFArrayRef)
59        self.assertNotEqual(len(allmodes) , 0)
60        for mode in allmodes:
61            self.assertIsInstance(mode, unicode)
62        CFRunLoopAddCommonMode(loop, "pyobjctest")
63        allmodes = CFRunLoopCopyAllModes(loop)
64
65        tm = CFRunLoopGetNextTimerFireDate(loop, runloop_mode)
66        self.assertIsInstance(tm, float)
67        b = CFRunLoopIsWaiting(loop)
68        self.assertIsInstance(b, bool)
69        CFRunLoopWakeUp(loop)
70        CFRunLoopStop(loop)
71
72        res = CFRunLoopRunInMode("mode", 2.0, True)
73        self.assertIsInstance(res, (int, long))
74        self.assertEqual(res, kCFRunLoopRunFinished)
75
76        # CFRunLoopRun is hard to test reliably
77        self.assertHasAttr(CoreFoundation, 'CFRunLoopRun')
78
79    def testObserver(self):
80        runloop_mode = kCFRunLoopDefaultMode
81        runloop_mode = "pyobjctest.cfrunloop"
82
83        rl = CFRunLoopGetCurrent()
84
85        data = {}
86        state = []
87        def callback(observer, activity, info):
88            state.append((observer, activity, info))
89
90        observer = CFRunLoopObserverCreate(None, kCFRunLoopEntry|kCFRunLoopExit,
91                True, 4, callback, data)
92        self.assertIsInstance(observer, CFRunLoopObserverRef)
93        ctx = CFRunLoopObserverGetContext(observer, None)
94        self.assertIs(ctx, data)
95        self.assertEqual(CFRunLoopObserverGetActivities(observer), kCFRunLoopEntry|kCFRunLoopExit)
96        self.assertIs(CFRunLoopObserverDoesRepeat(observer), True)
97        self.assertEqual(CFRunLoopObserverGetOrder(observer), 4)
98        self.assertIs(CFRunLoopObserverIsValid(observer), True)
99        CFRunLoopObserverInvalidate(observer)
100        self.assertIs(CFRunLoopObserverIsValid(observer), False)
101        ctx = CFRunLoopObserverGetContext(observer, None)
102        self.assertIs(ctx, objc.NULL)
103        observer = CFRunLoopObserverCreate(None, kCFRunLoopEntry|kCFRunLoopExit,
104                True, 4, callback, data)
105        self.assertIsInstance(observer, CFRunLoopObserverRef)
106        self.assertIs(CFRunLoopContainsObserver(rl, observer, runloop_mode), False)
107        CFRunLoopAddObserver(rl, observer, runloop_mode)
108        self.assertIs(CFRunLoopContainsObserver(rl, observer, runloop_mode), True)
109
110        # Use dummy stream to ensure that the runloop actually performs work
111        strval = b'hello world'
112        stream = CFReadStreamCreateWithBytesNoCopy(None,
113                                strval, len(strval), kCFAllocatorNull)
114        self.assertIsInstance(stream, CFReadStreamRef)
115        CFReadStreamScheduleWithRunLoop(stream, rl, runloop_mode)
116        res = CFRunLoopRunInMode(runloop_mode, 1.0, True)
117        CFReadStreamUnscheduleFromRunLoop(stream, rl, runloop_mode)
118
119        self.assertNotEqual(len(state) , 0 )
120        for item in state:
121            self.assertIs(item[0], observer)
122            self.assertIn(item[1], (kCFRunLoopEntry, kCFRunLoopExit))
123            self.assertIs(item[2], data)
124        CFRunLoopRemoveObserver(rl, observer, runloop_mode)
125        self.assertIs(CFRunLoopContainsObserver(rl, observer, runloop_mode), False)
126
127    def testTimer(self):
128        runloop_mode = kCFRunLoopDefaultMode
129        runloop_mode = "pyobjctest.cfrunloop"
130        rl = CFRunLoopGetCurrent()
131
132        state = []
133        data = {}
134        def callback(timer, info):
135            state.append((timer, info))
136
137        timer = CFRunLoopTimerCreate(None, 0, 0.5, 0, 0, callback, data)
138
139        r = CFRunLoopTimerGetNextFireDate(timer)
140        self.assertIsInstance(r, float)
141        CFRunLoopTimerSetNextFireDate(timer, r + 2)
142        r2 = CFRunLoopTimerGetNextFireDate(timer)
143        self.assertEqual(int(r2), int(r + 2))
144
145        r = CFRunLoopTimerGetInterval(timer)
146        self.assertEqual(r, 0.5)
147
148        self.assertIs(CFRunLoopTimerGetContext(timer, None), data)
149        self.assertIs(CFRunLoopTimerDoesRepeat(timer), True)
150        self.assertEqual(CFRunLoopTimerGetOrder(timer), 0)
151        self.assertIs(CFRunLoopTimerIsValid(timer), True)
152        CFRunLoopTimerInvalidate(timer)
153        self.assertIs(CFRunLoopTimerIsValid(timer), False)
154        self.assertIs(CFRunLoopTimerGetContext(timer, None), objc.NULL)
155        timer = CFRunLoopTimerCreate(None, 0, 0.5, 0, 0, callback, data)
156        self.assertIs(CFRunLoopContainsTimer(rl, timer, runloop_mode), False)
157        CFRunLoopAddTimer(rl, timer, runloop_mode)
158        self.assertIs(CFRunLoopContainsTimer(rl, timer, runloop_mode), True)
159        res = CFRunLoopRunInMode(runloop_mode, 1.3, True)
160
161        CFRunLoopTimerInvalidate(timer)
162        CFRunLoopRemoveTimer(rl, timer, runloop_mode)
163        self.assertIs(CFRunLoopContainsTimer(rl, timer, runloop_mode), False)
164        self.assertFalse(len(state) < 3)
165        for item in state:
166            self.assertIs(item[0], timer)
167            self.assertIs(item[1], data)
168
169    def testSource(self):
170        runloop_mode = kCFRunLoopDefaultMode
171        runloop_mode = "pyobjctest.cfrunloop"
172
173        rl = CFRunLoopGetCurrent()
174
175        state = []
176        data = {}
177        def schedule(info, rl, mode):
178            state.append(['schedule', info, rl, mode])
179        def cancel(info, rl, mode):
180            state.append(['cancel', info, rl, mode])
181        def perform(info):
182            state.append(['perform', info])
183
184        source = CFRunLoopSourceCreate(None, 55,
185                (0, schedule, cancel, perform, data))
186        self.assertIsInstance(source, CFRunLoopSourceRef)
187        ctx = CFRunLoopSourceGetContext(source, None)
188        self.assertIsInstance(ctx, tuple)
189        self.assertEqual(ctx[0], 0)
190        self.assertEqual(ctx[1], schedule)
191        self.assertEqual(ctx[2], cancel)
192        self.assertEqual(ctx[3], perform)
193        self.assertEqual(ctx[4], data)
194
195        self.assertEqual(CFRunLoopSourceGetOrder(source), 55)
196        self.assertIs(CFRunLoopSourceIsValid(source), True)
197        CFRunLoopSourceInvalidate(source)
198        self.assertIs(CFRunLoopSourceIsValid(source), False)
199        source = CFRunLoopSourceCreate(None, 55,
200                (0, schedule, cancel, perform, data))
201        self.assertIsInstance(source, CFRunLoopSourceRef)
202        self.assertIs(CFRunLoopContainsSource(rl, source, runloop_mode), False)
203        CFRunLoopAddSource(rl, source, runloop_mode)
204        self.assertIs(CFRunLoopContainsSource(rl, source, runloop_mode), True)
205        self.assertEqual(len(state) , 1)
206        self.assertEqual(state[0][0] , 'schedule')
207        self.assertIs(state[0][1], data)
208        self.assertIs(state[0][2], rl)
209        self.assertEqual(state[0][3] , runloop_mode)
210        del state[:]
211
212        res = CFRunLoopRunInMode(runloop_mode, 0.5, True)
213        self.assertIsInstance(res, (int, long))
214        #self.assertEqual(res, kCFRunLoopRunTimedOut)
215
216        self.assertEqual(len(state), 0)
217
218        CFRunLoopSourceSignal(source)
219
220        res = CFRunLoopRunInMode(runloop_mode, 0.5, True)
221        self.assertIsInstance(res, (int, long))
222        self.assertEqual(res, kCFRunLoopRunHandledSource)
223
224        self.assertEqual(len(state), 1)
225        self.assertEqual(state[0][0] , 'perform')
226        self.assertIs(state[0][1], data)
227        del state[:]
228
229        CFRunLoopRemoveSource(rl, source, runloop_mode)
230        self.assertIs(CFRunLoopContainsSource(rl, source, runloop_mode), False)
231        self.assertEqual(len(state), 1)
232        self.assertEqual(state[0][0] , 'cancel')
233        self.assertIs(state[0][1], data)
234        self.assertIs(state[0][2], rl)
235        self.assertEqual(state[0][3] , runloop_mode)
236
237    @min_os_level('10.6')
238    def testFunctions10_6(self):
239        self.assertArgIsBlock(CFRunLoopPerformBlock, 2, b'v')
240
241
242        runloop_mode = kCFRunLoopDefaultMode
243        rl = CFRunLoopGetCurrent()
244
245        l = []
246        def doit():
247            l.append(True)
248        CFRunLoopPerformBlock(rl, runloop_mode, doit)
249        res = CFRunLoopRunInMode(runloop_mode, 0.5, True)
250
251        self.assertEqual(l, [True])
252
253    @min_os_level('10.7')
254    def testFunctions10_7(self):
255        self.assertArgIsBOOL(CFRunLoopObserverCreateWithHandler, 2)
256        self.assertArgIsBlock(CFRunLoopObserverCreateWithHandler, 4, b"v^{__CFRunLoopObserver=}" + objc._C_NSUInteger)
257
258        l = []
259        def record(observer, activity):
260            l.append((observer, activity))
261
262
263        ref = CFRunLoopObserverCreateWithHandler(None, kCFRunLoopAllActivities, False, 0, record)
264        self.assertIsInstance(ref, CFRunLoopObserverRef)
265
266        runloop_mode = kCFRunLoopDefaultMode
267        rl = CFRunLoopGetCurrent()
268
269        CFRunLoopAddObserver(rl, ref, runloop_mode)
270        res = CFRunLoopRunInMode(runloop_mode, 0.5, True)
271        CFRunLoopRemoveObserver(rl, ref, runloop_mode)
272
273
274        self.assertNotEqual(l, [])
275        for a, b in l:
276            self.assertEqual(a, ref)
277            self.assertIsInstance(b, (int, long))
278
279
280        self.assertArgIsBlock(CFRunLoopTimerCreateWithHandler, 5, b'v^{__CFRunLoopTimer=}')
281        l = []
282        ref = CFRunLoopTimerCreateWithHandler(None,
283                CFAbsoluteTimeGetCurrent() + 2.9, 0.0, 0, 0, lambda x: l.append(x))
284        self.assertIsInstance(ref, CFRunLoopTimerRef)
285
286        CFRunLoopAddTimer(rl, ref, runloop_mode)
287        res = CFRunLoopRunInMode(runloop_mode, 6.0, True)
288        CFRunLoopRemoveTimer(rl, ref, runloop_mode)
289
290        # XXX: For some reason the timer fails with a full testrun.
291        # See also issue #11 in the pyobjc tracker
292        import __main__
293        if 'setup' in __main__.__file__:
294            return
295
296        self.assertNotEqual(l, [])
297        for a in l:
298            self.assertEqual(a, ref)
299
300
301
302
# When this file is executed as a script, call main() (one of the names made
# available by the star imports at the top of the file).
if __name__ == "__main__":
    main()
305