from abc import ABC, abstractmethod
import asyncio
import logging
import uuid
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)


class BaseAgent(ABC):
    """Abstract base for autonomous agents running a perceive/decide/act loop.

    Events arrive via :meth:`perceive` into an internal queue; the background
    loop drains the queue into ``memory`` (bounded FIFO), then calls the
    subclass-provided :meth:`decide` and :meth:`act`.  Repeated loop failures
    trigger exponential backoff capped at 60 seconds.
    """

    def __init__(self, agent_id: Optional[str] = None, max_memory_events: int = 100) -> None:
        """Initialize agent state.

        Args:
            agent_id: Stable identifier; a random UUID4 string when omitted.
            max_memory_events: Cap on ``memory["recent_events"]`` (FIFO eviction).
        """
        self.agent_id: str = agent_id or str(uuid.uuid4())
        self.memory: Dict[str, Any] = {}
        self.state: str = "initialized"
        self._running: bool = False
        self._task: Optional[asyncio.Task] = None
        self.event_queue: asyncio.Queue = asyncio.Queue()
        self.max_memory_events: int = max_memory_events
        self._consecutive_errors: int = 0

    async def perceive(self, event: Any) -> None:
        """Called by the event bus when subscribed events occur."""
        await self.event_queue.put(event)

    @abstractmethod
    async def decide(self) -> Any:
        """Evaluate internal state and memory to decide the next action."""

    @abstractmethod
    async def act(self, action: Any) -> None:
        """Execute the decided action."""

    async def _loop(self) -> None:
        """Main perception-decision-action loop with error backoff."""
        while self._running:
            try:
                # Perceive: wait briefly for a new event so state is still
                # re-evaluated periodically even when the queue is idle.
                try:
                    event = await asyncio.wait_for(self.event_queue.get(), timeout=1.0)
                    self._process_event(event)
                    self.event_queue.task_done()
                except asyncio.TimeoutError:
                    pass

                # Decide, then act on any non-falsy action.
                action = await self.decide()
                if action:
                    await self.act(action)

                self._consecutive_errors = 0  # reset backoff on success
            except asyncio.CancelledError:
                break
            except Exception as exc:
                # Exponential backoff (2, 4, 8, ... capped at 60s) so a
                # persistently failing agent cannot spin the loop.
                self._consecutive_errors += 1
                backoff_time = min(60, 2 ** self._consecutive_errors)
                logger.error(
                    f"AgentExecutionError: Error in agent loop for {self.agent_id}: {exc}. "
                    f"Backing off for {backoff_time}s",
                    exc_info=True,
                )
                await asyncio.sleep(backoff_time)

    def _process_event(self, event: Any) -> None:
        """Append *event* to the bounded ``memory["recent_events"]`` list.

        Derived classes can override to reformat events before storage.
        """
        recent = self.memory.setdefault("recent_events", [])
        recent.append(event)
        # Enforce the FIFO eviction policy: keep only the newest entries.
        if len(recent) > self.max_memory_events:
            self.memory["recent_events"] = recent[-self.max_memory_events:]

    def start(self) -> None:
        """Start the background loop task; no-op if already running.

        Must be called from within a running asyncio event loop.
        """
        if not self._running:
            self.state = "running"
            self._running = True
            self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        """Cancel the background loop and wait for it to finish; idempotent."""
        self.state = "stopped"
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type

if TYPE_CHECKING:
    # Type-checking-only import: avoids a runtime circular import between
    # the lifecycle manager and agent implementations.
    from .base import BaseAgent


class AgentManager:
    """Tracks, spawns, evaluates and retires agents."""

    def __init__(self) -> None:
        # agent_id -> live agent instance
        self.active_agents: Dict[str, "BaseAgent"] = {}
        # agent_id -> latest performance score
        self.performance_metrics: Dict[str, float] = {}
        self.config: Dict[str, Any] = {}

    def spawn_agent(
        self,
        agent_class: Type["BaseAgent"],
        agent_id: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None,
    ) -> "BaseAgent":
        """Dynamically instantiate, configure, register and start an agent.

        Args:
            agent_class: Concrete BaseAgent subclass to instantiate.
            agent_id: Optional explicit id (the agent generates one otherwise).
            config: Optional seed data merged into the agent's memory.

        Returns:
            The started agent instance.
        """
        agent = agent_class(agent_id=agent_id)
        if config:
            # NOTE(review): config is seeded into agent memory rather than
            # passed to the constructor; agents read settings from memory.
            agent.memory.update(config)

        agent.start()
        self.active_agents[agent.agent_id] = agent
        self.performance_metrics[agent.agent_id] = 1.0  # optimistic default

        return agent

    async def retire_agent(self, agent_id: str) -> None:
        """Gracefully stop and deregister an agent; no-op for unknown ids."""
        agent = self.active_agents.pop(agent_id, None)
        if agent is not None:
            await agent.stop()
            self.performance_metrics.pop(agent_id, None)

    def evaluate_agents(self, threshold: float = 0.5) -> List[str]:
        """Return ids of agents whose metric is below *threshold*."""
        return [
            agent_id
            for agent_id, metric in self.performance_metrics.items()
            if metric < threshold
        ]
import asyncio
import fnmatch
import logging
from typing import TYPE_CHECKING, Callable, Dict, List

if TYPE_CHECKING:
    # Type-checking-only import keeps the bus importable without the
    # events module at runtime.
    from .events import BaseEvent

logger = logging.getLogger(__name__)


class TopicRouter:
    """Maps fnmatch-style topic patterns (e.g. ``metrics.*``) to callbacks."""

    def __init__(self) -> None:
        self.subscriptions: Dict[str, List[Callable]] = {}

    def subscribe(self, topic_pattern: str, callback: Callable) -> None:
        """Register *callback* for every topic matching *topic_pattern*."""
        self.subscriptions.setdefault(topic_pattern, []).append(callback)

    def unsubscribe(self, topic_pattern: str, callback: Callable) -> None:
        """Remove a registration; no-op if it was never registered."""
        if topic_pattern in self.subscriptions:
            try:
                self.subscriptions[topic_pattern].remove(callback)
            except ValueError:
                pass

    def get_callbacks(self, topic: str) -> List[Callable]:
        """Return all callbacks whose pattern matches *topic*."""
        callbacks: List[Callable] = []
        for pattern, subs in self.subscriptions.items():
            if fnmatch.fnmatch(topic, pattern):
                callbacks.extend(subs)
        return callbacks


