Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: serializing / deserializing over streams #13

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

SilverMira
Copy link

@SilverMira SilverMira commented Jan 14, 2023

Currently, this library provides no way to perform:

  • Serializing many Dart objects into msgpack binary output stream
  • Deserializing msgpack binary stream into Dart objects
  • Deserializing Dart object(s) from one or many Uint8List

This PR contains:

  • Depend on package:async/async.dart
  • Introducing StreamSerializer, which implements StreamTransformer<dynamic, Uint8List>
  • Introducing StreamDeserializer, which implements StreamTransformer<List<int>, dynamic>
  • Introducing tests covering serializing and deserializing using the new StreamTransformer

The discrepancy between the implemented StreamTransformer interface between the two is because:

  1. Serializing arbitrary Dart objects into msgpack format already yield Uint8List, which implements List<int>, it does not make sense to downgrade the type information here.
  2. Msgpack bytes streams to deserialize can come from various sources, some exposing their interface directly as Stream<Uint8List> while some Stream<List<int>>, loosening the type constraint here should increase compatibility and avoid scenarios of Stream<List<int>>.map((e) => Uint8List.fromList(e)).
  3. StreamTransformer.cast() can properly recast both StreamSerializer and StreamDeserializer between Uint8List and List<int> for compatibility in both Stream<List<int>> and Stream<Uint8List> if the underlying stream elements have correct types during runtime. Using StreamTransformer.bind directly rather than Stream.transform allows to properly bypass using Stream.cast or StreamTransformer.cast which has performance cost

Noteworthy stream behaviors:

  1. StreamDeserializer will wait for more bytes upstream if it requires more bytes in order to complete deserialization
  2. StreamDeserializer will emit UpstreamClosedError if upstream closes unexpected (in the midst of deserializing a dart object that spans over many bytes)
  3. Both StreamSerializer and StreamDeserializer have similar constructor arguments to their non-stream counterparts.

This PR should close #6, closely relates to #10 and should be generally a better solution than exposing Deserializer.offset.
Serializing Dart objects into msgpack binary and outputting into a stream is already something any user can implement on their own with the existing Serializer class, but semantics of deserializing from a byte stream or multiple Uint8List into multiple Dart objects are not possible with the current Deserializer interface.

Usage:

// Deserializing msgpack bytes from another process
Stream<List<int>> byteStream = process.stdout;
Stream<dynamic> objectStream = byteStream.transform(StreamDeserializer());
await for(final object in objectStream) {
  print(object);
}
// Deserializing chunks of Uint8List
List<Uint8List> chunks = ...;
Stream<Uint8List> byteStream = Stream.fromIterable(chunks);
// If the byte stream is of type `Stream<Uint8List>`, it is almost certain you have to use `StreamTransformer.bind` directly
Stream<dynamic> objectStream = StreamDeserializer().bind(byteStream);
List<dynamic> objectsList = await objectStream.toList();
// Serializing objects stream into msgpack bytes stream
Stream<dynamic> objectStream = ...;
Stream<Uint8List> bytesStream = objectStream.transform(StreamSerializer());
await for(final bytesChunk in bytesStream) {
  print(bytesChunk);
}
// Serializing many objects at once
List<dynamic> objects = ...;
Stream<dynamic> objectStream = Stream.fromIterable(objects);
Stream<Uint8List> bytesStream = objectStream.transform(StreamSerializer());

// To get 1-1 correspondence between input and output
List<Uint8List> bytesChunks = await bytesStream.toList(); // bytesChunk.length == objects.length

// To get M-1 correspondence between input and output
Uint8List bytes = await collectBytes(bytesStream); // collectBytes from package:async/async.dart

* Depends on `package:async/async.dart` for `ChunkedStreamReader`
* Adds `StreamSerializer` to handle serializing arbitrary dart types into msgpack binary
* Adds `StreamDeserializer` to handle deserializing incoming msgpack binary from a byte stream `Stream<List<int>>`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Make offset available publicly in Decoder
1 participant