Skip to content

Commit 127c524

Browse files
feat(repositories): add job and audiobook job repositories
Implement repositories for job management and job-audiobook relations with worker-safe operations. Includes atomic job claiming, progress updates, and status transitions with optimistic locking.
1 parent dd726fd commit 127c524

3 files changed

Lines changed: 364 additions & 0 deletions

File tree

backend/src/bookbytes/repositories/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
"""
55

66
from bookbytes.repositories.audio_book import AudioBookRepository
7+
from bookbytes.repositories.audio_book_job import AudioBookJobRepository
78
from bookbytes.repositories.base import BaseRepository, SoftDeleteRepository
89
from bookbytes.repositories.book_provider import BookProviderRepository
910
from bookbytes.repositories.chapter import ChapterRepository
1011
from bookbytes.repositories.edition import EditionRepository
12+
from bookbytes.repositories.job import JobRepository
1113
from bookbytes.repositories.work import WorkRepository
1214

1315
__all__ = [
@@ -20,4 +22,7 @@
2022
"BookProviderRepository",
2123
"AudioBookRepository",
2224
"ChapterRepository",
25+
# Phase 3.1: Audio Books Pipeline
26+
"JobRepository",
27+
"AudioBookJobRepository",
2328
]
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
"""AudioBookJob repository for job-audiobook relations.
2+
3+
Provides operations to link jobs to audiobooks and query jobs
4+
for a specific audiobook.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from uuid import UUID
10+
11+
from sqlalchemy import select
12+
from sqlalchemy.orm import joinedload
13+
14+
from bookbytes.models.audio_book_job import AudioBookJob
15+
from bookbytes.models.job import Job
16+
from bookbytes.repositories.base import BaseRepository
17+
18+
19+
class AudioBookJobRepository(BaseRepository[AudioBookJob]):
    """Data access for AudioBookJob rows, which tie a job to an audiobook.

    Offers creation of a link row plus lookups in both directions:
    from a job to its link, and from an audiobook to its jobs.
    """

    async def create_link(
        self,
        job_id: UUID,
        audio_book_id: UUID,
    ) -> AudioBookJob:
        """Persist a new job/audiobook link and return it.

        The row is added to the session, the session is committed, and the
        instance is refreshed from the database before being returned.

        Args:
            job_id: UUID of the job side of the link
            audio_book_id: UUID of the audiobook side of the link

        Returns:
            The newly stored AudioBookJob instance
        """
        relation = AudioBookJob(job_id=job_id, audio_book_id=audio_book_id)
        db = self.session
        db.add(relation)
        await db.commit()
        await db.refresh(relation)
        return relation

    async def get_by_job_id(self, job_id: UUID) -> AudioBookJob | None:
        """Look up the link row that belongs to a job.

        The related ``audio_book`` is loaded in the same query via
        ``joinedload``.

        Args:
            job_id: UUID of the job

        Returns:
            The matching link, or None when the job has no link
        """
        stmt = select(AudioBookJob).where(AudioBookJob.job_id == job_id)
        stmt = stmt.options(joinedload(AudioBookJob.audio_book))
        rows = await self.session.execute(stmt)
        return rows.scalars().one_or_none()

    async def get_jobs_for_audiobook(
        self,
        audio_book_id: UUID,
        limit: int = 50,
    ) -> list[Job]:
        """List the jobs linked to an audiobook, most recently created first.

        Args:
            audio_book_id: UUID of the audiobook
            limit: Upper bound on the number of jobs returned

        Returns:
            Jobs linked to the audiobook, ordered by ``created_at`` descending
        """
        link_matches_job = AudioBookJob.job_id == Job.id
        link_matches_book = AudioBookJob.audio_book_id == audio_book_id

        stmt = select(Job).join(AudioBookJob, link_matches_job)
        stmt = stmt.where(link_matches_book)
        stmt = stmt.order_by(Job.created_at.desc()).limit(limit)

        rows = await self.session.execute(stmt)
        return [*rows.scalars().all()]

    async def get_latest_job_for_audiobook(
        self,
        audio_book_id: UUID,
    ) -> Job | None:
        """Fetch the single most recently created job of an audiobook.

        Args:
            audio_book_id: UUID of the audiobook

        Returns:
            The job with the greatest ``created_at``, or None if the
            audiobook has no linked jobs
        """
        link_matches_job = AudioBookJob.job_id == Job.id
        link_matches_book = AudioBookJob.audio_book_id == audio_book_id

        stmt = select(Job).join(AudioBookJob, link_matches_job)
        stmt = stmt.where(link_matches_book)
        stmt = stmt.order_by(Job.created_at.desc()).limit(1)

        rows = await self.session.execute(stmt)
        return rows.scalars().one_or_none()
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
"""Job repository with worker-safe operations.
2+
3+
Provides CRUD operations for jobs with atomic claim, progress updates,
4+
and status transitions. Uses optimistic locking for concurrency control.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from datetime import UTC, datetime
10+
from uuid import UUID
11+
12+
from sqlalchemy import func, select, update
13+
14+
from bookbytes.models.job import Job, JobStatus, JobType
15+
from bookbytes.repositories.base import BaseRepository
16+
17+
18+
class JobRepository(BaseRepository[Job]):
    """Repository for Job model with worker-safe operations.

    Provides job claiming, progress updates and status transitions.
    State-changing operations that depend on a previously read row
    (``claim_next``, ``schedule_retry``) are written as guarded UPDATEs so
    that a concurrent change makes them fail cleanly instead of silently
    overwriting another worker's transition.
    """

    async def claim_next(
        self,
        worker_id: str,
        job_type: str | None = None,
    ) -> Job | None:
        """Atomically claim the oldest pending job.

        The job is first selected, then claimed with an UPDATE that only
        matches if the row still has the version that was read AND is still
        pending. The status guard matters because the other transitions in
        this repository change ``status`` without bumping ``version``.

        Args:
            worker_id: Identifier of the claiming worker
            job_type: Optional filter by job type (a falsy value disables
                the filter)

        Returns:
            The claimed job, or None if no pending job was found or the
            claim was lost to a concurrent change. The session is committed
            only when a job was claimed.
        """
        pending = JobStatus.PENDING.value

        # Find oldest pending job (optionally restricted to one job type)
        query = select(Job).where(Job.status == pending)
        if job_type:
            query = query.where(Job.job_type == job_type)
        query = query.order_by(Job.created_at).limit(1)

        result = await self.session.execute(query)
        job = result.scalar_one_or_none()
        if job is None:
            return None

        # Compare-and-swap: succeeds only if nobody touched the row since
        # the SELECT above.
        stmt = (
            update(Job)
            .where(Job.id == job.id)
            .where(Job.version == job.version)  # Optimistic lock
            .where(Job.status == pending)  # Must not have left PENDING
            .values(
                status=JobStatus.PROCESSING.value,
                worker_id=worker_id,
                started_at=datetime.now(UTC),
                version=job.version + 1,
            )
            .returning(Job)
        )

        result = await self.session.execute(stmt)
        claimed = result.scalar_one_or_none()

        if claimed is not None:
            await self.session.commit()

        return claimed

    async def update_progress(
        self,
        job_id: UUID,
        progress: int,
        current_step: str | None = None,
    ) -> bool:
        """Update job progress.

        Args:
            job_id: The job's UUID
            progress: Progress percentage; values outside 0-100 are clamped
            current_step: Optional human-readable step description; when
                None the stored step is left unchanged

        Returns:
            True if a row was updated, False if no job has this id
        """
        clamped = min(100, max(0, progress))
        values: dict[str, int | str | None] = {"progress": clamped}
        if current_step is not None:
            values["current_step"] = current_step

        stmt = update(Job).where(Job.id == job_id).values(**values)
        result = await self.session.execute(stmt)
        await self.session.commit()

        return result.rowcount > 0

    async def mark_completed(self, job_id: UUID) -> bool:
        """Mark job as completed.

        Sets status to completed, progress to 100 and stamps
        ``completed_at`` with the current UTC time.

        Args:
            job_id: The job's UUID

        Returns:
            True if a row was updated, False if no job has this id
        """
        stmt = (
            update(Job)
            .where(Job.id == job_id)
            .values(
                status=JobStatus.COMPLETED.value,
                progress=100,
                completed_at=datetime.now(UTC),
            )
        )
        result = await self.session.execute(stmt)
        await self.session.commit()

        return result.rowcount > 0

    async def mark_failed(
        self,
        job_id: UUID,
        error_message: str,
        error_code: str | None = None,
    ) -> bool:
        """Mark job as failed with error details.

        Args:
            job_id: The job's UUID
            error_message: Human-readable error message (stored truncated
                to 2000 characters)
            error_code: Optional machine-readable error code (stored
                truncated to 50 characters; empty is stored as NULL)

        Returns:
            True if a row was updated, False if no job has this id
        """
        stmt = (
            update(Job)
            .where(Job.id == job_id)
            .values(
                status=JobStatus.FAILED.value,
                error_message=error_message[:2000],  # Truncate to fit
                error_code=error_code[:50] if error_code else None,
                completed_at=datetime.now(UTC),
            )
        )
        result = await self.session.execute(stmt)
        await self.session.commit()

        return result.rowcount > 0

    async def schedule_retry(self, job_id: UUID) -> bool:
        """Schedule a failed job for retry.

        Increments retry_count, sets status back to pending and clears the
        worker, timing and error fields. The UPDATE only matches while the
        row still has the status and retry_count that were read, so a
        concurrent claim or retry is never overwritten.

        Args:
            job_id: The job's UUID

        Returns:
            True if the retry was scheduled. False if the job does not
            exist, ``job.can_retry`` is false (presumably max retries
            exceeded - defined on the model, not visible here), or the job
            changed concurrently.
        """
        # Get current job state
        job = await self.get_by_id(job_id)
        if not job or not job.can_retry:
            return False

        stmt = (
            update(Job)
            .where(Job.id == job_id)
            # Compare-and-swap on the state that can_retry was checked on.
            .where(Job.status == job.status)
            .where(Job.retry_count == job.retry_count)
            .values(
                status=JobStatus.PENDING.value,
                retry_count=job.retry_count + 1,
                worker_id=None,
                started_at=None,
                completed_at=None,
                error_message=None,
                error_code=None,
            )
        )
        result = await self.session.execute(stmt)
        await self.session.commit()

        # Zero rows means the job changed (or vanished) after it was read.
        return result.rowcount > 0

    async def get_by_status(
        self,
        status: JobStatus,
        limit: int = 100,
    ) -> list[Job]:
        """Get jobs by status.

        Args:
            status: The status to filter by
            limit: Maximum number of results

        Returns:
            List of jobs with the given status, oldest first
        """
        query = (
            select(Job)
            .where(Job.status == status.value)
            .order_by(Job.created_at)
            .limit(limit)
        )
        result = await self.session.execute(query)
        return list(result.scalars().all())

    async def get_pending_count(self) -> int:
        """Get count of pending jobs (for monitoring).

        Returns:
            Number of pending jobs
        """
        query = (
            select(func.count())
            .select_from(Job)
            .where(Job.status == JobStatus.PENDING.value)
        )
        result = await self.session.execute(query)
        return result.scalar() or 0

    async def get_by_job_type(
        self,
        job_type: JobType,
        limit: int = 100,
    ) -> list[Job]:
        """Get jobs by type.

        Args:
            job_type: The job type to filter by
            limit: Maximum number of results

        Returns:
            List of jobs of the given type, newest first
        """
        query = (
            select(Job)
            .where(Job.job_type == job_type.value)
            .order_by(Job.created_at.desc())
            .limit(limit)
        )
        result = await self.session.execute(query)
        return list(result.scalars().all())

0 commit comments

Comments
 (0)