class EventBus:
    """Async pub/sub bus with a bounded dead-letter queue (DLQ).

    Failed deliveries are logged and pushed to ``self.dlq`` as
    ``(topic, event, reason)`` tuples; when the DLQ is full the failure is
    logged and dropped.
    """

    def __init__(self, dlq_max_size: int = 1000) -> None:
        self.router = TopicRouter()
        self.queue: asyncio.Queue = asyncio.Queue()
        self.dlq: asyncio.Queue = asyncio.Queue(maxsize=dlq_max_size)
        self._running = False
        self._task = None

    def subscribe(self, topic_pattern: str, callback: Callable) -> None:
        self.router.subscribe(topic_pattern, callback)

    def unsubscribe(self, topic_pattern: str, callback: Callable) -> None:
        self.router.unsubscribe(topic_pattern, callback)

    async def publish(self, topic: str, event: "BaseEvent") -> None:
        """Enqueue an event for asynchronous delivery."""
        await self.queue.put((topic, event))

    def _to_dlq(self, topic, event, reason: str) -> None:
        """Best-effort push of a failed delivery onto the DLQ."""
        try:
            self.dlq.put_nowait((topic, event, reason))
        except asyncio.QueueFull:
            logger.warning("EventBusError: DLQ is full. Dropping failed event.")

    async def _run_async_callback(self, callback: Callable, topic, event) -> None:
        """Await one async subscriber, routing failures to the DLQ."""
        try:
            await callback(topic, event)
        except Exception as err:
            logger.error(f"EventBusError: Async callback for {topic} failed: {err}", exc_info=True)
            self._to_dlq(topic, event, str(err))

    async def _dispatch(self, topic, event) -> None:
        """Deliver one event: sync callbacks inline, async ones concurrently."""
        callbacks = self.router.get_callbacks(topic)
        tasks = []
        for cb in callbacks:
            if asyncio.iscoroutinefunction(cb):
                tasks.append(asyncio.create_task(self._run_async_callback(cb, topic, event)))
            else:
                try:
                    cb(topic, event)
                except Exception as err:
                    logger.error(f"EventBusError: Sync callback for {topic} failed: {err}", exc_info=True)
                    self._to_dlq(topic, event, str(err))
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

    async def _process_events(self) -> None:
        """Main consumer loop draining the publish queue."""
        while self._running:
            try:
                topic, event = await self.queue.get()
            except asyncio.CancelledError:
                break
            try:
                await self._dispatch(topic, event)
            except Exception as e:
                logger.error(f"EventBusError: Critical failure in event loop: {e}", exc_info=True)
                self._to_dlq(topic, event, str(e))
            finally:
                # Always mark the item done so Queue.join() cannot hang
                # (the original skipped this on the failure path).
                self.queue.task_done()

    def start(self) -> None:
        """Start the consumer task; no-op if already running."""
        if not self._running:
            self._running = True
            self._task = asyncio.create_task(self._process_events())

    async def stop(self) -> None:
        """Cancel the consumer task and wait for it to exit; idempotent."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
import hashlib
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field, model_validator


class BaseEvent(BaseModel):
    """Base event carrying a deterministic SHA-256 integrity hash.

    The hash covers id, timestamp, source, metadata and every
    subclass-declared field, computed once at construction unless a hash
    was supplied explicitly.
    """

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    source: str
    metadata: Dict[str, Any] = Field(default_factory=dict)
    cryptographic_hash: str = Field(default="")

    @model_validator(mode='after')
    def compute_hash(self) -> 'BaseEvent':
        """Populate ``cryptographic_hash`` if it is not already set."""
        if not self.cryptographic_hash:
            # Deterministic representation for hashing.
            data_to_hash = {
                "id": self.id,
                "timestamp": self.timestamp.isoformat(),
                "source": self.source,
                "metadata": self.metadata,
            }
            # Include subclass-specific fields in the hash.  Access
            # model_fields on the class: instance access is deprecated
            # in Pydantic 2.11+.
            for field in type(self).model_fields.keys():
                if field not in ("id", "timestamp", "source", "metadata", "cryptographic_hash"):
                    data_to_hash[field] = getattr(self, field)

            # default=str keeps hashing robust for non-JSON-native values
            # (datetimes, UUIDs, ...) in metadata or subclass fields.
            encoded = json.dumps(data_to_hash, sort_keys=True, default=str).encode('utf-8')
            self.cryptographic_hash = hashlib.sha256(encoded).hexdigest()
        return self


class PerformanceAnomalyEvent(BaseEvent):
    """A KPI deviated from its expected baseline."""
    metric: str
    deviation: float
    direction: str  # e.g., "up", "down"
    context: Optional[str] = None


class AudienceSignalEvent(BaseEvent):
    """Audience behaviour detected for a segment."""
    signal_type: str
    confidence: float
    segment: str


class CapabilityGapEvent(BaseEvent):
    """A missing capability that the tool forge should address."""
    gap_type: str
    description: str
    priority: int
import copy
import hashlib
import json
import random
import uuid
from typing import Any, Dict, List, Optional


class StrategyGenome:
    """Marketing-strategy encoding with hash-chained lineage tracking."""

    def __init__(self, parameters: Optional[Dict[str, Any]] = None, lineage: Optional[List[str]] = None) -> None:
        """Create a genome.

        Args:
            parameters: Optional seed values overriding the gene defaults.
            lineage: Parent hashes; empty for a founder genome.
        """
        self.genome_id = str(uuid.uuid4())
        self.parameters = parameters or {}
        # Core gene representation with founder defaults.
        self.genes = {
            "objective_weights": self.parameters.get("objective_weights", [1.0, 0.5, 0.2]),
            "audience_targeting": self.parameters.get("audience_targeting", ["segment_a"]),
            "budget_allocation": self.parameters.get("budget_allocation", 100.0),
            "adaptation_rate": self.parameters.get("adaptation_rate", 0.05),
        }
        self.fitness_history: List[float] = []
        self.lineage: List[str] = lineage or []
        self.cryptographic_hash = self._compute_hash()

    def _compute_hash(self) -> str:
        """SHA-256 over (genome_id, genes, lineage)."""
        data = {
            "genome_id": self.genome_id,
            "genes": self.genes,
            "lineage": self.lineage,
        }
        encoded = json.dumps(data, sort_keys=True).encode('utf-8')
        return hashlib.sha256(encoded).hexdigest()

    def mutate(self, mutation_rate: float = 0.1) -> 'StrategyGenome':
        """Return an offspring with point mutations applied at *mutation_rate*.

        This genome is left untouched; the offspring appends this genome's
        hash to its lineage.
        """
        mutated_genes = copy.deepcopy(self.genes)

        # Simple point mutations, each applied independently.
        for i in range(len(mutated_genes["objective_weights"])):
            if random.random() < mutation_rate:
                mutated_genes["objective_weights"][i] += random.uniform(-0.1, 0.1)

        if random.random() < mutation_rate:
            mutated_genes["budget_allocation"] *= random.uniform(0.9, 1.1)

        if random.random() < mutation_rate:
            # Keep the adaptation rate strictly positive.
            mutated_genes["adaptation_rate"] = max(0.01, mutated_genes["adaptation_rate"] + random.uniform(-0.02, 0.02))

        return StrategyGenome(parameters=mutated_genes, lineage=self.lineage + [self.cryptographic_hash])

    def crossover(self, other_genome: 'StrategyGenome') -> 'StrategyGenome':
        """Return an offspring taking each gene from either parent with p=0.5."""
        child_genes: Dict[str, Any] = {}
        for key in self.genes:
            donor = self.genes if random.random() < 0.5 else other_genome.genes
            child_genes[key] = copy.deepcopy(donor[key])
        return StrategyGenome(
            parameters=child_genes,
            lineage=[self.cryptographic_hash, other_genome.cryptographic_hash],
        )

    def update_fitness(self, score: float) -> None:
        """Record one fitness observation."""
        self.fitness_history.append(score)

    @property
    def current_fitness(self) -> float:
        """Mean of the most recent (up to five) fitness observations."""
        if not self.fitness_history:
            return 0.0
        window = self.fitness_history[-5:]
        return sum(window) / len(window)
import random
from typing import TYPE_CHECKING, Dict, List

if TYPE_CHECKING:
    # Type-checking-only import: avoids importing the genome module at runtime.
    from .genome import StrategyGenome


class EvolutionarySelector:
    """Manages a genome population: fitness tracking, culling, breeding."""

    def __init__(self) -> None:
        self.population: Dict[str, "StrategyGenome"] = {}
        self.performance_metrics: Dict[str, float] = {}
        self.generation: int = 0

    def add_genome(self, genome: "StrategyGenome") -> None:
        """Register a genome in the population."""
        self.population[genome.genome_id] = genome

    def evaluate_fitness(self, genome_id: str, metric: float) -> None:
        """Record a performance observation for one genome (no-op if unknown)."""
        if genome_id in self.population:
            genome = self.population[genome_id]
            genome.update_fitness(metric)
            self.performance_metrics[genome_id] = genome.current_fitness

    def calculate_diversity_score(self, target_genome: "StrategyGenome") -> float:
        """Mean Euclidean distance of *target_genome*'s objective weights
        from every other population member (1.0 for a population of one)."""
        if len(self.population) <= 1:
            return 1.0

        diversity_sum = 0.0
        for genome_id, other_genome in self.population.items():
            if genome_id == target_genome.genome_id:
                continue
            target_weights = target_genome.genes.get("objective_weights", [])
            other_weights = other_genome.genes.get("objective_weights", [])
            # Only comparable (same-length, non-empty) weight vectors count.
            if len(target_weights) == len(other_weights) and len(target_weights) > 0:
                dist = sum((a - b) ** 2 for a, b in zip(target_weights, other_weights)) ** 0.5
                diversity_sum += dist

        return diversity_sum / (len(self.population) - 1)

    def _combined_score(self, genome: "StrategyGenome", diversity_weight: float) -> float:
        """Fitness/diversity blend shared by culling and breeding."""
        fitness = genome.current_fitness
        diversity = self.calculate_diversity_score(genome)
        return (fitness * (1 - diversity_weight)) + (diversity * diversity_weight)

    def reallocate_resources(self, min_threshold: float = 0.3, diversity_weight: float = 0.2) -> None:
        """Remove genomes scoring below *min_threshold*, but only once they
        have at least five fitness observations."""
        # Score each genome once (the original ranked and then re-scored
        # every genome, doubling the O(n^2) diversity computation).
        underperforming = [
            g_id
            for g_id, genome in list(self.population.items())
            if len(genome.fitness_history) >= 5
            and self._combined_score(genome, diversity_weight) < min_threshold
        ]

        for g_id in underperforming:
            # Terminate and remove.
            del self.population[g_id]
            self.performance_metrics.pop(g_id, None)

        # In a full system, the remaining genomes would be proportionally
        # re-mapped onto the freed resources.

    def spawn_generation(self, mutation_rate: float = 0.1, crossover_prob: float = 0.3, diversity_weight: float = 0.2) -> None:
        """Breed a new cohort from the top half of the population."""
        if not self.population:
            return

        ranked_genomes = sorted(
            self.population.values(),
            key=lambda g: self._combined_score(g, diversity_weight),
            reverse=True,
        )

        # Retain the top 50% as parents (at least one).
        parents = ranked_genomes[:max(1, len(ranked_genomes) // 2)]

        new_offspring = []
        for _ in range(len(self.population) - len(parents)):
            parent1 = parents[0]  # simplified selection: always the top parent
            if len(parents) > 1 and random.random() < crossover_prob:
                parent2 = random.choice(parents[1:])
                child = parent1.crossover(parent2)
            else:
                child = parent1.mutate(mutation_rate)
            new_offspring.append(child)

        for child in new_offspring:
            self.add_genome(child)

        self.generation += 1
"""Custom exception hierarchies for the Autonomous Marketing Organism.

