diff --git a/README.md b/README.md index 3263292..36d3a55 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,9 @@ The following agents are availble. | optimize | General optimization agent | fractale_agents.optimize.OptimizeAgent | | job-transform | Job specification transformation agent | fractale_agents.hpc.job.JobTransformAgent | | job-generate | Job specification generation agent | fractale_agents.hpc.job.JobGenerationAgent | +| job-analysis | Agent to analyze job application/software and/or intent | fractale_agents.hpc.job.JobAnalysisAgent | +| script-analysis | Agent to analyze script application/software and/or intent | fractale_agents.hpc.job.ScriptAnalysisAgent | +| adversarial | Agent to assess response from another agent and fix | fractale_agents.general.AdversarialAgent | The general prompt agent is provisioned by fractale directly, `fractale.agents.general.PromptAgent`. Would you like to see an expert added? Please open an issue and let us know. diff --git a/fractale_agents/general/__init__.py b/fractale_agents/general/__init__.py index 2736084..0f58c4f 100644 --- a/fractale_agents/general/__init__.py +++ b/fractale_agents/general/__init__.py @@ -1 +1,2 @@ +from .adversarial import AdversarialAgent from .prompt import PromptAgent diff --git a/fractale_agents/general/adversarial.py b/fractale_agents/general/adversarial.py new file mode 100644 index 0000000..4a12409 --- /dev/null +++ b/fractale_agents/general/adversarial.py @@ -0,0 +1,185 @@ +import json +from typing import Any, Awaitable, Callable, Dict, Optional + +from fractale_agents.agent import BaseSubAgent + +adversarial_prompt = """You are a meticulous, skeptical reviewer of the output of an expert analysis agent. A first ("expert") agent was given some input and a labeling task, and it produced a structured result. 
Your job is to critically re-examine that result against the actual input, correct anything the input clearly contradicts, and return an improved result that conforms to the SAME schema the expert agent was asked to follow. You are adversarial in that you actively hunt for the expert agent's mistakes — but you are evidence-driven, not destructive: every change must be justified by the input, and leaving a correct field unchanged is the expected outcome for most fields. + +You are provided the following material. + +In the TASK CONTEXT: + - ORIGINAL PROMPT: the prompt the expert agent was given. It defines the task, the labeling taxonomy, and the EXACT output schema the result must follow. + - PREVIOUS RESULT: the expert agent's output, which you will evaluate and revise. + +In the TASK GOAL: + - The specific task the agent was asked to do. This is your GROUND TRUTH — re-derive your judgments from it directly. + +REVIEW CRITERIA (general) + +Prioritize the fields that require interpretation and cannot be checked mechanically, since these are where the expert agent is most likely wrong and where your review adds the most value. Review fields to be accurate, specific, and grounded in truth. Flag vague, generic, or hallucinated text. Look for missing or incomplete items, or incorrect ordering or intent. You may also correct concrete fields (identifiers, parameters, references, paths) when the input plainly contradicts them, but treat these as secondary — they are typically validated by other means. + +REVISION RULES + +Be conservative and precise: + - Change a field ONLY when the input gives clear evidence the previous value is wrong, imprecise, or missing. Do NOT invent problems, do NOT rewrite correct fields for style, and do NOT manufacture changes to appear thorough. "No change needed" is a valid and common verdict. + - When you change something, ground it in the input: cite the specific construct, line, phrase, or element that justifies the change. 
+ - If you suspect a problem but cannot confirm it from the provided input alone (e.g., a value defined in an external resource you cannot see), do NOT change the field — record it under "unresolved_concerns" with your reasoning instead. + - Preserve the schema EXACTLY as defined in the ORIGINAL PROMPT: same keys, same nesting, same value types, same allowed enum values. Re-emit the COMPLETE result (every field), not just the parts you changed. + - Preserve item ordering and identifiers unless a segmentation fix requires renumbering, in which case renumber consistently. + +OUTPUT PROTOCOL + +Return EXACTLY ONE JSON object and nothing else — no markdown fences, no commentary. It carries the corrected result under "result" (conforming to the original schema) and your audit under "evaluation". + +{ + "action": "stop", + "status": "success|failure|other", + "summary": "...", + "issues": "", + "reason": "...", + "result": { + ... the COMPLETE revised result, conforming exactly to the schema in the ORIGINAL PROMPT ... + }, + "evaluation": { + "verdict": "accepted_as_is" | "revised" | "rejected", + "num_changes": "int", + "changes": [ + { + "location": "string (path of the changed field in the result)", + "action": "fix" | "add" | "remove", + "from": "previous value", + "to": "new value", + "evidence": "string (the construct, line, phrase, or element of the input that justifies the change)", + "confidence": "high" | "medium" | "low" + } + ], + "unresolved_concerns": [ "string (the concern and your reasoning)" ] + } +} + +CONTROL FIELD GUIDANCE +- "verdict": "accepted_as_is" if you made no changes; "revised" if you corrected one or more fields; "rejected" if the previous result was so wrong it had to be substantially rebuilt. +- "num_changes": the length of "changes". +- "status": "success" if you completed the review; "failure" if you could not (e.g., the input or previous result was unreadable); "other" for partial/ambiguous reviews. +- "summary": one or two sentences on what you found and changed. +- "result": if status is "failure", this may be the unchanged previous result or null. +""" + + +class AdversarialAgent(BaseSubAgent): + """ + General adversarial reviewer.
Given any expert agent's prompt and its + previous result (via inputs), plus the specific input artifact under review + (via the goal), it critiques and revises the result and returns a corrected + result with an auditable changelog. It is artifact- and taxonomy-agnostic: + the schema is whatever the ORIGINAL PROMPT defines. + """ + + name = "adversarial" + description = ( + "A general adversarial reviewer that evaluates and revises the output of any " + "expert agent. Given the expert agent's prompt and previous result, plus the " + "specific input that was analyzed (supplied as the goal), it corrects fields the " + "input contradicts — focusing on interpretive fields (free text, classifications, " + "decomposition, named entities) that cannot be checked mechanically — and returns " + "the revised result with a structured changelog." + ) + input_schema = { + "type": "object", + "properties": { + "goal": { + "type": "string", + "description": "The task goal, including the specific input artifact the expert agent analyzed (e.g. the script). This is the reviewer's ground truth.", + }, + "previous_prompt": { + "type": "string", + "description": "The prompt the expert agent was given; defines the taxonomy and the exact output schema.", + }, + "previous_result": { + "type": "object", + "description": "The expert agent's output to evaluate and revise (the result object).
A pre-serialized JSON string is also accepted.", + }, + "context": { + "type": "string", + "default": "", + "description": "Optional additional context to pass through to the reviewer.", + }, + "max_turns": { + "type": "integer", + "default": 100, + "description": "Max turns for the review loop.", + }, + }, + "required": ["goal", "previous_prompt", "previous_result"], + "annotations": {"fractale.type": "agent"}, + } + output_schema = { + "type": "object", + "properties": { + "status": { + "type": "string", + "enum": ["success", "failure", "other"], + "description": "The final status of the review.", + }, + "summary": { + "type": "string", + "description": "A summary of what was found and changed.", + }, + "issues": { + "type": "string", + "description": "Any problems encountered during review (e.g., unreadable inputs).", + }, + "result": { + "type": "object", + "description": "The complete revised result, conforming to the original schema.", + }, + "evaluation": { + "type": "object", + "description": "Audit trail: verdict, num_changes, structured changes, unresolved_concerns.", + }, + }, + "required": ["status", "summary", "result", "evaluation"], + } + + async def __call__( + self, + goal: str, + previous_prompt: str, + previous_result: Any, + context: str = "", + max_turns: int = 100, + process_callback: Optional[ + Callable[[Dict[str, Any]], Awaitable[Optional[Dict[str, Any]]]] + ] = None, + ) -> Dict[str, Any]: + """ + Executes the adversarial review loop. + + The reviewer persona/instructions live in the static `adversarial_prompt`. + The expert agent's prompt and previous result are passed as task context; + the specific input artifact under review is supplied by the caller in `goal`. + `previous_result` may be a dict (the bare result object) or a JSON string. A string is passed through unchanged and anything else is serialized with `json.dumps(..., indent=2)`; a non-empty `context` is appended under an ADDITIONAL CONTEXT header, and the value produced by `self.execute_loop` is returned as-is.
+ """ + prev_result_str = ( + previous_result + if isinstance(previous_result, str) + else json.dumps(previous_result, indent=2) + ) + + review_context = ( + "You are reviewing the output of a prior expert agent.\n\n" + "=== ORIGINAL PROMPT (defines the task and the exact output schema) ===\n" + f"{previous_prompt}\n\n" + "=== PREVIOUS RESULT (evaluate and revise this) ===\n" + f"{prev_result_str}\n" + ) + if context: + review_context += f"\n=== ADDITIONAL CONTEXT ===\n{context}\n" + + return await self.execute_loop( + system_prompt=adversarial_prompt, + goal=goal, + context=review_context, + max_turns=max_turns, + process_callback=process_callback, + ) diff --git a/fractale_agents/general/prompt.py b/fractale_agents/general/prompt.py index c64a4f6..d3663c6 100644 --- a/fractale_agents/general/prompt.py +++ b/fractale_agents/general/prompt.py @@ -89,7 +89,7 @@ class PromptAgent: } async def __call__( - self, goal: str, task_context: str = "", max_turns: int = 10 + self, goal: str, task_context: str = "", max_turns: int = 100 ) -> Dict[str, Any]: """ The internal orchestrator loop. diff --git a/fractale_agents/hpc/job/__init__.py b/fractale_agents/hpc/job/__init__.py index 13b09e9..03dad66 100644 --- a/fractale_agents/hpc/job/__init__.py +++ b/fractale_agents/hpc/job/__init__.py @@ -1,2 +1,3 @@ +from .analysis import JobAnalysisAgent, ScriptAnalysisAgent from .generate import JobGenerateAgent from .transform import JobTransformAgent diff --git a/fractale_agents/hpc/job/analysis.py b/fractale_agents/hpc/job/analysis.py new file mode 100644 index 0000000..b16b1e4 --- /dev/null +++ b/fractale_agents/hpc/job/analysis.py @@ -0,0 +1,325 @@ +import json +from typing import Any, Awaitable, Callable, Dict, Optional + +import fractale_agents.utils as utils +from fractale_agents.agent import BaseSubAgent +from fractale_agents.logger import logger + +job_prompt = """You are an expert High-Performance Computing (HPC) engineer.
Your task is to analyze a provided job script and extract its characteristics into a strictly formatted JSON object based on a specific multi-dimensional labeling taxonomy. You must identify and extract the specific applications utilized within the script (e.g., the exact workflow manager, container engine, scientific application, or programming language) and map them to the corresponding fields in the JSON schema. Break down the scripts into one or more steps, where each step performs a specific task or set of operations. For each step extract the following information. + +TAXONOMY AND DEFINITION + +Job Description +"description" (A short textual description of the operations executed by the full job) +"domain" (Infer the science domain of the application based on the primary software and specifications, e.g., "math", "ai", "data science", "chemistry", "physics", "biology") + +Step Identification +"step_id" (incremental id based on the order of the step in the job pipeline) +Orchestration (Select exactly one type and identify the tool) + - "workflow_managed": The script is generated or managed by tools like nextflow, cylc, fireworks, snakemake, etc. Look for specific tool headers or variables. Include the specific workflow manager in tool_detected. + - "job_array": The script utilizes array variables (e.g., $PBS_ARRAY_INDEX). tool_detected should be null. + - "standalone": A standard, standalone job submission without arrays or workflow managers. tool_detected should be null. +Execution Environment (Select exactly one type and identify the tool) + - "containerized": The workload runs inside a container. Look for commands like apptainer exec, singularity run, enroot start, docker run, podman, etc. Include the specific container engine in tool_detected. + - "native": The workload runs directly on the host OS or via standard environment modules (e.g., module load). tool_detected should be null. 
+Workload Class (Select exactly one primary type, identify the software, and define sub-intents where applicable) +"type" + - "simulation": Heavy scientific computations (e.g., VASP, GROMACS, OpenFOAM, Gaussian, LAMMPS). + - "ai_ml": Deep learning training, inference, or tuning frameworks (e.g., PyTorch, TensorFlow, JAX, DeepSpeed). + - "data_analytics": Data processing, statistical runtimes, or math systems (e.g., large R scripts, Pandas/Spark tracks, MATLAB). + - "utility": Structural or housekeeping commands (e.g., data transfers via `rsync`/`cp`, file archiving via `tar`, or setup/cleanup bash routines). + +"primary_software" (The specific scientific application (e.g., VASP, GROMACS, OpenFOAM, Gaussian, LAMMPS), or programming language (e.g., python, R, java, matlab) used) +"executed_file" (Return the executed file if the "primary_software" is a programming language, use null otherwise) +"parameters" (The list of parameters passed to the executable and relevant environment variables with values) +"step_description" (A short textual description of the operations executed by the step) + +EXTRACTION RULES + +Output ONLY valid JSON. Do not include markdown formatting like ```json, greetings, or explanations. +If a specific tool, framework, or model is not found, use null for strings or an empty array [] for lists. +Be case-insensitive when searching, but preserve the standard capitalization in the output. + +OUTPUT PROTOCOL + +You MUST return exactly ONE JSON object and nothing else. This single object is the control/stop signal for the orchestrator, and it carries the full extraction taxonomy nested inside its "result" field. Do not emit the extraction object separately — it lives only inside "result".
+ +{{ + "action": "stop", + "status": "success|failure|other", + "summary": "...", + "command": "", + "issues": "", + "job_id": "", + "reason": "...", + "result": {{ + "description": "string", + "domain": "string or list of string", + "steps": + [ + {{ + "step_id": "int", + "orchestration": {{ + "type": "workflow_managed" | "job_array" | "standalone", + "tool_detected": "string or null" + }}, + "execution_environment": {{ + "type": "containerized" | "native", + "tool_detected": "string or null" + }}, + "workload_class": {{ + "type": "simulation" | "ai_ml" | "data_analytics" | "utility", + "primary_software": "string (e.g., 'GROMACS', 'python', 'apptainer') or null", + "executed_file": "string or null", + "parameters": "list of string or null", + "step_description": "string" + }} + }} + ] + }} +}} + +CONTROL FIELD GUIDANCE +- "action": always "stop". +- "status": "success" if the job script was parsed and the taxonomy extracted; "failure" if it could not be analyzed; "other" for partial or ambiguous results. +- "summary": a brief natural-language summary of what the job does and what was extracted. +- "command": the submission/invocation command for the job (or the script path), if applicable. +- "issues": any problems encountered during analysis (e.g., unreadable sections, missing context), or null. +- "job_id": the identifier of the job being analyzed, if available. +- "reason": justification for the chosen status. +- "result": the full extraction taxonomy object described above. If status is "failure", "result" may be null. +""" + +script_prompt = """You are an expert High-Performance Computing (HPC) engineer and code analyst. Your task is to analyze a provided script (e.g., a Python, R, Julia, MATLAB, or shell script) that is executed within an HPC job and extract its characteristics into a strictly formatted JSON object based on a specific multi-dimensional labeling taxonomy. 
You must identify the programming language, the specific modules/libraries imported, and the scientific applications, models, or algorithms used within the script, and map them to the corresponding fields in the JSON schema. Break down the script into one or more steps, where each step performs a specific task or set of operations (e.g., data loading, preprocessing, model definition, training, inference, analysis, visualization). For each step extract the following information. + +TAXONOMY AND DEFINITIONS + +Script Description +"description" (A short textual description of the operations executed by the full script) +"domain" (Infer the science domain based on the primary libraries and operations, e.g., "math", "ai", "data science", "chemistry", "physics", "biology") +"language" (The programming language of the script, e.g., "python", "R", "julia", "matlab", "bash") +"dependencies" (The complete list of modules, libraries, or packages imported/loaded by the script, e.g., ["numpy", "pandas", "torch", "sklearn"]) + +Step Identification +"step_id" (incremental id based on the order of the step in the script) + +Operation (Select exactly one type and identify the primary driver) + - "data_io": Reading or writing data/files (e.g., pd.read_csv, np.load, open(), torch.save, writing figures/checkpoints). + - "preprocessing": Cleaning, transforming, normalizing, encoding, or feature engineering of data. + - "modeling": Defining a model architecture or instantiating an estimator/algorithm (e.g., nn.Module definitions, sklearn estimator setup, regression specification). + - "training": Fitting or optimizing a model (e.g., model.fit, training loops, optimizer steps). + - "inference": Generating predictions or running a trained model (e.g., model.predict, forward passes at eval time). + - "analysis": Statistical analysis, numerical computation, or metric calculation (e.g., scipy.stats, computing accuracy/RMSE, aggregations). 
+ - "visualization": Producing plots, charts, or figures (e.g., matplotlib, seaborn, ggplot). + - "utility": Setup, configuration, logging, argument parsing, environment setup, or housekeeping. + "primary_function" (The main function, method, or call that drives the step, e.g., "model.fit", "pd.read_csv", or null) + +Computation (Identify the scientific/ML content of the step where applicable) + "models" (The specific model architectures or algorithms used, e.g., ["ResNet50"], ["RandomForestClassifier"], ["linear regression"], or null) + "frameworks" (The specific libraries/frameworks driving the computation in this step, e.g., ["torch"], ["sklearn"], ["scipy"], or null) + "parameters" (The list of hyperparameters, function arguments, and relevant variables with values, e.g., ["epochs=50", "lr=0.001", "n_estimators=100"], or null) + +Data (Identify the data flow of the step) + "inputs" (The files, datasets, or variables consumed by the step, or null) + "outputs" (The files, artifacts, or variables produced by the step, e.g., model checkpoints, output CSVs, figures, or null) + +"step_description" (A short textual description of the operations executed by the step) + +EXTRACTION RULES + +Analyze the script and produce valid JSON conforming to the schema below. +If a specific tool, framework, model, or value is not found, use null for strings or an empty array [] for lists. +Be case-insensitive when searching, but preserve the standard capitalization in the output (e.g., "PyTorch", "NumPy", "GROMACS"). +Group consecutive lines that serve the same logical purpose into a single step rather than emitting one step per line. 
+ +EXTRACTION OUTPUT JSON SCHEMA + +{{ + "description": "string", + "domain": "string or list of string", + "language": "string", + "dependencies": ["list of string"], + "steps": + [ + {{ + "step_id": "int", + "operation": {{ + "type": "data_io" | "preprocessing" | "modeling" | "training" | "inference" | "analysis" | "visualization" | "utility", + "primary_function": "string or null" + }}, + "computation": {{ + "models": "list of string or null", + "frameworks": "list of string or null", + "parameters": "list of string or null" + }}, + "data": {{ + "inputs": "list of string or null", + "outputs": "list of string or null" + }}, + "step_description": "string" + }} + ] +}} + +COMPLETION PROTOCOL + +The extraction JSON above is your analysis deliverable. After producing it, you MUST return ONE final JSON object — as the last thing you emit — to signal the orchestrator that the task is complete: + +{{"action": "stop", "status": "success|failure|other", "summary": "...", "command": "", "issues": "", "result": "", "reason": "..."}} + +- "result": The final analysis result object shown above. +- "status": "success" if the script was parsed and the taxonomy extracted; "failure" if the script could not be analyzed; "other" for partial or ambiguous results. +- "summary": a brief natural-language summary of what the script does and what was extracted. +- "command": the command/invocation used to run or analyze the script (or the script path), if applicable. +- "issues": any problems encountered during analysis (e.g., unreadable sections, missing context), or null. +- "reason": justification for the chosen status. + +Output only the single JSON object. Do not include markdown formatting like ```json, greetings, or explanations. +""" + + +class ScriptAnalysisAgent(BaseSubAgent): + """ + Agent optimized to analyze code / scripts associated with jobspecs. 
+ """ + + name = "script-analysis" + description = "An expert agent that takes an input script and is able to analyze it for imports, goals, software parameters, models, and other metadata, breaking into orchestration steps or logic." + input_schema = { + "type": "object", + "properties": { + "script": { + "type": "string", + "description": "The script and details provided by the user.", + }, + "max_turns": { + "type": "integer", + "default": 100, + "description": "Max turns for the analysis loop.", + }, + }, + "required": ["script"], + "annotations": {"fractale.type": "agent"}, + } + + output_schema = { + "type": "object", + "properties": { + "status": { + "type": "string", + "enum": ["success", "failure", "other"], + "description": "The final status of the analysis.", + }, + "summary": { + "type": "string", + "description": "A summary of what the script does and what was extracted.", + }, + "command": { + "type": "string", + "description": "The command/invocation used to run or analyze the script (or the script path), if applicable.", + }, + "issues": { + "type": "string", + "description": "Any problems encountered during analysis (e.g., unreadable sections, missing context).", + }, + "result": { + "type": "object", + "description": "The extraction result object (description, domain, language, dependencies, steps).", + }, + }, + "required": ["status", "summary", "result"], + } + + async def __call__( + self, + script: str, + max_turns: int = 100, + process_callback: Optional[ + Callable[[Dict[str, Any]], Awaitable[Optional[Dict[str, Any]]]] + ] = None, + ) -> Dict[str, Any]: + """ + Executes the script analysis loop: `script` is embedded in the context string and the run is delegated to `self.execute_loop` with `script_prompt` as the system prompt; that call's result is returned. + """ + goal = "Analyze this script." + context = f"The following script is provided: '{script}'. " + return await self.execute_loop( + system_prompt=script_prompt, + goal=goal, + context=context, + max_turns=max_turns, + process_callback=process_callback, + ) + + +class JobAnalysisAgent(BaseSubAgent): + """ + Agent optimized to analyze HPC batch scripts for intent, steps, and metadata.
+ """ + + name = "job-analysis" + description = "An expert agent that is optimized to analyze HPC batch scripts for intent, steps, and metadata." + input_schema = { + "type": "object", + "properties": { + "requirement": { + "type": "string", + "description": "The job script or specification to analyze, with any details provided by the user.", + }, + "max_turns": { + "type": "integer", + "default": 100, + "description": "Max turns for the analysis loop.", + }, + }, + "required": ["requirement"], + "annotations": {"fractale.type": "agent"}, + } + + output_schema = { + "type": "object", + "properties": { + "status": { + "type": "string", + "enum": ["success", "failure", "other"], + "description": "The final status of the analysis.", + }, + "result": { + "type": "object", + "description": "The extraction result object (description, domain, steps).", + }, + "summary": { + "type": "string", + "description": "A summary of what the job does and what was extracted.", + }, + "command": { + "type": "string", + "description": "The submission/invocation command for the job (or the script path), if applicable.", + }, + "issues": { + "type": "string", + "description": "Any problems encountered during analysis (e.g., unreadable sections, missing context).", + }, + }, + "required": ["status", "summary", "result"], + } + + async def __call__( + self, + requirement: str, + max_turns: int = 100, + process_callback: Optional[ + Callable[[Dict[str, Any]], Awaitable[Optional[Dict[str, Any]]]] + ] = None, + ) -> Dict[str, Any]: + """ + Executes the job analysis loop: `requirement` is embedded in the context string and the run is delegated to `self.execute_loop` with `job_prompt` as the system prompt; that call's result is returned. + """ + context = f"The following requirements are provided: '{requirement}'.
" + return await self.execute_loop( + system_prompt=job_prompt, + goal="Analyze this job specification", + context=context, + max_turns=max_turns, + process_callback=process_callback, + ) diff --git a/fractale_agents/hpc/workflow/__init__.py b/fractale_agents/hpc/workflow/__init__.py index 22a2e2f..f9a8c0d 100644 --- a/fractale_agents/hpc/workflow/__init__.py +++ b/fractale_agents/hpc/workflow/__init__.py @@ -1 +1 @@ -from .snakemake import SnakemakeWorkflowAgent \ No newline at end of file +from .snakemake import SnakemakeWorkflowAgent diff --git a/fractale_agents/hpc/workflow/snakemake.py b/fractale_agents/hpc/workflow/snakemake.py index 90b6c77..d2373d4 100644 --- a/fractale_agents/hpc/workflow/snakemake.py +++ b/fractale_agents/hpc/workflow/snakemake.py @@ -230,9 +230,7 @@ async def __call__( """ Executes the Snakemake workflow discovery and execution loop. """ - goal_text = ( - f"Design and execute a Snakemake workflow to accomplish the following:\n{goal}" - ) + goal_text = f"Design and execute a Snakemake workflow to accomplish the following:\n{goal}" full_context = ( "The MCP server has already staged your input data and configured your " @@ -255,9 +253,7 @@ async def __call__( result["status"] = "success" else: result["status"] = result.get("status", "failed") - result.setdefault( - "summary", "The workflow agent did not complete successfully." - ) + result.setdefault("summary", "The workflow agent did not complete successfully.") result.setdefault("snakefile_path", "Snakefile") result.setdefault("steps_executed", []) diff --git a/fractale_agents/version.py b/fractale_agents/version.py index 7b1d0a3..11dadf6 100644 --- a/fractale_agents/version.py +++ b/fractale_agents/version.py @@ -1,4 +1,4 @@ -__version__ = "0.0.12" +__version__ = "0.0.14" AUTHOR = "Vanessa Sochat" AUTHOR_EMAIL = "vsoch@users.noreply.github.com" NAME = "fractale-agents"