1"""
Test locking objects (objc.object_lock) and their interaction with
Objective-C @synchronized() statements.

These tests take an annoyingly long time: every thread sleeps while holding the lock
to ensure that we'd hit a race condition when locking doesn't actually lock. It should
be possible to find a faster mechanism for this.
6"""
7import sys
8from PyObjCTools.TestSupport import *
9import objc
10import threading
11import time
12
13from PyObjCTest.locking import OC_LockTest
14
# Objective-C class resolved once at import time; each worker thread defined
# in this module creates (and drops) its own instance inside run().
NSAutoreleasePool = objc.lookUpClass('NSAutoreleasePool')
16
class OtherThread (threading.Thread):
    """
    Worker thread that repeatedly takes the ``objc.object_lock`` of *obj* and
    records its progress through ``obj.appendToList_``.

    In each of its six rounds the thread appends ``"thread <i> a"``, sleeps
    while still holding the lock, and then appends ``"thread <i> b"``. If it
    finds ``obj.isLocked()`` true right after acquiring the lock it appends
    ``"LOCK FOUND"``; the test cases assert that this marker never shows up.
    """

    def __init__(self, obj):
        threading.Thread.__init__(self)

        # Object whose lock is contended; shared with the test's main thread.
        self.obj = obj

    def run(self):
        """Run the six lock/append/unlock rounds inside an autorelease pool."""
        pool = NSAutoreleasePool.alloc().init()
        try:
            lck = objc.object_lock(self.obj)

            for i in range(6):
                time.sleep(0.05)
                lck.lock()
                try:
                    if self.obj.isLocked():
                        # The flag is only set inside a critical section, so
                        # seeing it here means mutual exclusion failed.
                        self.obj.appendToList_("LOCK FOUND")

                    self.obj.setLocked_(True)
                    self.obj.appendToList_("thread %d a"%(i,))
                    # Keep the lock long enough for the other thread to try
                    # to enter its own critical section.
                    time.sleep(0.5)
                    self.obj.appendToList_("thread %d b"%(i,))
                    self.obj.setLocked_(False)
                finally:
                    # Always release the lock: without this an exception in
                    # the critical section would block the other thread forever.
                    lck.unlock()
        finally:
            # Drop the pool on the error path as well.
            del pool
42
class ObjCThread (threading.Thread):
    """
    Worker thread that hands *obj* to the native ``OC_LockTest`` helper.

    The thread creates a fresh ``OC_LockTest`` instance and calls its
    ``threadFunc_`` with the shared object; everything happens inside an
    autorelease pool owned by this thread.
    """

    def __init__(self, obj):
        super(ObjCThread, self).__init__()

        # Object passed on to the native helper when the thread runs.
        self.obj = obj

    def run(self):
        """Invoke ``OC_LockTest.threadFunc_`` on the shared object."""
        pool = NSAutoreleasePool.alloc().init()

        native = OC_LockTest.alloc().init()
        native.threadFunc_(self.obj)

        # Drop the helper first, then the pool (same order as a pair of
        # separate ``del`` statements).
        del native, pool
57
58
class BaseClass (objc.lookUpClass('NSObject')):
    """
    NSObject subclass used as the shared, lockable object in the tests.

    It wraps a Python list that the threads append to and carries a plain
    boolean flag that the threads set while they are inside their critical
    section.
    """

    def initWithList_(self, list):
        """Initialise the instance with the list to append to; flag starts False."""
        self = super(BaseClass, self).init()
        if self is not None:
            self._locked = False
            self.list = list

        # Either the initialised instance or None when init() failed.
        return self

    def isLocked(self):
        """Return the current value of the 'inside critical section' flag."""
        return self._locked

    def setLocked_(self, value):
        """Store *value* as the 'inside critical section' flag."""
        self._locked = value

    def appendToList_(self, value):
        """Append *value* to the list passed to ``initWithList_``."""
        self.list.append(value)