Every domain error derives from :class:`OrganismError`, so callers can
catch the whole family with a single ``except OrganismError``.
"""


class OrganismError(Exception):
    """Root of the organism's exception hierarchy."""


class AgentExecutionError(OrganismError):
    """A critical failure occurred inside an agent's perceive/decide/act loop."""


class EventBusError(OrganismError):
    """The event bus could not publish or route an event."""


class KnowledgeGraphError(OrganismError):
    """A persistent storage operation failed."""


class LLMIntegrationError(OrganismError):
    """Communication with the language-model backend failed."""


class ToolGenerationError(OrganismError):
    """Dynamic tool synthesis or validation failed."""
import asyncio
import json
import logging
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class BaseKnowledgeGraph(ABC):
    """Interface for persistent knowledge storage (entities + weighted edges)."""

    @abstractmethod
    async def store_entity(self, entity_id: str, data: Dict[str, Any]) -> None:
        """Create or update a graph node."""

    @abstractmethod
    async def get_entity(self, entity_id: str) -> Dict[str, Any]:
        """Retrieve a graph node by ID (empty dict when missing)."""

    @abstractmethod
    async def add_relationship(self, source_id: str, target_id: str, relationship_type: str, weight: float = 1.0) -> None:
        """Create an edge between two entities."""

    @abstractmethod
    async def query_relations(self, source_id: str) -> List[Dict[str, Any]]:
        """Return all outgoing edges of a node."""

    @abstractmethod
    async def query_by_type(self, entity_type: str) -> List[Dict[str, Any]]:
        """Find entities by their 'type' attribute."""


