|
1 | | -from typing import Optional, Dict, Any |
2 | 1 | import os |
| 2 | +from typing import Any, Optional |
3 | 3 |
|
4 | 4 | from codegen.agents.client.openapi_client.api.agents_api import AgentsApi |
5 | 5 | from codegen.agents.client.openapi_client.api_client import ApiClient |
| 6 | +from codegen.agents.client.openapi_client.configuration import Configuration |
6 | 7 | from codegen.agents.client.openapi_client.models.agent_run_response import AgentRunResponse |
7 | 8 | from codegen.agents.client.openapi_client.models.create_agent_run_input import CreateAgentRunInput |
8 | | -from codegen.agents.client.openapi_client.configuration import Configuration |
9 | 9 | from codegen.agents.constants import CODEGEN_BASE_API_URL |
10 | 10 |
|
11 | 11 |
|
12 | 12 | class AgentTask: |
13 | 13 | """Represents an agent run job.""" |
14 | | - |
| 14 | + |
15 | 15 | def __init__(self, task_data: AgentRunResponse, api_client: ApiClient, org_id: int): |
16 | 16 | self.id = task_data.id |
17 | 17 | self.org_id = org_id |
18 | 18 | self.status = task_data.status |
19 | 19 | self.result = task_data.result |
20 | 20 | self._api_client = api_client |
21 | 21 | self._agents_api = AgentsApi(api_client) |
22 | | - |
| 22 | + |
23 | 23 | def refresh(self) -> None: |
24 | 24 | """Refresh the job status from the API.""" |
25 | 25 | if self.id is None: |
26 | 26 | return |
27 | | - |
| 27 | + |
28 | 28 | job_data = self._agents_api.get_agent_run_v1_organizations_org_id_agent_run_agent_run_id_get( |
29 | | - agent_run_id=int(self.id), |
30 | | - org_id=int(self.org_id), |
31 | | - authorization=f"Bearer {self._api_client.configuration.access_token}" |
| 29 | + agent_run_id=int(self.id), org_id=int(self.org_id), authorization=f"Bearer {self._api_client.configuration.access_token}" |
32 | 30 | ) |
33 | | - |
| 31 | + |
34 | 32 | # Convert API response to dict for attribute access |
35 | 33 | job_dict = {} |
36 | | - if hasattr(job_data, '__dict__'): |
| 34 | + if hasattr(job_data, "__dict__"): |
37 | 35 | job_dict = job_data.__dict__ |
38 | 36 | elif isinstance(job_data, dict): |
39 | 37 | job_dict = job_data |
40 | | - |
| 38 | + |
41 | 39 | self.status = job_dict.get("status") |
42 | 40 | self.result = job_dict.get("result") |
43 | 41 |
|
44 | 42 |
|
45 | 43 | class Agent: |
46 | 44 | """API client for interacting with Codegen AI agents.""" |
47 | | - |
| 45 | + |
48 | 46 | def __init__(self, token: str, org_id: Optional[int] = None, base_url: Optional[str] = CODEGEN_BASE_API_URL): |
49 | | - """ |
50 | | - Initialize a new Agent client. |
51 | | - |
| 47 | + """Initialize a new Agent client. |
| 48 | +
|
52 | 49 | Args: |
53 | 50 | token: API authentication token |
54 | 51 | org_id: Optional organization ID. If not provided, default org will be used. |
55 | 52 | """ |
56 | 53 | self.token = token |
57 | 54 | self.org_id = org_id or int(os.environ.get("CODEGEN_ORG_ID", "1")) # Default to org ID 1 if not specified |
58 | | - |
| 55 | + |
59 | 56 | # Configure API client |
60 | 57 | config = Configuration(host=base_url, access_token=token) |
61 | 58 | self.api_client = ApiClient(configuration=config) |
62 | 59 | self.agents_api = AgentsApi(self.api_client) |
63 | | - |
| 60 | + |
64 | 61 | # Current job |
65 | 62 | self.current_job = None |
66 | | - |
| 63 | + |
67 | 64 | def run(self, prompt: str) -> AgentTask: |
68 | | - """ |
69 | | - Run an agent with the given prompt. |
70 | | - |
| 65 | + """Run an agent with the given prompt. |
| 66 | +
|
71 | 67 | Args: |
72 | 68 | prompt: The instruction for the agent to execute |
73 | | - |
| 69 | +
|
74 | 70 | Returns: |
75 | 71 | Job: A job object representing the agent run |
76 | 72 | """ |
77 | 73 | run_input = CreateAgentRunInput(prompt=prompt) |
78 | 74 | agent_run_response = self.agents_api.create_agent_run_v1_organizations_org_id_agent_run_post( |
79 | | - org_id=int(self.org_id), |
80 | | - create_agent_run_input=run_input, |
81 | | - authorization=f"Bearer {self.token}", |
82 | | - _headers={"Content-Type": "application/json"} |
| 75 | + org_id=int(self.org_id), create_agent_run_input=run_input, authorization=f"Bearer {self.token}", _headers={"Content-Type": "application/json"} |
83 | 76 | ) |
84 | | - |
| 77 | + |
85 | 78 | # Convert API response to dict for Job initialization |
86 | | - |
| 79 | + |
87 | 80 | job = AgentTask(agent_run_response, self.api_client, self.org_id) |
88 | 81 | self.current_job = job |
89 | 82 | return job |
90 | | - |
91 | | - def get_status(self) -> Optional[Dict[str, Any]]: |
92 | | - """ |
93 | | - Get the status of the current job. |
94 | | - |
| 83 | + |
| 84 | + def get_status(self) -> Optional[dict[str, Any]]: |
| 85 | + """Get the status of the current job. |
| 86 | +
|
95 | 87 | Returns: |
96 | 88 | dict: A dictionary containing job status information, |
97 | 89 | or None if no job has been run. |
98 | 90 | """ |
99 | 91 | if self.current_job: |
100 | 92 | self.current_job.refresh() |
101 | | - return { |
102 | | - "id": self.current_job.id, |
103 | | - "status": self.current_job.status, |
104 | | - "result": self.current_job.result |
105 | | - } |
| 93 | + return {"id": self.current_job.id, "status": self.current_job.status, "result": self.current_job.result} |
106 | 94 | return None |
0 commit comments