diff --git a/libs/oci/README.md b/libs/oci/README.md
index 2266d10..78c829b 100644
--- a/libs/oci/README.md
+++ b/libs/oci/README.md
@@ -59,14 +59,16 @@ embeddings.embed_query("What is the meaning of life?")
 
 ### 1. Use a Chat Model
 
-You may instantiate the OCI Data Science model with the generic `ChatOCIModelDeployment` or framework specific class like `ChatOCIModelDeploymentVLLM`.
+The `ChatOCIModelDeployment` class is designed for model deployments with OpenAI-compatible APIs.
 
 ```python
-from langchain_oci.chat_models import ChatOCIModelDeployment, ChatOCIModelDeploymentVLLM
+from langchain_oci import ChatOCIModelDeployment
 
 # Create an instance of OCI Model Deployment Endpoint
+
 # Replace the endpoint uri with your own
-endpoint = "https://modeldeployment..oci.customer-oci.com//predict"
+# For streaming, use the /predictWithResponseStream endpoint.
+endpoint = "https://modeldeployment..oci.customer-oci.com//predictWithResponseStream"
 
 messages = [
     (
@@ -78,31 +80,23 @@ messages = [
 
 chat = ChatOCIModelDeployment(
     endpoint=endpoint,
-    streaming=True,
-    max_retries=1,
-    model_kwargs={
-        "temperature": 0.2,
-        "max_tokens": 512,
-    }, # other model params...
-    default_headers={
-        "route": "/v1/chat/completions",
-        # other request headers ...
-    },
+    model="odsc-llm",
+    max_tokens=512,
 )
-chat.invoke(messages)
+chat.stream(messages)
 
-chat_vllm = ChatOCIModelDeploymentVLLM(endpoint=endpoint)
-chat_vllm.invoke(messages)
 ```
 
 ### 2. Use a Completion Model
 
-You may instantiate the OCI Data Science model with `OCIModelDeploymentLLM` or `OCIModelDeploymentVLLM`.
+The `OCIModelDeploymentLLM` class is designed for completion endpoints.
 
 ```python
-from langchain_oci.llms import OCIModelDeploymentLLM, OCIModelDeploymentVLLM
+from langchain_oci import OCIModelDeploymentLLM
 
 # Create an instance of OCI Model Deployment Endpoint
+
 # Replace the endpoint uri and model name with your own
+# For streaming, use the /predictWithResponseStream endpoint.
 endpoint = "https://modeldeployment..oci.customer-oci.com//predict"
 
 llm = OCIModelDeploymentLLM(
@@ -111,10 +105,6 @@ llm = OCIModelDeploymentLLM(
 )
 llm.invoke("Who is the first president of United States?")
 
-vllm = OCIModelDeploymentVLLM(
-    endpoint=endpoint,
-)
-vllm.invoke("Who is the first president of United States?")
 ```
 
 ### 3. Use an Embedding Model
diff --git a/libs/oci/langchain_oci/__init__.py b/libs/oci/langchain_oci/__init__.py
index eb6df32..e3a40f8 100644
--- a/libs/oci/langchain_oci/__init__.py
+++ b/libs/oci/langchain_oci/__init__.py
@@ -1,35 +1,21 @@
 # Copyright (c) 2025 Oracle and/or its affiliates.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
-
-from langchain_oci.chat_models.oci_data_science import (
-    ChatOCIModelDeployment,
-    ChatOCIModelDeploymentTGI,
-    ChatOCIModelDeploymentVLLM,
-)
+from langchain_oci.chat_models import ChatOCIModelDeployment
 from langchain_oci.chat_models.oci_generative_ai import ChatOCIGenAI
 from langchain_oci.embeddings.oci_data_science_model_deployment_endpoint import (
     OCIModelDeploymentEndpointEmbeddings,
 )
 from langchain_oci.embeddings.oci_generative_ai import OCIGenAIEmbeddings
-from langchain_oci.llms.oci_data_science_model_deployment_endpoint import (
-    BaseOCIModelDeployment,
-    OCIModelDeploymentLLM,
-    OCIModelDeploymentTGI,
-    OCIModelDeploymentVLLM,
-)
+from langchain_oci.llms import BaseOCIModelDeployment, OCIModelDeploymentLLM
 from langchain_oci.llms.oci_generative_ai import OCIGenAI, OCIGenAIBase
 
 __all__ = [
     "ChatOCIGenAI",
     "ChatOCIModelDeployment",
-    "ChatOCIModelDeploymentTGI",
-    "ChatOCIModelDeploymentVLLM",
     "OCIGenAIEmbeddings",
     "OCIModelDeploymentEndpointEmbeddings",
     "OCIGenAIBase",
     "OCIGenAI",
     "BaseOCIModelDeployment",
     "OCIModelDeploymentLLM",
-    "OCIModelDeploymentTGI",
-    "OCIModelDeploymentVLLM",
 ]
diff --git a/libs/oci/langchain_oci/chat_models/__init__.py b/libs/oci/langchain_oci/chat_models/__init__.py
index c714b3c..a460ede 100644
--- a/libs/oci/langchain_oci/chat_models/__init__.py
+++ b/libs/oci/langchain_oci/chat_models/__init__.py
@@ -1,16 +1,33 @@
 # Copyright (c) 2025 Oracle and/or its affiliates.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
 
-from langchain_oci.chat_models.oci_data_science import (
-    ChatOCIModelDeployment,
-    ChatOCIModelDeploymentTGI,
-    ChatOCIModelDeploymentVLLM,
-)
 from langchain_oci.chat_models.oci_generative_ai import ChatOCIGenAI
 
+try:
+    from langchain_oci.chat_models.oci_data_science import ChatOCIModelDeployment
+
+except ModuleNotFoundError as ex:
+    # Default message
+    message = ex.msg
+    # For langchain_openai, show the message with pip install command.
+    if ex.name == "langchain_openai":
+        message = (
+            "No module named langchain_openai. "
+            "Please install it with `pip install langchain_openai`"
+        )
+
+    # Create a placeholder class here so that
+    # users can import the class without error.
+    # Users will see the error message when they try to initialize an instance.
+ class ChatOCIModelDeployment: + """Placeholder class for ChatOCIModelDeployment + when langchain-openai is not installed.""" + + def __init__(self, *args, **kwargs): + raise ModuleNotFoundError(message) + + __all__ = [ "ChatOCIGenAI", "ChatOCIModelDeployment", - "ChatOCIModelDeploymentTGI", - "ChatOCIModelDeploymentVLLM", ] diff --git a/libs/oci/langchain_oci/chat_models/oci_data_science.py b/libs/oci/langchain_oci/chat_models/oci_data_science.py index 719c355..c09141f 100644 --- a/libs/oci/langchain_oci/chat_models/oci_data_science.py +++ b/libs/oci/langchain_oci/chat_models/oci_data_science.py @@ -2,62 +2,17 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ """Chat model for OCI data science model deployment endpoint.""" +from langchain_openai import ChatOpenAI -import importlib -import json -import logging -from operator import itemgetter -from typing import ( - Any, - AsyncIterator, - Callable, - Dict, - Iterator, - List, - Literal, - Optional, - Sequence, - Type, - Union, -) +from langchain_oci.llms import BaseOCIModelDeployment -from langchain_core.callbacks import ( - AsyncCallbackManagerForLLMRun, - CallbackManagerForLLMRun, -) -from langchain_core.language_models import LanguageModelInput -from langchain_core.language_models.chat_models import ( - BaseChatModel, - agenerate_from_stream, - generate_from_stream, -) -from langchain_core.messages import AIMessageChunk, BaseMessage, BaseMessageChunk -from langchain_core.output_parsers import ( - JsonOutputParser, - PydanticOutputParser, -) -from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult -from langchain_core.runnables import Runnable, RunnableMap, RunnablePassthrough -from langchain_core.tools import BaseTool -from langchain_core.utils.function_calling import convert_to_openai_tool -from pydantic import BaseModel, Field, model_validator -from langchain_oci.llms.oci_data_science_model_deployment_endpoint import ( - DEFAULT_MODEL_NAME, - BaseOCIModelDeployment, -) - -logger = logging.getLogger(__name__) -DEFAULT_INFERENCE_ENDPOINT_CHAT = "/v1/chat/completions" - - -def _is_pydantic_class(obj: Any) -> bool: - return isinstance(obj, type) and issubclass(obj, BaseModel) - - -class ChatOCIModelDeployment(BaseChatModel, BaseOCIModelDeployment): +class ChatOCIModelDeployment(BaseOCIModelDeployment, ChatOpenAI): """OCI Data Science Model Deployment chat model integration. + This class inherits from ChatOpenAI LangChain client. + You can use all the parameters supported by ChatOpenAI LangChain client. + Prerequisite The OCI Model Deployment plugins are installable only on python version 3.9 and above. If you're working inside the notebook, @@ -88,945 +43,26 @@ class ChatOCIModelDeployment(BaseChatModel, BaseOCIModelDeployment): https://docs.oracle.com/en-us/iaas/data-science/using/model-dep-policies-auth.htm - Key init args - completion params: - endpoint: str - The OCI model deployment endpoint. - temperature: float - Sampling temperature. - max_tokens: Optional[int] - Max number of tokens to generate. - - Key init args — client params: - auth: dict - ADS auth dictionary for OCI authentication. - default_headers: Optional[Dict] - The headers to be added to the Model Deployment request. - - Instantiate: - .. 
code-block:: python - - from langchain_oci.chat_models import ChatOCIModelDeployment - - chat = ChatOCIModelDeployment( - endpoint="https://modeldeployment..oci.customer-oci.com//predict", - model="odsc-llm", # this is the default model name if deployed with AQUA - streaming=True, - max_retries=3, - model_kwargs={ - "max_token": 512, - "temperature": 0.2, - # other model parameters ... - }, - default_headers={ - "route": "/v1/chat/completions", - # other request headers ... - }, - ) - - Invocation: - .. code-block:: python - - messages = [ - ("system", "Translate the user sentence to French."), - ("human", "Hello World!"), - ] - chat.invoke(messages) - - .. code-block:: python - - AIMessage( - content='Bonjour le monde!', - response_metadata={ - 'token_usage': { - 'prompt_tokens': 40, - 'total_tokens': 50, - 'completion_tokens': 10 - }, - 'model_name': 'odsc-llm', - 'system_fingerprint': '', - 'finish_reason': 'stop' - }, - id='run-cbed62da-e1b3-4abd-9df3-ec89d69ca012-0' - ) - - Streaming: - .. code-block:: python - - for chunk in chat.stream(messages): - print(chunk) - - .. code-block:: python - - content='' id='run-02c6-c43f-42de' - content='\n' id='run-02c6-c43f-42de' - content='B' id='run-02c6-c43f-42de' - content='on' id='run-02c6-c43f-42de' - content='j' id='run-02c6-c43f-42de' - content='our' id='run-02c6-c43f-42de' - content=' le' id='run-02c6-c43f-42de' - content=' monde' id='run-02c6-c43f-42de' - content='!' id='run-02c6-c43f-42de' - content='' response_metadata={'finish_reason': 'stop'} id='run-02c6-c43f-42de' - - Async: - .. code-block:: python - - await chat.ainvoke(messages) - - # stream: - # async for chunk in (await chat.astream(messages)) - - .. code-block:: python - - AIMessage( - content='Bonjour le monde!', - response_metadata={'finish_reason': 'stop'}, - id='run-8657a105-96b7-4bb6-b98e-b69ca420e5d1-0' - ) - - Structured output: - .. code-block:: python - - from typing import Optional - from pydantic import BaseModel, Field - - class Joke(BaseModel): - setup: str = Field(description="The setup of the joke") - punchline: str = Field(description="The punchline to the joke") - - structured_llm = chat.with_structured_output(Joke, method="json_mode") - structured_llm.invoke( - "Tell me a joke about cats, " - "respond in JSON with `setup` and `punchline` keys" - ) - - .. code-block:: python - - Joke( - setup='Why did the cat get stuck in the tree?', - punchline='Because it was chasing its tail!' - ) - - See ``ChatOCIModelDeployment.with_structured_output()`` for more. - - Customized Usage: - You can inherit from base class and overwrite the `_process_response`, - `_process_stream_response`, `_construct_json_body` for customized usage. - - .. code-block:: python - - class MyChatModel(ChatOCIModelDeployment): - def _process_stream_response(self, response_json: dict) -> ChatGenerationChunk: - print("My customized streaming result handler.") - return GenerationChunk(...) - - def _process_response(self, response_json:dict) -> ChatResult: - print("My customized output handler.") - return ChatResult(...) - - def _construct_json_body(self, messages: list, params: dict) -> dict: - print("My customized payload handler.") - return { - "messages": messages, - **params, - } - - chat = MyChatModel( - endpoint=f"https://modeldeployment..oci.customer-oci.com/{ocid}/predict", - model="odsc-llm", - } - - chat.invoke("tell me a joke") - - Response metadata - .. code-block:: python - - ai_msg = chat.invoke(messages) - ai_msg.response_metadata - - .. 
code-block:: python - - { - 'token_usage': { - 'prompt_tokens': 40, - 'total_tokens': 50, - 'completion_tokens': 10 - }, - 'model_name': 'odsc-llm', - 'system_fingerprint': '', - 'finish_reason': 'stop' - } - - """ # noqa: E501 - - model_kwargs: Dict[str, Any] = Field(default_factory=dict) - """Keyword arguments to pass to the model.""" - - model: str = DEFAULT_MODEL_NAME - """The name of the model.""" - - stop: Optional[List[str]] = None - """Stop words to use when generating. Model output is cut off - at the first occurrence of any of these substrings.""" - - @model_validator(mode="before") - @classmethod - def validate_openai(cls, values: Any) -> Any: - """Checks if langchain_openai is installed.""" - if not importlib.util.find_spec("langchain_openai"): - raise ImportError( - "Could not import langchain_openai package. " - "Please install it with `pip install langchain_openai`." - ) - return values - - @property - def _llm_type(self) -> str: - """Return type of llm.""" - return "oci_model_depolyment_chat_endpoint" - - @property - def _identifying_params(self) -> Dict[str, Any]: - """Get the identifying parameters.""" - _model_kwargs = self.model_kwargs or {} - return { - **{"endpoint": self.endpoint, "model_kwargs": _model_kwargs}, - **self._default_params, - } - - @property - def _default_params(self) -> Dict[str, Any]: - """Get the default parameters.""" - return { - "model": self.model, - "stop": self.stop, - "stream": self.streaming, - } - - def _headers( - self, is_async: Optional[bool] = False, body: Optional[dict] = None - ) -> Dict: - """Construct and return the headers for a request. - - Args: - is_async (bool, optional): Indicates if the request is asynchronous. - Defaults to `False`. - body (optional): The request body to be included in the headers if - the request is asynchronous. - - Returns: - Dict: A dictionary containing the appropriate headers for the request. - """ - return { - "route": DEFAULT_INFERENCE_ENDPOINT_CHAT, - **super()._headers(is_async=is_async, body=body), - } - - def _generate( - self, - messages: List[BaseMessage], - stop: Optional[List[str]] = None, - run_manager: Optional[CallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> ChatResult: - """Call out to an OCI Model Deployment Online endpoint. - - Args: - messages: The messages in the conversation with the chat model. - stop: Optional list of stop words to use when generating. - - Returns: - LangChain ChatResult - - Raises: - RuntimeError: - Raise when invoking endpoint fails. - - Example: - - .. code-block:: python - - messages = [ - ( - "system", - "You are a helpful assistant that translates English to French. Translate the user sentence.", - ), - ("human", "Hello World!"), - ] - - response = chat.invoke(messages) - """ # noqa: E501 - if self.streaming: - stream_iter = self._stream( - messages, stop=stop, run_manager=run_manager, **kwargs - ) - return generate_from_stream(stream_iter) - - requests_kwargs = kwargs.pop("requests_kwargs", {}) - params = self._invocation_params(stop, **kwargs) - body = self._construct_json_body(messages, params) - res = self.completion_with_retry( - data=body, run_manager=run_manager, **requests_kwargs - ) - return self._process_response(res.json()) - - def _stream( - self, - messages: List[BaseMessage], - stop: Optional[List[str]] = None, - run_manager: Optional[CallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> Iterator[ChatGenerationChunk]: - """Stream OCI Data Science Model Deployment endpoint on given messages. 
- - Args: - messages (List[BaseMessage]): - The messagaes to pass into the model. - stop (List[str], Optional): - List of stop words to use when generating. - kwargs: - requests_kwargs: - Additional ``**kwargs`` to pass to requests.post - - Returns: - An iterator of ChatGenerationChunk. - - Raises: - RuntimeError: - Raise when invoking endpoint fails. - - Example: - - .. code-block:: python - - messages = [ - ( - "system", - "You are a helpful assistant that translates English to French. Translate the user sentence.", - ), - ("human", "Hello World!"), - ] - - chunk_iter = chat.stream(messages) - - """ # noqa: E501 - requests_kwargs = kwargs.pop("requests_kwargs", {}) - self.streaming = True - params = self._invocation_params(stop, **kwargs) - body = self._construct_json_body(messages, params) # request json body - - response = self.completion_with_retry( - data=body, run_manager=run_manager, stream=True, **requests_kwargs - ) - default_chunk_class = AIMessageChunk - for line in self._parse_stream(response.iter_lines()): - chunk = self._handle_sse_line(line, default_chunk_class) - if run_manager: - run_manager.on_llm_new_token(chunk.text, chunk=chunk) - yield chunk - - async def _agenerate( - self, - messages: List[BaseMessage], - stop: Optional[List[str]] = None, - run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> ChatResult: - """Asynchronously call out to OCI Data Science Model Deployment - endpoint on given messages. - - Args: - messages (List[BaseMessage]): - The messagaes to pass into the model. - stop (List[str], Optional): - List of stop words to use when generating. - kwargs: - requests_kwargs: - Additional ``**kwargs`` to pass to requests.post - - Returns: - LangChain ChatResult. - - Raises: - ValueError: - Raise when invoking endpoint fails. - - Example: - - .. code-block:: python - - messages = [ - ( - "system", - "You are a helpful assistant that translates English to French. Translate the user sentence.", - ), - ("human", "I love programming."), - ] - - resp = await chat.ainvoke(messages) - - """ # noqa: E501 - if self.streaming: - stream_iter = self._astream( - messages, stop=stop, run_manager=run_manager, **kwargs - ) - return await agenerate_from_stream(stream_iter) - - requests_kwargs = kwargs.pop("requests_kwargs", {}) - params = self._invocation_params(stop, **kwargs) - body = self._construct_json_body(messages, params) - response = await self.acompletion_with_retry( - data=body, - run_manager=run_manager, - **requests_kwargs, - ) - return self._process_response(response) - - async def _astream( - self, - messages: List[BaseMessage], - stop: Optional[List[str]] = None, - run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> AsyncIterator[ChatGenerationChunk]: - """Asynchronously streaming OCI Data Science Model Deployment - endpoint on given messages. - - Args: - messages (List[BaseMessage]): - The messagaes to pass into the model. - stop (List[str], Optional): - List of stop words to use when generating. - kwargs: - requests_kwargs: - Additional ``**kwargs`` to pass to requests.post - - Returns: - An Asynciterator of ChatGenerationChunk. - - Raises: - ValueError: - Raise when invoking endpoint fails. - - Example: - - .. code-block:: python - - messages = [ - ( - "system", - "You are a helpful assistant that translates English to French. 
Translate the user sentence.", - ), - ("human", "I love programming."), - ] - - chunk_iter = await chat.astream(messages) - - """ # noqa: E501 - requests_kwargs = kwargs.pop("requests_kwargs", {}) - self.streaming = True - params = self._invocation_params(stop, **kwargs) - body = self._construct_json_body(messages, params) # request json body - - default_chunk_class = AIMessageChunk - async for line in await self.acompletion_with_retry( - data=body, run_manager=run_manager, stream=True, **requests_kwargs - ): - chunk = self._handle_sse_line(line, default_chunk_class) - if run_manager: - await run_manager.on_llm_new_token(chunk.text, chunk=chunk) - yield chunk - - def with_structured_output( - self, - schema: Optional[Union[Dict, Type[BaseModel]]] = None, - *, - method: Literal["json_mode"] = "json_mode", - include_raw: bool = False, - **kwargs: Any, - ) -> Runnable[LanguageModelInput, Union[Dict, BaseModel]]: - """Model wrapper that returns outputs formatted to match the given schema. - - Args: - schema: The output schema as a dict or a Pydantic class. If a Pydantic class - then the model output will be an object of that class. If a dict then - the model output will be a dict. With a Pydantic class the returned - attributes will be validated, whereas with a dict they will not be. If - `method` is "function_calling" and `schema` is a dict, then the dict - must match the OpenAI function-calling spec. - method: The method for steering model generation, currently only support - for "json_mode". If "json_mode" then JSON mode will be used. Note that - if using "json_mode" then you must include instructions for formatting - the output into the desired schema into the model call. - include_raw: If False then only the parsed structured output is returned. If - an error occurs during model output parsing it will be raised. If True - then both the raw model response (a BaseMessage) and the parsed model - response will be returned. If an error occurs during output parsing it - will be caught and returned as well. The final output is always a dict - with keys "raw", "parsed", and "parsing_error". - - Returns: - A Runnable that takes any ChatModel input and returns as output: - - If include_raw is True then a dict with keys: - raw: BaseMessage - parsed: Optional[_DictOrPydantic] - parsing_error: Optional[BaseException] - - If include_raw is False then just _DictOrPydantic is returned, - where _DictOrPydantic depends on the schema: - - If schema is a Pydantic class then _DictOrPydantic is the Pydantic - class. - - If schema is a dict then _DictOrPydantic is a dict. - - """ # noqa: E501 - if kwargs: - raise ValueError(f"Received unsupported arguments {kwargs}") - is_pydantic_schema = _is_pydantic_class(schema) - if method == "json_mode": - llm = self.bind(response_format={"type": "json_object"}) - output_parser = ( - PydanticOutputParser(pydantic_object=schema) # type: ignore[type-var, arg-type] - if is_pydantic_schema - else JsonOutputParser() - ) - else: - raise ValueError( - f"Unrecognized method argument. Expected `json_mode`." - f"Received: `{method}`." 
- ) - - if include_raw: - parser_assign = RunnablePassthrough.assign( - parsed=itemgetter("raw") | output_parser, parsing_error=lambda _: None - ) - parser_none = RunnablePassthrough.assign(parsed=lambda _: None) - parser_with_fallback = parser_assign.with_fallbacks( - [parser_none], exception_key="parsing_error" - ) - return RunnableMap(raw=llm) | parser_with_fallback - else: - return llm | output_parser - - def _invocation_params(self, stop: Optional[List[str]], **kwargs: Any) -> dict: - """Combines the invocation parameters with default parameters.""" - params = self._default_params - _model_kwargs = self.model_kwargs or {} - params["stop"] = stop or params.get("stop", []) - return {**params, **_model_kwargs, **kwargs} - - def _handle_sse_line( - self, line: str, default_chunk_cls: Type[BaseMessageChunk] = AIMessageChunk - ) -> ChatGenerationChunk: - """Handle a single Server-Sent Events (SSE) line and process it into - a chat generation chunk. - - Args: - line (str): A single line from the SSE stream in string format. - default_chunk_cls (AIMessageChunk): The default class for message - chunks to be used during the processing of the stream response. - - Returns: - ChatGenerationChunk: The processed chat generation chunk. If an error - occurs, an empty `ChatGenerationChunk` is returned. - """ - try: - obj = json.loads(line) - return self._process_stream_response(obj, default_chunk_cls) - except Exception as e: - logger.debug(f"Error occurs when processing line={line}: {str(e)}") - return ChatGenerationChunk(message=AIMessageChunk(content="")) - - def _construct_json_body(self, messages: list, params: dict) -> dict: - """Constructs the request body as a dictionary (JSON). - - Args: - messages (list): A list of message objects to be included in the - request body. - params (dict): A dictionary of additional parameters to be included - in the request body. - - Returns: - dict: A dictionary representing the JSON request body, including - converted messages and additional parameters. - - """ - from langchain_openai.chat_models.base import _convert_message_to_dict - - return { - "messages": [_convert_message_to_dict(m) for m in messages], - **params, - } - - def _process_stream_response( - self, - response_json: dict, - default_chunk_cls: Type[BaseMessageChunk] = AIMessageChunk, - ) -> ChatGenerationChunk: - """Formats streaming response in OpenAI spec. - - Args: - response_json (dict): The JSON response from the streaming endpoint. - default_chunk_cls (type, optional): The default class to use for - creating message chunks. Defaults to `AIMessageChunk`. - - Returns: - ChatGenerationChunk: An object containing the processed message - chunk and any relevant generation information such as finish - reason and usage. - - Raises: - ValueError: If the response JSON is not well-formed or does not - contain the expected structure. 
- """ - from langchain_openai.chat_models.base import _convert_delta_to_message_chunk - - try: - choice = response_json["choices"][0] - if not isinstance(choice, dict): - raise TypeError("Endpoint response is not well formed.") - except (KeyError, IndexError, TypeError) as e: - raise ValueError( - "Error while formatting response payload for chat model of type" - ) from e - - chunk = _convert_delta_to_message_chunk(choice["delta"], default_chunk_cls) - default_chunk_cls = chunk.__class__ - finish_reason = choice.get("finish_reason") - usage = choice.get("usage") - gen_info = {} - if finish_reason is not None: - gen_info.update({"finish_reason": finish_reason}) - if usage is not None: - gen_info.update({"usage": usage}) - - return ChatGenerationChunk( - message=chunk, generation_info=gen_info if gen_info else None - ) - - def _process_response(self, response_json: dict) -> ChatResult: - """Formats response in OpenAI spec. - - Args: - response_json (dict): The JSON response from the chat model endpoint. - - Returns: - ChatResult: An object containing the list of `ChatGeneration` objects - and additional LLM output information. - - Raises: - ValueError: If the response JSON is not well-formed or does not - contain the expected structure. - - """ - from langchain_openai.chat_models.base import _convert_dict_to_message - - generations = [] - try: - choices = response_json["choices"] - if not isinstance(choices, list): - raise TypeError("Endpoint response is not well formed.") - except (KeyError, TypeError) as e: - raise ValueError( - "Error while formatting response payload for chat model of type" - ) from e - - for choice in choices: - message = _convert_dict_to_message(choice["message"]) - generation_info = {"finish_reason": choice.get("finish_reason")} - if "logprobs" in choice: - generation_info["logprobs"] = choice["logprobs"] - - gen = ChatGeneration( - message=message, - generation_info=generation_info, - ) - generations.append(gen) - - token_usage = response_json.get("usage", {}) - llm_output = { - "token_usage": token_usage, - "model_name": self.model, - "system_fingerprint": response_json.get("system_fingerprint", ""), - } - return ChatResult(generations=generations, llm_output=llm_output) - - def bind_tools( - self, - tools: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]], - **kwargs: Any, - ) -> Runnable[LanguageModelInput, BaseMessage]: - formatted_tools = [convert_to_openai_tool(tool) for tool in tools] - return super().bind(tools=formatted_tools, **kwargs) - - -class ChatOCIModelDeploymentVLLM(ChatOCIModelDeployment): - """OCI large language chat models deployed with vLLM. - - To use, you must provide the model HTTP endpoint from your deployed - model, e.g. https://modeldeployment.us-ashburn-1.oci.customer-oci.com//predict. - - To authenticate, `oracle-ads` has been used to automatically load - credentials: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/cli/authentication.html - - Make sure to have the required policies to access the OCI Data - Science Model Deployment endpoint. See: - https://docs.oracle.com/en-us/iaas/data-science/using/model-dep-policies-auth.htm#model_dep_policies_auth__predict-endpoint - Example: .. 
code-block:: python - from langchain_oci.chat_models import ChatOCIModelDeploymentVLLM + from langchain_oci import ChatOCIModelDeployment - chat = ChatOCIModelDeploymentVLLM( - endpoint="https://modeldeployment.us-ashburn-1.oci.customer-oci.com//predict", - frequency_penalty=0.1, - max_tokens=512, - temperature=0.2, - top_p=1.0, - # other model parameters... + llm = ChatOCIModelDeployment( + endpoint="https://modeldeployment.us-ashburn-1.oci.customer-oci.com//predictWithResponseStream", + model="odsc-llm", ) + llm.stream("tell me a joke.") - """ # noqa: E501 - - frequency_penalty: float = 0.0 - """Penalizes repeated tokens according to frequency. Between 0 and 1.""" - logit_bias: Optional[Dict[str, float]] = None - """Adjust the probability of specific tokens being generated.""" - - max_tokens: Optional[int] = 256 - """The maximum number of tokens to generate in the completion.""" - - n: int = 1 - """Number of output sequences to return for the given prompt.""" - - presence_penalty: float = 0.0 - """Penalizes repeated tokens. Between 0 and 1.""" - - temperature: float = 0.2 - """What sampling temperature to use.""" - - top_p: float = 1.0 - """Total probability mass of tokens to consider at each step.""" - - best_of: Optional[int] = None - """Generates best_of completions server-side and returns the "best" - (the one with the highest log probability per token). - """ - - use_beam_search: Optional[bool] = False - """Whether to use beam search instead of sampling.""" - - top_k: Optional[int] = -1 - """Number of most likely tokens to consider at each step.""" - - min_p: Optional[float] = 0.0 - """Float that represents the minimum probability for a token to be considered. - Must be in [0,1]. 0 to disable this.""" - - repetition_penalty: Optional[float] = 1.0 - """Float that penalizes new tokens based on their frequency in the - generated text. Values > 1 encourage the model to use new tokens.""" - - length_penalty: Optional[float] = 1.0 - """Float that penalizes sequences based on their length. Used only - when `use_beam_search` is True.""" - - early_stopping: Optional[bool] = False - """Controls the stopping condition for beam search. It accepts the - following values: `True`, where the generation stops as soon as there - are `best_of` complete candidates; `False`, where a heuristic is applied - to the generation stops when it is very unlikely to find better candidates; - `never`, where the beam search procedure only stops where there cannot be - better candidates (canonical beam search algorithm).""" - - ignore_eos: Optional[bool] = False - """Whether to ignore the EOS token and continue generating tokens after - the EOS token is generated.""" - - min_tokens: Optional[int] = 0 - """Minimum number of tokens to generate per output sequence before - EOS or stop_token_ids can be generated""" - - stop_token_ids: Optional[List[int]] = None - """List of tokens that stop the generation when they are generated. - The returned output will contain the stop tokens unless the stop tokens - are special tokens.""" - - skip_special_tokens: Optional[bool] = True - """Whether to skip special tokens in the output. Defaults to True.""" - - spaces_between_special_tokens: Optional[bool] = True - """Whether to add spaces between special tokens in the output. - Defaults to True.""" - - tool_choice: Optional[str] = None - """Whether to use tool calling. - Defaults to None, tool calling is disabled. - Tool calling requires model support and the vLLM to be configured - with `--tool-call-parser`. 
- Set this to `auto` for the model to make tool calls automatically. - Set this to `required` to force the model to always call one or more tools. - """ - - chat_template: Optional[str] = None - """Use customized chat template. - Defaults to None. The chat template from the tokenizer will be used. + Key init args: + endpoint: str + The OCI model deployment endpoint. For example: + Non-streaming: https://modeldeployment..oci.customer-oci.com//predict + Streaming: https://modeldeployment..oci.customer-oci.com//predictWithResponseStream + auth: dict + ADS auth dictionary for OCI authentication. """ - @property - def _llm_type(self) -> str: - """Return type of llm.""" - return "oci_model_depolyment_chat_endpoint_vllm" - - @property - def _default_params(self) -> Dict[str, Any]: - """Get the default parameters.""" - params = { - "model": self.model, - "stop": self.stop, - "stream": self.streaming, - } - for attr_name in self._get_model_params(): - try: - value = getattr(self, attr_name) - if value is not None: - params.update({attr_name: value}) - except Exception: - pass - - return params - - def _get_model_params(self) -> List[str]: - """Gets the name of model parameters.""" - return [ - "best_of", - "early_stopping", - "frequency_penalty", - "ignore_eos", - "length_penalty", - "logit_bias", - "logprobs", - "max_tokens", - "min_p", - "min_tokens", - "n", - "presence_penalty", - "repetition_penalty", - "skip_special_tokens", - "spaces_between_special_tokens", - "stop_token_ids", - "temperature", - "top_k", - "top_p", - "use_beam_search", - "tool_choice", - "chat_template", - ] - - -class ChatOCIModelDeploymentTGI(ChatOCIModelDeployment): - """OCI large language chat models deployed with Text Generation Inference. - - To use, you must provide the model HTTP endpoint from your deployed - model, e.g. https://modeldeployment.us-ashburn-1.oci.customer-oci.com//predict. - - To authenticate, `oracle-ads` has been used to automatically load - credentials: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/cli/authentication.html - - Make sure to have the required policies to access the OCI Data - Science Model Deployment endpoint. See: - https://docs.oracle.com/en-us/iaas/data-science/using/model-dep-policies-auth.htm#model_dep_policies_auth__predict-endpoint - - Example: - - .. code-block:: python - - from langchain_oci.chat_models import ChatOCIModelDeploymentTGI - - chat = ChatOCIModelDeploymentTGI( - endpoint="https://modeldeployment.us-ashburn-1.oci.customer-oci.com//predict", - max_token=512, - temperature=0.2, - frequency_penalty=0.1, - seed=42, - # other model parameters... - ) - - """ # noqa: E501 - - frequency_penalty: Optional[float] = None - """Penalizes repeated tokens according to frequency. Between 0 and 1.""" - - logit_bias: Optional[Dict[str, float]] = None - """Adjust the probability of specific tokens being generated.""" - - logprobs: Optional[bool] = None - """Whether to return log probabilities of the output tokens or not.""" - - max_tokens: int = 256 - """The maximum number of tokens to generate in the completion.""" - - n: int = 1 - """Number of output sequences to return for the given prompt.""" - - presence_penalty: Optional[float] = None - """Penalizes repeated tokens. 
Between 0 and 1.""" - - seed: Optional[int] = None - """To sample deterministically,""" - - temperature: float = 0.2 - """What sampling temperature to use.""" - - top_p: Optional[float] = None - """Total probability mass of tokens to consider at each step.""" - - top_logprobs: Optional[int] = None - """An integer between 0 and 5 specifying the number of most - likely tokens to return at each token position, each with an - associated log probability. logprobs must be set to true if - this parameter is used.""" - - @property - def _llm_type(self) -> str: - """Return type of llm.""" - return "oci_model_depolyment_chat_endpoint_tgi" - - @property - def _default_params(self) -> Dict[str, Any]: - """Get the default parameters.""" - params = { - "model": self.model, - "stop": self.stop, - "stream": self.streaming, - } - for attr_name in self._get_model_params(): - try: - value = getattr(self, attr_name) - if value is not None: - params.update({attr_name: value}) - except Exception: - pass - - return params - - def _get_model_params(self) -> List[str]: - """Gets the name of model parameters.""" - return [ - "frequency_penalty", - "logit_bias", - "logprobs", - "max_tokens", - "n", - "presence_penalty", - "seed", - "temperature", - "top_k", - "top_p", - "top_logprobs", - ] + pass diff --git a/libs/oci/langchain_oci/llms/__init__.py b/libs/oci/langchain_oci/llms/__init__.py index 41cc42a..5c2436d 100644 --- a/libs/oci/langchain_oci/llms/__init__.py +++ b/libs/oci/langchain_oci/llms/__init__.py @@ -1,19 +1,39 @@ # Copyright (c) 2025 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ -from langchain_oci.llms.oci_data_science_model_deployment_endpoint import ( - BaseOCIModelDeployment, - OCIModelDeploymentLLM, - OCIModelDeploymentTGI, - OCIModelDeploymentVLLM, -) + from langchain_oci.llms.oci_generative_ai import OCIGenAI, OCIGenAIBase +try: + from langchain_oci.llms.oci_data_science_model_deployment_endpoint import ( + BaseOCIModelDeployment, + OCIModelDeploymentLLM, + ) +except ModuleNotFoundError as ex: + # Default message + message = ex.msg + # For langchain_openai, show the message with pip install command. + if ex.name == "langchain_openai": + message = ( + "No module named langchain_openai. " + "Please install it with `pip install langchain_openai`" + ) + + # Create a placeholder class here so that + # users can import the class without error. + # Users will see the error message when they try to initialize an instance. 
+ + class BaseOCIModelDeployment: + def __init__(self, *args, **kwargs): + raise ModuleNotFoundError(message) + + class OCIModelDeploymentLLM(BaseOCIModelDeployment): + pass + + __all__ = [ "OCIGenAIBase", "OCIGenAI", "BaseOCIModelDeployment", "OCIModelDeploymentLLM", - "OCIModelDeploymentTGI", - "OCIModelDeploymentVLLM", ] diff --git a/libs/oci/langchain_oci/llms/oci_data_science_model_deployment_endpoint.py b/libs/oci/langchain_oci/llms/oci_data_science_model_deployment_endpoint.py index 9089be1..4603492 100644 --- a/libs/oci/langchain_oci/llms/oci_data_science_model_deployment_endpoint.py +++ b/libs/oci/langchain_oci/llms/oci_data_science_model_deployment_endpoint.py @@ -2,69 +2,17 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ """LLM for OCI data science model deployment endpoint.""" +import os +from typing import Dict, Optional -import json -import logging -import traceback -from contextlib import asynccontextmanager -from typing import ( - Any, - AsyncGenerator, - AsyncIterator, - Callable, - Dict, - Iterator, - List, - Literal, - Optional, - Union, -) - -import aiohttp -import requests -from langchain_core.callbacks import ( - AsyncCallbackManagerForLLMRun, - CallbackManagerForLLMRun, -) -from langchain_core.language_models.llms import BaseLLM, create_base_retry_decorator -from langchain_core.load.serializable import Serializable -from langchain_core.outputs import Generation, GenerationChunk, LLMResult +from langchain_core.language_models.llms import BaseLanguageModel from langchain_core.utils import get_from_dict_or_env -from pydantic import Field, model_validator - -logger = logging.getLogger(__name__) -DEFAULT_INFERENCE_ENDPOINT = "/v1/completions" - - -DEFAULT_TIME_OUT = 300 -DEFAULT_CONTENT_TYPE_JSON = "application/json" -DEFAULT_MODEL_NAME = "odsc-llm" - - -class TokenExpiredError(Exception): - """Raises when token expired.""" - +from langchain_core.utils.utils import secret_from_env +from langchain_openai import OpenAI +from pydantic import Field, SecretStr, model_validator -class ServerError(Exception): - """Raises when encounter server error when making inference.""" - -def _create_retry_decorator( - llm: "BaseOCIModelDeployment", - *, - run_manager: Optional[ - Union[AsyncCallbackManagerForLLMRun, CallbackManagerForLLMRun] - ] = None, -) -> Callable[[Any], Any]: - """Create a retry decorator.""" - errors = [requests.exceptions.ConnectTimeout, TokenExpiredError] - decorator = create_base_retry_decorator( - error_types=errors, max_retries=llm.max_retries, run_manager=run_manager - ) - return decorator - - -class BaseOCIModelDeployment(Serializable): +class BaseOCIModelDeployment(BaseLanguageModel): """Base class for LLM deployed on OCI Data Science Model Deployment.""" auth: dict = Field(default_factory=dict, exclude=True) @@ -74,329 +22,63 @@ class BaseOCIModelDeployment(Serializable): or `ads.common.auth.resource_principal()`. 
If this is not provided then the `ads.common.default_signer()` will be used.""" - endpoint: str = "" - """The uri of the endpoint from the deployed Model Deployment model.""" - - streaming: bool = False - """Whether to stream the results or not.""" + openai_api_key: Optional[SecretStr] = Field( + alias="api_key", default_factory=secret_from_env("OPENAI_API_KEY", default=" ") + ) + """openai_api_key needs to be a non-empty in order to initialize openai client.""" - max_retries: int = 3 - """Maximum number of retries to make when generating.""" + model_name: str = Field(default="odsc-llm", alias="model") + """Model name to use.""" - default_headers: Optional[Dict[str, Any]] = None - """The headers to be added to the Model Deployment request.""" + endpoint: Optional[str] = None + """For backward compatibility, user may specify the endpoint instead of base_url.""" @model_validator(mode="before") @classmethod - def validate_environment(cls, values: Dict) -> Dict: + def oci_auth_and_endpoint(cls, values: Dict) -> Dict: """Checks if oracle-ads is installed and get credentials/endpoint from environment. """ - try: - import ads + # Authentication + # oracle-ads is only required if the http_client is not set + if not values.get("http_client"): + try: + import ads - except ImportError as ex: - raise ImportError( - "Could not import ads python package. " - "Please install it with `pip install oracle_ads`." - ) from ex + except ImportError as ex: + raise ImportError( + "Could not import ads python package. " + "Please install it with `pip install oracle_ads`." + ) from ex - if not values.get("auth", None): - values["auth"] = ads.common.auth.default_signer() + if not values.get("auth"): + values["auth"] = ads.common.auth.default_signer() + from ads.aqua import HttpxOCIAuth, get_httpx_client + + values["http_client"] = get_httpx_client( + auth=HttpxOCIAuth(values["auth"].get("signer")) + ) + + # Endpoint values["endpoint"] = get_from_dict_or_env( values, "endpoint", "OCI_LLM_ENDPOINT", + default="", ) - return values - - def _headers( - self, is_async: Optional[bool] = False, body: Optional[dict] = None - ) -> Dict: - """Construct and return the headers for a request. - - Args: - is_async (bool, optional): Indicates if the request is asynchronous. - Defaults to `False`. - body (optional): The request body to be included in the headers if - the request is asynchronous. - - Returns: - Dict: A dictionary containing the appropriate headers for the request. 
- """ - headers = self.default_headers or {} - if is_async: - signer = self.auth["signer"] - _req = requests.Request("POST", self.endpoint, json=body) - req = _req.prepare() - req = signer(req) - for key, value in req.headers.items(): - headers[key] = value - - if self.streaming: - headers.update( - { - "enable-streaming": "true", - "Accept": "text/event-stream", - } - ) - return headers - - headers.update( - { - "Content-Type": DEFAULT_CONTENT_TYPE_JSON, - "enable-streaming": "true", - "Accept": "text/event-stream", - } - if self.streaming - else { - "Content-Type": DEFAULT_CONTENT_TYPE_JSON, - } - ) - - return headers - - def completion_with_retry( - self, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any - ) -> Any: - """Use tenacity to retry the completion call.""" - retry_decorator = _create_retry_decorator(self, run_manager=run_manager) - @retry_decorator - def _completion_with_retry(**kwargs: Any) -> Any: - try: - request_timeout = kwargs.pop("request_timeout", DEFAULT_TIME_OUT) - data = kwargs.pop("data") - stream = kwargs.pop("stream", self.streaming) - headers = self._headers() - auth = self.auth.get("signer") - url = self.endpoint - - response = requests.post( - url=url, - json=data, - timeout=request_timeout, - stream=stream, - headers=headers, - auth=auth, - verify=True, - **kwargs, - ) - self._check_response(response) - return response - except TokenExpiredError as e: - raise e - except Exception as err: - traceback.print_exc() - logger.debug( - f"Requests payload: {data}. Requests arguments: " - f"url={self.endpoint},timeout={request_timeout},stream={stream}. " - f"Additional request kwargs={kwargs}." - ) - raise RuntimeError( - f"Error occurs by inference endpoint: {str(err)}" - ) from err - - return _completion_with_retry(**kwargs) - - async def acompletion_with_retry( - self, - run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> Any: - """Use tenacity to retry the async completion call.""" - retry_decorator = _create_retry_decorator(self, run_manager=run_manager) - - @retry_decorator - async def _completion_with_retry(**kwargs: Any) -> Any: - try: - request_timeout = kwargs.pop("request_timeout", DEFAULT_TIME_OUT) - data = kwargs.pop("data") - stream = kwargs.pop("stream", self.streaming) - headers = self._headers(is_async=True, body=data) - auth = self.auth.get("signer") - url = self.endpoint - method = "POST" - - if stream: - response = self._arequest( - method=method, - url=url, - json=data, - timeout=request_timeout, - headers=headers, - auth=auth, - ) - return self._aiter_sse(response) - else: - async with self._arequest( - method=method, - url=url, - json=data, - timeout=request_timeout, - headers=headers, - auth=auth, - ) as resp: - self._check_response(resp) - data = await resp.json() - return data - except TokenExpiredError as e: - raise e - except Exception as err: - traceback.print_exc() - logger.debug( - f"Requests payload: `{data}`. " - f"Stream mode={stream}. " - f"Requests kwargs: url={self.endpoint}, timeout={request_timeout}." 
- ) - raise RuntimeError( - f"Error occurs by inference endpoint: {str(err)}" - ) from err - - return await _completion_with_retry(**kwargs) - - @asynccontextmanager - async def _arequest( - self, method: str, url: str, headers: dict, auth: Any, **kwargs: Any - ) -> AsyncGenerator[aiohttp.ClientResponse, None]: - """Make an async request.""" - async with aiohttp.ClientSession() as session: - async with session.request( - method, - url, - headers=headers, - auth=auth, - **kwargs, - ) as response: - yield response - - def _check_response(self, response: Any) -> None: - """Handle server error by checking the response status. - - Args: - response: - The response object from either `requests` or `aiohttp` library. - - Raises: - TokenExpiredError: - If the response status code is 401 and the token refresh is successful. - ServerError: - If any other HTTP error occurs. - """ - try: - response.raise_for_status() - except requests.exceptions.HTTPError as http_err: - status_code = ( - response.status_code - if hasattr(response, "status_code") - else response.status - ) - if status_code == 401 and self._refresh_signer(): - raise TokenExpiredError() from http_err + if values.get("endpoint"): + values["openai_api_base"] = values["endpoint"] - raise ServerError( - f"Server error: {str(http_err)}. \nMessage: {response.text}" - ) from http_err - - def _parse_stream(self, lines: Iterator[bytes]) -> Iterator[str]: - """Parse a stream of byte lines and yield parsed string lines. - - Args: - lines (Iterator[bytes]): - An iterator that yields lines in byte format. - - Yields: - Iterator[str]: - An iterator that yields parsed lines as strings. - """ - for line in lines: - _line = self._parse_stream_line(line) - if _line is not None: - yield _line - - async def _parse_stream_async( - self, - lines: aiohttp.StreamReader, - ) -> AsyncIterator[str]: - """ - Asynchronously parse a stream of byte lines and yield parsed string lines. - - Args: - lines (aiohttp.StreamReader): - An `aiohttp.StreamReader` object that yields lines in byte format. - - Yields: - AsyncIterator[str]: - An asynchronous iterator that yields parsed lines as strings. - """ - async for line in lines: - _line = self._parse_stream_line(line) - if _line is not None: - yield _line - - def _parse_stream_line(self, line: bytes) -> Optional[str]: - """Parse a single byte line and return a processed string line if valid. - - Args: - line (bytes): A single line in byte format. - - Returns: - Optional[str]: - The processed line as a string if valid, otherwise `None`. - """ - line = line.strip() - if not line: - return None - _line = line.decode("utf-8") - - if _line.lower().startswith("data:"): - _line = _line[5:].lstrip() - - if _line.startswith("[DONE]"): - return None - return _line - return None - - async def _aiter_sse( - self, - async_cntx_mgr: Any, - ) -> AsyncIterator[str]: - """Asynchronously iterate over server-sent events (SSE). - - Args: - async_cntx_mgr: An asynchronous context manager that yields a client - response object. - - Yields: - AsyncIterator[str]: An asynchronous iterator that yields parsed server-sent - event lines as json string. - """ - async with async_cntx_mgr as client_resp: - self._check_response(client_resp) - async for line in self._parse_stream_async(client_resp.content): - yield line - - def _refresh_signer(self) -> bool: - """Attempt to refresh the security token using the signer. - - Returns: - bool: `True` if the token was successfully refreshed, `False` otherwise. 
- """ - if self.auth.get("signer", None) and hasattr( - self.auth["signer"], "refresh_security_token" + if values.get("openai_api_base") and not values["openai_api_base"].endswith( + "v1" ): - self.auth["signer"].refresh_security_token() - return True - return False - - @classmethod - def is_lc_serializable(cls) -> bool: - """Return whether this model can be serialized by LangChain.""" - return True + values["openai_api_base"] = os.path.join(values["openai_api_base"], "v1") + return values -class OCIModelDeploymentLLM(BaseLLM, BaseOCIModelDeployment): +class OCIModelDeploymentLLM(BaseOCIModelDeployment, OpenAI): """LLM deployed on OCI Data Science Model Deployment. To use, you must provide the model HTTP endpoint from your deployed @@ -409,599 +91,19 @@ class OCIModelDeploymentLLM(BaseLLM, BaseOCIModelDeployment): Science Model Deployment endpoint. See: https://docs.oracle.com/en-us/iaas/data-science/using/model-dep-policies-auth.htm#model_dep_policies_auth__predict-endpoint + This class inherits from OpenAI LangChain client. + You can use all the parameters supported by OpenAI LangChain client. + Example: .. code-block:: python - from langchain_oci.llms import OCIModelDeploymentLLM + from langchain_oci import OCIModelDeploymentLLM llm = OCIModelDeploymentLLM( - endpoint="https://modeldeployment.us-ashburn-1.oci.customer-oci.com//predict", + endpoint="https://modeldeployment.us-ashburn-1.oci.customer-oci.com//predictWithResponseStream", model="odsc-llm", - streaming=True, - model_kwargs={"frequency_penalty": 1.0}, - headers={ - "route": "/v1/completions", - # other request headers ... - } ) - llm.invoke("tell me a joke.") - - Customized Usage: - - User can inherit from our base class and overrwrite the `_process_response`, `_process_stream_response`, - `_construct_json_body` for satisfying customized needed. - - .. code-block:: python - - from langchain_oci.llms import OCIModelDeploymentLLM - - class MyCutomizedModel(OCIModelDeploymentLLM): - def _process_stream_response(self, response_json:dict) -> GenerationChunk: - print("My customized output stream handler.") - return GenerationChunk() - - def _process_response(self, response_json:dict) -> List[Generation]: - print("My customized output handler.") - return [Generation()] - - def _construct_json_body(self, prompt: str, param:dict) -> dict: - print("My customized input handler.") - return {} - - llm = MyCutomizedModel( - endpoint=f"https://modeldeployment.us-ashburn-1.oci.customer-oci.com/{ocid}/predict", - model="", - } - - llm.invoke("tell me a joke.") + llm.stream("tell me a joke.") """ # noqa: E501 - - model: str = DEFAULT_MODEL_NAME - """The name of the model.""" - - stop: Optional[List[str]] = None - """Stop words to use when generating. 
Model output is cut off - at the first occurrence of any of these substrings.""" - - model_kwargs: Dict[str, Any] = Field(default_factory=dict) - """Keyword arguments to pass to the model.""" - - @property - def _llm_type(self) -> str: - """Return type of llm.""" - return "oci_model_deployment_endpoint" - - @property - def _default_params(self) -> Dict[str, Any]: - """Get the default parameters.""" - return { - "model": self.model, - "stop": self.stop, - "stream": self.streaming, - } - - @property - def _identifying_params(self) -> Dict[str, Any]: - """Get the identifying parameters.""" - _model_kwargs = self.model_kwargs or {} - return { - **{"endpoint": self.endpoint, "model_kwargs": _model_kwargs}, - **self._default_params, - } - - def _headers( - self, is_async: Optional[bool] = False, body: Optional[dict] = None - ) -> Dict: - """Construct and return the headers for a request. - - Args: - is_async (bool, optional): Indicates if the request is asynchronous. - Defaults to `False`. - body (optional): The request body to be included in the headers if - the request is asynchronous. - - Returns: - Dict: A dictionary containing the appropriate headers for the request. - """ - return { - "route": DEFAULT_INFERENCE_ENDPOINT, - **super()._headers(is_async=is_async, body=body), - } - - def _generate( - self, - prompts: List[str], - stop: Optional[List[str]] = None, - run_manager: Optional[CallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> LLMResult: - """Call out to OCI Data Science Model Deployment endpoint with k unique prompts. - - Args: - prompts: The prompts to pass into the service. - stop: Optional list of stop words to use when generating. - - Returns: - The full LLM output. - - Example: - .. code-block:: python - - response = llm.invoke("Tell me a joke.") - response = llm.generate(["Tell me a joke."]) - """ - generations: List[List[Generation]] = [] - params = self._invocation_params(stop, **kwargs) - for prompt in prompts: - body = self._construct_json_body(prompt, params) - if self.streaming: - generation = GenerationChunk(text="") - for chunk in self._stream( - prompt, stop=stop, run_manager=run_manager, **kwargs - ): - generation += chunk - generations.append([generation]) - else: - res = self.completion_with_retry( - data=body, - run_manager=run_manager, - **kwargs, - ) - generations.append(self._process_response(res.json())) - return LLMResult(generations=generations) - - async def _agenerate( - self, - prompts: List[str], - stop: Optional[List[str]] = None, - run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> LLMResult: - """Call out to OCI Data Science Model Deployment endpoint async with k unique prompts. - - Args: - prompts: The prompts to pass into the service. - stop: Optional list of stop words to use when generating. - - Returns: - The full LLM output. - - Example: - .. 
code-block:: python - - response = await llm.ainvoke("Tell me a joke.") - response = await llm.agenerate(["Tell me a joke."]) - """ # noqa: E501 - generations: List[List[Generation]] = [] - params = self._invocation_params(stop, **kwargs) - for prompt in prompts: - body = self._construct_json_body(prompt, params) - if self.streaming: - generation = GenerationChunk(text="") - async for chunk in self._astream( - prompt, stop=stop, run_manager=run_manager, **kwargs - ): - generation += chunk - generations.append([generation]) - else: - res = await self.acompletion_with_retry( - data=body, - run_manager=run_manager, - **kwargs, - ) - generations.append(self._process_response(res)) - return LLMResult(generations=generations) - - def _stream( - self, - prompt: str, - stop: Optional[List[str]] = None, - run_manager: Optional[CallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> Iterator[GenerationChunk]: - """Stream OCI Data Science Model Deployment endpoint on given prompt. - - - Args: - prompt (str): - The prompt to pass into the model. - stop (List[str], Optional): - List of stop words to use when generating. - kwargs: - requests_kwargs: - Additional ``**kwargs`` to pass to requests.post - - Returns: - An iterator of GenerationChunks. - - - Example: - - .. code-block:: python - - response = llm.stream("Tell me a joke.") - - """ - requests_kwargs = kwargs.pop("requests_kwargs", {}) - self.streaming = True - params = self._invocation_params(stop, **kwargs) - body = self._construct_json_body(prompt, params) - - response = self.completion_with_retry( - data=body, run_manager=run_manager, stream=True, **requests_kwargs - ) - for line in self._parse_stream(response.iter_lines()): - chunk = self._handle_sse_line(line) - if run_manager: - run_manager.on_llm_new_token(chunk.text, chunk=chunk) - - yield chunk - - async def _astream( - self, - prompt: str, - stop: Optional[List[str]] = None, - run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> AsyncIterator[GenerationChunk]: - """Stream OCI Data Science Model Deployment endpoint async on given prompt. - - - Args: - prompt (str): - The prompt to pass into the model. - stop (List[str], Optional): - List of stop words to use when generating. - kwargs: - requests_kwargs: - Additional ``**kwargs`` to pass to requests.post - - Returns: - An iterator of GenerationChunks. - - - Example: - - .. 
-
-                async for chunk in llm.astream(("Tell me a joke."):
-                    print(chunk, end="", flush=True)
-
-        """
-        requests_kwargs = kwargs.pop("requests_kwargs", {})
-        self.streaming = True
-        params = self._invocation_params(stop, **kwargs)
-        body = self._construct_json_body(prompt, params)
-
-        async for line in await self.acompletion_with_retry(
-            data=body, run_manager=run_manager, stream=True, **requests_kwargs
-        ):
-            chunk = self._handle_sse_line(line)
-            if run_manager:
-                await run_manager.on_llm_new_token(chunk.text, chunk=chunk)
-            yield chunk
-
-    def _construct_json_body(self, prompt: str, params: dict) -> dict:
-        """Constructs the request body as a dictionary (JSON)."""
-        return {
-            "prompt": prompt,
-            **params,
-        }
-
-    def _invocation_params(
-        self, stop: Optional[List[str]] = None, **kwargs: Any
-    ) -> dict:
-        """Combines the invocation parameters with default parameters."""
-        params = self._default_params
-        _model_kwargs = self.model_kwargs or {}
-        params["stop"] = stop or params.get("stop", [])
-        return {**params, **_model_kwargs, **kwargs}
-
-    def _process_stream_response(self, response_json: dict) -> GenerationChunk:
-        """Formats streaming response for OpenAI spec into GenerationChunk."""
-        try:
-            choice = response_json["choices"][0]
-            if not isinstance(choice, dict):
-                raise TypeError("Endpoint response is not well formed.")
-        except (KeyError, IndexError, TypeError) as e:
-            raise ValueError("Error while formatting response payload.") from e
-
-        return GenerationChunk(text=choice.get("text", ""))
-
-    def _process_response(self, response_json: dict) -> List[Generation]:
-        """Formats response in OpenAI spec.
-
-        Args:
-            response_json (dict): The JSON response from the chat model endpoint.
-
-        Returns:
-            ChatResult: An object containing the list of `ChatGeneration` objects
-            and additional LLM output information.
-
-        Raises:
-            ValueError: If the response JSON is not well-formed or does not
-                contain the expected structure.
-
-        """
-        generations = []
-        try:
-            choices = response_json["choices"]
-            if not isinstance(choices, list):
-                raise TypeError("Endpoint response is not well formed.")
-        except (KeyError, TypeError) as e:
-            raise ValueError("Error while formatting response payload.") from e
-
-        for choice in choices:
-            gen = Generation(
-                text=choice.get("text"),
-                generation_info=self._generate_info(choice),
-            )
-            generations.append(gen)
-
-        return generations
-
-    def _generate_info(self, choice: dict) -> Any:
-        """Extracts generation info from the response."""
-        gen_info = {}
-        finish_reason = choice.get("finish_reason", None)
-        logprobs = choice.get("logprobs", None)
-        index = choice.get("index", None)
-        if finish_reason:
-            gen_info.update({"finish_reason": finish_reason})
-        if logprobs is not None:
-            gen_info.update({"logprobs": logprobs})
-        if index is not None:
-            gen_info.update({"index": index})
-
-        return gen_info or None
-
-    def _handle_sse_line(self, line: str) -> GenerationChunk:
-        try:
-            obj = json.loads(line)
-            return self._process_stream_response(obj)
-        except Exception:
-            return GenerationChunk(text="")
-
-
-class OCIModelDeploymentTGI(OCIModelDeploymentLLM):
-    """OCI Data Science Model Deployment TGI Endpoint.
-
-    To use, you must provide the model HTTP endpoint from your deployed
-    model, e.g. https://modeldeployment..oci.customer-oci.com//predict.
-
-    To authenticate, `oracle-ads` has been used to automatically load
-    credentials: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/cli/authentication.html
-
-    Make sure to have the required policies to access the OCI Data
-    Science Model Deployment endpoint. See:
-    https://docs.oracle.com/en-us/iaas/data-science/using/model-dep-policies-auth.htm#model_dep_policies_auth__predict-endpoint
-
-    Example:
-        .. code-block:: python
-
-            from langchain_oci.llms import OCIModelDeploymentTGI
-
-            llm = OCIModelDeploymentTGI(
-                endpoint="https://modeldeployment..oci.customer-oci.com//predict",
-                api="/v1/completions",
-                streaming=True,
-                temperature=0.2,
-                seed=42,
-                # other model parameters ...
-            )
-
-    """
-
-    max_tokens: int = 256
-    """Denotes the number of tokens to predict per generation."""
-
-    temperature: float = 0.2
-    """A non-negative float that tunes the degree of randomness in generation."""
-
-    k: int = -1
-    """Number of most likely tokens to consider at each step."""
-
-    p: float = 0.75
-    """Total probability mass of tokens to consider at each step."""
-
-    best_of: int = 1
-    """Generates best_of completions server-side and returns the "best"
-    (the one with the highest log probability per token).
-    """
-
-    api: Literal["/generate", "/v1/completions"] = "/v1/completions"
-    """Api spec."""
-
-    frequency_penalty: float = 0.0
-    """Penalizes repeated tokens according to frequency. Between 0 and 1."""
-
-    seed: Optional[int] = None
-    """Random sampling seed"""
-
-    repetition_penalty: Optional[float] = None
-    """The parameter for repetition penalty. 1.0 means no penalty."""
-
-    suffix: Optional[str] = None
-    """The text to append to the prompt. """
-
-    do_sample: bool = True
-    """If set to True, this parameter enables decoding strategies such as
-    multi-nominal sampling, beam-search multi-nominal sampling, Top-K
-    sampling and Top-p sampling.
-    """
-
-    watermark: bool = True
-    """Watermarking with `A Watermark for Large Language Models `_.
-    Defaults to True."""
-
-    return_full_text: bool = False
-    """Whether to prepend the prompt to the generated text.
-    Defaults to False."""
-
-    @property
-    def _llm_type(self) -> str:
-        """Return type of llm."""
-        return "oci_model_deployment_tgi_endpoint"
-
-    @property
-    def _default_params(self) -> Dict[str, Any]:
-        """Get the default parameters for invoking OCI model deployment TGI endpoint."""
-        return (
-            {
-                "model": self.model,  # can be any
-                "frequency_penalty": self.frequency_penalty,
-                "max_tokens": self.max_tokens,
-                "repetition_penalty": self.repetition_penalty,
-                "temperature": self.temperature,
-                "top_p": self.p,
-                "seed": self.seed,
-                "stream": self.streaming,
-                "suffix": self.suffix,
-                "stop": self.stop,
-            }
-            if self.api == "/v1/completions"
-            else {
-                "best_of": self.best_of,
-                "max_new_tokens": self.max_tokens,
-                "temperature": self.temperature,
-                "top_k": (
-                    self.k if self.k > 0 else None
-                ),  # `top_k` must be strictly positive'
-                "top_p": self.p,
-                "do_sample": self.do_sample,
-                "return_full_text": self.return_full_text,
-                "watermark": self.watermark,
-                "stop": self.stop,
-            }
-        )
-
-    @property
-    def _identifying_params(self) -> Dict[str, Any]:
-        """Get the identifying parameters."""
-        _model_kwargs = self.model_kwargs or {}
-        return {
-            **{
-                "endpoint": self.endpoint,
-                "api": self.api,
-                "model_kwargs": _model_kwargs,
-            },
-            **self._default_params,
-        }
-
-    def _construct_json_body(self, prompt: str, params: dict) -> dict:
-        """Construct request payload."""
-        if self.api == "/v1/completions":
-            return super()._construct_json_body(prompt, params)
-
-        return {
-            "inputs": prompt,
-            "parameters": params,
-        }
-
-    def _process_response(self, response_json: dict) -> List[Generation]:
-        """Formats response."""
-        if self.api == "/v1/completions":
-            return super()._process_response(response_json)
-
-        try:
-            text = response_json["generated_text"]
-        except KeyError as e:
-            raise ValueError(
-                f"Error while formatting response payload.response_json={response_json}"
-            ) from e
-
-        return [Generation(text=text)]
-
-
-class OCIModelDeploymentVLLM(OCIModelDeploymentLLM):
-    """VLLM deployed on OCI Data Science Model Deployment
-
-    To use, you must provide the model HTTP endpoint from your deployed
-    model, e.g. https://modeldeployment..oci.customer-oci.com//predict.
-
-    To authenticate, `oracle-ads` has been used to automatically load
-    credentials: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/cli/authentication.html
-
-    Make sure to have the required policies to access the OCI Data
-    Science Model Deployment endpoint. See:
-    https://docs.oracle.com/en-us/iaas/data-science/using/model-dep-policies-auth.htm#model_dep_policies_auth__predict-endpoint
-
-    Example:
-        .. code-block:: python
-
-            from langchain_oci.llms import OCIModelDeploymentVLLM
-
-            llm = OCIModelDeploymentVLLM(
-                endpoint="https://modeldeployment..oci.customer-oci.com//predict",
-                model="odsc-llm",
-                streaming=False,
-                temperature=0.2,
-                max_tokens=512,
-                n=3,
-                best_of=3,
-                # other model parameters
-            )
-
-    """
-
-    max_tokens: int = 256
-    """Denotes the number of tokens to predict per generation."""
-
-    temperature: float = 0.2
-    """A non-negative float that tunes the degree of randomness in generation."""
-
-    p: float = 0.75
-    """Total probability mass of tokens to consider at each step."""
-
-    best_of: int = 1
-    """Generates best_of completions server-side and returns the "best"
-    (the one with the highest log probability per token).
- """ - - n: int = 1 - """Number of output sequences to return for the given prompt.""" - - k: int = -1 - """Number of most likely tokens to consider at each step.""" - - frequency_penalty: float = 0.0 - """Penalizes repeated tokens according to frequency. Between 0 and 1.""" - - presence_penalty: float = 0.0 - """Penalizes repeated tokens. Between 0 and 1.""" - - use_beam_search: bool = False - """Whether to use beam search instead of sampling.""" - - ignore_eos: bool = False - """Whether to ignore the EOS token and continue generating tokens after - the EOS token is generated.""" - - logprobs: Optional[int] = None - """Number of log probabilities to return per output token.""" - - @property - def _llm_type(self) -> str: - """Return type of llm.""" - return "oci_model_deployment_vllm_endpoint" - - @property - def _default_params(self) -> Dict[str, Any]: - """Get the default parameters for calling vllm.""" - return { - "best_of": self.best_of, - "frequency_penalty": self.frequency_penalty, - "ignore_eos": self.ignore_eos, - "logprobs": self.logprobs, - "max_tokens": self.max_tokens, - "model": self.model, - "n": self.n, - "presence_penalty": self.presence_penalty, - "stop": self.stop, - "stream": self.streaming, - "temperature": self.temperature, - "top_k": self.k, - "top_p": self.p, - "use_beam_search": self.use_beam_search, - } diff --git a/libs/oci/pyproject.toml b/libs/oci/pyproject.toml index c5bda85..7f214c4 100644 --- a/libs/oci/pyproject.toml +++ b/libs/oci/pyproject.toml @@ -16,7 +16,6 @@ langchain-core = ">=0.3.20,<1.0.0" langchain = ">=0.3.20,<1.0.0" oci = ">=2.144.0" pydantic = ">=2,<3" -aiohttp = ">=3.12.14" [tool.poetry.group.test] optional = true diff --git a/libs/oci/tests/unit_tests/chat_models/test_oci_data_science.py b/libs/oci/tests/unit_tests/chat_models/test_oci_data_science.py index 0bb01c8..f330cee 100644 --- a/libs/oci/tests/unit_tests/chat_models/test_oci_data_science.py +++ b/libs/oci/tests/unit_tests/chat_models/test_oci_data_science.py @@ -1,196 +1,30 @@ """Test Chat model for OCI Data Science Model Deployment Endpoint.""" -import sys -from typing import Any, AsyncGenerator, Dict, Generator from unittest import mock +import httpx import pytest -from langchain_core.messages import AIMessage, AIMessageChunk -from requests.exceptions import HTTPError -from langchain_oci.chat_models import ( - ChatOCIModelDeploymentTGI, - ChatOCIModelDeploymentVLLM, -) - -CONST_MODEL_NAME = "odsc-vllm" -CONST_ENDPOINT = "https://oci.endpoint/ocid/predict" -CONST_PROMPT = "This is a prompt." -CONST_COMPLETION = "This is a completion." 
-CONST_COMPLETION_ROUTE = "/v1/chat/completions"
-CONST_COMPLETION_RESPONSE = {
-    "id": "chat-123456789",
-    "object": "chat.completion",
-    "created": 123456789,
-    "model": "mistral",
-    "choices": [
-        {
-            "index": 0,
-            "message": {
-                "role": "assistant",
-                "content": CONST_COMPLETION,
-                "tool_calls": [],
-            },
-            "logprobs": None,
-            "finish_reason": "length",
-            "stop_reason": None,
-        }
-    ],
-    "usage": {"prompt_tokens": 115, "total_tokens": 371, "completion_tokens": 256},
-    "prompt_logprobs": None,
-}
-CONST_COMPLETION_RESPONSE_TGI = {"generated_text": CONST_COMPLETION}
-CONST_STREAM_TEMPLATE = (
-    'data: {"id":"chat-123456","object":"chat.completion.chunk","created":123456789,'
-    '"model":"odsc-llm","choices":[{"index":0,"delta":,"finish_reason":null}]}'
-)
-CONST_STREAM_DELTAS = ['{"role":"assistant","content":""}'] + [
-    '{"content":" ' + word + '"}' for word in CONST_COMPLETION.split(" ")
-]
-CONST_STREAM_RESPONSE = (
-    content
-    for content in [
-        CONST_STREAM_TEMPLATE.replace("", delta).encode()
-        for delta in CONST_STREAM_DELTAS
-    ]
-    + [b"data: [DONE]"]
-)
-
-CONST_ASYNC_STREAM_TEMPLATE = (
-    '{"id":"chat-123456","object":"chat.completion.chunk","created":123456789,'
-    '"model":"odsc-llm","choices":[{"index":0,"delta":,"finish_reason":null}]}'
-)
-CONST_ASYNC_STREAM_RESPONSE = (
-    CONST_ASYNC_STREAM_TEMPLATE.replace("", delta).encode()
-    for delta in CONST_STREAM_DELTAS
-)
-
-pytestmark = pytest.mark.skipif(
-    sys.version_info < (3, 9), reason="Requires Python 3.9 or higher"
-)
-
-
-class MockResponse:
-    """Represents a mocked response."""
-
-    def __init__(self, json_data: Dict, status_code: int = 200):
-        self.json_data = json_data
-        self.status_code = status_code
-
-    def raise_for_status(self) -> None:
-        """Mocked raise for status."""
-        if 400 <= self.status_code < 600:
-            raise HTTPError()  # type: ignore[call-arg]
-
-    def json(self) -> Dict:
-        """Returns mocked json data."""
-        return self.json_data
-
-    def iter_lines(self, chunk_size: int = 4096) -> Generator[bytes, None, None]:
-        """Returns a generator of mocked streaming response."""
-        return CONST_STREAM_RESPONSE
-
-    @property
-    def text(self) -> str:
-        """Returns the mocked text representation."""
-        return ""
-
-
-def mocked_requests_post(url: str, **kwargs: Any) -> MockResponse:
-    """Method to mock post requests"""
-
-    payload: dict = kwargs.get("json", {})
-    messages: list = payload.get("messages", [])
-    prompt = messages[0].get("content")
-
-    if prompt == CONST_PROMPT:
-        return MockResponse(json_data=CONST_COMPLETION_RESPONSE)
-
-    return MockResponse(
-        json_data={},
-        status_code=404,
-    )
+from langchain_oci import ChatOCIModelDeployment
 @pytest.mark.requires("ads")
 @pytest.mark.requires("langchain_openai")
-@mock.patch("ads.common.auth.default_signer", return_value=dict(signer=None))
-@mock.patch("requests.post", side_effect=mocked_requests_post)
-def test_invoke_vllm(*args: Any) -> None:
-    """Tests invoking vLLM endpoint."""
-    llm = ChatOCIModelDeploymentVLLM(endpoint=CONST_ENDPOINT, model=CONST_MODEL_NAME)
-    assert llm._headers().get("route") == CONST_COMPLETION_ROUTE
-    output = llm.invoke(CONST_PROMPT)
-    assert isinstance(output, AIMessage)
-    assert output.content == CONST_COMPLETION
+@mock.patch("ads.aqua.client.client.HttpxOCIAuth")
+def test_authentication_with_ads(*args):
+    with mock.patch(
+        "ads.common.auth.default_signer", return_value=dict(signer=mock.MagicMock())
+    ) as ads_auth:
+        llm = ChatOCIModelDeployment(endpoint="", model="my-model")
+        ads_auth.assert_called_once()
+    assert llm.openai_api_base == "/v1"
+    assert llm.model_name == "my-model"
-@pytest.mark.requires("ads")
-@pytest.mark.requires("langchain_openai")
-@mock.patch("ads.common.auth.default_signer", return_value=dict(signer=None))
-@mock.patch("requests.post", side_effect=mocked_requests_post)
-def test_invoke_tgi(*args: Any) -> None:
-    """Tests invoking TGI endpoint using OpenAI Spec."""
-    llm = ChatOCIModelDeploymentTGI(endpoint=CONST_ENDPOINT, model=CONST_MODEL_NAME)
-    assert llm._headers().get("route") == CONST_COMPLETION_ROUTE
-    output = llm.invoke(CONST_PROMPT)
-    assert isinstance(output, AIMessage)
-    assert output.content == CONST_COMPLETION
-
-
-@pytest.mark.requires("ads")
-@pytest.mark.requires("langchain_openai")
-@mock.patch("ads.common.auth.default_signer", return_value=dict(signer=None))
-@mock.patch("requests.post", side_effect=mocked_requests_post)
-def test_stream_vllm(*args: Any) -> None:
-    """Tests streaming with vLLM endpoint using OpenAI spec."""
-    llm = ChatOCIModelDeploymentVLLM(
-        endpoint=CONST_ENDPOINT, model=CONST_MODEL_NAME, streaming=True
-    )
-    assert llm._headers().get("route") == CONST_COMPLETION_ROUTE
-    output = None
-    count = 0
-    for chunk in llm.stream(CONST_PROMPT):
-        assert isinstance(chunk, AIMessageChunk)
-        if output is None:
-            output = chunk
-        else:
-            output += chunk
-        count += 1
-    assert count == 5
-    assert output is not None
-    if output is not None:
-        assert str(output.content).strip() == CONST_COMPLETION
-
-
-async def mocked_async_streaming_response(
-    *args: Any, **kwargs: Any
-) -> AsyncGenerator[bytes, None]:
-    """Returns mocked response for async streaming."""
-    for item in CONST_ASYNC_STREAM_RESPONSE:
-        yield item
-
-
-@pytest.mark.asyncio
-@pytest.mark.requires("ads")
 @pytest.mark.requires("langchain_openai")
-@mock.patch(
-    "ads.common.auth.default_signer", return_value=dict(signer=mock.MagicMock())
-)
-@mock.patch(
-    "langchain_oci.llms.oci_data_science_model_deployment_endpoint.BaseOCIModelDeployment._arequest",
-    mock.MagicMock(),
-)
-async def test_stream_async(*args: Any) -> None:
-    """Tests async streaming."""
-    llm = ChatOCIModelDeploymentVLLM(
-        endpoint=CONST_ENDPOINT, model=CONST_MODEL_NAME, streaming=True
-    )
-    assert llm._headers().get("route") == CONST_COMPLETION_ROUTE
-    with mock.patch.object(
-        llm,
-        "_aiter_sse",
-        mock.MagicMock(return_value=mocked_async_streaming_response()),
-    ):
-        chunks = [str(chunk.content) async for chunk in llm.astream(CONST_PROMPT)]
-        assert "".join(chunks).strip() == CONST_COMPLETION
+def test_authentication_with_custom_client():
+    http_client = httpx.Client()
+    llm = ChatOCIModelDeployment(base_url="/v1", http_client=http_client)
+    assert llm.http_client == http_client
+    assert llm.openai_api_base == "/v1"
+    assert llm.model_name == "odsc-llm"
diff --git a/libs/oci/tests/unit_tests/chat_models/test_oci_model_deployment_endpoint.py b/libs/oci/tests/unit_tests/chat_models/test_oci_model_deployment_endpoint.py
deleted file mode 100644
index b670be6..0000000
--- a/libs/oci/tests/unit_tests/chat_models/test_oci_model_deployment_endpoint.py
+++ /dev/null
@@ -1,98 +0,0 @@
-"""Test OCI Data Science Model Deployment Endpoint."""
-
-import pytest
-import responses
-from langchain_core.messages import AIMessage, HumanMessage
-from pytest_mock import MockerFixture
-
-from langchain_oci.chat_models import ChatOCIModelDeployment
-
-
-@pytest.mark.requires("ads")
-def test_initialization(mocker: MockerFixture) -> None:
-    """Test chat model initialization."""
-    mocker.patch("ads.common.auth.default_signer", return_value=dict(signer=None))
-    chat = ChatOCIModelDeployment(
-        model="odsc",
-        endpoint="test_endpoint",
-        model_kwargs={"temperature": 0.2},
-    )
-    assert chat.model == "odsc"
-    assert chat.endpoint == "test_endpoint"
-    assert chat.model_kwargs == {"temperature": 0.2}
-    assert chat._identifying_params == {
-        "endpoint": chat.endpoint,
-        "model_kwargs": {"temperature": 0.2},
-        "model": chat.model,
-        "stop": chat.stop,
-        "stream": chat.streaming,
-    }
-
-
-@pytest.mark.requires("ads")
-@responses.activate
-def test_call(mocker: MockerFixture) -> None:
-    """Test valid call to oci model deployment endpoint."""
-    endpoint = "https://MD_OCID/predict"
-    responses.add(
-        responses.POST,
-        endpoint,
-        json={
-            "id": "cmpl-88159e77c92f46088faad75fce2e26a1",
-            "object": "chat.completion",
-            "created": 274246,
-            "model": "odsc-llm",
-            "choices": [
-                {
-                    "index": 0,
-                    "message": {
-                        "role": "assistant",
-                        "content": "Hello World",
-                    },
-                    "finish_reason": "stop",
-                }
-            ],
-            "usage": {
-                "prompt_tokens": 10,
-                "total_tokens": 20,
-                "completion_tokens": 10,
-            },
-        },
-        status=200,
-    )
-    mocker.patch("ads.common.auth.default_signer", return_value=dict(signer=None))
-
-    chat = ChatOCIModelDeployment(endpoint=endpoint)
-    output = chat.invoke("this is a test.")
-    assert isinstance(output, AIMessage)
-    assert output.response_metadata == {
-        "token_usage": {
-            "prompt_tokens": 10,
-            "total_tokens": 20,
-            "completion_tokens": 10,
-        },
-        "model_name": "odsc-llm",
-        "system_fingerprint": "",
-        "finish_reason": "stop",
-    }
-
-
-@pytest.mark.requires("ads")
-@responses.activate
-def test_construct_json_body(mocker: MockerFixture) -> None:
-    """Tests constructing json body that will be sent to endpoint."""
-    mocker.patch("ads.common.auth.default_signer", return_value=dict(signer=None))
-    messages = [
-        HumanMessage(content="User message"),
-    ]
-    chat = ChatOCIModelDeployment(
-        endpoint="test_endpoint", model_kwargs={"temperature": 0.2}
-    )
-    payload = chat._construct_json_body(messages, chat._invocation_params(stop=None))
-    assert payload == {
-        "messages": [{"content": "User message", "role": "user"}],
-        "model": chat.model,
-        "stop": None,
-        "stream": chat.streaming,
-        "temperature": 0.2,
-    }
diff --git a/libs/oci/tests/unit_tests/llms/test_oci_model_deployment_endpoint.py b/libs/oci/tests/unit_tests/llms/test_oci_model_deployment_endpoint.py
deleted file mode 100644
index 86e2ca7..0000000
--- a/libs/oci/tests/unit_tests/llms/test_oci_model_deployment_endpoint.py
+++ /dev/null
@@ -1,169 +0,0 @@
-"""Test LLM for OCI Data Science Model Deployment Endpoint."""
-
-import sys
-from typing import Any, AsyncGenerator, Dict, Generator
-from unittest import mock
-
-import pytest
-from requests.exceptions import HTTPError
-
-from langchain_oci.llms import (
-    OCIModelDeploymentTGI,
-    OCIModelDeploymentVLLM,
-)
-
-CONST_MODEL_NAME = "odsc-vllm"
-CONST_ENDPOINT = "https://oci.endpoint/ocid/predict"
-CONST_PROMPT = "This is a prompt."
-CONST_COMPLETION = "This is a completion."
-CONST_COMPLETION_ROUTE = "/v1/completions"
-CONST_COMPLETION_RESPONSE = {
-    "choices": [
-        {
-            "index": 0,
-            "text": CONST_COMPLETION,
-            "logprobs": 0.1,
-            "finish_reason": "length",
-        }
-    ],
-}
-CONST_COMPLETION_RESPONSE_TGI = {"generated_text": CONST_COMPLETION}
-CONST_STREAM_TEMPLATE = (
-    'data: {"id":"","object":"text_completion","created":123456,'
-    + '"choices":[{"index":0,"text":"","finish_reason":""}]}'
-)
-CONST_STREAM_RESPONSE = (
-    CONST_STREAM_TEMPLATE.replace("", " " + word).encode()
-    for word in CONST_COMPLETION.split(" ")
-)
-
-CONST_ASYNC_STREAM_TEMPLATE = (
-    '{"id":"","object":"text_completion","created":123456,'
-    + '"choices":[{"index":0,"text":"","finish_reason":""}]}'
-)
-CONST_ASYNC_STREAM_RESPONSE = (
-    CONST_ASYNC_STREAM_TEMPLATE.replace("", " " + word).encode()
-    for word in CONST_COMPLETION.split(" ")
-)
-
-pytestmark = pytest.mark.skipif(
-    sys.version_info < (3, 9), reason="Requires Python 3.9 or higher"
-)
-
-
-class MockResponse:
-    """Represents a mocked response."""
-
-    def __init__(self, json_data: Dict, status_code: int = 200) -> None:
-        self.json_data = json_data
-        self.status_code = status_code
-
-    def raise_for_status(self) -> None:
-        """Mocked raise for status."""
-        if 400 <= self.status_code < 600:
-            raise HTTPError()
-
-    def json(self) -> Dict:
-        """Returns mocked json data."""
-        return self.json_data
-
-    def iter_lines(self, chunk_size: int = 4096) -> Generator[bytes, None, None]:
-        """Returns a generator of mocked streaming response."""
-        return CONST_STREAM_RESPONSE
-
-    @property
-    def text(self) -> str:
-        """Returns the mocked text representation."""
-        return ""
-
-
-def mocked_requests_post(url: str, **kwargs: Any) -> MockResponse:
-    """Method to mock post requests"""
-
-    payload: dict = kwargs.get("json", {})
-    if "inputs" in payload:
-        prompt = payload.get("inputs")
-        is_tgi = True
-    else:
-        prompt = payload.get("prompt")
-        is_tgi = False
-
-    if prompt == CONST_PROMPT:
-        if is_tgi:
-            return MockResponse(json_data=CONST_COMPLETION_RESPONSE_TGI)
-        return MockResponse(json_data=CONST_COMPLETION_RESPONSE)
-
-    return MockResponse(
-        json_data={},
-        status_code=404,
-    )
-
-
-async def mocked_async_streaming_response(
-    *args: Any, **kwargs: Any
-) -> AsyncGenerator[bytes, None]:
-    """Returns mocked response for async streaming."""
-    for item in CONST_ASYNC_STREAM_RESPONSE:
-        yield item
-
-
-@pytest.mark.requires("ads")
-@mock.patch("ads.common.auth.default_signer", return_value=dict(signer=None))
-@mock.patch("requests.post", side_effect=mocked_requests_post)
-def test_invoke_vllm(*args: Any) -> None:
-    """Tests invoking vLLM endpoint."""
-    llm = OCIModelDeploymentVLLM(endpoint=CONST_ENDPOINT, model=CONST_MODEL_NAME)
-    assert llm._headers().get("route") == CONST_COMPLETION_ROUTE
-    output = llm.invoke(CONST_PROMPT)
-    assert output == CONST_COMPLETION
-
-
-@pytest.mark.requires("ads")
-@mock.patch("ads.common.auth.default_signer", return_value=dict(signer=None))
-@mock.patch("requests.post", side_effect=mocked_requests_post)
-def test_stream_tgi(*args: Any) -> None:
-    """Tests streaming with TGI endpoint using OpenAI spec."""
-    llm = OCIModelDeploymentTGI(
-        endpoint=CONST_ENDPOINT, model=CONST_MODEL_NAME, streaming=True
-    )
-    assert llm._headers().get("route") == CONST_COMPLETION_ROUTE
-    output = ""
-    count = 0
-    for chunk in llm.stream(CONST_PROMPT):
-        output += chunk
-        count += 1
-    assert count == 4
-    assert output.strip() == CONST_COMPLETION
-
-
-@pytest.mark.requires("ads")
-@mock.patch("ads.common.auth.default_signer", return_value=dict(signer=None))
-@mock.patch("requests.post", side_effect=mocked_requests_post)
-def test_generate_tgi(*args: Any) -> None:
-    """Tests invoking TGI endpoint using TGI generate spec."""
-    llm = OCIModelDeploymentTGI(
-        endpoint=CONST_ENDPOINT, api="/generate", model=CONST_MODEL_NAME
-    )
-    assert llm._headers().get("route") == CONST_COMPLETION_ROUTE
-    output = llm.invoke(CONST_PROMPT)
-    assert output == CONST_COMPLETION
-
-
-@pytest.mark.asyncio
-@pytest.mark.requires("ads")
-@mock.patch(
-    "ads.common.auth.default_signer", return_value=dict(signer=mock.MagicMock())
-)
-async def test_stream_async(*args: Any) -> None:
-    """Tests async streaming."""
-    llm = OCIModelDeploymentTGI(
-        endpoint=CONST_ENDPOINT, model=CONST_MODEL_NAME, streaming=True
-    )
-    assert llm._headers().get("route") == CONST_COMPLETION_ROUTE
-    with mock.patch.object(
-        llm,
-        "_aiter_sse",
-        mock.MagicMock(return_value=mocked_async_streaming_response()),
-    ):
-        chunks = [chunk async for chunk in llm.astream(CONST_PROMPT)]
-        assert "".join(chunks).strip() == CONST_COMPLETION
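
Editor's note: the deleted suites above exercised the removed vLLM/TGI wrappers by patching `requests.post`. The updated `test_oci_data_science.py` hunk shows that the consolidated `ChatOCIModelDeployment` now accepts a custom `http_client`, so an equivalent end-to-end invocation test could stub the HTTP layer with `httpx.MockTransport` instead. The sketch below is illustrative only: it assumes `ChatOCIModelDeployment` forwards `base_url` and `http_client` unchanged to the underlying OpenAI-compatible client, needs no extra credentials when a client is supplied, and posts to `<base_url>/chat/completions`. The handler, stub URL, and fixture values are hypothetical and are not part of this change set.

```python
import httpx

from langchain_oci import ChatOCIModelDeployment

# Canned OpenAI-style chat completion payload, mirroring the removed fixtures.
FAKE_RESPONSE = {
    "id": "chat-123",
    "object": "chat.completion",
    "created": 123456789,
    "model": "odsc-llm",
    "choices": [
        {
            "index": 0,
            "message": {"role": "assistant", "content": "This is a completion."},
            "finish_reason": "stop",
        }
    ],
    "usage": {"prompt_tokens": 10, "total_tokens": 20, "completion_tokens": 10},
}


def _handler(request: httpx.Request) -> httpx.Response:
    # Every request routed through the stub client gets the canned completion.
    return httpx.Response(200, json=FAKE_RESPONSE)


def test_invoke_with_mock_transport() -> None:
    """Invoke the chat model against httpx.MockTransport instead of a live endpoint."""
    http_client = httpx.Client(transport=httpx.MockTransport(_handler))
    llm = ChatOCIModelDeployment(base_url="http://stub/v1", http_client=http_client)
    output = llm.invoke("This is a prompt.")
    assert output.content == "This is a completion."
```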