[cloud_functions] feature request for steam support #17076

Open
victoire-hergan opened this issue Feb 11, 2025 · 2 comments
Labels
Needs Attention This issue needs maintainer attention. plugin: functions type: enhancement New feature or request

Comments

@victoire-hergan

Hi,

I’m trying to stream data from a Firebase function (a Genkit flow).

I’m porting the Genkit JavaScript client (https://github.com/firebase/genkit/blob/main/js/genkit/src/client/client.ts) to Dart, but the response only arrives once the entire text has been generated instead of chunk by chunk.

Will you be adding streaming support?

My code: https://gist.github.com/victoire-hergan/04110c88bfbf7c41dc5ec12a9a436969

Thx

@victoire-hergan victoire-hergan added Needs Attention This issue needs maintainer attention. type: enhancement New feature or request labels Feb 11, 2025
@victoire-hergan victoire-hergan changed the title [firebase_functions] Stream response from a deployed flow [cloud_functions] Stream response from a deployed flow Feb 11, 2025
@SelaseKay SelaseKay added plugin: functions blocked: customer-response Waiting for customer response, e.g. more information was requested. and removed plugin: functions Needs Attention This issue needs maintainer attention. labels Feb 12, 2025
@russellwheatley russellwheatley changed the title [cloud_functions] Stream response from a deployed flow [cloud_functions] feature request for steam support Feb 12, 2025
@SelaseKay SelaseKay added Needs Attention This issue needs maintainer attention. and removed blocked: customer-response Waiting for customer response, e.g. more information was requested. labels Feb 12, 2025
@victoire-hergan
Author

Thanks for your response!

Here is my improved code:

import 'dart:async';
import 'dart:convert';

import 'package:cloud_functions/cloud_functions.dart';
import 'package:http/http.dart' as http;

class StreamFlow<O, S> {
  final Future<O> output;
  final Stream<S> stream;

  StreamFlow({required this.output, required this.stream});
}

class Api {
  static StreamFlow<O, S> streamFlow<O, S>({
    required String url,
    dynamic input,
  }) {
    final StreamController<S> controller = StreamController<S>();
    final Completer<O> outputCompleter = Completer<O>();

    final requestHeaders = {
      'Accept': 'text/event-stream',
      'Content-Type': 'application/json',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    };

    _flowRunEnvelope<O, S>(
      url: url,
      input: input,
      sendChunk: (chunk) => controller.add(chunk),
      headers: requestHeaders,
      onComplete: (result) => outputCompleter.complete(result),
      onError: (error) {
        controller.addError(error);
        if (!outputCompleter.isCompleted) {
          outputCompleter.completeError(error);
        }
      },
    ).then((_) => controller.close());

    return StreamFlow(stream: controller.stream, output: outputCompleter.future);
  }

  static Future<void> _flowRunEnvelope<O, S>({
    required String url,
    dynamic input,
    required void Function(S chunk) sendChunk,
    required Map<String, String> headers,
    required void Function(O result) onComplete,
    required void Function(dynamic error) onError,
  }) async {
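    // Create the client before the try block so that `finally` can always
    // close it, even when building or sending the request throws.
    final http.Client client = http.Client();

    try {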
      final request = http.Request('POST', Uri.parse(url));
      request.headers.addAll(headers);
      request.body = jsonEncode({'data': input});

      final streamedResponse = await client.send(request);

      if (streamedResponse.statusCode != 200) {
        throw Exception('Server returned: ${streamedResponse.statusCode}');
      }

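      // Buffer decoded text until a complete line is available. Decoding
      // through a stream transformer keeps multi-byte UTF-8 characters intact
      // when they are split across network chunks.
      String pendingData = '';

      await for (final String chunk in streamedResponse.stream
          .transform(const Utf8Decoder(allowMalformed: true))) {
        pendingData += chunk;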

        // Process each potential SSE message
        while (pendingData.contains('\n')) {
          final lineEnd = pendingData.indexOf('\n');
          String line = pendingData.substring(0, lineEnd).trim();
          pendingData = pendingData.substring(lineEnd + 1);

          if (line.startsWith('data: ')) {
            line = line.substring(6);
            try {
              if (line.isNotEmpty) {
                final Map<String, dynamic> jsonChunk = jsonDecode(line);

                if (jsonChunk.containsKey('message')) {
                  sendChunk(jsonChunk['message'] as S);
                  print(jsonChunk['message']); // only printed once the whole response has arrived, not while streaming
                } else if (jsonChunk.containsKey('result')) {
                  onComplete(jsonChunk['result'] as O);
                  return;
                } else if (jsonChunk.containsKey('error')) {
                  throw Exception('${jsonChunk['error']['status']}: ${jsonChunk['error']['message']}\n${jsonChunk['error']['details']}');
                }
              }
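            } on FormatException catch (e) {
              // Only swallow malformed JSON. A server-side error thrown above
              // must reach the outer catch so that onError is called.
              print('Error processing chunk: $e');
              print('Problematic line: $line');
            }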
          }
        }
      }

      // Process any remaining data
      if (pendingData.isNotEmpty && pendingData.startsWith('data: ')) {
        try {
          final line = pendingData.substring(6);
          final Map<String, dynamic> jsonChunk = jsonDecode(line);

          if (jsonChunk.containsKey('message')) {
            sendChunk(jsonChunk['message'] as S);
          } else if (jsonChunk.containsKey('result')) {
            onComplete(jsonChunk['result'] as O);
            return;
          }
        } catch (e) {
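          print('Error processing final chunk: $e');
        }
      }

      // The stream ended without a `result` message. Report it so that the
      // output future does not stay pending forever.
      throw Exception('Stream closed before a result was received');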
    } catch (e) {
      onError(e);
    } finally {
      client.close();
    }
  }

  static Future<HttpsCallableResult> call({required String functionName, dynamic parameters}) {
    return FirebaseFunctions.instance
        .httpsCallable(
          functionName,
          options: HttpsCallableOptions(
            timeout: const Duration(seconds: 60),
          ),
        )
        .call(parameters)
        .then((result) {
      print("Successfully invoke Function ($functionName)");
      return result;
    }).catchError((error, stacktrace) {
      print("Function ($functionName) got an error");

      if (error is FirebaseFunctionsException) {
        final String code = error.code;
        final String message = error.message ?? "unknown error";

        // final dynamic details = error.details['details'];
        throw ApiException(message, code);
      }
      throw error;
    });
  }
}

class ApiException implements Exception {
  final String message;
  final String code;

  ApiException(this.message, this.code);

  @override
  String toString() {
    return 'ApiException{_message: $message, _code: $code}';
  }
}
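
As a point of comparison, the line handling in `_flowRunEnvelope` could be isolated into a small pure function that is easy to test without a network connection. The following is only a minimal sketch: the name `sseJsonEvents` is hypothetical, and it assumes the server sends one JSON object per `data:` line, as the code above does. It uses `utf8.decoder` and `LineSplitter` from `dart:convert`, which carry partial characters and partial lines across chunk boundaries.

```dart
import 'dart:async';
import 'dart:convert';

/// Hypothetical helper (not part of cloud_functions or package:http):
/// turns a byte stream carrying `data: <json>` lines into decoded JSON objects.
Stream<Map<String, dynamic>> sseJsonEvents(Stream<List<int>> bytes) async* {
  // `bind` decodes incrementally; LineSplitter buffers incomplete lines.
  final lines = utf8.decoder.bind(bytes).transform(const LineSplitter());

  await for (final rawLine in lines) {
    final line = rawLine.trim();
    if (!line.startsWith('data:')) continue; // ignore blank and non-data lines
    final payload = line.substring(5).trim();
    if (payload.isEmpty) continue;
    final decoded = jsonDecode(payload);
    if (decoded is Map<String, dynamic>) yield decoded;
  }
}

Future<void> main() async {
  // Simulate one event split across two network chunks, followed by a result.
  final first = utf8.encode('data: {"message":"Hel');
  final second = utf8.encode('lo"}\n\ndata: {"result":"Hello"}\n\n');

  final events =
      sseJsonEvents(Stream<List<int>>.fromIterable([first, second]));
  await for (final event in events) {
    print(event);
  }
}
```

A parser like this only shows chunks incrementally if the HTTP client delivers the response bytes incrementally; if the client buffers the whole body, every event still arrives at the end.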

I can call a function with FirebaseFunctions.instance.httpsCallable in FlutterFire, but I can't stream its response. I tried to implement streaming myself, but the chunks only show up once the whole response has arrived. That’s why I’m asking whether someone can help me build this new feature :)

Stream<String> stream(String userId, String messageText, List<dynamic> messageTexts) {
    var parameters = 'test';

    final controller = StreamController<String>();

    final flow = Api.streamFlow<String, String>(
      url: 'url to my cloud function',
      input: parameters,
    );

    // Listen to stream updates
    flow.stream.listen(
      (message) {
        print(message);
        controller.add(message);
      },
      onError: (error) {
        controller.addError(error);
      },
      onDone: () async {
        try {
          final output = await flow.output;
          // output is already a String, so no need for toString()
          controller.add(output);
          await controller.close();
        } catch (e) {
          controller.addError(e);
          await controller.close();
        }
      },
    );

    return controller.stream;
  }
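
The forwarding logic above (emit every chunk, then the final output) could also be written as an `async*` generator instead of a manual `StreamController`. This is a sketch only: `chunksThenOutput` is a hypothetical name, and the sample strings in `main` are made up for illustration.

```dart
import 'dart:async';

/// Hypothetical helper: emits every chunk, then the final output once the
/// chunk stream is done. Errors from either source surface on the result.
Stream<T> chunksThenOutput<T>(Stream<T> chunks, Future<T> output) async* {
  yield* chunks; // forward chunks (and their errors) as they arrive
  yield await output; // then append the final result
}

Future<void> main() async {
  // Stand-ins for `flow.stream` and `flow.output`.
  final chunks = Stream<String>.fromIterable(['Why did ', 'the chicken ']);
  final output = Future<String>.value('Why did the chicken cross the road?');

  await for (final text in chunksThenOutput(chunks, output)) {
    print(text);
  }
}
```

With the classes above, the call would look like `chunksThenOutput(flow.stream, flow.output)`. Note that the generator does not start listening to the chunk stream until its own result is listened to.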

The Cloud Function backend is taken from https://github.com/firebase/genkit/blob/5033df07a99a05427da4f8c5c1009a2c0375e265/js/testapps/next/src/genkit/joke.ts#L28

@SelaseKay
Contributor

Hi @victoire-hergan. Currently, streaming is supported only on the web, not on native platforms (Android and iOS). I'll keep this issue open to track this feature request for FlutterFire.

Reference: Invertase/react-native-firebase#8210 (comment)
