Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,14 @@ Major update.

- null safety
- updated dependencies

# 2021-04-01 1.2.1

### Breaking Changes

- change connect function from `ActionCable.Connect` to `ActionCable.connect`

### Updates

- updated websocket dependency
- improve readability of the core code by leveraging dart's null safety
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
![Pub](https://img.shields.io/pub/v/action_cable)

<!-- ALL-CONTRIBUTORS-BADGE:START - Do not remove or modify this section -->

[![All Contributors](https://img.shields.io/badge/all_contributors-3-orange.svg?style=flat-square)](#contributors-)

<!-- ALL-CONTRIBUTORS-BADGE:END -->

# ActionCable in Dart
Expand All @@ -14,17 +17,17 @@ This is a dart port of the client and protocol implementation which is available
### Connecting to a channel 🙌

```dart
cable = ActionCable.Connect(
cable = ActionCable.connect(
"ws://10.0.2.2:3000/cable",
headers: {
"Authorization": "Some Token",
},
onConnected: (){
print("connected");
},
},
onConnectionLost: () {
print("connection lost");
},
},
onCannotConnect: () {
print("cannot connect");
});
Expand Down
203 changes: 120 additions & 83 deletions lib/action_cable.dart
Original file line number Diff line number Diff line change
@@ -1,94 +1,129 @@
import 'dart:async';
import 'dart:convert';

import 'package:web_socket_channel/io.dart';

import 'channel_id.dart';

typedef _OnConnectedFunction = void Function();
typedef _OnConnectionLostFunction = void Function();
typedef _OnCannotConnectFunction = void Function();
typedef _OnChannelSubscribedFunction = void Function();
typedef _OnChannelDisconnectedFunction = void Function();
typedef _OnChannelMessageFunction = void Function(Map message);
typedef OnConnectedFunction = void Function();
typedef OnConnectionLostFunction = void Function();
typedef OnCannotConnectFunction = void Function();
typedef OnChannelSubscribedFunction = void Function();
typedef OnChannelDisconnectedFunction = void Function();
typedef OnChannelMessageFunction = void Function(Map message);

class ActionCable {
DateTime? _lastPing;
late Timer _timer;
late IOWebSocketChannel _socketChannel;
late StreamSubscription _listener;
_OnConnectedFunction? onConnected;
_OnCannotConnectFunction? onCannotConnect;
_OnConnectionLostFunction? onConnectionLost;
Map<String, _OnChannelSubscribedFunction?> _onChannelSubscribedCallbacks = {};
Map<String, _OnChannelDisconnectedFunction?> _onChannelDisconnectedCallbacks =
{};
Map<String, _OnChannelMessageFunction?> _onChannelMessageCallbacks = {};

ActionCable.Connect(
Timer? _timer;
IOWebSocketChannel? _socketChannel;
StreamSubscription? _listener;

final OnConnectedFunction? onConnected;
final OnConnectionLostFunction? onConnectionLost;
final OnCannotConnectFunction? onCannotConnect;

final Map<String, OnChannelSubscribedFunction?>
_onChannelSubscribedCallbacks = {};
final Map<String, OnChannelDisconnectedFunction?>
_onChannelDisconnectedCallbacks = {};
final Map<String, OnChannelMessageFunction?> _onChannelMessageCallbacks = {};

ActionCable.connect(
String url, {
Map<String, String> headers: const {},
Map<String, String> headers = const {},
this.onConnected,
this.onConnectionLost,
this.onCannotConnect,
}) {
// rails gets a ping every 3 seconds
_socketChannel = IOWebSocketChannel.connect(url,
headers: headers, pingInterval: Duration(seconds: 3));
_listener = _socketChannel.stream.listen(_onData, onError: (_) {
this.disconnect(); // close a socket and the timer
if (this.onCannotConnect != null) this.onCannotConnect!();
});
_timer = Timer.periodic(const Duration(seconds: 3), healthCheck);
_connect(url, headers);
}

void _connect(String url, Map<String, String> headers) {
_socketChannel = IOWebSocketChannel.connect(
url,
headers: headers,
pingInterval: const Duration(seconds: 3),
);

_listener = _socketChannel?.stream.listen(
_onData,
onError: (error) {
_handleError();
},
onDone: () {
_handleDone();
},
);

_timer = Timer.periodic(const Duration(seconds: 3), _healthCheck);
}

void disconnect() {
_timer.cancel();
_socketChannel.sink.close();
_listener.cancel();
_timer?.cancel();
_listener?.cancel();
_socketChannel?.sink.close();
_socketChannel = null;
_onChannelDisconnectedCallbacks.values
.where((onDisconnected) => onDisconnected != null)
.forEach((onDisconnected) {
onDisconnected!();
});
}

// check if there is no ping for 3 seconds and signal a [onConnectionLost] if
// there is no ping for more than 6 seconds
void healthCheck(_) {
if (_lastPing == null) {
return;
}
if (DateTime.now().difference(_lastPing!) > Duration(seconds: 6)) {
this.disconnect();
if (this.onConnectionLost != null) this.onConnectionLost!();
void _handleError() {
disconnect();
onCannotConnect?.call();
}

void _handleDone() {
disconnect();
onConnectionLost?.call();
}

void _healthCheck(Timer timer) {
if (_lastPing == null) return;
if (DateTime.now().difference(_lastPing!) > const Duration(seconds: 6)) {
disconnect();
onConnectionLost?.call();
}
}

// channelName being 'Chat' will be considered as 'ChatChannel',
// 'Chat', { id: 1 } => { channel: 'ChatChannel', id: 1 }
void subscribe(String channelName,
{Map? channelParams,
_OnChannelSubscribedFunction? onSubscribed,
_OnChannelDisconnectedFunction? onDisconnected,
_OnChannelMessageFunction? onMessage}) {
void subscribe(
String channelName, {
Map? channelParams,
OnChannelSubscribedFunction? onSubscribed,
OnChannelDisconnectedFunction? onDisconnected,
OnChannelMessageFunction? onMessage,
}) {
final channelId = encodeChannelId(channelName, channelParams);

_onChannelSubscribedCallbacks[channelId] = onSubscribed;
_onChannelDisconnectedCallbacks[channelId] = onDisconnected;
_onChannelMessageCallbacks[channelId] = onMessage;

_send({'identifier': channelId, 'command': 'subscribe'});
_send({
'identifier': channelId,
'command': 'subscribe',
});
}

void unsubscribe(String channelName, {Map? channelParams}) {
final channelId = encodeChannelId(channelName, channelParams);

_onChannelSubscribedCallbacks[channelId] = null;
_onChannelDisconnectedCallbacks[channelId] = null;
_onChannelMessageCallbacks[channelId] = null;
_onChannelSubscribedCallbacks.remove(channelId);
_onChannelDisconnectedCallbacks.remove(channelId);
_onChannelMessageCallbacks.remove(channelId);

_socketChannel.sink
.add(jsonEncode({'identifier': channelId, 'command': 'unsubscribe'}));
_send({
'identifier': channelId,
'command': 'unsubscribe',
});
}

void performAction(String channelName,
{String? action, Map? channelParams, Map? actionParams}) {
void performAction(
String channelName, {
String? action,
Map? channelParams,
Map? actionParams,
}) {
final channelId = encodeChannelId(channelName, channelParams);

actionParams ??= {};
Expand All @@ -97,45 +132,48 @@ class ActionCable {
_send({
'identifier': channelId,
'command': 'message',
'data': jsonEncode(actionParams)
'data': jsonEncode(actionParams),
});
}

void _onData(dynamic payload) {
payload = jsonDecode(payload);

if (payload['type'] != null) {
_handleProtocolMessage(payload);
} else {
_handleDataMessage(payload);
try {
final data = jsonDecode(payload);
if (data['type'] != null) {
_handleProtocolMessage(data);
} else {
_handleDataMessage(data);
}
} catch (error) {
throw 'InvalidPayload';
}
}

void _handleProtocolMessage(Map payload) {
void _handleProtocolMessage(Map<String, dynamic> payload) {
switch (payload['type']) {
case 'ping':
// rails sends epoch as seconds not miliseconds
_lastPing =
DateTime.fromMillisecondsSinceEpoch(payload['message'] * 1000);
break;
case 'welcome':
if (onConnected != null) {
onConnected!();
}
onConnected?.call();
break;
case 'disconnect':
final channelId = parseChannelId(payload['identifier']);
final onDisconnected = _onChannelDisconnectedCallbacks[channelId];
if (onDisconnected != null) {
onDisconnected();
final identifier = payload['identifier'];
if (identifier != null) {
final channelId = parseChannelId(payload['identifier']);
final onDisconnected = _onChannelDisconnectedCallbacks[channelId];
onDisconnected?.call();
} else {
final reason = payload['reason'];
if (reason != null && reason == 'unauthorized') {
this.onCannotConnect?.call();
}
}
break;
case 'confirm_subscription':
final channelId = parseChannelId(payload['identifier']);
final onSubscribed = _onChannelSubscribedCallbacks[channelId];
if (onSubscribed != null) {
onSubscribed();
}
_onChannelSubscribedCallbacks[channelId]?.call();
break;
case 'reject_subscription':
// throw 'Unimplemented';
Expand All @@ -145,15 +183,14 @@ class ActionCable {
}
}

void _handleDataMessage(Map payload) {
void _handleDataMessage(Map<String, dynamic> payload) {
final channelId = parseChannelId(payload['identifier']);
final onMessage = _onChannelMessageCallbacks[channelId];
if (onMessage != null) {
onMessage(payload['message']);
}
_onChannelMessageCallbacks[channelId]?.call(payload['message']);
}

void _send(Map payload) {
_socketChannel.sink.add(jsonEncode(payload));
void _send(Map<String, dynamic> payload) {
if (_socketChannel != null) {
_socketChannel!.sink.add(jsonEncode(payload));
}
}
}
Loading