diff --git a/classes/database/note.ts b/classes/database/note.ts index 139b737c..fdc835a8 100644 --- a/classes/database/note.ts +++ b/classes/database/note.ts @@ -41,6 +41,7 @@ import { parseTextMentions, } from "~/classes/functions/status"; import { config } from "~/packages/config-manager"; +import { DeliveryJobType, deliveryQueue } from "~/worker.ts"; import { Application } from "./application.ts"; import { Attachment } from "./attachment.ts"; import { BaseInterface } from "./base.ts"; @@ -310,9 +311,16 @@ export class Note extends BaseInterface { public async federateToUsers(): Promise { const users = await this.getUsersToFederateTo(); - for (const user of users) { - await this.author.federateToUser(this.toVersia(), user); - } + await deliveryQueue.addBulk( + users.map((user) => ({ + data: { + entity: this.toVersia(), + recipientId: user.id, + senderId: this.author.id, + }, + name: DeliveryJobType.FederateEntity, + })), + ); } /** diff --git a/classes/database/user.ts b/classes/database/user.ts index 3c496a3b..6ec91ef1 100644 --- a/classes/database/user.ts +++ b/classes/database/user.ts @@ -1015,9 +1015,9 @@ export class User extends BaseInterface { }, ); } catch (e) { - getLogger(["federation", "outbox"]) + getLogger(["federation", "delivery"]) .error`Federating ${chalk.gray(entity.type)} to ${user.getUri()} ${chalk.bold.red("failed")}`; - getLogger(["federation", "outbox"]).error`${e}`; + getLogger(["federation", "delivery"]).error`${e}`; sentry?.captureException(e); return { ok: false }; diff --git a/utils/loggers.ts b/utils/loggers.ts index e4f51e2f..8c583d05 100644 --- a/utils/loggers.ts +++ b/utils/loggers.ts @@ -180,7 +180,7 @@ export const configureLoggers = (silent = false): Promise => filters: ["configFilter"], }, { - category: ["federation", "outbox"], + category: ["federation", "delivery"], sinks: ["console", "file"], filters: ["configFilter"], }, diff --git a/worker.ts b/worker.ts index 92e75a92..b884860e 100644 --- a/worker.ts +++ b/worker.ts @@ -1,12 +1,13 @@ import { getLogger } from "@logtape/logtape"; import type { Entity } from "@versia/federation/types"; -import { Instance, Note, User } from "@versia/kit/db"; +import { Instance, User } from "@versia/kit/db"; import { Queue, Worker } from "bullmq"; import type { SocketAddress } from "bun"; import chalk from "chalk"; import IORedis from "ioredis"; import { InboxProcessor } from "./classes/inbox/processor.ts"; import { config } from "./packages/config-manager/index.ts"; +import type { KnownEntity } from "./types/api.ts"; const connection = new IORedis({ host: config.redis.queue.host, @@ -17,7 +18,7 @@ const connection = new IORedis({ }); export enum DeliveryJobType { - FederateNote = "federateNote", + FederateEntity = "federateEntity", } export enum InboxJobType { @@ -40,7 +41,13 @@ export type InboxJobData = { ip: SocketAddress | null; }; -const deliveryQueue = new Queue<{ noteId: string }, void, DeliveryJobType>( +export type DeliveryJobData = { + entity: KnownEntity; + recipientId: string; + senderId: string; +}; + +export const deliveryQueue = new Queue( "delivery", { connection, @@ -55,25 +62,45 @@ export const inboxQueue = new Queue( ); export const deliveryWorker = new Worker< - { noteId: string }, + DeliveryJobData, void, DeliveryJobType >( deliveryQueue.name, async (job) => { switch (job.name) { - case DeliveryJobType.FederateNote: { - const noteId = job.data.noteId; + case DeliveryJobType.FederateEntity: { + const { entity, recipientId, senderId } = job.data; - const note = await Note.fromId(noteId); + const logger = getLogger(["federation", "delivery"]); - if (!note) { + const sender = await User.fromId(senderId); + + if (!sender) { throw new Error( - `Note with ID ${noteId} not found in database`, + `Could not resolve sender ID ${chalk.gray(senderId)}`, ); } - await note.federateToUsers(); + const recipient = await User.fromId(recipientId); + + if (!recipient) { + throw new Error( + `Could not resolve recipient ID ${chalk.gray(recipientId)}`, + ); + } + + logger.debug`Federating entity ${chalk.gray( + entity.id, + )} from ${chalk.gray(`@${sender.getAcct()}`)} to ${chalk.gray( + recipient.getAcct(), + )}`; + + await sender.federateToUser(entity, recipient); + + logger.debug`${chalk.green( + "✔", + )} Finished federating entity ${chalk.gray(entity.id)}`; } } },