import { getLogger } from "@logtape/logtape"; import type { Entity } from "@versia/federation/types"; import { Instance, User } from "@versia/kit/db"; import { Queue, Worker } from "bullmq"; import type { SocketAddress } from "bun"; import chalk from "chalk"; import IORedis from "ioredis"; import { InboxProcessor } from "./classes/inbox/processor.ts"; import { config } from "./packages/config-manager/index.ts"; import type { KnownEntity } from "./types/api.ts"; const connection = new IORedis({ host: config.redis.queue.host, port: config.redis.queue.port, password: config.redis.queue.password, db: config.redis.queue.database, maxRetriesPerRequest: null, }); export enum DeliveryJobType { FederateEntity = "federateEntity", } export enum InboxJobType { ProcessEntity = "processEntity", } export type InboxJobData = { data: Entity; headers: { "x-signature"?: string; "x-nonce"?: string; "x-signed-by"?: string; authorization?: string; }; request: { url: string; method: string; body: string; }; ip: SocketAddress | null; }; export type DeliveryJobData = { entity: KnownEntity; recipientId: string; senderId: string; }; export const deliveryQueue = new Queue( "delivery", { connection, }, ); export const inboxQueue = new Queue( "inbox", { connection, }, ); export const deliveryWorker = new Worker< DeliveryJobData, void, DeliveryJobType >( deliveryQueue.name, async (job) => { switch (job.name) { case DeliveryJobType.FederateEntity: { const { entity, recipientId, senderId } = job.data; const logger = getLogger(["federation", "delivery"]); const sender = await User.fromId(senderId); if (!sender) { throw new Error( `Could not resolve sender ID ${chalk.gray(senderId)}`, ); } const recipient = await User.fromId(recipientId); if (!recipient) { throw new Error( `Could not resolve recipient ID ${chalk.gray(recipientId)}`, ); } logger.debug`Federating entity ${chalk.gray( entity.id, )} from ${chalk.gray(`@${sender.getAcct()}`)} to ${chalk.gray( recipient.getAcct(), )}`; await sender.federateToUser(entity, recipient); logger.debug`${chalk.green( "✔", )} Finished federating entity ${chalk.gray(entity.id)}`; } } }, { connection, removeOnComplete: { age: config.queues.delivery.remove_on_complete, }, removeOnFail: { age: config.queues.delivery.remove_on_failure, }, }, ); export const inboxWorker = new Worker( inboxQueue.name, async (job) => { switch (job.name) { case InboxJobType.ProcessEntity: { const { data, headers: { "x-signature": signature, "x-nonce": nonce, "x-signed-by": signedBy, authorization, }, request, ip, } = job.data; const logger = getLogger(["federation", "inbox"]); logger.debug`Processing entity ${chalk.gray( data.id, )} from ${chalk.gray(signedBy)}`; if (authorization) { const processor = new InboxProcessor( request, data, null, { signature, nonce, authorization, }, logger, ip, ); logger.debug`Entity ${chalk.gray( data.id, )} is potentially from a bridge`; return await processor.process(); } // If not potentially from bridge, check for required headers if (!(signature && nonce && signedBy)) { return Response.json( { error: "Missing required headers: x-signature, x-nonce, or x-signed-by", }, { status: 400, }, ); } const sender = await User.resolve(signedBy); if (!(sender || signedBy.startsWith("instance "))) { return Response.json( { error: `Couldn't resolve sender URI ${signedBy}` }, { status: 404, }, ); } if (sender?.isLocal()) { return Response.json( { error: "Cannot process federation requests from local users", }, { status: 400, }, ); } const remoteInstance = sender ? await Instance.fromUser(sender) : await Instance.resolveFromHost(signedBy.split(" ")[1]); if (!remoteInstance) { return Response.json( { error: "Could not resolve the remote instance." }, { status: 500, }, ); } logger.debug`Entity ${chalk.gray( data.id, )} is from remote instance ${chalk.gray( remoteInstance.data.baseUrl, )}`; if (!remoteInstance.data.publicKey?.key) { throw new Error( `Instance ${remoteInstance.data.baseUrl} has no public key stored in database`, ); } const processor = new InboxProcessor( request, data, { instance: remoteInstance, key: sender?.data.publicKey ?? remoteInstance.data.publicKey.key, }, { signature, nonce, authorization, }, logger, ip, ); const output = await processor.process(); logger.debug`${chalk.green( "✔", )} Finished processing entity ${chalk.gray(data.id)}`; return output; } default: { throw new Error(`Unknown job type: ${job.name}`); } } }, { connection, removeOnComplete: { age: config.queues.inbox.remove_on_complete, }, removeOnFail: { age: config.queues.inbox.remove_on_failure, }, }, );