1/*
2 * Copyright (C) 2010 Apple Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 *    notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 *    notice, this list of conditions and the following disclaimer in the
11 *    documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23 * THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26#include "config.h"
27#include "WorkQueue.h"
28
29#include <wtf/Threading.h>
30
31inline WorkQueue::WorkItemWin::WorkItemWin(const Function<void()>& function, WorkQueue* queue)
32    : m_function(function)
33    , m_queue(queue)
34{
35}
36
37PassRefPtr<WorkQueue::WorkItemWin> WorkQueue::WorkItemWin::create(const Function<void()>& function, WorkQueue* queue)
38{
39    return adoptRef(new WorkItemWin(function, queue));
40}
41
42WorkQueue::WorkItemWin::~WorkItemWin()
43{
44}
45
46inline WorkQueue::HandleWorkItem::HandleWorkItem(HANDLE handle, const Function<void()>& function, WorkQueue* queue)
47    : WorkItemWin(function, queue)
48    , m_handle(handle)
49    , m_waitHandle(0)
50{
51    ASSERT_ARG(handle, handle);
52}
53
54PassRefPtr<WorkQueue::HandleWorkItem> WorkQueue::HandleWorkItem::createByAdoptingHandle(HANDLE handle, const Function<void()>& function, WorkQueue* queue)
55{
56    return adoptRef(new HandleWorkItem(handle, function, queue));
57}
58
59WorkQueue::HandleWorkItem::~HandleWorkItem()
60{
61    ::CloseHandle(m_handle);
62}
63
64void WorkQueue::handleCallback(void* context, BOOLEAN timerOrWaitFired)
65{
66    ASSERT_ARG(context, context);
67    ASSERT_ARG(timerOrWaitFired, !timerOrWaitFired);
68
69    WorkItemWin* item = static_cast<WorkItemWin*>(context);
70    RefPtr<WorkQueue> queue = item->queue();
71
72    {
73        MutexLocker lock(queue->m_workItemQueueLock);
74        queue->m_workItemQueue.append(item);
75
76        // If no other thread is performing work, we can do it on this thread.
77        if (!queue->tryRegisterAsWorkThread()) {
78            // Some other thread is performing work. Since we hold the queue lock, we can be sure
79            // that the work thread is not exiting due to an empty queue and will process the work
80            // item we just added to it. If we weren't holding the lock we'd have to signal
81            // m_performWorkEvent to make sure the work item got picked up.
82            return;
83        }
84    }
85
86    queue->performWorkOnRegisteredWorkThread();
87}
88
89void WorkQueue::registerHandle(HANDLE handle, const Function<void()>& function)
90{
91    RefPtr<HandleWorkItem> handleItem = HandleWorkItem::createByAdoptingHandle(handle, function, this);
92
93    {
94        MutexLocker lock(m_handlesLock);
95        ASSERT_ARG(handle, !m_handles.contains(handle));
96        m_handles.set(handle, handleItem);
97    }
98
99    HANDLE waitHandle;
100    if (!::RegisterWaitForSingleObject(&waitHandle, handle, handleCallback, handleItem.get(), INFINITE, WT_EXECUTEDEFAULT)) {
101        DWORD error = ::GetLastError();
102        ASSERT_NOT_REACHED();
103    }
104    handleItem->setWaitHandle(waitHandle);
105}
106
107void WorkQueue::unregisterAndCloseHandle(HANDLE handle)
108{
109    RefPtr<HandleWorkItem> item;
110    {
111        MutexLocker locker(m_handlesLock);
112        ASSERT_ARG(handle, m_handles.contains(handle));
113        item = m_handles.take(handle);
114    }
115
116    unregisterWaitAndDestroyItemSoon(item.release());
117}
118
119DWORD WorkQueue::workThreadCallback(void* context)
120{
121    ASSERT_ARG(context, context);
122
123    WorkQueue* queue = static_cast<WorkQueue*>(context);
124
125    if (!queue->tryRegisterAsWorkThread())
126        return 0;
127
128    queue->performWorkOnRegisteredWorkThread();
129    return 0;
130}
131
132void WorkQueue::performWorkOnRegisteredWorkThread()
133{
134    ASSERT(m_isWorkThreadRegistered);
135
136    m_workItemQueueLock.lock();
137
138    while (!m_workItemQueue.isEmpty()) {
139        Vector<RefPtr<WorkItemWin> > workItemQueue;
140        m_workItemQueue.swap(workItemQueue);
141
142        // Allow more work to be scheduled while we're not using the queue directly.
143        m_workItemQueueLock.unlock();
144        for (size_t i = 0; i < workItemQueue.size(); ++i)
145            workItemQueue[i]->function()();
146        m_workItemQueueLock.lock();
147    }
148
149    // One invariant we maintain is that any work scheduled while a work thread is registered will
150    // be handled by that work thread. Unregister as the work thread while the queue lock is still
151    // held so that no work can be scheduled while we're still registered.
152    unregisterAsWorkThread();
153
154    m_workItemQueueLock.unlock();
155}
156
157void WorkQueue::platformInitialize(const char* name)
158{
159    m_isWorkThreadRegistered = 0;
160    m_timerQueue = ::CreateTimerQueue();
161    ASSERT_WITH_MESSAGE(m_timerQueue, "::CreateTimerQueue failed with error %lu", ::GetLastError());
162}
163
164bool WorkQueue::tryRegisterAsWorkThread()
165{
166    LONG result = ::InterlockedCompareExchange(&m_isWorkThreadRegistered, 1, 0);
167    ASSERT(!result || result == 1);
168    return !result;
169}
170
171void WorkQueue::unregisterAsWorkThread()
172{
173    LONG result = ::InterlockedCompareExchange(&m_isWorkThreadRegistered, 0, 1);
174    ASSERT_UNUSED(result, result == 1);
175}
176
177void WorkQueue::platformInvalidate()
178{
179#if !ASSERT_DISABLED
180    MutexLocker lock(m_handlesLock);
181    ASSERT(m_handles.isEmpty());
182#endif
183
184    // FIXME: We need to ensure that any timer-queue timers that fire after this point don't try to
185    // access this WorkQueue <http://webkit.org/b/44690>.
186    ::DeleteTimerQueueEx(m_timerQueue, 0);
187}
188
189void WorkQueue::dispatch(const Function<void()>& function)
190{
191    MutexLocker locker(m_workItemQueueLock);
192
193    m_workItemQueue.append(WorkItemWin::create(function, this));
194
195    // Spawn a work thread to perform the work we just added. As an optimization, we avoid
196    // spawning the thread if a work thread is already registered. This prevents multiple work
197    // threads from being spawned in most cases. (Note that when a work thread has been spawned but
198    // hasn't registered itself yet, m_isWorkThreadRegistered will be false and we'll end up
199    // spawning a second work thread here. But work thread registration process will ensure that
200    // only one thread actually ends up performing work.)
201    if (!m_isWorkThreadRegistered)
202        ::QueueUserWorkItem(workThreadCallback, this, WT_EXECUTEDEFAULT);
203}
204
205struct TimerContext : public ThreadSafeRefCounted<TimerContext> {
206    static PassRefPtr<TimerContext> create() { return adoptRef(new TimerContext); }
207
208    WorkQueue* queue;
209    Function<void()> function;
210    Mutex timerMutex;
211    HANDLE timer;
212
213private:
214    TimerContext() : queue(0), timer(0) { }
215};
216
217void WorkQueue::timerCallback(void* context, BOOLEAN timerOrWaitFired)
218{
219    ASSERT_ARG(context, context);
220    ASSERT_UNUSED(timerOrWaitFired, timerOrWaitFired);
221
222    // Balanced by leakRef in scheduleWorkAfterDelay.
223    RefPtr<TimerContext> timerContext = adoptRef(static_cast<TimerContext*>(context));
224
225    timerContext->queue->dispatch(timerContext->function);
226
227    MutexLocker lock(timerContext->timerMutex);
228    ASSERT(timerContext->timer);
229    ASSERT(timerContext->queue->m_timerQueue);
230    if (!::DeleteTimerQueueTimer(timerContext->queue->m_timerQueue, timerContext->timer, 0)) {
231        // Getting ERROR_IO_PENDING here means that the timer will be destroyed once the callback is done executing.
232        ASSERT_WITH_MESSAGE(::GetLastError() == ERROR_IO_PENDING, "::DeleteTimerQueueTimer failed with error %lu", ::GetLastError());
233    }
234}
235
236void WorkQueue::dispatchAfterDelay(const Function<void()>& function, double delay)
237{
238    ASSERT(m_timerQueue);
239
240    RefPtr<TimerContext> context = TimerContext::create();
241    context->queue = this;
242    context->function = function;
243
244    {
245        // The timer callback could fire before ::CreateTimerQueueTimer even returns, so we protect
246        // context->timer with a mutex to ensure the timer callback doesn't access it before the
247        // timer handle has been stored in it.
248        MutexLocker lock(context->timerMutex);
249
250        // Since our timer callback is quick, we can execute in the timer thread itself and avoid
251        // an extra thread switch over to a worker thread.
252        if (!::CreateTimerQueueTimer(&context->timer, m_timerQueue, timerCallback, context.get(), delay * 1000, 0, WT_EXECUTEINTIMERTHREAD)) {
253            ASSERT_WITH_MESSAGE(false, "::CreateTimerQueueTimer failed with error %lu", ::GetLastError());
254            return;
255        }
256    }
257
258    // The timer callback will handle destroying context.
259    context.release().leakRef();
260}
261
262void WorkQueue::unregisterWaitAndDestroyItemSoon(PassRefPtr<HandleWorkItem> item)
263{
264    // We're going to make a blocking call to ::UnregisterWaitEx before closing the handle. (The
265    // blocking version of ::UnregisterWaitEx is much simpler than the non-blocking version.) If we
266    // do this on the current thread, we'll deadlock if we're currently in a callback function for
267    // the wait we're unregistering. So instead we do it asynchronously on some other worker thread.
268
269    ::QueueUserWorkItem(unregisterWaitAndDestroyItemCallback, item.leakRef(), WT_EXECUTEDEFAULT);
270}
271
272DWORD WINAPI WorkQueue::unregisterWaitAndDestroyItemCallback(void* context)
273{
274    ASSERT_ARG(context, context);
275    RefPtr<HandleWorkItem> item = adoptRef(static_cast<HandleWorkItem*>(context));
276
277    // Now that we know we're not in a callback function for the wait we're unregistering, we can
278    // make a blocking call to ::UnregisterWaitEx.
279    if (!::UnregisterWaitEx(item->waitHandle(), INVALID_HANDLE_VALUE)) {
280        DWORD error = ::GetLastError();
281        ASSERT_NOT_REACHED();
282    }
283
284    return 0;
285}
286