refactor(federation): ♻️ Rewrite federation SDK

This commit is contained in:
Jesse Wierzbinski 2025-04-08 16:01:10 +02:00
parent ad1dc13a51
commit d638610361
No known key found for this signature in database
72 changed files with 2137 additions and 738 deletions

View file

@ -1,9 +1,10 @@
import { User } from "@versia/kit/db";
import * as VersiaEntities from "@versia/sdk/entities";
import { Queue } from "bullmq";
import { Worker } from "bullmq";
import chalk from "chalk";
import { config } from "~/config.ts";
import type { KnownEntity } from "~/types/api";
import type { JSONObject } from "~/packages/federation/types";
import { connection } from "~/utils/redis.ts";
export enum DeliveryJobType {
@ -11,7 +12,7 @@ export enum DeliveryJobType {
}
export type DeliveryJobData = {
entity: KnownEntity;
entity: JSONObject;
recipientId: string;
senderId: string;
};
@ -55,7 +56,23 @@ export const getDeliveryWorker = (): Worker<
`Federating entity [${entity.id}] from @${sender.getAcct()} to @${recipient.getAcct()}`,
);
await sender.federateToUser(entity, recipient);
const type = entity.type;
const entityCtor = Object.values(VersiaEntities).find(
(ctor) => ctor.name === type,
) as typeof VersiaEntities.Entity | undefined;
if (!entityCtor) {
throw new Error(
`Could not resolve entity type ${chalk.gray(
type,
)} for entity [${entity.id}]`,
);
}
await sender.federateToUser(
await entityCtor.fromJSON(entity),
recipient,
);
await job.log(
`✔ Finished federating entity [${entity.id}]`,

View file

@ -1,10 +1,10 @@
import { getLogger } from "@logtape/logtape";
import type { Entity } from "@versia/federation/types";
import { Instance, User } from "@versia/kit/db";
import { Queue } from "bullmq";
import { Worker } from "bullmq";
import type { SocketAddress } from "bun";
import { config } from "~/config.ts";
import type { JSONObject } from "~/packages/federation/types.ts";
import { connection } from "~/utils/redis.ts";
import { ApiError } from "../errors/api-error.ts";
import { InboxProcessor } from "../inbox/processor.ts";
@ -14,7 +14,7 @@ export enum InboxJobType {
}
export type InboxJobData = {
data: Entity;
data: JSONObject;
headers: {
"versia-signature"?: string;
"versia-signed-at"?: number;
@ -46,18 +46,25 @@ export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
await job.log(`Processing entity [${data.id}]`);
const req = new Request(request.url, {
method: request.method,
headers: new Headers(
Object.entries(headers)
.map(([k, v]) => [k, String(v)])
.concat([
["content-type", "application/json"],
]) as [string, string][],
),
body: request.body,
});
if (headers.authorization) {
try {
const processor = new InboxProcessor(
{
...request,
url: new URL(request.url),
},
req,
data,
null,
{
authorization: headers.authorization,
},
headers.authorization,
getLogger(["federation", "inbox"]),
ip,
);
@ -91,13 +98,7 @@ export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
return;
}
const {
"versia-signature": signature,
"versia-signed-at": signedAt,
"versia-signed-by": signedBy,
} = headers as {
"versia-signature": string;
"versia-signed-at": number;
const { "versia-signed-by": signedBy } = headers as {
"versia-signed-by": string;
};
@ -139,24 +140,27 @@ export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
);
}
const key = await crypto.subtle.importKey(
"spki",
Buffer.from(
sender?.data.publicKey ??
remoteInstance.data.publicKey.key,
"base64",
),
"Ed25519",
false,
["verify"],
);
try {
const processor = new InboxProcessor(
{
...request,
url: new URL(request.url),
},
req,
data,
{
instance: remoteInstance,
key:
sender?.data.publicKey ??
remoteInstance.data.publicKey.key,
},
{
signature,
signedAt: new Date(signedAt * 1000),
authorization: undefined,
key,
},
undefined,
getLogger(["federation", "inbox"]),
ip,
);