From 3640aac587e0431963a8743e9255b98850365406 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sat, 11 Apr 2026 19:32:09 -0700 Subject: [PATCH 1/9] feat: support for dual code Signed-off-by: vsoch --- README.md | 5 ++++- mcpserver/cli/args.py | 6 ++++++ mcpserver/core/hub.py | 40 +++++++++++++++++++++++++++++++++++----- mcpserver/version.py | 2 +- 4 files changed, 46 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 8dba152..b32ba40 100644 --- a/README.md +++ b/README.md @@ -179,7 +179,7 @@ And with the configuration file instead: mcpserver start -t http --port 8089 --config ./examples/jobspec/mcpserver.yaml ``` -We will provide examples for jobspec translation functions in [fractale-mcp](https://github.com/compspec/fractale-mcp). +We will provide examples for jobspec translation functions in [fractale](https://github.com/compspec/fractale-mcp) and the agent in [fractale-agents](https://github.com/converged-computing/fractale-agents). ### Kubernetes (kind) @@ -243,6 +243,9 @@ The mcp-server can register worker hubs, which are other MCP servers that regist ```bash # Start a hub in one terminal mcpserver start --hub --hub-secret potato + +# Start in dual mode (not recommended for production, primarily for experiments) +mcpserver start --dual --hub-secret potato ``` In another terminal, start a worker using the token that is generated. Add some functions for fun. diff --git a/mcpserver/cli/args.py b/mcpserver/cli/args.py index f52956e..1ea53de 100644 --- a/mcpserver/cli/args.py +++ b/mcpserver/cli/args.py @@ -79,6 +79,12 @@ def populate_start_args(start): help="Run the hub in serial mode (ideal for experiments on single machines)", default=False, ) + # Dual mode means "I am a hub AND a worker" - added this for dispatch experiments + hub_group.add_argument( + "--dual", + action="store_true", + help="Start as both a Hub and a Worker (registers local resources to the fleet).", + ) # Worker Registration Group worker_group = start.add_argument_group("🐝 Worker Registration") diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index b52154b..a8514bc 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -2,6 +2,7 @@ import json import random import secrets +import socket import time from typing import Any, Dict, Optional @@ -19,24 +20,42 @@ class HubManager: reflects their tools, and manages federated job negotiation. """ - def __init__(self, mcp, host: str, port: int, secret: str = None, batch=None, serial=False): + def __init__( + self, + mcp, + host: str, + port: int, + secret: str = None, + batch=None, + serial=False, + dual=False, + hub_id=None, + ): self.mcp = mcp self.host = host self.port = port self.secret = secret or secrets.token_urlsafe(32) self.workers: Dict[str, Dict[str, Any]] = {} + self.hub_id = hub_id or socket.gethostname() # Make requests to hub in batches, in serial, or in parallel - self.set_running_mode(batch, serial) + self.set_running_mode(batch, serial, dual) # Track registered proxies to prevent ValueError on worker re-registration self._registered_proxies = set() - - self.registration_url = f"http://{host}:{port}/register" self._print_banner() self._register_hub_tools() - def set_running_mode(self, batch_size=None, serial=False): + @property + def url(self): + # This is running with uvicorn that serves the ssl + return f"http://{self.host}:{self.port}" + + @property + def registration_url(self): + return f"{self.url}/register" + + def set_running_mode(self, batch_size=None, serial=False, dual=False): """ Set the function to call the fleet. If we are worried about rate limits or running experiments, @@ -61,6 +80,16 @@ def set_running_mode(self, batch_size=None, serial=False): self.run_on_fleet = self.run_on_fleet_batched logger.info(f"🚦 Hub initialized with Batch Size: {batch_size}") + # If we are also running as a worker, add ourselves to the fleet + if not dual: + return + + hub_id = self.hub_id or socket.gethostname() + self.workers[hub_id] = { + "url": self.registration_url, + "client": Client(self.registration_url), + } + @classmethod def from_args(cls, mcp, args) -> Optional["HubManager"]: """ @@ -75,6 +104,7 @@ def from_args(cls, mcp, args) -> Optional["HubManager"]: secret=args.hub_secret, batch=args.batch, serial=args.serial, + dual=args.dual, ) def _print_banner(self): diff --git a/mcpserver/version.py b/mcpserver/version.py index 2ec0cf2..19149e1 100644 --- a/mcpserver/version.py +++ b/mcpserver/version.py @@ -1,4 +1,4 @@ -__version__ = "0.0.16" +__version__ = "0.0.17" AUTHOR = "Vanessa Sochat" AUTHOR_EMAIL = "vsoch@users.noreply.github.com" NAME = "mcp-serve" From 55dc2fa5c78be354236325ce1465710755a124a9 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sat, 11 Apr 2026 19:57:55 -0700 Subject: [PATCH 2/9] design: common base for worker and hub We should have a common base with worker functions to make it cleaner for a hub to serve as a worker. Signed-off-by: vsoch --- mcpserver/cli/start.py | 10 ++-- mcpserver/core/base.py | 99 ++++++++++++++++++++++++++++++++++++++++ mcpserver/core/hub.py | 40 ++++++++++++---- mcpserver/core/worker.py | 94 ++------------------------------------ 4 files changed, 139 insertions(+), 104 deletions(-) create mode 100644 mcpserver/core/base.py diff --git a/mcpserver/cli/start.py b/mcpserver/cli/start.py index 2ad0d9a..7d63b74 100644 --- a/mcpserver/cli/start.py +++ b/mcpserver/cli/start.py @@ -15,7 +15,7 @@ from mcpserver.app import init_mcp from mcpserver.cli.manager import get_manager from mcpserver.core.config import MCPConfig -from mcpserver.core.hub import HubManager +from mcpserver.core.hub import DualHubManager, HubManager from mcpserver.core.worker import WorkerManager from mcpserver.logger import logger @@ -64,7 +64,9 @@ def main(args, extra, **kwargs): app = FastAPI(title="MCP Server", lifespan=mcp_app.lifespan) # Setup Hub (parent role) - if args.hub: + if args.dual: + mcp.hub_manager = DualHubManager.from_args(mcp, args) + elif args.hub: mcp.hub_manager = HubManager.from_args(mcp, args) # Setup Worker (child role) - triggered by --join. We require join secret. @@ -86,8 +88,8 @@ async def lifespan(app: FastAPI): app = FastAPI(title="MCP Server", lifespan=lifespan) - # Bind the /register endpoint if we are a Hub - if args.hub: + # Bind the /register endpoint if we are a Hub (or Hub and Worker) + if args.hub or args.dual: mcp.hub_manager.bind_to_app(app) # Mount the MCP server. Note from V: we can use mount with antother FastMCP diff --git a/mcpserver/core/base.py b/mcpserver/core/base.py new file mode 100644 index 0000000..dc32ad9 --- /dev/null +++ b/mcpserver/core/base.py @@ -0,0 +1,99 @@ +import collections +import json +import time + +import mcpserver.utils as utils + + +class WorkerBase: + """ + A WorkerBase provides worker interaction functions, e.g., negotiate, status, + ask secretary. We provide it here so that a hub can use it to generate + its dual mode (acting as worker AND hub.) + """ + + def register_agent_tools(self): + """ + Registers the core negotiation tools with the FastMCP instance. + """ + + @self.mcp.tool(name="get_status") + async def get_status() -> dict: + """ + Returns the Level 1 Static Manifest of this cluster. + Use this to verify hardware, software providers, and site info. + """ + return { + "worker_id": self.worker_id, + "timestamp": time.time(), + "manifest": self.manifest, + } + + @self.mcp.tool(name="ask_secretary") + async def ask_secretary(request: str) -> dict: + """ + Wakes up the local Secretary Agent to perform a Level 2 investigation. + Use this to ask about specific software availability, queue depth, or node health. + """ + from resource_secretary.agents.secretary import SecretaryAgent + + # Flatten the catalog into a list of active provider instances + active_providers = [inst for category in self.catalog.values() for inst in category] + + # Verbose mode returns a second block with CALLS + agent = SecretaryAgent(active_providers, verbose=self.verbose) + proposal = await agent.negotiate(request) + return {"worker_id": self.worker_id, "proposal": proposal} + + @self.mcp.tool(name="submit") + async def receive_job(request: str) -> dict: + """ + Receive a job. Accepts a job request, invokes the local Secretary to + generate a spec, submit it, and verify the job ID. + """ + from resource_secretary.agents.secretary import SecretaryAgent + + active_providers = [inst for cat in self.catalog.values() for inst in cat] + + agent = SecretaryAgent(active_providers) + raw_result = await agent.submit(request) + try: + receipt = json.loads(utils.extract_code_block(raw_result)) + except: + receipt = {"status": "FAILED", "reasoning": raw_result} + + return {"worker_id": self.worker_id, "receipt": receipt} + + @self.mcp.tool(name="export_provider_metadata") + def export_provider_metadata() -> str: + """ + Iterates through all providers and returns their internal 'truth' state. + This tool is 'hidden' from the Secretary Agent but used by the Hub. + """ + truth_map = {} + tool_registry = collections.defaultdict(list) + + # Self.catalog is a dict: {"software": [MockSpackProvider, ...]} + for category, providers in self.catalog.items(): + truth_map[category] = {} + for p in providers: + # We check if the provider has the export_truth method + if hasattr(p, "export_truth"): + truth_map[category][p.name] = p.export_truth() + else: + # Fallback to standard metadata if not a mock + truth_map[category][p.name] = p.metadata + + # Capture all Secretary Tools for this provider + # We can use this for simulations to assess what the agent + # should have called (vs. what it did) + manifest = p.discover_tools(tool_types=["secretary"]) + for tool_name in manifest.keys(): + tool_registry[category].append(f"{p.name}.{tool_name}") + + metadata = {"truth": truth_map, "registry": dict(tool_registry)} + + # If we have an archetype (mocking something) save it + if hasattr(p, "archetype"): + metadata["metadata"] = {"archetype": p.archetype.name} + return json.dumps(metadata, indent=2) diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index a8514bc..2f0745f 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -13,6 +13,8 @@ import mcpserver.utils as utils from mcpserver.logger import logger +from .base import WorkerBase + class HubManager: """ @@ -30,10 +32,12 @@ def __init__( serial=False, dual=False, hub_id=None, + path="/mcp", ): self.mcp = mcp self.host = host self.port = port + self.path = path self.secret = secret or secrets.token_urlsafe(32) self.workers: Dict[str, Dict[str, Any]] = {} self.hub_id = hub_id or socket.gethostname() @@ -81,21 +85,15 @@ def set_running_mode(self, batch_size=None, serial=False, dual=False): logger.info(f"🚦 Hub initialized with Batch Size: {batch_size}") # If we are also running as a worker, add ourselves to the fleet - if not dual: - return - - hub_id = self.hub_id or socket.gethostname() - self.workers[hub_id] = { - "url": self.registration_url, - "client": Client(self.registration_url), - } + self.dual = dual @classmethod def from_args(cls, mcp, args) -> Optional["HubManager"]: """ Create a HubManager from CLI arguments. """ - if not getattr(args, "hub", False): + # Running in hub or dual mode? + if not getattr(args, "hub", False) and not getattr(args, "dual", False): return None return cls( mcp, @@ -105,6 +103,8 @@ def from_args(cls, mcp, args) -> Optional["HubManager"]: batch=args.batch, serial=args.serial, dual=args.dual, + # server path + path=args.path, ) def _print_banner(self): @@ -386,3 +386,25 @@ def _create_proxy(self, worker_id: str, tool: Tool): except Exception as e: logger.error(f"❌ Failed to generate dynamic proxy for {tool.name}: {e}") + + +class DualHubManager(WorkerBase, HubManager): + """ + Combined hub and worker base. Aka, a hub that also serves as a worker + """ + + def __init__(self, *args, **kwargs): + # Calls super on the HubManager. WorkerBase has no init + super().__init__(*args, **kwargs) + self.setup_dual() + + def setup_dual(self): + """ + Setup dual mode, which means adding ourselves to the fleet. + """ + hub_id = self.hub_id or socket.gethostname() + default_url = f"http://{self.host}:{self.port}{self.path}" + self.workers[hub_id] = { + "url": self.registration_url, + "client": Client(default_url), + } diff --git a/mcpserver/core/worker.py b/mcpserver/core/worker.py index 1363357..b1a4052 100644 --- a/mcpserver/core/worker.py +++ b/mcpserver/core/worker.py @@ -1,8 +1,5 @@ import asyncio -import collections -import json import socket -import time from typing import Any, Dict, Optional import httpx @@ -10,11 +7,12 @@ from resource_secretary.providers.mock import discover_mock_providers from rich import print -import mcpserver.utils as utils from mcpserver.logger import logger +from .base import WorkerBase -class WorkerManager: + +class WorkerManager(WorkerBase): """ A generic worker mcpserver that discovers its own capabilities and context using the resource-secretary library. @@ -93,92 +91,6 @@ def parse_labels(self, label_list: Optional[list]) -> dict: labels[k.strip()] = v.strip() return labels - def register_agent_tools(self): - """ - Registers the core negotiation tools with the FastMCP instance. - """ - - @self.mcp.tool(name="get_status") - async def get_status() -> dict: - """ - Returns the Level 1 Static Manifest of this cluster. - Use this to verify hardware, software providers, and site info. - """ - return { - "worker_id": self.worker_id, - "timestamp": time.time(), - "manifest": self.manifest, - } - - @self.mcp.tool(name="ask_secretary") - async def ask_secretary(request: str) -> dict: - """ - Wakes up the local Secretary Agent to perform a Level 2 investigation. - Use this to ask about specific software availability, queue depth, or node health. - """ - from resource_secretary.agents.secretary import SecretaryAgent - - # Flatten the catalog into a list of active provider instances - active_providers = [inst for category in self.catalog.values() for inst in category] - - # Verbose mode returns a second block with CALLS - agent = SecretaryAgent(active_providers, verbose=self.verbose) - proposal = await agent.negotiate(request) - return {"worker_id": self.worker_id, "proposal": proposal} - - @self.mcp.tool(name="submit") - async def receive_job(request: str) -> dict: - """ - Receive a job. Accepts a job request, invokes the local Secretary to - generate a spec, submit it, and verify the job ID. - """ - from resource_secretary.agents.secretary import SecretaryAgent - - active_providers = [inst for cat in self.catalog.values() for inst in cat] - - agent = SecretaryAgent(active_providers) - raw_result = await agent.submit(request) - try: - receipt = json.loads(utils.extract_code_block(raw_result)) - except: - receipt = {"status": "FAILED", "reasoning": raw_result} - - return {"worker_id": self.worker_id, "receipt": receipt} - - @self.mcp.tool(name="export_provider_metadata") - def export_provider_metadata() -> str: - """ - Iterates through all providers and returns their internal 'truth' state. - This tool is 'hidden' from the Secretary Agent but used by the Hub. - """ - truth_map = {} - tool_registry = collections.defaultdict(list) - - # Self.catalog is a dict: {"software": [MockSpackProvider, ...]} - for category, providers in self.catalog.items(): - truth_map[category] = {} - for p in providers: - # We check if the provider has the export_truth method - if hasattr(p, "export_truth"): - truth_map[category][p.name] = p.export_truth() - else: - # Fallback to standard metadata if not a mock - truth_map[category][p.name] = p.metadata - - # Capture all Secretary Tools for this provider - # We can use this for simulations to assess what the agent - # should have called (vs. what it did) - manifest = p.discover_tools(tool_types=["secretary"]) - for tool_name in manifest.keys(): - tool_registry[category].append(f"{p.name}.{tool_name}") - - metadata = {"truth": truth_map, "registry": dict(tool_registry)} - - # If we have an archetype (mocking something) save it - if hasattr(p, "archetype"): - metadata["metadata"] = {"archetype": p.archetype.name} - return json.dumps(metadata, indent=2) - async def run_registration(self): """ Registers the worker with the Hub. From 8730bd2ebb93a23208907086e808d71326a8a043 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sun, 12 Apr 2026 23:13:41 -0700 Subject: [PATCH 3/9] dispatch: updates to secretary Signed-off-by: vsoch --- README.md | 4 ++++ mcpserver/core/hub.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index b32ba40..669fd1e 100644 --- a/README.md +++ b/README.md @@ -276,7 +276,11 @@ mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8 If you are doing experiments, you can bring up a hub the same way: ```bash +# Start as a standalone hub (recommended) mcpserver start --hub --hub-secret potato + +# Start in dual mode (not recommended for production or performance experiments +mcpserver start --dual --hub-secret potato ``` To mock (simulate) a worker, add `--mock`, optionally with a particular archetype (one of `hpc`, `cloud`, or `standalone`). A worker ID is suggested to make the seed reproducible. diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index 2f0745f..f37d5c8 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -82,7 +82,7 @@ def set_running_mode(self, batch_size=None, serial=False, dual=False): self.batch_size = batch_size self.semaphore = asyncio.Semaphore(batch_size) self.run_on_fleet = self.run_on_fleet_batched - logger.info(f"🚦 Hub initialized with Batch Size: {batch_size}") + logger.info(f"🚦 Hub initialized with Batch Size: {batch_size} Worker mode: {dual}") # If we are also running as a worker, add ourselves to the fleet self.dual = dual From 3673ed7b941fafc7e64f0b1659f4808236cb10b2 Mon Sep 17 00:00:00 2001 From: vsoch Date: Mon, 13 Apr 2026 00:18:14 -0700 Subject: [PATCH 4/9] bug: hub in dual mode needs to register agent tools testing, and I did not have the agent tools. This should fix it. Signed-off-by: vsoch --- mcpserver/core/hub.py | 1 + 1 file changed, 1 insertion(+) diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index f37d5c8..9074404 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -408,3 +408,4 @@ def setup_dual(self): "url": self.registration_url, "client": Client(default_url), } + self.register_agent_tools() From 0aaf7c83a91a911f2df317d1701208e19ae680ec Mon Sep 17 00:00:00 2001 From: vsoch Date: Mon, 13 Apr 2026 00:35:01 -0700 Subject: [PATCH 5/9] feat: dual mode should also support mock Signed-off-by: vsoch --- mcpserver/core/base.py | 15 +++++++++++++++ mcpserver/core/hub.py | 11 +++++++++-- mcpserver/core/worker.py | 11 ----------- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/mcpserver/core/base.py b/mcpserver/core/base.py index dc32ad9..893d502 100644 --- a/mcpserver/core/base.py +++ b/mcpserver/core/base.py @@ -2,7 +2,11 @@ import json import time +from resource_secretary.providers import discover_providers +from resource_secretary.providers.mock import discover_mock_providers + import mcpserver.utils as utils +from mcpserver.logger import logger class WorkerBase: @@ -12,6 +16,17 @@ class WorkerBase: its dual mode (acting as worker AND hub.) """ + def init_providers(self, mock=False): + """ + Probe the local system on startup. E.g., "we found spack, flux, etc." + These can be faux (mock) or real discovered providers + """ + logger.info("📡 Probing local system for resource providers...") + if mock: + self.catalog = discover_mock_providers(self.worker_id, choice=mock) + else: + self.catalog = discover_providers() + def register_agent_tools(self): """ Registers the core negotiation tools with the FastMCP instance. diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index 9074404..e7c3410 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -33,14 +33,19 @@ def __init__( dual=False, hub_id=None, path="/mcp", + mock=False, ): + # Probably can simplify some of this between worker and hub self.mcp = mcp self.host = host self.port = port self.path = path self.secret = secret or secrets.token_urlsafe(32) self.workers: Dict[str, Dict[str, Any]] = {} - self.hub_id = hub_id or socket.gethostname() + + # For use if we are also a worker. + self.worker_id = hub_id or socket.gethostname() + self.mock = mock # Make requests to hub in batches, in serial, or in parallel self.set_running_mode(batch, serial, dual) @@ -103,6 +108,7 @@ def from_args(cls, mcp, args) -> Optional["HubManager"]: batch=args.batch, serial=args.serial, dual=args.dual, + mock=args.mock, # server path path=args.path, ) @@ -397,12 +403,13 @@ def __init__(self, *args, **kwargs): # Calls super on the HubManager. WorkerBase has no init super().__init__(*args, **kwargs) self.setup_dual() + self.init_providers(kwargs.get("mock", False)) def setup_dual(self): """ Setup dual mode, which means adding ourselves to the fleet. """ - hub_id = self.hub_id or socket.gethostname() + hub_id = self.worker_id or socket.gethostname() default_url = f"http://{self.host}:{self.port}{self.path}" self.workers[hub_id] = { "url": self.registration_url, diff --git a/mcpserver/core/worker.py b/mcpserver/core/worker.py index b1a4052..afc6561 100644 --- a/mcpserver/core/worker.py +++ b/mcpserver/core/worker.py @@ -47,17 +47,6 @@ def __init__( # Register MCP Tools automatically self.register_agent_tools() - def init_providers(self, mock=False): - """ - Probe the local system on startup. E.g., "we found spack, flux, etc." - These can be faux (mock) or real discovered providers - """ - logger.info("📡 Probing local system for resource providers...") - if mock: - self.catalog = discover_mock_providers(self.worker_id, choice=mock) - else: - self.catalog = discover_providers() - def show(self): """ Show providers installed and verbosity. From 2464c45fd5d5cea5fe0f8faf4eca68bb1893d854 Mon Sep 17 00:00:00 2001 From: vsoch Date: Mon, 13 Apr 2026 09:29:59 -0700 Subject: [PATCH 6/9] experiment: support for hub verbose (for calls) Signed-off-by: vsoch --- mcpserver/core/base.py | 2 +- mcpserver/core/hub.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/mcpserver/core/base.py b/mcpserver/core/base.py index 893d502..723e6f8 100644 --- a/mcpserver/core/base.py +++ b/mcpserver/core/base.py @@ -70,7 +70,7 @@ async def receive_job(request: str) -> dict: active_providers = [inst for cat in self.catalog.values() for inst in cat] - agent = SecretaryAgent(active_providers) + agent = SecretaryAgent(active_providers, verbose=self.verbose) raw_result = await agent.submit(request) try: receipt = json.loads(utils.extract_code_block(raw_result)) diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index e7c3410..6866d00 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -34,6 +34,7 @@ def __init__( hub_id=None, path="/mcp", mock=False, + verbose: Optional[bool] = False, ): # Probably can simplify some of this between worker and hub self.mcp = mcp @@ -42,6 +43,7 @@ def __init__( self.path = path self.secret = secret or secrets.token_urlsafe(32) self.workers: Dict[str, Dict[str, Any]] = {} + self.verbose = verbose # For use if we are also a worker. self.worker_id = hub_id or socket.gethostname() @@ -109,6 +111,7 @@ def from_args(cls, mcp, args) -> Optional["HubManager"]: serial=args.serial, dual=args.dual, mock=args.mock, + verbose=args.verbose, # server path path=args.path, ) From 50aa7c7de178f97929ada8e139e25018b14483c2 Mon Sep 17 00:00:00 2001 From: vsoch Date: Mon, 13 Apr 2026 13:20:44 -0700 Subject: [PATCH 7/9] hub: parse calls directly from dispatch This will make it much easier for a user to see what was called. We can do this for negotiation too. Signed-off-by: vsoch --- mcpserver/core/base.py | 32 +++++++++++++++++++++++++++++--- mcpserver/core/hub.py | 6 +++++- mcpserver/utils/text.py | 21 +++++++++++++++++++-- 3 files changed, 53 insertions(+), 6 deletions(-) diff --git a/mcpserver/core/base.py b/mcpserver/core/base.py index 723e6f8..9f34768 100644 --- a/mcpserver/core/base.py +++ b/mcpserver/core/base.py @@ -15,6 +15,32 @@ class WorkerBase: ask secretary. We provide it here so that a hub can use it to generate its dual mode (acting as worker AND hub.) """ + def jsonify_response(self, result): + """ + Ensure we get the text, and separate and parse tool calls, + which the agent will return in a verbose mode. + """ + print('result') + print(result) + print(type(result)) + if isinstance(result, dict): + return result + if not isinstance(result, str) and hasattr(result, "content"): + result = result.content[0].text + + # Audit the tool calls (Did the agent just get lucky?) + calls = [] + if "CALLS" in result: + try: + result, calls_block = result.split("CALLS") + calls = utils.format_calls(calls_block) + except: + print(f"Issue parsing calls, agent had malformed response: {result}") + pass + + result = json.loads(utils.extract_code_block(result)) + result["calls"] = calls + return result def init_providers(self, mock=False): """ @@ -73,9 +99,9 @@ async def receive_job(request: str) -> dict: agent = SecretaryAgent(active_providers, verbose=self.verbose) raw_result = await agent.submit(request) try: - receipt = json.loads(utils.extract_code_block(raw_result)) - except: - receipt = {"status": "FAILED", "reasoning": raw_result} + receipt = self.jsonify_response(raw_result) + except Exception as e: + receipt = {"status": "FAILED", "reasoning": raw_result, "error": str(e)} return {"worker_id": self.worker_id, "receipt": receipt} diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index 6866d00..cf85e9e 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -209,7 +209,10 @@ async def dispatch_job(worker_id: str, prompt: str) -> dict: async with info["client"] as sess: result = await sess.call_tool("submit", {"request": prompt}) - return json.loads(utils.extract_code_block(result.content[0].text)) + print('response in hub') + print(type(result)) + print(result) + return self.jsonify_response(result) @self.mcp.tool(name="negotiate_job") async def negotiate_job(prompt: str) -> dict: @@ -253,6 +256,7 @@ async def negotiate_handler(wid, sess): mcp_result = await sess.call_tool("ask_secretary", {"request": prompt}) raw_text = mcp_result.content[0].text + # TODO: vsoch: add support to parse the calls here too (like dispatch) try: # Parse and handle potential quote issues in LLM JSON proposal_data = json.loads(utils.extract_code_block(raw_text)) diff --git a/mcpserver/utils/text.py b/mcpserver/utils/text.py index 89d824b..694b7c7 100644 --- a/mcpserver/utils/text.py +++ b/mcpserver/utils/text.py @@ -1,7 +1,11 @@ +import json import re def sanitize(name: str) -> str: + """ + Sanitize worker ids and arguments for hub properties. + """ # Replace hyphens/dots with underscores clean = name.replace("-", "_").replace(".", "_") # Python identifiers cannot start with a digit @@ -10,8 +14,21 @@ def sanitize(name: str) -> str: return clean -def format_rules(rules): - return "\n".join([f"- {r}" for r in rules]) +def format_calls(calls_block): + """ + The secretary agent can return calls. We need to ensure we try + to get and parse them correctly. + """ + calls = [] + try: + print(calls_block) + print(type(calls_block)) + calls = extract_code_block(calls_block) + print('success to extract calls') + return calls + except Exception as e: + print(f'Issue in format calls: {e}') + return calls def extract_code_block(text): From 77dbfb6ae1033fce38b979cc558504df58d4b1cc Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 16 Apr 2026 13:51:27 -0700 Subject: [PATCH 8/9] feat: add applications as providers Signed-off-by: vsoch --- mcpserver/core/base.py | 9 +++++++-- mcpserver/core/hub.py | 3 --- mcpserver/core/worker.py | 2 -- mcpserver/utils/text.py | 8 ++------ 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/mcpserver/core/base.py b/mcpserver/core/base.py index 9f34768..09c9197 100644 --- a/mcpserver/core/base.py +++ b/mcpserver/core/base.py @@ -2,6 +2,7 @@ import json import time +from resource_secretary.apps import discover_applications from resource_secretary.providers import discover_providers from resource_secretary.providers.mock import discover_mock_providers @@ -15,19 +16,20 @@ class WorkerBase: ask secretary. We provide it here so that a hub can use it to generate its dual mode (acting as worker AND hub.) """ + def jsonify_response(self, result): """ Ensure we get the text, and separate and parse tool calls, which the agent will return in a verbose mode. """ - print('result') + print("result") print(result) print(type(result)) if isinstance(result, dict): return result if not isinstance(result, str) and hasattr(result, "content"): result = result.content[0].text - + # Audit the tool calls (Did the agent just get lucky?) calls = [] if "CALLS" in result: @@ -47,11 +49,14 @@ def init_providers(self, mock=False): Probe the local system on startup. E.g., "we found spack, flux, etc." These can be faux (mock) or real discovered providers """ + # We can use apps in mock or regular + apps = discover_applications() logger.info("📡 Probing local system for resource providers...") if mock: self.catalog = discover_mock_providers(self.worker_id, choice=mock) else: self.catalog = discover_providers() + self.catalog.update(apps) def register_agent_tools(self): """ diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index cf85e9e..2e22a8d 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -209,9 +209,6 @@ async def dispatch_job(worker_id: str, prompt: str) -> dict: async with info["client"] as sess: result = await sess.call_tool("submit", {"request": prompt}) - print('response in hub') - print(type(result)) - print(result) return self.jsonify_response(result) @self.mcp.tool(name="negotiate_job") diff --git a/mcpserver/core/worker.py b/mcpserver/core/worker.py index afc6561..863152c 100644 --- a/mcpserver/core/worker.py +++ b/mcpserver/core/worker.py @@ -3,8 +3,6 @@ from typing import Any, Dict, Optional import httpx -from resource_secretary.providers import discover_providers -from resource_secretary.providers.mock import discover_mock_providers from rich import print from mcpserver.logger import logger diff --git a/mcpserver/utils/text.py b/mcpserver/utils/text.py index 694b7c7..c2b591b 100644 --- a/mcpserver/utils/text.py +++ b/mcpserver/utils/text.py @@ -1,4 +1,3 @@ -import json import re @@ -21,14 +20,11 @@ def format_calls(calls_block): """ calls = [] try: - print(calls_block) - print(type(calls_block)) calls = extract_code_block(calls_block) - print('success to extract calls') return calls except Exception as e: - print(f'Issue in format calls: {e}') - return calls + print(f"Issue in format calls: {e}") + return calls_block def extract_code_block(text): From 7ce13e60f56f6bcb64852fbbc2629181d97de550 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sat, 18 Apr 2026 00:10:48 -0700 Subject: [PATCH 9/9] bug: tangled dependencies Signed-off-by: vsoch --- mcpserver/core/base.py | 12 +++++------- mcpserver/tools/system/system.py | 15 +++++++++------ 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/mcpserver/core/base.py b/mcpserver/core/base.py index 09c9197..98547ba 100644 --- a/mcpserver/core/base.py +++ b/mcpserver/core/base.py @@ -2,10 +2,6 @@ import json import time -from resource_secretary.apps import discover_applications -from resource_secretary.providers import discover_providers -from resource_secretary.providers.mock import discover_mock_providers - import mcpserver.utils as utils from mcpserver.logger import logger @@ -22,9 +18,6 @@ def jsonify_response(self, result): Ensure we get the text, and separate and parse tool calls, which the agent will return in a verbose mode. """ - print("result") - print(result) - print(type(result)) if isinstance(result, dict): return result if not isinstance(result, str) and hasattr(result, "content"): @@ -49,6 +42,11 @@ def init_providers(self, mock=False): Probe the local system on startup. E.g., "we found spack, flux, etc." These can be faux (mock) or real discovered providers """ + # Not required unless serving a worker or hub. + from resource_secretary.apps import discover_applications + from resource_secretary.providers import discover_providers + from resource_secretary.providers.mock import discover_mock_providers + # We can use apps in mock or regular apps = discover_applications() logger.info("📡 Probing local system for resource providers...") diff --git a/mcpserver/tools/system/system.py b/mcpserver/tools/system/system.py index 411e1b6..a679d70 100644 --- a/mcpserver/tools/system/system.py +++ b/mcpserver/tools/system/system.py @@ -1,13 +1,8 @@ import os import time -from typing import Any, Dict, List - -from resource_secretary.agents.backends import get_backend -from resource_secretary.agents.secretary import SecretaryAgent -from resource_secretary.providers import discover_providers +from typing import Any, Dict from mcpserver.tools.base import BaseTool -from mcpserver.tools.decorator import mcp class SystemTool(BaseTool): @@ -16,6 +11,8 @@ class SystemTool(BaseTool): """ def setup(self, manager=None): + from resource_secretary.providers import discover_providers + self.manager = manager self.catalog = discover_providers() @@ -42,6 +39,12 @@ async def ask_secretary(self, request: str, verbose=False) -> Dict[str, Any]: """ Wakes up the local Secretary Agent using the configured backend. """ + try: + from resource_secretary.agents.backends import get_backend + from resource_secretary.agents.secretary import SecretaryAgent + except ImportError: + return {"proposal": "This cluster cannot access resources.", "status": "SUCCESS"} + # Resolve the backend instance on-demand backend = get_backend( backend_type=self.backend_config["type"],