feat(federation): Implement queue for fetches

This commit is contained in:
Jesse Wierzbinski 2024-11-25 20:29:59 +01:00
parent 79cf43d752
commit 5fc6c4dcfa
No known key found for this signature in database
7 changed files with 122 additions and 26 deletions

View file

@ -6,7 +6,7 @@ Versia Server `0.8.0` is fully backwards compatible with `0.7.0`.
## Features ## Features
- Outbound federation and inbox processing are now handled by a queue system (like most federated software). - Outbound federation, inbox processing and data fetching are now handled by a queue system (like most federated software).
- Added an administration UI for managing the queue. - Added an administration UI for managing the queue.
- Upgraded Bun to `1.1.36`. - Upgraded Bun to `1.1.36`.
- Implemented support for the [Instance Messaging Extension](https://versia.pub/extensions/instance-messaging) - Implemented support for the [Instance Messaging Extension](https://versia.pub/extensions/instance-messaging)

View file

@ -4,7 +4,7 @@ import { Instances } from "@versia/kit/tables";
import { eq } from "drizzle-orm"; import { eq } from "drizzle-orm";
import ora from "ora"; import ora from "ora";
import { BaseCommand } from "~/cli/base"; import { BaseCommand } from "~/cli/base";
import { formatArray } from "~/cli/utils/format"; import { FetchJobType, fetchQueue } from "~/worker";
export default class FederationInstanceRefetch extends BaseCommand< export default class FederationInstanceRefetch extends BaseCommand<
typeof FederationInstanceRefetch typeof FederationInstanceRefetch
@ -36,30 +36,11 @@ export default class FederationInstanceRefetch extends BaseCommand<
throw new Error("Instance not found"); throw new Error("Instance not found");
} }
const data = await instance.updateFromRemote(); await fetchQueue.add(FetchJobType.Instance, {
uri: args.url,
});
if (!data) { spinner.succeed("Task added to queue");
spinner.fail("Failed to refetch instance metadata");
this.exit(1);
}
spinner.succeed("Refetched instance metadata");
const { name, baseUrl, protocol, version } = data.data;
this.log(
formatArray(
[
{
Name: name,
"Base URL": baseUrl,
Version: version,
Protocol: protocol,
},
],
["Name", "Base URL", "Version", "Protocol"],
),
);
this.exit(0); this.exit(0);
} }

View file

@ -274,6 +274,13 @@ remove_on_complete = 31536000
# Time in seconds to remove failed jobs # Time in seconds to remove failed jobs
remove_on_failure = 31536000 remove_on_failure = 31536000
# Control the fetch queue (for remote data refreshes)
[queues.fetch]
# Time in seconds to remove completed jobs
remove_on_complete = 31536000
# Time in seconds to remove failed jobs
remove_on_failure = 31536000
[federation] [federation]
# This is a list of domain names, such as "mastodon.social" or "pleroma.site" # This is a list of domain names, such as "mastodon.social" or "pleroma.site"
# These changes will not retroactively apply to existing data before they were changed # These changes will not retroactively apply to existing data before they were changed

View file

@ -3336,6 +3336,24 @@
"remove_on_complete": 31536000, "remove_on_complete": 31536000,
"remove_on_failure": 31536000 "remove_on_failure": 31536000
} }
},
"fetch": {
"type": "object",
"properties": {
"remove_on_complete": {
"type": "integer",
"default": 31536000
},
"remove_on_failure": {
"type": "integer",
"default": 31536000
}
},
"additionalProperties": false,
"default": {
"remove_on_complete": 31536000,
"remove_on_failure": 31536000
}
} }
}, },
"additionalProperties": false, "additionalProperties": false,
@ -3347,6 +3365,10 @@
"inbox": { "inbox": {
"remove_on_complete": 31536000, "remove_on_complete": 31536000,
"remove_on_failure": 31536000 "remove_on_failure": 31536000
},
"fetch": {
"remove_on_complete": 31536000,
"remove_on_failure": 31536000
} }
} }
}, },

View file

