1/* 2 * Copyright (C) 2008 Apple Inc. All rights reserved. 3 * Copyright (C) 2009 Google Inc. All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 3. Neither the name of Apple Inc. ("Apple") nor the names of 15 * its contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY 19 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 20 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 21 * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY 22 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 23 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 24 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 25 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30#ifndef MessageQueue_h 31#define MessageQueue_h 32 33#include <limits> 34#include <wtf/Assertions.h> 35#include <wtf/Deque.h> 36#include <wtf/Noncopyable.h> 37#include <wtf/Threading.h> 38 39namespace WTF { 40 41 enum MessageQueueWaitResult { 42 MessageQueueTerminated, // Queue was destroyed while waiting for message. 43 MessageQueueTimeout, // Timeout was specified and it expired. 44 MessageQueueMessageReceived // A message was successfully received and returned. 45 }; 46 47 // The queue takes ownership of messages and transfer it to the new owner 48 // when messages are fetched from the queue. 49 // Essentially, MessageQueue acts as a queue of std::unique_ptr<DataType>. 50 template<typename DataType> 51 class MessageQueue { 52 WTF_MAKE_NONCOPYABLE(MessageQueue); 53 public: 54 MessageQueue() : m_killed(false) { } 55 ~MessageQueue(); 56 57 void append(std::unique_ptr<DataType>); 58 void appendAndKill(std::unique_ptr<DataType>); 59 bool appendAndCheckEmpty(std::unique_ptr<DataType>); 60 void prepend(std::unique_ptr<DataType>); 61 62 std::unique_ptr<DataType> waitForMessage(); 63 std::unique_ptr<DataType> tryGetMessage(); 64 std::unique_ptr<DataType> tryGetMessageIgnoringKilled(); 65 template<typename Predicate> 66 std::unique_ptr<DataType> waitForMessageFilteredWithTimeout(MessageQueueWaitResult&, Predicate&&, double absoluteTime); 67 68 template<typename Predicate> 69 void removeIf(Predicate&&); 70 71 void kill(); 72 bool killed() const; 73 74 // The result of isEmpty() is only valid if no other thread is manipulating the queue at the same time. 75 bool isEmpty(); 76 77 static double infiniteTime() { return std::numeric_limits<double>::max(); } 78 79 private: 80 mutable Mutex m_mutex; 81 ThreadCondition m_condition; 82 Deque<std::unique_ptr<DataType>> m_queue; 83 bool m_killed; 84 }; 85 86 template<typename DataType> 87 MessageQueue<DataType>::~MessageQueue() 88 { 89 } 90 91 template<typename DataType> 92 inline void MessageQueue<DataType>::append(std::unique_ptr<DataType> message) 93 { 94 MutexLocker lock(m_mutex); 95 m_queue.append(WTF::move(message)); 96 m_condition.signal(); 97 } 98 99 template<typename DataType> 100 inline void MessageQueue<DataType>::appendAndKill(std::unique_ptr<DataType> message) 101 { 102 MutexLocker lock(m_mutex); 103 m_queue.append(WTF::move(message)); 104 m_killed = true; 105 m_condition.broadcast(); 106 } 107 108 // Returns true if the queue was empty before the item was added. 109 template<typename DataType> 110 inline bool MessageQueue<DataType>::appendAndCheckEmpty(std::unique_ptr<DataType> message) 111 { 112 MutexLocker lock(m_mutex); 113 bool wasEmpty = m_queue.isEmpty(); 114 m_queue.append(WTF::move(message)); 115 m_condition.signal(); 116 return wasEmpty; 117 } 118 119 template<typename DataType> 120 inline void MessageQueue<DataType>::prepend(std::unique_ptr<DataType> message) 121 { 122 MutexLocker lock(m_mutex); 123 m_queue.prepend(WTF::move(message)); 124 m_condition.signal(); 125 } 126 127 template<typename DataType> 128 inline auto MessageQueue<DataType>::waitForMessage() -> std::unique_ptr<DataType> 129 { 130 MessageQueueWaitResult exitReason; 131 std::unique_ptr<DataType> result = waitForMessageFilteredWithTimeout(exitReason, [](const DataType&) { return true; }, infiniteTime()); 132 ASSERT(exitReason == MessageQueueTerminated || exitReason == MessageQueueMessageReceived); 133 return result; 134 } 135 136 template<typename DataType> 137 template<typename Predicate> 138 inline auto MessageQueue<DataType>::waitForMessageFilteredWithTimeout(MessageQueueWaitResult& result, Predicate&& predicate, double absoluteTime) -> std::unique_ptr<DataType> 139 { 140 MutexLocker lock(m_mutex); 141 bool timedOut = false; 142 143 auto found = m_queue.end(); 144 while (!m_killed && !timedOut) { 145 found = m_queue.findIf([&predicate](const std::unique_ptr<DataType>& ptr) -> bool { 146 ASSERT(ptr); 147 return predicate(*ptr); 148 }); 149 if (found != m_queue.end()) 150 break; 151 152 timedOut = !m_condition.timedWait(m_mutex, absoluteTime); 153 } 154 155 ASSERT(!timedOut || absoluteTime != infiniteTime()); 156 157 if (m_killed) { 158 result = MessageQueueTerminated; 159 return nullptr; 160 } 161 162 if (timedOut) { 163 result = MessageQueueTimeout; 164 return nullptr; 165 } 166 167 ASSERT(found != m_queue.end()); 168 std::unique_ptr<DataType> message = WTF::move(*found); 169 m_queue.remove(found); 170 result = MessageQueueMessageReceived; 171 return message; 172 } 173 174 template<typename DataType> 175 inline auto MessageQueue<DataType>::tryGetMessage() -> std::unique_ptr<DataType> 176 { 177 MutexLocker lock(m_mutex); 178 if (m_killed) 179 return nullptr; 180 if (m_queue.isEmpty()) 181 return nullptr; 182 183 return m_queue.takeFirst(); 184 } 185 186 template<typename DataType> 187 inline auto MessageQueue<DataType>::tryGetMessageIgnoringKilled() -> std::unique_ptr<DataType> 188 { 189 MutexLocker lock(m_mutex); 190 if (m_queue.isEmpty()) 191 return nullptr; 192 193 return m_queue.takeFirst(); 194 } 195 196 template<typename DataType> 197 template<typename Predicate> 198 inline void MessageQueue<DataType>::removeIf(Predicate&& predicate) 199 { 200 MutexLocker lock(m_mutex); 201 while (true) { 202 auto found = m_queue.findIf([&predicate](const std::unique_ptr<DataType>& ptr) -> bool { 203 ASSERT(ptr); 204 return predicate(*ptr); 205 }); 206 207 if (found == m_queue.end()) 208 break; 209 210 m_queue.remove(found); 211 } 212 } 213 214 template<typename DataType> 215 inline bool MessageQueue<DataType>::isEmpty() 216 { 217 MutexLocker lock(m_mutex); 218 if (m_killed) 219 return true; 220 return m_queue.isEmpty(); 221 } 222 223 template<typename DataType> 224 inline void MessageQueue<DataType>::kill() 225 { 226 MutexLocker lock(m_mutex); 227 m_killed = true; 228 m_condition.broadcast(); 229 } 230 231 template<typename DataType> 232 inline bool MessageQueue<DataType>::killed() const 233 { 234 MutexLocker lock(m_mutex); 235 return m_killed; 236 } 237} // namespace WTF 238 239using WTF::MessageQueue; 240// MessageQueueWaitResult enum and all its values. 241using WTF::MessageQueueWaitResult; 242using WTF::MessageQueueTerminated; 243using WTF::MessageQueueTimeout; 244using WTF::MessageQueueMessageReceived; 245 246#endif // MessageQueue_h 247