feat: Split off queue workers into a separate worker process

This commit is contained in:
Jesse Wierzbinski 2024-11-25 21:54:31 +01:00
parent 0b3e74107e
commit 1b98381242
No known key found for this signature in database
34 changed files with 987 additions and 676 deletions

9
.editorconfig Normal file
View file

@ -0,0 +1,9 @@
root = true
[*]
charset = utf-8
end_of_line = lf
indent_style = space
insert_final_newline = true
tab_width = 4
trim_trailing_whitespace = true

View file

@ -1,13 +1,13 @@
name: Check Types
name: Check Types
on:
on:
push:
branches: ["*"]
pull_request:
# The branches below must be a subset of the branches above
branches: ["main"]
jobs:
jobs:
tests:
runs-on: ubuntu-latest
permissions:

View file

@ -1,74 +0,0 @@
name: Docker Build
on:
push:
branches: [ "main" ]
# Publish semver tags as releases.
tags: [ 'v*.*.*' ]
pull_request:
branches: [ "main" ]
env:
# Use docker.io for Docker Hub if empty
REGISTRY: ghcr.io
# github.repository as <account>/<repo>
IMAGE_NAME: ${{ github.repository }}
jobs:
build:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
# This is used to complete the identity challenge
# with sigstore/fulcio when running outside of PRs.
id-token: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
submodules: recursive
- name: Setup QEMU
uses: docker/setup-qemu-action@v3
with:
platforms: all
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3 # v3.0.0
- name: Log into registry ${{ env.REGISTRY }}
if: github.event_name != 'pull_request'
uses: docker/login-action@v3 # v3.0.0
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@v5 # v5.0.0
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Get the commit hash
run: echo "GIT_COMMIT=$(git rev-parse --short ${{ github.sha }})" >> $GITHUB_ENV
- name: Build and push Docker image
id: build-and-push
uses: docker/build-push-action@v5 # v5.0.0
with:
context: .
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
build-args: |
GIT_COMMIT=${{ env.GIT_COMMIT }}
provenance: mode=max
sbom: true
platforms: linux/amd64,linux/arm64
cache-from: type=gha
cache-to: type=gha,mode=max

72
.github/workflows/docker-server.yml vendored Normal file
View file

@ -0,0 +1,72 @@
name: Build Server Docker Image
on:
push:
branches: ["main"]
# Publish semver tags as releases.
tags: ["v*.*.*"]
pull_request:
branches: ["main"]
env:
# Use docker.io for Docker Hub if empty
REGISTRY: ghcr.io
# github.repository as <account>/<repo>
IMAGE_NAME: ${{ github.repository }}
jobs:
build:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
# This is used to complete the identity challenge
# with sigstore/fulcio when running outside of PRs.
id-token: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
submodules: recursive
- name: Setup QEMU
uses: docker/setup-qemu-action@v3
with:
platforms: all
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3 # v3.0.0
- name: Log into registry ${{ env.REGISTRY }}
if: github.event_name != 'pull_request'
uses: docker/login-action@v3 # v3.0.0
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@v5 # v5.0.0
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Get the commit hash
run: echo "GIT_COMMIT=$(git rev-parse --short ${{ github.sha }})" >> $GITHUB_ENV
- name: Build and push Docker image
id: build-and-push
uses: docker/build-push-action@v5 # v5.0.0
with:
context: .
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
build-args: |
GIT_COMMIT=${{ env.GIT_COMMIT }}
provenance: mode=max
sbom: true
platforms: linux/amd64,linux/arm64
cache-from: type=gha
cache-to: type=gha,mode=max

73
.github/workflows/docker-worker.yml vendored Normal file
View file

@ -0,0 +1,73 @@
name: Build Worker Docker Image
on:
push:
branches: ["main"]
# Publish semver tags as releases.
tags: ["v*.*.*"]
pull_request:
branches: ["main"]
env:
# Use docker.io for Docker Hub if empty
REGISTRY: ghcr.io
# github.repository as <account>/<repo>
IMAGE_NAME: ${{ github.repository }}
jobs:
build:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
# This is used to complete the identity challenge
# with sigstore/fulcio when running outside of PRs.
id-token: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
submodules: recursive
- name: Setup QEMU
uses: docker/setup-qemu-action@v3
with:
platforms: all
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3 # v3.0.0
- name: Log into registry ${{ env.REGISTRY }}
if: github.event_name != 'pull_request'
uses: docker/login-action@v3 # v3.0.0
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@v5 # v5.0.0
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Get the commit hash
run: echo "GIT_COMMIT=$(git rev-parse --short ${{ github.sha }})" >> $GITHUB_ENV
- name: Build and push Docker image
id: build-and-push
uses: docker/build-push-action@v5 # v5.0.0
with:
context: .
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
build-args: |
GIT_COMMIT=${{ env.GIT_COMMIT }}
file: ./Worker.Dockerfile
provenance: mode=max
sbom: true
platforms: linux/amd64,linux/arm64
cache-from: type=gha
cache-to: type=gha,mode=max

