1/*
2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package jdk.incubator.http.internal.websocket;
27
28import jdk.incubator.http.WebSocket;
29import jdk.incubator.http.internal.common.Log;
30import jdk.incubator.http.internal.common.Pair;
31import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
32import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
33import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
34import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
35import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
36import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
37import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
38
39import java.io.IOException;
40import java.net.ProtocolException;
41import java.net.URI;
42import java.nio.ByteBuffer;
43import java.util.Queue;
44import java.util.concurrent.CompletableFuture;
45import java.util.concurrent.CompletionStage;
46import java.util.concurrent.ConcurrentLinkedQueue;
47import java.util.concurrent.atomic.AtomicBoolean;
48import java.util.function.Consumer;
49import java.util.function.Function;
50
51import static java.util.Objects.requireNonNull;
52import static java.util.concurrent.CompletableFuture.failedFuture;
53import static jdk.incubator.http.internal.common.Pair.pair;
54import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
55import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
56import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
57
58/*
59 * A WebSocket client.
60 */
61final class WebSocketImpl implements WebSocket {
62
63    private final URI uri;
64    private final String subprotocol;
65    private final RawChannel channel;
66    private final Listener listener;
67
68    /*
69     * Whether or not Listener.onClose or Listener.onError has been already
70     * invoked. We keep track of this since only one of these methods is invoked
71     * and it is invoked at most once.
72     */
73    private boolean lastMethodInvoked;
74    private final AtomicBoolean outstandingSend = new AtomicBoolean();
75    private final CooperativeHandler sendHandler =
76              new CooperativeHandler(this::sendFirst);
77    private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
78            queue = new ConcurrentLinkedQueue<>();
79    private final Context context = new OutgoingMessage.Context();
80    private final Transmitter transmitter;
81    private final Receiver receiver;
82
83    /*
84     * Whether or not the WebSocket has been closed. When a WebSocket has been
85     * closed it means that no further messages can be sent or received.
86     * A closure can be triggered by:
87     *
88     *   1. abort()
89     *   2. "Failing the WebSocket Connection" (i.e. a fatal error)
90     *   3. Completion of the Closing handshake
91     */
92    private final AtomicBoolean closed = new AtomicBoolean();
93
94    /*
95     * This lock is enforcing sequential ordering of invocations to listener's
96     * methods. It is supposed to be uncontended. The only contention that can
97     * happen is when onOpen, an asynchronous onError (not related to reading
98     * from the channel, e.g. an error from automatic Pong reply) or onClose
99     * (related to abort) happens. Since all of the above are one-shot actions,
100     * the said contention is insignificant.
101     */
102    private final Object lock = new Object();
103
104    private final CompletableFuture<?> closeReceived = new CompletableFuture<>();
105    private final CompletableFuture<?> closeSent = new CompletableFuture<>();
106
107    static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
108        Function<Result, WebSocket> newWebSocket = r -> {
109            WebSocketImpl ws = new WebSocketImpl(b.getUri(),
110                                                 r.subprotocol,
111                                                 r.channel,
112                                                 b.getListener());
113            // The order of calls might cause a subtle effects, like CF will be
114            // returned from the buildAsync _after_ onOpen has been signalled.
115            // This means if onOpen is lengthy, it might cause some problems.
116            ws.signalOpen();
117            return ws;
118        };
119        OpeningHandshake h;
120        try {
121            h = new OpeningHandshake(b);
122        } catch (IllegalArgumentException e) {
123            return failedFuture(e);
124        }
125        return h.send().thenApply(newWebSocket);
126    }
127
128    WebSocketImpl(URI uri,
129                  String subprotocol,
130                  RawChannel channel,
131                  Listener listener)
132    {
133        this.uri = requireNonNull(uri);
134        this.subprotocol = requireNonNull(subprotocol);
135        this.channel = requireNonNull(channel);
136        this.listener = requireNonNull(listener);
137        this.transmitter = new Transmitter(channel);
138        this.receiver = new Receiver(messageConsumerOf(listener), channel);
139
140        // Set up the Closing Handshake action
141        CompletableFuture.allOf(closeReceived, closeSent)
142                .whenComplete((result, error) -> {
143                    try {
144                        channel.close();
145                    } catch (IOException e) {
146                        Log.logError(e);
147                    } finally {
148                        closed.set(true);
149                    }
150                });
151    }
152
153    /*
154     * This initialisation is outside of the constructor for the sake of
155     * safe publication.
156     */
157    private void signalOpen() {
158        synchronized (lock) {
159            // TODO: might hold lock longer than needed causing prolonged
160            // contention? substitute lock for ConcurrentLinkedQueue<Runnable>?
161            try {
162                listener.onOpen(this);
163            } catch (Exception e) {
164                signalError(e);
165            }
166        }
167    }
168
169    private void signalError(Throwable error) {
170        synchronized (lock) {
171            if (lastMethodInvoked) {
172                Log.logError(error);
173            } else {
174                lastMethodInvoked = true;
175                receiver.close();
176                try {
177                    listener.onError(this, error);
178                } catch (Exception e) {
179                    Log.logError(e);
180                }
181            }
182        }
183    }
184
185    /*
186     * Processes a Close event that came from the channel. Invoked at most once.
187     */
188    private void processClose(int statusCode, String reason) {
189        receiver.close();
190        try {
191            channel.shutdownInput();
192        } catch (IOException e) {
193            Log.logError(e);
194        }
195        boolean alreadyCompleted = !closeReceived.complete(null);
196        if (alreadyCompleted) {
197            // This CF is supposed to be completed only once, the first time a
198            // Close message is received. No further messages are pulled from
199            // the socket.
200            throw new InternalError();
201        }
202        int code;
203        if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
204            code = NORMAL_CLOSURE;
205        } else {
206            code = statusCode;
207        }
208        CompletionStage<?> readyToClose = signalClose(statusCode, reason);
209        if (readyToClose == null) {
210            readyToClose = CompletableFuture.completedFuture(null);
211        }
212        readyToClose.whenComplete((r, error) -> {
213            enqueueClose(new Close(code, ""))
214                    .whenComplete((r1, error1) -> {
215                        if (error1 != null) {
216                            Log.logError(error1);
217                        }
218                    });
219        });
220    }
221
222    /*
223     * Signals a Close event (might not correspond to anything happened on the
224     * channel, e.g. `abort()`).
225     */
226    private CompletionStage<?> signalClose(int statusCode, String reason) {
227        synchronized (lock) {
228            if (lastMethodInvoked) {
229                Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
230            } else {
231                lastMethodInvoked = true;
232                receiver.close();
233                try {
234                    return listener.onClose(this, statusCode, reason);
235                } catch (Exception e) {
236                    Log.logError(e);
237                }
238            }
239        }
240        return null;
241    }
242
243    @Override
244    public CompletableFuture<WebSocket> sendText(CharSequence message,
245                                                 boolean isLast)
246    {
247        return enqueueExclusively(new Text(message, isLast));
248    }
249
250    @Override
251    public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
252                                                   boolean isLast)
253    {
254        return enqueueExclusively(new Binary(message, isLast));
255    }
256
257    @Override
258    public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
259        return enqueueExclusively(new Ping(message));
260    }
261
262    @Override
263    public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
264        return enqueueExclusively(new Pong(message));
265    }
266
267    @Override
268    public CompletableFuture<WebSocket> sendClose(int statusCode,
269                                                  String reason) {
270        if (!isLegalToSendFromClient(statusCode)) {
271            return failedFuture(
272                    new IllegalArgumentException("statusCode: " + statusCode));
273        }
274        Close msg;
275        try {
276            msg = new Close(statusCode, reason);
277        } catch (IllegalArgumentException e) {
278            return failedFuture(e);
279        }
280        return enqueueClose(msg);
281    }
282
283    /*
284     * Sends a Close message with the given contents and then shuts down the
285     * channel for writing since no more messages are expected to be sent after
286     * this. Invoked at most once.
287     */
288    private CompletableFuture<WebSocket> enqueueClose(Close m) {
289        return enqueue(m).whenComplete((r, error) -> {
290            try {
291                channel.shutdownOutput();
292            } catch (IOException e) {
293                Log.logError(e);
294            }
295            boolean alreadyCompleted = !closeSent.complete(null);
296            if (alreadyCompleted) {
297                // Shouldn't happen as this callback must run at most once
298                throw new InternalError();
299            }
300        });
301    }
302
303    /*
304     * Accepts the given message into the outgoing queue in a mutually-exclusive
305     * fashion in respect to other messages accepted through this method. No
306     * further messages will be accepted until the returned CompletableFuture
307     * completes. This method is used to enforce "one outstanding send
308     * operation" policy.
309     */
310    private CompletableFuture<WebSocket> enqueueExclusively(OutgoingMessage m)
311    {
312        if (closed.get()) {
313            return failedFuture(new IllegalStateException("Closed"));
314        }
315        if (!outstandingSend.compareAndSet(false, true)) {
316            return failedFuture(new IllegalStateException("Outstanding send"));
317        }
318        return enqueue(m).whenComplete((r, e) -> outstandingSend.set(false));
319    }
320
321    private CompletableFuture<WebSocket> enqueue(OutgoingMessage m) {
322        CompletableFuture<WebSocket> cf = new CompletableFuture<>();
323        boolean added = queue.add(pair(m, cf));
324        if (!added) {
325            // The queue is supposed to be unbounded
326            throw new InternalError();
327        }
328        sendHandler.handle();
329        return cf;
330    }
331
332    /*
333     * This is the main sending method. It may be run in different threads,
334     * but never concurrently.
335     */
336    private void sendFirst(Runnable whenSent) {
337        Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
338        if (p == null) {
339            whenSent.run();
340            return;
341        }
342        OutgoingMessage message = p.first;
343        CompletableFuture<WebSocket> cf = p.second;
344        try {
345            message.contextualize(context);
346            Consumer<Exception> h = e -> {
347                if (e == null) {
348                    cf.complete(WebSocketImpl.this);
349                } else {
350                    cf.completeExceptionally(e);
351                }
352                sendHandler.handle();
353                whenSent.run();
354            };
355            transmitter.send(message, h);
356        } catch (Exception t) {
357            cf.completeExceptionally(t);
358        }
359    }
360
361    @Override
362    public void request(long n) {
363        receiver.request(n);
364    }
365
366    @Override
367    public String getSubprotocol() {
368        return subprotocol;
369    }
370
371    @Override
372    public boolean isClosed() {
373        return closed.get();
374    }
375
376    @Override
377    public void abort() throws IOException {
378        try {
379            channel.close();
380        } finally {
381            closed.set(true);
382            signalClose(CLOSED_ABNORMALLY, "");
383        }
384    }
385
386    @Override
387    public String toString() {
388        return super.toString()
389                + "[" + (closed.get() ? "CLOSED" : "OPEN") + "]: " + uri
390                + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "");
391    }
392
393    private MessageStreamConsumer messageConsumerOf(Listener listener) {
394        // Synchronization performed here in every method is not for the sake of
395        // ordering invocations to this consumer, after all they are naturally
396        // ordered in the channel. The reason is to avoid an interference with
397        // any unrelated to the channel calls to onOpen, onClose and onError.
398        return new MessageStreamConsumer() {
399
400            @Override
401            public void onText(MessagePart part, CharSequence data) {
402                receiver.acknowledge();
403                synchronized (WebSocketImpl.this.lock) {
404                    try {
405                        listener.onText(WebSocketImpl.this, data, part);
406                    } catch (Exception e) {
407                        signalError(e);
408                    }
409                }
410            }
411
412            @Override
413            public void onBinary(MessagePart part, ByteBuffer data) {
414                receiver.acknowledge();
415                synchronized (WebSocketImpl.this.lock) {
416                    try {
417                        listener.onBinary(WebSocketImpl.this, data.slice(), part);
418                    } catch (Exception e) {
419                        signalError(e);
420                    }
421                }
422            }
423
424            @Override
425            public void onPing(ByteBuffer data) {
426                receiver.acknowledge();
427                // Let's make a full copy of this tiny data. What we want here
428                // is to rule out a possibility the shared data we send might be
429                // corrupted the by processing in the listener.
430                ByteBuffer slice = data.slice();
431                ByteBuffer copy = ByteBuffer.allocate(data.remaining())
432                        .put(data)
433                        .flip();
434                // Non-exclusive send;
435                CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy));
436                pongSent.whenComplete(
437                        (r, error) -> {
438                            if (error != null) {
439                                WebSocketImpl.this.signalError(error);
440                            }
441                        }
442                );
443                synchronized (WebSocketImpl.this.lock) {
444                    try {
445                        listener.onPing(WebSocketImpl.this, slice);
446                    } catch (Exception e) {
447                        signalError(e);
448                    }
449                }
450            }
451
452            @Override
453            public void onPong(ByteBuffer data) {
454                receiver.acknowledge();
455                synchronized (WebSocketImpl.this.lock) {
456                    try {
457                        listener.onPong(WebSocketImpl.this, data.slice());
458                    } catch (Exception e) {
459                        signalError(e);
460                    }
461                }
462            }
463
464            @Override
465            public void onClose(int statusCode, CharSequence reason) {
466                receiver.acknowledge();
467                processClose(statusCode, reason.toString());
468            }
469
470            @Override
471            public void onError(Exception error) {
472                // An signalError doesn't necessarily mean we must signalClose
473                // the WebSocket. However, if it's something the WebSocket
474                // Specification recognizes as a reason for "Failing the
475                // WebSocket Connection", then we must do so, but BEFORE
476                // notifying the Listener.
477                if (!(error instanceof FailWebSocketException)) {
478                    signalError(error);
479                } else {
480                    Exception ex = (Exception) new ProtocolException().initCause(error);
481                    int code = ((FailWebSocketException) error).getStatusCode();
482                    enqueueClose(new Close(code, ""))
483                            .whenComplete((r, e) -> {
484                                if (e != null) {
485                                    ex.addSuppressed(e);
486                                }
487                                try {
488                                    channel.close();
489                                } catch (IOException e1) {
490                                    ex.addSuppressed(e1);
491                                } finally {
492                                    closed.set(true);
493                                }
494                                signalError(ex);
495                            });
496                }
497            }
498
499            @Override
500            public void onComplete() {
501                processClose(CLOSED_ABNORMALLY, "");
502            }
503        };
504    }
505}
506