Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 17 additions & 19 deletions lib/action_cable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,40 @@ typedef _OnChannelMessageFunction = void Function(Map message);
class ActionCable {
DateTime _lastPing;
Timer _timer;
Duration timeoutAfter;
Duration healthCheckDuration;
IOWebSocketChannel _socketChannel;
StreamSubscription _listener;
_OnConnectedFunction onConnected;
_OnCannotConnectFunction onCannotConnect;
_OnConnectionLostFunction onConnectionLost;
Map<String, _OnChannelSubscribedFunction> _onChannelSubscribedCallbacks = {};
Map<String, _OnChannelDisconnectedFunction> _onChannelDisconnectedCallbacks =
{};
Map<String, _OnChannelDisconnectedFunction> _onChannelDisconnectedCallbacks = {};
Map<String, _OnChannelMessageFunction> _onChannelMessageCallbacks = {};

ActionCable.Connect(
ActionCable.connect(
String url, {
Map<String, String> headers: const {},
this.healthCheckDuration,
this.timeoutAfter,
this.onConnected,
this.onConnectionLost,
this.onCannotConnect,
}) {
// rails gets a ping every 3 seconds
_socketChannel = IOWebSocketChannel.connect(url,
headers: headers, pingInterval: Duration(seconds: 3));
_socketChannel =
IOWebSocketChannel.connect(url, headers: headers, pingInterval: Duration(seconds: 3));
_listener = _socketChannel.stream.listen(_onData, onError: (_) {
this.disconnect(); // close a socket and the timer
this.onCannotConnect();
});
_timer = Timer.periodic(const Duration(seconds: 3), healthCheck);
if (healthCheckDuration != null) {
_timer = Timer.periodic(healthCheckDuration ?? const Duration(seconds: 3), healthCheck);
}
}

void disconnect() {
_timer.cancel();
_timer?.cancel();
_socketChannel.sink.close();
_listener.cancel();
}
Expand All @@ -54,7 +59,7 @@ class ActionCable {
if (_lastPing == null) {
return;
}
if (DateTime.now().difference(_lastPing) > Duration(seconds: 6)) {
if (DateTime.now().difference(_lastPing) > (timeoutAfter ?? const Duration(seconds: 6))) {
this.disconnect();
if (this.onConnectionLost != null) this.onConnectionLost();
}
Expand Down Expand Up @@ -83,22 +88,16 @@ class ActionCable {
_onChannelDisconnectedCallbacks[channelId] = null;
_onChannelMessageCallbacks[channelId] = null;

_socketChannel.sink
.add(jsonEncode({'identifier': channelId, 'command': 'unsubscribe'}));
_socketChannel.sink.add(jsonEncode({'identifier': channelId, 'command': 'unsubscribe'}));
}

void performAction(String channelName,
{String action, Map channelParams, Map actionParams}) {
void performAction(String channelName, {String action, Map channelParams, Map actionParams}) {
final channelId = encodeChannelId(channelName, channelParams);

actionParams ??= {};
actionParams['action'] = action;

_send({
'identifier': channelId,
'command': 'message',
'data': jsonEncode(actionParams)
});
_send({'identifier': channelId, 'command': 'message', 'data': jsonEncode(actionParams)});
}

void _onData(dynamic payload) {
Expand All @@ -115,8 +114,7 @@ class ActionCable {
switch (payload['type']) {
case 'ping':
// rails sends epoch as seconds not miliseconds
_lastPing =
DateTime.fromMillisecondsSinceEpoch(payload['message'] * 1000);
_lastPing = DateTime.fromMillisecondsSinceEpoch(payload['message'] * 1000);
break;
case 'welcome':
if (onConnected != null) {
Expand Down