1/* 2 * Copyright (C) 2010 Apple Inc. All rights reserved. 3 * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies) 4 * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' 16 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 17 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS 19 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 20 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 21 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 22 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 23 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 24 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF 25 * THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28#ifndef Connection_h 29#define Connection_h 30 31#include "Arguments.h" 32#include "MessageDecoder.h" 33#include "MessageEncoder.h" 34#include "MessageReceiver.h" 35#include "WorkQueue.h" 36#include <atomic> 37#include <condition_variable> 38#include <wtf/Deque.h> 39#include <wtf/Forward.h> 40#include <wtf/PassRefPtr.h> 41#include <wtf/text/CString.h> 42 43#if OS(DARWIN) 44#include "XPCPtr.h" 45#include <mach/mach_port.h> 46#endif 47 48#if PLATFORM(GTK) || PLATFORM(EFL) 49#include "PlatformProcessIdentifier.h" 50#endif 51 52namespace WTF { 53class RunLoop; 54} 55 56namespace IPC { 57 58struct WaitForMessageState; 59 60enum MessageSendFlags { 61 // Whether this message should be dispatched when waiting for a sync reply. 62 // This is the default for synchronous messages. 63 DispatchMessageEvenWhenWaitingForSyncReply = 1 << 0, 64}; 65 66enum SyncMessageSendFlags { 67 // Use this to inform that this sync call will suspend this process until the user responds with input. 68 InformPlatformProcessWillSuspend = 1 << 0, 69 // Some platform accessibility clients can't suspend gracefully and need to spin the run loop so WebProcess doesn't hang. 70 // FIXME (126021): Remove when no platforms need to support this. 71 SpinRunLoopWhileWaitingForReply = 1 << 1, 72}; 73 74enum WaitForMessageFlags { 75 // Use this to make waitForMessage be interrupted immediately by any incoming sync messages. 76 InterruptWaitingIfSyncMessageArrives = 1 << 0, 77}; 78 79#define MESSAGE_CHECK_BASE(assertion, connection) do \ 80 if (!(assertion)) { \ 81 ASSERT(assertion); \ 82 (connection)->markCurrentlyDispatchedMessageAsInvalid(); \ 83 return; \ 84 } \ 85while (0) 86 87class Connection : public ThreadSafeRefCounted<Connection> { 88public: 89 class Client : public MessageReceiver { 90 public: 91 virtual void didClose(Connection*) = 0; 92 virtual void didReceiveInvalidMessage(Connection*, StringReference messageReceiverName, StringReference messageName) = 0; 93 94 protected: 95 virtual ~Client() { } 96 }; 97 98 class WorkQueueMessageReceiver : public MessageReceiver, public ThreadSafeRefCounted<WorkQueueMessageReceiver> { 99 }; 100 101#if OS(DARWIN) 102 struct Identifier { 103 Identifier() 104 : port(MACH_PORT_NULL) 105 { 106 } 107 108 Identifier(mach_port_t port) 109 : port(port) 110 { 111 } 112 113 Identifier(mach_port_t port, XPCPtr<xpc_connection_t> xpcConnection) 114 : port(port) 115 , xpcConnection(WTF::move(xpcConnection)) 116 { 117 } 118 119 mach_port_t port; 120 XPCPtr<xpc_connection_t> xpcConnection; 121 }; 122 static bool identifierIsNull(Identifier identifier) { return identifier.port == MACH_PORT_NULL; } 123 xpc_connection_t xpcConnection() { return m_xpcConnection.get(); } 124 bool getAuditToken(audit_token_t&); 125#elif USE(UNIX_DOMAIN_SOCKETS) 126 typedef int Identifier; 127 static bool identifierIsNull(Identifier identifier) { return !identifier; } 128 129 struct SocketPair { 130 int client; 131 int server; 132 }; 133 134 enum ConnectionOptions { 135 SetCloexecOnClient = 1 << 0, 136 SetCloexecOnServer = 1 << 1, 137 }; 138 139 static Connection::SocketPair createPlatformConnection(unsigned options = SetCloexecOnClient | SetCloexecOnServer); 140#endif 141 142 static PassRefPtr<Connection> createServerConnection(Identifier, Client*, WTF::RunLoop& clientRunLoop); 143 static PassRefPtr<Connection> createClientConnection(Identifier, Client*, WTF::RunLoop& clientRunLoop); 144 ~Connection(); 145 146 Client* client() const { return m_client; } 147 148#if PLATFORM(MAC) 149 void setShouldCloseConnectionOnMachExceptions(); 150#endif 151 152 void setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool); 153 void setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure); 154 155 // The set callback will be called on the connection work queue when the connection is closed, 156 // before didCall is called on the client thread. Must be called before the connection is opened. 157 // In the future we might want a more generic way to handle sync or async messages directly 158 // on the work queue, for example if we want to handle them on some other thread we could avoid 159 // handling the message on the client thread first. 160 typedef void (*DidCloseOnConnectionWorkQueueCallback)(Connection*); 161 void setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback); 162 163 void addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue*, WorkQueueMessageReceiver*); 164 void removeWorkQueueMessageReceiver(StringReference messageReceiverName); 165 166 bool open(); 167 void invalidate(); 168 void markCurrentlyDispatchedMessageAsInvalid(); 169 170 void postConnectionDidCloseOnConnectionWorkQueue(); 171 172 template<typename T> bool send(T&& message, uint64_t destinationID, unsigned messageSendFlags = 0); 173 template<typename T> bool sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, std::chrono::milliseconds timeout = std::chrono::milliseconds::max(), unsigned syncSendFlags = 0); 174 template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags = 0); 175 176 std::unique_ptr<MessageEncoder> createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID); 177 bool sendMessage(std::unique_ptr<MessageEncoder>, unsigned messageSendFlags = 0); 178 std::unique_ptr<MessageDecoder> sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<MessageEncoder>, std::chrono::milliseconds timeout, unsigned syncSendFlags = 0); 179 std::unique_ptr<MessageDecoder> sendSyncMessageFromSecondaryThread(uint64_t syncRequestID, std::unique_ptr<MessageEncoder>, std::chrono::milliseconds timeout); 180 bool sendSyncReply(std::unique_ptr<MessageEncoder>); 181 182 void wakeUpRunLoop(); 183 184 void incrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { ++m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; } 185 void decrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { --m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; } 186 187 bool inSendSync() const { return m_inSendSyncCount; } 188 189 Identifier identifier() const; 190 191#if PLATFORM(COCOA) 192 bool kill(); 193 void terminateSoon(double intervalInSeconds); 194#endif 195 196private: 197 Connection(Identifier, bool isServer, Client*, WTF::RunLoop& clientRunLoop); 198 void platformInitialize(Identifier); 199 void platformInvalidate(); 200 201 bool isValid() const { return m_client; } 202 203 std::unique_ptr<MessageDecoder> waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags); 204 205 std::unique_ptr<MessageDecoder> waitForSyncReply(uint64_t syncRequestID, std::chrono::milliseconds timeout, unsigned syncSendFlags); 206 207 // Called on the connection work queue. 208 void processIncomingMessage(std::unique_ptr<MessageDecoder>); 209 void processIncomingSyncReply(std::unique_ptr<MessageDecoder>); 210 211 void dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver*, MessageDecoder*); 212 213 bool canSendOutgoingMessages() const; 214 bool platformCanSendOutgoingMessages() const; 215 void sendOutgoingMessages(); 216 bool sendOutgoingMessage(std::unique_ptr<MessageEncoder>); 217 void connectionDidClose(); 218 219 // Called on the listener thread. 220 void dispatchOneMessage(); 221 void dispatchMessage(std::unique_ptr<MessageDecoder>); 222 void dispatchMessage(MessageDecoder&); 223 void dispatchSyncMessage(MessageDecoder&); 224 void dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString); 225 void didFailToSendSyncMessage(); 226 227 // Can be called on any thread. 228 void enqueueIncomingMessage(std::unique_ptr<MessageDecoder>); 229 230 void willSendSyncMessage(unsigned syncSendFlags); 231 void didReceiveSyncReply(unsigned syncSendFlags); 232 233 Client* m_client; 234 bool m_isServer; 235 std::atomic<uint64_t> m_syncRequestID; 236 237 bool m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage; 238 bool m_shouldExitOnSyncMessageSendFailure; 239 DidCloseOnConnectionWorkQueueCallback m_didCloseOnConnectionWorkQueueCallback; 240 241 bool m_isConnected; 242 RefPtr<WorkQueue> m_connectionQueue; 243 WTF::RunLoop& m_clientRunLoop; 244 245 HashMap<StringReference, std::pair<RefPtr<WorkQueue>, RefPtr<WorkQueueMessageReceiver>>> m_workQueueMessageReceivers; 246 247 unsigned m_inSendSyncCount; 248 unsigned m_inDispatchMessageCount; 249 unsigned m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; 250 bool m_didReceiveInvalidMessage; 251 252 // Incoming messages. 253 Mutex m_incomingMessagesLock; 254 Deque<std::unique_ptr<MessageDecoder>> m_incomingMessages; 255 256 // Outgoing messages. 257 Mutex m_outgoingMessagesLock; 258 Deque<std::unique_ptr<MessageEncoder>> m_outgoingMessages; 259 260 std::condition_variable m_waitForMessageCondition; 261 std::mutex m_waitForMessageMutex; 262 263 WaitForMessageState* m_waitingForMessage; 264 265 // Represents a sync request for which we're waiting on a reply. 266 struct PendingSyncReply { 267 // The request ID. 268 uint64_t syncRequestID; 269 270 // The reply decoder, will be null if there was an error processing the sync 271 // message on the other side. 272 std::unique_ptr<MessageDecoder> replyDecoder; 273 274 // Will be set to true once a reply has been received. 275 bool didReceiveReply; 276 277 PendingSyncReply() 278 : syncRequestID(0) 279 , didReceiveReply(false) 280 { 281 } 282 283 explicit PendingSyncReply(uint64_t syncRequestID) 284 : syncRequestID(syncRequestID) 285 , didReceiveReply(0) 286 { 287 } 288 }; 289 290 class SyncMessageState; 291 friend class SyncMessageState; 292 RefPtr<SyncMessageState> m_syncMessageState; 293 294 Mutex m_syncReplyStateMutex; 295 bool m_shouldWaitForSyncReplies; 296 Vector<PendingSyncReply> m_pendingSyncReplies; 297 298 class SecondaryThreadPendingSyncReply; 299 typedef HashMap<uint64_t, SecondaryThreadPendingSyncReply*> SecondaryThreadPendingSyncReplyMap; 300 SecondaryThreadPendingSyncReplyMap m_secondaryThreadPendingSyncReplyMap; 301 302#if OS(DARWIN) 303 // Called on the connection queue. 304 void receiveSourceEventHandler(); 305 void initializeDeadNameSource(); 306 307 mach_port_t m_sendPort; 308 dispatch_source_t m_deadNameSource; 309 310 mach_port_t m_receivePort; 311 dispatch_source_t m_receivePortDataAvailableSource; 312 313#if !PLATFORM(IOS) 314 void exceptionSourceEventHandler(); 315 316 // If setShouldCloseConnectionOnMachExceptions has been called, this has 317 // the exception port that exceptions from the other end will be sent on. 318 mach_port_t m_exceptionPort; 319 dispatch_source_t m_exceptionPortDataAvailableSource; 320#endif 321 322 XPCPtr<xpc_connection_t> m_xpcConnection; 323 324#elif USE(UNIX_DOMAIN_SOCKETS) 325 // Called on the connection queue. 326 void readyReadHandler(); 327 bool processMessage(); 328 329 Vector<uint8_t> m_readBuffer; 330 size_t m_readBufferSize; 331 Vector<int> m_fileDescriptors; 332 size_t m_fileDescriptorsSize; 333 int m_socketDescriptor; 334#endif 335}; 336 337template<typename T> bool Connection::send(T&& message, uint64_t destinationID, unsigned messageSendFlags) 338{ 339 COMPILE_ASSERT(!T::isSync, AsyncMessageExpected); 340 341 auto encoder = std::make_unique<MessageEncoder>(T::receiverName(), T::name(), destinationID); 342 encoder->encode(message.arguments()); 343 344 return sendMessage(WTF::move(encoder), messageSendFlags); 345} 346 347template<typename T> bool Connection::sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, std::chrono::milliseconds timeout, unsigned syncSendFlags) 348{ 349 COMPILE_ASSERT(T::isSync, SyncMessageExpected); 350 351 uint64_t syncRequestID = 0; 352 std::unique_ptr<MessageEncoder> encoder = createSyncMessageEncoder(T::receiverName(), T::name(), destinationID, syncRequestID); 353 354 // Encode the rest of the input arguments. 355 encoder->encode(message.arguments()); 356 357 // Now send the message and wait for a reply. 358 std::unique_ptr<MessageDecoder> replyDecoder = sendSyncMessage(syncRequestID, WTF::move(encoder), timeout, syncSendFlags); 359 if (!replyDecoder) 360 return false; 361 362 // Decode the reply. 363 return replyDecoder->decode(reply); 364} 365 366template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags) 367{ 368 std::unique_ptr<MessageDecoder> decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout, waitForMessageFlags); 369 if (!decoder) 370 return false; 371 372 ASSERT(decoder->destinationID() == destinationID); 373 m_client->didReceiveMessage(this, *decoder); 374 return true; 375} 376 377} // namespace IPC 378 379#endif // Connection_h 380