Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): low balance notiifcation #9534

Open
wants to merge 27 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6626fd9
feat(backend): Add cost on node & graph execution stats
majdyz Feb 25, 2025
e6e40de
fix(backend): email better
ntindle Feb 26, 2025
eb671a0
fix(backend): update to send data in the correct format with normal e…
ntindle Feb 26, 2025
a2297a8
fix: remove john's face :(
ntindle Feb 26, 2025
478ce0a
feat: refactor email a bit more
ntindle Feb 26, 2025
c62a462
fix: formatting
ntindle Feb 26, 2025
2e061e3
Merge branch 'dev' into zamilmajdy/add-graph-level-cost
ntindle Feb 26, 2025
1d1d1d2
fix: pull request review changes
ntindle Feb 27, 2025
91c4614
Merge branch 'dev' into zamilmajdy/add-graph-level-cost
ntindle Feb 27, 2025
5221d1d
fix: remove loggin
ntindle Feb 27, 2025
08b8398
feat(backend): initial low balance warning
ntindle Feb 27, 2025
f65967a
Merge branch 'dev' into zamilmajdy/add-graph-level-cost
ntindle Feb 27, 2025
b7430db
fix: remove bad comments
ntindle Feb 27, 2025
a1a7b5d
Merge branch 'zamilmajdy/add-graph-level-cost' into ntindle/secrt-112…
ntindle Feb 27, 2025
1e77b7c
fix(backend): raise instead of returning
ntindle Feb 27, 2025
c2f6511
feat: more features to put in line with spec
ntindle Feb 27, 2025
45073f4
feat:backend): changes for low balance
ntindle Feb 27, 2025
8884be3
Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into nt…
majdyz Feb 28, 2025
0854b86
Fix & Refactor
majdyz Feb 28, 2025
975db7b
feat: error for insufficient balance
ntindle Feb 28, 2025
ddeca01
fix: linting
ntindle Feb 28, 2025
d738722
Merge branch 'dev' into ntindle/secrt-1120-low-balance-preventing-blo…
ntindle Mar 3, 2025
e32672b
feat: data model changes
ntindle Mar 3, 2025
6330b57
Merge branch 'ntindle/secrt-1120-low-balance-preventing-block-executi…
ntindle Mar 3, 2025
0c40280
fix: link in email
ntindle Mar 3, 2025
2642173
fix: paused->stopped
ntindle Mar 4, 2025
1fe69ab
fix: john change requests
ntindle Mar 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion autogpt_platform/backend/backend/data/credit.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,13 @@ async def _add_transaction(
if amount < 0 and user_balance + amount < 0:
if fail_insufficient_credits:
raise ValueError(
f"Insufficient balance of ${user_balance/100}, where this will cost ${abs(amount)/100}"
f"Insufficient balance of ${user_balance/100}, where this will cost ${abs(amount)/100}",
{
"user_id": user_id,
"balance": user_balance,
"amount": amount,
"type": "low_balance",
},
)
amount = min(-user_balance, 0)

Expand Down
15 changes: 9 additions & 6 deletions autogpt_platform/backend/backend/data/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class AgentRunData(BaseNotificationData):
execution_time: float
node_count: int = Field(..., description="Number of nodes executed")
graph_id: str
outputs: dict[str, Any] = Field(..., description="Outputs of the agent")
outputs: list[dict[str, Any]] = Field(..., description="Outputs of the agent")


class ZeroBalanceData(BaseNotificationData):
Expand All @@ -49,10 +49,13 @@ class ZeroBalanceData(BaseNotificationData):


class LowBalanceData(BaseNotificationData):
current_balance: float
threshold_amount: float
top_up_link: str
recent_usage: float = Field(..., description="Usage in the last 24 hours")
agent_name: str = Field(..., description="Name of the agent")
current_balance: float = Field(
..., description="Current balance in credits (100 = $1)"
)
top_up_link: str = Field(..., description="Link to top up credits")
top_up_amount: float = Field(..., description="Amount of credits to top up")
shortfall: float = Field(..., description="Amount of credits needed to continue")


class BlockExecutionFailedData(BaseNotificationData):
Expand Down Expand Up @@ -197,7 +200,7 @@ def strategy(self) -> QueueType:
NotificationType.AGENT_RUN: QueueType.IMMEDIATE,
# These are batched by the notification service, but with a backoff strategy
NotificationType.ZERO_BALANCE: QueueType.BACKOFF,
NotificationType.LOW_BALANCE: QueueType.BACKOFF,
NotificationType.LOW_BALANCE: QueueType.IMMEDIATE,
NotificationType.BLOCK_EXECUTION_FAILED: QueueType.BACKOFF,
NotificationType.CONTINUOUS_AGENT_ERROR: QueueType.BACKOFF,
NotificationType.DAILY_SUMMARY: QueueType.DAILY,
Expand Down
5 changes: 5 additions & 0 deletions autogpt_platform/backend/backend/executor/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ async def _spend_credits(entry: NodeExecutionEntry) -> int:
return await _user_credit_model.spend_credits(entry, 0, 0)


async def _top_up_intent(user_id: str, amount: int) -> str:
return await _user_credit_model.top_up_intent(user_id, amount)


class DatabaseManager(AppService):
def __init__(self):
super().__init__()
Expand Down Expand Up @@ -68,6 +72,7 @@ def send_execution_update(self, execution_result: ExecutionResult):

# Credits
spend_credits = exposed_run_and_wait(_spend_credits)
top_up_intent = exposed_run_and_wait(_top_up_intent)

# User + User Metadata + User Integrations
get_user_metadata = exposed_run_and_wait(get_user_metadata)
Expand Down
116 changes: 99 additions & 17 deletions autogpt_platform/backend/backend/executor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@

from redis.lock import Lock as RedisLock

from backend.blocks.basic import AgentOutputBlock
from backend.data.notifications import (
AgentRunData,
LowBalanceData,
NotificationEventDTO,
NotificationType,
)
Expand Down Expand Up @@ -115,7 +117,6 @@ def _wrap(self, msg: str, **extra):
def execute_node(
db_client: "DatabaseManager",
creds_manager: IntegrationCredentialsManager,
notification_service: "NotificationManager",
data: NodeExecutionEntry,
execution_stats: dict[str, Any] | None = None,
) -> ExecutionStream:
Expand Down Expand Up @@ -201,6 +202,7 @@ def update_execution(status: ExecutionStatus) -> ExecutionResult:
extra_exec_kwargs[field_name] = credentials

output_size = 0
cost = 0
try:
# Charge the user for the execution before running the block.
cost = db_client.spend_credits(data)
Expand All @@ -227,21 +229,6 @@ def update_execution(status: ExecutionStatus) -> ExecutionResult:

# Update execution status and spend credits
update_execution(ExecutionStatus.COMPLETED)
event = NotificationEventDTO(
user_id=user_id,
type=NotificationType.AGENT_RUN,
data=AgentRunData(
outputs=outputs,
agent_name=node_block.name,
credits_used=cost,
execution_time=0,
graph_id=graph_id,
node_count=1,
).model_dump(),
)

logger.info(f"Sending notification for {event}")
notification_service.queue_notification(event)

except Exception as e:
error_msg = str(e)
Expand Down Expand Up @@ -273,6 +260,7 @@ def update_execution(status: ExecutionStatus) -> ExecutionResult:
execution_stats.update(node_block.execution_stats)
execution_stats["input_size"] = input_size
execution_stats["output_size"] = output_size
execution_stats["cost"] = cost


def _enqueue_next_nodes(
Expand Down Expand Up @@ -582,7 +570,6 @@ def _on_node_execution(
for execution in execute_node(
db_client=cls.db_client,
creds_manager=cls.creds_manager,
notification_service=cls.notification_service,
data=node_exec,
execution_stats=stats,
):
Expand All @@ -594,6 +581,14 @@ def _on_node_execution(
log_metadata.info(
f"Failed node execution {node_exec.node_exec_id}: {e}"
)
if (
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will trigger multiple times

len(e.args) == 2
and "type" in e.args[1]
and e.args[1]["type"] == "low_balance"
):
cls._handle_low_balance_notif(
node_exec.user_id, node_exec.graph_id, stats or {}, e
)
else:
log_metadata.exception(
f"Failed node execution {node_exec.node_exec_id}: {e}"
Expand Down Expand Up @@ -658,6 +653,8 @@ def on_graph_execution(
)
cls.db_client.send_execution_update(result)

cls._handle_agent_run_notif(graph_exec, timing_info.wall_time, exec_stats)

@classmethod
@time_measured
def _on_graph_execution(
Expand All @@ -677,6 +674,7 @@ def _on_graph_execution(
"nodes_walltime": 0,
"nodes_cputime": 0,
"node_count": 0,
"cost": 0,
}
error = None
finished = False
Expand Down Expand Up @@ -714,6 +712,7 @@ def callback(result: object):
exec_stats["node_count"] += 1
exec_stats["nodes_cputime"] += result.get("cputime", 0)
exec_stats["nodes_walltime"] += result.get("walltime", 0)
exec_stats["cost"] += result.get("cost", 0)

return callback

Expand Down Expand Up @@ -763,6 +762,14 @@ def callback(result: object):
f"Failed graph execution {graph_exec.graph_exec_id}: {e}"
)
error = e
if (
len(e.args) == 2
and "type" in e.args[1]
and e.args[1]["type"] == "low_balance"
):
cls._handle_low_balance_notif(
graph_exec.user_id, graph_exec.graph_id, exec_stats, e
)
finally:
if not cancel.is_set():
finished = True
Expand All @@ -776,6 +783,81 @@ def callback(result: object):
error,
)

@classmethod
def _handle_agent_run_notif(
cls,
graph_exec: GraphExecutionEntry,
wall_time: float,
exec_stats: dict[str, Any],
):
metadata = cls.db_client.get_graph_metadata(
graph_exec.graph_id, graph_exec.graph_version
)
assert metadata is not None
outputs = cls.db_client.get_execution_results(graph_exec.graph_exec_id)

# Collect named outputs as a list of dictionaries
named_outputs = []
for output in outputs:
if output.block_id == AgentOutputBlock().id:
# Create a dictionary for this named output
named_output = {
# Include the name as a field in each output
"name": (
output.output_data["name"][0]
if isinstance(output.output_data["name"], list)
else output.output_data["name"]
)
}

# Add all other fields
for key, value in output.output_data.items():
if key != "name":
named_output[key] = value

named_outputs.append(named_output)

event = NotificationEventDTO(
user_id=graph_exec.user_id,
type=NotificationType.AGENT_RUN,
data=AgentRunData(
outputs=named_outputs,
agent_name=metadata.name,
credits_used=exec_stats["cost"],
execution_time=wall_time,
graph_id=graph_exec.graph_id,
node_count=exec_stats["node_count"],
).model_dump(),
)

logger.info(
f"Sending Agent Run Notification for {graph_exec.user_id}, {metadata.name}"
)
get_notification_service().queue_notification(event)

@classmethod
def _handle_low_balance_notif(
cls, user_id: str, graph_id: str, exec_stats: dict, e: Exception
):
shortfall = e.args[1]["balance"] - e.args[1]["amount"]
metadata = cls.db_client.get_graph_metadata(graph_id)
top_up_amount = max(shortfall, 500)
top_up_link = cls.db_client.top_up_intent(user_id, top_up_amount)
logger.info(f"Sending low balance notification for user {user_id}")
cls.notification_service.queue_notification(
NotificationEventDTO(
user_id=user_id,
type=NotificationType.LOW_BALANCE,
data=LowBalanceData(
current_balance=exec_stats["cost"] if exec_stats else 0,
top_up_link=top_up_link,
top_up_amount=top_up_amount,
shortfall=shortfall,
agent_name=metadata.name if metadata else "Unknown",
).model_dump(),
)
)


class ExecutionManager(AppService):
def __init__(self):
Expand Down
Loading