diff --git a/aios/memory/shared_memory.py b/aios/memory/shared_memory.py index fbd490ba..d4b01325 100644 --- a/aios/memory/shared_memory.py +++ b/aios/memory/shared_memory.py @@ -2,16 +2,103 @@ # multi-agent systems from aios.memory.base import BaseMemory +from threading import Lock +import pickle +import os -''' -TODO: implement the shared memory mechanism -''' class SharedMemory(BaseMemory): - def __init__(self): - pass + def __init__(self, storage_path="shared_memory"): + """Initialize shared memory + + Args: + storage_path: Path for storing shared memory data + """ + self.storage_path = storage_path + self._memory = {} # Dictionary for memory data + self._lock = Lock() # Thread lock for synchronization + + # Create storage directory if not exists + if not os.path.exists(storage_path): + os.makedirs(storage_path) + + def save(self, key, value, agent_id=None): + """Save data to shared memory + + Args: + key: Key for the data + value: Data to be saved + agent_id: Optional agent identifier + """ + with self._lock: + # Use agent_id as namespace if specified + if agent_id: + if agent_id not in self._memory: + self._memory[agent_id] = {} + self._memory[agent_id][key] = value + else: + self._memory[key] = value + + # Persist to disk + self._save_to_disk() - def save(self): - pass - - def load(self): - pass + def load(self, key, agent_id=None): + """Load data from shared memory + + Args: + key: Key of data to load + agent_id: Optional agent identifier + + Returns: + Loaded data, or None if not exists + """ + with self._lock: + # Load from disk if file exists + self._load_from_disk() + + try: + if agent_id: + return self._memory.get(agent_id, {}).get(key) + return self._memory.get(key) + except KeyError: + return None + + def _save_to_disk(self): + """Persist memory data to disk""" + file_path = os.path.join(self.storage_path, "shared_memory.pkl") + with open(file_path, "wb") as f: + pickle.dump(self._memory, f) + + def _load_from_disk(self): + """Load data from disk to memory""" + file_path = os.path.join(self.storage_path, "shared_memory.pkl") + if os.path.exists(file_path): + with open(file_path, "rb") as f: + self._memory = pickle.load(f) + + def clear(self, agent_id=None): + """Clear shared memory data + + Args: + agent_id: Optional agent identifier, if specified only clear data for this agent + """ + with self._lock: + if agent_id: + self._memory.pop(agent_id, None) + else: + self._memory.clear() + self._save_to_disk() + + def get_all(self, agent_id=None): + """Get all shared memory data + + Args: + agent_id: Optional agent identifier, if specified only return data for this agent + + Returns: + Dictionary containing all data + """ + with self._lock: + self._load_from_disk() + if agent_id: + return self._memory.get(agent_id, {}) + return self._memory diff --git a/aios/scheduler/rr_scheduler.py b/aios/scheduler/rr_scheduler.py index 3142b79a..f0e0e5ab 100644 --- a/aios/scheduler/rr_scheduler.py +++ b/aios/scheduler/rr_scheduler.py @@ -29,7 +29,6 @@ class RRScheduler: def __init__( self, llm, - # memory_manager, log_mode, get_llm_request: LLMRequestQueueGetMessage, get_memory_request: MemoryRequestQueueGetMessage, @@ -41,21 +40,19 @@ def __init__( self.get_memory_request = get_memory_request self.get_storage_request = get_storage_request self.get_tool_request = get_tool_request - self.active = False # start/stop the scheduler + self.active = False self.log_mode = log_mode self.logger = self.setup_logger() - # self.thread = Thread(target=self.run) self.request_processors = { "llm_syscall_processor": Thread(target=self.run_llm_request), "mem_syscall_processor": Thread(target=self.run_memory_request), "sto_syscall_processor": Thread(target=self.run_storage_request), "tool_syscall_processor": Thread(target=self.run_tool_request) - # "memory_request_processor": Thread(self.run_memory_request) } self.llm = llm self.time_limit = 5 self.simple_context_manager = SimpleContextManager() - # self.memory_manager = memory_manager + self.shared_memory = SharedMemory() def start(self): """start the scheduler""" @@ -111,14 +108,28 @@ def run_memory_request(self): try: # wait at a fixed time interval, if there is nothing received in the time interval, it will raise Empty agent_request = self.get_memory_request() - + agent_request.set_status("executing") self.logger.log( f"{agent_request.agent_name} is executing. \n", "execute" ) agent_request.set_start_time(time.time()) - response = self.memory_manager.address_request(agent_request) + # Handle different types of memory operations + if agent_request.operation == "save": + self.shared_memory.save( + agent_request.key, + agent_request.value, + agent_request.agent_id + ) + response = {"status": "success", "operation": "save"} + elif agent_request.operation == "load": + value = self.shared_memory.load( + agent_request.key, + agent_request.agent_id + ) + response = {"status": "success", "operation": "load", "value": value} + agent_request.set_response(response) # self.llm.address_request(agent_request) @@ -129,7 +140,7 @@ def run_memory_request(self): self.logger.log( f"Current request of {agent_request.agent_name} is done. Thread ID is {agent_request.get_pid()}\n", - "done", + "done" ) # wait at a fixed time interval, if there is nothing received in the time interval, it will raise Empty