"""Tests for the CoreFoundation CFRunLoop bindings.

Covers the run loop itself plus its observers, timers and sources, and the
block-based helpers that are only exercised on newer OS releases.
"""
from PyObjCTools.TestSupport import *
from CoreFoundation import *
import CoreFoundation

# NOTE(review): ``objc`` (used below for ``objc.NULL`` and
# ``objc._C_NSUInteger``) is not imported explicitly; it is presumably
# provided by one of the star imports above -- confirm.

# Python 3 has no ``unicode``/``long`` builtins; alias them so that the
# isinstance checks below work on both major versions.
try:
    unicode
except NameError:
    unicode = str


try:
    long
except NameError:
    long = int


class TestRunLoop(TestCase):
    """Exercise the CFRunLoop family of functions and constants."""

    def testTypes(self):
        """The run loop reference types are recognised as CF types."""
        self.assertIsCFType(CFRunLoopRef)
        self.assertIsCFType(CFRunLoopSourceRef)
        self.assertIsCFType(CFRunLoopObserverRef)
        self.assertIsCFType(CFRunLoopTimerRef)

    def testConstants(self):
        """Run loop constants have the expected values and types."""
        # Result codes of CFRunLoopRunInMode.
        self.assertEqual(kCFRunLoopRunFinished, 1)
        self.assertEqual(kCFRunLoopRunStopped, 2)
        self.assertEqual(kCFRunLoopRunTimedOut, 3)
        self.assertEqual(kCFRunLoopRunHandledSource, 4)

        # Observer activity flags.
        self.assertEqual(kCFRunLoopEntry, (1 << 0))
        self.assertEqual(kCFRunLoopBeforeTimers, (1 << 1))
        self.assertEqual(kCFRunLoopBeforeSources, (1 << 2))
        self.assertEqual(kCFRunLoopBeforeWaiting, (1 << 5))
        self.assertEqual(kCFRunLoopAfterWaiting, (1 << 6))
        self.assertEqual(kCFRunLoopExit, (1 << 7))
        self.assertEqual(kCFRunLoopAllActivities, 0x0FFFFFFF)

        # Mode names.
        self.assertIsInstance(kCFRunLoopDefaultMode, unicode)
        self.assertIsInstance(kCFRunLoopCommonModes, unicode)

    def testGetTypeID(self):
        """Every ...GetTypeID function returns an integer."""
        self.assertIsInstance(CFRunLoopGetTypeID(), (int, long))
        self.assertIsInstance(CFRunLoopSourceGetTypeID(), (int, long))
        self.assertIsInstance(CFRunLoopObserverGetTypeID(), (int, long))
        self.assertIsInstance(CFRunLoopTimerGetTypeID(), (int, long))

    def testRunloop(self):
        """Smoke-test the basic run loop query and control functions."""
        # A private mode name is used instead of kCFRunLoopDefaultMode.
        runloop_mode = "pyobjctest.cfrunloop"

        loop = CFRunLoopGetCurrent()
        self.assertIsInstance(loop, CFRunLoopRef)
        loop = CFRunLoopGetMain()
        self.assertIsInstance(loop, CFRunLoopRef)

        # The current mode may be None; only check the type when present.
        mode = CFRunLoopCopyCurrentMode(loop)
        if mode is not None:
            self.assertIsInstance(mode, unicode)

        self.assertResultIsCFRetained(CFRunLoopCopyAllModes)
        allmodes = CFRunLoopCopyAllModes(loop)
        self.assertIsInstance(allmodes, CFArrayRef)
        self.assertNotEqual(len(allmodes), 0)
        for mode_name in allmodes:
            self.assertIsInstance(mode_name, unicode)

        CFRunLoopAddCommonMode(loop, "pyobjctest")
        # The result is not inspected; this only checks that the call still
        # succeeds after a common mode has been added.
        CFRunLoopCopyAllModes(loop)

        tm = CFRunLoopGetNextTimerFireDate(loop, runloop_mode)
        self.assertIsInstance(tm, float)
        b = CFRunLoopIsWaiting(loop)
        self.assertIsInstance(b, bool)
        CFRunLoopWakeUp(loop)
        CFRunLoopStop(loop)

        res = CFRunLoopRunInMode("mode", 2.0, True)
        self.assertIsInstance(res, (int, long))
        self.assertEqual(res, kCFRunLoopRunFinished)

        # CFRunLoopRun is hard to test reliably, so only check it exists.
        self.assertHasAttr(CoreFoundation, 'CFRunLoopRun')

    def testObserver(self):
        """Create an observer, attach it to a run loop and check callbacks."""
        runloop_mode = "pyobjctest.cfrunloop"

        rl = CFRunLoopGetCurrent()

        data = {}
        state = []

        def callback(observer, activity, info):
            state.append((observer, activity, info))

        # Accessors on a stand-alone observer, before and after invalidation.
        observer = CFRunLoopObserverCreate(
            None, kCFRunLoopEntry | kCFRunLoopExit, True, 4, callback, data)
        self.assertIsInstance(observer, CFRunLoopObserverRef)
        ctx = CFRunLoopObserverGetContext(observer, None)
        self.assertIs(ctx, data)
        self.assertEqual(CFRunLoopObserverGetActivities(observer),
                         kCFRunLoopEntry | kCFRunLoopExit)
        self.assertIs(CFRunLoopObserverDoesRepeat(observer), True)
        self.assertEqual(CFRunLoopObserverGetOrder(observer), 4)
        self.assertIs(CFRunLoopObserverIsValid(observer), True)
        CFRunLoopObserverInvalidate(observer)
        self.assertIs(CFRunLoopObserverIsValid(observer), False)
        ctx = CFRunLoopObserverGetContext(observer, None)
        self.assertIs(ctx, objc.NULL)

        # A fresh observer is needed for the run loop part of the test.
        observer = CFRunLoopObserverCreate(
            None, kCFRunLoopEntry | kCFRunLoopExit, True, 4, callback, data)
        self.assertIsInstance(observer, CFRunLoopObserverRef)
        self.assertIs(
            CFRunLoopContainsObserver(rl, observer, runloop_mode), False)
        CFRunLoopAddObserver(rl, observer, runloop_mode)
        self.assertIs(
            CFRunLoopContainsObserver(rl, observer, runloop_mode), True)

        # Use dummy stream to ensure that the runloop actually performs work
        strval = b'hello world'
        stream = CFReadStreamCreateWithBytesNoCopy(
            None, strval, len(strval), kCFAllocatorNull)
        self.assertIsInstance(stream, CFReadStreamRef)
        CFReadStreamScheduleWithRunLoop(stream, rl, runloop_mode)
        CFRunLoopRunInMode(runloop_mode, 1.0, True)
        CFReadStreamUnscheduleFromRunLoop(stream, rl, runloop_mode)

        self.assertNotEqual(len(state), 0)
        for item in state:
            self.assertIs(item[0], observer)
            self.assertIn(item[1], (kCFRunLoopEntry, kCFRunLoopExit))
            self.assertIs(item[2], data)

        CFRunLoopRemoveObserver(rl, observer, runloop_mode)
        self.assertIs(
            CFRunLoopContainsObserver(rl, observer, runloop_mode), False)

    def testTimer(self):
        """Create a repeating timer, run it on a run loop and check firing."""
        runloop_mode = "pyobjctest.cfrunloop"
        rl = CFRunLoopGetCurrent()

        state = []
        data = {}

        def callback(timer, info):
            state.append((timer, info))

        timer = CFRunLoopTimerCreate(None, 0, 0.5, 0, 0, callback, data)

        r = CFRunLoopTimerGetNextFireDate(timer)
        self.assertIsInstance(r, float)
        CFRunLoopTimerSetNextFireDate(timer, r + 2)
        r2 = CFRunLoopTimerGetNextFireDate(timer)
        # Compare whole seconds only.
        self.assertEqual(int(r2), int(r + 2))

        r = CFRunLoopTimerGetInterval(timer)
        self.assertEqual(r, 0.5)

        self.assertIs(CFRunLoopTimerGetContext(timer, None), data)
        self.assertIs(CFRunLoopTimerDoesRepeat(timer), True)
        self.assertEqual(CFRunLoopTimerGetOrder(timer), 0)
        self.assertIs(CFRunLoopTimerIsValid(timer), True)
        CFRunLoopTimerInvalidate(timer)
        self.assertIs(CFRunLoopTimerIsValid(timer), False)
        self.assertIs(CFRunLoopTimerGetContext(timer, None), objc.NULL)

        # A fresh timer is needed for the run loop part of the test.
        timer = CFRunLoopTimerCreate(None, 0, 0.5, 0, 0, callback, data)
        self.assertIs(CFRunLoopContainsTimer(rl, timer, runloop_mode), False)
        CFRunLoopAddTimer(rl, timer, runloop_mode)
        self.assertIs(CFRunLoopContainsTimer(rl, timer, runloop_mode), True)
        CFRunLoopRunInMode(runloop_mode, 1.3, True)

        CFRunLoopTimerInvalidate(timer)
        CFRunLoopRemoveTimer(rl, timer, runloop_mode)
        self.assertIs(CFRunLoopContainsTimer(rl, timer, runloop_mode), False)

        # The callback must have run at least three times during the run.
        self.assertGreaterEqual(len(state), 3)
        for item in state:
            self.assertIs(item[0], timer)
            self.assertIs(item[1], data)

    def testSource(self):
        """Create a custom source and check its three callbacks.

        The schedule callback is expected when the source is added, the
        perform callback after the source is signalled and the run loop
        runs, and the cancel callback when the source is removed.
        """
        runloop_mode = "pyobjctest.cfrunloop"

        rl = CFRunLoopGetCurrent()

        state = []
        data = {}

        def schedule(info, rl, mode):
            state.append(['schedule', info, rl, mode])

        def cancel(info, rl, mode):
            state.append(['cancel', info, rl, mode])

        def perform(info):
            state.append(['perform', info])

        source = CFRunLoopSourceCreate(
            None, 55, (0, schedule, cancel, perform, data))
        self.assertIsInstance(source, CFRunLoopSourceRef)

        # The context round-trips as the tuple passed at creation.
        ctx = CFRunLoopSourceGetContext(source, None)
        self.assertIsInstance(ctx, tuple)
        self.assertEqual(ctx[0], 0)
        self.assertEqual(ctx[1], schedule)
        self.assertEqual(ctx[2], cancel)
        self.assertEqual(ctx[3], perform)
        self.assertEqual(ctx[4], data)

        self.assertEqual(CFRunLoopSourceGetOrder(source), 55)
        self.assertIs(CFRunLoopSourceIsValid(source), True)
        CFRunLoopSourceInvalidate(source)
        self.assertIs(CFRunLoopSourceIsValid(source), False)

        # A fresh source is needed for the run loop part of the test.
        source = CFRunLoopSourceCreate(
            None, 55, (0, schedule, cancel, perform, data))
        self.assertIsInstance(source, CFRunLoopSourceRef)
        self.assertIs(CFRunLoopContainsSource(rl, source, runloop_mode), False)
        CFRunLoopAddSource(rl, source, runloop_mode)
        self.assertIs(CFRunLoopContainsSource(rl, source, runloop_mode), True)

        # Adding the source triggers exactly one 'schedule' callback.
        self.assertEqual(len(state), 1)
        self.assertEqual(state[0][0], 'schedule')
        self.assertIs(state[0][1], data)
        self.assertIs(state[0][2], rl)
        self.assertEqual(state[0][3], runloop_mode)
        del state[:]

        # Without a signal nothing is performed. The exact result code of
        # this run is deliberately not asserted.
        res = CFRunLoopRunInMode(runloop_mode, 0.5, True)
        self.assertIsInstance(res, (int, long))

        self.assertEqual(len(state), 0)

        CFRunLoopSourceSignal(source)

        res = CFRunLoopRunInMode(runloop_mode, 0.5, True)
        self.assertIsInstance(res, (int, long))
        self.assertEqual(res, kCFRunLoopRunHandledSource)

        self.assertEqual(len(state), 1)
        self.assertEqual(state[0][0], 'perform')
        self.assertIs(state[0][1], data)
        del state[:]

        # Removing the source triggers exactly one 'cancel' callback.
        CFRunLoopRemoveSource(rl, source, runloop_mode)
        self.assertIs(CFRunLoopContainsSource(rl, source, runloop_mode), False)
        self.assertEqual(len(state), 1)
        self.assertEqual(state[0][0], 'cancel')
        self.assertIs(state[0][1], data)
        self.assertIs(state[0][2], rl)
        self.assertEqual(state[0][3], runloop_mode)

    @min_os_level('10.6')
    def testFunctions10_6(self):
        """CFRunLoopPerformBlock runs the given block on the run loop."""
        self.assertArgIsBlock(CFRunLoopPerformBlock, 2, b'v')

        runloop_mode = kCFRunLoopDefaultMode
        rl = CFRunLoopGetCurrent()

        calls = []

        def doit():
            calls.append(True)

        CFRunLoopPerformBlock(rl, runloop_mode, doit)
        CFRunLoopRunInMode(runloop_mode, 0.5, True)

        self.assertEqual(calls, [True])

    @min_os_level('10.7')
    def testFunctions10_7(self):
        """Block-based observer and timer constructors invoke their handler."""
        self.assertArgIsBOOL(CFRunLoopObserverCreateWithHandler, 2)
        self.assertArgIsBlock(
            CFRunLoopObserverCreateWithHandler, 4,
            b"v^{__CFRunLoopObserver=}" + objc._C_NSUInteger)

        # Separate lists are used for the observer and the timer so that a
        # late observer callback can never end up in the timer's results.
        observed = []

        def record(observer, activity):
            observed.append((observer, activity))

        ref = CFRunLoopObserverCreateWithHandler(
            None, kCFRunLoopAllActivities, False, 0, record)
        self.assertIsInstance(ref, CFRunLoopObserverRef)

        runloop_mode = kCFRunLoopDefaultMode
        rl = CFRunLoopGetCurrent()

        CFRunLoopAddObserver(rl, ref, runloop_mode)
        CFRunLoopRunInMode(runloop_mode, 0.5, True)
        CFRunLoopRemoveObserver(rl, ref, runloop_mode)

        self.assertNotEqual(observed, [])
        for seen_observer, activity in observed:
            self.assertEqual(seen_observer, ref)
            self.assertIsInstance(activity, (int, long))

        self.assertArgIsBlock(
            CFRunLoopTimerCreateWithHandler, 5, b'v^{__CFRunLoopTimer=}')
        fired = []
        ref = CFRunLoopTimerCreateWithHandler(
            None, CFAbsoluteTimeGetCurrent() + 2.9, 0.0, 0, 0,
            lambda timer: fired.append(timer))
        self.assertIsInstance(ref, CFRunLoopTimerRef)

        CFRunLoopAddTimer(rl, ref, runloop_mode)
        CFRunLoopRunInMode(runloop_mode, 6.0, True)
        CFRunLoopRemoveTimer(rl, ref, runloop_mode)

        # XXX: For some reason the timer fails with a full testrun.
        # See also issue #11 in the pyobjc tracker
        import __main__
        # ``__main__`` does not always have a ``__file__`` attribute, so
        # fall back to an empty name instead of raising AttributeError.
        main_file = getattr(__main__, '__file__', None) or ''
        if 'setup' in main_file:
            return

        self.assertNotEqual(fired, [])
        for seen_timer in fired:
            self.assertEqual(seen_timer, ref)


if __name__ == "__main__":
    main()