# feat: ALTK JSON Processing native plugin #1326
@@ -0,0 +1,67 @@
# ALTKJsonProcessor for Context Forge MCP Gateway

> Author: Jason Tsay
> Version: 0.1.0

Uses the JSON Processor from ALTK to extract data from long JSON responses. See [ALTK](https://altk.ai/) and the [JSON Processor component in the ALTK repo](https://github.com/AgentToolkit/agent-lifecycle-toolkit/tree/main/altk/post_tool/code_generation) for more details on how the component works.

Note that this plugin calls an LLM and therefore requires configuring an LLM provider as described below. The plugin will also incur some cost, in both time and money, for its LLM calls. This can be adjusted via the length threshold in the configuration, so that the plugin only activates and calls an LLM on JSON responses above a particular length (default: 100,000 characters).
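The activation check described above can be sketched as follows. This is a simplified illustration, not the plugin's actual code, and the helper name `should_process` is hypothetical:

```python
import json


def should_process(text: str, length_threshold: int = 100_000):
    """Return the parsed JSON if the response is long enough and valid JSON, else None."""
    if len(text) <= length_threshold:
        return None  # short responses pass through untouched; no LLM call is made
    try:
        return json.loads(text)  # only well-formed JSON triggers processing
    except json.JSONDecodeError:
        return None  # anything that is not JSON is ignored
```

With the default threshold, a response must exceed 100,000 characters *and* parse as JSON before the plugin spends an LLM call on it.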
## Hooks

- `tool_post_invoke` - Detects long JSON responses and processes them as necessary

## Installation

1. Enable the "ALTKJsonProcessor" plugin in `plugins/config.yaml`.
2. Install the optional dependency `altk` (i.e. `pip install mcp-context-forge[altk]`).
3. Configure an LLM provider as described below.
## Configuration

```yaml
- name: "ALTKJsonProcessor"
  kind: "plugins.altk_json_processor.json_processor.ALTKJsonProcessor"
  description: "Uses JSON Processor from ALTK to extract data from long JSON responses"
  hooks: ["tool_post_invoke"]
  tags: ["plugin"]
  mode: "enforce"
  priority: 150
  conditions: []
  config:
    jsonprocessor_query: ""
    llm_provider: "watsonx"  # one of watsonx, ollama, openai, anthropic
    watsonx:  # each provider section is optional
      wx_api_key: ""  # optional, can define WX_API_KEY instead
      wx_project_id: ""  # optional, can define WX_PROJECT_ID instead
      wx_url: "https://us-south.ml.cloud.ibm.com"
    ollama:
      ollama_url: "http://localhost:11434"
    openai:
      api_key: ""  # optional, can define OPENAI_API_KEY instead
    anthropic:
      api_key: ""  # optional, can define ANTHROPIC_API_KEY instead
    length_threshold: 100000
    model_id: "ibm/granite-3-3-8b-instruct"  # note that this changes depending on provider
```
- `length_threshold` is the minimum number of characters a response must have before this component activates
- `jsonprocessor_query` is a natural language statement of what the long response should be processed for. For example, for a long response about a musical artist: "get full metadata for all albums from the artist's discography in json format"
### LLM Provider Configuration

In the configuration, select an LLM provider via `llm_provider`; the current options are WatsonX, Ollama, OpenAI, and Anthropic.
Then fill out the corresponding provider section in the plugin config. For many of the API key-related fields, an environment variable
can be used instead. If a field is set both in the plugin config and in an environment variable, the plugin config takes priority.
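The precedence rule above (plugin config first, then environment variable) can be sketched as follows. This is an illustration only; `resolve_secret` is a hypothetical helper, not part of the plugin's API:

```python
import os


def resolve_secret(cfg_value: str, env_name: str) -> str:
    """Prefer a non-empty plugin-config value; fall back to the environment."""
    if cfg_value:  # the plugin config wins when set to a non-empty string
        return cfg_value
    value = os.getenv(env_name)
    if not value:
        raise ValueError(f"{env_name} not found in either the plugin config or the environment.")
    return value


os.environ["WX_API_KEY"] = "from-env"
# Config value takes priority over the environment:
assert resolve_secret("from-config", "WX_API_KEY") == "from-config"
# Empty config value falls back to the environment:
assert resolve_secret("", "WX_API_KEY") == "from-env"
```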
### JSON Processor Query

To guide the JSON Processor, an optional but recommended `jsonprocessor_query` can be provided: a natural language statement of what the long response should be processed for.

Example queries:

- For an API endpoint such as [this Spotify artist overview](https://rapidapi.com/DataFanatic/api/spotify-scraper/playground/apiendpoint_fd33b4eb-d258-437e-af85-c244904acefc) that returns a large response, if you only want the artist's discography, use a query such as: "get full metadata for all albums from the artist's discography in json format"
- For a shopping API endpoint that returns a [response like this](https://raw.githubusercontent.com/AgentToolkit/agent-lifecycle-toolkit/refs/heads/main/examples/codegen_long_response_example.json), if you only want the sizes of the sneakers, use a query such as: "get the sizes for all products"
## Testing

Unit tests: `tests/unit/mcpgateway/plugins/plugins/altk_json_processor/test_json_processor.py`

@@ -0,0 +1,8 @@
```python
# -*- coding: utf-8 -*-
"""MCP Gateway ALTKJsonProcessor Plugin - Uses JSON Processor from ALTK to extract data from long JSON responses.

Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Jason Tsay
"""
```

@@ -0,0 +1,145 @@
```python
# -*- coding: utf-8 -*-
"""Uses JSON Processor from ALTK to extract data from long JSON responses.

Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Jason Tsay

This module implements the ALTKJsonProcessor plugin.
"""

# Standard
import json
import os
from typing import cast

# Third-Party
from altk.core.llm import get_llm
from altk.core.toolkit import AgentPhase
from altk.post_tool.code_generation.code_generation import CodeGenerationComponent, CodeGenerationComponentConfig
from altk.post_tool.core.toolkit import CodeGenerationRunInput, CodeGenerationRunOutput

# First-Party
from mcpgateway.plugins.framework import (
    Plugin,
    PluginConfig,
    PluginContext,
    ToolPostInvokePayload,
    ToolPostInvokeResult,
)
from mcpgateway.services.logging_service import LoggingService

# Initialize logging service first
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)


class ALTKJsonProcessor(Plugin):
    """Uses JSON Processor from ALTK to extract data from long JSON responses."""

    def __init__(self, config: PluginConfig):
        """Initialize the plugin.

        Args:
            config: the plugin configuration
        """
        super().__init__(config)
        self._cfg = config.config if config.config else {}

    async def tool_post_invoke(self, payload: ToolPostInvokePayload, context: PluginContext) -> ToolPostInvokeResult:
        """Plugin hook run after a tool is invoked.

        Args:
            payload: The tool result payload to be analyzed.
            context: Contextual information about the hook call.

        Raises:
            ValueError: if a provider API key is not provided in either the config or an env var

        Returns:
            The result of the plugin's analysis, including whether the tool result should proceed.
        """
        provider = self._cfg["llm_provider"]
        llm_client = None
        if provider == "watsonx":
            watsonx_client = get_llm("watsonx")
            api_key = self._cfg["watsonx"]["wx_api_key"] or os.getenv("WX_API_KEY")
            if not api_key:
                raise ValueError("WatsonX api key not found, provide WX_API_KEY either in the plugin config or as an env var.")
            project_id = self._cfg["watsonx"]["wx_project_id"] or os.getenv("WX_PROJECT_ID")
            if not project_id:
                raise ValueError("WatsonX project id not found, provide WX_PROJECT_ID either in the plugin config or as an env var.")
            llm_client = watsonx_client(model_id=self._cfg["model_id"], api_key=api_key, project_id=project_id, url=self._cfg["watsonx"]["wx_url"])
        elif provider == "openai":
            openai_client = get_llm("openai.sync")
            api_key = self._cfg["openai"]["api_key"] or os.getenv("OPENAI_API_KEY")
            if not api_key:
                raise ValueError("OpenAI api key not found, provide OPENAI_API_KEY either in the plugin config or as an env var.")
            llm_client = openai_client(api_key=api_key, model=self._cfg["model_id"])
        elif provider == "ollama":
            ollama_client = get_llm("litellm.ollama")
            llm_client = ollama_client(api_url=self._cfg["ollama"]["ollama_url"], model_name=self._cfg["model_id"])
        elif provider == "anthropic":
            anthropic_client = get_llm("litellm")
            model_path = f"anthropic/{self._cfg['model_id']}"
            api_key = self._cfg["anthropic"]["api_key"] or os.getenv("ANTHROPIC_API_KEY")
            if not api_key:
                raise ValueError("Anthropic api key not found, provide ANTHROPIC_API_KEY either in the plugin config or as an env var.")
            llm_client = anthropic_client(model_name=model_path, api_key=api_key)
        elif provider == "pytestmock":
            # only meant to be used for unit tests
            llm_client = None
        else:
            raise ValueError("Unknown provider given for 'llm_provider' in plugin config!")

        config = CodeGenerationComponentConfig(llm_client=llm_client, use_docker_sandbox=False)

        response_json = None
        response_str = None
        if "content" in payload.result and len(payload.result["content"]) > 0:
            content = payload.result["content"][0]
            if content.get("type") == "text":
                response_str = content["text"]

        if response_str and len(response_str) > self._cfg["length_threshold"]:
            try:
                response_json = json.loads(response_str)
            except json.decoder.JSONDecodeError:
                # ignore anything that's not json
                pass

        # Should only get here if the response is long enough and is valid JSON
        if response_json:
            logger.info("Long JSON response detected, using ALTK JSON Processor...")
            if provider == "pytestmock":
                # only meant for unit testing
                payload.result["content"][0]["text"] = "(filtered response)"
            else:
                codegen = CodeGenerationComponent(config=config)
                nl_query = self._cfg.get("jsonprocessor_query", "")
                input_data = CodeGenerationRunInput(messages=[], nl_query=nl_query, tool_response=response_json)
                output = codegen.process(input_data, AgentPhase.RUNTIME)
                output = cast(CodeGenerationRunOutput, output)
                payload.result["content"][0]["text"] = output.result
                logger.debug(f"ALTK processed response: {output.result}")
            return ToolPostInvokeResult(continue_processing=True, modified_payload=payload)

        return ToolPostInvokeResult(continue_processing=True)
```

@@ -0,0 +1,7 @@
```yaml
description: "Uses JSON Processor from ALTK to extract data from long JSON responses"
author: "Jason Tsay"
version: "0.1.0"
available_hooks:
  - "tool_post_invoke"
default_configs:
  length_threshold: 100000
```
|
**Review comment:** We may not want to add this to `config.yaml` by default, as it would require the optional `altk` dependency to always be installed.

**Reply:** Even if the plugin is disabled? What would be the right thing to do then? Separate

**Review comment:** What are the implications of removing pyre-check when altk isn't installed? Do all the tests pass?

**Reply:** I think it's supposed to be used in linting, but as far as I can tell the current linter list doesn't use it: Line 841 in fa92384

@@ -0,0 +1,61 @@
```python
# -*- coding: utf-8 -*-
"""Location: ./tests/unit/mcpgateway/plugins/plugins/altk_json_processor/test_json_processor.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Jason Tsay

Tests for ALTKJsonProcessor.
"""

# Standard
import json

# Third-Party
import pytest

# First-Party
from mcpgateway.plugins.framework.models import (
    GlobalContext,
    HookType,
    PluginConfig,
    PluginContext,
    ToolPostInvokePayload,
)

# ALTK is an optional dependency and may not be present; skip if not
have_altk = True
try:
    # Third-Party
    import altk  # noqa: F401  # type: ignore

    # First-Party
    from plugins.altk_json_processor.json_processor import ALTKJsonProcessor
except ModuleNotFoundError:
    have_altk = False


@pytest.mark.asyncio
@pytest.mark.skipif(not have_altk, reason="altk not available")
async def test_threshold():
    plugin = ALTKJsonProcessor(  # type: ignore
        PluginConfig(
            name="jsonprocessor", kind="plugins.altk_json_processor.json_processor.ALTKJsonProcessor", hooks=[HookType.TOOL_POST_INVOKE], config={"llm_provider": "pytestmock", "length_threshold": 50}
        )
    )
    ctx = PluginContext(global_context=GlobalContext(request_id="r1"))
    # below threshold, so the plugin should not activate
    too_short = {"a": "1", "b": "2"}
    too_short_payload = {"content": [{"type": "text", "text": json.dumps(too_short)}]}
    res = await plugin.tool_post_invoke(ToolPostInvokePayload(name="x1", result=too_short_payload), ctx)
    assert res.modified_payload is None
    long_enough = {
        "a": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
        "b": "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.",
        "c": "Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.",
        "d": "Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.",
    }
    # above threshold, so the plugin should activate
    long_enough_payload = {"content": [{"type": "text", "text": json.dumps(long_enough)}]}
    res = await plugin.tool_post_invoke(ToolPostInvokePayload(name="x2", result=long_enough_payload), ctx)
    assert res.modified_payload is not None
    assert res.modified_payload.result["content"][0]["text"] == "(filtered response)"
```
**Review comment:** Is each section of the providers required, or optional?

**Reply:** Optional, I can mark it as such.