Skip to content

feat: serializing / deserializing over streams #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/msgpack_dart.dart
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
library msgpack_dart;

import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:async/async.dart';

part 'src/common.dart';
part 'src/data_writer.dart';
part 'src/deserializer.dart';
part 'src/serializer.dart';
part 'src/stream_serializer.dart';
part 'src/stream_deserializer.dart';

Uint8List serialize(
dynamic value, {
Expand Down
11 changes: 11 additions & 0 deletions lib/src/common.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,14 @@ class FormatError implements Exception {
return "FormatError: $message";
}
}

/// The upstream stream closed unexpectedly while in the midst of decoding a message.
/// Thrown from [StreamDeserializer]
class UpstreamClosedError implements Exception {
const UpstreamClosedError();
final String message = 'Upstream closed unexpectedly';

String toString() {
return "UpstreamClosedError: $message";
}
}
250 changes: 250 additions & 0 deletions lib/src/stream_deserializer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
part of msgpack_dart;

/// A [StreamTransformer] that deserializes [Uint8List]s into Dart objects,
/// using the MessagePack format.
class StreamDeserializer extends StreamTransformerBase<List<int>, dynamic> {
final ExtDecoder? _extDecoder;

/// If false, decoded binary data buffers will reference underlying input
/// buffer and thus may change when the content of input buffer changes.
///
/// If true, decoded buffers are copies and the underlying input buffer is
/// free to change after decoding.
final bool copyBinaryData;
final Encoding codec;

StreamDeserializer({
ExtDecoder? extDecoder,
this.copyBinaryData = false,
this.codec = const Utf8Codec(),
}) : _extDecoder = extDecoder;

@override
Stream<dynamic> bind(Stream<List<int>> stream) async* {
final bytesChunkReader = ChunkedStreamReader(stream);

var uByte = await bytesChunkReader.readBytes(1);
while (uByte.isNotEmpty) {
final u = uByte[0];
yield await _decode(u, bytesChunkReader);
uByte = await bytesChunkReader.readBytes(1);
}
}

Future<dynamic> _decode(
int u,
ChunkedStreamReader<int> bytesChunkReader,
) async {
if (u <= 127) {
return u;
} else if ((u & 0xE0) == 0xE0) {
// negative small integer
return u - 256;
} else if ((u & 0xE0) == 0xA0) {
return await _readString(bytesChunkReader, u & 0x1F);
} else if ((u & 0xF0) == 0x90) {
return await _readArray(bytesChunkReader, u & 0xF);
} else if ((u & 0xF0) == 0x80) {
return await _readMap(bytesChunkReader, u & 0xF);
}
switch (u) {
case 0xc0:
return null;
case 0xc2:
return false;
case 0xc3:
return true;
case 0xcc:
return await _readUInt8(bytesChunkReader);
case 0xcd:
return await _readUInt16(bytesChunkReader);
case 0xce:
return await _readUInt32(bytesChunkReader);
case 0xcf:
return await _readUInt64(bytesChunkReader);
case 0xd0:
return await _readInt8(bytesChunkReader);
case 0xd1:
return await _readInt16(bytesChunkReader);
case 0xd2:
return await _readInt32(bytesChunkReader);
case 0xd3:
return await _readInt64(bytesChunkReader);
case 0xca:
return await _readFloat(bytesChunkReader);
case 0xcb:
return await _readDouble(bytesChunkReader);
case 0xd9:
return await _readString(
bytesChunkReader, await _readUInt8(bytesChunkReader));
case 0xda:
return await _readString(
bytesChunkReader, await _readUInt16(bytesChunkReader));
case 0xdb:
return await _readString(
bytesChunkReader, await _readUInt32(bytesChunkReader));
case 0xc4:
return await _readBuffer(
bytesChunkReader, await _readUInt8(bytesChunkReader));
case 0xc5:
return await _readBuffer(
bytesChunkReader, await _readUInt16(bytesChunkReader));
case 0xc6:
return await _readBuffer(
bytesChunkReader, await _readUInt32(bytesChunkReader));
case 0xdc:
return await _readArray(
bytesChunkReader, await _readUInt16(bytesChunkReader));
case 0xdd:
return await _readArray(
bytesChunkReader, await _readUInt32(bytesChunkReader));
case 0xde:
return await _readMap(
bytesChunkReader, await _readUInt16(bytesChunkReader));
case 0xdf:
return await _readMap(
bytesChunkReader, await _readUInt32(bytesChunkReader));
case 0xd4:
return await _readExt(bytesChunkReader, 1);
case 0xd5:
return await _readExt(bytesChunkReader, 2);
case 0xd6:
return await _readExt(bytesChunkReader, 4);
case 0xd7:
return await _readExt(bytesChunkReader, 8);
case 0xd8:
return await _readExt(bytesChunkReader, 16);
case 0xc7:
return await _readExt(
bytesChunkReader, await _readUInt8(bytesChunkReader));
case 0xc8:
return await _readExt(
bytesChunkReader, await _readUInt16(bytesChunkReader));
case 0xc9:
return await _readExt(
bytesChunkReader, await _readUInt32(bytesChunkReader));
default:
throw FormatError("Invalid MessagePack format");
}
}

Future<int> _readInt8(ChunkedStreamReader<int> bytesChunkReader) async {
final bytes = await _expectBytes(bytesChunkReader, 1);
return ByteData.sublistView(bytes).getInt8(0);
}

Future<int> _readUInt8(ChunkedStreamReader<int> bytesChunkReader) async {
final bytes = await _expectBytes(bytesChunkReader, 1);
return ByteData.sublistView(bytes).getUint8(0);
}

Future<int> _readUInt16(ChunkedStreamReader<int> bytesChunkReader) async {
final bytes = await _expectBytes(bytesChunkReader, 2);
return ByteData.sublistView(bytes).getUint16(0);
}

Future<int> _readInt16(ChunkedStreamReader<int> bytesChunkReader) async {
final bytes = await _expectBytes(bytesChunkReader, 2);
return ByteData.sublistView(bytes).getInt16(0);
}

Future<int> _readUInt32(ChunkedStreamReader<int> bytesChunkReader) async {
final bytes = await _expectBytes(bytesChunkReader, 4);
return ByteData.sublistView(bytes).getUint32(0);
}

Future<int> _readInt32(ChunkedStreamReader<int> bytesChunkReader) async {
final bytes = await _expectBytes(bytesChunkReader, 4);
return ByteData.sublistView(bytes).getInt32(0);
}

Future<int> _readUInt64(ChunkedStreamReader<int> bytesChunkReader) async {
final bytes = await _expectBytes(bytesChunkReader, 8);
return ByteData.sublistView(bytes).getUint64(0);
}

Future<int> _readInt64(ChunkedStreamReader<int> bytesChunkReader) async {
final bytes = await _expectBytes(bytesChunkReader, 8);
return ByteData.sublistView(bytes).getInt64(0);
}

Future<double> _readFloat(ChunkedStreamReader<int> bytesChunkReader) async {
final bytes = await _expectBytes(bytesChunkReader, 4);
return ByteData.sublistView(bytes).getFloat32(0);
}

Future<double> _readDouble(ChunkedStreamReader<int> bytesChunkReader) async {
final bytes = await _expectBytes(bytesChunkReader, 8);
return ByteData.sublistView(bytes).getFloat64(0);
}

Future<Uint8List> _readBuffer(
ChunkedStreamReader<int> bytesChunkReader,
int length,
) async {
final bytes = await _expectBytes(bytesChunkReader, length);
return copyBinaryData ? Uint8List.fromList(bytes) : bytes;
}

Future<String> _readString(
ChunkedStreamReader<int> bytesChunkReader,
int length,
) async {
final list = await _readBuffer(bytesChunkReader, length);
final len = list.length;
for (int i = 0; i < len; ++i) {
if (list[i] > 127) {
return codec.decode(list);
}
}
return String.fromCharCodes(list);
}

Future<List> _readArray(
ChunkedStreamReader<int> bytesChunkReader,
int length,
) async {
final res = List<dynamic>.filled(length, null, growable: false);
for (int i = 0; i < length; ++i) {
final uByte = await _expectBytes(bytesChunkReader, 1);
res[i] = await _decode(uByte[0], bytesChunkReader);
}
return res;
}

Future<Map> _readMap(
ChunkedStreamReader<int> bytesChunkReader,
int length,
) async {
final res = Map();
while (length > 0) {
final uByteKey = await _expectBytes(bytesChunkReader, 1);
final key = await _decode(uByteKey[0], bytesChunkReader);
final uByteValue = await _expectBytes(bytesChunkReader, 1);
final value = await _decode(uByteValue[0], bytesChunkReader);
res[key] = value;
--length;
}
return res;
}

Future<dynamic> _readExt(
ChunkedStreamReader<int> bytesChunkReader,
int length,
) async {
final extType = await _readUInt8(bytesChunkReader);
final data = await _readBuffer(bytesChunkReader, length);
return _extDecoder?.decodeObject(extType, data);
}

Future<Uint8List> _expectBytes(
ChunkedStreamReader<int> bytesChunkReader,
int length,
) async {
final bytes = await bytesChunkReader.readBytes(length);
if (bytes.length != length) {
throw const UpstreamClosedError();
}
return bytes;
}
}
32 changes: 32 additions & 0 deletions lib/src/stream_serializer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
part of msgpack_dart;

/// A [StreamTransformer] that serializes objects in the stream into [Uint8List]s
/// using MessagePack specification.
///
/// Internally, it uses the [Serializer] class to handles the serialization.
/// This class is useful as an abstraction layer for the [Serializer] class for streams.
/// Alternatively, you can use the [Serializer] class directly.
class StreamSerializer<T> extends StreamTransformerBase<T, Uint8List> {
final Serializer serializer;

StreamSerializer.withSerializer(this.serializer);

factory StreamSerializer({
DataWriter? dataWriter,
ExtEncoder? extEncoder,
}) =>
StreamSerializer.withSerializer(
Serializer(
dataWriter: dataWriter,
extEncoder: extEncoder,
),
);

@override
Stream<Uint8List> bind(Stream<T> stream) async* {
await for (final value in stream) {
serializer.encode(value);
yield serializer.takeBytes();
}
}
}
2 changes: 2 additions & 0 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ environment:

dev_dependencies:
test: ^1.3.0
dependencies:
async: ^2.10.0
Loading