"""Tests for the ``pyobjc_performSelector...`` helper methods.

The tests call the ``pyobjc_``-prefixed helpers on an ``NSObject`` subclass
whose plain ``performSelector...`` methods are overridden to record their
arguments and run the requested selector immediately, so the forwarding can
be checked without a running run loop or a second thread.
"""
from PyObjCTools.TestSupport import *

import contextlib
import Foundation
import sys

if sys.version_info[0] == 2:
    from StringIO import StringIO

    def _str(v):
        # Python 2: selectors are already native ``str`` objects.
        return v
else:
    from io import StringIO

    def _str(v):
        # Python 3: selectors may arrive as ``bytes``; ``getattr`` needs text.
        if isinstance(v, str):
            return v
        return v.decode('ascii')


@contextlib.contextmanager
def _captured_stderr():
    """Temporarily replace ``sys.stderr`` with a ``StringIO`` and yield it.

    The original stream is restored when the ``with`` block exits, even if
    the block raises.
    """
    saved = sys.stderr
    buffer = StringIO()
    sys.stderr = buffer
    try:
        yield buffer
    finally:
        sys.stderr = saved


class TheadingHelperTestHelper (Foundation.NSObject):
    """NSObject subclass that records ``performSelector...`` calls.

    Each overridden method appends a tuple of its arguments to ``self.calls``
    and then invokes the named selector on ``self`` synchronously with
    ``object`` as the only argument.
    """

    def init(self):
        self = super(TheadingHelperTestHelper, self).init()
        if self is None:
            return None

        # Argument tuples of every intercepted call, in call order.
        self.calls = []
        return self

    def performSelector_onThread_withObject_waitUntilDone_(
            self, selector, thread, object, wait):
        assert isinstance(selector, bytes)
        assert isinstance(thread, Foundation.NSThread)
        assert isinstance(wait, bool)

        self.calls.append((selector, thread, object, wait))
        getattr(self, _str(selector))(object)

    def performSelector_onThread_withObject_waitUntilDone_modes_(
            self, selector, thread, object, wait, modes):
        assert isinstance(selector, bytes)
        assert isinstance(thread, Foundation.NSThread)
        assert isinstance(wait, bool)

        self.calls.append((selector, thread, object, wait, modes))
        getattr(self, _str(selector))(object)

    def performSelector_withObject_afterDelay_(
            self, selector, object, delay):
        assert isinstance(selector, bytes)

        self.calls.append((selector, object, delay))
        getattr(self, _str(selector))(object)

    def performSelector_withObject_afterDelay_inModes_(
            self, selector, object, delay, modes):
        assert isinstance(selector, bytes)

        self.calls.append((selector, object, delay, modes))
        getattr(self, _str(selector))(object)

    def performSelectorInBackground_withObject_(
            self, selector, object):
        self.calls.append((selector, object))
        getattr(self, _str(selector))(object)

    def performSelectorOnMainThread_withObject_waitUntilDone_(
            self, selector, object, wait):
        self.calls.append((selector, object, wait))
        getattr(self, _str(selector))(object)

    def performSelectorOnMainThread_withObject_waitUntilDone_modes_(
            self, selector, object, wait, modes):
        self.calls.append((selector, object, wait, modes))
        getattr(self, _str(selector))(object)

    # Target selectors used by the tests below.

    def sel1_(self, arg):
        # No result.
        pass

    def sel2_(self, arg):
        return arg * 2

    def sel3_(self, arg):
        # Raises ZeroDivisionError when ``arg`` is 0; the tests rely on that.
        return 1/arg


