1from PyObjCTools.TestSupport import *
2from CoreFoundation import *
3
4
5class TestRunLoop (TestCase):
6
7    def testTypes(self):
8        self.failUnlessIsCFType(CFRunLoopRef)
9        self.failUnlessIsCFType(CFRunLoopSourceRef)
10        self.failUnlessIsCFType(CFRunLoopObserverRef)
11        self.failUnlessIsCFType(CFRunLoopTimerRef)
12
13    def testConstants(self):
14        self.failUnless(kCFRunLoopRunFinished == 1)
15        self.failUnless(kCFRunLoopRunStopped == 2)
16        self.failUnless(kCFRunLoopRunTimedOut == 3)
17        self.failUnless(kCFRunLoopRunHandledSource == 4)
18
19        self.failUnless(kCFRunLoopEntry == (1 << 0))
20        self.failUnless(kCFRunLoopBeforeTimers == (1 << 1))
21        self.failUnless(kCFRunLoopBeforeSources == (1 << 2))
22        self.failUnless(kCFRunLoopBeforeWaiting == (1 << 5))
23        self.failUnless(kCFRunLoopAfterWaiting == (1 << 6))
24        self.failUnless(kCFRunLoopExit == (1 << 7))
25        self.failUnless(kCFRunLoopAllActivities == 0x0FFFFFFF)
26
27        self.failUnless(isinstance(kCFRunLoopDefaultMode, unicode))
28        self.failUnless(isinstance(kCFRunLoopCommonModes, unicode))
29
30    def testGetTypeID(self):
31        self.failUnless(isinstance(CFRunLoopGetTypeID(), (int, long)))
32        self.failUnless(isinstance(CFRunLoopSourceGetTypeID(), (int, long)))
33        self.failUnless(isinstance(CFRunLoopObserverGetTypeID(), (int, long)))
34        self.failUnless(isinstance(CFRunLoopTimerGetTypeID(), (int, long)))
35
36    def testRunloop(self):
37        loop = CFRunLoopGetCurrent()
38        self.failUnless(isinstance(loop, CFRunLoopRef))
39
40        loop = CFRunLoopGetMain()
41        self.failUnless(isinstance(loop, CFRunLoopRef))
42
43        mode = CFRunLoopCopyCurrentMode(loop)
44        self.failUnless(mode is None or isinstance(mode, unicode))
45
46        self.failUnlessResultIsCFRetained(CFRunLoopCopyAllModes)
47        allmodes = CFRunLoopCopyAllModes(loop)
48        self.failUnless(isinstance(allmodes, CFArrayRef))
49        self.failIf(len(allmodes) == 0)
50        for mode in allmodes:
51            self.failUnless(isinstance(mode, unicode))
52
53        CFRunLoopAddCommonMode(loop, "pyobjctest")
54        allmodes = CFRunLoopCopyAllModes(loop)
55
56        tm = CFRunLoopGetNextTimerFireDate(loop, kCFRunLoopDefaultMode)
57        self.failUnless(isinstance(tm, float))
58
59        b = CFRunLoopIsWaiting(loop)
60        self.failUnless(isinstance(b, bool))
61        CFRunLoopWakeUp(loop)
62        CFRunLoopStop(loop)
63
64        res = CFRunLoopRunInMode("mode", 2.0, True)
65        self.failUnless(isinstance(res, (int, long)))
66        self.assertEquals(res, kCFRunLoopRunFinished)
67
68        # CFRunLoopRun is hard to test reliably
69        self.failUnless(hasattr(CoreFoundation, 'CFRunLoopRun'))
70
71    def testObserver(self):
72
73        rl = CFRunLoopGetCurrent()
74
75        data = {}
76        state = []
77        def callback(observer, activity, info):
78            state.append((observer, activity, info))
79
80        observer = CFRunLoopObserverCreate(None, kCFRunLoopEntry|kCFRunLoopExit,
81                True, 4, callback, data)
82        self.failUnless(isinstance(observer, CFRunLoopObserverRef))
83        ctx = CFRunLoopObserverGetContext(observer)
84        self.failUnless(ctx is data)
85
86        self.assertEquals(CFRunLoopObserverGetActivities(observer), kCFRunLoopEntry|kCFRunLoopExit)
87        self.failUnless(CFRunLoopObserverDoesRepeat(observer) is True)
88        self.assertEquals(CFRunLoopObserverGetOrder(observer), 4)
89        self.failUnless(CFRunLoopObserverIsValid(observer) is True)
90        CFRunLoopObserverInvalidate(observer)
91        self.failUnless(CFRunLoopObserverIsValid(observer) is False)
92        ctx = CFRunLoopObserverGetContext(observer)
93        self.failUnless(ctx is objc.NULL)
94
95
96        observer = CFRunLoopObserverCreate(None, kCFRunLoopEntry|kCFRunLoopExit,
97                True, 4, callback, data)
98        self.failUnless(isinstance(observer, CFRunLoopObserverRef))
99
100        self.failUnless (CFRunLoopContainsObserver(rl, observer, kCFRunLoopDefaultMode) is False)
101        CFRunLoopAddObserver(rl, observer, kCFRunLoopDefaultMode)
102        self.failUnless (CFRunLoopContainsObserver(rl, observer, kCFRunLoopDefaultMode) is True)
103
104        # Use dummy stream to ensure that the runloop actually performs work
105        strval = 'hello world'
106        stream = CFReadStreamCreateWithBytesNoCopy(None,
107                                strval, len(strval), kCFAllocatorNull)
108        self.failUnless(isinstance(stream, CFReadStreamRef))
109
110        CFReadStreamScheduleWithRunLoop(stream, rl, kCFRunLoopDefaultMode)
111        res = CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0, True)
112        CFReadStreamUnscheduleFromRunLoop(stream, rl, kCFRunLoopDefaultMode)
113
114        self.failIf( len(state) == 0 )
115        for item in state:
116            self.failUnless(item[0] is observer)
117            self.failUnless(item[1] in (kCFRunLoopEntry, kCFRunLoopExit))
118            self.failUnless(item[2] is data)
119
120
121        CFRunLoopRemoveObserver(rl, observer, kCFRunLoopDefaultMode)
122        self.failUnless (CFRunLoopContainsObserver(rl, observer, kCFRunLoopDefaultMode) is False)
123
124
125    def testTimer(self):
126        rl = CFRunLoopGetCurrent()
127
128        state = []
129        data = {}
130        def callback(timer, info):
131            state.append((timer, info))
132
133        timer = CFRunLoopTimerCreate(None, 0, 0.5, 0, 0, callback, data)
134
135        r = CFRunLoopTimerGetNextFireDate(timer)
136        self.failUnless(isinstance(r, float))
137
138        CFRunLoopTimerSetNextFireDate(timer, r + 2)
139        r2 = CFRunLoopTimerGetNextFireDate(timer)
140        self.assertEquals(int(r2), int(r + 2))
141
142        r = CFRunLoopTimerGetInterval(timer)
143        self.assertEquals(r, 0.5)
144
145        self.failUnless(CFRunLoopTimerGetContext(timer) is data)
146        self.failUnless(CFRunLoopTimerDoesRepeat(timer) is True)
147        self.assertEquals(CFRunLoopTimerGetOrder(timer), 0)
148        self.failUnless(CFRunLoopTimerIsValid(timer) is True)
149        CFRunLoopTimerInvalidate(timer)
150        self.failUnless(CFRunLoopTimerIsValid(timer) is False)
151        self.failUnless(CFRunLoopTimerGetContext(timer) is objc.NULL)
152
153
154        timer = CFRunLoopTimerCreate(None, 0, 0.5, 0, 0, callback, data)
155        self.failUnless(CFRunLoopContainsTimer(rl, timer, kCFRunLoopDefaultMode) is False)
156        CFRunLoopAddTimer(rl, timer, kCFRunLoopDefaultMode)
157        self.failUnless(CFRunLoopContainsTimer(rl, timer, kCFRunLoopDefaultMode) is True)
158
159        res = CFRunLoopRunInMode(kCFRunLoopDefaultMode, 2.0, True)
160
161        CFRunLoopRemoveTimer(rl, timer, kCFRunLoopDefaultMode)
162        self.failUnless(CFRunLoopContainsTimer(rl, timer, kCFRunLoopDefaultMode) is False)
163
164        self.failIf(len(state) < 3)
165        for item in state:
166            self.failUnless(item[0] is timer)
167            self.failUnless(item[1] is data)
168
169    def testSource(self):
170        rl = CFRunLoopGetCurrent()
171
172        state = []
173        data = {}
174        def schedule(info, rl, mode):
175            state.append(['schedule', info, rl, mode])
176        def cancel(info, rl, mode):
177            state.append(['cancel', info, rl, mode])
178        def perform(info):
179            state.append(['perform', info])
180
181        source = CFRunLoopSourceCreate(None, 55,
182                (0, schedule, cancel, perform, data))
183        self.failUnless(isinstance(source, CFRunLoopSourceRef))
184
185        ctx = CFRunLoopSourceGetContext(source, None)
186        self.failUnless(isinstance(ctx, tuple))
187        self.assertEquals(ctx[0], 0)
188        self.assertEquals(ctx[1], schedule)
189        self.assertEquals(ctx[2], cancel)
190        self.assertEquals(ctx[3], perform)
191        self.assertEquals(ctx[4], data)
192
193        self.assertEquals(CFRunLoopSourceGetOrder(source), 55)
194        self.failUnless(CFRunLoopSourceIsValid(source) is True)
195        CFRunLoopSourceInvalidate(source)
196        self.failUnless(CFRunLoopSourceIsValid(source) is False)
197
198        source = CFRunLoopSourceCreate(None, 55,
199                (0, schedule, cancel, perform, data))
200        self.failUnless(isinstance(source, CFRunLoopSourceRef))
201
202        self.failUnless(CFRunLoopContainsSource(rl, source, kCFRunLoopDefaultMode) is False)
203        CFRunLoopAddSource(rl, source, kCFRunLoopDefaultMode)
204        self.failUnless(CFRunLoopContainsSource(rl, source, kCFRunLoopDefaultMode) is True)
205        self.failUnless(len(state) == 1)
206        self.failUnless(state[0][0] == 'schedule')
207        self.failUnless(state[0][1] is data)
208        self.failUnless(state[0][2] is rl)
209        self.failUnless(state[0][3] == kCFRunLoopDefaultMode)
210
211
212        del state[:]
213
214        res = CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.5, True)
215        self.failUnless(isinstance(res, (int, long)))
216        #self.assertEquals(res, kCFRunLoopRunTimedOut)
217
218        self.assertEquals(len(state), 0)
219
220        CFRunLoopSourceSignal(source)
221
222        res = CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.5, True)
223        self.failUnless(isinstance(res, (int, long)))
224        self.assertEquals(res, kCFRunLoopRunHandledSource)
225
226        self.assertEquals(len(state), 1)
227        self.failUnless(state[0][0] == 'perform')
228        self.failUnless(state[0][1] is data)
229
230        del state[:]
231
232        CFRunLoopRemoveSource(rl, source, kCFRunLoopDefaultMode)
233        self.failUnless(CFRunLoopContainsSource(rl, source, kCFRunLoopDefaultMode) is False)
234
235        self.assertEquals(len(state), 1)
236        self.failUnless(state[0][0] == 'cancel')
237        self.failUnless(state[0][1] is data)
238        self.failUnless(state[0][2] is rl)
239        self.failUnless(state[0][3] == kCFRunLoopDefaultMode)
240
241if __name__ == "__main__":
242    main()
243