feat(federation): Add queue to note delivery

This commit is contained in:
Jesse Wierzbinski 2024-11-25 11:29:48 +01:00
parent 3ef361f521
commit 8a920218ea
No known key found for this signature in database
4 changed files with 51 additions and 16 deletions

View file

@ -41,6 +41,7 @@ import {
parseTextMentions, parseTextMentions,
} from "~/classes/functions/status"; } from "~/classes/functions/status";
import { config } from "~/packages/config-manager"; import { config } from "~/packages/config-manager";
import { DeliveryJobType, deliveryQueue } from "~/worker.ts";
import { Application } from "./application.ts"; import { Application } from "./application.ts";
import { Attachment } from "./attachment.ts"; import { Attachment } from "./attachment.ts";
import { BaseInterface } from "./base.ts"; import { BaseInterface } from "./base.ts";
@ -310,9 +311,16 @@ export class Note extends BaseInterface<typeof Notes, NoteTypeWithRelations> {
public async federateToUsers(): Promise<void> { public async federateToUsers(): Promise<void> {
const users = await this.getUsersToFederateTo(); const users = await this.getUsersToFederateTo();
for (const user of users) { await deliveryQueue.addBulk(
await this.author.federateToUser(this.toVersia(), user); users.map((user) => ({
} data: {
entity: this.toVersia(),
recipientId: user.id,
senderId: this.author.id,
},
name: DeliveryJobType.FederateEntity,
})),
);
} }
/** /**

View file

@ -1015,9 +1015,9 @@ export class User extends BaseInterface<typeof Users, UserWithRelations> {
}, },
); );
} catch (e) { } catch (e) {
getLogger(["federation", "outbox"]) getLogger(["federation", "delivery"])
.error`Federating ${chalk.gray(entity.type)} to ${user.getUri()} ${chalk.bold.red("failed")}`; .error`Federating ${chalk.gray(entity.type)} to ${user.getUri()} ${chalk.bold.red("failed")}`;
getLogger(["federation", "outbox"]).error`${e}`; getLogger(["federation", "delivery"]).error`${e}`;
sentry?.captureException(e); sentry?.captureException(e);
return { ok: false }; return { ok: false };

View file

@ -180,7 +180,7 @@ export const configureLoggers = (silent = false): Promise<void> =>
filters: ["configFilter"], filters: ["configFilter"],
}, },
{ {
category: ["federation", "outbox"], category: ["federation", "delivery"],
sinks: ["console", "file"], sinks: ["console", "file"],
filters: ["configFilter"], filters: ["configFilter"],
}, },

View file

@ -1,12 +1,13 @@
import { getLogger } from "@logtape/logtape"; import { getLogger } from "@logtape/logtape";
import type { Entity } from "@versia/federation/types"; import type { Entity } from "@versia/federation/types";
import { Instance, Note, User } from "@versia/kit/db"; import { Instance, User } from "@versia/kit/db";
import { Queue, Worker } from "bullmq"; import { Queue, Worker } from "bullmq";
import type { SocketAddress } from "bun"; import type { SocketAddress } from "bun";
import chalk from "chalk"; import chalk from "chalk";
import IORedis from "ioredis"; import IORedis from "ioredis";
import { InboxProcessor } from "./classes/inbox/processor.ts"; import { InboxProcessor } from "./classes/inbox/processor.ts";
import { config } from "./packages/config-manager/index.ts"; import { config } from "./packages/config-manager/index.ts";
import type { KnownEntity } from "./types/api.ts";
const connection = new IORedis({ const connection = new IORedis({
host: config.redis.queue.host, host: config.redis.queue.host,
@ -17,7 +18,7 @@ const connection = new IORedis({
}); });
export enum DeliveryJobType { export enum DeliveryJobType {
FederateNote = "federateNote", FederateEntity = "federateEntity",
} }
export enum InboxJobType { export enum InboxJobType {
@ -40,7 +41,13 @@ export type InboxJobData = {
ip: SocketAddress | null; ip: SocketAddress | null;
}; };
const deliveryQueue = new Queue<{ noteId: string }, void, DeliveryJobType>( export type DeliveryJobData = {
entity: KnownEntity;
recipientId: string;
senderId: string;
};
export const deliveryQueue = new Queue<DeliveryJobData, void, DeliveryJobType>(
"delivery", "delivery",
{ {
connection, connection,
@ -55,25 +62,45 @@ export const inboxQueue = new Queue<InboxJobData, Response, InboxJobType>(
); );
export const deliveryWorker = new Worker< export const deliveryWorker = new Worker<
{ noteId: string }, DeliveryJobData,
void, void,
DeliveryJobType DeliveryJobType
>( >(
deliveryQueue.name, deliveryQueue.name,
async (job) => { async (job) => {
switch (job.name) { switch (job.name) {
case DeliveryJobType.FederateNote: { case DeliveryJobType.FederateEntity: {
const noteId = job.data.noteId; const { entity, recipientId, senderId } = job.data;
const note = await Note.fromId(noteId); const logger = getLogger(["federation", "delivery"]);
if (!note) { const sender = await User.fromId(senderId);
if (!sender) {
throw new Error( throw new Error(
`Note with ID ${noteId} not found in database`, `Could not resolve sender ID ${chalk.gray(senderId)}`,
); );
} }
await note.federateToUsers(); const recipient = await User.fromId(recipientId);
if (!recipient) {
throw new Error(
`Could not resolve recipient ID ${chalk.gray(recipientId)}`,
);
}
logger.debug`Federating entity ${chalk.gray(
entity.id,
)} from ${chalk.gray(`@${sender.getAcct()}`)} to ${chalk.gray(
recipient.getAcct(),
)}`;
await sender.federateToUser(entity, recipient);
logger.debug`${chalk.green(
"✔",
)} Finished federating entity ${chalk.gray(entity.id)}`;
} }
} }
}, },