53
Worker.Dockerfile Normal file
View file

@ -0,0 +1,53 @@
# ---- Base image for building -------------------------------------------
# Node is required for building the project
FROM imbios/bun-node:1-20-alpine AS base
RUN apk add --no-cache libstdc++

# Install dependencies into temp directory
# This will cache them and speed up future builds
FROM base AS install
RUN mkdir -p /temp
COPY . /temp
WORKDIR /temp
RUN bun install --production

# ---- Build stage: bundle the worker entrypoint -------------------------
FROM base AS build
# Copy the project
RUN mkdir -p /temp
COPY . /temp
# Copy dependencies
COPY --from=install /temp/node_modules /temp/node_modules
# Build the project (bundles entrypoints/worker via `build-worker.ts`)
WORKDIR /temp
RUN bun run build:worker
WORKDIR /temp/dist

# ---- Runtime image ------------------------------------------------------
# Copy production dependencies and source code into final image
FROM oven/bun:1.1.36-alpine
# Install libstdc++ for Bun and create app directory
RUN apk add --no-cache libstdc++ && \
    mkdir -p /app
COPY --from=build /temp/dist /app/dist
# NOTE(review): entrypoint.sh is copied here but the CMD below runs bun
# directly — confirm whether this file is still needed in the worker image.
COPY entrypoint.sh /app

LABEL org.opencontainers.image.authors="Gaspard Wierzbinski (https://cpluspatch.dev)"
LABEL org.opencontainers.image.source="https://github.com/versia-pub/server"
LABEL org.opencontainers.image.vendor="Versia Pub"
LABEL org.opencontainers.image.licenses="AGPL-3.0-or-later"
LABEL org.opencontainers.image.title="Versia Server Worker"
LABEL org.opencontainers.image.description="Versia Server Worker Docker image"

# Set current Git commit hash as an environment variable
ARG GIT_COMMIT
ENV GIT_COMMIT=$GIT_COMMIT

# CD to app
WORKDIR /app/dist
ENV NODE_ENV=production

# Start the queue-worker process (no HTTP server is bound by this image)
CMD [ "bun", "run", "index.js" ]

View file