78
class TestLockingBasic (TestCase):
    """
    Exercise explicit ``lock()``/``unlock()`` calls on ``objc.object_lock``
    while a second thread contends for the lock of the same object.
    """

    def _assertPairedEntries(self, lst):
        """
        Assert that every entry of *lst* ending in ``' a'`` is immediately
        followed by an entry ending in ``' b'``, i.e. that nothing was
        appended between the two halves of a critical section.
        """
        for idx, entry in enumerate(lst):
            if entry.endswith(' a'):
                # Report a dangling ' a' entry as a test failure instead of
                # letting the lookup below raise IndexError.
                self.assertLess(
                    idx + 1, len(lst),
                    "%r is not followed by a matching ' b' entry" % (entry,))
                self.assertTrue(lst[idx + 1].endswith(' b'))

    def testBasicLocking(self):
        """Main thread and a Python worker thread contend for the lock."""
        lst = []

        obj = BaseClass.alloc().initWithList_(lst)
        lck = objc.object_lock(obj)

        thr = OtherThread(obj)
        thr.start()
        try:
            for i in range(5):
                time.sleep(0.1)
                lck.lock()
                try:
                    self.assertFalse(obj.isLocked())
                    obj.setLocked_(True)
                    obj.appendToList_("mainthread")
                    obj.setLocked_(False)
                finally:
                    # Release the lock even when the assertion fails, so the
                    # worker thread cannot block forever on it.
                    lck.unlock()
        finally:
            # Never leave the worker running past the end of the test.
            thr.join()

        self.assertNotIn("LOCK FOUND", lst)
        self._assertPairedEntries(lst)

    def testObjectiveCLocking(self):
        """Main thread contends with a thread running the native helper."""
        lst = []

        obj = BaseClass.alloc().initWithList_(lst)
        lck = objc.object_lock(obj)

        thr = ObjCThread(obj)
        thr.start()
        try:
            for i in range(5):
                time.sleep(0.1)
                lck.lock()
                try:
                    self.assertFalse(obj.isLocked())
                    obj.setLocked_(True)
                    obj.appendToList_("mainthread")
                    # Hold the lock for a while so the other thread gets the
                    # chance to run into it.
                    time.sleep(0.5)
                    obj.setLocked_(False)
                finally:
                    # Release the lock even when the assertion fails, so the
                    # worker thread cannot block forever on it.
                    lck.unlock()
        finally:
            # Never leave the worker running past the end of the test.
            thr.join()

        self.assertNotIn("LOCK FOUND", lst)
        self._assertPairedEntries(lst)
130
131
class TestLockingWithStatement (TestCase):
    """
    Exercise ``objc.object_lock`` as a context manager (``with`` statement)
    while a second thread contends for the lock of the same object.
    """

    def testBasicLocking(self):
        """Main thread uses ``with``; the worker uses explicit lock()/unlock()."""
        lst = []

        obj = BaseClass.alloc().initWithList_(lst)

        thr = OtherThread(obj)
        thr.start()
        try:
            for i in range(5):
                time.sleep(0.1)
                with objc.object_lock(obj):
                    self.assertFalse(obj.isLocked())
                    obj.setLocked_(True)
                    obj.appendToList_("mainthread")
                    # Hold the lock for a while so the other thread gets the
                    # chance to run into it.
                    time.sleep(0.5)
                    obj.setLocked_(False)
        finally:
            # Never leave the worker running past the end of the test.
            thr.join()

        self.assertNotIn("LOCK FOUND", lst)
        for idx, entry in enumerate(lst):
            if entry.endswith(' a'):
                # Report a dangling ' a' entry as a test failure instead of
                # letting the lookup below raise IndexError.
                self.assertLess(
                    idx + 1, len(lst),
                    "%r is not followed by a matching ' b' entry" % (entry,))
                self.assertTrue(lst[idx + 1].endswith(' b'))
157
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    main()
160