diff --git a/client/.env.example b/client/.env.example index 85f8bdd6..baf3fcfb 100644 --- a/client/.env.example +++ b/client/.env.example @@ -29,5 +29,6 @@ VITE_DOCS_URL=https://roostorg.github.io/coop/latest # This is used to proxy the content URL to the content proxy service. VITE_CONTENT_PROXY_URL=http://localhost:4000 -# Comma-separated list of patterns to match content URLs that should be displayed in iframes +# Comma-separated list of patterns to match content URLs that should be displayed in iframes. +# For the atproto demo, set this to: bsky.app VITE_CONTENT_URL_PATTERN= \ No newline at end of file diff --git a/server/bin/README.md b/server/bin/README.md index 345d46af..ea8b4816 100644 --- a/server/bin/README.md +++ b/server/bin/README.md @@ -238,3 +238,67 @@ npm run recover-mrt-queue -- \ - Report history is best-effort: only inbound `submitReport` rows that made it into `REPORTING_SERVICE.REPORTS` are restored. Rule-driven enqueues (`ENQUEUE_TO_MRT`) never had a report history to begin with. + +--- + +## atproto-proxy.mts + +Content proxy for the MRT job review iframe. The review UI embeds post content in an iframe via a proxy URL (default: `http://localhost:4000`). bsky.app blocks direct embedding with `X-Frame-Options: SAMEORIGIN`, so this script fetches post data from the Bluesky public API instead and renders it as a standalone HTML page. It also handles blur and grayscale postMessage events sent by the review UI's wellness settings. + +Run this alongside `atproto:demo` in a separate terminal: + +```sh +cd server && npm run atproto:proxy +``` + +You also need to set `VITE_CONTENT_URL_PATTERN=bsky.app` in `client/.env` so the review UI knows to show the iframe for Bluesky post URLs. + +**Options:** +- `--port` — port to listen on (default: `4000`) + +--- + +## atproto-setup.ts + +Creates (or updates) the AT Protocol item types needed by the atproto demo firehose connector. Run this once after `npm run create-org` to register the item types, then pass the printed item type IDs to `npm run atproto:demo`. + +**Create mode** (first run): + +```sh +cd server && npm run atproto:setup -- --org-id +``` + +**Update mode** (to add new fields to an existing org's item types): + +```sh +cd server && npm run atproto:setup -- --org-id --post-type-id --user-type-id +``` + +The script prints an **atproto Post item type ID** and an **atproto User item type ID** — copy both for use with `atproto:demo`. + +## atproto-demo.mts + +Feed a local Coop instance with real content from the [AT Protocol](https://atproto.com/) (Bluesky) firehose. + +1. Create the Bluesky item types (run once after `npm run create-org`): + + ```sh + cd server && npm run atproto:setup -- --org-id + ``` + + Copy both the **Post item type ID** and the **User item type ID** from the output. + +2. Start the firehose connector (in a separate terminal): + + ```sh + npm run atproto:demo -- \ + --api-key \ + --post-type-id \ + --user-type-id + ``` + +Posts from the Bluesky firehose will appear as submitted items. With `--user-type-id` set, posts are also submitted as mock reports at a separate rate, routed to your default queue. Omit `--user-type-id` to skip report submission entirely. + +Pass `--rate-limit ` to adjust the item submission cap (default: 100/min) and `--report-rate-limit ` for the report cap (default: 1/min). Use `--dry-run` to preview submissions without sending them. + +See `atproto-demo.mts` for the full list of options. diff --git a/server/bin/atproto-demo.mts b/server/bin/atproto-demo.mts new file mode 100644 index 00000000..7fdbb249 --- /dev/null +++ b/server/bin/atproto-demo.mts @@ -0,0 +1,444 @@ +#!/usr/bin/env node +/* eslint-disable no-console */ +/** + * AT Protocol firehose connector for local Coop demos. + * + * Subscribes to the AT Protocol Jetstream and forwards posts to a local Coop + * instance as item submissions, giving you a realistic stream of content to + * review without needing to integrate a real platform. + * + * Prerequisites: + * 1. Coop must be running locally (`npm run server:start`) + * 2. Run `cd server && npm run atproto:setup -- --org-id ` once to + * create the item types and get a post type ID + * + * Usage: + * npm run atproto:demo -- --api-key --post-type-id + * + * Options: + * --api-key Coop API key (from `npm run create-org`) [required] + * --post-type-id atproto Post item type ID (from atproto:setup) [required] + * --user-type-id atproto User item type ID (from atproto:setup); enables mock report submission + * --coop-url Base URL of the Coop server [default: http://localhost:3000] + * --rate-limit Max posts submitted per minute [default: 100] + * --report-rate-limit Max reports submitted per minute (requires --user-type-id) [default: 1] + * --dry-run Print submissions without sending them to Coop + * --langs Comma-separated language codes to filter (e.g. en,es) + */ + +import process from 'node:process'; + +// Require Node 24+ for native WebSocket and fetch. +const [major] = process.versions.node.split('.').map(Number); +if (major < 22) { + console.error('Node 22 or later is required (project uses Node 24).'); + process.exit(1); +} + +const JETSTREAM_URL = + 'wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post'; + +// --- CLI args (manual parse to avoid importing yargs outside server/) -------- + +function getArg(flag: string): string | undefined { + const idx = process.argv.indexOf(flag); + return idx !== -1 ? process.argv[idx + 1] : undefined; +} + +function hasFlag(flag: string): boolean { + return process.argv.includes(flag); +} + +const apiKey = getArg('--api-key'); +const postTypeId = getArg('--post-type-id'); +const userTypeId = getArg('--user-type-id'); +const coopUrl = getArg('--coop-url') ?? 'http://localhost:3000'; +const rateLimit = Number(getArg('--rate-limit') ?? '100'); +const reportRateLimit = Number(getArg('--report-rate-limit') ?? '1'); +const dryRun = hasFlag('--dry-run'); +const langsFilter = getArg('--langs') + ? new Set((getArg('--langs') as string).split(',').map((l) => l.trim())) + : null; + +if (!apiKey || !postTypeId) { + console.error( + 'Usage: npm run atproto:demo -- --api-key --post-type-id \n' + + 'Run `cd server && npm run atproto:setup -- --org-id ` first to get the post type ID.', + ); + process.exit(1); +} + +if (hasFlag('--help') || hasFlag('-h')) { + console.log( + 'See the top of scripts/atproto-demo.ts for full option documentation.', + ); + process.exit(0); +} + +// --- atproto profile cache --------------------------------------------------- + +interface BlueskyProfile { + handle: string; + displayName?: string; +} + +const profileCache = new Map(); + +async function fetchProfile(did: string): Promise { + const cached = profileCache.get(did); + if (cached) return cached; + + try { + const response = await fetch( + `https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent(did)}`, + ); + if (!response.ok) throw new Error(`${response.status}`); + const data = (await response.json()) as { + handle: string; + displayName?: string; + }; + const profile: BlueskyProfile = { handle: data.handle, displayName: data.displayName }; + profileCache.set(did, profile); + return profile; + } catch { + return { handle: did }; + } +} + +// --- Types for Jetstream messages -------------------------------------------- + +interface JetstreamCommit { + kind: 'commit'; + did: string; + commit: { + operation: 'create' | 'update' | 'delete'; + collection: string; + rkey: string; + record?: BlueskyPost; + }; +} + +interface BlueskyPost { + $type: 'app.bsky.feed.post'; + text: string; + createdAt?: string; + langs?: string[]; + reply?: { + parent: { uri: string }; + root: { uri: string }; + }; + embed?: { + $type: string; + external?: { uri: string; title?: string; description?: string }; + images?: Array; + }; +} + +// --- Rate limiter (token bucket) -------------------------------------------- + +class RateLimiter { + private tokens: number; + private readonly maxTokens: number; + private readonly refillIntervalMs: number; + private lastRefill: number; + + constructor(perMinute: number) { + this.maxTokens = perMinute; + this.tokens = perMinute; + this.refillIntervalMs = 60_000; + this.lastRefill = Date.now(); + } + + tryConsume(): boolean { + const now = Date.now(); + const elapsed = now - this.lastRefill; + if (elapsed >= this.refillIntervalMs) { + this.tokens = this.maxTokens; + this.lastRefill = now; + } + if (this.tokens > 0) { + this.tokens--; + return true; + } + return false; + } +} + +// --- Submission logic -------------------------------------------------------- + +interface CoopItem { + id: string; + typeId: string; + data: { + text: string; + url: string; + did: string; + handle: string; + displayName?: string; + langs?: string; + createdAt?: string; + replyTo?: string; + embedType?: string; + embedUrl?: string; + embedTitle?: string; + embedDescription?: string; + }; +} + +function buildAtUri(did: string, rkey: string): string { + return `at://${did}/app.bsky.feed.post/${rkey}`; +} + +function buildBskyUrl(did: string, rkey: string): string { + return `https://bsky.app/profile/${did}/post/${rkey}`; +} + +async function postToCoopItem( + did: string, + rkey: string, + record: BlueskyPost, +): Promise { + const atUri = buildAtUri(did, rkey); + const profile = await fetchProfile(did); + return { + id: atUri, + typeId: postTypeId as string, + data: { + text: record.text, + url: buildBskyUrl(did, rkey), + did, + handle: profile.handle, + ...(profile.displayName ? { displayName: profile.displayName } : {}), + ...(record.langs?.length ? { langs: record.langs.join(', ') } : {}), + ...(record.createdAt ? { createdAt: record.createdAt } : {}), + ...(record.reply ? { replyTo: record.reply.parent.uri } : {}), + ...(record.embed + ? { + embedType: record.embed.$type + .replace('app.bsky.embed.', '') + .replace('#view', ''), + ...(record.embed.external?.uri + ? { embedUrl: record.embed.external.uri } + : {}), + ...(record.embed.external?.title + ? { embedTitle: record.embed.external.title } + : {}), + ...(record.embed.external?.description + ? { embedDescription: record.embed.external.description } + : {}), + } + : {}), + }, + }; +} + +async function submitToCoop(item: CoopItem): Promise { + const response = await fetch(`${coopUrl}/api/v1/items/async/`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': apiKey as string, + }, + body: JSON.stringify({ items: [item] }), + }); + + if (!response.ok) { + const body = await response.text().catch(() => ''); + throw new Error(`Coop returned ${response.status}: ${body}`); + } +} + +async function submitReport( + did: string, + rkey: string, + record: BlueskyPost, +): Promise { + const item = await postToCoopItem(did, rkey, record); + const payload = { + reporter: { + kind: 'user', + id: did, + typeId: userTypeId, + }, + reportedAt: new Date().toISOString(), + reportedItem: item, + reportedForReason: { + reason: 'Automatically flagged for demo purposes.', + }, + }; + + if (dryRun) { + console.log( + `[${new Date().toISOString()}] DRY RUN report:`, + JSON.stringify(payload, null, 2), + ); + return 'dry-run'; + } + + const response = await fetch(`${coopUrl}/api/v1/report`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': apiKey as string, + }, + body: JSON.stringify(payload), + }); + + if (!response.ok) { + const body = await response.text().catch(() => ''); + throw new Error(`Coop returned ${response.status}: ${body}`); + } + + const data = (await response.json()) as { reportId: string }; + return data.reportId; +} + +// --- Main ------------------------------------------------------------------- + +let submitted = 0; +let skipped = 0; +let errors = 0; +let reportsSubmitted = 0; +let reportsSkipped = 0; +let reportErrors = 0; + +const limiter = new RateLimiter(rateLimit); +const reportLimiter = new RateLimiter(reportRateLimit); + +function logStatus(action: string, text: string) { + const preview = text.length > 60 ? text.slice(0, 57) + '…' : text; + console.log(`[${new Date().toISOString()}] ${action}: "${preview}"`); +} + +function connect() { + console.log(`Connecting to Jetstream…`); + console.log( + ` Coop: ${coopUrl} | rate limit: ${rateLimit}/min | dry run: ${dryRun}`, + ); + if (langsFilter) { + console.log(` Language filter: ${[...langsFilter].join(', ')}`); + } + if (userTypeId) { + console.log(` Report submission: enabled (${reportRateLimit}/min)`); + } else { + console.log( + ` Report submission: disabled (pass --user-type-id to enable)`, + ); + } + console.log(''); + + // Node 24 ships native WebSocket (WHATWG). Cast to any here because + // @types/node may not yet include the global WebSocket type. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const ws = new (globalThis as any).WebSocket(JETSTREAM_URL); + + ws.addEventListener('open', () => { + console.log('Connected to Jetstream. Waiting for posts…\n'); + }); + + ws.addEventListener( + 'message', + (event: { data: string }) => { + let msg: JetstreamCommit; + try { + msg = JSON.parse(event.data) as JetstreamCommit; + } catch { + return; + } + + if ( + msg.kind !== 'commit' || + msg.commit.operation !== 'create' || + msg.commit.collection !== 'app.bsky.feed.post' || + !msg.commit.record + ) { + return; + } + + const record = msg.commit.record; + + // Language filter + if (langsFilter && record.langs) { + const hasMatch = record.langs.some((l) => langsFilter.has(l)); + if (!hasMatch) { + return; + } + } + + // Report submission (independent of item rate limit) + if (userTypeId) { + if (reportLimiter.tryConsume()) { + submitReport(msg.did, msg.commit.rkey, record) + .then((reportId) => { + reportsSubmitted++; + console.log( + `[${new Date().toISOString()}] Report submitted: ${reportId}`, + ); + }) + .catch((err: unknown) => { + reportErrors++; + console.error( + `[${new Date().toISOString()}] ERROR submitting report: ${String(err)}`, + ); + }); + } else { + reportsSkipped++; + } + } + + // Item submission + if (!limiter.tryConsume()) { + skipped++; + return; + } + + postToCoopItem(msg.did, msg.commit.rkey, record) + .then((item) => { + if (dryRun) { + logStatus('DRY RUN', record.text); + console.log(' Would submit:', JSON.stringify(item, null, 2)); + submitted++; + return; + } + return submitToCoop(item).then(() => { + submitted++; + logStatus('Submitted', record.text); + }); + }) + .catch((err: unknown) => { + errors++; + console.error( + `[${new Date().toISOString()}] ERROR submitting: ${String(err)}`, + ); + }); + }, + ); + + ws.addEventListener('close', () => { + console.log( + `\nJetstream connection closed. Submitted: ${submitted}, Skipped (rate): ${skipped}, Errors: ${errors}`, + ); + console.log('Reconnecting in 5 seconds…'); + setTimeout(connect, 5_000); + }); + + ws.addEventListener('error', (event: unknown) => { + console.error('WebSocket error:', event); + }); +} + +// Print running totals every 60 seconds +setInterval(() => { + console.log( + `[${new Date().toISOString()}] Status — submitted: ${submitted}, skipped: ${skipped}, errors: ${errors}, reports: ${reportsSubmitted}, reports skipped: ${reportsSkipped}, report errors: ${reportErrors}`, + ); +}, 60_000); + +process.on('SIGINT', () => { + console.log( + `\nShutting down. Submitted: ${submitted}, Skipped: ${skipped}, Errors: ${errors}, Reports: ${reportsSubmitted}, Reports skipped: ${reportsSkipped}, Report errors: ${reportErrors}`, + ); + process.exit(0); +}); + +connect(); diff --git a/server/bin/atproto-proxy.mts b/server/bin/atproto-proxy.mts new file mode 100644 index 00000000..69e33bb3 --- /dev/null +++ b/server/bin/atproto-proxy.mts @@ -0,0 +1,363 @@ +#!/usr/bin/env node +/* eslint-disable no-console */ +/** + * AT Protocol content proxy for local Coop demos. + * + * Serves AT Protocol posts as embeddable HTML pages for the MRT review iframe. + * A proxy is necessary because bsky.app sets X-Frame-Options: SAMEORIGIN, + * preventing direct embedding. + * + * Prerequisites: + * Set VITE_CONTENT_URL_PATTERN=bsky.app in client/.env + * + * Usage: + * npm run atproto:proxy + * + * Options: + * --port Port to listen on [default: 4000] + */ + +import http from 'node:http'; +import process from 'node:process'; + +function getArg(flag: string): string | undefined { + const idx = process.argv.indexOf(flag); + return idx !== -1 ? process.argv[idx + 1] : undefined; +} + +const port = Number(getArg('--port') ?? '4000'); + +const BSKY_API = 'https://public.api.bsky.app/xrpc'; + +function isDid(s: string): boolean { + return s.startsWith('did:'); +} + +async function resolveActor(actor: string): Promise { + if (isDid(actor)) return actor; + const res = await fetch( + `${BSKY_API}/app.bsky.actor.getProfile?actor=${encodeURIComponent(actor)}`, + ); + if (!res.ok) throw new Error(`Failed to resolve actor ${actor}: ${res.status}`); + const data = (await res.json()) as { did: string }; + return data.did; +} + +interface BskyEmbed { + $type: string; + // external link card + external?: { uri: string; title: string; description: string; thumb?: string }; + // image grid + images?: Array<{ + thumb: string; + fullsize: string; + alt: string; + aspectRatio?: { width: number; height: number }; + }>; + // video + thumbnail?: string; + alt?: string; + // quote post + record?: { + author: { handle: string; displayName?: string }; + value: { text: string }; + }; + // recordWithMedia: record + media (images or external) + media?: BskyEmbed; +} + +interface BskyPost { + uri: string; + author: { handle: string; displayName?: string }; + record: { text: string; createdAt?: string }; + embed?: BskyEmbed; +} + +async function fetchPost(did: string, rkey: string): Promise { + const uri = `at://${did}/app.bsky.feed.post/${rkey}`; + const res = await fetch( + `${BSKY_API}/app.bsky.feed.getPostThread?uri=${encodeURIComponent(uri)}&depth=0`, + ); + if (!res.ok) throw new Error(`Failed to fetch post: ${res.status}`); + const data = (await res.json()) as { thread: { post: BskyPost } }; + return data.thread.post; +} + +function formatDate(iso?: string): string { + if (!iso) return ''; + try { + return new Date(iso).toLocaleString(undefined, { + year: 'numeric', + month: 'short', + day: 'numeric', + hour: '2-digit', + minute: '2-digit', + }); + } catch { + return iso; + } +} + +function escapeHtml(s: string): string { + return s + .replace(/&/g, '&') + .replace(//g, '>') + .replace(/"/g, '"'); +} + +function hostname(uri: string): string { + try { + return new URL(uri).hostname; + } catch { + return uri; + } +} + +function renderEmbed(embed: BskyEmbed, postUrl: string): string { + const type = embed.$type; + + if (type === 'app.bsky.embed.external#view' && embed.external) { + const { uri, title, description, thumb } = embed.external; + return ` + + ${thumb ? `` : ''} +
+
${escapeHtml(title)}
+ ${description ? `
${escapeHtml(description)}
` : ''} +
${escapeHtml(hostname(uri))}
+
+
`; + } + + if (type === 'app.bsky.embed.images#view' && embed.images?.length) { + const imgs = embed.images + .slice(0, 4) + .map( + (img) => + `${escapeHtml(img.alt)}`, + ) + .join('\n '); + const gridClass = embed.images.length === 1 ? 'embed-images single' : 'embed-images'; + return `\n
\n ${imgs}\n
`; + } + + if (type === 'app.bsky.embed.video#view') { + return ` + + ${embed.thumbnail ? `${escapeHtml(embed.alt ?? '')}` : ''} +
+
`; + } + + if (type === 'app.bsky.embed.record#view' && embed.record) { + const { author, value } = embed.record; + const qName = author.displayName ? escapeHtml(author.displayName) : ''; + const qHandle = escapeHtml(author.handle); + const qText = escapeHtml(value.text).replace(/\n/g, '
'); + return ` +
+
+ ${qName ? `${qName}` : ''} + @${qHandle} +
+
${qText}
+
`; + } + + if (type === 'app.bsky.embed.recordWithMedia#view') { + const mediaPart = embed.media ? renderEmbed(embed.media, postUrl) : ''; + const recordPart = embed.record + ? renderEmbed({ $type: 'app.bsky.embed.record#view', record: embed.record }, postUrl) + : ''; + return mediaPart + recordPart; + } + + return ''; +} + +function renderPost(post: BskyPost, postUrl: string): string { + const { author, record, embed } = post; + const name = author.displayName ? escapeHtml(author.displayName) : ''; + const handle = escapeHtml(author.handle); + const text = escapeHtml(record.text).replace(/\n/g, '
'); + const date = escapeHtml(formatDate(record.createdAt)); + const embedHtml = embed ? renderEmbed(embed, postUrl) : ''; + + return ` + + + + + atproto post by @${handle} + + + + + +
+
+ ${name ? `${name}` : ''} + @${handle} +
+