@ -2,7 +2,7 @@ import { apiRoute, applyConfig } from "@/api";
import { createRoute } from "@hono/zod-openapi";
import type { Entity } from "@versia/federation/types";
import { z } from "zod";
import { InboxJobType, inboxQueue } from "~/worker";
import { InboxJobType, inboxQueue } from "~/classes/queues/inbox";
export const meta = applyConfig({
auth: {

View file

@ -2,8 +2,8 @@ import { apiRoute, applyConfig } from "@/api";
import { createRoute } from "@hono/zod-openapi";
import type { Entity } from "@versia/federation/types";
import { z } from "zod";
import { InboxJobType, inboxQueue } from "~/classes/queues/inbox";
import { ErrorSchema } from "~/types/api";
import { InboxJobType, inboxQueue } from "~/worker";
export const meta = applyConfig({
auth: {

26
build-worker.ts Normal file
View file

@ -0,0 +1,26 @@
import { $ } from "bun";
import ora from "ora";

// Build script for the standalone worker process: bundles
// entrypoints/worker/index.ts into dist/ and copies Drizzle migrations
// alongside it.
const buildSpinner = ora("Building").start();

await $`rm -rf dist && mkdir dist`;

const output = await Bun.build({
    entrypoints: ["entrypoints/worker/index.ts"],
    outdir: "dist",
    target: "bun",
    splitting: true,
    minify: false,
});

if (!output.success) {
    // FIX: stop the spinner before reporting; previously the throw (inside a
    // .then callback mixed with await) left the spinner animating over the
    // error output.
    buildSpinner.fail("Build failed");
    console.error(output.logs);
    throw new Error("Build failed");
}

buildSpinner.text = "Transforming";

// Copy Drizzle migrations to dist
await $`cp -r drizzle dist/drizzle`;

buildSpinner.stop();

View file

@ -41,7 +41,7 @@ import {
parseTextMentions,
} from "~/classes/functions/status";
import { config } from "~/packages/config-manager";
import { DeliveryJobType, deliveryQueue } from "~/worker.ts";
import { DeliveryJobType, deliveryQueue } from "../queues/delivery.ts";
import { Application } from "./application.ts";
import { Attachment } from "./attachment.ts";
import { BaseInterface } from "./base.ts";

View file

@ -53,7 +53,7 @@ import { findManyUsers } from "~/classes/functions/user";
import { searchManager } from "~/classes/search/search-manager";
import { type Config, config } from "~/packages/config-manager";
import type { KnownEntity } from "~/types/api.ts";
import { DeliveryJobType, deliveryQueue } from "~/worker.ts";
import { DeliveryJobType, deliveryQueue } from "../queues/delivery.ts";
import { BaseInterface } from "./base.ts";
import { Emoji } from "./emoji.ts";
import { Instance } from "./instance.ts";

View file

@ -0,0 +1,20 @@
import { Queue } from "bullmq";
import type { KnownEntity } from "~/types/api";
import { connection } from "~/utils/redis.ts";

/** Job names accepted by the delivery queue. */
export enum DeliveryJobType {
    FederateEntity = "federateEntity",
}

/**
 * Payload of a delivery job: the entity to federate plus the database IDs
 * of the sending and receiving users (resolved back to User objects by the
 * delivery worker).
 */
export type DeliveryJobData = {
    entity: KnownEntity;
    recipientId: string;
    senderId: string;
};

// Producer-side handle for the "delivery" queue; the matching consumer is
// built by getDeliveryWorker() in classes/workers/delivery.ts.
export const deliveryQueue = new Queue<DeliveryJobData, void, DeliveryJobType>(
    "delivery",
    {
        connection,
    },
);

17
classes/queues/fetch.ts Normal file
View file

@ -0,0 +1,17 @@
import { Queue } from "bullmq";
import { connection } from "~/utils/redis.ts";

/** Kinds of remote objects the fetch queue can fetch or refetch. */
export enum FetchJobType {
    Instance = "instance",
    User = "user",
    // FIX: was "user", which collided with FetchJobType.User — the two job
    // kinds were indistinguishable at runtime (both enqueued under the same
    // BullMQ job name).
    Note = "note",
}

/**
 * Payload of a fetch job.
 *
 * @property uri - URI of the remote object to fetch.
 * @property refetcher - optional identifier of whoever requested a refetch.
 */
export type FetchJobData = {
    uri: string;
    refetcher?: string;
};

// Producer-side handle for the "fetch" queue; consumed by getFetchWorker()
// in classes/workers/fetch.ts.
export const fetchQueue = new Queue<FetchJobData, void, FetchJobType>("fetch", {
    connection,
});

31
classes/queues/inbox.ts Normal file
View file

@ -0,0 +1,31 @@
import type { Entity } from "@versia/federation/types";
import { Queue } from "bullmq";
import type { SocketAddress } from "bun";
import { connection } from "~/utils/redis.ts";

/** Job names accepted by the inbox queue. */
export enum InboxJobType {
    ProcessEntity = "processEntity",
}

/**
 * Payload of an inbox job: a received federation entity plus everything the
 * worker needs to re-verify the request out-of-process.
 */
export type InboxJobData = {
    // The parsed federation entity to process.
    data: Entity;
    // Signature-related headers captured from the original HTTP request;
    // either x-signature/x-nonce/x-signed-by or an authorization header
    // (bridge case) is expected by the inbox worker.
    headers: {
        "x-signature"?: string;
        "x-nonce"?: string;
        "x-signed-by"?: string;
        authorization?: string;
    };
    // Enough of the original request to re-run signature validation.
    request: {
        url: string;
        method: string;
        body: string;
    };
    // Client address of the original request, if known.
    ip: SocketAddress | null;
};

// Producer-side handle for the "inbox" queue. Note the Response result
// type: the worker returns an HTTP-style Response describing the outcome.
export const inboxQueue = new Queue<InboxJobData, Response, InboxJobType>(
    "inbox",
    {
        connection,
    },
);

View file

@ -0,0 +1,66 @@
import { getLogger } from "@logtape/logtape";
import { User } from "@versia/kit/db";
import { Worker } from "bullmq";
import chalk from "chalk";
import { config } from "~/packages/config-manager";
import { connection } from "~/utils/redis.ts";
import {
type DeliveryJobData,
DeliveryJobType,
deliveryQueue,
} from "../queues/delivery.ts";
/**
 * Builds the BullMQ worker that drains the "delivery" queue: each job
 * federates one entity from a local sender to a remote recipient.
 *
 * The returned worker starts processing as soon as it is constructed.
 * Thrown errors mark the job as failed (kept for
 * `config.queues.delivery.remove_on_failure` seconds).
 */
export const getDeliveryWorker = (): Worker<
    DeliveryJobData,
    void,
    DeliveryJobType
> =>
    new Worker<DeliveryJobData, void, DeliveryJobType>(
        deliveryQueue.name,
        async (job) => {
            switch (job.name) {
                case DeliveryJobType.FederateEntity: {
                    const { entity, recipientId, senderId } = job.data;
                    const logger = getLogger(["federation", "delivery"]);

                    // Jobs only carry database IDs; resolve both users here.
                    const sender = await User.fromId(senderId);

                    if (!sender) {
                        throw new Error(
                            `Could not resolve sender ID ${chalk.gray(senderId)}`,
                        );
                    }

                    const recipient = await User.fromId(recipientId);

                    if (!recipient) {
                        throw new Error(
                            `Could not resolve recipient ID ${chalk.gray(recipientId)}`,
                        );
                    }

                    logger.debug`Federating entity ${chalk.gray(
                        entity.id,
                    )} from ${chalk.gray(`@${sender.getAcct()}`)} to ${chalk.gray(
                        recipient.getAcct(),
                    )}`;

                    await sender.federateToUser(entity, recipient);

                    logger.debug`${chalk.green(
                        "✔",
                    )} Finished federating entity ${chalk.gray(entity.id)}`;
                    break;
                }
                default: {
                    // FIX: previously absent — unknown job names completed
                    // silently. Fail loudly instead, matching the inbox
                    // worker's behavior.
                    throw new Error(`Unknown job type: ${job.name}`);
                }
            }
        },
        {
            connection,
            removeOnComplete: {
                age: config.queues.delivery.remove_on_complete,
            },
            removeOnFail: {
                age: config.queues.delivery.remove_on_failure,
            },
        },
    );

64
classes/workers/fetch.ts Normal file
View file

@ -0,0 +1,64 @@
import { Instance } from "@versia/kit/db";
import { Instances } from "@versia/kit/tables";
import { Worker } from "bullmq";
import chalk from "chalk";
import { eq } from "drizzle-orm";
import { config } from "~/packages/config-manager";
import { connection } from "~/utils/redis.ts";
import {
type FetchJobData,
FetchJobType,
fetchQueue,
} from "../queues/fetch.ts";
/**
 * Builds the BullMQ worker that drains the "fetch" queue, refreshing
 * metadata for remote objects.
 *
 * The returned worker starts processing as soon as it is constructed.
 */
export const getFetchWorker = (): Worker<FetchJobData, void, FetchJobType> =>
    new Worker<FetchJobData, void, FetchJobType>(
        fetchQueue.name,
        async (job) => {
            // NOTE(review): only FetchJobType.Instance is handled; User and
            // Note jobs fall through the switch and complete as no-ops, and
            // there is no default branch throwing on unknown names (unlike
            // the inbox worker). Confirm this is intentional.
            switch (job.name) {
                case FetchJobType.Instance: {
                    const { uri } = job.data;

                    // job.log records progress on the job itself (visible in
                    // queue dashboards such as Bull Board).
                    await job.log(`Fetching instance metadata from [${uri}]`);

                    // Check if exists
                    const host = new URL(uri).host;

                    const existingInstance = await Instance.fromSql(
                        eq(Instances.baseUrl, host),
                    );

                    // Known instance: refresh its data in place and finish.
                    if (existingInstance) {
                        await job.log(
                            "Instance is known, refetching remote data.",
                        );

                        await existingInstance.updateFromRemote();

                        await job.log(
                            `Instance [${uri}] successfully refetched`,
                        );

                        return;
                    }

                    // Unknown instance: resolve (and persist) it from the URI.
                    await Instance.resolve(uri);

                    await job.log(
                        `${chalk.green(
                            "✔",
                        )} Finished fetching instance metadata from [${uri}]`,
                    );
                }
            }
        },
        {
            connection,
            removeOnComplete: {
                age: config.queues.fetch.remove_on_complete,
            },
            removeOnFail: {
                age: config.queues.fetch.remove_on_failure,
            },
        },
    );

167
classes/workers/inbox.ts Normal file
View file

@ -0,0 +1,167 @@
import { getLogger } from "@logtape/logtape";
import { Instance, User } from "@versia/kit/db";
import { Worker } from "bullmq";
import chalk from "chalk";
import { config } from "~/packages/config-manager/index.ts";
import { connection } from "~/utils/redis.ts";
import { InboxProcessor } from "../inbox/processor.ts";
import {
type InboxJobData,
InboxJobType,
inboxQueue,
} from "../queues/inbox.ts";
/**
 * Builds the BullMQ worker that drains the "inbox" queue: each job carries a
 * received federation entity plus the headers/request data needed to verify
 * its signature out-of-process.
 *
 * Returns an HTTP-style Response describing the processing outcome (this is
 * the queue's declared result type); throws on unexpected server-side
 * failures, which marks the job as failed.
 */
export const getInboxWorker = (): Worker<
    InboxJobData,
    Response,
    InboxJobType
> =>
    new Worker<InboxJobData, Response, InboxJobType>(
        inboxQueue.name,
        async (job) => {
            switch (job.name) {
                case InboxJobType.ProcessEntity: {
                    const {
                        data,
                        headers: {
                            "x-signature": signature,
                            "x-nonce": nonce,
                            "x-signed-by": signedBy,
                            authorization,
                        },
                        request,
                        ip,
                    } = job.data;
                    const logger = getLogger(["federation", "inbox"]);

                    logger.debug`Processing entity ${chalk.gray(
                        data.id,
                    )} from ${chalk.gray(signedBy)}`;

                    // An authorization header means the request may come from
                    // a bridge: process without a resolved instance/key (the
                    // processor receives null instead of key material).
                    if (authorization) {
                        const processor = new InboxProcessor(
                            request,
                            data,
                            null,
                            {
                                signature,
                                nonce,
                                authorization,
                            },
                            logger,
                            ip,
                        );

                        logger.debug`Entity ${chalk.gray(
                            data.id,
                        )} is potentially from a bridge`;

                        return await processor.process();
                    }

                    // If not potentially from bridge, check for required headers
                    if (!(signature && nonce && signedBy)) {
                        return Response.json(
                            {
                                error: "Missing required headers: x-signature, x-nonce, or x-signed-by",
                            },
                            {
                                status: 400,
                            },
                        );
                    }

                    // signedBy is either a user URI or the literal form
                    // "instance <host>" (handled below).
                    const sender = await User.resolve(signedBy);

                    if (!(sender || signedBy.startsWith("instance "))) {
                        return Response.json(
                            {
                                error: `Couldn't resolve sender URI ${signedBy}`,
                            },
                            {
                                status: 404,
                            },
                        );
                    }

                    // Local users must not federate to their own inbox.
                    if (sender?.isLocal()) {
                        return Response.json(
                            {
                                error: "Cannot process federation requests from local users",
                            },
                            {
                                status: 400,
                            },
                        );
                    }

                    // Resolve the origin instance, either from the sender or
                    // from the host part of "instance <host>".
                    const remoteInstance = sender
                        ? await Instance.fromUser(sender)
                        : await Instance.resolveFromHost(
                              signedBy.split(" ")[1],
                          );

                    if (!remoteInstance) {
                        return Response.json(
                            { error: "Could not resolve the remote instance." },
                            {
                                status: 500,
                            },
                        );
                    }

                    logger.debug`Entity ${chalk.gray(
                        data.id,
                    )} is from remote instance ${chalk.gray(
                        remoteInstance.data.baseUrl,
                    )}`;

                    // Missing key is a server-side data problem, not a client
                    // error: throw so the job is recorded as failed.
                    if (!remoteInstance.data.publicKey?.key) {
                        throw new Error(
                            `Instance ${remoteInstance.data.baseUrl} has no public key stored in database`,
                        );
                    }

                    // Prefer the sender's own key; fall back to the
                    // instance-level key.
                    const processor = new InboxProcessor(
                        request,
                        data,
                        {
                            instance: remoteInstance,
                            key:
                                sender?.data.publicKey ??
                                remoteInstance.data.publicKey.key,
                        },
                        {
                            signature,
                            nonce,
                            authorization,
                        },
                        logger,
                        ip,
                    );

                    const output = await processor.process();

                    logger.debug`${chalk.green(
                        "✔",
                    )} Finished processing entity ${chalk.gray(data.id)}`;

                    return output;
                }

                default: {
                    throw new Error(`Unknown job type: ${job.name}`);
                }
            }
        },
        {
            connection,
            removeOnComplete: {
                age: config.queues.inbox.remove_on_complete,
            },
            removeOnFail: {
                age: config.queues.inbox.remove_on_failure,
            },
        },
    );

View file

@ -3,8 +3,8 @@ import { Instance } from "@versia/kit/db";
import { Instances } from "@versia/kit/tables";
import { eq } from "drizzle-orm";
import ora from "ora";
import { FetchJobType, fetchQueue } from "~/classes/queues/fetch";
import { BaseCommand } from "~/cli/base";
import { FetchJobType, fetchQueue } from "~/worker";
export default class FederationInstanceRefetch extends BaseCommand<
typeof FederationInstanceRefetch

View file

@ -16,6 +16,20 @@ services:
- sonic
- fe
worker:
image: ghcr.io/versia-pub/worker:latest
volumes:
- ./logs:/app/dist/logs
- ./config:/app/dist/config
restart: unless-stopped
container_name: versia-worker
tty: true
networks:
- versia-net
depends_on:
- db
- redis
fe:
image: ghcr.io/versia-pub/frontend:main
container_name: versia-fe

20
entrypoints/api/index.ts Normal file
View file

@ -0,0 +1,20 @@
import cluster from "node:cluster";
import { sentry } from "@/sentry";
import { createServer } from "@/server";
import { appFactory } from "~/app";
import { config } from "~/packages/config-manager/index.ts";

// Exit cleanly on Ctrl-C.
process.on("SIGINT", () => {
    process.exit();
});

// Clustered API entrypoint: the primary forks NUM_CPUS children (default 1)
// and then runs one-time setup; each forked child binds the HTTP server.
if (cluster.isPrimary) {
    for (let i = 0; i < Number(process.env.NUM_CPUS ?? 1); i++) {
        cluster.fork();
    }

    // Setup runs once, in the primary only.
    await import("~/entrypoints/api/setup.ts");

    sentry?.captureMessage("Server started", "info");
} else {
    createServer(config, await appFactory());
}

View file

@ -5,7 +5,7 @@ import { Note } from "@versia/kit/db";
import IORedis from "ioredis";
import { setupDatabase } from "~/drizzle/db";
import { config } from "~/packages/config-manager/index.ts";
import { searchManager } from "./classes/search/search-manager.ts";
import { searchManager } from "../../classes/search/search-manager.ts";
const timeAtStart = performance.now();

View file

@ -0,0 +1,29 @@
import { sentry } from "@/sentry";
import { getLogger } from "@logtape/logtape";
import chalk from "chalk";
import { getDeliveryWorker } from "~/classes/workers/delivery";
import { getFetchWorker } from "~/classes/workers/fetch";
import { getInboxWorker } from "~/classes/workers/inbox";

// Exit cleanly on Ctrl-C.
process.on("SIGINT", () => {
    process.exit();
});

// One-time setup (loggers, database, search, Redis liveness check).
await import("~/entrypoints/worker/setup.ts");

sentry?.captureMessage("Server started", "info");

const serverLogger = getLogger("server");

// Each getter constructs a BullMQ Worker, which begins polling its queue
// immediately on construction; the instances are intentionally not retained.
serverLogger.info`Starting Fetch Worker...`;
getFetchWorker();
serverLogger.info`${chalk.green("✔")} Fetch Worker started`;

serverLogger.info`Starting Delivery Worker...`;
getDeliveryWorker();
serverLogger.info`${chalk.green("✔")} Delivery Worker started`;

serverLogger.info`Starting Inbox Worker...`;
getInboxWorker();
serverLogger.info`${chalk.green("✔")} Inbox Worker started`;

serverLogger.info`${chalk.green("✔✔✔")} All workers started`;

View file

@ -0,0 +1,55 @@
import { checkConfig } from "@/init";
import { configureLoggers } from "@/loggers";
import { getLogger } from "@logtape/logtape";
import { Note } from "@versia/kit/db";
import chalk from "chalk";
import IORedis from "ioredis";
import { setupDatabase } from "~/drizzle/db";
import { config } from "~/packages/config-manager/index.ts";
import { searchManager } from "../../classes/search/search-manager.ts";

// Startup sequence for the standalone queue-worker process: loggers,
// database, optional search, config validation, then a Redis liveness
// check. Mirrors the API setup, minus the HTTP server.
const timeAtStart = performance.now();

await configureLoggers();

const serverLogger = getLogger("server");

console.info(`
${chalk.redBright.bold("** WORKER MODE **")}
`);

serverLogger.info`Starting Versia Server Worker...`;

await setupDatabase();

if (config.sonic.enabled) {
    await searchManager.connect();
}

// Check if database is reachable
const postCount = await Note.getCount();

await checkConfig(config);

// FIX: this process binds no HTTP socket, so do not report
// config.http.bind/bind_port here — that message was copied from the API
// entrypoint's setup and was misleading.
serverLogger.info`Versia Server Worker started in ${(performance.now() - timeAtStart).toFixed(0)}ms`;
serverLogger.info`Database is online, containing ${postCount} posts`;

// Check if Redis is reachable
const connection = new IORedis({
    host: config.redis.queue.host,
    port: config.redis.queue.port,
    password: config.redis.queue.password,
    db: config.redis.queue.database,
    maxRetriesPerRequest: null,
});

await connection.ping();

serverLogger.info`Redis is online`;

View file

@ -1,19 +1 @@
import cluster from "node:cluster";
import { sentry } from "@/sentry";
import { createServer } from "@/server";
import { appFactory } from "~/app";
import { config } from "~/packages/config-manager/index.ts";
process.on("SIGINT", () => {
process.exit();
});
if (cluster.isPrimary) {
for (let i = 0; i < Number(process.env.NUM_CPUS ?? 1); i++) {
cluster.fork();
}
await import("./setup.ts");
sentry?.captureMessage("Server started", "info");
} else {
createServer(config, await appFactory());
}
await import("~/entrypoints/api/index.ts");

View file

@ -33,6 +33,7 @@
"start": "NODE_ENV=production bun run dist/index.js --prod",
"lint": "bunx @biomejs/biome check .",
"build": "bun run build.ts",
"build:worker": "bun run build-worker.ts",
"cloc": "cloc . --exclude-dir node_modules,dist,.output,.nuxt,meta,logs --exclude-ext sql,log,pem",
"wc": "find server database *.ts docs packages types utils drizzle tests -type f -print0 | wc -m --files0-from=-",
"cli": "bun run cli/index.ts",

View file

@ -3,9 +3,11 @@ import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { HonoAdapter } from "@bull-board/hono";
import { serveStatic } from "@hono/hono/bun";
import type { OpenAPIHono } from "@hono/zod-openapi";
import { deliveryQueue } from "~/classes/queues/delivery";
import { fetchQueue } from "~/classes/queues/fetch";
import { inboxQueue } from "~/classes/queues/inbox";
import { config } from "~/packages/config-manager";
import type { HonoEnv } from "~/types/api";
import { deliveryQueue, fetchQueue, inboxQueue } from "~/worker";
export const applyToHono = (app: OpenAPIHono<HonoEnv>): void => {
const serverAdapter = new HonoAdapter(serveStatic);

10
utils/redis.ts Normal file
View file

@ -0,0 +1,10 @@
import IORedis from "ioredis";
import { config } from "~/packages/config-manager/index.ts";

/**
 * Shared Redis connection used by all BullMQ queues and workers
 * (classes/queues/* and classes/workers/*).
 *
 * maxRetriesPerRequest is set to null as required by BullMQ, whose blocking
 * commands are incompatible with ioredis' per-request retry limit.
 */
export const connection = new IORedis({
    host: config.redis.queue.host,
    port: config.redis.queue.port,
    password: config.redis.queue.password,
    db: config.redis.queue.database,
    maxRetriesPerRequest: null,
});

326
worker.ts
View file

@ -1,326 +0,0 @@
import { getLogger } from "@logtape/logtape";
import type { Entity } from "@versia/federation/types";
import { Instance, User } from "@versia/kit/db";
import { Queue, Worker } from "bullmq";
import type { SocketAddress } from "bun";
import chalk from "chalk";
import { eq } from "drizzle-orm";
import IORedis from "ioredis";
import { InboxProcessor } from "./classes/inbox/processor.ts";
import { Instances } from "./drizzle/schema.ts";
import { config } from "./packages/config-manager/index.ts";
import type { KnownEntity } from "./types/api.ts";
const connection = new IORedis({
host: config.redis.queue.host,
port: config.redis.queue.port,
password: config.redis.queue.password,
db: config.redis.queue.database,
maxRetriesPerRequest: null,
});
export enum DeliveryJobType {
FederateEntity = "federateEntity",
}
export enum InboxJobType {
ProcessEntity = "processEntity",
}
export enum FetchJobType {
Instance = "instance",
User = "user",
Note = "user",
}
export type InboxJobData = {
data: Entity;
headers: {
"x-signature"?: string;
"x-nonce"?: string;
"x-signed-by"?: string;
authorization?: string;
};
request: {
url: string;
method: string;
body: string;
};
ip: SocketAddress | null;
};
export type DeliveryJobData = {
entity: KnownEntity;
recipientId: string;
senderId: string;
};
export type FetchJobData = {
uri: string;
refetcher?: string;
};
export const deliveryQueue = new Queue<DeliveryJobData, void, DeliveryJobType>(
"delivery",
{
connection,
},
);
export const inboxQueue = new Queue<InboxJobData, Response, InboxJobType>(
"inbox",
{
connection,
},
);
export const fetchQueue = new Queue<FetchJobData, void, FetchJobType>("fetch", {
connection,
});
export const deliveryWorker = new Worker<
DeliveryJobData,
void,
DeliveryJobType
>(
deliveryQueue.name,
async (job) => {
switch (job.name) {
case DeliveryJobType.FederateEntity: {
const { entity, recipientId, senderId } = job.data;
const logger = getLogger(["federation", "delivery"]);
const sender = await User.fromId(senderId);
if (!sender) {
throw new Error(
`Could not resolve sender ID ${chalk.gray(senderId)}`,
);
}
const recipient = await User.fromId(recipientId);
if (!recipient) {
throw new Error(
`Could not resolve recipient ID ${chalk.gray(recipientId)}`,
);
}
logger.debug`Federating entity ${chalk.gray(
entity.id,
)} from ${chalk.gray(`@${sender.getAcct()}`)} to ${chalk.gray(
recipient.getAcct(),
)}`;
await sender.federateToUser(entity, recipient);
logger.debug`${chalk.green(
"✔",
)} Finished federating entity ${chalk.gray(entity.id)}`;
}
}
},
{
connection,
removeOnComplete: {
age: config.queues.delivery.remove_on_complete,
},
removeOnFail: {
age: config.queues.delivery.remove_on_failure,
},
},
);
export const inboxWorker = new Worker<InboxJobData, Response, InboxJobType>(
inboxQueue.name,
async (job) => {
switch (job.name) {
case InboxJobType.ProcessEntity: {
const {
data,
headers: {
"x-signature": signature,
"x-nonce": nonce,
"x-signed-by": signedBy,
authorization,
},
request,
ip,
} = job.data;
const logger = getLogger(["federation", "inbox"]);
logger.debug`Processing entity ${chalk.gray(
data.id,
)} from ${chalk.gray(signedBy)}`;
if (authorization) {
const processor = new InboxProcessor(
request,
data,
null,
{
signature,
nonce,
authorization,
},
logger,
ip,
);
logger.debug`Entity ${chalk.gray(
data.id,
)} is potentially from a bridge`;
return await processor.process();
}
// If not potentially from bridge, check for required headers
if (!(signature && nonce && signedBy)) {
return Response.json(
{
error: "Missing required headers: x-signature, x-nonce, or x-signed-by",
},
{
status: 400,
},
);
}
const sender = await User.resolve(signedBy);
if (!(sender || signedBy.startsWith("instance "))) {
return Response.json(
{ error: `Couldn't resolve sender URI ${signedBy}` },
{
status: 404,
},
);
}
if (sender?.isLocal()) {
return Response.json(
{
error: "Cannot process federation requests from local users",
},
{
status: 400,
},
);
}
const remoteInstance = sender
? await Instance.fromUser(sender)
: await Instance.resolveFromHost(signedBy.split(" ")[1]);
if (!remoteInstance) {
return Response.json(
{ error: "Could not resolve the remote instance." },
{
status: 500,
},
);
}
logger.debug`Entity ${chalk.gray(
data.id,
)} is from remote instance ${chalk.gray(
remoteInstance.data.baseUrl,
)}`;
if (!remoteInstance.data.publicKey?.key) {
throw new Error(
`Instance ${remoteInstance.data.baseUrl} has no public key stored in database`,
);
}
const processor = new InboxProcessor(
request,
data,
{
instance: remoteInstance,
key:
sender?.data.publicKey ??
remoteInstance.data.publicKey.key,
},
{
signature,
nonce,
authorization,
},
logger,
ip,
);
const output = await processor.process();
logger.debug`${chalk.green(
"✔",
)} Finished processing entity ${chalk.gray(data.id)}`;
return output;
}
default: {
throw new Error(`Unknown job type: ${job.name}`);
}
}
},
{
connection,
removeOnComplete: {
age: config.queues.inbox.remove_on_complete,
},
removeOnFail: {
age: config.queues.inbox.remove_on_failure,
},
},
);
export const fetchWorker = new Worker<FetchJobData, void, FetchJobType>(
fetchQueue.name,
async (job) => {
switch (job.name) {
case FetchJobType.Instance: {
const { uri } = job.data;
await job.log(`Fetching instance metadata from [${uri}]`);
// Check if exists
const host = new URL(uri).host;
const existingInstance = await Instance.fromSql(
eq(Instances.baseUrl, host),
);
if (existingInstance) {
await job.log("Instance is known, refetching remote data.");
await existingInstance.updateFromRemote();
await job.log(`Instance [${uri}] successfully refetched`);
return;
}
await Instance.resolve(uri);
await job.log(
`${chalk.green(
"✔",
)} Finished fetching instance metadata from [${uri}]`,
);
}
}
},
{
connection,
removeOnComplete: {
age: config.queues.fetch.remove_on_complete,
},
removeOnFail: {
age: config.queues.fetch.remove_on_failure,
},
},
);