1/*
2 * Copyright (C) 2013 Samsung Electronics. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 *    notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 *    notice, this list of conditions and the following disclaimer in the
11 *    documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23 * THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26#include "config.h"
27#include "DispatchQueueEfl.h"
28
29#include <DispatchQueueWorkItemEfl.h>
30#include <sys/timerfd.h>
31#include <wtf/Assertions.h>
32#include <wtf/CurrentTime.h>
33#include <wtf/Threading.h>
34
// Conversion factors between seconds and the sub-second units used by the timer arithmetic below.
static const int microSecondsPerSecond = 1000 * 1000;
static const int nanoSecondsPerSecond = 1000 * 1000 * 1000;
// Value held by m_socketDescriptor while no socket event handler is registered.
static const int invalidSocketDescriptor = -1;
// Single byte written to the pipe by wakeUpThread() and read back by the queue thread.
static const char wakeUpThreadMessage = 'W';
39
40class DispatchQueue::ThreadContext {
41public:
42    static void start(const char* name, PassRefPtr<DispatchQueue> dispatchQueue)
43    {
44        // The DispatchQueueThreadContext instance will be passed to the thread function and deleted in it.
45        createThread(reinterpret_cast<WTF::ThreadFunction>(&ThreadContext::function), new ThreadContext(dispatchQueue), name);
46    }
47
48private:
49    ThreadContext(PassRefPtr<DispatchQueue> dispatchQueue)
50        : m_dispatchQueue(dispatchQueue)
51    {
52    }
53
54    static void* function(ThreadContext* threadContext)
55    {
56        std::unique_ptr<ThreadContext>(threadContext)->m_dispatchQueue->dispatchQueueThread();
57        return 0;
58    }
59
60    RefPtr<DispatchQueue> m_dispatchQueue;
61};
62
63PassRefPtr<DispatchQueue> DispatchQueue::create(const char* name)
64{
65    RefPtr<DispatchQueue> dispatchQueue = adoptRef<DispatchQueue>(new DispatchQueue());
66
67    ThreadContext::start(name, dispatchQueue);
68
69    return dispatchQueue.release();
70}
71
72DispatchQueue::DispatchQueue()
73    : m_isThreadRunning(true)
74{
75    int fds[2];
76    if (pipe(fds))
77        ASSERT_NOT_REACHED();
78
79    m_readFromPipeDescriptor = fds[0];
80    m_writeToPipeDescriptor = fds[1];
81    FD_ZERO(&m_fileDescriptorSet);
82    FD_SET(m_readFromPipeDescriptor, &m_fileDescriptorSet);
83    m_maxFileDescriptor = m_readFromPipeDescriptor;
84
85    m_socketDescriptor = invalidSocketDescriptor;
86}
87
88DispatchQueue::~DispatchQueue()
89{
90    close(m_readFromPipeDescriptor);
91    close(m_writeToPipeDescriptor);
92}
93
94void DispatchQueue::dispatch(std::unique_ptr<WorkItem> item)
95{
96    {
97        MutexLocker locker(m_workItemsLock);
98        m_workItems.append(WTF::move(item));
99    }
100
101    wakeUpThread();
102}
103
104void DispatchQueue::dispatch(std::unique_ptr<TimerWorkItem> item)
105{
106    insertTimerWorkItem(WTF::move(item));
107    wakeUpThread();
108}
109
110void DispatchQueue::stopThread()
111{
112    ASSERT(m_socketDescriptor == invalidSocketDescriptor);
113
114    m_isThreadRunning = false;
115    wakeUpThread();
116}
117
118void DispatchQueue::setSocketEventHandler(int fileDescriptor, std::function<void ()> function)
119{
120    ASSERT(m_socketDescriptor == invalidSocketDescriptor);
121
122    m_socketDescriptor = fileDescriptor;
123    m_socketEventHandler = WTF::move(function);
124
125    if (fileDescriptor > m_maxFileDescriptor)
126        m_maxFileDescriptor = fileDescriptor;
127    FD_SET(fileDescriptor, &m_fileDescriptorSet);
128}
129
130void DispatchQueue::clearSocketEventHandler()
131{
132    ASSERT(m_socketDescriptor != invalidSocketDescriptor);
133
134    if (m_socketDescriptor == m_maxFileDescriptor)
135        m_maxFileDescriptor = m_readFromPipeDescriptor;
136
137    FD_CLR(m_socketDescriptor, &m_fileDescriptorSet);
138
139    m_socketDescriptor = invalidSocketDescriptor;
140}
141
142void DispatchQueue::performWork()
143{
144    while (true) {
145        Vector<std::unique_ptr<WorkItem>> workItems;
146
147        {
148            MutexLocker locker(m_workItemsLock);
149            if (m_workItems.isEmpty())
150                return;
151
152            m_workItems.swap(workItems);
153        }
154
155        for (size_t i = 0; i < workItems.size(); ++i)
156            workItems[i]->dispatch();
157    }
158}
159
160void DispatchQueue::performTimerWork()
161{
162    Vector<std::unique_ptr<TimerWorkItem>> timerWorkItems;
163
164    {
165        // Protects m_timerWorkItems.
166        MutexLocker locker(m_timerWorkItemsLock);
167        if (m_timerWorkItems.isEmpty())
168            return;
169
170        // Copies all the timer work items in m_timerWorkItems to local vector.
171        m_timerWorkItems.swap(timerWorkItems);
172    }
173
174    double currentTimeNanoSeconds = monotonicallyIncreasingTime() * nanoSecondsPerSecond;
175
176    for (size_t i = 0; i < timerWorkItems.size(); ++i) {
177        if (!timerWorkItems[i]->hasExpired(currentTimeNanoSeconds)) {
178            insertTimerWorkItem(WTF::move(timerWorkItems[i]));
179            continue;
180        }
181
182        // If a timer work item has expired, dispatch the function of the work item.
183        timerWorkItems[i]->dispatch();
184    }
185}
186
187void DispatchQueue::performFileDescriptorWork()
188{
189    fd_set readFileDescriptorSet = m_fileDescriptorSet;
190
191    if (select(m_maxFileDescriptor + 1, &readFileDescriptorSet, 0, 0, getNextTimeOut()) >= 0) {
192        if (FD_ISSET(m_readFromPipeDescriptor, &readFileDescriptorSet)) {
193            char message;
194            if (read(m_readFromPipeDescriptor, &message, 1) == -1)
195                LOG_ERROR("Failed to read from DispatchQueue Thread pipe");
196
197            ASSERT(message == wakeUpThreadMessage);
198        }
199
200        if (m_socketDescriptor != invalidSocketDescriptor && FD_ISSET(m_socketDescriptor, &readFileDescriptorSet))
201            m_socketEventHandler();
202    }
203}
204
205void DispatchQueue::insertTimerWorkItem(std::unique_ptr<TimerWorkItem> item)
206{
207    ASSERT(item);
208
209    size_t position = 0;
210
211    MutexLocker locker(m_timerWorkItemsLock);
212    // The items should be ordered by expire time.
213    for (; position < m_timerWorkItems.size(); ++position)
214        if (item->expirationTimeNanoSeconds() < m_timerWorkItems[position]->expirationTimeNanoSeconds())
215            break;
216
217    m_timerWorkItems.insert(position, WTF::move(item));
218}
219
220void DispatchQueue::dispatchQueueThread()
221{
222    while (m_isThreadRunning) {
223        performWork();
224        performTimerWork();
225        performFileDescriptorWork();
226    }
227}
228
229void DispatchQueue::wakeUpThread()
230{
231    MutexLocker locker(m_writeToPipeDescriptorLock);
232    if (write(m_writeToPipeDescriptor, &wakeUpThreadMessage, sizeof(char)) == -1)
233        LOG_ERROR("Failed to wake up DispatchQueue Thread");
234}
235
236timeval* DispatchQueue::getNextTimeOut() const
237{
238    MutexLocker locker(m_timerWorkItemsLock);
239    if (m_timerWorkItems.isEmpty())
240        return 0;
241
242    static timeval timeValue;
243    timeValue.tv_sec = 0;
244    timeValue.tv_usec = 0;
245    double timeOutSeconds = (m_timerWorkItems[0]->expirationTimeNanoSeconds() - monotonicallyIncreasingTime() * nanoSecondsPerSecond) / nanoSecondsPerSecond;
246    if (timeOutSeconds > 0) {
247        timeValue.tv_sec = static_cast<long>(timeOutSeconds);
248        timeValue.tv_usec = static_cast<long>((timeOutSeconds - timeValue.tv_sec) * microSecondsPerSecond);
249    }
250
251    return &timeValue;
252}
253