diff --git a/classes/inbox/processor.ts b/classes/inbox/processor.ts index 4b643466..3e96bd7b 100644 --- a/classes/inbox/processor.ts +++ b/classes/inbox/processor.ts @@ -20,15 +20,10 @@ import { Likes, Notes } from "@versia/kit/tables"; import type { SocketAddress } from "bun"; import chalk from "chalk"; import { eq } from "drizzle-orm"; -import type { StatusCode } from "hono/utils/http-status"; import { matches } from "ip-matching"; -import { type ValidationError, isValidationError } from "zod-validation-error"; +import { isValidationError } from "zod-validation-error"; import { config } from "~/config.ts"; - -type ResponseBody = { - message?: string; - code: StatusCode; -}; +import { ApiError } from "../errors/api-error.ts"; /** * Checks if the hostname is defederated using glob matching. @@ -52,9 +47,7 @@ function isDefederated(hostname: string): boolean { * ```typescript * const processor = new InboxProcessor(context, body, sender, headers); * - * const response = await processor.process(); - * - * return response; + * await processor.process(); * ``` */ export class InboxProcessor { @@ -133,20 +126,14 @@ export class InboxProcessor { * Determines if signature checks can be skipped. * Useful for requests from federation bridges. * - * @returns {boolean | ResponseBody} - Whether to skip signature checks. May include a response body if there are errors. + * @returns {boolean} - Whether to skip signature checks. */ - private shouldCheckSignature(): boolean | ResponseBody { + private shouldCheckSignature(): boolean { if (config.federation.bridge) { const token = this.headers.authorization?.split("Bearer ")[1]; if (token) { - const isBridge = this.isRequestFromBridge(token); - - if (isBridge === true) { - return false; - } - - return isBridge; + return this.isRequestFromBridge(token); } } @@ -157,23 +144,23 @@ export class InboxProcessor { * Checks if a request is from a federation bridge. * * @param token - Authorization token to check. - * @returns + * @returns {boolean} - Whether the request is from a federation bridge. */ - private isRequestFromBridge(token: string): boolean | ResponseBody { + private isRequestFromBridge(token: string): boolean { if (!config.federation.bridge) { - return { - message: - "Bridge is not configured. Please remove the Authorization header.", - code: 500, - }; + throw new ApiError( + 500, + "Bridge is not configured.", + "Please remove the Authorization header.", + ); } if (token !== config.federation.bridge.token) { - return { - message: - "An invalid token was passed in the Authorization header. Please use the correct token, or remove the Authorization header.", - code: 401, - }; + throw new ApiError( + 401, + "Invalid token.", + "Please use the correct token, or remove the Authorization header.", + ); } if (config.federation.bridge.allowed_ips.length === 0) { @@ -181,10 +168,11 @@ export class InboxProcessor { } if (!this.requestIp) { - return { - message: "The request IP address could not be determined.", - code: 500, - }; + throw new ApiError( + 500, + "The request IP address could not be determined.", + "This may be due to an incorrectly configured reverse proxy.", + ); } for (const ip of config.federation.bridge.allowed_ips) { @@ -193,18 +181,20 @@ export class InboxProcessor { } } - return { - message: "The request is not from a trusted bridge IP address.", - code: 403, - }; + throw new ApiError( + 403, + "The request is not from a trusted bridge IP address.", + "Remove the Authorization header if you are not trying to access this API as a bridge.", + ); } /** * Performs request processing. * - * @returns {Promise} - HTTP response to send back. Null if no response is needed (no errors). + * @returns {Promise} + * @throws {ApiError} - If there is an error processing the request. */ - public async process(): Promise { + public async process(): Promise { !this.sender && this.logger.debug`Processing request from potential bridge`; @@ -212,7 +202,7 @@ export class InboxProcessor { // Return 201 to avoid // 1. Leaking defederated instance information // 2. Preventing the sender from thinking the message was not delivered and retrying - return null; + return; } this.logger.debug`Instance ${chalk.gray( @@ -221,13 +211,6 @@ export class InboxProcessor { const shouldCheckSignature = this.shouldCheckSignature(); - if (shouldCheckSignature !== true && shouldCheckSignature !== false) { - return Response.json( - { error: shouldCheckSignature.message }, - { status: shouldCheckSignature.code }, - ); - } - shouldCheckSignature ? this.logger.debug`Checking signature` : this.logger.debug`Skipping signature check`; @@ -236,10 +219,7 @@ export class InboxProcessor { const isValid = await this.isSignatureValid(); if (!isValid) { - return Response.json( - { error: "Signature is not valid" }, - { status: 401 }, - ); + throw new ApiError(401, "Signature is not valid"); } } @@ -249,23 +229,18 @@ export class InboxProcessor { const handler = new RequestParserHandler(this.body, validator); try { - return await handler.parseBody({ - note: (): Promise => this.processNote(), - follow: (): Promise => - this.processFollowRequest(), - followAccept: (): Promise => - this.processFollowAccept(), - followReject: (): Promise => - this.processFollowReject(), - "pub.versia:likes/Like": (): Promise => + return await handler.parseBody({ + note: (): Promise => this.processNote(), + follow: (): Promise => this.processFollowRequest(), + followAccept: (): Promise => this.processFollowAccept(), + followReject: (): Promise => this.processFollowReject(), + "pub.versia:likes/Like": (): Promise => this.processLikeRequest(), - delete: (): Promise => this.processDelete(), - user: (): Promise => this.processUserRequest(), - unknown: (): Response => - Response.json( - { error: "Unknown entity type" }, - { status: 400 }, - ), + delete: (): Promise => this.processDelete(), + user: (): Promise => this.processUserRequest(), + unknown: (): void => { + throw new ApiError(400, "Unknown entity type"); + }, }); } catch (e) { return this.handleError(e as Error); @@ -275,54 +250,40 @@ export class InboxProcessor { /** * Handles Note entity processing. * - * @returns {Promise} - The response. + * @returns {Promise} */ - private async processNote(): Promise { + private async processNote(): Promise { const note = this.body as VersiaNote; const author = await User.resolve(new URL(note.author)); const instance = await Instance.resolve(new URL(note.uri)); if (!instance) { - return Response.json( - { error: "Instance not found" }, - { status: 404 }, - ); + throw new ApiError(404, "Instance not found"); } if (!author) { - return Response.json( - { error: "Author not found" }, - { status: 404 }, - ); + throw new ApiError(404, "Author not found"); } await Note.fromVersia(note, author, instance); - - return null; } /** * Handles Follow entity processing. * - * @returns {Promise} - The response. + * @returns {Promise} */ - private async processFollowRequest(): Promise { + private async processFollowRequest(): Promise { const follow = this.body as unknown as VersiaFollow; const author = await User.resolve(new URL(follow.author)); const followee = await User.resolve(new URL(follow.followee)); if (!author) { - return Response.json( - { error: "Author not found" }, - { status: 404 }, - ); + throw new ApiError(404, "Author not found"); } if (!followee) { - return Response.json( - { error: "Followee not found" }, - { status: 404 }, - ); + throw new ApiError(404, "Followee not found"); } const foundRelationship = await Relationship.fromOwnerAndSubject( @@ -331,7 +292,7 @@ export class InboxProcessor { ); if (foundRelationship.data.following) { - return null; + return; } await foundRelationship.update({ @@ -351,32 +312,24 @@ export class InboxProcessor { if (!followee.data.isLocked) { await followee.sendFollowAccept(author); } - - return null; } /** * Handles FollowAccept entity processing * - * @returns {Promise} - The response. + * @returns {Promise} */ - private async processFollowAccept(): Promise { + private async processFollowAccept(): Promise { const followAccept = this.body as unknown as VersiaFollowAccept; const author = await User.resolve(new URL(followAccept.author)); const follower = await User.resolve(new URL(followAccept.follower)); if (!author) { - return Response.json( - { error: "Author not found" }, - { status: 404 }, - ); + throw new ApiError(404, "Author not found"); } if (!follower) { - return Response.json( - { error: "Follower not found" }, - { status: 404 }, - ); + throw new ApiError(404, "Follower not found"); } const foundRelationship = await Relationship.fromOwnerAndSubject( @@ -385,39 +338,31 @@ export class InboxProcessor { ); if (!foundRelationship.data.requested) { - return null; + return; } await foundRelationship.update({ requested: false, following: true, }); - - return null; } /** * Handles FollowReject entity processing * - * @returns {Promise} - The response. + * @returns {Promise} */ - private async processFollowReject(): Promise { + private async processFollowReject(): Promise { const followReject = this.body as unknown as VersiaFollowReject; const author = await User.resolve(new URL(followReject.author)); const follower = await User.resolve(new URL(followReject.follower)); if (!author) { - return Response.json( - { error: "Author not found" }, - { status: 404 }, - ); + throw new ApiError(404, "Author not found"); } if (!follower) { - return Response.json( - { error: "Follower not found" }, - { status: 404 }, - ); + throw new ApiError(404, "Follower not found"); } const foundRelationship = await Relationship.fromOwnerAndSubject( @@ -426,23 +371,21 @@ export class InboxProcessor { ); if (!foundRelationship.data.requested) { - return null; + return; } await foundRelationship.update({ requested: false, following: false, }); - - return null; } /** * Handles Delete entity processing. * - * @returns {Promise} - The response. + * @returns {Promise} */ - public async processDelete(): Promise { + public async processDelete(): Promise { // JS doesn't allow the use of `delete` as a variable name const delete_ = this.body as unknown as VersiaDelete; const toDelete = delete_.deleted; @@ -459,38 +402,28 @@ export class InboxProcessor { ); if (!note) { - return Response.json( - { - error: "Note to delete not found or not owned by sender", - }, - { status: 404 }, + throw new ApiError( + 404, + "Note to delete not found or not owned by sender", ); } await note.delete(); - return null; + return; } case "User": { const userToDelete = await User.resolve(new URL(toDelete)); if (!userToDelete) { - return Response.json( - { error: "User to delete not found" }, - { status: 404 }, - ); + throw new ApiError(404, "User to delete not found"); } if (!author || userToDelete.id === author.id) { await userToDelete.delete(); - return null; + return; } - return Response.json( - { - error: "Cannot delete other users than self", - }, - { status: 400 }, - ); + throw new ApiError(400, "Cannot delete other users than self"); } case "pub.versia:likes/Like": { const like = await Like.fromSql( @@ -499,21 +432,19 @@ export class InboxProcessor { ); if (!like) { - return Response.json( - { error: "Like not found or not owned by sender" }, - { status: 404 }, + throw new ApiError( + 404, + "Like not found or not owned by sender", ); } await like.delete(); - return null; + return; } default: { - return Response.json( - { - error: `Deletion of object ${toDelete} not implemented`, - }, - { status: 400 }, + throw new ApiError( + 400, + `Deletion of object ${toDelete} not implemented`, ); } } @@ -522,79 +453,55 @@ export class InboxProcessor { /** * Handles Like entity processing. * - * @returns {Promise} - The response. + * @returns {Promise} */ - private async processLikeRequest(): Promise { + private async processLikeRequest(): Promise { const like = this.body as unknown as VersiaLikeExtension; const author = await User.resolve(new URL(like.author)); const likedNote = await Note.resolve(new URL(like.liked)); if (!author) { - return Response.json( - { error: "Author not found" }, - { status: 404 }, - ); + throw new ApiError(404, "Author not found"); } if (!likedNote) { - return Response.json( - { error: "Liked Note not found" }, - { status: 404 }, - ); + throw new ApiError(404, "Liked Note not found"); } await author.like(likedNote, like.uri); - - return null; } /** * Handles User entity processing (profile edits). * - * @returns {Promise} - The response. + * @returns {Promise} */ - private async processUserRequest(): Promise { + private async processUserRequest(): Promise { const user = this.body as unknown as VersiaUser; const instance = await Instance.resolve(new URL(user.uri)); if (!instance) { - return Response.json( - { error: "Instance not found" }, - { status: 404 }, - ); + throw new ApiError(404, "Instance not found"); } await User.fromVersia(user, instance); - - return null; } /** * Processes Errors into the appropriate HTTP response. * * @param {Error} e - The error object. - * @returns {Response} - The error response. + * @returns {void} + * @throws {ApiError} - The error response. */ - private handleError(e: Error): Response { + private handleError(e: Error): void { if (isValidationError(e)) { - return Response.json( - { - error: "Failed to process request", - error_description: (e as ValidationError).message, - }, - { status: 400 }, - ); + throw new ApiError(400, "Failed to process request", e.message); } this.logger.error`${e}`; sentry?.captureException(e); - return Response.json( - { - error: "Failed to process request", - message: (e as Error).message, - }, - { status: 500 }, - ); + throw new ApiError(500, "Failed to process request", e.message); } } diff --git a/classes/queues/inbox.ts b/classes/queues/inbox.ts index ea7561dd..fb378507 100644 --- a/classes/queues/inbox.ts +++ b/classes/queues/inbox.ts @@ -6,6 +6,7 @@ import { Worker } from "bullmq"; import type { SocketAddress } from "bun"; import { config } from "~/config.ts"; import { connection } from "~/utils/redis.ts"; +import { ApiError } from "../errors/api-error.ts"; import { InboxProcessor } from "../inbox/processor.ts"; export enum InboxJobType { @@ -46,36 +47,41 @@ export const getInboxWorker = (): Worker => await job.log(`Processing entity [${data.id}]`); if (headers.authorization) { - const processor = new InboxProcessor( - { - ...request, - url: new URL(request.url), - }, - data, - null, - { - authorization: headers.authorization, - }, - getLogger(["federation", "inbox"]), - ip, - ); - - await job.log( - `Entity [${data.id}] is potentially from a bridge`, - ); - - const output = await processor.process(); - - if (output instanceof Response) { - // Error occurred - const error = await output.json(); - await job.log(`Error during processing: ${error}`); - - await job.log( - `Failed processing entity [${data.id}]`, + try { + const processor = new InboxProcessor( + { + ...request, + url: new URL(request.url), + }, + data, + null, + { + authorization: headers.authorization, + }, + getLogger(["federation", "inbox"]), + ip, ); - return; + await job.log( + `Entity [${data.id}] is potentially from a bridge`, + ); + + await processor.process(); + } catch (e) { + if (e instanceof ApiError) { + // Error occurred + await job.log( + `Error during processing: ${e.message}`, + ); + + await job.log( + `Failed processing entity [${data.id}]`, + ); + + return; + } + + throw e; } await job.log( @@ -133,51 +139,58 @@ export const getInboxWorker = (): Worker => ); } - const processor = new InboxProcessor( - { - ...request, - url: new URL(request.url), - }, - data, - { - instance: remoteInstance, - key: - sender?.data.publicKey ?? - remoteInstance.data.publicKey.key, - }, - { - signature, - signedAt: new Date(signedAt * 1000), - authorization: undefined, - }, - getLogger(["federation", "inbox"]), - ip, - ); - - const output = await processor.process(); - - if (output instanceof Response) { - // Error occurred - const error = await output.json(); - await job.log(`Error during processing: ${error}`); - - await job.log(`Failed processing entity [${data.id}]`); - - await job.log( - `Sending error message to instance [${remoteInstance.data.baseUrl}]`, + try { + const processor = new InboxProcessor( + { + ...request, + url: new URL(request.url), + }, + data, + { + instance: remoteInstance, + key: + sender?.data.publicKey ?? + remoteInstance.data.publicKey.key, + }, + { + signature, + signedAt: new Date(signedAt * 1000), + authorization: undefined, + }, + getLogger(["federation", "inbox"]), + ip, ); - await remoteInstance.sendMessage( - `Failed processing entity [${data.uri}] delivered to inbox. Returned error:\n\n${JSON.stringify( - error, - null, - 4, - )}`, - ); + await processor.process(); + } catch (e) { + if (e instanceof ApiError) { + // Error occurred + await job.log( + `Error during processing: ${e.message}`, + ); - await job.log("Message sent"); + await job.log( + `Failed processing entity [${data.id}]`, + ); - return; + await job.log( + `Sending error message to instance [${remoteInstance.data.baseUrl}]`, + ); + + await remoteInstance.sendMessage( + `Failed processing entity [${data.uri}] delivered to inbox. Returned error:\n\n${JSON.stringify( + e.message, + null, + 4, + )}`, + ); + + await job.log("Message sent"); + + return; + } + + throw e; } await job.log(`Finished processing entity [${data.id}]`);