
Commit a5a24f7

feat: ALTK JSON Processing native plugin (#1326)
* Add ALTK JSON Processing native plugin

  Signed-off-by: Jason Tsay <[email protected]>

* add changes based on feedback

  Signed-off-by: Jason Tsay <[email protected]>

Signed-off-by: Jason Tsay <[email protected]>
1 parent 69b7aec commit a5a24f7

8 files changed

+2231
-105
lines changed

Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
# ALTKJsonProcessor for Context Forge MCP Gateway

> Author: Jason Tsay
> Version: 0.1.0

Uses the JSON Processor from ALTK to extract data from long JSON responses. See [ALTK](https://altk.ai/) and the [JSON Processor component in the ALTK repo](https://github.com/AgentToolkit/agent-lifecycle-toolkit/tree/main/altk/post_tool/code_generation) for more details on how the component works.

Note that this plugin calls an LLM and will therefore require configuring an LLM provider as described below. The plugin will also incur some cost in time and money for its LLM calls. This can be adjusted via the length threshold in the configuration, so that the plugin only activates and calls an LLM on JSON responses above a particular length (default: 100,000 characters).
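The activation rule above can be sketched as a simple length gate. This is a minimal illustration, not the plugin's actual API; the names `should_process` and `length_threshold` are chosen here for clarity:

```python
import json

def should_process(response_text: str, length_threshold: int = 100_000) -> bool:
    """Return True only for responses long enough to justify an LLM call
    and that actually parse as JSON."""
    if len(response_text) <= length_threshold:
        return False
    try:
        json.loads(response_text)
    except json.JSONDecodeError:
        # anything that's not JSON is passed through untouched
        return False
    return True

print(should_process('{"a": 1}'))                        # short JSON → False
print(should_process("x" * 200_000))                     # long but not JSON → False
print(should_process(json.dumps({"a": "x" * 200_000})))  # long valid JSON → True
```

Lowering the threshold trades more LLM calls (and cost) for more aggressive filtering of medium-sized responses.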
## Hooks
- `tool_post_invoke` - Detects long JSON responses and processes them as necessary

## Installation

1. Enable the "ALTKJsonProcessor" plugin in `plugins/config.yaml`.
2. Install the optional dependency `altk` (i.e. `pip install mcp-context-forge[altk]`)
3. Configure an LLM provider as described below.

## Configuration

```yaml
- name: "ALTKJsonProcessor"
  kind: "plugins.altk_json_processor.json_processor.ALTKJsonProcessor"
  description: "Uses JSON Processor from ALTK to extract data from long JSON responses"
  hooks: ["tool_post_invoke"]
  tags: ["plugin"]
  mode: "enforce"
  priority: 150
  conditions: []
  config:
    jsonprocessor_query: ""
    llm_provider: "watsonx" # one of watsonx, ollama, openai, anthropic
    watsonx: # each provider section is optional
      wx_api_key: "" # optional, can define WX_API_KEY instead
      wx_project_id: "" # optional, can define WX_PROJECT_ID instead
      wx_url: "https://us-south.ml.cloud.ibm.com"
    ollama:
      ollama_url: "http://localhost:11434"
    openai:
      api_key: "" # optional, can define OPENAI_API_KEY instead
    anthropic:
      api_key: "" # optional, can define ANTHROPIC_API_KEY instead
    length_threshold: 100000
    model_id: "ibm/granite-3-3-8b-instruct" # note that this changes depending on provider
```

- `length_threshold` is the minimum number of characters in a response before this component activates
- `jsonprocessor_query` is a natural language statement of what the long response should be processed for. For example, for a long response about a musical artist: "get full metadata for all albums from the artist's discography in json format"

### LLM Provider Configuration

In the configuration, select an LLM provider via `llm_provider`; the current options are WatsonX, Ollama, OpenAI, and Anthropic.
Then fill out the corresponding provider section in the plugin config. For many of the API key-related fields, an environment variable
can be used instead. If a field is set both in the plugin config and in an environment variable, the plugin config takes priority.

### JSON Processor Query

To guide the JSON Processor, an optional but recommended `jsonprocessor_query` can be provided: a natural language statement of what the long response should be processed for.

Example queries:

- For an API endpoint such as [this Spotify artist overview](https://rapidapi.com/DataFanatic/api/spotify-scraper/playground/apiendpoint_fd33b4eb-d258-437e-af85-c244904acefc) that returns a large response, if you only want the discography of the artist, use a query such as: "get full metadata for all albums from the artist's discography in json format"
- For a shopping API endpoint that returns a [response like this](https://raw.githubusercontent.com/AgentToolkit/agent-lifecycle-toolkit/refs/heads/main/examples/codegen_long_response_example.json), if you only want the sizes of the sneakers, use a query such as: "get the sizes for all products"

## Testing

Unit tests: `tests/unit/mcpgateway/plugins/plugins/altk_json_processor/test_json_processor.py`
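The tool results this plugin inspects follow the MCP-style content-block shape used in the unit test. A minimal sketch of extracting the text from such a result, using plain dicts for illustration:

```python
import json

# An MCP-style tool result: a list of content blocks, the first of type "text".
result = {
    "content": [
        {"type": "text", "text": json.dumps({"albums": [{"name": "Debut", "year": 1993}]})}
    ]
}

# Pull out the text of the first content block, if present.
response_str = None
content = result.get("content", [])
if content and content[0].get("type") == "text":
    response_str = content[0]["text"]

data = json.loads(response_str)
print(data["albums"][0]["name"])  # → Debut
```

Results without a leading `text` block are left alone, which is why the plugin returns the payload unmodified in that case.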
Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
"""MCP Gateway ALTKJsonProcessor Plugin - Uses JSON Processor from ALTK to extract data from long JSON responses.

Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Jason Tsay

"""
Lines changed: 145 additions & 0 deletions
@@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
"""Uses JSON Processor from ALTK to extract data from long JSON responses.

Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Jason Tsay

This plugin detects long JSON tool responses and uses ALTK's JSON Processor
to extract the data requested by the configured query.
"""

# Standard
import json
import os
from typing import cast

# Third-Party
from altk.core.llm import get_llm
from altk.core.toolkit import AgentPhase
from altk.post_tool.code_generation.code_generation import CodeGenerationComponent, CodeGenerationComponentConfig
from altk.post_tool.core.toolkit import CodeGenerationRunInput, CodeGenerationRunOutput

# First-Party
from mcpgateway.plugins.framework import (
    Plugin,
    PluginConfig,
    PluginContext,
    ToolPostInvokePayload,
    ToolPostInvokeResult,
)
from mcpgateway.services.logging_service import LoggingService

# Initialize logging service first
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)


class ALTKJsonProcessor(Plugin):
    """Uses JSON Processor from ALTK to extract data from long JSON responses."""

    def __init__(self, config: PluginConfig):
        """Entry init block for plugin.

        Args:
            config: the plugin configuration
        """
        super().__init__(config)
        if config.config:
            self._cfg = config.config
        else:
            self._cfg = {}

    async def tool_post_invoke(self, payload: ToolPostInvokePayload, context: PluginContext) -> ToolPostInvokeResult:
        """Plugin hook run after a tool is invoked.

        Args:
            payload: The tool result payload to be analyzed.
            context: Contextual information about the hook call.

        Raises:
            ValueError: if a provider api key is not provided in either config or env var

        Returns:
            The result of the plugin's analysis, including whether the tool result should proceed.
        """
        provider = self._cfg["llm_provider"]
        llm_client = None
        if provider == "watsonx":
            watsonx_client = get_llm("watsonx")
            if len(self._cfg["watsonx"]["wx_api_key"]) > 0:
                api_key = self._cfg["watsonx"]["wx_api_key"]
            else:
                api_key = os.getenv("WX_API_KEY")
            if not api_key:
                raise ValueError("WatsonX api key not found, provide WX_API_KEY either in the plugin config or as an env var.")
            if len(self._cfg["watsonx"]["wx_project_id"]) > 0:
                project_id = self._cfg["watsonx"]["wx_project_id"]
            else:
                project_id = os.getenv("WX_PROJECT_ID")
            if not project_id:
                raise ValueError("WatsonX project id not found, provide WX_PROJECT_ID either in the plugin config or as an env var.")
            llm_client = watsonx_client(model_id=self._cfg["model_id"], api_key=api_key, project_id=project_id, url=self._cfg["watsonx"]["wx_url"])
        elif provider == "openai":
            openai_client = get_llm("openai.sync")
            if len(self._cfg["openai"]["api_key"]) > 0:
                api_key = self._cfg["openai"]["api_key"]
            else:
                api_key = os.getenv("OPENAI_API_KEY")
            if not api_key:
                raise ValueError("OpenAI api key not found, provide OPENAI_API_KEY either in the plugin config or as an env var.")
            llm_client = openai_client(api_key=api_key, model=self._cfg["model_id"])
        elif provider == "ollama":
            ollama_client = get_llm("litellm.ollama")
            llm_client = ollama_client(api_url=self._cfg["ollama"]["ollama_url"], model_name=self._cfg["model_id"])
        elif provider == "anthropic":
            anthropic_client = get_llm("litellm")
            model_path = f"anthropic/{self._cfg['model_id']}"
            if len(self._cfg["anthropic"]["api_key"]) > 0:
                api_key = self._cfg["anthropic"]["api_key"]
            else:
                api_key = os.getenv("ANTHROPIC_API_KEY")
            if not api_key:
                raise ValueError("Anthropic api key not found, provide ANTHROPIC_API_KEY either in the plugin config or as an env var.")
            llm_client = anthropic_client(model_name=model_path, api_key=api_key)
        elif provider == "pytestmock":
            # only meant to be used for unit tests
            llm_client = None
        else:
            raise ValueError("Unknown provider given for 'llm_provider' in plugin config!")

        config = CodeGenerationComponentConfig(llm_client=llm_client, use_docker_sandbox=False)

        response_json = None
        response_str = None
        if "content" in payload.result:
            if len(payload.result["content"]) > 0:
                content = payload.result["content"][0]
                if "type" in content and content["type"] == "text":
                    response_str = content["text"]

        if response_str and len(response_str) > self._cfg["length_threshold"]:
            try:
                response_json = json.loads(response_str)
            except json.decoder.JSONDecodeError:
                # ignore anything that's not json
                pass

        # Should only get here if response is long enough and is valid JSON
        if response_json:
            logger.info("Long JSON response detected, using ALTK JSON Processor...")
            if provider == "pytestmock":
                # only meant for unit testing
                payload.result["content"][0]["text"] = "(filtered response)"
            else:
                codegen = CodeGenerationComponent(config=config)
                nl_query = self._cfg.get("jsonprocessor_query", "")
                input_data = CodeGenerationRunInput(messages=[], nl_query=nl_query, tool_response=response_json)
                output = codegen.process(input_data, AgentPhase.RUNTIME)
                output = cast(CodeGenerationRunOutput, output)
                payload.result["content"][0]["text"] = output.result
                logger.debug(f"ALTK processed response: {output.result}")
            return ToolPostInvokeResult(continue_processing=True, modified_payload=payload)

        return ToolPostInvokeResult(continue_processing=True)
Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
description: "Uses JSON Processor from ALTK to extract data from long JSON responses"
author: "Jason Tsay"
version: "0.1.0"
available_hooks:
  - "tool_post_invoke"
default_configs:
  length_threshold: 100000

plugins/config.yaml

Lines changed: 30 additions & 0 deletions
@@ -869,3 +869,33 @@ plugins:
       enable_caching: true
       cache_ttl: 3600
       max_text_length: 10000

   # ALTK: JSON Processor
   - name: "ALTKJsonProcessor"
     kind: "plugins.altk_json_processor.json_processor.ALTKJsonProcessor"
     description: "Uses JSON Processor from ALTK to extract data from long JSON responses"
     version: "0.1.0"
     author: "Jason Tsay"
     hooks: ["tool_post_invoke"]
     tags: ["plugin"]
     mode: "disabled" # enforce | permissive | disabled
     priority: 150
     conditions:
       # Apply to specific tools/servers
       - server_ids: [] # Apply to all servers
         tenant_ids: [] # Apply to all tenants
     config:
       jsonprocessor_query: ""
       llm_provider: "watsonx" # one of watsonx, ollama, openai, anthropic
       watsonx: # each provider section is optional
         wx_api_key: "" # optional, can define WX_API_KEY instead
         wx_project_id: "" # optional, can define WX_PROJECT_ID instead
         wx_url: "https://us-south.ml.cloud.ibm.com"
       ollama:
         ollama_url: "http://localhost:11434"
       openai:
         api_key: "" # optional, can define OPENAI_API_KEY instead
       anthropic:
         api_key: "" # optional, can define ANTHROPIC_API_KEY instead
       model_id: "ibm/granite-3-3-8b-instruct" # note that this changes depending on provider
       length_threshold: 100000

pyproject.toml

Lines changed: 6 additions & 1 deletion
@@ -116,7 +116,7 @@ dev = [
     "pydocstyle>=6.3.0",
     "pylint>=3.3.9",
     "pylint-pydantic>=0.3.5",
-    "pyre-check>=0.9.25",
+    #"pyre-check>=0.9.25", # unused, conflicts with altk, superseded by pyrefly
     "pyrefly>=0.35.0",
     "pyright>=1.1.406",
     "pyroma>=5.0",
@@ -213,6 +213,11 @@ asyncpg = [
     "asyncpg>=0.30.0",
 ]

+# Agent Lifecycle Toolkit (optional)
+altk = [
+    "agent-lifecycle-toolkit>=0.4.0",
+]
+
 # gRPC Support (EXPERIMENTAL - optional, disabled by default)
 # Install with: pip install mcp-contextforge-gateway[grpc]
 grpc = [
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
"""Location: ./tests/unit/mcpgateway/plugins/plugins/altk_json_processor/test_json_processor.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Jason Tsay

Tests for ALTKJsonProcessor.
"""

# Standard
import json

# Third-Party
import pytest

# First-Party
from mcpgateway.plugins.framework.models import (
    GlobalContext,
    HookType,
    PluginConfig,
    PluginContext,
    ToolPostInvokePayload,
)

# ALTK is an optional dependency and may not be present, skip if not
have_altk = True
try:
    # Third-Party
    import altk  # noqa: F401 # type: ignore

    # First-Party
    from plugins.altk_json_processor.json_processor import ALTKJsonProcessor
except ModuleNotFoundError:
    have_altk = False


@pytest.mark.asyncio
@pytest.mark.skipif(not have_altk, reason="altk not available")
async def test_threshold():
    plugin = ALTKJsonProcessor(  # type: ignore
        PluginConfig(
            name="jsonprocessor",
            kind="plugins.altk_json_processor.json_processor.ALTKJsonProcessor",
            hooks=[HookType.TOOL_POST_INVOKE],
            config={"llm_provider": "pytestmock", "length_threshold": 50},
        )
    )
    ctx = PluginContext(global_context=GlobalContext(request_id="r1"))
    # below threshold, so the plugin should not activate
    too_short = {"a": "1", "b": "2"}
    too_short_payload = {"content": [{"type": "text", "text": json.dumps(too_short)}]}
    res = await plugin.tool_post_invoke(ToolPostInvokePayload(name="x1", result=too_short_payload), ctx)
    assert res.modified_payload is None
    long_enough = {
        "a": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
        "b": "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.",
        "c": "Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.",
        "d": "Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.",
    }
    # above threshold, so the plugin should activate
    long_enough_payload = {"content": [{"type": "text", "text": json.dumps(long_enough)}]}
    res = await plugin.tool_post_invoke(ToolPostInvokePayload(name="x2", result=long_enough_payload), ctx)
    assert res.modified_payload is not None
    assert res.modified_payload.result["content"][0]["text"] == "(filtered response)"