From 5fc6c4dcfa5298ed85fcf5802f449bd5fcb95ca8 Mon Sep 17 00:00:00 2001 From: Jesse Wierzbinski Date: Mon, 25 Nov 2024 20:29:59 +0100 Subject: [PATCH] feat(federation): :sparkles: Implement queue for fetches --- CHANGELOG.md | 2 +- cli/commands/federation/instance/refetch.ts | 29 ++-------- config/config.example.toml | 7 +++ config/config.schema.json | 22 +++++++ packages/config-manager/config.type.ts | 21 +++++++ utils/bull-board.ts | 3 +- worker.ts | 64 +++++++++++++++++++++ 7 files changed, 122 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aedfc16a..37107ae0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ Versia Server `0.8.0` is fully backwards compatible with `0.7.0`. ## Features -- Outbound federation and inbox processing are now handled by a queue system (like most federated software). +- Outbound federation, inbox processing and data fetching are now handled by a queue system (like most federated software). - Added an administration UI for managing the queue. - Upgraded Bun to `1.1.36`. - Implemented support for the [Instance Messaging Extension](https://versia.pub/extensions/instance-messaging) diff --git a/cli/commands/federation/instance/refetch.ts b/cli/commands/federation/instance/refetch.ts index 7b74fe1d..22dd8b9d 100644 --- a/cli/commands/federation/instance/refetch.ts +++ b/cli/commands/federation/instance/refetch.ts @@ -4,7 +4,7 @@ import { Instances } from "@versia/kit/tables"; import { eq } from "drizzle-orm"; import ora from "ora"; import { BaseCommand } from "~/cli/base"; -import { formatArray } from "~/cli/utils/format"; +import { FetchJobType, fetchQueue } from "~/worker"; export default class FederationInstanceRefetch extends BaseCommand< typeof FederationInstanceRefetch @@ -36,30 +36,11 @@ export default class FederationInstanceRefetch extends BaseCommand< throw new Error("Instance not found"); } - const data = await instance.updateFromRemote(); + await fetchQueue.add(FetchJobType.Instance, { + uri: args.url, + }); - if (!data) { - spinner.fail("Failed to refetch instance metadata"); - this.exit(1); - } - - spinner.succeed("Refetched instance metadata"); - - const { name, baseUrl, protocol, version } = data.data; - - this.log( - formatArray( - [ - { - Name: name, - "Base URL": baseUrl, - Version: version, - Protocol: protocol, - }, - ], - ["Name", "Base URL", "Version", "Protocol"], - ), - ); + spinner.succeed("Task added to queue"); this.exit(0); } diff --git a/config/config.example.toml b/config/config.example.toml index c4b01d2f..9a84214d 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -274,6 +274,13 @@ remove_on_complete = 31536000 # Time in seconds to remove failed jobs remove_on_failure = 31536000 +# Control the fetch queue (for remote data refreshes) +[queues.fetch] +# Time in seconds to remove completed jobs +remove_on_complete = 31536000 +# Time in seconds to remove failed jobs +remove_on_failure = 31536000 + [federation] # This is a list of domain names, such as "mastodon.social" or "pleroma.site" # These changes will not retroactively apply to existing data before they were changed diff --git a/config/config.schema.json b/config/config.schema.json index e4738e5d..ecb27697 100644 --- a/config/config.schema.json +++ b/config/config.schema.json @@ -3336,6 +3336,24 @@ "remove_on_complete": 31536000, "remove_on_failure": 31536000 } + }, + "fetch": { + "type": "object", + "properties": { + "remove_on_complete": { + "type": "integer", + "default": 31536000 + }, + "remove_on_failure": { + "type": "integer", + "default": 31536000 + } + }, + "additionalProperties": false, + "default": { + "remove_on_complete": 31536000, + "remove_on_failure": 31536000 + } } }, "additionalProperties": false, @@ -3347,6 +3365,10 @@ "inbox": { "remove_on_complete": 31536000, "remove_on_failure": 31536000 + }, + "fetch": { + "remove_on_complete": 31536000, + "remove_on_failure": 31536000 } } }, diff --git a/packages/config-manager/config.type.ts b/packages/config-manager/config.type.ts index a1021f06..d636ff60 100644 --- a/packages/config-manager/config.type.ts +++ b/packages/config-manager/config.type.ts @@ -561,6 +561,23 @@ export const configValidator = z remove_on_complete: 60 * 60 * 24 * 365, remove_on_failure: 60 * 60 * 24 * 365, }), + fetch: z + .object({ + remove_on_complete: z + .number() + .int() + // 1 year + .default(60 * 60 * 24 * 365), + remove_on_failure: z + .number() + .int() + // 1 year + .default(60 * 60 * 24 * 365), + }) + .default({ + remove_on_complete: 60 * 60 * 24 * 365, + remove_on_failure: 60 * 60 * 24 * 365, + }), }) .default({ delivery: { @@ -571,6 +588,10 @@ export const configValidator = z remove_on_complete: 60 * 60 * 24 * 365, remove_on_failure: 60 * 60 * 24 * 365, }, + fetch: { + remove_on_complete: 60 * 60 * 24 * 365, + remove_on_failure: 60 * 60 * 24 * 365, + }, }), instance: z .object({ diff --git a/utils/bull-board.ts b/utils/bull-board.ts index 38c33395..628ff638 100644 --- a/utils/bull-board.ts +++ b/utils/bull-board.ts @@ -5,7 +5,7 @@ import { serveStatic } from "@hono/hono/bun"; import type { OpenAPIHono } from "@hono/zod-openapi"; import { config } from "~/packages/config-manager"; import type { HonoEnv } from "~/types/api"; -import { deliveryQueue, inboxQueue } from "~/worker"; +import { deliveryQueue, fetchQueue, inboxQueue } from "~/worker"; export const applyToHono = (app: OpenAPIHono): void => { const serverAdapter = new HonoAdapter(serveStatic); @@ -14,6 +14,7 @@ export const applyToHono = (app: OpenAPIHono): void => { queues: [ new BullMQAdapter(inboxQueue), new BullMQAdapter(deliveryQueue), + new BullMQAdapter(fetchQueue), ], serverAdapter, options: { diff --git a/worker.ts b/worker.ts index 6c7ca2d2..a48e850d 100644 --- a/worker.ts +++ b/worker.ts @@ -4,8 +4,10 @@ import { Instance, User } from "@versia/kit/db"; import { Queue, Worker } from "bullmq"; import type { SocketAddress } from "bun"; import chalk from "chalk"; +import { eq } from "drizzle-orm"; import IORedis from "ioredis"; import { InboxProcessor } from "./classes/inbox/processor.ts"; +import { Instances } from "./drizzle/schema.ts"; import { config } from "./packages/config-manager/index.ts"; import type { KnownEntity } from "./types/api.ts"; @@ -25,6 +27,12 @@ export enum InboxJobType { ProcessEntity = "processEntity", } +export enum FetchJobType { + Instance = "instance", + User = "user", + Note = "user", +} + export type InboxJobData = { data: Entity; headers: { @@ -47,6 +55,11 @@ export type DeliveryJobData = { senderId: string; }; +export type FetchJobData = { + uri: string; + refetcher?: string; +}; + export const deliveryQueue = new Queue( "delivery", { @@ -61,6 +74,10 @@ export const inboxQueue = new Queue( }, ); +export const fetchQueue = new Queue("fetch", { + connection, +}); + export const deliveryWorker = new Worker< DeliveryJobData, void, @@ -260,3 +277,50 @@ export const inboxWorker = new Worker( }, }, ); + +export const fetchWorker = new Worker( + fetchQueue.name, + async (job) => { + switch (job.name) { + case FetchJobType.Instance: { + const { uri } = job.data; + + await job.log(`Fetching instance metadata from [${uri}]`); + + // Check if exists + const host = new URL(uri).host; + + const existingInstance = await Instance.fromSql( + eq(Instances.baseUrl, host), + ); + + if (existingInstance) { + await job.log("Instance is known, refetching remote data."); + + await existingInstance.updateFromRemote(); + + await job.log(`Instance [${uri}] successfully refetched`); + + return; + } + + await Instance.resolve(uri); + + await job.log( + `${chalk.green( + "✔", + )} Finished fetching instance metadata from [${uri}]`, + ); + } + } + }, + { + connection, + removeOnComplete: { + age: config.queues.fetch.remove_on_complete, + }, + removeOnFail: { + age: config.queues.fetch.remove_on_failure, + }, + }, +);