1/*
2    Copyright (C) 2012 Samsung Electronics
3
4    This library is free software; you can redistribute it and/or
5    modify it under the terms of the GNU Library General Public
6    License as published by the Free Software Foundation; either
7    version 2 of the License, or (at your option) any later version.
8
9    This library is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    Library General Public License for more details.
13
14    You should have received a copy of the GNU Library General Public License
15    along with this library; see the file COPYING.LIB.  If not, write to
16    the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
17    Boston, MA 02110-1301, USA.
18 */
19
20#include "config.h"
21#include "WorkQueue.h"
22
23#include <sys/timerfd.h>
24#include <wtf/Assertions.h>
25#include <wtf/CurrentTime.h>
26
27static const int invalidSocketDescriptor = -1;
28static const int threadMessageSize = 1;
29static const char finishThreadMessage[] = "F";
30static const char wakupThreadMessage[] = "W";
31
32PassOwnPtr<WorkQueue::TimerWorkItem> WorkQueue::TimerWorkItem::create(Function<void()> function, double expireTime)
33{
34    if (expireTime < 0)
35        return nullptr;
36
37    return adoptPtr(new TimerWorkItem(function, expireTime));
38}
39
40WorkQueue::TimerWorkItem::TimerWorkItem(Function<void()> function, double expireTime)
41    : m_function(function)
42    , m_expireTime(expireTime)
43{
44}
45
46void WorkQueue::platformInitialize(const char* name)
47{
48    int fds[2];
49    if (pipe(fds))
50        ASSERT_NOT_REACHED();
51
52    m_readFromPipeDescriptor = fds[0];
53    m_writeToPipeDescriptor = fds[1];
54    FD_ZERO(&m_fileDescriptorSet);
55    FD_SET(m_readFromPipeDescriptor, &m_fileDescriptorSet);
56    m_maxFileDescriptor = m_readFromPipeDescriptor;
57
58    m_socketDescriptor = invalidSocketDescriptor;
59
60    m_threadLoop = true;
61    createThread(reinterpret_cast<WTF::ThreadFunction>(&WorkQueue::workQueueThread), this, name);
62}
63
64void WorkQueue::platformInvalidate()
65{
66    sendMessageToThread(finishThreadMessage);
67}
68
69void WorkQueue::performWork()
70{
71    while (true) {
72        Vector<Function<void()> > workItemQueue;
73
74        {
75            MutexLocker locker(m_workItemQueueLock);
76            if (m_workItemQueue.isEmpty())
77                return;
78
79            m_workItemQueue.swap(workItemQueue);
80        }
81
82        for (size_t i = 0; i < workItemQueue.size(); ++i) {
83            workItemQueue[i]();
84            deref();
85        }
86    }
87}
88
89void WorkQueue::performFileDescriptorWork()
90{
91    fd_set readFileDescriptorSet = m_fileDescriptorSet;
92
93    if (select(m_maxFileDescriptor + 1, &readFileDescriptorSet, 0, 0, getNextTimeOut()) >= 0) {
94        if (FD_ISSET(m_readFromPipeDescriptor, &readFileDescriptorSet)) {
95            char readBuf[threadMessageSize];
96            if (read(m_readFromPipeDescriptor, readBuf, threadMessageSize) == -1)
97                LOG_ERROR("Failed to read from WorkQueueThread pipe");
98            if (!strncmp(readBuf, finishThreadMessage, threadMessageSize))
99                m_threadLoop = false;
100        }
101
102        if (m_socketDescriptor != invalidSocketDescriptor && FD_ISSET(m_socketDescriptor, &readFileDescriptorSet))
103            m_socketEventHandler();
104    }
105}
106
107struct timeval* WorkQueue::getNextTimeOut()
108{
109    MutexLocker locker(m_timerWorkItemsLock);
110    if (m_timerWorkItems.isEmpty())
111        return 0;
112
113    static struct timeval timeValue;
114    timeValue.tv_sec = 0;
115    timeValue.tv_usec = 0;
116    double timeOut = m_timerWorkItems[0]->expireTime() - currentTime();
117    if (timeOut > 0) {
118        timeValue.tv_sec = static_cast<long>(timeOut);
119        timeValue.tv_usec = static_cast<long>((timeOut - timeValue.tv_sec) * 1000000);
120    }
121
122    return &timeValue;
123}
124
125void WorkQueue::insertTimerWorkItem(PassOwnPtr<TimerWorkItem> item)
126{
127    if (!item)
128        return;
129
130    size_t position = 0;
131
132    MutexLocker locker(m_timerWorkItemsLock);
133    // m_timerWorkItems should be ordered by expire time.
134    for (; position < m_timerWorkItems.size(); ++position)
135        if (item->expireTime() < m_timerWorkItems[position]->expireTime())
136            break;
137
138    m_timerWorkItems.insert(position, item);
139}
140
141void WorkQueue::performTimerWork()
142{
143    Vector<OwnPtr<TimerWorkItem> > timerWorkItems;
144
145    {
146        // Protects m_timerWorkItems.
147        MutexLocker locker(m_timerWorkItemsLock);
148        if (m_timerWorkItems.isEmpty())
149            return;
150
151        // Copies all the timer work items in m_timerWorkItems to local vector.
152        m_timerWorkItems.swap(timerWorkItems);
153    }
154
155    double current = currentTime();
156
157    for (size_t i = 0; i < timerWorkItems.size(); ++i) {
158        if (!timerWorkItems[i]->expired(current)) {
159            // If a timer work item does not expired, keep it to the m_timerWorkItems.
160            // m_timerWorkItems should be ordered by expire time.
161            insertTimerWorkItem(timerWorkItems[i].release());
162            continue;
163        }
164
165        // If a timer work item expired, dispatch the function of the work item.
166        timerWorkItems[i]->dispatch();
167        deref();
168    }
169}
170
171void WorkQueue::sendMessageToThread(const char* message)
172{
173    MutexLocker locker(m_writeToPipeDescriptorLock);
174    if (write(m_writeToPipeDescriptor, message, threadMessageSize) == -1)
175        LOG_ERROR("Failed to wake up WorkQueue Thread");
176}
177
178void* WorkQueue::workQueueThread(WorkQueue* workQueue)
179{
180    while (workQueue->m_threadLoop) {
181        workQueue->performWork();
182        workQueue->performTimerWork();
183        workQueue->performFileDescriptorWork();
184    }
185
186    close(workQueue->m_readFromPipeDescriptor);
187    close(workQueue->m_writeToPipeDescriptor);
188
189    return 0;
190}
191
192void WorkQueue::registerSocketEventHandler(int fileDescriptor, const Function<void()>& function)
193{
194    if (m_socketDescriptor != invalidSocketDescriptor)
195        LOG_ERROR("%d is already registerd.", fileDescriptor);
196
197    m_socketDescriptor = fileDescriptor;
198    m_socketEventHandler = function;
199
200    if (fileDescriptor > m_maxFileDescriptor)
201        m_maxFileDescriptor = fileDescriptor;
202    FD_SET(fileDescriptor, &m_fileDescriptorSet);
203}
204
205void WorkQueue::unregisterSocketEventHandler(int fileDescriptor)
206{
207    m_socketDescriptor = invalidSocketDescriptor;
208
209    if (fileDescriptor == m_maxFileDescriptor)
210        m_maxFileDescriptor = m_readFromPipeDescriptor;
211    FD_CLR(fileDescriptor, &m_fileDescriptorSet);
212}
213
214void WorkQueue::dispatch(const Function<void()>& function)
215{
216    ref();
217
218    {
219        MutexLocker locker(m_workItemQueueLock);
220        m_workItemQueue.append(function);
221    }
222
223    sendMessageToThread(wakupThreadMessage);
224}
225
226void WorkQueue::dispatchAfterDelay(const Function<void()>& function, double delay)
227{
228    if (delay < 0)
229        return;
230
231    OwnPtr<TimerWorkItem> timerWorkItem = TimerWorkItem::create(function, currentTime() + delay);
232    if (!timerWorkItem)
233        return;
234
235    ref();
236    insertTimerWorkItem(timerWorkItem.release());
237    sendMessageToThread(wakupThreadMessage);
238}
239