class TestThreadingHelpers (TestCase):
    """Checks how the ``pyobjc_`` helpers forward to ``performSelector...``.

    The "NoResult" tests expect an exception raised by the target selector to
    end up as a traceback on ``sys.stderr``; the "WithResult" tests expect it
    to propagate to the caller with nothing written to ``sys.stderr``.
    """

    def testAsyncOnThreadNoResult(self):
        # pyobjc_performSelector_onThread_withObject_waitUntilDone_
        obj = TheadingHelperTestHelper.alloc().init()
        thr = Foundation.NSThread.mainThread()

        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_(
            b'sel1:', thr, 1, False)
        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_(
            b'sel2:', thr, 2, True)
        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_(
            b'isEqual:', thr, obj, True)

        self.assertEqual(obj.calls, [
            (b'_pyobjc_performOnThread:', thr, (b'sel1:', 1), False),
            (b'_pyobjc_performOnThread:', thr, (b'sel2:', 2), True),
            (b'_pyobjc_performOnThread:', thr, (b'isEqual:', obj), True),
        ])

        # Raise an exception
        with _captured_stderr() as captured:
            obj.calls[:] = []
            obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_(
                b'sel3:', thr, 0, False)
            self.assertEqual(obj.calls, [
                (b'_pyobjc_performOnThread:', thr, (b'sel3:', 0), False),
            ])
            self.assertIn('Traceback', captured.getvalue())

    def testAsyncOnThreadNoResultModes(self):
        # pyobjc_performSelector_onThread_withObject_waitUntilDone_modes_
        obj = TheadingHelperTestHelper.alloc().init()
        thr = Foundation.NSThread.mainThread()

        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_modes_(
            b'sel1:', thr, 1, False, 0)
        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_modes_(
            b'sel2:', thr, 2, True, 1)
        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_modes_(
            b'isEqual:', thr, obj, True, 2)

        self.assertEqual(obj.calls, [
            (b'_pyobjc_performOnThread:', thr, (b'sel1:', 1), False, 0),
            (b'_pyobjc_performOnThread:', thr, (b'sel2:', 2), True, 1),
            (b'_pyobjc_performOnThread:', thr, (b'isEqual:', obj), True, 2),
        ])

        # Raise an exception
        with _captured_stderr() as captured:
            obj.calls[:] = []
            obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_modes_(
                b'sel3:', thr, 0, False, 4)
            self.assertEqual(obj.calls, [
                (b'_pyobjc_performOnThread:', thr, (b'sel3:', 0), False, 4),
            ])
            self.assertIn('Traceback', captured.getvalue())

    def testAsyncWithDelayNoResult(self):
        # pyobjc_performSelector_withObject_afterDelay_
        obj = TheadingHelperTestHelper.alloc().init()

        obj.pyobjc_performSelector_withObject_afterDelay_(b'sel1:', 1, 1.0)
        obj.pyobjc_performSelector_withObject_afterDelay_(b'sel2:', 2, 4.5)
        obj.pyobjc_performSelector_withObject_afterDelay_(b'isEqual:', obj, 8.5)

        self.assertEqual(obj.calls, [
            (b'_pyobjc_performOnThread:', (b'sel1:', 1), 1.0),
            (b'_pyobjc_performOnThread:', (b'sel2:', 2), 4.5),
            (b'_pyobjc_performOnThread:', (b'isEqual:', obj), 8.5),
        ])

        # Raise an exception
        with _captured_stderr() as captured:
            obj.calls[:] = []
            obj.pyobjc_performSelector_withObject_afterDelay_(
                b'sel3:', 0, 0.5)
            self.assertEqual(obj.calls, [
                (b'_pyobjc_performOnThread:', (b'sel3:', 0), 0.5),
            ])
            self.assertIn('Traceback', captured.getvalue())

    def testAsyncWithDelayNoResultModes(self):
        # pyobjc_performSelector_withObject_afterDelay_inModes_
        obj = TheadingHelperTestHelper.alloc().init()

        obj.pyobjc_performSelector_withObject_afterDelay_inModes_(b'sel1:', 1, 1.0, 0)
        obj.pyobjc_performSelector_withObject_afterDelay_inModes_(b'sel2:', 2, 4.5, 1)
        obj.pyobjc_performSelector_withObject_afterDelay_inModes_(b'isEqual:', obj, 8.5, 2)

        self.assertEqual(obj.calls, [
            (b'_pyobjc_performOnThread:', (b'sel1:', 1), 1.0, 0),
            (b'_pyobjc_performOnThread:', (b'sel2:', 2), 4.5, 1),
            (b'_pyobjc_performOnThread:', (b'isEqual:', obj), 8.5, 2),
        ])

        # Raise an exception
        with _captured_stderr() as captured:
            obj.calls[:] = []
            obj.pyobjc_performSelector_withObject_afterDelay_inModes_(
                b'sel3:', 0, 0.5, 3)
            self.assertEqual(obj.calls, [
                (b'_pyobjc_performOnThread:', (b'sel3:', 0), 0.5, 3),
            ])
            self.assertIn('Traceback', captured.getvalue())

    def testInBGNoResult(self):
        # pyobjc_performSelectorInBackground_withObject_

        obj = TheadingHelperTestHelper.alloc().init()

        obj.pyobjc_performSelectorInBackground_withObject_(b'sel1:', 1)
        obj.pyobjc_performSelectorInBackground_withObject_(b'sel2:', 2)
        obj.pyobjc_performSelectorInBackground_withObject_(b'isEqual:', obj)

        self.assertEqual(obj.calls, [
            (b'_pyobjc_performOnThread:', (b'sel1:', 1)),
            (b'_pyobjc_performOnThread:', (b'sel2:', 2)),
            (b'_pyobjc_performOnThread:', (b'isEqual:', obj)),
        ])

        # Raise an exception
        with _captured_stderr() as captured:
            obj.calls[:] = []
            obj.pyobjc_performSelectorInBackground_withObject_(
                b'sel3:', 0)
            self.assertEqual(obj.calls, [
                (b'_pyobjc_performOnThread:', (b'sel3:', 0)),
            ])
            self.assertIn('Traceback', captured.getvalue())

    def testOnMtNoResultWait(self):
        # pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_

        obj = TheadingHelperTestHelper.alloc().init()

        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_(b'sel1:', 1, True)
        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_(b'sel2:', 2, False)
        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_(b'isEqual:', obj, True)

        self.assertEqual(obj.calls, [
            (b'_pyobjc_performOnThread:', (b'sel1:', 1), True),
            (b'_pyobjc_performOnThread:', (b'sel2:', 2), False),
            (b'_pyobjc_performOnThread:', (b'isEqual:', obj), True),
        ])

        # Raise an exception
        with _captured_stderr() as captured:
            obj.calls[:] = []
            obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_(
                b'sel3:', 0, False)
            self.assertEqual(obj.calls, [
                (b'_pyobjc_performOnThread:', (b'sel3:', 0), False),
            ])
            self.assertIn('Traceback', captured.getvalue())

    def testOnMtNoResultWaitModes(self):
        # pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_modes_

        obj = TheadingHelperTestHelper.alloc().init()

        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_modes_(b'sel1:', 1, True, 4)
        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_modes_(b'sel2:', 2, False, 5)
        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_modes_(b'isEqual:', obj, True, 6)

        self.assertEqual(obj.calls, [
            (b'_pyobjc_performOnThread:', (b'sel1:', 1), True, 4),
            (b'_pyobjc_performOnThread:', (b'sel2:', 2), False, 5),
            (b'_pyobjc_performOnThread:', (b'isEqual:', obj), True, 6),
        ])

        # Raise an exception
        with _captured_stderr() as captured:
            obj.calls[:] = []
            obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_modes_(
                b'sel3:', 0, False, 7)
            self.assertEqual(obj.calls, [
                (b'_pyobjc_performOnThread:', (b'sel3:', 0), False, 7),
            ])
            self.assertIn('Traceback', captured.getvalue())

    def testOnMtWithResult(self):
        # pyobjc_performSelectorOnMainThread_withObject_
        obj = TheadingHelperTestHelper.alloc().init()

        r = obj.pyobjc_performSelectorOnMainThread_withObject_('sel2:', 3)
        self.assertEqual(r, 6)
        r = obj.pyobjc_performSelectorOnMainThread_withObject_('sel3:', 2.0)
        self.assertEqual(r, 0.5)

        self.assertEqual(obj.calls, [
            (b'_pyobjc_performOnThreadWithResult:', ('sel2:', 3, [(True, 6)]), True),
            (b'_pyobjc_performOnThreadWithResult:', ('sel3:', 2.0, [(True, 0.5)]), True),
        ])

        # Raise an exception
        with _captured_stderr() as captured:
            obj.calls[:] = []

            self.assertRaises(ZeroDivisionError, obj.pyobjc_performSelectorOnMainThread_withObject_,
                b'sel3:', 0)

            self.assertEqual(len(obj.calls), 1)
            self.assertEqual(obj.calls[0][0], b'_pyobjc_performOnThreadWithResult:')
            # Last element of the payload is the result list; its first
            # entry starts with the success flag.
            self.assertEqual(obj.calls[0][1][-1][0][0], False)
            self.assertNotIn('Traceback', captured.getvalue())

    def testOnMtWithResultModes(self):
        # pyobjc_performSelectorOnMainThread_withObject_modes_
        obj = TheadingHelperTestHelper.alloc().init()

        r = obj.pyobjc_performSelectorOnMainThread_withObject_modes_('sel2:', 3, 1)
        self.assertEqual(r, 6)
        r = obj.pyobjc_performSelectorOnMainThread_withObject_modes_('sel3:', 2.0, 2)
        self.assertEqual(r, 0.5)

        self.assertEqual(obj.calls, [
            (b'_pyobjc_performOnThreadWithResult:', ('sel2:', 3, [(True, 6)]), True, 1),
            (b'_pyobjc_performOnThreadWithResult:', ('sel3:', 2.0, [(True, 0.5)]), True, 2),
        ])

        # Raise an exception
        with _captured_stderr() as captured:
            obj.calls[:] = []

            self.assertRaises(ZeroDivisionError, obj.pyobjc_performSelectorOnMainThread_withObject_modes_,
                b'sel3:', 0, 3)

            self.assertEqual(len(obj.calls), 1)
            self.assertEqual(obj.calls[0][0], b'_pyobjc_performOnThreadWithResult:')
            self.assertEqual(obj.calls[0][1][-1][0][0], False)
            self.assertNotIn('Traceback', captured.getvalue())

    def testOnThreadWithResult(self):
        # pyobjc_performSelector_onThread_withObject_
        obj = TheadingHelperTestHelper.alloc().init()
        thr = Foundation.NSThread.mainThread()

        r = obj.pyobjc_performSelector_onThread_withObject_('sel2:', thr, 3)
        self.assertEqual(r, 6)
        r = obj.pyobjc_performSelector_onThread_withObject_('sel3:', thr, 2.0)
        self.assertEqual(r, 0.5)

        self.assertEqual(obj.calls, [
            (b'_pyobjc_performOnThreadWithResult:', thr, ('sel2:', 3, [(True, 6)]), True),
            (b'_pyobjc_performOnThreadWithResult:', thr, ('sel3:', 2.0, [(True, 0.5)]), True),
        ])

        # Raise an exception
        with _captured_stderr() as captured:
            obj.calls[:] = []

            self.assertRaises(ZeroDivisionError, obj.pyobjc_performSelector_onThread_withObject_,
                b'sel3:', thr, 0)

            self.assertEqual(len(obj.calls), 1)
            self.assertEqual(obj.calls[0][0], b'_pyobjc_performOnThreadWithResult:')
            # The thread argument shifts the payload to index 2 here.
            self.assertEqual(obj.calls[0][2][-1][0][0], False)
            self.assertNotIn('Traceback', captured.getvalue())

    def testOnThreadWithResultModes(self):
        # pyobjc_performSelector_onThread_withObject_modes_
        obj = TheadingHelperTestHelper.alloc().init()
        thr = Foundation.NSThread.mainThread()

        r = obj.pyobjc_performSelector_onThread_withObject_modes_('sel2:', thr, 3, 1)
        self.assertEqual(r, 6)
        r = obj.pyobjc_performSelector_onThread_withObject_modes_('sel3:', thr, 2.0, 2)
        self.assertEqual(r, 0.5)

        self.assertEqual(obj.calls, [
            (b'_pyobjc_performOnThreadWithResult:', thr, ('sel2:', 3, [(True, 6)]), True, 1),
            (b'_pyobjc_performOnThreadWithResult:', thr, ('sel3:', 2.0, [(True, 0.5)]), True, 2),
        ])

        # Raise an exception
        with _captured_stderr() as captured:
            obj.calls[:] = []

            self.assertRaises(ZeroDivisionError, obj.pyobjc_performSelector_onThread_withObject_modes_,
                b'sel3:', thr, 0, 3)

            self.assertEqual(len(obj.calls), 1)
            self.assertEqual(obj.calls[0][0], b'_pyobjc_performOnThreadWithResult:')
            self.assertEqual(obj.calls[0][2][-1][0][0], False)
            self.assertNotIn('Traceback', captured.getvalue())


if __name__ == "__main__":
    main()