"""Tests for the CoreFoundation CFRunLoop bindings exposed through PyObjC."""
from PyObjCTools.TestSupport import *
from CoreFoundation import *

# The tests below refer to ``objc.NULL`` and ``CoreFoundation.<name>`` by
# module name.  Import both modules explicitly (after the star-imports, so
# these bindings win) instead of relying on whatever the star-imports export.
import objc
import CoreFoundation


class TestRunLoop(TestCase):
    """Exercise the CFRunLoop, observer, timer and source wrappers."""

    def testTypes(self):
        """The four run loop reference types are registered as CF types."""
        self.failUnlessIsCFType(CFRunLoopRef)
        self.failUnlessIsCFType(CFRunLoopSourceRef)
        self.failUnlessIsCFType(CFRunLoopObserverRef)
        self.failUnlessIsCFType(CFRunLoopTimerRef)

    def testConstants(self):
        """Run-result codes, activity flags and mode names have the expected values."""
        # Result codes returned by CFRunLoopRunInMode.
        self.failUnless(kCFRunLoopRunFinished == 1)
        self.failUnless(kCFRunLoopRunStopped == 2)
        self.failUnless(kCFRunLoopRunTimedOut == 3)
        self.failUnless(kCFRunLoopRunHandledSource == 4)

        # Observer activity bit flags (bits 3 and 4 are not checked here).
        self.failUnless(kCFRunLoopEntry == (1 << 0))
        self.failUnless(kCFRunLoopBeforeTimers == (1 << 1))
        self.failUnless(kCFRunLoopBeforeSources == (1 << 2))
        self.failUnless(kCFRunLoopBeforeWaiting == (1 << 5))
        self.failUnless(kCFRunLoopAfterWaiting == (1 << 6))
        self.failUnless(kCFRunLoopExit == (1 << 7))
        self.failUnless(kCFRunLoopAllActivities == 0x0FFFFFFF)

        # Mode names are exposed as unicode strings.
        self.failUnless(isinstance(kCFRunLoopDefaultMode, unicode))
        self.failUnless(isinstance(kCFRunLoopCommonModes, unicode))

    def testGetTypeID(self):
        """Each ``...GetTypeID`` function returns an integer."""
        self.failUnless(isinstance(CFRunLoopGetTypeID(), (int, long)))
        self.failUnless(isinstance(CFRunLoopSourceGetTypeID(), (int, long)))
        self.failUnless(isinstance(CFRunLoopObserverGetTypeID(), (int, long)))
        self.failUnless(isinstance(CFRunLoopTimerGetTypeID(), (int, long)))

    def testRunloop(self):
        """Basic run loop accessors: current/main loop, modes, wake-up, stop, run."""
        loop = CFRunLoopGetCurrent()
        self.failUnless(isinstance(loop, CFRunLoopRef))

        loop = CFRunLoopGetMain()
        self.failUnless(isinstance(loop, CFRunLoopRef))

        # The current mode may be absent (None) or a unicode mode name.
        mode = CFRunLoopCopyCurrentMode(loop)
        self.failUnless(mode is None or isinstance(mode, unicode))

        self.failUnlessResultIsCFRetained(CFRunLoopCopyAllModes)
        allmodes = CFRunLoopCopyAllModes(loop)
        self.failUnless(isinstance(allmodes, CFArrayRef))
        self.failIf(len(allmodes) == 0)
        for mode in allmodes:
            self.failUnless(isinstance(mode, unicode))

        # Only checks that both calls succeed after registering a new common
        # mode; the contents of the refreshed mode list are not asserted.
        CFRunLoopAddCommonMode(loop, "pyobjctest")
        allmodes = CFRunLoopCopyAllModes(loop)

        tm = CFRunLoopGetNextTimerFireDate(loop, kCFRunLoopDefaultMode)
        self.failUnless(isinstance(tm, float))

        b = CFRunLoopIsWaiting(loop)
        self.failUnless(isinstance(b, bool))
        CFRunLoopWakeUp(loop)
        CFRunLoopStop(loop)

        # Nothing in this test registers a source or timer for "mode"; the run
        # is expected to report kCFRunLoopRunFinished.
        res = CFRunLoopRunInMode("mode", 2.0, True)
        self.failUnless(isinstance(res, (int, long)))
        self.assertEquals(res, kCFRunLoopRunFinished)

        # CFRunLoopRun is hard to test reliably
        self.failUnless(hasattr(CoreFoundation, 'CFRunLoopRun'))

    def testObserver(self):
        """Observer accessors, invalidation, and callback delivery while running."""
        rl = CFRunLoopGetCurrent()

        data = {}
        state = []

        def callback(observer, activity, info):
            # Record every notification so it can be inspected after the run.
            state.append((observer, activity, info))

        observer = CFRunLoopObserverCreate(None, kCFRunLoopEntry|kCFRunLoopExit,
                True, 4, callback, data)
        self.failUnless(isinstance(observer, CFRunLoopObserverRef))
        ctx = CFRunLoopObserverGetContext(observer)
        self.failUnless(ctx is data)

        # The accessors must echo the values passed to CFRunLoopObserverCreate.
        self.assertEquals(CFRunLoopObserverGetActivities(observer), kCFRunLoopEntry|kCFRunLoopExit)
        self.failUnless(CFRunLoopObserverDoesRepeat(observer) is True)
        self.assertEquals(CFRunLoopObserverGetOrder(observer), 4)
        self.failUnless(CFRunLoopObserverIsValid(observer) is True)
        CFRunLoopObserverInvalidate(observer)
        self.failUnless(CFRunLoopObserverIsValid(observer) is False)
        # After invalidation the context is reported as objc.NULL.
        ctx = CFRunLoopObserverGetContext(observer)
        self.failUnless(ctx is objc.NULL)

        # Fresh observer for the add/run/remove part of the test.
        observer = CFRunLoopObserverCreate(None, kCFRunLoopEntry|kCFRunLoopExit,
                True, 4, callback, data)
        self.failUnless(isinstance(observer, CFRunLoopObserverRef))

        self.failUnless(CFRunLoopContainsObserver(rl, observer, kCFRunLoopDefaultMode) is False)
        CFRunLoopAddObserver(rl, observer, kCFRunLoopDefaultMode)
        self.failUnless(CFRunLoopContainsObserver(rl, observer, kCFRunLoopDefaultMode) is True)

        # Use dummy stream to ensure that the runloop actually performs work
        strval = 'hello world'
        stream = CFReadStreamCreateWithBytesNoCopy(None,
                strval, len(strval), kCFAllocatorNull)
        self.failUnless(isinstance(stream, CFReadStreamRef))

        CFReadStreamScheduleWithRunLoop(stream, rl, kCFRunLoopDefaultMode)
        # The run result is not checked; only the observer callbacks matter.
        CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0, True)
        CFReadStreamUnscheduleFromRunLoop(stream, rl, kCFRunLoopDefaultMode)

        self.failIf(len(state) == 0)
        for item in state:
            self.failUnless(item[0] is observer)
            self.failUnless(item[1] in (kCFRunLoopEntry, kCFRunLoopExit))
            self.failUnless(item[2] is data)

        CFRunLoopRemoveObserver(rl, observer, kCFRunLoopDefaultMode)
        self.failUnless(CFRunLoopContainsObserver(rl, observer, kCFRunLoopDefaultMode) is False)

    def testTimer(self):
        """Timer accessors, invalidation, and repeated firing while running."""
        rl = CFRunLoopGetCurrent()

        state = []
        data = {}

        def callback(timer, info):
            # Record every firing so it can be inspected after the run.
            state.append((timer, info))

        timer = CFRunLoopTimerCreate(None, 0, 0.5, 0, 0, callback, data)

        r = CFRunLoopTimerGetNextFireDate(timer)
        self.failUnless(isinstance(r, float))

        # Compare whole seconds only.
        CFRunLoopTimerSetNextFireDate(timer, r + 2)
        r2 = CFRunLoopTimerGetNextFireDate(timer)
        self.assertEquals(int(r2), int(r + 2))

        r = CFRunLoopTimerGetInterval(timer)
        self.assertEquals(r, 0.5)

        self.failUnless(CFRunLoopTimerGetContext(timer) is data)
        self.failUnless(CFRunLoopTimerDoesRepeat(timer) is True)
        self.assertEquals(CFRunLoopTimerGetOrder(timer), 0)
        self.failUnless(CFRunLoopTimerIsValid(timer) is True)
        CFRunLoopTimerInvalidate(timer)
        self.failUnless(CFRunLoopTimerIsValid(timer) is False)
        # After invalidation the context is reported as objc.NULL.
        self.failUnless(CFRunLoopTimerGetContext(timer) is objc.NULL)

        # Fresh timer for the add/run/remove part of the test.
        timer = CFRunLoopTimerCreate(None, 0, 0.5, 0, 0, callback, data)
        self.failUnless(CFRunLoopContainsTimer(rl, timer, kCFRunLoopDefaultMode) is False)
        CFRunLoopAddTimer(rl, timer, kCFRunLoopDefaultMode)
        self.failUnless(CFRunLoopContainsTimer(rl, timer, kCFRunLoopDefaultMode) is True)

        # The run result is not checked; only the timer callbacks matter.
        CFRunLoopRunInMode(kCFRunLoopDefaultMode, 2.0, True)

        CFRunLoopRemoveTimer(rl, timer, kCFRunLoopDefaultMode)
        self.failUnless(CFRunLoopContainsTimer(rl, timer, kCFRunLoopDefaultMode) is False)

        # With a 0.5 interval and a 2.0 run, at least three firings are required.
        self.failIf(len(state) < 3)
        for item in state:
            self.failUnless(item[0] is timer)
            self.failUnless(item[1] is data)

    def testSource(self):
        """Custom source: context round-trip and schedule/perform/cancel callbacks."""
        rl = CFRunLoopGetCurrent()

        state = []
        data = {}

        def schedule(info, rl, mode):
            state.append(['schedule', info, rl, mode])

        def cancel(info, rl, mode):
            state.append(['cancel', info, rl, mode])

        def perform(info):
            state.append(['perform', info])

        source = CFRunLoopSourceCreate(None, 55,
                (0, schedule, cancel, perform, data))
        self.failUnless(isinstance(source, CFRunLoopSourceRef))

        # The context comes back as the same 5-tuple that was passed in.
        ctx = CFRunLoopSourceGetContext(source, None)
        self.failUnless(isinstance(ctx, tuple))
        self.assertEquals(ctx[0], 0)
        self.assertEquals(ctx[1], schedule)
        self.assertEquals(ctx[2], cancel)
        self.assertEquals(ctx[3], perform)
        self.assertEquals(ctx[4], data)

        self.assertEquals(CFRunLoopSourceGetOrder(source), 55)
        self.failUnless(CFRunLoopSourceIsValid(source) is True)
        CFRunLoopSourceInvalidate(source)
        self.failUnless(CFRunLoopSourceIsValid(source) is False)

        # Fresh source for the add/signal/remove part of the test.
        source = CFRunLoopSourceCreate(None, 55,
                (0, schedule, cancel, perform, data))
        self.failUnless(isinstance(source, CFRunLoopSourceRef))

        self.failUnless(CFRunLoopContainsSource(rl, source, kCFRunLoopDefaultMode) is False)
        CFRunLoopAddSource(rl, source, kCFRunLoopDefaultMode)
        self.failUnless(CFRunLoopContainsSource(rl, source, kCFRunLoopDefaultMode) is True)
        # Adding the source must have triggered exactly one 'schedule' callback.
        self.failUnless(len(state) == 1)
        self.failUnless(state[0][0] == 'schedule')
        self.failUnless(state[0][1] is data)
        self.failUnless(state[0][2] is rl)
        self.failUnless(state[0][3] == kCFRunLoopDefaultMode)

        del state[:]

        # Running without signalling the source must not invoke any callback.
        res = CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.5, True)
        self.failUnless(isinstance(res, (int, long)))
        # NOTE(review): an assertion that res == kCFRunLoopRunTimedOut was
        # disabled here in the original; the reason is not recorded.

        self.assertEquals(len(state), 0)

        CFRunLoopSourceSignal(source)

        # Once signalled, the next run must invoke 'perform' exactly once.
        res = CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.5, True)
        self.failUnless(isinstance(res, (int, long)))
        self.assertEquals(res, kCFRunLoopRunHandledSource)

        self.assertEquals(len(state), 1)
        self.failUnless(state[0][0] == 'perform')
        self.failUnless(state[0][1] is data)

        del state[:]

        # Removing the source must trigger exactly one 'cancel' callback.
        CFRunLoopRemoveSource(rl, source, kCFRunLoopDefaultMode)
        self.failUnless(CFRunLoopContainsSource(rl, source, kCFRunLoopDefaultMode) is False)

        self.assertEquals(len(state), 1)
        self.failUnless(state[0][0] == 'cancel')
        self.failUnless(state[0][1] is data)
        self.failUnless(state[0][2] is rl)
        self.failUnless(state[0][3] == kCFRunLoopDefaultMode)


if __name__ == "__main__":
    main()