1/**
2 * The event module provides a primitive for lightweight signaling of other threads
3 * (emulating Windows events on Posix)
4 *
5 * Copyright: Copyright (c) 2019 D Language Foundation
6 * License: Distributed under the
7 *    $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
8 *    (See accompanying file LICENSE)
9 * Authors: Rainer Schuetze
10 * Source:    $(DRUNTIMESRC core/sync/event.d)
11 */
12module core.sync.event;
13
14version (Windows)
15{
16    import core.sys.windows.basetsd /+: HANDLE +/;
17    import core.sys.windows.winerror /+: WAIT_TIMEOUT +/;
18    import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent,
19        WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/;
20}
21else version (Posix)
22{
23    import core.sys.posix.pthread;
24    import core.sys.posix.sys.types;
25    import core.sys.posix.time;
26}
27else
28{
29    static assert(false, "Platform not supported");
30}
31
32import core.time;
33import core.internal.abort : abort;
34
35/**
36 * represents an event. Clients of an event are suspended while waiting
37 * for the event to be "signaled".
38 *
39 * Implemented using `pthread_mutex` and `pthread_condition` on Posix and
40 * `CreateEvent` and `SetEvent` on Windows.
41---
42import core.sync.event, core.thread, std.file;
43
44struct ProcessFile
45{
46    ThreadGroup group;
47    Event event;
48    void[] buffer;
49
50    void doProcess()
51    {
52        event.wait();
53        // process buffer
54    }
55
56    void process(string filename)
57    {
58        event.initialize(true, false);
59        group = new ThreadGroup;
60        for (int i = 0; i < 10; ++i)
61            group.create(&doProcess);
62
63        buffer = std.file.read(filename);
64        event.set();
65        group.joinAll();
66        event.terminate();
67    }
68}
69---
70 */
71struct Event
72{
73nothrow @nogc:
74    /**
75     * Creates an event object.
76     *
77     * Params:
78     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
79     *  initialState = initial state of the signal
80     */
81    this(bool manualReset, bool initialState)
82    {
83        initialize(manualReset, initialState);
84    }
85
86    /**
87     * Initializes an event object. Does nothing if the event is already initialized.
88     *
89     * Params:
90     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
91     *  initialState = initial state of the signal
92     */
93    void initialize(bool manualReset, bool initialState)
94    {
95        version (Windows)
96        {
97            if (m_event)
98                return;
99            m_event = CreateEvent(null, manualReset, initialState, null);
100            m_event || abort("Error: CreateEvent failed.");
101        }
102        else version (Posix)
103        {
104            if (m_initalized)
105                return;
106            pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 ||
107                abort("Error: pthread_mutex_init failed.");
108            static if ( is( typeof( pthread_condattr_setclock ) ) )
109            {
110                pthread_condattr_t attr = void;
111                pthread_condattr_init(&attr) == 0 ||
112                    abort("Error: pthread_condattr_init failed.");
113                pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 ||
114                    abort("Error: pthread_condattr_setclock failed.");
115                pthread_cond_init(&m_cond, &attr) == 0 ||
116                    abort("Error: pthread_cond_init failed.");
117                pthread_condattr_destroy(&attr) == 0 ||
118                    abort("Error: pthread_condattr_destroy failed.");
119            }
120            else
121            {
122                pthread_cond_init(&m_cond, null) == 0 ||
123                    abort("Error: pthread_cond_init failed.");
124            }
125            m_state = initialState;
126            m_manualReset = manualReset;
127            m_initalized = true;
128        }
129    }
130
131    // copying not allowed, can produce resource leaks
132    @disable this(this);
133    @disable void opAssign(Event);
134
135    ~this()
136    {
137        terminate();
138    }
139
140    /**
141     * deinitialize event. Does nothing if the event is not initialized. There must not be
142     * threads currently waiting for the event to be signaled.
143    */
144    void terminate()
145    {
146        version (Windows)
147        {
148            if (m_event)
149                CloseHandle(m_event);
150            m_event = null;
151        }
152        else version (Posix)
153        {
154            if (m_initalized)
155            {
156                pthread_mutex_destroy(&m_mutex) == 0 ||
157                    abort("Error: pthread_mutex_destroy failed.");
158                pthread_cond_destroy(&m_cond) == 0 ||
159                    abort("Error: pthread_cond_destroy failed.");
160                m_initalized = false;
161            }
162        }
163    }
164
165
166    /// Set the event to "signaled", so that waiting clients are resumed
167    void set()
168    {
169        version (Windows)
170        {
171            if (m_event)
172                SetEvent(m_event);
173        }
174        else version (Posix)
175        {
176            if (m_initalized)
177            {
178                pthread_mutex_lock(&m_mutex);
179                m_state = true;
180                pthread_cond_broadcast(&m_cond);
181                pthread_mutex_unlock(&m_mutex);
182            }
183        }
184    }
185
186    /// Reset the event manually
187    void reset()
188    {
189        version (Windows)
190        {
191            if (m_event)
192                ResetEvent(m_event);
193        }
194        else version (Posix)
195        {
196            if (m_initalized)
197            {
198                pthread_mutex_lock(&m_mutex);
199                m_state = false;
200                pthread_mutex_unlock(&m_mutex);
201            }
202        }
203    }
204
205    /**
206     * Wait for the event to be signaled without timeout.
207     *
208     * Returns:
209     *  `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured
210     */
211    bool wait()
212    {
213        version (Windows)
214        {
215            return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0;
216        }
217        else version (Posix)
218        {
219            return wait(Duration.max);
220        }
221    }
222
223    /**
224     * Wait for the event to be signaled with timeout.
225     *
226     * Params:
227     *  tmout = the maximum time to wait
228     * Returns:
229     *  `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
230     *  the event is uninitialized or another error occured
231     */
232    bool wait(Duration tmout)
233    {
234        version (Windows)
235        {
236            if (!m_event)
237                return false;
238
239            auto maxWaitMillis = dur!("msecs")(uint.max - 1);
240
241            while (tmout > maxWaitMillis)
242            {
243                auto res = WaitForSingleObject(m_event, uint.max - 1);
244                if (res != WAIT_TIMEOUT)
245                    return res == WAIT_OBJECT_0;
246                tmout -= maxWaitMillis;
247            }
248            auto ms = cast(uint)(tmout.total!"msecs");
249            return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0;
250        }
251        else version (Posix)
252        {
253            if (!m_initalized)
254                return false;
255
256            pthread_mutex_lock(&m_mutex);
257
258            int result = 0;
259            if (!m_state)
260            {
261                if (tmout == Duration.max)
262                {
263                    result = pthread_cond_wait(&m_cond, &m_mutex);
264                }
265                else
266                {
267                    import core.sync.config;
268
269                    timespec t = void;
270                    mktspec(t, tmout);
271
272                    result = pthread_cond_timedwait(&m_cond, &m_mutex, &t);
273                }
274            }
275            if (result == 0 && !m_manualReset)
276                m_state = false;
277
278            pthread_mutex_unlock(&m_mutex);
279
280            return result == 0;
281        }
282    }
283
284private:
285    version (Windows)
286    {
287        HANDLE m_event;
288    }
289    else version (Posix)
290    {
291        pthread_mutex_t m_mutex;
292        pthread_cond_t m_cond;
293        bool m_initalized;
294        bool m_state;
295        bool m_manualReset;
296    }
297}
298
299// Test single-thread (non-shared) use.
300@nogc nothrow unittest
301{
302    // auto-reset, initial state false
303    Event ev1 = Event(false, false);
304    assert(!ev1.wait(1.dur!"msecs"));
305    ev1.set();
306    assert(ev1.wait());
307    assert(!ev1.wait(1.dur!"msecs"));
308
309    // manual-reset, initial state true
310    Event ev2 = Event(true, true);
311    assert(ev2.wait());
312    assert(ev2.wait());
313    ev2.reset();
314    assert(!ev2.wait(1.dur!"msecs"));
315}
316
317unittest
318{
319    import core.thread, core.atomic;
320
321    scope event      = new Event(true, false);
322    int  numThreads = 10;
323    shared int numRunning = 0;
324
325    void testFn()
326    {
327        event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner
328        numRunning.atomicOp!"+="(1);
329    }
330
331    auto group = new ThreadGroup;
332
333    for (int i = 0; i < numThreads; ++i)
334        group.create(&testFn);
335
336    auto start = MonoTime.currTime;
337    assert(numRunning == 0);
338
339    event.set();
340    group.joinAll();
341
342    assert(numRunning == numThreads);
343
344    assert(MonoTime.currTime - start < 5.dur!"seconds");
345}
346