Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 97 additions & 10 deletions aios/memory/shared_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,103 @@
# multi-agent systems

from aios.memory.base import BaseMemory
from threading import Lock
import pickle
import os

'''
TODO: implement the shared memory mechanism
'''
class SharedMemory(BaseMemory):
def __init__(self):
pass
def __init__(self, storage_path="shared_memory"):
"""Initialize shared memory

Args:
storage_path: Path for storing shared memory data
"""
self.storage_path = storage_path
self._memory = {} # Dictionary for memory data
self._lock = Lock() # Thread lock for synchronization

# Create storage directory if not exists
if not os.path.exists(storage_path):
os.makedirs(storage_path)

def save(self, key, value, agent_id=None):
"""Save data to shared memory

Args:
key: Key for the data
value: Data to be saved
agent_id: Optional agent identifier
"""
with self._lock:
# Use agent_id as namespace if specified
if agent_id:
if agent_id not in self._memory:
self._memory[agent_id] = {}
self._memory[agent_id][key] = value
else:
self._memory[key] = value

# Persist to disk
self._save_to_disk()

def save(self):
pass

def load(self):
pass
def load(self, key, agent_id=None):
"""Load data from shared memory

Args:
key: Key of data to load
agent_id: Optional agent identifier

Returns:
Loaded data, or None if not exists
"""
with self._lock:
# Load from disk if file exists
self._load_from_disk()

try:
if agent_id:
return self._memory.get(agent_id, {}).get(key)
return self._memory.get(key)
except KeyError:
return None

def _save_to_disk(self):
"""Persist memory data to disk"""
file_path = os.path.join(self.storage_path, "shared_memory.pkl")
with open(file_path, "wb") as f:
pickle.dump(self._memory, f)

def _load_from_disk(self):
"""Load data from disk to memory"""
file_path = os.path.join(self.storage_path, "shared_memory.pkl")
if os.path.exists(file_path):
with open(file_path, "rb") as f:
self._memory = pickle.load(f)

def clear(self, agent_id=None):
"""Clear shared memory data

Args:
agent_id: Optional agent identifier, if specified only clear data for this agent
"""
with self._lock:
if agent_id:
self._memory.pop(agent_id, None)
else:
self._memory.clear()
self._save_to_disk()

def get_all(self, agent_id=None):
"""Get all shared memory data

Args:
agent_id: Optional agent identifier, if specified only return data for this agent

Returns:
Dictionary containing all data
"""
with self._lock:
self._load_from_disk()
if agent_id:
return self._memory.get(agent_id, {})
return self._memory
27 changes: 19 additions & 8 deletions aios/scheduler/rr_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class RRScheduler:
def __init__(
self,
llm,
# memory_manager,
log_mode,
get_llm_request: LLMRequestQueueGetMessage,
get_memory_request: MemoryRequestQueueGetMessage,
Expand All @@ -41,21 +40,19 @@ def __init__(
self.get_memory_request = get_memory_request
self.get_storage_request = get_storage_request
self.get_tool_request = get_tool_request
self.active = False # start/stop the scheduler
self.active = False
self.log_mode = log_mode
self.logger = self.setup_logger()
# self.thread = Thread(target=self.run)
self.request_processors = {
"llm_syscall_processor": Thread(target=self.run_llm_request),
"mem_syscall_processor": Thread(target=self.run_memory_request),
"sto_syscall_processor": Thread(target=self.run_storage_request),
"tool_syscall_processor": Thread(target=self.run_tool_request)
# "memory_request_processor": Thread(self.run_memory_request)
}
self.llm = llm
self.time_limit = 5
self.simple_context_manager = SimpleContextManager()
# self.memory_manager = memory_manager
self.shared_memory = SharedMemory()

def start(self):
"""start the scheduler"""
Expand Down Expand Up @@ -111,14 +108,28 @@ def run_memory_request(self):
try:
# wait at a fixed time interval, if there is nothing received in the time interval, it will raise Empty
agent_request = self.get_memory_request()

agent_request.set_status("executing")
self.logger.log(
f"{agent_request.agent_name} is executing. \n", "execute"
)
agent_request.set_start_time(time.time())

response = self.memory_manager.address_request(agent_request)
# Handle different types of memory operations
if agent_request.operation == "save":
self.shared_memory.save(
agent_request.key,
agent_request.value,
agent_request.agent_id
)
response = {"status": "success", "operation": "save"}
elif agent_request.operation == "load":
value = self.shared_memory.load(
agent_request.key,
agent_request.agent_id
)
response = {"status": "success", "operation": "load", "value": value}

agent_request.set_response(response)

# self.llm.address_request(agent_request)
Expand All @@ -129,7 +140,7 @@ def run_memory_request(self):

self.logger.log(
f"Current request of {agent_request.agent_name} is done. Thread ID is {agent_request.get_pid()}\n",
"done",
"done"
)
# wait at a fixed time interval, if there is nothing received in the time interval, it will raise Empty

Expand Down