Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ apply plugin: 'java'

repositories {
mavenCentral()
jcenter()
// jcenter()
google()
}

Expand All @@ -18,8 +18,8 @@ dependencies {
// Java 7+. These platforms lack support for TLS 1.2 and should not be used. But because
// upgrading is difficult we will backport critical fixes to the 3.12.x branch
// through December 31, 2020.
implementation 'com.squareup.okhttp3:okhttp:3.12.1'
implementation 'com.google.code.gson:gson:2.3.1'
implementation 'com.squareup.okhttp3:okhttp:4.12.0'
implementation 'com.google.code.gson:gson:2.8.9'

testImplementation 'junit:junit:4.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.4-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip
2 changes: 1 addition & 1 deletion jitpack.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
jdk:
- oraclejdk7
- openjdk17
9 changes: 8 additions & 1 deletion src/main/java/com/hosopy/actioncable/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ public class Channel {
private final JsonObject params = new JsonObject();

private String identifier;
private String name;

/**
* Constructor
*
* @param channel Channel name
*/
public Channel(String channel) {
this.name = channel;
params.addProperty(KEY_CHANNEL, channel);
}

Expand Down Expand Up @@ -74,7 +76,8 @@ public void addParam(String key, JsonElement value) {
addParamInternal(key, value);
}

/*package*/ String toIdentifier() {

public String toIdentifier() {
synchronized (params) {
if (identifier == null) {
identifier = GSON.toJson(params);
Expand All @@ -92,4 +95,8 @@ private void addParamInternal(String key, JsonElement value) {
identifier = null;
}
}

public String getName() {
return name;
}
}
10 changes: 7 additions & 3 deletions src/main/java/com/hosopy/actioncable/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.google.gson.*;
import com.google.gson.annotations.Expose;

class Command {
public class Command {

private static final Gson GSON = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();

Expand All @@ -23,7 +23,7 @@ private Command(String command, String identifier) {
this(command, identifier, null);
}

private Command(String command, String identifier, String data) {
public Command(String command, String identifier, String data) {
this.command = command;
this.identifier = identifier;
this.data = data;
Expand All @@ -37,10 +37,14 @@ static Command unsubscribe(String identifier) {
return new Command("unsubscribe", identifier);
}

static Command message(String identifier, JsonObject params) {
public static Command message(String identifier, JsonObject params) {
return new Command("message", identifier, params.toString());
}

static Command pong() {
return new Command("pong", null);
}

/*package*/ String toJson() {
return GSON.toJson(this);
}
Expand Down
69 changes: 54 additions & 15 deletions src/main/java/com/hosopy/actioncable/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import com.hosopy.concurrent.EventLoop;
import com.hosopy.util.QueryStringUtils;

import java.io.IOException;
import java.net.CookieHandler;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand All @@ -20,8 +17,6 @@
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.Buffer;
import okio.ByteString;


public class Connection {
Expand All @@ -30,7 +25,8 @@ private enum State {
CONNECTING,
OPEN,
CLOSING,
CLOSED
CLOSED,
IDEAL
}

/*package*/ interface Listener {
Expand Down Expand Up @@ -90,6 +86,13 @@ public static class Options {
*/
public OkHttpClientFactory okHttpClientFactory;

/**
* Whether to pingPong is automatically.
* <p/>
* <p>If pingPong is true, the client attempts to response to the server ping by send 'pong' command.</p>
*/
public boolean pingPong = false;

/**
* The ping interval on how often a ping is sent over the websocket connection
* <p/>
Expand All @@ -103,18 +106,20 @@ public interface OkHttpClientFactory {
}
}

private State state = State.CONNECTING;
private State state = State.IDEAL;

private URI uri;

private Options options;

private Listener listener;

private WebSocket webSocket;
private WebSocket mWebSocket;

private boolean isReopening = false;

private GeneralListener generalListener;

/*package*/ Connection(URI uri, Options options) {
this.uri = uri;
this.options = options;
Expand All @@ -131,6 +136,8 @@ public interface OkHttpClientFactory {
public void run() {
if (isOpen()) {
fireOnFailure(new IllegalStateException("Must close existing connection before opening"));
} else if (isConnecting()) {
fireOnFailure(new IllegalStateException("Already connection, must close existing connection before opening"));
} else {
doOpen();
}
Expand All @@ -142,11 +149,11 @@ public void run() {
EventLoop.execute(new Runnable() {
@Override
public void run() {
if (webSocket != null) {
if (mWebSocket != null) {
try {
// http://tools.ietf.org/html/rfc6455#section-7.4.1
if (!isState(State.CLOSING, State.CLOSED)) {
webSocket.close(1000, "connection closed manually");
mWebSocket.close(1000, "connection closed manually");
state = State.CLOSING;
}
} catch (IllegalStateException e) {
Expand All @@ -167,7 +174,16 @@ public void run() {
}

/*package*/ boolean isOpen() {
return webSocket != null && isState(State.OPEN);
return mWebSocket != null && isState(State.OPEN);
}
/*package*/ boolean isConnecting() {
return isState(State.CONNECTING);
}
/*package*/ boolean isClosed() {
return isState(State.CLOSED);
}
/*package*/ boolean isClosing() {
return isState(State.CLOSING);
}

/*package*/ boolean send(final String data) {
Expand Down Expand Up @@ -233,8 +249,10 @@ private void doOpen() {
}

private void doSend(String data) {
if (webSocket != null) {
webSocket.send(data);
if (mWebSocket != null) {
boolean isSent = mWebSocket.send(data);
if (isSent && generalListener != null)
generalListener.onSend(data);
}
}

Expand All @@ -253,11 +271,14 @@ private void fireOnFailure(Exception e) {
}
}

/*package*/ void setGeneralListener(GeneralListener listener) {
generalListener = listener;
}
private WebSocketListener webSocketListener = new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
Connection.this.state = State.OPEN;
Connection.this.webSocket = webSocket;
Connection.this.mWebSocket = webSocket;
EventLoop.execute(new Runnable() {
@Override
public void run() {
Expand All @@ -270,13 +291,17 @@ public void run() {

@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
if (mWebSocket != null && webSocket != mWebSocket) return;
Connection.this.state = State.CLOSED;

EventLoop.execute(new Runnable() {
@Override
public void run() {
state = State.CLOSED;
Connection.this.mWebSocket = null;

if (listener != null) {
listener.onFailure((Exception) t);
listener.onFailure(new WebSocketException(t));
}
}
});
Expand All @@ -296,12 +321,25 @@ public void run() {

@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
if (mWebSocket != null && webSocket != mWebSocket) {
try {
webSocket.close(code, reason);
} catch (Exception e) {
}
return;
}
Connection.this.state = State.CLOSING;

EventLoop.execute(new Runnable() {
@Override
public void run() {
state = State.CLOSING;
try{
webSocket.close(code, reason);
Connection.this.mWebSocket = null;
} catch (IllegalStateException e) {
//do nothing
}

if (listener != null) {
listener.onClosing();
Expand All @@ -318,6 +356,7 @@ public void run() {

@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
if (mWebSocket != null && webSocket != mWebSocket) return;
Connection.this.state = State.CLOSED;

EventLoop.execute(new Runnable() {
Expand Down
24 changes: 21 additions & 3 deletions src/main/java/com/hosopy/actioncable/ConnectionMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class ConnectionMonitor {

private static final int STALE_THRESHOLD = 6; // Server::Connections::BEAT_INTERVAL * 2 (missed two pings)

private int staleThresholdInSecond = STALE_THRESHOLD;

private final Connection connection;

private final ScheduledExecutorService pollExecutorService;
Expand All @@ -31,6 +33,8 @@ public class ConnectionMonitor {
private long stoppedAt = 0; // milliseconds

private int reconnectAttempts = 0;

private boolean pingPong = false;

/*package*/ ConnectionMonitor(Connection connection, Connection.Options options) {
this.connection = connection;
Expand All @@ -40,6 +44,7 @@ public class ConnectionMonitor {
this.reconnectionMaxAttempts = options.reconnectionMaxAttempts;
this.reconnectionDelay = options.reconnectionDelay;
this.reconnectionDelayMax = options.reconnectionDelayMax;
this.pingPong = options.pingPong;
}

/*package*/ void recordConnect() {
Expand All @@ -54,12 +59,16 @@ public class ConnectionMonitor {

/*package*/ void recordPing() {
pingedAt = now();
if(pingPong) {
connection.send(Command.pong().toJson());
}
}

/*package*/ void start() {
reset();
stoppedAt = 0;
pingedAt = 0;
startedAt = now();
stoppedAt = 0;
poll();
}

Expand Down Expand Up @@ -92,18 +101,22 @@ private void reconnectIfStale() {
}

private boolean connectionIsStale() {
return secondsSince(pingedAt > 0 ? pingedAt : startedAt) > STALE_THRESHOLD;
return secondsSince(pingedAt > 0 ? pingedAt : startedAt) > staleThresholdInSecond;
}

private boolean disconnectedRecently() {
return disconnectedAt != 0 && secondsSince(disconnectedAt) < STALE_THRESHOLD;
return disconnectedAt != 0 && secondsSince(disconnectedAt) < staleThresholdInSecond;
}

private long getInterval() {
final double interval = 5.0d * Math.log(reconnectAttempts + 1);
return (long) clamp(interval, reconnectionDelay, reconnectionDelayMax) * 1000;
}

public int getReconnectAttempts() {
return reconnectAttempts;
}

private static long secondsSince(long time) {
return (now() - time) / 1000;
}
Expand All @@ -115,4 +128,9 @@ private static long now() {
private static double clamp(double number, int min, int max) {
return Math.max(min, Math.min(max, number));
}


/*package*/ void setStaleThresholdInSecond(int staleThresholdInSecond) {
this.staleThresholdInSecond = staleThresholdInSecond;
}
}
Loading