${text}

+ ${embedHtml} +
+ ${date ? `${date}` : ''} + View on bsky.app +
+
+ + +`; +} + +function errorPage(status: number, message: string): string { + return `

${status}

${escapeHtml(message)}

`; +} + +const BSKY_URL_RE = /^https:\/\/bsky\.app\/profile\/([^/]+)\/post\/([^/?#]+)/; + +const server = http.createServer(async (req, res) => { + if (req.method !== 'GET' || !req.url) { + res.writeHead(405); + res.end(); + return; + } + + const reqUrl = new URL(req.url, `http://localhost:${port}`); + const contentUrl = reqUrl.searchParams.get('contentUrl'); + + if (!contentUrl) { + res.writeHead(400, { 'Content-Type': 'text/html' }); + res.end(errorPage(400, 'Missing contentUrl parameter')); + return; + } + + const match = BSKY_URL_RE.exec(contentUrl); + if (!match) { + res.writeHead(400, { 'Content-Type': 'text/html' }); + res.end(errorPage(400, `Unrecognized URL: ${contentUrl}`)); + return; + } + + const [, actor, rkey] = match; + + try { + const did = await resolveActor(actor); + const post = await fetchPost(did, rkey); + const html = renderPost(post, contentUrl); + res.writeHead(200, { + 'Content-Type': 'text/html; charset=utf-8', + 'Content-Security-Policy': "frame-ancestors *", + }); + res.end(html); + } catch (err) { + console.error(`[${new Date().toISOString()}] ERROR: ${String(err)}`); + res.writeHead(502, { 'Content-Type': 'text/html' }); + res.end(errorPage(502, `Failed to fetch post: ${String(err)}`)); + } +}); + +server.listen(port, () => { + console.log(`atproto content proxy listening on http://localhost:${port}`); + console.log( + `Also ensure VITE_CONTENT_URL_PATTERN=bsky.app is set in client/.env\n`, + ); +}); diff --git a/server/bin/atproto-setup.ts b/server/bin/atproto-setup.ts new file mode 100644 index 00000000..ed36eec5 --- /dev/null +++ b/server/bin/atproto-setup.ts @@ -0,0 +1,157 @@ +#!/usr/bin/env node +/* eslint-disable no-console */ +/** + * Creates or updates the AT Protocol item types needed by the atproto demo + * firehose connector. Run this once after `npm run create-org` to register + * the item types, then pass the printed item type IDs to `npm run atproto:demo`. + * + * To update existing item types (e.g. to add new fields to an existing org), + * pass the existing type IDs with --post-type-id and --user-type-id. + * + * Usage: + * cd server && npm run atproto:setup -- --org-id + * cd server && npm run atproto:setup -- --org-id --post-type-id --user-type-id + */ + +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; + +import getBottle from '../iocContainer/index.js'; + +const argv = await yargs(hideBin(process.argv)) + .options({ + 'org-id': { + type: 'string', + demandOption: true, + description: 'Organization ID (from npm run create-org)', + }, + 'post-type-id': { + type: 'string', + description: 'Existing atproto Post item type ID — triggers update mode', + }, + 'user-type-id': { + type: 'string', + description: 'Existing atproto User item type ID — triggers update mode', + }, + }) + .help() + .parse(); + +const orgId = argv['org-id']; +const existingPostTypeId = argv['post-type-id']; +const existingUserTypeId = argv['user-type-id']; +const updateMode = Boolean(existingPostTypeId && existingUserTypeId); + +const USER_SCHEMA = [ + { name: 'handle', type: 'STRING', required: true, container: null }, + { name: 'displayName', type: 'STRING', required: false, container: null }, + { name: 'description', type: 'STRING', required: false, container: null }, + { name: 'avatar', type: 'IMAGE', required: false, container: null }, + { name: 'indexedAt', type: 'DATETIME', required: false, container: null }, +] as const; + +const USER_ROLES = { + displayName: 'handle', + profileIcon: 'avatar', + createdAt: 'indexedAt', +} as const; + +const POST_SCHEMA = [ + { name: 'text', type: 'STRING', required: true, container: null }, + { name: 'url', type: 'URL', required: true, container: null }, + { name: 'did', type: 'STRING', required: false, container: null }, + { name: 'handle', type: 'STRING', required: false, container: null }, + { name: 'displayName', type: 'STRING', required: false, container: null }, + { name: 'langs', type: 'STRING', required: false, container: null }, + { name: 'createdAt', type: 'DATETIME', required: false, container: null }, + { name: 'replyTo', type: 'STRING', required: false, container: null }, + { name: 'embedType', type: 'STRING', required: false, container: null }, + { name: 'embedUrl', type: 'URL', required: false, container: null }, + { name: 'embedTitle', type: 'STRING', required: false, container: null }, + { name: 'embedDescription', type: 'STRING', required: false, container: null }, + { name: 'embedThumb', type: 'URL', required: false, container: null }, +] as const; + +const POST_ROLES = { + displayName: 'text', + createdAt: 'createdAt', +} as const; + +async function setup() { + const bottle = await getBottle(); + const { ModerationConfigService, closeSharedResourcesForShutdown } = + bottle.container; + + try { + let userTypeId: string; + let postTypeId: string; + + if (updateMode) { + console.log('\nUpdating existing AT Protocol item types…\n'); + + const userType = await ModerationConfigService.updateUserType(orgId, { + id: existingUserTypeId as string, + schema: USER_SCHEMA, + schemaFieldRoles: USER_ROLES, + }); + userTypeId = userType.id; + + const postType = await ModerationConfigService.updateContentType(orgId, { + id: existingPostTypeId as string, + schema: POST_SCHEMA, + schemaFieldRoles: POST_ROLES, + }); + postTypeId = postType.id; + + console.log('✅ AT Protocol item types updated successfully!\n'); + } else { + console.log('\nCreating AT Protocol item types…\n'); + + const userType = await ModerationConfigService.createUserType(orgId, { + name: 'atproto User', + description: 'AT Protocol account.', + schema: USER_SCHEMA, + schemaFieldRoles: USER_ROLES, + }); + userTypeId = userType.id; + + const postType = await ModerationConfigService.createContentType(orgId, { + name: 'atproto Post', + description: 'Post ingested from the AT Protocol Jetstream firehose.', + schema: POST_SCHEMA, + schemaFieldRoles: POST_ROLES, + }); + postTypeId = postType.id; + + console.log('✅ AT Protocol item types created successfully!\n'); + } + + console.log('═'.repeat(60)); + console.log('Copy these IDs for use with the firehose connector:'); + console.log('═'.repeat(60)); + console.log(`atproto User item type ID: ${userTypeId}`); + console.log(`atproto Post item type ID: ${postTypeId}`); + console.log('═'.repeat(60)); + console.log('\nStart the firehose connector:'); + console.log( + ` npm run atproto:demo -- --api-key --post-type-id ${postTypeId} --user-type-id ${userTypeId}\n`, + ); + + await closeSharedResourcesForShutdown(); + process.exit(0); + } catch (error: unknown) { + console.error('\n❌ Error setting up AT Protocol item types:\n'); + console.error(error); + try { + await closeSharedResourcesForShutdown(); + } catch (shutdownError) { + console.error('Error during shutdown:', shutdownError); + } + process.exit(1); + } +} + +setup().catch((error) => { + console.error('Unhandled error:', error); + process.exit(1); +}); diff --git a/server/package.json b/server/package.json index c593b9d1..565cb3d3 100644 --- a/server/package.json +++ b/server/package.json @@ -19,7 +19,10 @@ "runWorkerOrJob": "node --loader ts-node/esm --require dotenv/config bin/run-worker-or-job.ts", "create-org": "node --loader ts-node/esm --require dotenv/config bin/create-org-and-user.ts", "get-invite": "node --loader ts-node/esm --require dotenv/config bin/get-invite-token.ts", - "recover-mrt-queue": "node --loader ts-node/esm --require dotenv/config bin/recover-mrt-queue.ts" + "recover-mrt-queue": "node --loader ts-node/esm --require dotenv/config bin/recover-mrt-queue.ts", + "atproto:setup": "node --loader ts-node/esm --require dotenv/config bin/atproto-setup.ts", + "atproto:demo": "node --loader ts-node/esm bin/atproto-demo.mts", + "atproto:proxy": "node --loader ts-node/esm bin/atproto-proxy.mts" }, "author": "Roostorg", "license": "ISC",