@ -561,6 +561,23 @@ export const configValidator = z
remove_on_complete: 60 * 60 * 24 * 365, remove_on_complete: 60 * 60 * 24 * 365,
remove_on_failure: 60 * 60 * 24 * 365, remove_on_failure: 60 * 60 * 24 * 365,
}), }),
fetch: z
.object({
remove_on_complete: z
.number()
.int()
// 1 year
.default(60 * 60 * 24 * 365),
remove_on_failure: z
.number()
.int()
// 1 year
.default(60 * 60 * 24 * 365),
})
.default({
remove_on_complete: 60 * 60 * 24 * 365,
remove_on_failure: 60 * 60 * 24 * 365,
}),
}) })
.default({ .default({
delivery: { delivery: {
@ -571,6 +588,10 @@ export const configValidator = z
remove_on_complete: 60 * 60 * 24 * 365, remove_on_complete: 60 * 60 * 24 * 365,
remove_on_failure: 60 * 60 * 24 * 365, remove_on_failure: 60 * 60 * 24 * 365,
}, },
fetch: {
remove_on_complete: 60 * 60 * 24 * 365,
remove_on_failure: 60 * 60 * 24 * 365,
},
}), }),
instance: z instance: z
.object({ .object({

View file

@ -5,7 +5,7 @@ import { serveStatic } from "@hono/hono/bun";
import type { OpenAPIHono } from "@hono/zod-openapi"; import type { OpenAPIHono } from "@hono/zod-openapi";
import { config } from "~/packages/config-manager"; import { config } from "~/packages/config-manager";
import type { HonoEnv } from "~/types/api"; import type { HonoEnv } from "~/types/api";
import { deliveryQueue, inboxQueue } from "~/worker"; import { deliveryQueue, fetchQueue, inboxQueue } from "~/worker";
export const applyToHono = (app: OpenAPIHono<HonoEnv>): void => { export const applyToHono = (app: OpenAPIHono<HonoEnv>): void => {
const serverAdapter = new HonoAdapter(serveStatic); const serverAdapter = new HonoAdapter(serveStatic);
@ -14,6 +14,7 @@ export const applyToHono = (app: OpenAPIHono<HonoEnv>): void => {
queues: [ queues: [
new BullMQAdapter(inboxQueue), new BullMQAdapter(inboxQueue),
new BullMQAdapter(deliveryQueue), new BullMQAdapter(deliveryQueue),
new BullMQAdapter(fetchQueue),
], ],
serverAdapter, serverAdapter,
options: { options: {

View file

@ -4,8 +4,10 @@ import { Instance, User } from "@versia/kit/db";
import { Queue, Worker } from "bullmq"; import { Queue, Worker } from "bullmq";
import type { SocketAddress } from "bun"; import type { SocketAddress } from "bun";
import chalk from "chalk"; import chalk from "chalk";
import { eq } from "drizzle-orm";
import IORedis from "ioredis"; import IORedis from "ioredis";
import { InboxProcessor } from "./classes/inbox/processor.ts"; import { InboxProcessor } from "./classes/inbox/processor.ts";
import { Instances } from "./drizzle/schema.ts";
import { config } from "./packages/config-manager/index.ts"; import { config } from "./packages/config-manager/index.ts";
import type { KnownEntity } from "./types/api.ts"; import type { KnownEntity } from "./types/api.ts";
@ -25,6 +27,12 @@ export enum InboxJobType {
ProcessEntity = "processEntity", ProcessEntity = "processEntity",
} }
/**
 * Job names accepted by the fetch queue.
 *
 * Each member must map to a unique string: the value is used as the job
 * name, so two members sharing a value cannot be told apart by a worker
 * switching on `job.name`.
 */
export enum FetchJobType {
    Instance = "instance",
    User = "user",
    // Was "user", which made note jobs indistinguishable from user jobs.
    Note = "note",
}
export type InboxJobData = { export type InboxJobData = {
data: Entity; data: Entity;
headers: { headers: {
@ -47,6 +55,11 @@ export type DeliveryJobData = {
senderId: string; senderId: string;
}; };
/**
 * Payload carried by every job placed on the fetch queue.
 */
export type FetchJobData = {
/** URI of the remote resource the job should fetch. */
uri: string;
/**
 * NOTE(review): presumably identifies who or what requested the refetch;
 * no code visible here reads this field, so confirm against callers.
 */
refetcher?: string;
};
export const deliveryQueue = new Queue<DeliveryJobData, void, DeliveryJobType>( export const deliveryQueue = new Queue<DeliveryJobData, void, DeliveryJobType>(
"delivery", "delivery",
{ {
@ -61,6 +74,10 @@ export const inboxQueue = new Queue<InboxJobData, Response, InboxJobType>(
}, },
); );
/**
 * Queue of pending fetch jobs, registered under the queue name "fetch".
 * Jobs carry a {@link FetchJobData} payload and are named by {@link FetchJobType}.
 */
export const fetchQueue = new Queue<FetchJobData, void, FetchJobType>(
    "fetch",
    { connection },
);
export const deliveryWorker = new Worker< export const deliveryWorker = new Worker<
DeliveryJobData, DeliveryJobData,
void, void,
@ -260,3 +277,50 @@ export const inboxWorker = new Worker<InboxJobData, Response, InboxJobType>(
}, },
}, },
); );
/**
 * Worker that consumes jobs from the fetch queue.
 *
 * Only {@link FetchJobType.Instance} jobs are implemented: the instance whose
 * `baseUrl` matches the host of the job's URI is refetched if it is already
 * known, otherwise it is resolved from the URI. Any other job name fails
 * with an error instead of silently completing without doing any work.
 *
 * Finished and failed jobs are removed after the ages (in seconds) set in
 * `config.queues.fetch`.
 *
 * @throws Error if a known instance could not be refetched, or if the job
 * name is not supported. `new URL(uri)` also throws on a malformed URI.
 */
export const fetchWorker = new Worker<FetchJobData, void, FetchJobType>(
    fetchQueue.name,
    async (job) => {
        switch (job.name) {
            case FetchJobType.Instance: {
                const { uri } = job.data;

                await job.log(`Fetching instance metadata from [${uri}]`);

                // Instances are looked up by the host part of the URI.
                const host = new URL(uri).host;

                const existingInstance = await Instance.fromSql(
                    eq(Instances.baseUrl, host),
                );

                if (existingInstance) {
                    await job.log("Instance is known, refetching remote data.");

                    const updated = await existingInstance.updateFromRemote();

                    // A falsy result is treated as a failed refetch so the
                    // job is marked as failed rather than logged as a success.
                    if (!updated) {
                        throw new Error(
                            `Failed to refetch instance metadata from [${uri}]`,
                        );
                    }

                    await job.log(`Instance [${uri}] successfully refetched`);

                    return;
                }

                await Instance.resolve(uri);

                await job.log(
                    `${chalk.green(
                        "✔",
                    )} Finished fetching instance metadata from [${uri}]`,
                );

                return;
            }
            default: {
                // Job types without an implementation must fail loudly;
                // otherwise they would be reported as completed.
                throw new Error(`Unsupported fetch job type: ${job.name}`);
            }
        }
    },
    {
        connection,
        removeOnComplete: {
            age: config.queues.fetch.remove_on_complete,
        },
        removeOnFail: {
            age: config.queues.fetch.remove_on_failure,
        },
    },
);