diff --git a/build.gradle b/build.gradle index e224c79..bba333f 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ apply plugin: 'java' repositories { mavenCentral() - jcenter() +// jcenter() google() } @@ -18,8 +18,8 @@ dependencies { // Java 7+. These platforms lack support for TLS 1.2 and should not be used. But because // upgrading is difficult we will backport critical fixes to the 3.12.x branch // through December 31, 2020. - implementation 'com.squareup.okhttp3:okhttp:3.12.1' - implementation 'com.google.code.gson:gson:2.3.1' + implementation 'com.squareup.okhttp3:okhttp:4.12.0' + implementation 'com.google.code.gson:gson:2.8.9' testImplementation 'junit:junit:4.11' testImplementation 'org.hamcrest:hamcrest-all:1.3' diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index e7ba06c..42c70ae 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.4-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip diff --git a/jitpack.yml b/jitpack.yml index 309debd..efde7bf 100644 --- a/jitpack.yml +++ b/jitpack.yml @@ -1,2 +1,2 @@ jdk: - - oraclejdk7 \ No newline at end of file + - openjdk17 diff --git a/src/main/java/com/hosopy/actioncable/Channel.java b/src/main/java/com/hosopy/actioncable/Channel.java index d4bcaac..a28c895 100644 --- a/src/main/java/com/hosopy/actioncable/Channel.java +++ b/src/main/java/com/hosopy/actioncable/Channel.java @@ -24,6 +24,7 @@ public class Channel { private final JsonObject params = new JsonObject(); private String identifier; + private String name; /** * Constructor @@ -31,6 +32,7 @@ public class Channel { * @param channel Channel name */ public Channel(String channel) { + this.name = channel; params.addProperty(KEY_CHANNEL, channel); } @@ -74,7 +76,8 @@ public void addParam(String key, JsonElement value) { addParamInternal(key, value); } - /*package*/ String toIdentifier() { + + public String toIdentifier() { synchronized (params) { if (identifier == null) { identifier = GSON.toJson(params); @@ -92,4 +95,8 @@ private void addParamInternal(String key, JsonElement value) { identifier = null; } } + + public String getName() { + return name; + } } diff --git a/src/main/java/com/hosopy/actioncable/Command.java b/src/main/java/com/hosopy/actioncable/Command.java index 659ba8f..80bf948 100644 --- a/src/main/java/com/hosopy/actioncable/Command.java +++ b/src/main/java/com/hosopy/actioncable/Command.java @@ -3,7 +3,7 @@ import com.google.gson.*; import com.google.gson.annotations.Expose; -class Command { +public class Command { private static final Gson GSON = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create(); @@ -23,7 +23,7 @@ private Command(String command, String identifier) { this(command, identifier, null); } - private Command(String command, String identifier, String data) { + public Command(String command, String identifier, String data) { this.command = command; this.identifier = identifier; this.data = data; @@ -37,10 +37,14 @@ static Command unsubscribe(String identifier) { return new Command("unsubscribe", identifier); } - static Command message(String identifier, JsonObject params) { + public static Command message(String identifier, JsonObject params) { return new Command("message", identifier, params.toString()); } + static Command pong() { + return new Command("pong", null); + } + /*package*/ String toJson() { return GSON.toJson(this); } diff --git a/src/main/java/com/hosopy/actioncable/Connection.java b/src/main/java/com/hosopy/actioncable/Connection.java index de8932a..ec9cf5b 100644 --- a/src/main/java/com/hosopy/actioncable/Connection.java +++ b/src/main/java/com/hosopy/actioncable/Connection.java @@ -3,10 +3,7 @@ import com.hosopy.concurrent.EventLoop; import com.hosopy.util.QueryStringUtils; -import java.io.IOException; -import java.net.CookieHandler; import java.net.URI; -import java.time.Duration; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -20,8 +17,6 @@ import okhttp3.Response; import okhttp3.WebSocket; import okhttp3.WebSocketListener; -import okio.Buffer; -import okio.ByteString; public class Connection { @@ -30,7 +25,8 @@ private enum State { CONNECTING, OPEN, CLOSING, - CLOSED + CLOSED, + IDEAL } /*package*/ interface Listener { @@ -90,6 +86,13 @@ public static class Options { */ public OkHttpClientFactory okHttpClientFactory; + /** + * Whether to pingPong is automatically. + *

+ *

If pingPong is true, the client attempts to response to the server ping by send 'pong' command.

+ */ + public boolean pingPong = false; + /** * The ping interval on how often a ping is sent over the websocket connection *

@@ -103,7 +106,7 @@ public interface OkHttpClientFactory { } } - private State state = State.CONNECTING; + private State state = State.IDEAL; private URI uri; @@ -111,10 +114,12 @@ public interface OkHttpClientFactory { private Listener listener; - private WebSocket webSocket; + private WebSocket mWebSocket; private boolean isReopening = false; + private GeneralListener generalListener; + /*package*/ Connection(URI uri, Options options) { this.uri = uri; this.options = options; @@ -131,6 +136,8 @@ public interface OkHttpClientFactory { public void run() { if (isOpen()) { fireOnFailure(new IllegalStateException("Must close existing connection before opening")); + } else if (isConnecting()) { + fireOnFailure(new IllegalStateException("Already connection, must close existing connection before opening")); } else { doOpen(); } @@ -142,11 +149,11 @@ public void run() { EventLoop.execute(new Runnable() { @Override public void run() { - if (webSocket != null) { + if (mWebSocket != null) { try { // http://tools.ietf.org/html/rfc6455#section-7.4.1 if (!isState(State.CLOSING, State.CLOSED)) { - webSocket.close(1000, "connection closed manually"); + mWebSocket.close(1000, "connection closed manually"); state = State.CLOSING; } } catch (IllegalStateException e) { @@ -167,7 +174,16 @@ public void run() { } /*package*/ boolean isOpen() { - return webSocket != null && isState(State.OPEN); + return mWebSocket != null && isState(State.OPEN); + } + /*package*/ boolean isConnecting() { + return isState(State.CONNECTING); + } + /*package*/ boolean isClosed() { + return isState(State.CLOSED); + } + /*package*/ boolean isClosing() { + return isState(State.CLOSING); } /*package*/ boolean send(final String data) { @@ -233,8 +249,10 @@ private void doOpen() { } private void doSend(String data) { - if (webSocket != null) { - webSocket.send(data); + if (mWebSocket != null) { + boolean isSent = mWebSocket.send(data); + if (isSent && generalListener != null) + generalListener.onSend(data); } } @@ -253,11 +271,14 @@ private void fireOnFailure(Exception e) { } } + /*package*/ void setGeneralListener(GeneralListener listener) { + generalListener = listener; + } private WebSocketListener webSocketListener = new WebSocketListener() { @Override public void onOpen(WebSocket webSocket, Response response) { Connection.this.state = State.OPEN; - Connection.this.webSocket = webSocket; + Connection.this.mWebSocket = webSocket; EventLoop.execute(new Runnable() { @Override public void run() { @@ -270,13 +291,17 @@ public void run() { @Override public void onFailure(WebSocket webSocket, Throwable t, Response response) { + if (mWebSocket != null && webSocket != mWebSocket) return; + Connection.this.state = State.CLOSED; + EventLoop.execute(new Runnable() { @Override public void run() { state = State.CLOSED; + Connection.this.mWebSocket = null; if (listener != null) { - listener.onFailure((Exception) t); + listener.onFailure(new WebSocketException(t)); } } }); @@ -296,12 +321,25 @@ public void run() { @Override public void onClosing(WebSocket webSocket, int code, String reason) { + if (mWebSocket != null && webSocket != mWebSocket) { + try { + webSocket.close(code, reason); + } catch (Exception e) { + } + return; + } Connection.this.state = State.CLOSING; EventLoop.execute(new Runnable() { @Override public void run() { state = State.CLOSING; + try{ + webSocket.close(code, reason); + Connection.this.mWebSocket = null; + } catch (IllegalStateException e) { + //do nothing + } if (listener != null) { listener.onClosing(); @@ -318,6 +356,7 @@ public void run() { @Override public void onClosed(WebSocket webSocket, int code, String reason) { + if (mWebSocket != null && webSocket != mWebSocket) return; Connection.this.state = State.CLOSED; EventLoop.execute(new Runnable() { diff --git a/src/main/java/com/hosopy/actioncable/ConnectionMonitor.java b/src/main/java/com/hosopy/actioncable/ConnectionMonitor.java index 54bf533..0f13d53 100644 --- a/src/main/java/com/hosopy/actioncable/ConnectionMonitor.java +++ b/src/main/java/com/hosopy/actioncable/ConnectionMonitor.java @@ -10,6 +10,8 @@ public class ConnectionMonitor { private static final int STALE_THRESHOLD = 6; // Server::Connections::BEAT_INTERVAL * 2 (missed two pings) + private int staleThresholdInSecond = STALE_THRESHOLD; + private final Connection connection; private final ScheduledExecutorService pollExecutorService; @@ -31,6 +33,8 @@ public class ConnectionMonitor { private long stoppedAt = 0; // milliseconds private int reconnectAttempts = 0; + + private boolean pingPong = false; /*package*/ ConnectionMonitor(Connection connection, Connection.Options options) { this.connection = connection; @@ -40,6 +44,7 @@ public class ConnectionMonitor { this.reconnectionMaxAttempts = options.reconnectionMaxAttempts; this.reconnectionDelay = options.reconnectionDelay; this.reconnectionDelayMax = options.reconnectionDelayMax; + this.pingPong = options.pingPong; } /*package*/ void recordConnect() { @@ -54,12 +59,16 @@ public class ConnectionMonitor { /*package*/ void recordPing() { pingedAt = now(); + if(pingPong) { + connection.send(Command.pong().toJson()); + } } /*package*/ void start() { reset(); - stoppedAt = 0; + pingedAt = 0; startedAt = now(); + stoppedAt = 0; poll(); } @@ -92,11 +101,11 @@ private void reconnectIfStale() { } private boolean connectionIsStale() { - return secondsSince(pingedAt > 0 ? pingedAt : startedAt) > STALE_THRESHOLD; + return secondsSince(pingedAt > 0 ? pingedAt : startedAt) > staleThresholdInSecond; } private boolean disconnectedRecently() { - return disconnectedAt != 0 && secondsSince(disconnectedAt) < STALE_THRESHOLD; + return disconnectedAt != 0 && secondsSince(disconnectedAt) < staleThresholdInSecond; } private long getInterval() { @@ -104,6 +113,10 @@ private long getInterval() { return (long) clamp(interval, reconnectionDelay, reconnectionDelayMax) * 1000; } + public int getReconnectAttempts() { + return reconnectAttempts; + } + private static long secondsSince(long time) { return (now() - time) / 1000; } @@ -115,4 +128,9 @@ private static long now() { private static double clamp(double number, int min, int max) { return Math.max(min, Math.min(max, number)); } + + + /*package*/ void setStaleThresholdInSecond(int staleThresholdInSecond) { + this.staleThresholdInSecond = staleThresholdInSecond; + } } diff --git a/src/main/java/com/hosopy/actioncable/Consumer.java b/src/main/java/com/hosopy/actioncable/Consumer.java index d720b47..06457f2 100644 --- a/src/main/java/com/hosopy/actioncable/Consumer.java +++ b/src/main/java/com/hosopy/actioncable/Consumer.java @@ -28,6 +28,8 @@ public static class Options extends Connection.Options { private Subscriptions subscriptions; + private GeneralListener generalListener; + /*package*/ Consumer(URI uri, Options options) { this.subscriptions = new Subscriptions(this); this.connection = new Connection(uri, options); @@ -37,16 +39,19 @@ public static class Options extends Connection.Options { public void onOpen() { connectionMonitor.recordConnect(); subscriptions.reload(); + if (generalListener != null) generalListener.onOpen(); } @Override public void onFailure(Exception e) { subscriptions.notifyFailed(new ActionCableException(e)); + if (generalListener != null) generalListener.onFailure(new ActionCableException(e)); } @Override public void onMessage(String string) { final Message message = Message.fromJson(string); + if (generalListener != null) generalListener.onMessage(string); if (message.isWelcome()) { onOpen(); } else if (message.isPing()) { @@ -64,10 +69,12 @@ public void onMessage(String string) { public void onClosing() { subscriptions.notifyDisconnected(); connectionMonitor.recordDisconnect(); + if (generalListener != null) generalListener.onClosing(); } @Override public void onClosed() { + if (generalListener != null) generalListener.onClosed(); } }); } @@ -107,15 +114,55 @@ public void unsubscribeAndDisconnect() { connectionMonitor.stop(); } - /*package*/ boolean send(Command command) { + public void setGeneralListener(GeneralListener listener) { + generalListener = listener; + connection.setGeneralListener(listener); + } + + public boolean send(Command command) { return connection.send(command.toJson()); } + public void setStaleThresholdInSecond(int staleThresholdInSecond){ + if(connectionMonitor !=null) connectionMonitor.setStaleThresholdInSecond(staleThresholdInSecond); + } + + @Deprecated + public boolean hasSubscription(String channel) { + return subscriptions != null && subscriptions.hasSubscription(channel); + } + + public boolean hasChannel(String channel) { + return subscriptions != null && subscriptions.hasChannel(channel); + } + + public boolean hasSubscriptionFor(String identifier) { + return subscriptions != null && subscriptions.hasSubscriptionOf(identifier); + } + public Connection getConnection() { return connection; } + public boolean isConnecting() { + return this.connection != null && this.connection.isConnecting(); + } + public boolean isConnected() { return this.connection != null && this.connection.isOpen(); } + + public boolean isClosing() { + return this.connection != null && this.connection.isClosing(); + } + + public boolean isClosed() { + return this.connection == null || this.connection.isClosed(); + } + + public Integer getReconnectAttempts() { + if (connectionMonitor != null) + return connectionMonitor.getReconnectAttempts(); + else return null; + } } diff --git a/src/main/java/com/hosopy/actioncable/GeneralListener.java b/src/main/java/com/hosopy/actioncable/GeneralListener.java new file mode 100644 index 0000000..816eafe --- /dev/null +++ b/src/main/java/com/hosopy/actioncable/GeneralListener.java @@ -0,0 +1,21 @@ +package com.hosopy.actioncable; + + +/** + * GeneralListener provides a number of callbacks for calling remote procedure calls + * on the corresponding Channel instance on the server side. + * + */ +public interface GeneralListener { + public void onMessage(String string); + /** + * onOpen can be called multi-times + * e.g.[on connection establish, on welcome message received] + * + */ + public void onOpen(); + public void onSend(String data); + public void onClosing(); + public void onClosed(); + public void onFailure(Exception e); +} diff --git a/src/main/java/com/hosopy/actioncable/SubscriptionProxy.java b/src/main/java/com/hosopy/actioncable/SubscriptionProxy.java index 3ed2dc6..42bacf7 100644 --- a/src/main/java/com/hosopy/actioncable/SubscriptionProxy.java +++ b/src/main/java/com/hosopy/actioncable/SubscriptionProxy.java @@ -45,6 +45,10 @@ public class SubscriptionProxy { return channel.toIdentifier(); } + /*package*/ String getChannelName() { + return channel.getName(); + } + /*package*/ void onConnected(Subscription.ConnectedCallback callback) { onConnected = callback; } diff --git a/src/main/java/com/hosopy/actioncable/Subscriptions.java b/src/main/java/com/hosopy/actioncable/Subscriptions.java index bf566ad..0a16863 100644 --- a/src/main/java/com/hosopy/actioncable/Subscriptions.java +++ b/src/main/java/com/hosopy/actioncable/Subscriptions.java @@ -64,6 +64,32 @@ public void remove(Subscription subscription) { } } + /** + * Remove subscription from collection. + * + * @param channelName channel name of an exist subscription to remove + */ + @Deprecated + public void remove(String channelName) { + for (SubscriptionProxy subscription : subscriptionProxies.values()) { + if (subscription.getChannelName().equals(channelName)) { + remove(subscription.getProxy()); + } + } + } + /** + * Remove subscription from collection. + * + * @param identifier channel name of an exist subscription to remove + */ + public void removeByIdentifier(String identifier) { + for (SubscriptionProxy subscription : subscriptionProxies.values()) { + if (subscription.getIdentifier().equals(identifier)) { + remove(subscription.getProxy()); + } + } + } + /** * Remove all subscriptions from collection. */ @@ -136,4 +162,33 @@ private void forget(Subscription subscription) { private boolean sendSubscribeCommand(SubscriptionProxy subscriptionProxy) { return consumer.send(Command.subscribe(subscriptionProxy.getIdentifier())); } + + @Deprecated(since = "use hasChannel()") + /*package*/ boolean hasSubscription(String channelName) { + for (SubscriptionProxy subscription : subscriptionProxies.values()) { + if (subscription.getChannelName().equals(channelName)) { + return true; + } + } + return false; + } + + /*package*/ boolean hasChannel(String channelName) { + for (SubscriptionProxy subscription : subscriptionProxies.values()) { + if (subscription.getChannelName().equals(channelName)) { + return true; + } + } + return false; + } + + /*package*/ boolean hasSubscriptionOf(String identifier) { + for (SubscriptionProxy subscriptionProxy : subscriptionProxies.values()) { + if (subscriptionProxy.getIdentifier().equals(identifier)) { + return true; + } + } + return false; + } + } diff --git a/src/main/java/com/hosopy/actioncable/WebSocketException.java b/src/main/java/com/hosopy/actioncable/WebSocketException.java new file mode 100644 index 0000000..1842165 --- /dev/null +++ b/src/main/java/com/hosopy/actioncable/WebSocketException.java @@ -0,0 +1,8 @@ +package com.hosopy.actioncable; + +public class WebSocketException extends Exception { + + /*package*/ WebSocketException(Throwable t) { + super(t); + } +}