refactor(federation): ♻️ Simplify inbox processing by using ApiError

This commit is contained in:
Jesse Wierzbinski 2025-03-30 21:13:47 +02:00
parent 757c227f00
commit c68bfdf6e1
No known key found for this signature in database
2 changed files with 173 additions and 253 deletions

View file

@ -20,15 +20,10 @@ import { Likes, Notes } from "@versia/kit/tables";
import type { SocketAddress } from "bun"; import type { SocketAddress } from "bun";
import chalk from "chalk"; import chalk from "chalk";
import { eq } from "drizzle-orm"; import { eq } from "drizzle-orm";
import type { StatusCode } from "hono/utils/http-status";
import { matches } from "ip-matching"; import { matches } from "ip-matching";
import { type ValidationError, isValidationError } from "zod-validation-error"; import { isValidationError } from "zod-validation-error";
import { config } from "~/config.ts"; import { config } from "~/config.ts";
import { ApiError } from "../errors/api-error.ts";
type ResponseBody = {
message?: string;
code: StatusCode;
};
/** /**
* Checks if the hostname is defederated using glob matching. * Checks if the hostname is defederated using glob matching.
@ -52,9 +47,7 @@ function isDefederated(hostname: string): boolean {
* ```typescript * ```typescript
* const processor = new InboxProcessor(context, body, sender, headers); * const processor = new InboxProcessor(context, body, sender, headers);
* *
* const response = await processor.process(); * await processor.process();
*
* return response;
* ``` * ```
*/ */
export class InboxProcessor { export class InboxProcessor {
@ -133,20 +126,14 @@ export class InboxProcessor {
* Determines if signature checks can be skipped. * Determines if signature checks can be skipped.
* Useful for requests from federation bridges. * Useful for requests from federation bridges.
* *
* @returns {boolean | ResponseBody} - Whether to skip signature checks. May include a response body if there are errors. * @returns {boolean} - Whether to skip signature checks.
*/ */
private shouldCheckSignature(): boolean | ResponseBody { private shouldCheckSignature(): boolean {
if (config.federation.bridge) { if (config.federation.bridge) {
const token = this.headers.authorization?.split("Bearer ")[1]; const token = this.headers.authorization?.split("Bearer ")[1];
if (token) { if (token) {
const isBridge = this.isRequestFromBridge(token); return this.isRequestFromBridge(token);
if (isBridge === true) {
return false;
}
return isBridge;
} }
} }
@ -157,23 +144,23 @@ export class InboxProcessor {
* Checks if a request is from a federation bridge. * Checks if a request is from a federation bridge.
* *
* @param token - Authorization token to check. * @param token - Authorization token to check.
* @returns * @returns {boolean} - Whether the request is from a federation bridge.
*/ */
private isRequestFromBridge(token: string): boolean | ResponseBody { private isRequestFromBridge(token: string): boolean {
if (!config.federation.bridge) { if (!config.federation.bridge) {
return { throw new ApiError(
message: 500,
"Bridge is not configured. Please remove the Authorization header.", "Bridge is not configured.",
code: 500, "Please remove the Authorization header.",
}; );
} }
if (token !== config.federation.bridge.token) { if (token !== config.federation.bridge.token) {
return { throw new ApiError(
message: 401,
"An invalid token was passed in the Authorization header. Please use the correct token, or remove the Authorization header.", "Invalid token.",
code: 401, "Please use the correct token, or remove the Authorization header.",
}; );
} }
if (config.federation.bridge.allowed_ips.length === 0) { if (config.federation.bridge.allowed_ips.length === 0) {
@ -181,10 +168,11 @@ export class InboxProcessor {
} }
if (!this.requestIp) { if (!this.requestIp) {
return { throw new ApiError(
message: "The request IP address could not be determined.", 500,
code: 500, "The request IP address could not be determined.",
}; "This may be due to an incorrectly configured reverse proxy.",
);
} }
for (const ip of config.federation.bridge.allowed_ips) { for (const ip of config.federation.bridge.allowed_ips) {
@ -193,18 +181,20 @@ export class InboxProcessor {
} }
} }
return { throw new ApiError(
message: "The request is not from a trusted bridge IP address.", 403,
code: 403, "The request is not from a trusted bridge IP address.",
}; "Remove the Authorization header if you are not trying to access this API as a bridge.",
);
} }
/** /**
* Performs request processing. * Performs request processing.
* *
* @returns {Promise<Response | null>} - HTTP response to send back. Null if no response is needed (no errors). * @returns {Promise<void>}
* @throws {ApiError} - If there is an error processing the request.
*/ */
public async process(): Promise<Response | null> { public async process(): Promise<void> {
!this.sender && !this.sender &&
this.logger.debug`Processing request from potential bridge`; this.logger.debug`Processing request from potential bridge`;
@ -212,7 +202,7 @@ export class InboxProcessor {
// Return 201 to avoid // Return 201 to avoid
// 1. Leaking defederated instance information // 1. Leaking defederated instance information
// 2. Preventing the sender from thinking the message was not delivered and retrying // 2. Preventing the sender from thinking the message was not delivered and retrying
return null; return;
} }
this.logger.debug`Instance ${chalk.gray( this.logger.debug`Instance ${chalk.gray(
@ -221,13 +211,6 @@ export class InboxProcessor {
const shouldCheckSignature = this.shouldCheckSignature(); const shouldCheckSignature = this.shouldCheckSignature();
if (shouldCheckSignature !== true && shouldCheckSignature !== false) {
return Response.json(
{ error: shouldCheckSignature.message },
{ status: shouldCheckSignature.code },
);
}
shouldCheckSignature shouldCheckSignature
? this.logger.debug`Checking signature` ? this.logger.debug`Checking signature`
: this.logger.debug`Skipping signature check`; : this.logger.debug`Skipping signature check`;
@ -236,10 +219,7 @@ export class InboxProcessor {
const isValid = await this.isSignatureValid(); const isValid = await this.isSignatureValid();
if (!isValid) { if (!isValid) {
return Response.json( throw new ApiError(401, "Signature is not valid");
{ error: "Signature is not valid" },
{ status: 401 },
);
} }
} }
@ -249,23 +229,18 @@ export class InboxProcessor {
const handler = new RequestParserHandler(this.body, validator); const handler = new RequestParserHandler(this.body, validator);
try { try {
return await handler.parseBody<Response | null>({ return await handler.parseBody<void>({
note: (): Promise<Response | null> => this.processNote(), note: (): Promise<void> => this.processNote(),
follow: (): Promise<Response | null> => follow: (): Promise<void> => this.processFollowRequest(),
this.processFollowRequest(), followAccept: (): Promise<void> => this.processFollowAccept(),
followAccept: (): Promise<Response | null> => followReject: (): Promise<void> => this.processFollowReject(),
this.processFollowAccept(), "pub.versia:likes/Like": (): Promise<void> =>
followReject: (): Promise<Response | null> =>
this.processFollowReject(),
"pub.versia:likes/Like": (): Promise<Response | null> =>
this.processLikeRequest(), this.processLikeRequest(),
delete: (): Promise<Response | null> => this.processDelete(), delete: (): Promise<void> => this.processDelete(),
user: (): Promise<Response | null> => this.processUserRequest(), user: (): Promise<void> => this.processUserRequest(),
unknown: (): Response => unknown: (): void => {
Response.json( throw new ApiError(400, "Unknown entity type");
{ error: "Unknown entity type" }, },
{ status: 400 },
),
}); });
} catch (e) { } catch (e) {
return this.handleError(e as Error); return this.handleError(e as Error);
@ -275,54 +250,40 @@ export class InboxProcessor {
/** /**
* Handles Note entity processing. * Handles Note entity processing.
* *
* @returns {Promise<Response | null>} - The response. * @returns {Promise<void>}
*/ */
private async processNote(): Promise<Response | null> { private async processNote(): Promise<void> {
const note = this.body as VersiaNote; const note = this.body as VersiaNote;
const author = await User.resolve(new URL(note.author)); const author = await User.resolve(new URL(note.author));
const instance = await Instance.resolve(new URL(note.uri)); const instance = await Instance.resolve(new URL(note.uri));
if (!instance) { if (!instance) {
return Response.json( throw new ApiError(404, "Instance not found");
{ error: "Instance not found" },
{ status: 404 },
);
} }
if (!author) { if (!author) {
return Response.json( throw new ApiError(404, "Author not found");
{ error: "Author not found" },
{ status: 404 },
);
} }
await Note.fromVersia(note, author, instance); await Note.fromVersia(note, author, instance);
return null;
} }
/** /**
* Handles Follow entity processing. * Handles Follow entity processing.
* *
* @returns {Promise<Response | null>} - The response. * @returns {Promise<void>}
*/ */
private async processFollowRequest(): Promise<Response | null> { private async processFollowRequest(): Promise<void> {
const follow = this.body as unknown as VersiaFollow; const follow = this.body as unknown as VersiaFollow;
const author = await User.resolve(new URL(follow.author)); const author = await User.resolve(new URL(follow.author));
const followee = await User.resolve(new URL(follow.followee)); const followee = await User.resolve(new URL(follow.followee));
if (!author) { if (!author) {
return Response.json( throw new ApiError(404, "Author not found");
{ error: "Author not found" },
{ status: 404 },
);
} }
if (!followee) { if (!followee) {
return Response.json( throw new ApiError(404, "Followee not found");
{ error: "Followee not found" },
{ status: 404 },
);
} }
const foundRelationship = await Relationship.fromOwnerAndSubject( const foundRelationship = await Relationship.fromOwnerAndSubject(
@ -331,7 +292,7 @@ export class InboxProcessor {
); );
if (foundRelationship.data.following) { if (foundRelationship.data.following) {
return null; return;
} }
await foundRelationship.update({ await foundRelationship.update({
@ -351,32 +312,24 @@ export class InboxProcessor {
if (!followee.data.isLocked) { if (!followee.data.isLocked) {
await followee.sendFollowAccept(author); await followee.sendFollowAccept(author);
} }
return null;
} }
/** /**
* Handles FollowAccept entity processing * Handles FollowAccept entity processing
* *
* @returns {Promise<Response | null>} - The response. * @returns {Promise<void>}
*/ */
private async processFollowAccept(): Promise<Response | null> { private async processFollowAccept(): Promise<void> {
const followAccept = this.body as unknown as VersiaFollowAccept; const followAccept = this.body as unknown as VersiaFollowAccept;
const author = await User.resolve(new URL(followAccept.author)); const author = await User.resolve(new URL(followAccept.author));
const follower = await User.resolve(new URL(followAccept.follower)); const follower = await User.resolve(new URL(followAccept.follower));
if (!author) { if (!author) {
return Response.json( throw new ApiError(404, "Author not found");
{ error: "Author not found" },
{ status: 404 },
);
} }
if (!follower) { if (!follower) {
return Response.json( throw new ApiError(404, "Follower not found");
{ error: "Follower not found" },
{ status: 404 },
);
} }
const foundRelationship = await Relationship.fromOwnerAndSubject( const foundRelationship = await Relationship.fromOwnerAndSubject(
@ -385,39 +338,31 @@ export class InboxProcessor {
); );
if (!foundRelationship.data.requested) { if (!foundRelationship.data.requested) {
return null; return;
} }
await foundRelationship.update({ await foundRelationship.update({
requested: false, requested: false,
following: true, following: true,
}); });
return null;
} }
/** /**
* Handles FollowReject entity processing * Handles FollowReject entity processing
* *
* @returns {Promise<Response | null>} - The response. * @returns {Promise<void>}
*/ */
private async processFollowReject(): Promise<Response | null> { private async processFollowReject(): Promise<void> {
const followReject = this.body as unknown as VersiaFollowReject; const followReject = this.body as unknown as VersiaFollowReject;
const author = await User.resolve(new URL(followReject.author)); const author = await User.resolve(new URL(followReject.author));
const follower = await User.resolve(new URL(followReject.follower)); const follower = await User.resolve(new URL(followReject.follower));
if (!author) { if (!author) {
return Response.json( throw new ApiError(404, "Author not found");
{ error: "Author not found" },
{ status: 404 },
);
} }
if (!follower) { if (!follower) {
return Response.json( throw new ApiError(404, "Follower not found");
{ error: "Follower not found" },
{ status: 404 },
);
} }
const foundRelationship = await Relationship.fromOwnerAndSubject( const foundRelationship = await Relationship.fromOwnerAndSubject(
@ -426,23 +371,21 @@ export class InboxProcessor {
); );
if (!foundRelationship.data.requested) { if (!foundRelationship.data.requested) {
return null; return;
} }
await foundRelationship.update({ await foundRelationship.update({
requested: false, requested: false,
following: false, following: false,
}); });
return null;
} }
/** /**
* Handles Delete entity processing. * Handles Delete entity processing.
* *
* @returns {Promise<Response | null>} - The response. * @returns {Promise<void>}
*/ */
public async processDelete(): Promise<Response | null> { public async processDelete(): Promise<void> {
// JS doesn't allow the use of `delete` as a variable name // JS doesn't allow the use of `delete` as a variable name
const delete_ = this.body as unknown as VersiaDelete; const delete_ = this.body as unknown as VersiaDelete;
const toDelete = delete_.deleted; const toDelete = delete_.deleted;
@ -459,38 +402,28 @@ export class InboxProcessor {
); );
if (!note) { if (!note) {
return Response.json( throw new ApiError(
{ 404,
error: "Note to delete not found or not owned by sender", "Note to delete not found or not owned by sender",
},
{ status: 404 },
); );
} }
await note.delete(); await note.delete();
return null; return;
} }
case "User": { case "User": {
const userToDelete = await User.resolve(new URL(toDelete)); const userToDelete = await User.resolve(new URL(toDelete));
if (!userToDelete) { if (!userToDelete) {
return Response.json( throw new ApiError(404, "User to delete not found");
{ error: "User to delete not found" },
{ status: 404 },
);
} }
if (!author || userToDelete.id === author.id) { if (!author || userToDelete.id === author.id) {
await userToDelete.delete(); await userToDelete.delete();
return null; return;
} }
return Response.json( throw new ApiError(400, "Cannot delete other users than self");
{
error: "Cannot delete other users than self",
},
{ status: 400 },
);
} }
case "pub.versia:likes/Like": { case "pub.versia:likes/Like": {
const like = await Like.fromSql( const like = await Like.fromSql(
@ -499,21 +432,19 @@ export class InboxProcessor {
); );
if (!like) { if (!like) {
return Response.json( throw new ApiError(
{ error: "Like not found or not owned by sender" }, 404,
{ status: 404 }, "Like not found or not owned by sender",
); );
} }
await like.delete(); await like.delete();
return null; return;
} }
default: { default: {
return Response.json( throw new ApiError(
{ 400,
error: `Deletion of object ${toDelete} not implemented`, `Deletion of object ${toDelete} not implemented`,
},
{ status: 400 },
); );
} }
} }
@ -522,79 +453,55 @@ export class InboxProcessor {
/** /**
* Handles Like entity processing. * Handles Like entity processing.
* *
* @returns {Promise<Response | null>} - The response. * @returns {Promise<void>}
*/ */
private async processLikeRequest(): Promise<Response | null> { private async processLikeRequest(): Promise<void> {
const like = this.body as unknown as VersiaLikeExtension; const like = this.body as unknown as VersiaLikeExtension;
const author = await User.resolve(new URL(like.author)); const author = await User.resolve(new URL(like.author));
const likedNote = await Note.resolve(new URL(like.liked)); const likedNote = await Note.resolve(new URL(like.liked));
if (!author) { if (!author) {
return Response.json( throw new ApiError(404, "Author not found");
{ error: "Author not found" },
{ status: 404 },
);
} }
if (!likedNote) { if (!likedNote) {
return Response.json( throw new ApiError(404, "Liked Note not found");
{ error: "Liked Note not found" },
{ status: 404 },
);
} }
await author.like(likedNote, like.uri); await author.like(likedNote, like.uri);
return null;
} }
/** /**
* Handles User entity processing (profile edits). * Handles User entity processing (profile edits).
* *
* @returns {Promise<Response | null>} - The response. * @returns {Promise<void>}
*/ */
private async processUserRequest(): Promise<Response | null> { private async processUserRequest(): Promise<void> {
const user = this.body as unknown as VersiaUser; const user = this.body as unknown as VersiaUser;
const instance = await Instance.resolve(new URL(user.uri)); const instance = await Instance.resolve(new URL(user.uri));
if (!instance) { if (!instance) {
return Response.json( throw new ApiError(404, "Instance not found");
{ error: "Instance not found" },
{ status: 404 },
);
} }
await User.fromVersia(user, instance); await User.fromVersia(user, instance);
return null;
} }
/** /**
* Processes Errors into the appropriate HTTP response. * Processes Errors into the appropriate HTTP response.
* *
* @param {Error} e - The error object. * @param {Error} e - The error object.
* @returns {Response} - The error response. * @returns {void}
* @throws {ApiError} - The error response.
*/ */
private handleError(e: Error): Response { private handleError(e: Error): void {
if (isValidationError(e)) { if (isValidationError(e)) {
return Response.json( throw new ApiError(400, "Failed to process request", e.message);
{
error: "Failed to process request",
error_description: (e as ValidationError).message,
},
{ status: 400 },
);
} }
this.logger.error`${e}`; this.logger.error`${e}`;
sentry?.captureException(e); sentry?.captureException(e);
return Response.json( throw new ApiError(500, "Failed to process request", e.message);
{
error: "Failed to process request",
message: (e as Error).message,
},
{ status: 500 },
);
} }
} }

View file

@ -6,6 +6,7 @@ import { Worker } from "bullmq";
import type { SocketAddress } from "bun"; import type { SocketAddress } from "bun";
import { config } from "~/config.ts"; import { config } from "~/config.ts";
import { connection } from "~/utils/redis.ts"; import { connection } from "~/utils/redis.ts";
import { ApiError } from "../errors/api-error.ts";
import { InboxProcessor } from "../inbox/processor.ts"; import { InboxProcessor } from "../inbox/processor.ts";
export enum InboxJobType { export enum InboxJobType {
@ -46,6 +47,7 @@ export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
await job.log(`Processing entity [${data.id}]`); await job.log(`Processing entity [${data.id}]`);
if (headers.authorization) { if (headers.authorization) {
try {
const processor = new InboxProcessor( const processor = new InboxProcessor(
{ {
...request, ...request,
@ -64,12 +66,13 @@ export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
`Entity [${data.id}] is potentially from a bridge`, `Entity [${data.id}] is potentially from a bridge`,
); );
const output = await processor.process(); await processor.process();
} catch (e) {
if (output instanceof Response) { if (e instanceof ApiError) {
// Error occurred // Error occurred
const error = await output.json(); await job.log(
await job.log(`Error during processing: ${error}`); `Error during processing: ${e.message}`,
);
await job.log( await job.log(
`Failed processing entity [${data.id}]`, `Failed processing entity [${data.id}]`,
@ -78,6 +81,9 @@ export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
return; return;
} }
throw e;
}
await job.log( await job.log(
`✔ Finished processing entity [${data.id}]`, `✔ Finished processing entity [${data.id}]`,
); );
@ -133,6 +139,7 @@ export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
); );
} }
try {
const processor = new InboxProcessor( const processor = new InboxProcessor(
{ {
...request, ...request,
@ -154,14 +161,17 @@ export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
ip, ip,
); );
const output = await processor.process(); await processor.process();
} catch (e) {
if (output instanceof Response) { if (e instanceof ApiError) {
// Error occurred // Error occurred
const error = await output.json(); await job.log(
await job.log(`Error during processing: ${error}`); `Error during processing: ${e.message}`,
);
await job.log(`Failed processing entity [${data.id}]`); await job.log(
`Failed processing entity [${data.id}]`,
);
await job.log( await job.log(
`Sending error message to instance [${remoteInstance.data.baseUrl}]`, `Sending error message to instance [${remoteInstance.data.baseUrl}]`,
@ -169,7 +179,7 @@ export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
await remoteInstance.sendMessage( await remoteInstance.sendMessage(
`Failed processing entity [${data.uri}] delivered to inbox. Returned error:\n\n${JSON.stringify( `Failed processing entity [${data.uri}] delivered to inbox. Returned error:\n\n${JSON.stringify(
error, e.message,
null, null,
4, 4,
)}`, )}`,
@ -180,6 +190,9 @@ export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
return; return;
} }
throw e;
}
await job.log(`Finished processing entity [${data.id}]`); await job.log(`Finished processing entity [${data.id}]`);
return; return;