Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/das/src/queue/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ export const FETCH_JOBS = {
PR_FILES: "fetch-pr-files",
BACKFILL_REPO: "backfill-repo",
ISSUE_CLOSURE: "fetch-issue-closure",
RECONCILE_REGISTRY: "reconcile-registry",
} as const;

export const DEFAULT_BACKFILL_DAYS = 40;

export const REGISTRY_RECONCILE_CRON = "0 */2 * * *";

export const MASTER_REPOSITORIES_URL =
"https://raw.githubusercontent.com/entrius/gittensor/test/gittensor/validator/weights/master_repositories.json";

export function prFilesJobId(
repoFullName: string,
prNumber: number,
Expand Down
6 changes: 6 additions & 0 deletions packages/das/src/queue/fetch.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { IsNull, Repository } from "typeorm";
import { Job, Queue } from "bullmq";
import { Issue, PullRequest } from "../entities";
import { GitHubFetcherService } from "../webhook/github-fetcher.service";
import { RegistryReconcilerService } from "./registry-reconciler.service";
import {
FETCH_QUEUE,
FETCH_JOBS,
Expand Down Expand Up @@ -57,6 +58,7 @@ export class FetchProcessor extends WorkerHost {
private readonly issueRepo: Repository<Issue>,
@InjectQueue(FETCH_QUEUE)
private readonly fetchQueue: Queue,
private readonly registryReconciler: RegistryReconcilerService,
) {
super();
}
Expand All @@ -82,6 +84,10 @@ export class FetchProcessor extends WorkerHost {
await this.handleIssueClosure(repoFullName, issueNumber);
break;
}
case FETCH_JOBS.RECONCILE_REGISTRY: {
await this.registryReconciler.reconcile();
break;
}
default:
this.logger.warn(`Unknown job name: ${job.name}`);
}
Expand Down
3 changes: 2 additions & 1 deletion packages/das/src/queue/queue.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from "../entities";
import { GitHubFetcherService } from "../webhook/github-fetcher.service";
import { FetchProcessor } from "./fetch.processor";
import { RegistryReconcilerService } from "./registry-reconciler.service";
import { FETCH_QUEUE } from "./constants";

@Module({
Expand All @@ -38,7 +39,7 @@ import { FETCH_QUEUE } from "./constants";
Review,
]),
],
providers: [FetchProcessor, GitHubFetcherService],
providers: [FetchProcessor, GitHubFetcherService, RegistryReconcilerService],
exports: [BullModule],
})
export class QueueModule {}
82 changes: 82 additions & 0 deletions packages/das/src/queue/registry-reconciler.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { Injectable, Logger, OnModuleInit } from "@nestjs/common";
import { InjectQueue } from "@nestjs/bullmq";
import { InjectRepository } from "@nestjs/typeorm";
import { Queue } from "bullmq";
import { In, Repository } from "typeorm";
import { Repo } from "../entities";
import {
FETCH_JOBS,
FETCH_QUEUE,
MASTER_REPOSITORIES_URL,
REGISTRY_RECONCILE_CRON,
} from "./constants";

@Injectable()
export class RegistryReconcilerService implements OnModuleInit {
private readonly logger = new Logger(RegistryReconcilerService.name);

constructor(
@InjectQueue(FETCH_QUEUE) private readonly fetchQueue: Queue,
@InjectRepository(Repo) private readonly repoRepo: Repository<Repo>,
) {}

async onModuleInit(): Promise<void> {
// BullMQ deduplicates repeatable jobs by repeat key, so restarts
// don't accumulate duplicate schedules.
await this.fetchQueue.add(
FETCH_JOBS.RECONCILE_REGISTRY,
{},
{
repeat: { pattern: REGISTRY_RECONCILE_CRON },
removeOnComplete: true,
removeOnFail: 50,
},
);
}

async reconcile(): Promise<void> {
const canonical = await this.fetchCanonicalRepos();
// Fail-safe: a network blip or non-200 must never mass-deregister.
if (!canonical) return;
if (canonical.size === 0) {
this.logger.warn("Canonical repo list is empty — refusing to deregister");
return;
}

const registered = await this.repoRepo.find({
select: ["repoFullName"],
where: { registered: true },
});
const toDeregister = registered
.map((r) => r.repoFullName)
.filter((name) => !canonical.has(name.toLowerCase()));

// installationId stays — delisting != uninstalling; re-list via admin/repos/register.
await this.repoRepo.update(
{ repoFullName: In(toDeregister) },
{ registered: false },
);

this.logger.log(
`Registry reconcile: deregistered ${toDeregister.length} repo(s)` +
(toDeregister.length ? `: ${toDeregister.join(", ")}` : ""),
);
}

private async fetchCanonicalRepos(): Promise<Set<string> | null> {
try {
const res = await fetch(MASTER_REPOSITORIES_URL);
if (!res.ok) {
this.logger.error(
`Master repos fetch returned ${res.status} ${res.statusText}`,
);
return null;
}
const json = (await res.json()) as Record<string, unknown>;
return new Set(Object.keys(json).map((k) => k.toLowerCase()));
} catch (err) {
this.logger.error(`Master repos fetch failed: ${String(err)}`);
return null;
}
}
}