1/* 2 Copyright (C) 2012 Samsung Electronics 3 4 This library is free software; you can redistribute it and/or 5 modify it under the terms of the GNU Library General Public 6 License as published by the Free Software Foundation; either 7 version 2 of the License, or (at your option) any later version. 8 9 This library is distributed in the hope that it will be useful, 10 but WITHOUT ANY WARRANTY; without even the implied warranty of 11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 Library General Public License for more details. 13 14 You should have received a copy of the GNU Library General Public License 15 along with this library; see the file COPYING.LIB. If not, write to 16 the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 17 Boston, MA 02110-1301, USA. 18 */ 19 20#include "config.h" 21#include "WorkQueue.h" 22 23#include <sys/timerfd.h> 24#include <wtf/Assertions.h> 25#include <wtf/CurrentTime.h> 26 27static const int invalidSocketDescriptor = -1; 28static const int threadMessageSize = 1; 29static const char finishThreadMessage[] = "F"; 30static const char wakupThreadMessage[] = "W"; 31 32PassOwnPtr<WorkQueue::TimerWorkItem> WorkQueue::TimerWorkItem::create(Function<void()> function, double expireTime) 33{ 34 if (expireTime < 0) 35 return nullptr; 36 37 return adoptPtr(new TimerWorkItem(function, expireTime)); 38} 39 40WorkQueue::TimerWorkItem::TimerWorkItem(Function<void()> function, double expireTime) 41 : m_function(function) 42 , m_expireTime(expireTime) 43{ 44} 45 46void WorkQueue::platformInitialize(const char* name) 47{ 48 int fds[2]; 49 if (pipe(fds)) 50 ASSERT_NOT_REACHED(); 51 52 m_readFromPipeDescriptor = fds[0]; 53 m_writeToPipeDescriptor = fds[1]; 54 FD_ZERO(&m_fileDescriptorSet); 55 FD_SET(m_readFromPipeDescriptor, &m_fileDescriptorSet); 56 m_maxFileDescriptor = m_readFromPipeDescriptor; 57 58 m_socketDescriptor = invalidSocketDescriptor; 59 60 m_threadLoop = true; 61 createThread(reinterpret_cast<WTF::ThreadFunction>(&WorkQueue::workQueueThread), this, name); 62} 63 64void WorkQueue::platformInvalidate() 65{ 66 sendMessageToThread(finishThreadMessage); 67} 68 69void WorkQueue::performWork() 70{ 71 while (true) { 72 Vector<Function<void()> > workItemQueue; 73 74 { 75 MutexLocker locker(m_workItemQueueLock); 76 if (m_workItemQueue.isEmpty()) 77 return; 78 79 m_workItemQueue.swap(workItemQueue); 80 } 81 82 for (size_t i = 0; i < workItemQueue.size(); ++i) { 83 workItemQueue[i](); 84 deref(); 85 } 86 } 87} 88 89void WorkQueue::performFileDescriptorWork() 90{ 91 fd_set readFileDescriptorSet = m_fileDescriptorSet; 92 93 if (select(m_maxFileDescriptor + 1, &readFileDescriptorSet, 0, 0, getNextTimeOut()) >= 0) { 94 if (FD_ISSET(m_readFromPipeDescriptor, &readFileDescriptorSet)) { 95 char readBuf[threadMessageSize]; 96 if (read(m_readFromPipeDescriptor, readBuf, threadMessageSize) == -1) 97 LOG_ERROR("Failed to read from WorkQueueThread pipe"); 98 if (!strncmp(readBuf, finishThreadMessage, threadMessageSize)) 99 m_threadLoop = false; 100 } 101 102 if (m_socketDescriptor != invalidSocketDescriptor && FD_ISSET(m_socketDescriptor, &readFileDescriptorSet)) 103 m_socketEventHandler(); 104 } 105} 106 107struct timeval* WorkQueue::getNextTimeOut() 108{ 109 MutexLocker locker(m_timerWorkItemsLock); 110 if (m_timerWorkItems.isEmpty()) 111 return 0; 112 113 static struct timeval timeValue; 114 timeValue.tv_sec = 0; 115 timeValue.tv_usec = 0; 116 double timeOut = m_timerWorkItems[0]->expireTime() - currentTime(); 117 if (timeOut > 0) { 118 timeValue.tv_sec = static_cast<long>(timeOut); 119 timeValue.tv_usec = static_cast<long>((timeOut - timeValue.tv_sec) * 1000000); 120 } 121 122 return &timeValue; 123} 124 125void WorkQueue::insertTimerWorkItem(PassOwnPtr<TimerWorkItem> item) 126{ 127 if (!item) 128 return; 129 130 size_t position = 0; 131 132 MutexLocker locker(m_timerWorkItemsLock); 133 // m_timerWorkItems should be ordered by expire time. 134 for (; position < m_timerWorkItems.size(); ++position) 135 if (item->expireTime() < m_timerWorkItems[position]->expireTime()) 136 break; 137 138 m_timerWorkItems.insert(position, item); 139} 140 141void WorkQueue::performTimerWork() 142{ 143 Vector<OwnPtr<TimerWorkItem> > timerWorkItems; 144 145 { 146 // Protects m_timerWorkItems. 147 MutexLocker locker(m_timerWorkItemsLock); 148 if (m_timerWorkItems.isEmpty()) 149 return; 150 151 // Copies all the timer work items in m_timerWorkItems to local vector. 152 m_timerWorkItems.swap(timerWorkItems); 153 } 154 155 double current = currentTime(); 156 157 for (size_t i = 0; i < timerWorkItems.size(); ++i) { 158 if (!timerWorkItems[i]->expired(current)) { 159 // If a timer work item does not expired, keep it to the m_timerWorkItems. 160 // m_timerWorkItems should be ordered by expire time. 161 insertTimerWorkItem(timerWorkItems[i].release()); 162 continue; 163 } 164 165 // If a timer work item expired, dispatch the function of the work item. 166 timerWorkItems[i]->dispatch(); 167 deref(); 168 } 169} 170 171void WorkQueue::sendMessageToThread(const char* message) 172{ 173 MutexLocker locker(m_writeToPipeDescriptorLock); 174 if (write(m_writeToPipeDescriptor, message, threadMessageSize) == -1) 175 LOG_ERROR("Failed to wake up WorkQueue Thread"); 176} 177 178void* WorkQueue::workQueueThread(WorkQueue* workQueue) 179{ 180 while (workQueue->m_threadLoop) { 181 workQueue->performWork(); 182 workQueue->performTimerWork(); 183 workQueue->performFileDescriptorWork(); 184 } 185 186 close(workQueue->m_readFromPipeDescriptor); 187 close(workQueue->m_writeToPipeDescriptor); 188 189 return 0; 190} 191 192void WorkQueue::registerSocketEventHandler(int fileDescriptor, const Function<void()>& function) 193{ 194 if (m_socketDescriptor != invalidSocketDescriptor) 195 LOG_ERROR("%d is already registerd.", fileDescriptor); 196 197 m_socketDescriptor = fileDescriptor; 198 m_socketEventHandler = function; 199 200 if (fileDescriptor > m_maxFileDescriptor) 201 m_maxFileDescriptor = fileDescriptor; 202 FD_SET(fileDescriptor, &m_fileDescriptorSet); 203} 204 205void WorkQueue::unregisterSocketEventHandler(int fileDescriptor) 206{ 207 m_socketDescriptor = invalidSocketDescriptor; 208 209 if (fileDescriptor == m_maxFileDescriptor) 210 m_maxFileDescriptor = m_readFromPipeDescriptor; 211 FD_CLR(fileDescriptor, &m_fileDescriptorSet); 212} 213 214void WorkQueue::dispatch(const Function<void()>& function) 215{ 216 ref(); 217 218 { 219 MutexLocker locker(m_workItemQueueLock); 220 m_workItemQueue.append(function); 221 } 222 223 sendMessageToThread(wakupThreadMessage); 224} 225 226void WorkQueue::dispatchAfterDelay(const Function<void()>& function, double delay) 227{ 228 if (delay < 0) 229 return; 230 231 OwnPtr<TimerWorkItem> timerWorkItem = TimerWorkItem::create(function, currentTime() + delay); 232 if (!timerWorkItem) 233 return; 234 235 ref(); 236 insertTimerWorkItem(timerWorkItem.release()); 237 sendMessageToThread(wakupThreadMessage); 238} 239