-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
151 lines (123 loc) · 5.06 KB
/
database.py
File metadata and controls
151 lines (123 loc) · 5.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""SQLAlchemy database setup and models."""
from datetime import datetime, timezone
from sqlalchemy import (
Column, Integer, String, Text, Boolean, DateTime, ForeignKey,
create_engine, event
)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, relationship, declarative_base
from sqlalchemy.pool import StaticPool
from config import DATABASE_URL, DATABASE_PATH
# Declarative base that the ORM model classes in this module inherit from;
# its metadata is what init_db() passes to create_all.
Base = declarative_base()
class Project(Base):
    """Project model: one row of the ``projects`` table.

    Owns its tasks and services; both relationships use
    ``cascade="all, delete-orphan"``.
    """

    __tablename__ = "projects"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255), nullable=False)
    description = Column(Text, default="")
    path = Column(String(500), default="")
    notes = Column(Text, default="")
    priority = Column(String(20), default="medium")  # highest, high, medium, low, lowest
    paused = Column(Boolean, default=False)
    position = Column(Integer, default=0)
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))

    # Relationships
    tasks = relationship("Task", back_populates="project", cascade="all, delete-orphan")
    services = relationship("Service", back_populates="project", cascade="all, delete-orphan")

    def to_dict(self):
        """Return this project as a plain dict, including its tasks and services.

        Returns:
            dict: the column values, plus ``"tasks"`` (each task's
            ``to_dict()``, ordered by ascending task priority) and
            ``"services"`` (each service's ``to_dict()``). A NULL
            ``priority`` is reported as ``"medium"``, a NULL ``paused`` as
            ``False``, and ``created_at`` as an ISO-8601 string or ``None``.
        """
        # Task.priority is a nullable column. Comparing None with an int
        # inside sorted() raises TypeError, so a NULL priority is ranked as
        # 0 (the column default) instead of breaking serialization.
        ordered_tasks = sorted(self.tasks, key=lambda task: task.priority or 0)
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "path": self.path,
            "notes": self.notes,
            "priority": self.priority or "medium",
            "paused": self.paused or False,
            "position": self.position,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "tasks": [task.to_dict() for task in ordered_tasks],
            "services": [service.to_dict() for service in self.services],
        }
class Task(Base):
    """Task model: one row of the ``tasks`` table, belonging to a project."""

    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, autoincrement=True)
    project_id = Column(
        Integer,
        ForeignKey("projects.id", ondelete="CASCADE"),
        nullable=False,
    )
    title = Column(String(500), nullable=False)
    completed = Column(Boolean, default=False)
    highlighted = Column(Boolean, default=False)
    blue_highlighted = Column(Boolean, default=False)
    priority = Column(Integer, default=0)
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))

    # Back-reference to the owning project.
    project = relationship("Project", back_populates="tasks")

    def to_dict(self):
        """Return the task's column values as a plain dict.

        ``created_at`` is rendered as an ISO-8601 string, or ``None`` when
        it is unset; every other value is passed through unchanged.
        """
        passthrough = (
            "id",
            "project_id",
            "title",
            "completed",
            "highlighted",
            "blue_highlighted",
            "priority",
        )
        payload = {field: getattr(self, field) for field in passthrough}
        stamp = self.created_at
        payload["created_at"] = stamp.isoformat() if stamp else None
        return payload
class Service(Base):
    """Service model: a health-monitored endpoint attached to a project."""

    __tablename__ = "services"

    id = Column(Integer, primary_key=True, autoincrement=True)
    project_id = Column(
        Integer,
        ForeignKey("projects.id", ondelete="CASCADE"),
        nullable=False,
    )
    name = Column(String(255), nullable=False)
    check_type = Column(String(50), default="http")  # http, tcp, command
    target = Column(String(500), nullable=False)  # URL, host:port, or shell command
    status = Column(String(50), default="unknown")  # up, down, unknown
    status_code = Column(Integer, nullable=True)
    response_time_ms = Column(Integer, nullable=True)
    last_error = Column(Text, nullable=True)
    last_checked = Column(DateTime, nullable=True)
    consecutive_failures = Column(Integer, default=0)

    # Back-reference to the owning project.
    project = relationship("Project", back_populates="services")

    def to_dict(self):
        """Return the service's column values as a plain dict.

        ``last_checked`` is rendered as an ISO-8601 string, or ``None`` when
        it is unset; every other value is passed through unchanged.
        """
        columns = (
            "id",
            "project_id",
            "name",
            "check_type",
            "target",
            "status",
            "status_code",
            "response_time_ms",
            "last_error",
            "last_checked",
            "consecutive_failures",
        )
        payload = {column: getattr(self, column) for column in columns}
        # Overwriting an existing key keeps its position, so key order is
        # the same as the tuple above.
        checked_at = payload["last_checked"]
        payload["last_checked"] = checked_at.isoformat() if checked_at else None
        return payload
# Module-level async engine (used by the FastAPI app), built from DATABASE_URL.
# connect_args is handed to the DBAPI connect call; check_same_thread is a
# sqlite3 option, so this presumably targets SQLite -- confirm against config.
engine = create_async_engine(
    DATABASE_URL,
    poolclass=StaticPool,
    connect_args=dict(check_same_thread=False),
    echo=False,
)
# Factory producing AsyncSession objects bound to the module-level engine.
from sqlalchemy.ext.asyncio import async_sessionmaker

async_session = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,  # do not expire loaded objects on commit
    class_=AsyncSession,
)
async def get_db():
    """Yield a database session for use as a dependency, then close it.

    This is an async generator: the caller receives one session from
    ``async_session`` and the session is closed once the generator is
    resumed or torn down, whether or not the caller raised.
    """
    db = async_session()
    async with db:
        try:
            yield db
        finally:
            await db.close()
async def init_db():
    """Create the tables declared on ``Base`` and restrict the database file.

    After table creation, the file at ``DATABASE_PATH`` (if it exists) has
    its mode set to ``0o600``.
    """
    async with engine.begin() as connection:
        # create_all is a synchronous call, so it is dispatched via run_sync.
        await connection.run_sync(Base.metadata.create_all)

    # Set file permissions; nothing to do when the file is absent.
    if not DATABASE_PATH.exists():
        return
    DATABASE_PATH.chmod(0o600)
async def close_db():
    """Dispose of the module-level engine, closing its database connections."""
    await engine.dispose()