1from PyObjCTools.TestSupport import *
2
3import Foundation
4import sys
5
6if sys.version_info[0] == 2:
7    from StringIO import StringIO
8    def _str(v): return v
9else:
10    from io import StringIO
11    def _str(v):
12        if isinstance(v, str):
13            return v
14        return v.decode('ascii')
15
16class TheadingHelperTestHelper (Foundation.NSObject):
17    def init(self):
18        self = super(TheadingHelperTestHelper, self).init()
19        if self is None:
20            return None
21
22        self.calls = []
23        return self
24
25    def performSelector_onThread_withObject_waitUntilDone_(self,
26            selector, thread, object, wait):
27        assert isinstance(selector, bytes)
28        assert isinstance(thread, Foundation.NSThread)
29        assert isinstance(wait, bool)
30
31        self.calls.append((selector, thread, object, wait))
32        getattr(self, _str(selector))(object)
33
34    def performSelector_onThread_withObject_waitUntilDone_modes_(self,
35            selector, thread, object, wait, modes):
36        assert isinstance(selector, bytes)
37        assert isinstance(thread, Foundation.NSThread)
38        assert isinstance(wait, bool)
39
40        self.calls.append((selector, thread, object, wait, modes))
41        getattr(self, _str(selector))(object)
42
43    def performSelector_withObject_afterDelay_(self,
44            selector, object, delay):
45        assert isinstance(selector, bytes)
46
47        self.calls.append((selector, object, delay))
48        getattr(self, _str(selector))(object)
49
50    def performSelector_withObject_afterDelay_inModes_(self,
51            selector, object, delay, modes):
52        assert isinstance(selector, bytes)
53
54        self.calls.append((selector, object, delay, modes))
55        getattr(self, _str(selector))(object)
56
57    def performSelectorInBackground_withObject_(self,
58            selector, object):
59        self.calls.append((selector, object))
60        getattr(self, _str(selector))(object)
61
62    def performSelectorOnMainThread_withObject_waitUntilDone_(self,
63            selector, object, wait):
64        self.calls.append((selector, object, wait))
65        getattr(self, _str(selector))(object)
66
67    def performSelectorOnMainThread_withObject_waitUntilDone_modes_(self,
68            selector, object, wait, modes):
69        self.calls.append((selector, object, wait, modes))
70        getattr(self, _str(selector))(object)
71
72    def sel1_(self, arg):
73        pass
74
75    def sel2_(self, arg):
76        return arg * 2
77
78    def sel3_(self, arg):
79        return 1/arg
80
81class TestThreadingHelpers (TestCase):
82    def testAsyncOnThreadNoResult(self):
83        # pyobjc_performSelector_onThread_withObject_waitUntilDone_
84        obj = TheadingHelperTestHelper.alloc().init()
85        thr = Foundation.NSThread.mainThread()
86
87        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_(
88                b'sel1:', thr, 1, False),
89        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_(
90                b'sel2:', thr, 2, True)
91        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_(
92                b'isEqual:', thr, obj, True)
93
94        self.assertEqual(obj.calls, [
95            (b'_pyobjc_performOnThread:', thr, (b'sel1:', 1), False),
96            (b'_pyobjc_performOnThread:', thr, (b'sel2:', 2), True),
97            (b'_pyobjc_performOnThread:', thr, (b'isEqual:', obj), True),
98        ])
99
100        # Raise an exception
101        orig_stderr = sys.stderr
102        sys.stderr = StringIO()
103        try:
104            obj.calls[:] = []
105            obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_(
106                    b'sel3:', thr, 0, False)
107            self.assertEqual(obj.calls, [
108                (b'_pyobjc_performOnThread:', thr, (b'sel3:', 0), False),
109            ])
110            self.assertIn('Traceback', sys.stderr.getvalue())
111        finally:
112            sys.stderr = orig_stderr
113
114    def testAsyncOnThreadNoResultModes(self):
115        # pyobjc_performSelector_onThread_withObject_waitUntilDone_modes_
116        obj = TheadingHelperTestHelper.alloc().init()
117        thr = Foundation.NSThread.mainThread()
118
119        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_modes_(
120                b'sel1:', thr, 1, False, 0),
121        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_modes_(
122                b'sel2:', thr, 2, True, 1)
123        obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_modes_(
124                b'isEqual:', thr, obj, True, 2)
125
126        self.assertEqual(obj.calls, [
127            (b'_pyobjc_performOnThread:', thr, (b'sel1:', 1), False, 0),
128            (b'_pyobjc_performOnThread:', thr, (b'sel2:', 2), True, 1),
129            (b'_pyobjc_performOnThread:', thr, (b'isEqual:', obj), True, 2),
130        ])
131
132        # Raise an exception
133        orig_stderr = sys.stderr
134        sys.stderr = StringIO()
135        try:
136            obj.calls[:] = []
137            obj.pyobjc_performSelector_onThread_withObject_waitUntilDone_modes_(
138                    b'sel3:', thr, 0, False, 4)
139            self.assertEqual(obj.calls, [
140                (b'_pyobjc_performOnThread:', thr, (b'sel3:', 0), False, 4),
141            ])
142            self.assertIn('Traceback', sys.stderr.getvalue())
143        finally:
144            sys.stderr = orig_stderr
145
146    def testAsyncWithDelayNoResult(self):
147        # pyobjc_performSelector_withObject_afterDelay_
148        obj = TheadingHelperTestHelper.alloc().init()
149
150        obj.pyobjc_performSelector_withObject_afterDelay_(b'sel1:', 1, 1.0)
151        obj.pyobjc_performSelector_withObject_afterDelay_(b'sel2:', 2, 4.5)
152        obj.pyobjc_performSelector_withObject_afterDelay_(b'isEqual:', obj, 8.5)
153
154        self.assertEqual(obj.calls, [
155            (b'_pyobjc_performOnThread:', (b'sel1:', 1), 1.0),
156            (b'_pyobjc_performOnThread:', (b'sel2:', 2), 4.5),
157            (b'_pyobjc_performOnThread:', (b'isEqual:', obj), 8.5),
158        ])
159
160        # Raise an exception
161        orig_stderr = sys.stderr
162        sys.stderr = StringIO()
163        try:
164            obj.calls[:] = []
165            obj.pyobjc_performSelector_withObject_afterDelay_(
166                    b'sel3:', 0, 0.5)
167            self.assertEqual(obj.calls, [
168                (b'_pyobjc_performOnThread:', (b'sel3:', 0), 0.5),
169            ])
170            self.assertIn('Traceback', sys.stderr.getvalue())
171        finally:
172            sys.stderr = orig_stderr
173
174    def testAsyncWithDelayNoResultModes(self):
175        # pyobjc_performSelector_withObject_afterDelay_inModes_
176        obj = TheadingHelperTestHelper.alloc().init()
177
178        obj.pyobjc_performSelector_withObject_afterDelay_inModes_(b'sel1:', 1, 1.0, 0)
179        obj.pyobjc_performSelector_withObject_afterDelay_inModes_(b'sel2:', 2, 4.5, 1)
180        obj.pyobjc_performSelector_withObject_afterDelay_inModes_(b'isEqual:', obj, 8.5, 2)
181
182        self.assertEqual(obj.calls, [
183            (b'_pyobjc_performOnThread:', (b'sel1:', 1), 1.0, 0),
184            (b'_pyobjc_performOnThread:', (b'sel2:', 2), 4.5, 1),
185            (b'_pyobjc_performOnThread:', (b'isEqual:', obj), 8.5, 2),
186        ])
187
188        # Raise an exception
189        orig_stderr = sys.stderr
190        sys.stderr = StringIO()
191        try:
192            obj.calls[:] = []
193            obj.pyobjc_performSelector_withObject_afterDelay_inModes_(
194                    b'sel3:', 0, 0.5, 3)
195            self.assertEqual(obj.calls, [
196                (b'_pyobjc_performOnThread:', (b'sel3:', 0), 0.5, 3),
197            ])
198            self.assertIn('Traceback', sys.stderr.getvalue())
199        finally:
200            sys.stderr = orig_stderr
201
202    def testInBGNoResult(self):
203        # pyobjc_performSelectorInBackground_withObject_
204
205        obj = TheadingHelperTestHelper.alloc().init()
206
207        obj.pyobjc_performSelectorInBackground_withObject_(b'sel1:', 1)
208        obj.pyobjc_performSelectorInBackground_withObject_(b'sel2:', 2)
209        obj.pyobjc_performSelectorInBackground_withObject_(b'isEqual:', obj)
210
211        self.assertEqual(obj.calls, [
212            (b'_pyobjc_performOnThread:', (b'sel1:', 1)),
213            (b'_pyobjc_performOnThread:', (b'sel2:', 2)),
214            (b'_pyobjc_performOnThread:', (b'isEqual:', obj)),
215        ])
216
217        # Raise an exception
218        orig_stderr = sys.stderr
219        sys.stderr = StringIO()
220        try:
221            obj.calls[:] = []
222            obj.pyobjc_performSelectorInBackground_withObject_(
223                    b'sel3:', 0)
224            self.assertEqual(obj.calls, [
225                (b'_pyobjc_performOnThread:', (b'sel3:', 0)),
226            ])
227            self.assertIn('Traceback', sys.stderr.getvalue())
228        finally:
229            sys.stderr = orig_stderr
230
231    def testOnMtNoResultWait(self):
232        # pyobjc_performSelectorInBackground_withObject_waitUntilDone_
233
234        obj = TheadingHelperTestHelper.alloc().init()
235
236        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_(b'sel1:', 1, True)
237        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_(b'sel2:', 2, False)
238        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_(b'isEqual:', obj, True)
239
240        self.assertEqual(obj.calls, [
241            (b'_pyobjc_performOnThread:', (b'sel1:', 1), True),
242            (b'_pyobjc_performOnThread:', (b'sel2:', 2), False),
243            (b'_pyobjc_performOnThread:', (b'isEqual:', obj), True),
244        ])
245
246        # Raise an exception
247        orig_stderr = sys.stderr
248        sys.stderr = StringIO()
249        try:
250            obj.calls[:] = []
251            obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_(
252                    b'sel3:', 0, False)
253            self.assertEqual(obj.calls, [
254                (b'_pyobjc_performOnThread:', (b'sel3:', 0), False),
255            ])
256            self.assertIn('Traceback', sys.stderr.getvalue())
257        finally:
258            sys.stderr = orig_stderr
259
260    def testOnMtNoResultWaitModes(self):
261        # pyobjc_performSelectorInBackground_withObject_waitUntilDone_modes_
262
263        obj = TheadingHelperTestHelper.alloc().init()
264
265        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_modes_(b'sel1:', 1, True, 4)
266        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_modes_(b'sel2:', 2, False, 5)
267        obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_modes_(b'isEqual:', obj, True, 6)
268
269        self.assertEqual(obj.calls, [
270            (b'_pyobjc_performOnThread:', (b'sel1:', 1), True, 4),
271            (b'_pyobjc_performOnThread:', (b'sel2:', 2), False, 5),
272            (b'_pyobjc_performOnThread:', (b'isEqual:', obj), True, 6),
273        ])
274
275        # Raise an exception
276        orig_stderr = sys.stderr
277        sys.stderr = StringIO()
278        try:
279            obj.calls[:] = []
280            obj.pyobjc_performSelectorOnMainThread_withObject_waitUntilDone_modes_(
281                    b'sel3:', 0, False, 7)
282            self.assertEqual(obj.calls, [
283                (b'_pyobjc_performOnThread:', (b'sel3:', 0), False, 7),
284            ])
285            self.assertIn('Traceback', sys.stderr.getvalue())
286        finally:
287            sys.stderr = orig_stderr
288
289    def testOnMtWithResult(self):
290        # pyobjc_performSelectorOnMainThread_withObject_
291        obj = TheadingHelperTestHelper.alloc().init()
292
293        r = obj.pyobjc_performSelectorOnMainThread_withObject_('sel2:', 3)
294        self.assertEqual(r, 6)
295        r = obj.pyobjc_performSelectorOnMainThread_withObject_('sel3:', 2.0)
296        self.assertEqual(r, 0.5)
297
298        self.assertEqual(obj.calls, [
299            (b'_pyobjc_performOnThreadWithResult:', ('sel2:', 3, [(True, 6)]), True),
300            (b'_pyobjc_performOnThreadWithResult:', ('sel3:', 2.0, [(True, 0.5)]), True),
301        ])
302
303        # Raise an exception
304        orig_stderr = sys.stderr
305        sys.stderr = StringIO()
306        try:
307            obj.calls[:] = []
308
309            self.assertRaises(ZeroDivisionError, obj.pyobjc_performSelectorOnMainThread_withObject_,
310                    b'sel3:', 0)
311
312            self.assertEqual(len(obj.calls), 1)
313            self.assertEqual(obj.calls[0][0], b'_pyobjc_performOnThreadWithResult:')
314            self.assertEqual(obj.calls[0][1][-1][0][0],  False)
315            self.assertNotIn('Traceback', sys.stderr.getvalue())
316        finally:
317            sys.stderr = orig_stderr
318
319    def testOnMtWithResultModes(self):
320        obj = TheadingHelperTestHelper.alloc().init()
321
322        r = obj.pyobjc_performSelectorOnMainThread_withObject_modes_('sel2:', 3, 1)
323        self.assertEqual(r, 6)
324        r = obj.pyobjc_performSelectorOnMainThread_withObject_modes_('sel3:', 2.0, 2)
325        self.assertEqual(r, 0.5)
326
327        self.assertEqual(obj.calls, [
328            (b'_pyobjc_performOnThreadWithResult:', ('sel2:', 3, [(True, 6)]), True, 1),
329            (b'_pyobjc_performOnThreadWithResult:', ('sel3:', 2.0, [(True, 0.5)]), True, 2),
330        ])
331
332        # Raise an exception
333        orig_stderr = sys.stderr
334        sys.stderr = StringIO()
335        try:
336            obj.calls[:] = []
337
338            self.assertRaises(ZeroDivisionError, obj.pyobjc_performSelectorOnMainThread_withObject_modes_,
339                    b'sel3:', 0, 3)
340
341            self.assertEqual(len(obj.calls), 1)
342            self.assertEqual(obj.calls[0][0], b'_pyobjc_performOnThreadWithResult:')
343            self.assertEqual(obj.calls[0][1][-1][0][0],  False)
344            self.assertNotIn('Traceback', sys.stderr.getvalue())
345        finally:
346            sys.stderr = orig_stderr
347
348    def testOnThreadWithResult(self):
349        obj = TheadingHelperTestHelper.alloc().init()
350        thr = Foundation.NSThread.mainThread()
351
352        r = obj.pyobjc_performSelector_onThread_withObject_('sel2:', thr, 3)
353        self.assertEqual(r, 6)
354        r = obj.pyobjc_performSelector_onThread_withObject_('sel3:', thr, 2.0)
355        self.assertEqual(r, 0.5)
356
357        self.assertEqual(obj.calls, [
358            (b'_pyobjc_performOnThreadWithResult:', thr, ('sel2:', 3, [(True, 6)]), True),
359            (b'_pyobjc_performOnThreadWithResult:', thr, ('sel3:', 2.0, [(True, 0.5)]), True),
360        ])
361
362        # Raise an exception
363        orig_stderr = sys.stderr
364        sys.stderr = StringIO()
365        try:
366            obj.calls[:] = []
367
368            self.assertRaises(ZeroDivisionError, obj.pyobjc_performSelector_onThread_withObject_,
369                    b'sel3:', thr, 0)
370
371            self.assertEqual(len(obj.calls), 1)
372            self.assertEqual(obj.calls[0][0], b'_pyobjc_performOnThreadWithResult:')
373            self.assertEqual(obj.calls[0][2][-1][0][0],  False)
374            self.assertNotIn('Traceback', sys.stderr.getvalue())
375        finally:
376            sys.stderr = orig_stderr
377
378    def testOnThreadWithResultModes(self):
379        obj = TheadingHelperTestHelper.alloc().init()
380        thr = Foundation.NSThread.mainThread()
381
382        r = obj.pyobjc_performSelector_onThread_withObject_modes_('sel2:', thr, 3, 1)
383        self.assertEqual(r, 6)
384        r = obj.pyobjc_performSelector_onThread_withObject_modes_('sel3:', thr, 2.0, 2)
385        self.assertEqual(r, 0.5)
386
387        self.assertEqual(obj.calls, [
388            (b'_pyobjc_performOnThreadWithResult:', thr, ('sel2:', 3, [(True, 6)]), True, 1),
389            (b'_pyobjc_performOnThreadWithResult:', thr, ('sel3:', 2.0, [(True, 0.5)]), True, 2),
390        ])
391
392        # Raise an exception
393        orig_stderr = sys.stderr
394        sys.stderr = StringIO()
395        try:
396            obj.calls[:] = []
397
398            self.assertRaises(ZeroDivisionError, obj.pyobjc_performSelector_onThread_withObject_modes_,
399                    b'sel3:', thr, 0, 3)
400
401            self.assertEqual(len(obj.calls), 1)
402            self.assertEqual(obj.calls[0][0], b'_pyobjc_performOnThreadWithResult:')
403            self.assertEqual(obj.calls[0][2][-1][0][0],  False)
404            self.assertNotIn('Traceback', sys.stderr.getvalue())
405        finally:
406            sys.stderr = orig_stderr
407
408if __name__ == "__main__":
409    main()
410