@@ -7,12 +7,14 @@ import 'dart:isolate';
77import 'dart:typed_data' ;
88
99import 'package:ffi/ffi.dart' ;
10+ import 'package:meta/meta.dart' ;
1011
1112import '../../common.dart' ;
1213import '../../modelinfo/entity_definition.dart' ;
1314import '../../modelinfo/modelproperty.dart' ;
1415import '../../modelinfo/modelrelation.dart' ;
1516import '../../store.dart' ;
17+ import '../../transaction.dart' ;
1618import '../bindings/bindings.dart' ;
1719import '../bindings/data_visitor.dart' ;
1820import '../bindings/helpers.dart' ;
@@ -721,6 +723,24 @@ class Query<T> {
721723 }
722724 }
723725
726+ /// Clones this native query and returns a pointer to the clone.
727+ ///
728+ /// This is useful to send a reference to a query to an isolate. A [Query] can
729+ /// not be sent to an isolate directly because it contains pointers.
730+ ///
731+ /// ```dart
732+ /// // Clone the query and obtain its address, can be sent to an isolate.
733+ /// final queryPtrAddress = query._clone().address;
734+ ///
735+ /// // Within an isolate re-create the query pointer to be used with the C API.
736+ /// final queryPtr = Pointer<OBX_query>.fromAddress(isolateInit.queryPtrAddress);
737+ /// ```
738+ Pointer <OBX_query > _clone () {
739+ final ptr = checkObxPtr (C .query_clone (_ptr));
740+ reachabilityFence (this );
741+ return ptr;
742+ }
743+
724744 /// Close the query and free resources.
725745 void close () {
726746 if (! _closed) {
@@ -811,58 +831,59 @@ class Query<T> {
811831
812832 /// Finds Objects matching the query, streaming them while the query executes.
813833 ///
814- /// Note: make sure you evaluate performance in your use case - streams come
815- /// with an overhead so a plain [find()] is usually faster .
816- Stream <T > stream () => _stream1 ();
834+ /// Results are streamed from a worker isolate in batches (the stream still
835+ /// returns objects one by one) .
836+ Stream <T > stream () => _streamIsolate ();
817837
818838 /// Stream items by sending full flatbuffers binary as a message.
819- Stream <T > _stream1 () {
820- initializeDartAPI ();
821- final port = ReceivePort ();
822- final cStream = checkObxPtr (
823- C .dartc_query_find (_cQuery, port.sendPort.nativePort), 'query stream' );
824-
825- var closed = false ;
826- final close = () {
827- if (closed) return ;
828- closed = true ;
829- C .dartc_stream_close (cStream);
830- port.close ();
831- reachabilityFence (this );
832- };
833-
834- try {
835- final controller = StreamController <T >(onCancel: close);
836- port.listen ((dynamic message) {
837- // We expect Uint8List for data and NULL when the query has finished.
838- if (message is Uint8List ) {
839- try {
840- controller.add (
841- _entity.objectFromFB (_store, ByteData .view (message.buffer)));
842- return ;
843- } catch (e) {
844- controller.addError (e);
845- }
846- } else if (message is String ) {
847- controller.addError (
848- ObjectBoxException ('Query stream native exception: $message ' ));
849- } else if (message != null ) {
850- controller.addError (ObjectBoxException (
851- 'Query stream received an invalid message type '
852- '(${message .runtimeType }): $message ' ));
853- }
854- // Close the stream, this will call the onCancel function.
855- // Do not call the onCancel function manually,
856- // if cancel() is called on the Stream subscription right afterwards it
857- // will use the shortcut in the onCancel function and not wait.
858- controller.close (); // done
859- });
860- return controller.stream;
861- } catch (e) {
862- close ();
863- rethrow ;
864- }
865- }
839+ /// Replaced by _streamIsolate which in benchmarks has been faster.
840+ // Stream<T> _stream1() {
841+ // initializeDartAPI();
842+ // final port = ReceivePort();
843+ // final cStream = checkObxPtr(
844+ // C.dartc_query_find(_cQuery, port.sendPort.nativePort), 'query stream');
845+ //
846+ // var closed = false;
847+ // final close = () {
848+ // if (closed) return;
849+ // closed = true;
850+ // C.dartc_stream_close(cStream);
851+ // port.close();
852+ // reachabilityFence(this);
853+ // };
854+ //
855+ // try {
856+ // final controller = StreamController<T>(onCancel: close);
857+ // port.listen((dynamic message) {
858+ // // We expect Uint8List for data and NULL when the query has finished.
859+ // if (message is Uint8List) {
860+ // try {
861+ // controller.add(
862+ // _entity.objectFromFB(_store, ByteData.view(message.buffer)));
863+ // return;
864+ // } catch (e) {
865+ // controller.addError(e);
866+ // }
867+ // } else if (message is String) {
868+ // controller.addError(
869+ // ObjectBoxException('Query stream native exception: $message'));
870+ // } else if (message != null) {
871+ // controller.addError(ObjectBoxException(
872+ // 'Query stream received an invalid message type '
873+ // '(${message.runtimeType}): $message'));
874+ // }
875+ // // Close the stream, this will call the onCancel function.
876+ // // Do not call the onCancel function manually,
877+ // // if cancel() is called on the Stream subscription right afterwards it
878+ // // will use the shortcut in the onCancel function and not wait.
879+ // controller.close(); // done
880+ // });
881+ // return controller.stream;
882+ // } catch (e) {
883+ // close();
884+ // rethrow;
885+ // }
886+ // }
866887
867888 /// Stream items by sending pointers from native code.
868889 /// Interestingly this is slower even though it transfers only pointers...
@@ -915,6 +936,166 @@ class Query<T> {
915936 // }
916937 // }
917938
939+ Stream <T > _streamIsolate () {
940+ final resultPort = ReceivePort ();
941+ final exitPort = ReceivePort ();
942+
943+ void spawnWorkerIsolate () async {
944+ // Pass clones of Store and Query to avoid these getting closed while the
945+ // worker isolate is still running. The isolate closes the clones once done.
946+ final storeClonePtr = InternalStoreAccess .clone (_store);
947+ final queryClonePtr = _clone ();
948+
949+ // Current batch size determined through testing, performs well for smaller
950+ // objects. Might want to expose in the future for performance tuning by
951+ // users.
952+ final isolateInit = _StreamIsolateInit (resultPort.sendPort,
953+ storeClonePtr.address, queryClonePtr.address, 20 );
954+ // If spawn errors StreamController will propagate the error, no point in
955+ // using addError as no listener before this function completes.
956+ await Isolate .spawn (_queryAndVisit, isolateInit,
957+ onExit: exitPort.sendPort);
958+ }
959+
960+ SendPort ? sendPort;
961+
962+ // Callback to exit the isolate once consumers or this close the stream
963+ // (potentially before all results have been streamed).
964+ // Must return Future<void>, otherwise StreamController will not wait on it.
965+ var isolateExitSent = false ;
966+ Future <void > exitIsolate () async {
967+ if (isolateExitSent) return ;
968+ isolateExitSent = true ;
969+ // Send signal to isolate it should exit.
970+ sendPort? .send (null );
971+ // Wait for isolate to clean up native resources,
972+ // otherwise e.g. Store is still open and
973+ // e.g. tests can not delete database files.
974+ await exitPort.first;
975+ resultPort.close ();
976+ exitPort.close ();
977+ }
978+
979+ final streamController = StreamController <T >(
980+ onListen: spawnWorkerIsolate, onCancel: exitIsolate);
981+ resultPort.listen ((dynamic message) async {
982+ // The first message from the spawned isolate is a SendPort. This port
983+ // is used to communicate with the spawned isolate.
984+ if (message is SendPort ) {
985+ sendPort = message;
986+ return ; // wait for next message.
987+ }
988+ // Further messages are
989+ // - ObxObjectMessage for data,
990+ // - Exception and Error for errors and
991+ // - null if the worker isolate is done sending data.
992+ else if (message is _StreamIsolateMessage ) {
993+ try {
994+ for (var i = 0 ; i < message.dataPtrAddresses.length; i++ ) {
995+ final dataPtrAddress = message.dataPtrAddresses[i];
996+ final size = message.sizes[i];
997+ if (size == 0 ) break ; // Reached last object.
998+ streamController.add (_entity.objectFromFB (
999+ _store,
1000+ InternalStoreAccess .reader (_store)
1001+ .access (Pointer .fromAddress (dataPtrAddress), size)));
1002+ }
1003+ return ; // wait for next message.
1004+ } catch (e) {
1005+ streamController.addError (e);
1006+ }
1007+ } else if (message is Error ) {
1008+ streamController.addError (message);
1009+ } else if (message is Exception ) {
1010+ streamController.addError (message);
1011+ } else if (message != null ) {
1012+ streamController.addError (
1013+ ObjectBoxException ('Query stream received an invalid message type '
1014+ '(${message .runtimeType }): $message ' ));
1015+ }
1016+ // Close the stream, this will call the onCancel function.
1017+ // Do not call the onCancel function manually,
1018+ // if cancel() is called on the Stream subscription right afterwards it
1019+ // will use the shortcut in the onCancel function and not wait.
1020+ streamController.close ();
1021+ });
1022+ return streamController.stream;
1023+ }
1024+
1025+ // Isolate entry point must be top-level or static.
1026+ static Future <void > _queryAndVisit (_StreamIsolateInit isolateInit) async {
1027+ // Init native resources asap so that they do not leak, e.g. on exceptions
1028+ final store =
1029+ InternalStoreAccess .createMinimal (isolateInit.storePtrAddress);
1030+
1031+ var resultPort = isolateInit.resultPort;
1032+
1033+ // Send a SendPort to the main isolate so that it can send to this isolate.
1034+ final commandPort = ReceivePort ();
1035+ resultPort.send (commandPort.sendPort);
1036+
1037+ try {
1038+ // Visit inside transaction and do not complete transaction to ensure
1039+ // data pointers remain valid until main isolate has deserialized all data.
1040+ await InternalStoreAccess .runInTransaction (store, TxMode .read,
1041+ (Transaction tx) async {
1042+ // Use fixed-length lists to avoid performance hit due to growing.
1043+ final maxBatchSize = isolateInit.batchSize;
1044+ var dataPtrBatch = List <int >.filled (maxBatchSize, 0 );
1045+ var sizeBatch = List <int >.filled (maxBatchSize, 0 );
1046+ var batchSize = 0 ;
1047+ final visitor = dataVisitor ((Pointer <Uint8 > data, int size) {
1048+ // Currently returning all results, even if the stream has been closed
1049+ // before (e.g. only first element taken). Would need a way to check
1050+ // for exit command on commandPort synchronously.
1051+ dataPtrBatch[batchSize] = data.address;
1052+ sizeBatch[batchSize] = size;
1053+ batchSize++ ;
1054+ // Send data in batches as sending a message is rather expensive.
1055+ if (batchSize == maxBatchSize) {
1056+ resultPort.send (_StreamIsolateMessage (dataPtrBatch, sizeBatch));
1057+ // Re-use list instance to avoid performance hit due to new instance.
1058+ dataPtrBatch.fillRange (0 , dataPtrBatch.length, 0 );
1059+ sizeBatch.fillRange (0 , dataPtrBatch.length, 0 );
1060+ batchSize = 0 ;
1061+ }
1062+ return true ;
1063+ });
1064+ final queryPtr =
1065+ Pointer <OBX_query >.fromAddress (isolateInit.queryPtrAddress);
1066+ try {
1067+ checkObx (C .query_visit (queryPtr, visitor, nullptr));
1068+ } catch (e) {
1069+ resultPort.send (e);
1070+ return ;
1071+ } finally {
1072+ try {
1073+ checkObx (C .query_close (queryPtr));
1074+ } catch (e) {
1075+ resultPort.send (e);
1076+ return ;
1077+ }
1078+ }
1079+ // Send any remaining data.
1080+ if (batchSize > 0 ) {
1081+ resultPort.send (_StreamIsolateMessage (dataPtrBatch, sizeBatch));
1082+ }
1083+
1084+ // Signal to the main isolate there are no more results.
1085+ resultPort.send (null );
1086+ // Wait for main isolate to confirm it is done accessing sent data pointers.
1087+ await commandPort.first;
1088+ // Note: when the transaction is closed after await this might lead to an
1089+ // error log as the isolate could have been transferred to another thread
1090+ // when resuming execution.
1091+ // https://github.com/dart-lang/sdk/issues/46943
1092+ });
1093+ } finally {
1094+ store.close ();
1095+ commandPort.close ();
1096+ }
1097+ }
1098+
9181099 /// For internal testing purposes.
9191100 String describe () {
9201101 final result = dartStringFromC (C .query_describe (_ptr));
@@ -947,3 +1128,24 @@ class Query<T> {
9471128 return result;
9481129 }
9491130}
1131+
1132+ /// Message passed to entry point [Query._queryAndVisit] of isolate.
1133+ @immutable
1134+ class _StreamIsolateInit {
1135+ final SendPort resultPort;
1136+ final int storePtrAddress;
1137+ final int queryPtrAddress;
1138+ final int batchSize;
1139+
1140+ const _StreamIsolateInit (this .resultPort, this .storePtrAddress,
1141+ this .queryPtrAddress, this .batchSize);
1142+ }
1143+
1144+ /// Message sent to main isolate containing info about a batch of objects.
1145+ @immutable
1146+ class _StreamIsolateMessage {
1147+ final List <int > dataPtrAddresses;
1148+ final List <int > sizes;
1149+
1150+ const _StreamIsolateMessage (this .dataPtrAddresses, this .sizes);
1151+ }
0 commit comments