class KnowledgeGraph(BaseKnowledgeGraph):
    """SQLite-backed implementation of the Knowledge Graph.

    A single shared connection is held for the object's lifetime because a
    ':memory:' database vanishes when its connection closes.  All SQLite
    work runs in a worker thread via asyncio.to_thread and is serialized by
    an asyncio lock, which is why check_same_thread=False is safe here.
    """

    def __init__(self, in_memory: bool = True, db_path: Optional[str] = None) -> None:
        """
        Args:
            in_memory: Use a transient ':memory:' database when True.
            db_path: File path for the database when in_memory is False.
        """
        self.in_memory = in_memory
        self.db_path = db_path if not in_memory and db_path else ":memory:"
        self._lock = asyncio.Lock()
        self._conn = sqlite3.connect(self.db_path, check_same_thread=False)
        self._init_db()

    def _init_db(self) -> None:
        """Create tables and lookup indexes if they do not exist."""
        cursor = self._conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS entities (
                id TEXT PRIMARY KEY,
                type TEXT,
                data TEXT
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source_id TEXT,
                target_id TEXT,
                type TEXT,
                weight REAL,
                FOREIGN KEY(source_id) REFERENCES entities(id),
                FOREIGN KEY(target_id) REFERENCES entities(id)
            )
        ''')
        # Indexes backing query_relations and query_by_type lookups.
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_rel_source ON relationships(source_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(type)')
        self._conn.commit()

    async def store_entity(self, entity_id: str, data: Dict[str, Any]) -> None:
        """Create or update a graph node; 'type' is denormalized into its own column."""
        async with self._lock:
            def _insert():
                entity_type = data.get("type", "")
                self._conn.execute(
                    "INSERT OR REPLACE INTO entities (id, type, data) VALUES (?, ?, ?)",
                    (entity_id, entity_type, json.dumps(data)),
                )
                self._conn.commit()
            await asyncio.to_thread(_insert)

    async def get_entity(self, entity_id: str) -> Dict[str, Any]:
        """Return the stored payload for *entity_id*, or {} when absent."""
        async with self._lock:
            def _get():
                cursor = self._conn.cursor()
                cursor.execute("SELECT data FROM entities WHERE id = ?", (entity_id,))
                row = cursor.fetchone()
                if row:
                    return json.loads(row[0])
                return {}
            return await asyncio.to_thread(_get)

    async def add_relationship(self, source_id: str, target_id: str, relationship_type: str, weight: float = 1.0) -> None:
        """Create a weighted, typed edge between two entities."""
        async with self._lock:
            def _insert_edge():
                self._conn.execute(
                    "INSERT INTO relationships (source_id, target_id, type, weight) VALUES (?, ?, ?, ?)",
                    (source_id, target_id, relationship_type, weight),
                )
                self._conn.commit()
            await asyncio.to_thread(_insert_edge)

    async def query_relations(self, source_id: str) -> List[Dict[str, Any]]:
        """Return all outgoing edges of a node as target/type/weight dicts."""
        async with self._lock:
            def _query():
                cursor = self._conn.cursor()
                cursor.execute(
                    "SELECT target_id, type, weight FROM relationships WHERE source_id = ?",
                    (source_id,),
                )
                return [
                    {"target": row[0], "type": row[1], "weight": row[2]}
                    for row in cursor.fetchall()
                ]
            return await asyncio.to_thread(_query)

    async def query_by_type(self, entity_type: str) -> List[Dict[str, Any]]:
        """Find entities by their 'type' attribute, returning payload plus id."""
        async with self._lock:
            def _query_type():
                cursor = self._conn.cursor()
                cursor.execute("SELECT id, data FROM entities WHERE type = ?", (entity_type,))
                return [
                    {"id": row[0], **json.loads(row[1])}
                    for row in cursor.fetchall()
                ]
            return await asyncio.to_thread(_query_type)

    def __del__(self):
        # Best-effort close; interpreter shutdown may have torn down modules.
        try:
            if hasattr(self, '_conn') and self._conn:
                self._conn.close()
        except Exception:
            pass
"""Reasoning module providing prompt chaining and LLM interaction logic."""

import logging
from typing import List

import httpx
from src.marketing_organism.exceptions import LLMIntegrationError

logger = logging.getLogger(__name__)


class PromptChainer:
    """Runs multi-step reasoning by chaining sequential LLM calls.

    Attributes:
        endpoint_url: Base URL of the local FastAPI generation service.
        client: Shared asynchronous HTTP client.
    """

    def __init__(self, endpoint_url: str = "http://127.0.0.1:8000") -> None:
        """Create a chainer bound to *endpoint_url*."""
        self.endpoint_url = endpoint_url
        self.client = httpx.AsyncClient()

    async def _call_llm(self, prompt: str, timeout: float = 60.0) -> str:
        """POST *prompt* to the backend and return the generated text.

        Args:
            prompt: The instruction text to send.
            timeout: Max time in seconds to wait for a response.

        Returns:
            The generated text, or an empty string on any failure (errors
            are logged; the backend is treated as best-effort).
        """
        try:
            response = await self.client.post(
                f"{self.endpoint_url}/generate",
                json={"prompt": prompt, "max_tokens": 1024, "temperature": 0.3},
                timeout=timeout,
            )
            response.raise_for_status()
            payload = response.json()
        except httpx.RequestError as e:
            logger.error(f"LLMIntegrationError: Request failed: {e}", exc_info=True)
            return ""
        except httpx.HTTPStatusError as e:
            logger.error(f"LLMIntegrationError: HTTP error {e.response.status_code}: {e}", exc_info=True)
            return ""
        except Exception as e:
            logger.error(f"LLMIntegrationError: Unexpected error calling LLM: {e}", exc_info=True)
            return ""
        return str(payload.get("generated_text", ""))

    async def execute_chain(self, task_list: List[str]) -> List[str]:
        """Run *task_list* sequentially, feeding each result back as context.

        Args:
            task_list: Tasks to execute in order.

        Returns:
            One output string per task, in order.
        """
        outputs: List[str] = []
        context = ""
        for i, task in enumerate(task_list):
            prompt = f"Task: {task}\nContext: {context}\nPlease generate the next step or output."
            generated = await self._call_llm(prompt)
            outputs.append(generated)
            context += f"\nResult {i}: {generated}"
        return outputs

    async def decompose_task(self, goal: str) -> List[str]:
        """Decompose a high-level goal into actionable sub-task descriptions.

        Args:
            goal: The overarching objective to break down.

        Returns:
            One step description per non-empty output line, or a single
            generic step when the backend returned nothing usable.
        """
        prompt = f"Decompose the following goal into a sequence of actionable steps:\nGoal: {goal}"
        raw = await self._call_llm(prompt)

        # Naive line-based parsing of the LLM output.
        steps: List[str] = []
        for line in raw.split('\n'):
            line = line.strip()
            if line:
                steps.append(line)
        return steps if steps else ["Perform task execution"]

    async def close(self) -> None:
        """Close the underlying HTTP client session."""
        await self.client.aclose()
"""FastAPI wrapper exposing /generate and /embed over a local LLM backend.

Each endpoint first proxies to an Ollama-style backend; on any failure it
falls back to a mocked response so the rest of the organism stays runnable
without a real model.
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
import logging
import httpx
import os

app = FastAPI(title="Local LLM Service Wrapper")
logger = logging.getLogger("llm_service")

# Optionally configure this to point to a real local Ollama/OpenAI API compatible backend
LLM_BACKEND_URL = os.getenv("LLM_BACKEND_URL", "http://127.0.0.1:11434/api/generate")

class GenerateRequest(BaseModel):
    # Text-generation request payload.
    prompt: str
    max_tokens: int = 256
    temperature: float = 0.7
    model: str = "qwen"

class EmbedRequest(BaseModel):
    # Embedding request payload.
    text: str
    model: str = "nomic-embed-text"

class GenerateResponse(BaseModel):
    generated_text: str

class EmbedResponse(BaseModel):
    embeddings: list[float]

@app.post("/generate", response_model=GenerateResponse)
async def generate(req: GenerateRequest):
    """Generate text: proxy to the backend, mock on any failure."""
    logger.info(f"Received generation request: {req.prompt[:50]}...")

    # Attempt to proxy the request to a real local LLM backend if configured and reachable
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            # Ollama /api/generate request shape (non-streaming).
            response = await client.post(LLM_BACKEND_URL, json={
                "model": req.model,
                "prompt": req.prompt,
                "stream": False,
                "options": {
                    "temperature": req.temperature,
                    "num_predict": req.max_tokens
                }
            })
            response.raise_for_status()
            data = response.json()
            return {"generated_text": data.get("response", "")}
    except Exception as e:
        # Broad catch is deliberate: any backend problem degrades to a mock.
        logger.warning(f"Failed to reach actual LLM backend ({e}). Falling back to mocked generation.")
        await asyncio.sleep(0.5)
        return {"generated_text": f"Mocked fallback LLM generation for prompt '{req.prompt}'"}

@app.post("/embed", response_model=EmbedResponse)
async def embed(req: EmbedRequest):
    """Embed text: proxy to the backend's /embeddings, mock on any failure."""
    logger.info("Received embedding request")

    # Derive the embeddings URL from the generate URL by convention.
    embed_url = LLM_BACKEND_URL.replace("/generate", "/embeddings")
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(embed_url, json={
                "model": req.model,
                "prompt": req.text
            })
            response.raise_for_status()
            data = response.json()
            return {"embeddings": data.get("embedding", [])}
    except Exception as e:
        logger.warning(f"Failed to reach actual LLM backend for embedding ({e}). Falling back to mocked extraction.")
        await asyncio.sleep(0.1)
        # NOTE(review): fixed-size mock vector; real embeddings will differ
        # in dimensionality — confirm downstream consumers tolerate this.
        return {"embeddings": [0.1, 0.2, 0.3, 0.4]}

class FastAPIService:
    """Thin uvicorn lifecycle wrapper around the module-level app."""
    def __init__(self, host="127.0.0.1", port=8000):
        self.host = host
        self.port = port
        self.server = None

    def start(self):
        """Build the uvicorn server and return its serve() coroutine
        (caller is responsible for awaiting/scheduling it)."""
        import uvicorn
        config = uvicorn.Config(app, host=self.host, port=self.port, loop="asyncio")
        self.server = uvicorn.Server(config)
        return self.server.serve()

    async def stop(self):
        """Request a graceful uvicorn shutdown."""
        if self.server:
            self.server.should_exit = True
"""Entry point wiring together the marketing-organism subsystems."""
import asyncio
import logging
from src.marketing_organism.event_bus.bus import EventBus
from src.marketing_organism.agents.lifecycle import AgentManager
from src.marketing_organism.agents.base import BaseAgent
from src.marketing_organism.evolution.selection import EvolutionarySelector
from src.marketing_organism.knowledge.graph import KnowledgeGraph
from src.marketing_organism.llm.reasoning import PromptChainer
from src.marketing_organism.tool_forge.generator import ToolGenerator

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("organism_main")

class OrchestratorAgent(BaseAgent):
    """A baseline agent to handle system orchestration tasks."""
    def __init__(self, *args, event_bus: EventBus = None, knowledge_graph: KnowledgeGraph = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.event_bus = event_bus
        self.knowledge_graph = knowledge_graph

    async def decide(self):
        """Sleep 5s between cycles, then request a health check."""
        # The orchestrator could check system health or DLQ here
        await asyncio.sleep(5)
        return "health_check"

    async def act(self, action):
        """Log a heartbeat for the health_check action; ignore others."""
        if action == "health_check":
            logger.info("Orchestrator Agent: System is healthy.")

async def main():
    """Initialize subsystems, spawn the orchestrator, and run until cancelled."""
    logger.info("Starting Autonomous Adaptive Marketing Ecosystem Orchestrator...")

    # 1. Initialize Core Dependencies
    event_bus = EventBus(dlq_max_size=1000)
    event_bus.start()
    logger.info("Event Bus initialized.")

    knowledge_graph = KnowledgeGraph(in_memory=False, db_path="marketing_organism.db")
    logger.info("Knowledge Graph (SQLite) initialized.")

    prompt_chainer = PromptChainer(endpoint_url="http://127.0.0.1:8000")
    tool_generator = ToolGenerator(workspace_path="./generated_tools", prompt_chainer=prompt_chainer)
    logger.info("Tool Forge and LLM Integration initialized.")

    # NOTE(review): evolution_engine and tool_generator are created but not
    # yet driven by this loop — presumably wired in by later work.
    evolution_engine = EvolutionarySelector()
    logger.info("Evolution Engine initialized.")

    # 2. Initialize Agent Manager and spawn baseline agent using Dependency Injection
    agent_manager = AgentManager()
    orchestrator = agent_manager.spawn_agent(
        OrchestratorAgent,
        config={"role": "orchestrator"},
    )
    # Inject dependencies post-spawn or via a custom factory method in a real system
    orchestrator.event_bus = event_bus
    orchestrator.knowledge_graph = knowledge_graph
    logger.info(f"Orchestrator Agent spawned with ID: {orchestrator.agent_id}")

    try:
        # Keep the main loop alive
        while True:
            # NOTE(review): first agent evaluation happens only after 60s.
            await asyncio.sleep(60)

            # Periodically evaluate agents
            underperforming = agent_manager.evaluate_agents(threshold=0.3)
            for agent_id in underperforming:
                logger.info(f"Retiring underperforming agent: {agent_id}")
                await agent_manager.retire_agent(agent_id)

    except asyncio.CancelledError:
        logger.info("Shutting down ecosystem...")
    finally:
        # Shutdown order: stop the bus first, then retire all agents.
        await event_bus.stop()
        for agent_id in list(agent_manager.active_agents.keys()):
            await agent_manager.retire_agent(agent_id)
        logger.info("Ecosystem shutdown complete.")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Process interrupted by user.")
new tool capabilities via LLM.""" + + def __init__(self, workspace_path: str = "./generated_tools", prompt_chainer: Optional[PromptChainer] = None) -> None: + """Initializes the ToolGenerator. + + Args: + workspace_path: Path to the directory where generated tools are saved. + prompt_chainer: Optional PromptChainer instance to use for code synthesis. + """ + self.workspace_path = workspace_path + self.prompt_chainer = prompt_chainer or PromptChainer() + if not os.path.exists(workspace_path): + os.makedirs(workspace_path) + + async def analyze_gap(self, gap_description: str) -> Dict[str, Any]: + """Analyzes a capability gap and outlines a tool spec. + + Args: + gap_description: A description of the missing system capability. + + Returns: + A dictionary containing the generated specification details. + """ + logger.info(f"Analyzing capability gap: {gap_description}") + return { + "name": f"tool_{uuid.uuid4().hex[:8]}", + "type": "python", + "description": f"Generated tool to address: {gap_description}" + } + + async def generate_tool(self, gap_description: str) -> str: + """Synthesizes a tool script dynamically using the LLM prompt chainer. + + Args: + gap_description: Description of the functionality the tool should implement. + + Returns: + The filepath of the newly generated tool. + + Raises: + ToolGenerationError: If file writing fails. + """ + spec = await self.analyze_gap(gap_description) + tool_name = spec["name"] + + prompt = f"""Write a Python script to fulfill the following capability gap in a marketing automation ecosystem: +Gap: {gap_description} + +Requirements: +- The script MUST define a main function named `{tool_name}(*args, **kwargs)`. +- The script MUST NOT import `os`, `sys`, or `subprocess` due to security constraints. +- Output ONLY valid Python code, no markdown blocks, no explanations. 
+""" + # Call LLM to generate code dynamically + generated_code = await self.prompt_chainer._call_llm(prompt, timeout=120.0) + + # Clean up common markdown wrappings if the LLM ignores instructions + if generated_code.startswith("```python"): + generated_code = generated_code[9:] + if generated_code.startswith("```"): + generated_code = generated_code[3:] + if generated_code.endswith("```"): + generated_code = generated_code[:-3] + + generated_code = generated_code.strip() + + # Fallback if LLM fails + if not generated_code: + generated_code = f''' +import logging + +def {tool_name}(*args, **kwargs): + """ + Fallback auto-generated tool to fulfill gap: + {gap_description} + """ + logging.info(f"Executing fallback tool {tool_name}") + return "Operation successful" + +if __name__ == "__main__": + {tool_name}() +''' + + filepath = os.path.join(self.workspace_path, f"{tool_name}.py") + try: + with open(filepath, "w", encoding="utf-8") as f: + f.write(generated_code) + logger.info(f"Tool {tool_name} successfully generated at {filepath}") + return filepath + except Exception as e: + error_msg = f"Failed to generate tool {tool_name}: {e}" + logger.error(error_msg, exc_info=True) + raise ToolGenerationError(error_msg) from e + + def validate_tool(self, filepath: str) -> bool: + """Runs basic syntactic and static analysis on generated tools. + + Args: + filepath: The location of the generated script. + + Returns: + True if the tool passes static analysis constraints, False otherwise. 
+ """ + if not os.path.exists(filepath): + return False + + try: + # Check compilation + import py_compile + py_compile.compile(filepath, doraise=True) + + # Static AST analysis for unsafe operations + import ast + with open(filepath, "r") as f: + tree = ast.parse(f.read()) + + for node in ast.walk(tree): + if isinstance(node, ast.Import): + for alias in node.names: + if alias.name in ("os", "subprocess", "sys"): + logger.warning(f"ToolGenerationError: Unsafe import '{alias.name}' found in {filepath}") + return False + elif isinstance(node, ast.ImportFrom): + if node.module in ("os", "subprocess", "sys"): + logger.warning(f"ToolGenerationError: Unsafe import from '{node.module}' found in {filepath}") + return False + + return True + except py_compile.PyCompileError as e: + logger.error(f"ToolGenerationError: Tool compilation failed for {filepath}: {e}") + return False + except SyntaxError as e: + logger.error(f"ToolGenerationError: Syntax error during AST parsing for {filepath}: {e}") + return False diff --git a/tests/test_agents.py b/tests/test_agents.py new file mode 100644 index 0000000..1a01ba5 --- /dev/null +++ b/tests/test_agents.py @@ -0,0 +1,71 @@ +import pytest +import asyncio +from typing import Any +from src.marketing_organism.agents.base import BaseAgent +from src.marketing_organism.agents.lifecycle import AgentManager +from src.marketing_organism.event_bus.events import BaseEvent + +class DummyAgent(BaseAgent): + async def decide(self) -> Any: + if self.memory.get("recent_events"): + return "process_events" + return None + + async def act(self, action: Any): + if action == "process_events": + self.memory["processed"] = True + self.memory["recent_events"] = [] + +class CustomTestEvent(BaseEvent): + metric: str = "test" + value: float = 1.0 + +@pytest.mark.asyncio +async def test_agent_lifecycle(): + manager = AgentManager() + + agent = manager.spawn_agent(DummyAgent, config={"custom_var": 42}) + assert agent.state == "running" + assert 
agent.memory.get("custom_var") == 42 + + event = CustomTestEvent(source="test", metadata={}) + await agent.perceive(event) + + # Wait for the PDA loop to run + await asyncio.sleep(0.1) + + assert agent.memory.get("processed") is True + assert len(agent.memory.get("recent_events")) == 0 + + await manager.retire_agent(agent.agent_id) + assert agent.state == "stopped" + assert agent.agent_id not in manager.active_agents + +@pytest.mark.asyncio +async def test_memory_eviction(): + manager = AgentManager() + agent = manager.spawn_agent(DummyAgent) + agent.max_memory_events = 5 + + # We will bypass normal loop processing to just test memory appending + for _ in range(10): + event = CustomTestEvent(source="test", metadata={}) + agent._process_event(event) + + assert len(agent.memory["recent_events"]) == 5 + +@pytest.mark.asyncio +async def test_evaluate_agents(): + manager = AgentManager() + agent1 = manager.spawn_agent(DummyAgent) + agent2 = manager.spawn_agent(DummyAgent) + + manager.performance_metrics[agent1.agent_id] = 0.8 + manager.performance_metrics[agent2.agent_id] = 0.2 + + underperforming = manager.evaluate_agents(threshold=0.5) + assert len(underperforming) == 1 + assert underperforming[0] == agent2.agent_id + + await manager.retire_agent(agent1.agent_id) + await manager.retire_agent(agent2.agent_id) diff --git a/tests/test_event_bus.py b/tests/test_event_bus.py new file mode 100644 index 0000000..a268cdf --- /dev/null +++ b/tests/test_event_bus.py @@ -0,0 +1,66 @@ +import pytest +import asyncio +from src.marketing_organism.event_bus.bus import EventBus +from src.marketing_organism.event_bus.events import BaseEvent + +class CustomTestEvent(BaseEvent): + metric: str = "test" + value: float = 1.0 + +@pytest.mark.asyncio +async def test_event_bus(): + bus = EventBus() + bus.start() + + received_events = [] + + async def callback(topic, event): + received_events.append((topic, event)) + + bus.subscribe("test.*", callback) + + event1 = 
CustomTestEvent(source="test", metadata={}) + await bus.publish("test.event1", event1) + + # Wait for processing + await asyncio.sleep(0.1) + + await bus.stop() + + assert len(received_events) == 1 + assert received_events[0][0] == "test.event1" + assert received_events[0][1].id == event1.id + +def test_event_cryptographic_hash(): + event1 = CustomTestEvent(source="test", metadata={"key": "val"}) + assert event1.cryptographic_hash is not None + assert len(event1.cryptographic_hash) == 64 + + # Same event data should yield same hash + import copy + event2 = CustomTestEvent(id=event1.id, timestamp=event1.timestamp, source=event1.source, metadata=event1.metadata, metric="test", value=1.0) + assert event1.cryptographic_hash == event2.cryptographic_hash + +@pytest.mark.asyncio +async def test_dlq_on_error(): + bus = EventBus() + bus.start() + + async def failing_callback(topic, event): + raise ValueError("Simulated failure") + + bus.subscribe("error.*", failing_callback) + + event1 = CustomTestEvent(source="test", metadata={}) + await bus.publish("error.event1", event1) + + await asyncio.sleep(0.1) + + assert bus.dlq.qsize() == 1 + dlq_item = await bus.dlq.get() + + assert dlq_item[0] == "error.event1" + assert dlq_item[1].id == event1.id + assert "Simulated failure" in dlq_item[2] + + await bus.stop() diff --git a/tests/test_evolution.py b/tests/test_evolution.py new file mode 100644 index 0000000..37d0967 --- /dev/null +++ b/tests/test_evolution.py @@ -0,0 +1,74 @@ +import pytest +from src.marketing_organism.evolution.genome import StrategyGenome +from src.marketing_organism.evolution.selection import EvolutionarySelector + +def test_genome_mutation(): + genome = StrategyGenome(parameters={"objective_weights": [1.0, 1.0, 1.0]}) + # Force mutation + mutated = genome.mutate(mutation_rate=1.0) + assert mutated.genome_id != genome.genome_id + assert mutated.lineage == [genome.cryptographic_hash] + + # Assert parameters changed + assert mutated.genes["budget_allocation"] 
!= genome.genes["budget_allocation"] + +def test_genome_crossover(): + genome1 = StrategyGenome(parameters={"budget_allocation": 100.0, "adaptation_rate": 0.05}) + genome2 = StrategyGenome(parameters={"budget_allocation": 200.0, "adaptation_rate": 0.1}) + + child = genome1.crossover(genome2) + assert child.genome_id != genome1.genome_id + assert child.genome_id != genome2.genome_id + assert child.lineage == [genome1.cryptographic_hash, genome2.cryptographic_hash] + assert child.genes["budget_allocation"] in [100.0, 200.0] + assert child.genes["adaptation_rate"] in [0.05, 0.1] + +def test_evolutionary_selector(): + selector = EvolutionarySelector() + + g1 = StrategyGenome() + g2 = StrategyGenome() + g3 = StrategyGenome() + + selector.add_genome(g1) + selector.add_genome(g2) + selector.add_genome(g3) + + # Simulate fitness + for _ in range(5): + selector.evaluate_fitness(g1.genome_id, 0.1) # Low performer + selector.evaluate_fitness(g2.genome_id, 0.9) # High performer + selector.evaluate_fitness(g3.genome_id, 0.8) # High performer + + selector.reallocate_resources(min_threshold=0.3) + + assert g1.genome_id not in selector.population + assert g2.genome_id in selector.population + assert g3.genome_id in selector.population + + # Spawn new generation + selector.spawn_generation() + assert selector.generation == 1 + assert len(selector.population) >= 2 + +def test_evolutionary_diversity(): + selector = EvolutionarySelector() + + # 3 identical genomes, 1 distinct genome + g1 = StrategyGenome(parameters={"objective_weights": [1.0, 1.0, 1.0]}) + g2 = StrategyGenome(parameters={"objective_weights": [1.0, 1.0, 1.0]}) + g3 = StrategyGenome(parameters={"objective_weights": [1.0, 1.0, 1.0]}) + + # This one is very structurally different + g4_distinct = StrategyGenome(parameters={"objective_weights": [10.0, 10.0, 10.0]}) + + selector.add_genome(g1) + selector.add_genome(g2) + selector.add_genome(g3) + selector.add_genome(g4_distinct) + + div_1 = 
selector.calculate_diversity_score(g1) + div_4 = selector.calculate_diversity_score(g4_distinct) + + # Distinct genome should have a much higher diversity score + assert div_4 > div_1 diff --git a/tests/test_knowledge_toolforge.py b/tests/test_knowledge_toolforge.py new file mode 100644 index 0000000..776d315 --- /dev/null +++ b/tests/test_knowledge_toolforge.py @@ -0,0 +1,88 @@ +import pytest +from src.marketing_organism.knowledge.graph import KnowledgeGraph +from src.marketing_organism.tool_forge.generator import ToolGenerator +import os +import uuid + +@pytest.mark.asyncio +async def test_knowledge_graph(): + kg = KnowledgeGraph(in_memory=True) + await kg.store_entity("entity1", {"name": "Node A", "type": "campaign"}) + await kg.store_entity("entity2", {"name": "Node B", "type": "audience"}) + + e1 = await kg.get_entity("entity1") + e2 = await kg.get_entity("entity2") + + assert e1["name"] == "Node A" + assert e2["type"] == "audience" + + await kg.add_relationship("entity1", "entity2", "targets") + + relations = await kg.query_relations("entity1") + assert len(relations) == 1 + assert relations[0]["target"] == "entity2" + assert relations[0]["type"] == "targets" + + campaigns = await kg.query_by_type("campaign") + assert len(campaigns) == 1 + assert campaigns[0]["id"] == "entity1" + +from unittest.mock import AsyncMock, patch + +@pytest.mark.asyncio +async def test_tool_generator(tmp_path): + generator = ToolGenerator(workspace_path=str(tmp_path)) + + gap_description = "Need to parse unstructured social media text" + + # We will patch analyze_gap so the uuid generated matches during test + original_analyze_gap = generator.analyze_gap + async def mock_analyze(gap): + return { + "name": f"tool_mocked123", + "type": "python", + "description": f"Generated tool to address: {gap}" + } + generator.analyze_gap = mock_analyze + + spec = await generator.analyze_gap(gap_description) + assert spec["name"].startswith("tool_") + + # Mock LLM backend response + 
mock_llm_response = ''' +import logging + +def tool_mocked123(*args, **kwargs): + """ + Auto-generated tool to fulfill gap: + Need to parse unstructured social media text + """ + logging.info(f"Executing auto-generated tool tool_mocked123") + return "Operation successful" + +if __name__ == "__main__": + tool_mocked123() +''' + with patch.object(generator.prompt_chainer, '_call_llm', return_value=mock_llm_response) as mock_call: + filepath = await generator.generate_tool(gap_description) + assert filepath.endswith(".py") + assert os.path.exists(filepath) + + with open(filepath, "r") as f: + content = f.read() + assert gap_description in content + assert spec["name"] in content + + assert generator.validate_tool(filepath) is True + + # Test AST Unsafe scanner + unsafe_code = """ +import os +def bad_tool(): + os.system("rm -rf /") +""" + unsafe_filepath = os.path.join(generator.workspace_path, "tool_unsafe.py") + with open(unsafe_filepath, "w") as f: + f.write(unsafe_code) + + assert generator.validate_tool(unsafe_filepath) is False diff --git a/tests/test_llm.py b/tests/test_llm.py new file mode 100644 index 0000000..d747511 --- /dev/null +++ b/tests/test_llm.py @@ -0,0 +1,46 @@ +import pytest +import asyncio +from src.marketing_organism.llm.service import FastAPIService +from src.marketing_organism.llm.reasoning import PromptChainer +from httpx import AsyncClient, ASGITransport + +@pytest.mark.asyncio +async def test_llm_service(): + service = FastAPIService(port=8001) + + # Simple direct testing of endpoints using FastAPIService's internal app directly + from src.marketing_organism.llm.service import app + + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + response = await ac.post("/generate", json={"prompt": "test"}) + assert response.status_code == 200 + assert "mocked" in response.json()["generated_text"].lower() + +from unittest.mock import AsyncMock, patch + +from unittest.mock import AsyncMock, patch, MagicMock + 
+
+@pytest.mark.asyncio
+async def test_prompt_chainer():
+    """Verify execute_chain/decompose_task against a mocked async HTTP client."""
+    chainer = PromptChainer(endpoint_url="http://127.0.0.1:8001")
+
+    mock_response = MagicMock()
+    mock_response.json.return_value = {"generated_text": "Mocked Step"}
+    mock_response.raise_for_status = MagicMock()
+
+    # AsyncClient.post is a coroutine function, so patch it with an AsyncMock
+    # directly via `new=`; patch.object then restores the original attribute
+    # on exit instead of leaving a manually-assigned mock behind.
+    with patch.object(chainer.client, "post", new=AsyncMock(return_value=mock_response)):
+        steps = ["step1", "step2"]
+        results = await chainer.execute_chain(steps)
+
+        assert len(results) == 2
+        assert results[0] == "Mocked Step"
+        assert results[1] == "Mocked Step"
+
+        decomposition = await chainer.decompose_task("Complex Goal")
+        assert len(decomposition) == 1  # single step: "Mocked Step"
+
+    await chainer.close()