1/* 2 * Copyright (C) 2013 Samsung Electronics. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 1. Redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer. 9 * 2. Redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution. 12 * 13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' 14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS 17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF 23 * THE POSSIBILITY OF SUCH DAMAGE. 24 */ 25 26#include "config.h" 27#include "DispatchQueueEfl.h" 28 29#include <DispatchQueueWorkItemEfl.h> 30#include <sys/timerfd.h> 31#include <wtf/Assertions.h> 32#include <wtf/CurrentTime.h> 33#include <wtf/Threading.h> 34 35static const int microSecondsPerSecond = 1000000; 36static const int nanoSecondsPerSecond = 1000000000; 37static const int invalidSocketDescriptor = -1; 38static const char wakeUpThreadMessage = 'W'; 39 40class DispatchQueue::ThreadContext { 41public: 42 static void start(const char* name, PassRefPtr<DispatchQueue> dispatchQueue) 43 { 44 // The DispatchQueueThreadContext instance will be passed to the thread function and deleted in it. 45 createThread(reinterpret_cast<WTF::ThreadFunction>(&ThreadContext::function), new ThreadContext(dispatchQueue), name); 46 } 47 48private: 49 ThreadContext(PassRefPtr<DispatchQueue> dispatchQueue) 50 : m_dispatchQueue(dispatchQueue) 51 { 52 } 53 54 static void* function(ThreadContext* threadContext) 55 { 56 std::unique_ptr<ThreadContext>(threadContext)->m_dispatchQueue->dispatchQueueThread(); 57 return 0; 58 } 59 60 RefPtr<DispatchQueue> m_dispatchQueue; 61}; 62 63PassRefPtr<DispatchQueue> DispatchQueue::create(const char* name) 64{ 65 RefPtr<DispatchQueue> dispatchQueue = adoptRef<DispatchQueue>(new DispatchQueue()); 66 67 ThreadContext::start(name, dispatchQueue); 68 69 return dispatchQueue.release(); 70} 71 72DispatchQueue::DispatchQueue() 73 : m_isThreadRunning(true) 74{ 75 int fds[2]; 76 if (pipe(fds)) 77 ASSERT_NOT_REACHED(); 78 79 m_readFromPipeDescriptor = fds[0]; 80 m_writeToPipeDescriptor = fds[1]; 81 FD_ZERO(&m_fileDescriptorSet); 82 FD_SET(m_readFromPipeDescriptor, &m_fileDescriptorSet); 83 m_maxFileDescriptor = m_readFromPipeDescriptor; 84 85 m_socketDescriptor = invalidSocketDescriptor; 86} 87 88DispatchQueue::~DispatchQueue() 89{ 90 close(m_readFromPipeDescriptor); 91 close(m_writeToPipeDescriptor); 92} 93 94void DispatchQueue::dispatch(std::unique_ptr<WorkItem> item) 95{ 96 { 97 MutexLocker locker(m_workItemsLock); 98 m_workItems.append(WTF::move(item)); 99 } 100 101 wakeUpThread(); 102} 103 104void DispatchQueue::dispatch(std::unique_ptr<TimerWorkItem> item) 105{ 106 insertTimerWorkItem(WTF::move(item)); 107 wakeUpThread(); 108} 109 110void DispatchQueue::stopThread() 111{ 112 ASSERT(m_socketDescriptor == invalidSocketDescriptor); 113 114 m_isThreadRunning = false; 115 wakeUpThread(); 116} 117 118void DispatchQueue::setSocketEventHandler(int fileDescriptor, std::function<void ()> function) 119{ 120 ASSERT(m_socketDescriptor == invalidSocketDescriptor); 121 122 m_socketDescriptor = fileDescriptor; 123 m_socketEventHandler = WTF::move(function); 124 125 if (fileDescriptor > m_maxFileDescriptor) 126 m_maxFileDescriptor = fileDescriptor; 127 FD_SET(fileDescriptor, &m_fileDescriptorSet); 128} 129 130void DispatchQueue::clearSocketEventHandler() 131{ 132 ASSERT(m_socketDescriptor != invalidSocketDescriptor); 133 134 if (m_socketDescriptor == m_maxFileDescriptor) 135 m_maxFileDescriptor = m_readFromPipeDescriptor; 136 137 FD_CLR(m_socketDescriptor, &m_fileDescriptorSet); 138 139 m_socketDescriptor = invalidSocketDescriptor; 140} 141 142void DispatchQueue::performWork() 143{ 144 while (true) { 145 Vector<std::unique_ptr<WorkItem>> workItems; 146 147 { 148 MutexLocker locker(m_workItemsLock); 149 if (m_workItems.isEmpty()) 150 return; 151 152 m_workItems.swap(workItems); 153 } 154 155 for (size_t i = 0; i < workItems.size(); ++i) 156 workItems[i]->dispatch(); 157 } 158} 159 160void DispatchQueue::performTimerWork() 161{ 162 Vector<std::unique_ptr<TimerWorkItem>> timerWorkItems; 163 164 { 165 // Protects m_timerWorkItems. 166 MutexLocker locker(m_timerWorkItemsLock); 167 if (m_timerWorkItems.isEmpty()) 168 return; 169 170 // Copies all the timer work items in m_timerWorkItems to local vector. 171 m_timerWorkItems.swap(timerWorkItems); 172 } 173 174 double currentTimeNanoSeconds = monotonicallyIncreasingTime() * nanoSecondsPerSecond; 175 176 for (size_t i = 0; i < timerWorkItems.size(); ++i) { 177 if (!timerWorkItems[i]->hasExpired(currentTimeNanoSeconds)) { 178 insertTimerWorkItem(WTF::move(timerWorkItems[i])); 179 continue; 180 } 181 182 // If a timer work item has expired, dispatch the function of the work item. 183 timerWorkItems[i]->dispatch(); 184 } 185} 186 187void DispatchQueue::performFileDescriptorWork() 188{ 189 fd_set readFileDescriptorSet = m_fileDescriptorSet; 190 191 if (select(m_maxFileDescriptor + 1, &readFileDescriptorSet, 0, 0, getNextTimeOut()) >= 0) { 192 if (FD_ISSET(m_readFromPipeDescriptor, &readFileDescriptorSet)) { 193 char message; 194 if (read(m_readFromPipeDescriptor, &message, 1) == -1) 195 LOG_ERROR("Failed to read from DispatchQueue Thread pipe"); 196 197 ASSERT(message == wakeUpThreadMessage); 198 } 199 200 if (m_socketDescriptor != invalidSocketDescriptor && FD_ISSET(m_socketDescriptor, &readFileDescriptorSet)) 201 m_socketEventHandler(); 202 } 203} 204 205void DispatchQueue::insertTimerWorkItem(std::unique_ptr<TimerWorkItem> item) 206{ 207 ASSERT(item); 208 209 size_t position = 0; 210 211 MutexLocker locker(m_timerWorkItemsLock); 212 // The items should be ordered by expire time. 213 for (; position < m_timerWorkItems.size(); ++position) 214 if (item->expirationTimeNanoSeconds() < m_timerWorkItems[position]->expirationTimeNanoSeconds()) 215 break; 216 217 m_timerWorkItems.insert(position, WTF::move(item)); 218} 219 220void DispatchQueue::dispatchQueueThread() 221{ 222 while (m_isThreadRunning) { 223 performWork(); 224 performTimerWork(); 225 performFileDescriptorWork(); 226 } 227} 228 229void DispatchQueue::wakeUpThread() 230{ 231 MutexLocker locker(m_writeToPipeDescriptorLock); 232 if (write(m_writeToPipeDescriptor, &wakeUpThreadMessage, sizeof(char)) == -1) 233 LOG_ERROR("Failed to wake up DispatchQueue Thread"); 234} 235 236timeval* DispatchQueue::getNextTimeOut() const 237{ 238 MutexLocker locker(m_timerWorkItemsLock); 239 if (m_timerWorkItems.isEmpty()) 240 return 0; 241 242 static timeval timeValue; 243 timeValue.tv_sec = 0; 244 timeValue.tv_usec = 0; 245 double timeOutSeconds = (m_timerWorkItems[0]->expirationTimeNanoSeconds() - monotonicallyIncreasingTime() * nanoSecondsPerSecond) / nanoSecondsPerSecond; 246 if (timeOutSeconds > 0) { 247 timeValue.tv_sec = static_cast<long>(timeOutSeconds); 248 timeValue.tv_usec = static_cast<long>((timeOutSeconds - timeValue.tv_sec) * microSecondsPerSecond); 249 } 250 251 return &timeValue; 252} 253