1/*
2 * Copyright (C) 2010 Apple Inc. All rights reserved.
3 * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies)
4 * Portions Copyright (c) 2010 Motorola Mobility, Inc.  All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 *    notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
16 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
19 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
25 * THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#ifndef Connection_h
29#define Connection_h
30
31#include "Arguments.h"
32#include "MessageDecoder.h"
33#include "MessageEncoder.h"
34#include "MessageReceiver.h"
35#include "WorkQueue.h"
36#include <atomic>
37#include <condition_variable>
38#include <wtf/Deque.h>
39#include <wtf/Forward.h>
40#include <wtf/PassRefPtr.h>
41#include <wtf/text/CString.h>
42
43#if OS(DARWIN)
44#include "XPCPtr.h"
45#include <mach/mach_port.h>
46#endif
47
48#if PLATFORM(GTK) || PLATFORM(EFL)
49#include "PlatformProcessIdentifier.h"
50#endif
51
// Forward declaration only: this header refers to WTF::RunLoop purely by
// reference, so the complete type is not required here.
namespace WTF { class RunLoop; }
55
56namespace IPC {
57
// Incomplete type on purpose: this header only ever holds a pointer to it;
// the definition lives outside this header.
struct WaitForMessageState;
59
// Bit flags meant to be OR-ed into an `unsigned messageSendFlags` argument.
enum MessageSendFlags {
    // Marks a message as one that should still be dispatched while waiting for
    // a sync reply. Synchronous messages behave this way by default.
    DispatchMessageEvenWhenWaitingForSyncReply = 1 << 0,
};
65
// Bit flags meant to be OR-ed into an `unsigned syncSendFlags` argument.
enum SyncMessageSendFlags {
    // Signals that this sync call will suspend the process until the user
    // responds with input.
    InformPlatformProcessWillSuspend = 1 << 0,

    // Spin the run loop while waiting for the reply. Needed because some platform
    // accessibility clients can't suspend gracefully; spinning keeps WebProcess
    // from hanging.
    // FIXME (126021): Remove when no platforms need to support this.
    SpinRunLoopWhileWaitingForReply = 1 << 1,
};
73
// Bit flags meant to be OR-ed into an `unsigned waitForMessageFlags` argument.
enum WaitForMessageFlags {
    // Any incoming sync message interrupts waitForMessage immediately.
    InterruptWaitingIfSyncMessageArrives = 1 << 0,
};
78
// Validates `assertion` inside a message handler. When it does not hold, the
// expansion trips ASSERT, calls markCurrentlyDispatchedMessageAsInvalid() on
// `connection` (a pointer-like expression), and returns from the enclosing
// function. The expansion contains a bare `return;`, so it can only be used in
// functions returning void. Note that `assertion` is spelled twice in the
// expansion, so it may be evaluated again by ASSERT on the failure path.
#define MESSAGE_CHECK_BASE(assertion, connection) \
    do { \
        if (!(assertion)) { \
            ASSERT(assertion); \
            (connection)->markCurrentlyDispatchedMessageAsInvalid(); \
            return; \
        } \
    } while (0)
86
87class Connection : public ThreadSafeRefCounted<Connection> {
88public:
89    class Client : public MessageReceiver {
90    public:
91        virtual void didClose(Connection*) = 0;
92        virtual void didReceiveInvalidMessage(Connection*, StringReference messageReceiverName, StringReference messageName) = 0;
93
94    protected:
95        virtual ~Client() { }
96    };
97
98    class WorkQueueMessageReceiver : public MessageReceiver, public ThreadSafeRefCounted<WorkQueueMessageReceiver> {
99    };
100
101#if OS(DARWIN)
102    struct Identifier {
103        Identifier()
104            : port(MACH_PORT_NULL)
105        {
106        }
107
108        Identifier(mach_port_t port)
109            : port(port)
110        {
111        }
112
113        Identifier(mach_port_t port, XPCPtr<xpc_connection_t> xpcConnection)
114            : port(port)
115            , xpcConnection(WTF::move(xpcConnection))
116        {
117        }
118
119        mach_port_t port;
120        XPCPtr<xpc_connection_t> xpcConnection;
121    };
122    static bool identifierIsNull(Identifier identifier) { return identifier.port == MACH_PORT_NULL; }
123    xpc_connection_t xpcConnection() { return m_xpcConnection.get(); }
124    bool getAuditToken(audit_token_t&);
125#elif USE(UNIX_DOMAIN_SOCKETS)
126    typedef int Identifier;
127    static bool identifierIsNull(Identifier identifier) { return !identifier; }
128
129    struct SocketPair {
130        int client;
131        int server;
132    };
133
134    enum ConnectionOptions {
135        SetCloexecOnClient = 1 << 0,
136        SetCloexecOnServer = 1 << 1,
137    };
138
139    static Connection::SocketPair createPlatformConnection(unsigned options = SetCloexecOnClient | SetCloexecOnServer);
140#endif
141
142    static PassRefPtr<Connection> createServerConnection(Identifier, Client*, WTF::RunLoop& clientRunLoop);
143    static PassRefPtr<Connection> createClientConnection(Identifier, Client*, WTF::RunLoop& clientRunLoop);
144    ~Connection();
145
146    Client* client() const { return m_client; }
147
148#if PLATFORM(MAC)
149    void setShouldCloseConnectionOnMachExceptions();
150#endif
151
152    void setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool);
153    void setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure);
154
155    // The set callback will be called on the connection work queue when the connection is closed,
156    // before didCall is called on the client thread. Must be called before the connection is opened.
157    // In the future we might want a more generic way to handle sync or async messages directly
158    // on the work queue, for example if we want to handle them on some other thread we could avoid
159    // handling the message on the client thread first.
160    typedef void (*DidCloseOnConnectionWorkQueueCallback)(Connection*);
161    void setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback);
162
163    void addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue*, WorkQueueMessageReceiver*);
164    void removeWorkQueueMessageReceiver(StringReference messageReceiverName);
165
166    bool open();
167    void invalidate();
168    void markCurrentlyDispatchedMessageAsInvalid();
169
170    void postConnectionDidCloseOnConnectionWorkQueue();
171
172    template<typename T> bool send(T&& message, uint64_t destinationID, unsigned messageSendFlags = 0);
173    template<typename T> bool sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, std::chrono::milliseconds timeout = std::chrono::milliseconds::max(), unsigned syncSendFlags = 0);
174    template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags = 0);
175
176    std::unique_ptr<MessageEncoder> createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID);
177    bool sendMessage(std::unique_ptr<MessageEncoder>, unsigned messageSendFlags = 0);
178    std::unique_ptr<MessageDecoder> sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<MessageEncoder>, std::chrono::milliseconds timeout, unsigned syncSendFlags = 0);
179    std::unique_ptr<MessageDecoder> sendSyncMessageFromSecondaryThread(uint64_t syncRequestID, std::unique_ptr<MessageEncoder>, std::chrono::milliseconds timeout);
180    bool sendSyncReply(std::unique_ptr<MessageEncoder>);
181
182    void wakeUpRunLoop();
183
184    void incrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { ++m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; }
185    void decrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { --m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; }
186
187    bool inSendSync() const { return m_inSendSyncCount; }
188
189    Identifier identifier() const;
190
191#if PLATFORM(COCOA)
192    bool kill();
193    void terminateSoon(double intervalInSeconds);
194#endif
195
196private:
197    Connection(Identifier, bool isServer, Client*, WTF::RunLoop& clientRunLoop);
198    void platformInitialize(Identifier);
199    void platformInvalidate();
200
201    bool isValid() const { return m_client; }
202
203    std::unique_ptr<MessageDecoder> waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags);
204
205    std::unique_ptr<MessageDecoder> waitForSyncReply(uint64_t syncRequestID, std::chrono::milliseconds timeout, unsigned syncSendFlags);
206
207    // Called on the connection work queue.
208    void processIncomingMessage(std::unique_ptr<MessageDecoder>);
209    void processIncomingSyncReply(std::unique_ptr<MessageDecoder>);
210
211    void dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver*, MessageDecoder*);
212
213    bool canSendOutgoingMessages() const;
214    bool platformCanSendOutgoingMessages() const;
215    void sendOutgoingMessages();
216    bool sendOutgoingMessage(std::unique_ptr<MessageEncoder>);
217    void connectionDidClose();
218
219    // Called on the listener thread.
220    void dispatchOneMessage();
221    void dispatchMessage(std::unique_ptr<MessageDecoder>);
222    void dispatchMessage(MessageDecoder&);
223    void dispatchSyncMessage(MessageDecoder&);
224    void dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString);
225    void didFailToSendSyncMessage();
226
227    // Can be called on any thread.
228    void enqueueIncomingMessage(std::unique_ptr<MessageDecoder>);
229
230    void willSendSyncMessage(unsigned syncSendFlags);
231    void didReceiveSyncReply(unsigned syncSendFlags);
232
233    Client* m_client;
234    bool m_isServer;
235    std::atomic<uint64_t> m_syncRequestID;
236
237    bool m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage;
238    bool m_shouldExitOnSyncMessageSendFailure;
239    DidCloseOnConnectionWorkQueueCallback m_didCloseOnConnectionWorkQueueCallback;
240
241    bool m_isConnected;
242    RefPtr<WorkQueue> m_connectionQueue;
243    WTF::RunLoop& m_clientRunLoop;
244
245    HashMap<StringReference, std::pair<RefPtr<WorkQueue>, RefPtr<WorkQueueMessageReceiver>>> m_workQueueMessageReceivers;
246
247    unsigned m_inSendSyncCount;
248    unsigned m_inDispatchMessageCount;
249    unsigned m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount;
250    bool m_didReceiveInvalidMessage;
251
252    // Incoming messages.
253    Mutex m_incomingMessagesLock;
254    Deque<std::unique_ptr<MessageDecoder>> m_incomingMessages;
255
256    // Outgoing messages.
257    Mutex m_outgoingMessagesLock;
258    Deque<std::unique_ptr<MessageEncoder>> m_outgoingMessages;
259
260    std::condition_variable m_waitForMessageCondition;
261    std::mutex m_waitForMessageMutex;
262
263    WaitForMessageState* m_waitingForMessage;
264
265    // Represents a sync request for which we're waiting on a reply.
266    struct PendingSyncReply {
267        // The request ID.
268        uint64_t syncRequestID;
269
270        // The reply decoder, will be null if there was an error processing the sync
271        // message on the other side.
272        std::unique_ptr<MessageDecoder> replyDecoder;
273
274        // Will be set to true once a reply has been received.
275        bool didReceiveReply;
276
277        PendingSyncReply()
278            : syncRequestID(0)
279            , didReceiveReply(false)
280        {
281        }
282
283        explicit PendingSyncReply(uint64_t syncRequestID)
284            : syncRequestID(syncRequestID)
285            , didReceiveReply(0)
286        {
287        }
288    };
289
290    class SyncMessageState;
291    friend class SyncMessageState;
292    RefPtr<SyncMessageState> m_syncMessageState;
293
294    Mutex m_syncReplyStateMutex;
295    bool m_shouldWaitForSyncReplies;
296    Vector<PendingSyncReply> m_pendingSyncReplies;
297
298    class SecondaryThreadPendingSyncReply;
299    typedef HashMap<uint64_t, SecondaryThreadPendingSyncReply*> SecondaryThreadPendingSyncReplyMap;
300    SecondaryThreadPendingSyncReplyMap m_secondaryThreadPendingSyncReplyMap;
301
302#if OS(DARWIN)
303    // Called on the connection queue.
304    void receiveSourceEventHandler();
305    void initializeDeadNameSource();
306
307    mach_port_t m_sendPort;
308    dispatch_source_t m_deadNameSource;
309
310    mach_port_t m_receivePort;
311    dispatch_source_t m_receivePortDataAvailableSource;
312
313#if !PLATFORM(IOS)
314    void exceptionSourceEventHandler();
315
316    // If setShouldCloseConnectionOnMachExceptions has been called, this has
317    // the exception port that exceptions from the other end will be sent on.
318    mach_port_t m_exceptionPort;
319    dispatch_source_t m_exceptionPortDataAvailableSource;
320#endif
321
322    XPCPtr<xpc_connection_t> m_xpcConnection;
323
324#elif USE(UNIX_DOMAIN_SOCKETS)
325    // Called on the connection queue.
326    void readyReadHandler();
327    bool processMessage();
328
329    Vector<uint8_t> m_readBuffer;
330    size_t m_readBufferSize;
331    Vector<int> m_fileDescriptors;
332    size_t m_fileDescriptorsSize;
333    int m_socketDescriptor;
334#endif
335};
336
337template<typename T> bool Connection::send(T&& message, uint64_t destinationID, unsigned messageSendFlags)
338{
339    COMPILE_ASSERT(!T::isSync, AsyncMessageExpected);
340
341    auto encoder = std::make_unique<MessageEncoder>(T::receiverName(), T::name(), destinationID);
342    encoder->encode(message.arguments());
343
344    return sendMessage(WTF::move(encoder), messageSendFlags);
345}
346
347template<typename T> bool Connection::sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, std::chrono::milliseconds timeout, unsigned syncSendFlags)
348{
349    COMPILE_ASSERT(T::isSync, SyncMessageExpected);
350
351    uint64_t syncRequestID = 0;
352    std::unique_ptr<MessageEncoder> encoder = createSyncMessageEncoder(T::receiverName(), T::name(), destinationID, syncRequestID);
353
354    // Encode the rest of the input arguments.
355    encoder->encode(message.arguments());
356
357    // Now send the message and wait for a reply.
358    std::unique_ptr<MessageDecoder> replyDecoder = sendSyncMessage(syncRequestID, WTF::move(encoder), timeout, syncSendFlags);
359    if (!replyDecoder)
360        return false;
361
362    // Decode the reply.
363    return replyDecoder->decode(reply);
364}
365
366template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags)
367{
368    std::unique_ptr<MessageDecoder> decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout, waitForMessageFlags);
369    if (!decoder)
370        return false;
371
372    ASSERT(decoder->destinationID() == destinationID);
373    m_client->didReceiveMessage(this, *decoder);
374    return true;
375}
376
377} // namespace IPC
378
379#endif // Connection_h
380