1/* 2 * Copyright (C) 2010 Apple Inc. All rights reserved. 3 * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies) 4 * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' 16 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 17 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS 19 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 20 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 21 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 22 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 23 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 24 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF 25 * THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28#ifndef Connection_h 29#define Connection_h 30 31#include "Arguments.h" 32#include "MessageDecoder.h" 33#include "MessageEncoder.h" 34#include "MessageReceiver.h" 35#include "WorkQueue.h" 36#include <wtf/Deque.h> 37#include <wtf/Forward.h> 38#include <wtf/PassRefPtr.h> 39#include <wtf/OwnPtr.h> 40#include <wtf/Threading.h> 41#include <wtf/text/CString.h> 42 43#if OS(DARWIN) 44#include <mach/mach_port.h> 45#if HAVE(XPC) 46#include <xpc/xpc.h> 47#endif 48#elif PLATFORM(QT) 49QT_BEGIN_NAMESPACE 50class QSocketNotifier; 51QT_END_NAMESPACE 52#endif 53 54#if PLATFORM(QT) || PLATFORM(GTK) || PLATFORM(EFL) 55#include "PlatformProcessIdentifier.h" 56#endif 57 58namespace WebCore { 59class RunLoop; 60} 61 62namespace CoreIPC { 63 64enum MessageSendFlags { 65 // Whether this message should be dispatched when waiting for a sync reply. 66 // This is the default for synchronous messages. 67 DispatchMessageEvenWhenWaitingForSyncReply = 1 << 0, 68}; 69 70enum SyncMessageSendFlags { 71 // Will allow events to continue being handled while waiting for the sync reply. 72 SpinRunLoopWhileWaitingForReply = 1 << 0, 73}; 74 75#define MESSAGE_CHECK_BASE(assertion, connection) do \ 76 if (!(assertion)) { \ 77 ASSERT(assertion); \ 78 (connection)->markCurrentlyDispatchedMessageAsInvalid(); \ 79 return; \ 80 } \ 81while (0) 82 83class Connection : public ThreadSafeRefCounted<Connection> { 84public: 85 class Client : public MessageReceiver { 86 public: 87 virtual void didClose(Connection*) = 0; 88 virtual void didReceiveInvalidMessage(Connection*, StringReference messageReceiverName, StringReference messageName) = 0; 89 90 protected: 91 virtual ~Client() { } 92 }; 93 94 class WorkQueueMessageReceiver : public MessageReceiver, public ThreadSafeRefCounted<WorkQueueMessageReceiver> { 95 }; 96 97#if OS(DARWIN) 98 struct Identifier { 99 Identifier() 100 : port(MACH_PORT_NULL) 101#if HAVE(XPC) 102 , xpcConnection(0) 103#endif 104 { 105 } 106 107 Identifier(mach_port_t port) 108 : port(port) 109#if HAVE(XPC) 110 , xpcConnection(0) 111#endif 112 { 113 } 114 115#if HAVE(XPC) 116 Identifier(mach_port_t port, xpc_connection_t xpcConnection) 117 : port(port) 118 , xpcConnection(xpcConnection) 119 { 120 } 121#endif 122 123 mach_port_t port; 124#if HAVE(XPC) 125 xpc_connection_t xpcConnection; 126#endif 127 }; 128 static bool identifierIsNull(Identifier identifier) { return identifier.port == MACH_PORT_NULL; } 129#elif OS(WINDOWS) 130 typedef HANDLE Identifier; 131 static bool createServerAndClientIdentifiers(Identifier& serverIdentifier, Identifier& clientIdentifier); 132 static bool identifierIsNull(Identifier identifier) { return !identifier; } 133#elif USE(UNIX_DOMAIN_SOCKETS) 134 typedef int Identifier; 135 static bool identifierIsNull(Identifier identifier) { return !identifier; } 136#endif 137 138 static PassRefPtr<Connection> createServerConnection(Identifier, Client*, WebCore::RunLoop* clientRunLoop); 139 static PassRefPtr<Connection> createClientConnection(Identifier, Client*, WebCore::RunLoop* clientRunLoop); 140 ~Connection(); 141 142 Client* client() const { return m_client; } 143 144#if OS(DARWIN) 145 void setShouldCloseConnectionOnMachExceptions(); 146#elif PLATFORM(QT) 147 void setShouldCloseConnectionOnProcessTermination(WebKit::PlatformProcessIdentifier); 148#endif 149 150 void setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool); 151 void setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure); 152 153 // The set callback will be called on the connection work queue when the connection is closed, 154 // before didCall is called on the client thread. Must be called before the connection is opened. 155 // In the future we might want a more generic way to handle sync or async messages directly 156 // on the work queue, for example if we want to handle them on some other thread we could avoid 157 // handling the message on the client thread first. 158 typedef void (*DidCloseOnConnectionWorkQueueCallback)(Connection*); 159 void setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback); 160 161 void addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue*, WorkQueueMessageReceiver*); 162 void removeWorkQueueMessageReceiver(StringReference messageReceiverName); 163 164 bool open(); 165 void invalidate(); 166 void markCurrentlyDispatchedMessageAsInvalid(); 167 168 void postConnectionDidCloseOnConnectionWorkQueue(); 169 170 static const int NoTimeout = -1; 171 172 template<typename T> bool send(const T& message, uint64_t destinationID, unsigned messageSendFlags = 0); 173 template<typename T> bool sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout = NoTimeout, unsigned syncSendFlags = 0); 174 template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, double timeout); 175 176 PassOwnPtr<MessageEncoder> createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID); 177 bool sendMessage(PassOwnPtr<MessageEncoder>, unsigned messageSendFlags = 0); 178 PassOwnPtr<MessageDecoder> sendSyncMessage(uint64_t syncRequestID, PassOwnPtr<MessageEncoder>, double timeout, unsigned syncSendFlags = 0); 179 PassOwnPtr<MessageDecoder> sendSyncMessageFromSecondaryThread(uint64_t syncRequestID, PassOwnPtr<MessageEncoder>, double timeout); 180 bool sendSyncReply(PassOwnPtr<MessageEncoder>); 181 182 void wakeUpRunLoop(); 183 184 void incrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { ++m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; } 185 void decrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { --m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; } 186 187 bool inSendSync() const { return m_inSendSyncCount; } 188 189private: 190 Connection(Identifier, bool isServer, Client*, WebCore::RunLoop* clientRunLoop); 191 void platformInitialize(Identifier); 192 void platformInvalidate(); 193 194 bool isValid() const { return m_client; } 195 196 PassOwnPtr<MessageDecoder> waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, double timeout); 197 198 PassOwnPtr<MessageDecoder> waitForSyncReply(uint64_t syncRequestID, double timeout, unsigned syncSendFlags); 199 200 // Called on the connection work queue. 201 void processIncomingMessage(PassOwnPtr<MessageDecoder>); 202 void processIncomingSyncReply(PassOwnPtr<MessageDecoder>); 203 204 void addWorkQueueMessageReceiverOnConnectionWorkQueue(StringReference messageReceiverName, WorkQueue*, WorkQueueMessageReceiver*); 205 void removeWorkQueueMessageReceiverOnConnectionWorkQueue(StringReference messageReceiverName); 206 void dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver*, MessageDecoder*); 207 208 bool canSendOutgoingMessages() const; 209 bool platformCanSendOutgoingMessages() const; 210 void sendOutgoingMessages(); 211 bool sendOutgoingMessage(PassOwnPtr<MessageEncoder>); 212 void connectionDidClose(); 213 214 // Called on the listener thread. 215 void dispatchConnectionDidClose(); 216 void dispatchOneMessage(); 217 void dispatchMessage(PassOwnPtr<MessageDecoder>); 218 void dispatchMessage(MessageDecoder&); 219 void dispatchSyncMessage(MessageDecoder&); 220 void dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString); 221 void didFailToSendSyncMessage(); 222 223 // Can be called on any thread. 224 void enqueueIncomingMessage(PassOwnPtr<MessageDecoder>); 225 226 Client* m_client; 227 bool m_isServer; 228 uint64_t m_syncRequestID; 229 230 bool m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage; 231 bool m_shouldExitOnSyncMessageSendFailure; 232 DidCloseOnConnectionWorkQueueCallback m_didCloseOnConnectionWorkQueueCallback; 233 234 bool m_isConnected; 235 RefPtr<WorkQueue> m_connectionQueue; 236 WebCore::RunLoop* m_clientRunLoop; 237 238 HashMap<StringReference, std::pair<RefPtr<WorkQueue>, RefPtr<WorkQueueMessageReceiver>>> m_workQueueMessageReceivers; 239 240 unsigned m_inSendSyncCount; 241 unsigned m_inDispatchMessageCount; 242 unsigned m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; 243 bool m_didReceiveInvalidMessage; 244 245 // Incoming messages. 246 Mutex m_incomingMessagesLock; 247 Deque<OwnPtr<MessageDecoder>> m_incomingMessages; 248 249 // Outgoing messages. 250 Mutex m_outgoingMessagesLock; 251 Deque<OwnPtr<MessageEncoder>> m_outgoingMessages; 252 253 ThreadCondition m_waitForMessageCondition; 254 Mutex m_waitForMessageMutex; 255 HashMap<std::pair<std::pair<StringReference, StringReference>, uint64_t>, OwnPtr<MessageDecoder>> m_waitForMessageMap; 256 257 // Represents a sync request for which we're waiting on a reply. 258 struct PendingSyncReply { 259 // The request ID. 260 uint64_t syncRequestID; 261 262 // The reply decoder, will be null if there was an error processing the sync 263 // message on the other side. 264 MessageDecoder* replyDecoder; 265 266 // Will be set to true once a reply has been received. 267 bool didReceiveReply; 268 269 PendingSyncReply() 270 : syncRequestID(0) 271 , replyDecoder(0) 272 , didReceiveReply(false) 273 { 274 } 275 276 explicit PendingSyncReply(uint64_t syncRequestID) 277 : syncRequestID(syncRequestID) 278 , replyDecoder(0) 279 , didReceiveReply(0) 280 { 281 } 282 283 PassOwnPtr<MessageDecoder> releaseReplyDecoder() 284 { 285 OwnPtr<MessageDecoder> reply = adoptPtr(replyDecoder); 286 replyDecoder = 0; 287 288 return reply.release(); 289 } 290 }; 291 292 class SyncMessageState; 293 friend class SyncMessageState; 294 RefPtr<SyncMessageState> m_syncMessageState; 295 296 Mutex m_syncReplyStateMutex; 297 bool m_shouldWaitForSyncReplies; 298 Vector<PendingSyncReply> m_pendingSyncReplies; 299 300 class SecondaryThreadPendingSyncReply; 301 typedef HashMap<uint64_t, SecondaryThreadPendingSyncReply*> SecondaryThreadPendingSyncReplyMap; 302 SecondaryThreadPendingSyncReplyMap m_secondaryThreadPendingSyncReplyMap; 303 304#if OS(DARWIN) 305 // Called on the connection queue. 306 void receiveSourceEventHandler(); 307 void initializeDeadNameSource(); 308 void exceptionSourceEventHandler(); 309 310 mach_port_t m_sendPort; 311 dispatch_source_t m_deadNameSource; 312 313 mach_port_t m_receivePort; 314 dispatch_source_t m_receivePortDataAvailableSource; 315 316 // If setShouldCloseConnectionOnMachExceptions has been called, this has 317 // the exception port that exceptions from the other end will be sent on. 318 mach_port_t m_exceptionPort; 319 dispatch_source_t m_exceptionPortDataAvailableSource; 320 321#if HAVE(XPC) 322 xpc_connection_t m_xpcConnection; 323#endif 324 325#elif OS(WINDOWS) 326 // Called on the connection queue. 327 void readEventHandler(); 328 void writeEventHandler(); 329 330 // Called by Connection::SyncMessageState::waitWhileDispatchingSentWin32Messages. 331 // The absoluteTime is in seconds, starting on January 1, 1970. The time is assumed to use the 332 // same time zone as WTF::currentTime(). Dispatches sent (not posted) messages to the passed-in 333 // set of HWNDs until the semaphore is signaled or absoluteTime is reached. Returns true if the 334 // semaphore is signaled, false otherwise. 335 static bool dispatchSentMessagesUntil(const Vector<HWND>& windows, WTF::BinarySemaphore& semaphore, double absoluteTime); 336 337 Vector<uint8_t> m_readBuffer; 338 OVERLAPPED m_readState; 339 OwnPtr<MessageEncoder> m_pendingWriteEncoder; 340 OVERLAPPED m_writeState; 341 HANDLE m_connectionPipe; 342#elif USE(UNIX_DOMAIN_SOCKETS) 343 // Called on the connection queue. 344 void readyReadHandler(); 345 bool processMessage(); 346 347 Vector<uint8_t> m_readBuffer; 348 size_t m_readBufferSize; 349 Vector<int> m_fileDescriptors; 350 size_t m_fileDescriptorsSize; 351 int m_socketDescriptor; 352#if PLATFORM(QT) 353 QSocketNotifier* m_socketNotifier; 354#endif 355#endif 356}; 357 358template<typename T> bool Connection::send(const T& message, uint64_t destinationID, unsigned messageSendFlags) 359{ 360 COMPILE_ASSERT(!T::isSync, AsyncMessageExpected); 361 362 OwnPtr<MessageEncoder> encoder = MessageEncoder::create(T::receiverName(), T::name(), destinationID); 363 encoder->encode(message); 364 365 return sendMessage(encoder.release(), messageSendFlags); 366} 367 368template<typename T> bool Connection::sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout, unsigned syncSendFlags) 369{ 370 COMPILE_ASSERT(T::isSync, SyncMessageExpected); 371 372 uint64_t syncRequestID = 0; 373 OwnPtr<MessageEncoder> encoder = createSyncMessageEncoder(T::receiverName(), T::name(), destinationID, syncRequestID); 374 375 // Encode the rest of the input arguments. 376 encoder->encode(message); 377 378 // Now send the message and wait for a reply. 379 OwnPtr<MessageDecoder> replyDecoder = sendSyncMessage(syncRequestID, encoder.release(), timeout, syncSendFlags); 380 if (!replyDecoder) 381 return false; 382 383 // Decode the reply. 384 return replyDecoder->decode(const_cast<typename T::Reply&>(reply)); 385} 386 387template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, double timeout) 388{ 389 OwnPtr<MessageDecoder> decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout); 390 if (!decoder) 391 return false; 392 393 ASSERT(decoder->destinationID() == destinationID); 394 m_client->didReceiveMessage(this, *decoder); 395 return true; 396} 397 398} // namespace CoreIPC 399 400#endif // Connection_h 401