2 changes: 0 additions & 2 deletions examples/configs/gs_content_safety/config/config.yml
@@ -18,5 +18,3 @@ rails:
enabled: True
chunk_size: 200
context_size: 50

streaming: True
3 changes: 0 additions & 3 deletions examples/configs/llm/hf_pipeline_dolly/config.yml
@@ -2,9 +2,6 @@ models:
- type: main
engine: hf_pipeline_dolly

# Remove attribute / set to False if streaming is not required
streaming: True

instructions:
- type: general
content: |
2 changes: 0 additions & 2 deletions examples/configs/streaming/config.yml
@@ -13,5 +13,3 @@ rails:
dialog:
single_call:
enabled: True

streaming: True
24 changes: 3 additions & 21 deletions examples/notebooks/generate_events_and_streaming.ipynb
@@ -41,15 +41,11 @@
"metadata": {
"collapsed": false
},
"source": [
"## Step 1: create a config \n",
"\n",
"Let's create a simple config:"
]
"source": "## Step 1: create a config \n\nLet's create a simple config. No special streaming configuration is needed—streaming is automatically enabled when a `StreamingHandler` is used:"
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"id": "d9bac50b3383915e",
"metadata": {
"ExecuteTime": {
@@ -59,21 +55,7 @@
"collapsed": false
},
"outputs": [],
"source": [
"from nemoguardrails import LLMRails, RailsConfig\n",
"\n",
"YAML_CONFIG = \"\"\"\n",
"models:\n",
" - type: main\n",
" engine: openai\n",
" model: gpt-4\n",
"\n",
"streaming: True\n",
"\"\"\"\n",
"\n",
"config = RailsConfig.from_content(yaml_content=YAML_CONFIG)\n",
"app = LLMRails(config)"
]
"source": "from nemoguardrails import LLMRails, RailsConfig\n\nYAML_CONFIG = \"\"\"\nmodels:\n - type: main\n engine: openai\n model: gpt-4\n\"\"\"\n\nconfig = RailsConfig.from_content(yaml_content=YAML_CONFIG)\napp = LLMRails(config)"
},
{
"cell_type": "markdown",
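As an editorial aside (not part of the notebook diff): a minimal sketch of how the same config is consumed under the new approach, where calling stream_async() is what enables streaming rather than a YAML flag. The user message is illustrative and an OpenAI API key is assumed to be available in the environment.

import asyncio

from nemoguardrails import LLMRails, RailsConfig

YAML_CONFIG = """
models:
  - type: main
    engine: openai
    model: gpt-4
"""


async def demo():
    config = RailsConfig.from_content(yaml_content=YAML_CONFIG)
    app = LLMRails(config)
    # Tokens are printed as they arrive; no `streaming: True` entry is needed in the YAML.
    async for chunk in app.stream_async(
        messages=[{"role": "user", "content": "Tell me a short joke."}]
    ):
        print(chunk, end="", flush=True)


asyncio.run(demo())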
4 changes: 0 additions & 4 deletions examples/scripts/demo_streaming.py
@@ -34,8 +34,6 @@
- type: main
engine: openai
model: gpt-4

streaming: True
"""


@@ -99,8 +97,6 @@ async def demo_streaming_from_custom_action():
dialog:
user_messages:
embeddings_only: True

streaming: True
""",
colang_content="""
# We need to have at least one canonical form to enable dialog rails.
48 changes: 29 additions & 19 deletions nemoguardrails/cli/chat.py
@@ -28,6 +28,7 @@
from nemoguardrails.colang.v2_x.runtime.eval import eval_expression
from nemoguardrails.colang.v2_x.runtime.flows import State
from nemoguardrails.colang.v2_x.runtime.runtime import RuntimeV2_x
from nemoguardrails.exceptions import StreamingNotSupportedError
from nemoguardrails.logging import verbose
from nemoguardrails.logging.verbose import console
from nemoguardrails.rails.llm.options import (
@@ -65,11 +66,6 @@ async def _run_chat_v1_0(
raise RuntimeError("config_path cannot be None when server_url is None")
rails_config = RailsConfig.from_path(config_path)
rails_app = LLMRails(rails_config, verbose=verbose)
if streaming and not rails_config.streaming_supported:
console.print(
f"WARNING: The config `{config_path}` does not support streaming. Falling back to normal mode."
)
streaming = False
else:
rails_app = None

@@ -83,19 +79,33 @@

if not server_url:
# If we have streaming from a locally loaded config, we initialize the handler.
if streaming and not server_url and rails_app and rails_app.main_llm_supports_streaming:
bot_message_list = []
async for chunk in rails_app.stream_async(messages=history):
if '{"event": "ABORT"' in chunk:
dict_chunk = json.loads(chunk)
console.print("\n\n[red]" + f"ABORT streaming. {dict_chunk['data']}" + "[/]")
break

console.print("[green]" + f"{chunk}" + "[/]", end="")
bot_message_list.append(chunk)

bot_message_text = "".join(bot_message_list)
bot_message = {"role": "assistant", "content": bot_message_text}
if streaming and not server_url and rails_app:
try:
bot_message_list = []
async for chunk in rails_app.stream_async(messages=history):
if '{"event": "ABORT"' in chunk:
dict_chunk = json.loads(chunk)
console.print("\n\n[red]" + f"ABORT streaming. {dict_chunk['data']}" + "[/]")
break

console.print("[green]" + f"{chunk}" + "[/]", end="")
bot_message_list.append(chunk)

bot_message_text = "".join(bot_message_list)
bot_message = {"role": "assistant", "content": bot_message_text}
except StreamingNotSupportedError as e:
raise StreamingNotSupportedError(
f"Cannot use --streaming with config `{config_path}` because output rails "
"are configured but streaming is not enabled for them.\n\n"
"To fix this, either:\n"
" 1. Enable streaming for output rails by adding to your config.yml:\n"
" rails:\n"
" output:\n"
" streaming:\n"
" enabled: True\n\n"
" 2. Or run without the --streaming flag:\n"
f" nemoguardrails chat {config_path}"
) from e

else:
if rails_app is None:
@@ -124,7 +134,7 @@ async def _run_chat_v1_0(
# String or other fallback case
bot_message = {"role": "assistant", "content": str(response)}

if not streaming or not rails_app.main_llm_supports_streaming:
if not streaming:
# We print bot messages in green.
content = bot_message.get("content", str(bot_message))
console.print("[green]" + f"{content}" + "[/]")
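For context, the remedy spelled out in the new error message corresponds to a config along the following lines. The `self check output` flow name and the model are illustrative (the rail's prompt definition is assumed to exist alongside the config), while the rails.output.streaming keys mirror the ones already present in the gs_content_safety example config above.

from nemoguardrails import RailsConfig

# Sketch of a config whose output rails are streaming-compatible, so that
# `nemoguardrails chat <config> --streaming` does not raise StreamingNotSupportedError.
YAML_CONFIG = """
models:
  - type: main
    engine: openai
    model: gpt-4

rails:
  output:
    flows:
      - self check output  # illustrative output rail; its prompt must be defined elsewhere
    streaming:
      enabled: True
      chunk_size: 200
      context_size: 50
"""

config = RailsConfig.from_content(yaml_content=YAML_CONFIG)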
7 changes: 7 additions & 0 deletions nemoguardrails/exceptions.py
@@ -19,6 +19,7 @@
"InvalidModelConfigurationError",
"InvalidRailsConfigurationError",
"LLMCallException",
"StreamingNotSupportedError",
]


@@ -49,6 +50,12 @@ class InvalidRailsConfigurationError(ConfigurationError):
pass


class StreamingNotSupportedError(InvalidRailsConfigurationError):
"""Raised when streaming is requested but not supported by the configuration."""

pass


class LLMCallException(Exception):
"""A wrapper around the LLM call invocation exception.

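Because StreamingNotSupportedError subclasses InvalidRailsConfigurationError, existing handlers for the broader type keep working. The sketch below (the config path and the shape of the non-streaming response are assumptions) shows one way a caller can catch the new exception and fall back to a regular call:

import asyncio

from nemoguardrails import LLMRails, RailsConfig
from nemoguardrails.exceptions import StreamingNotSupportedError


async def chat_once(app: LLMRails, user_message: str) -> None:
    messages = [{"role": "user", "content": user_message}]
    try:
        # stream_async() raises StreamingNotSupportedError when output rails are
        # configured but rails.output.streaming.enabled is not set.
        async for chunk in app.stream_async(messages=messages):
            print(chunk, end="", flush=True)
    except StreamingNotSupportedError:
        # Fall back to a non-streaming generation.
        bot_message = await app.generate_async(messages=messages)
        print(bot_message["content"])


app = LLMRails(RailsConfig.from_path("./config"))  # hypothetical config directory
asyncio.run(chat_once(app, "Hello!"))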
17 changes: 2 additions & 15 deletions nemoguardrails/rails/llm/config.py
@@ -1375,7 +1375,8 @@ class RailsConfig(BaseModel):

streaming: bool = Field(
default=False,
description="Whether this configuration should use streaming mode or not.",
deprecated="The 'streaming' field is no longer required. Use stream_async() method directly instead. This field will be removed in a future version.",
description="DEPRECATED: Use stream_async() method instead. This field is ignored.",
)

enable_rails_exceptions: bool = Field(
@@ -1665,20 +1666,6 @@ def parse_object(cls, obj):

return cls.parse_obj(obj)

@property
def streaming_supported(self):
"""Whether the current config supports streaming or not."""

if len(self.rails.output.flows) > 0:
# if we have output rails streaming enabled
# we keep it in case it was needed when we have
# support per rails
if self.rails.output.streaming and self.rails.output.streaming.enabled:
return True
return False

return True

def __add__(self, other):
"""Adds two RailsConfig objects."""
return _join_rails_configs(self, other)
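Downstream code that relied on the removed streaming_supported property can reproduce the same check with a small helper; this is a sketch based directly on the deleted body above:

from nemoguardrails import RailsConfig


def streaming_supported(config: RailsConfig) -> bool:
    """Standalone equivalent of the removed RailsConfig.streaming_supported property."""
    if len(config.rails.output.flows) > 0:
        # With output rails configured, streaming is only supported when it is
        # explicitly enabled for them.
        return bool(config.rails.output.streaming and config.rails.output.streaming.enabled)
    return True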
36 changes: 10 additions & 26 deletions nemoguardrails/rails/llm/llmrails.py
@@ -73,6 +73,7 @@
from nemoguardrails.exceptions import (
InvalidModelConfigurationError,
InvalidRailsConfigurationError,
StreamingNotSupportedError,
)
from nemoguardrails.kb.kb import KnowledgeBase
from nemoguardrails.llm.cache import CacheInterface, LFUCache
@@ -155,9 +156,6 @@ def __init__(
# should be removed
self.events_history_cache = {}

# Weather the main LLM supports streaming
self.main_llm_supports_streaming = False

# We also load the default flows from the `default_flows.yml` file in the current folder.
# But only for version 1.0.
# TODO: decide on the default flows for 2.x.
@@ -377,10 +375,9 @@ def _prepare_model_kwargs(self, model_config):
if api_key:
kwargs["api_key"] = api_key

# enable streaming token usage when streaming is enabled
# enable streaming token usage
# providers that don't support this parameter will simply ignore it
if self.config.streaming:
kwargs["stream_usage"] = True
kwargs["stream_usage"] = True
Review comment (author): @Pouyanpi: we can move this later to llm_call when we drop the streaming callback handler feature.

return kwargs

@@ -398,22 +395,9 @@ def _configure_main_llm_streaming(
provider_name (Optional[str], optional): Optional provider name for logging.

"""
if not self.config.streaming:
return

if hasattr(llm, "streaming"):
setattr(llm, "streaming", True)
self.main_llm_supports_streaming = True
else:
self.main_llm_supports_streaming = False
if model_name and provider_name:
log.warning(
"Model %s from provider %s does not support streaming.",
model_name,
provider_name,
)
else:
log.warning("Provided main LLM does not support streaming.")

def _init_llms(self):
"""
@@ -442,7 +426,6 @@ def _init_llms(self):
)
self.runtime.register_action_param("llm", self.llm)

self._configure_main_llm_streaming(self.llm)
else:
# Otherwise, initialize the main LLM from the config
main_model = next((model for model in self.config.models if model.type == "main"), None)
@@ -457,11 +440,6 @@
)
self.runtime.register_action_param("llm", self.llm)

self._configure_main_llm_streaming(
self.llm,
model_name=main_model.model,
provider_name=main_model.engine,
)
else:
log.warning("No main LLM specified in the config and no LLM provided via constructor.")

@@ -848,6 +826,7 @@ async def generate_async(

if streaming_handler:
streaming_handler_var.set(streaming_handler)
self._configure_main_llm_streaming(self.llm) # type: ignore

# Initialize the object with additional explanation information.
# We allow this to also be set externally. This is useful when multiple parallel
@@ -1189,7 +1168,7 @@ def _validate_streaming_with_output_rails(self) -> None:
if len(self.config.rails.output.flows) > 0 and (
not self.config.rails.output.streaming or not self.config.rails.output.streaming.enabled
):
raise InvalidRailsConfigurationError(
raise StreamingNotSupportedError(
"stream_async() cannot be used when output rails are configured but "
"rails.output.streaming.enabled is False. Either set "
"rails.output.streaming.enabled to True in your configuration, or use "
@@ -1246,6 +1225,8 @@ def stream_async(

streaming_handler = StreamingHandler(include_generation_metadata=include_generation_metadata)

self._configure_main_llm_streaming(self.llm) # type: ignore

# Create a properly managed task with exception handling
async def _generation_task():
try:
@@ -1357,6 +1338,9 @@ async def generate_events_async(
llm_stats = LLMStats()
llm_stats_var.set(llm_stats)

if streaming_handler_var.get():
self._configure_main_llm_streaming(self.llm) # type: ignore

# Compute the new events.
processing_log = []
new_events = await self.runtime.generate_events(events, processing_log=processing_log)
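Taken together, the llmrails.py changes switch the main LLM into streaming mode lazily, only when a streaming entry point is actually used. A sketch of the two entry points follows; the message content is illustrative, and iterating over a StreamingHandler to read chunks is an assumption based on how the handler is used elsewhere in the repository.

import asyncio

from nemoguardrails import LLMRails, RailsConfig
from nemoguardrails.streaming import StreamingHandler


async def main() -> None:
    app = LLMRails(RailsConfig.from_path("./config"))  # hypothetical config; no `streaming: True` needed
    messages = [{"role": "user", "content": "Hello!"}]

    # Path 1: stream_async() configures the main LLM for streaming internally.
    async for chunk in app.stream_async(messages=messages):
        print(chunk, end="", flush=True)

    # Path 2: passing an explicit StreamingHandler to generate_async() triggers
    # the same configuration via the new call added in generate_async().
    handler = StreamingHandler()
    generation = asyncio.create_task(
        app.generate_async(messages=messages, streaming_handler=handler)
    )
    async for chunk in handler:
        print(chunk, end="", flush=True)
    await generation


asyncio.run(main())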
18 changes: 5 additions & 13 deletions nemoguardrails/server/api.py
@@ -37,7 +37,6 @@
GenerationResponse,
)
from nemoguardrails.server.datastore.datastore import DataStore
from nemoguardrails.streaming import StreamingHandler

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
@@ -426,18 +425,11 @@ async def chat_completion(body: RequestBody, request: Request):
# And prepend them.
messages = thread_messages + messages

if body.stream and llm_rails.config.streaming_supported and llm_rails.main_llm_supports_streaming:
# Create the streaming handler instance
streaming_handler = StreamingHandler()

# Start the generation
asyncio.create_task(
llm_rails.generate_async(
messages=messages,
streaming_handler=streaming_handler,
options=body.options,
state=body.state,
)
if body.stream:
streaming_handler = llm_rails.stream_async(
messages=messages,
options=body.options,
state=body.state,
)

# TODO: Add support for thread_ids in streaming mode
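On the client side the server change is transparent: a request with stream set to true now goes through stream_async() internally. A rough sketch of a streaming client follows; the endpoint path and the config_id field are assumptions, while the stream flag comes from RequestBody in this diff.

import requests

# Hypothetical streaming client for the guardrails server; adjust host, port, and config id.
response = requests.post(
    "http://localhost:8000/v1/chat/completions",
    json={
        "config_id": "my_config",
        "messages": [{"role": "user", "content": "Hello!"}],
        "stream": True,
    },
    stream=True,
    timeout=60,
)
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="", flush=True)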