From a037448ebb4804cc7e2df1d77f82095b2b8b2735 Mon Sep 17 00:00:00 2001 From: Jesse Wierzbinski Date: Mon, 25 Nov 2024 23:11:17 +0100 Subject: [PATCH] refactor(federation): :recycle: Remove Response return semantics from inbox worker --- classes/database/instance.ts | 26 ++++++ classes/inbox/processor.test.ts | 14 +--- classes/inbox/processor.ts | 83 +++++++++---------- classes/workers/inbox.ts | 140 +++++++++++++++----------------- 4 files changed, 133 insertions(+), 130 deletions(-) diff --git a/classes/database/instance.ts b/classes/database/instance.ts index 8cab33a9..4f940f40 100644 --- a/classes/database/instance.ts +++ b/classes/database/instance.ts @@ -391,6 +391,32 @@ export class Instance extends BaseInterface { return this; } + public async sendMessage(content: string): Promise { + const logger = getLogger(["federation", "messaging"]); + + if ( + !this.data.extensions?.["pub.versia:instance_messaging"]?.endpoint + ) { + logger.info`Instance ${chalk.gray( + this.data.baseUrl, + )} does not support Instance Messaging, skipping message`; + + return; + } + + const endpoint = new URL( + this.data.extensions["pub.versia:instance_messaging"].endpoint, + ); + + await fetch(endpoint.href, { + method: "POST", + headers: { + "Content-Type": "text/plain", + }, + body: content, + }); + } + public static getCount(): Promise { return db.$count(Instances); } diff --git a/classes/inbox/processor.test.ts b/classes/inbox/processor.test.ts index e708ce0d..a8d2593e 100644 --- a/classes/inbox/processor.test.ts +++ b/classes/inbox/processor.test.ts @@ -209,9 +209,7 @@ describe("InboxProcessor", () => { expect(User.resolve).toHaveBeenCalledWith("test-author"); expect(Note.fromVersia).toHaveBeenCalledWith(mockNote, mockAuthor); - expect(result).toEqual( - new Response("Note created", { status: 201 }), - ); + expect(result).toBeNull(); }); test("returns 404 when author not found", async () => { @@ -296,7 +294,7 @@ describe("InboxProcessor", () => { const result = await processor["processDelete"](); expect(mockNote.delete).toHaveBeenCalled(); - expect(await result.text()).toBe("Note deleted"); + expect(result).toBeNull(); }); test("returns 404 when note not found", async () => { @@ -344,9 +342,7 @@ describe("InboxProcessor", () => { const result = await processor["processLikeRequest"](); expect(mockAuthor.like).toHaveBeenCalledWith(mockNote, "test-uri"); - expect(result).toEqual( - new Response("Like created", { status: 200 }), - ); + expect(result).toBeNull(); }); test("returns 404 when author not found", async () => { @@ -376,9 +372,7 @@ describe("InboxProcessor", () => { const result = await processor["processUserRequest"](); expect(User.saveFromRemote).toHaveBeenCalledWith("test-uri"); - expect(result).toEqual( - new Response("User updated", { status: 200 }), - ); + expect(result).toBeNull(); }); test("returns 500 when update fails", async () => { diff --git a/classes/inbox/processor.ts b/classes/inbox/processor.ts index 78cb00e3..cdc2729e 100644 --- a/classes/inbox/processor.ts +++ b/classes/inbox/processor.ts @@ -199,9 +199,9 @@ export class InboxProcessor { /** * Performs request processing. * - * @returns {Promise} - HTTP response to send back. + * @returns {Promise} - HTTP response to send back. Null if no response is needed (no errors). */ - public async process(): Promise { + public async process(): Promise { !this.sender && this.logger.debug`Processing request from potential bridge`; @@ -209,9 +209,7 @@ export class InboxProcessor { // Return 201 to avoid // 1. Leaking defederated instance information // 2. Preventing the sender from thinking the message was not delivered and retrying - return new Response("", { - status: 201, - }); + return null; } this.logger.debug`Instance ${chalk.gray( @@ -248,17 +246,18 @@ export class InboxProcessor { const handler = new RequestParserHandler(this.body, validator); try { - return await handler.parseBody({ - note: (): Promise => this.processNote(), - follow: (): Promise => this.processFollowRequest(), - followAccept: (): Promise => + return await handler.parseBody({ + note: (): Promise => this.processNote(), + follow: (): Promise => + this.processFollowRequest(), + followAccept: (): Promise => this.processFollowAccept(), - followReject: (): Promise => + followReject: (): Promise => this.processFollowReject(), - "pub.versia:likes/Like": (): Promise => + "pub.versia:likes/Like": (): Promise => this.processLikeRequest(), - delete: (): Promise => this.processDelete(), - user: (): Promise => this.processUserRequest(), + delete: (): Promise => this.processDelete(), + user: (): Promise => this.processUserRequest(), unknown: (): Response => Response.json( { error: "Unknown entity type" }, @@ -273,9 +272,9 @@ export class InboxProcessor { /** * Handles Note entity processing. * - * @returns {Promise} - The response. + * @returns {Promise} - The response. */ - private async processNote(): Promise { + private async processNote(): Promise { const note = this.body as VersiaNote; const author = await User.resolve(note.author); @@ -288,15 +287,15 @@ export class InboxProcessor { await Note.fromVersia(note, author); - return new Response("Note created", { status: 201 }); + return null; } /** * Handles Follow entity processing. * - * @returns {Promise} - The response. + * @returns {Promise} - The response. */ - private async processFollowRequest(): Promise { + private async processFollowRequest(): Promise { const follow = this.body as unknown as VersiaFollow; const author = await User.resolve(follow.author); const followee = await User.resolve(follow.followee); @@ -321,7 +320,7 @@ export class InboxProcessor { ); if (foundRelationship.data.following) { - return new Response("Already following", { status: 200 }); + return null; } await foundRelationship.update({ @@ -343,15 +342,15 @@ export class InboxProcessor { await followee.sendFollowAccept(author); } - return new Response("Follow request sent", { status: 200 }); + return null; } /** * Handles FollowAccept entity processing * - * @returns {Promise} - The response. + * @returns {Promise} - The response. */ - private async processFollowAccept(): Promise { + private async processFollowAccept(): Promise { const followAccept = this.body as unknown as VersiaFollowAccept; const author = await User.resolve(followAccept.author); const follower = await User.resolve(followAccept.follower); @@ -376,9 +375,7 @@ export class InboxProcessor { ); if (!foundRelationship.data.requested) { - return new Response("There is no follow request to accept", { - status: 200, - }); + return null; } await foundRelationship.update({ @@ -386,15 +383,15 @@ export class InboxProcessor { following: true, }); - return new Response("Follow request accepted", { status: 200 }); + return null; } /** * Handles FollowReject entity processing * - * @returns {Promise} - The response. + * @returns {Promise} - The response. */ - private async processFollowReject(): Promise { + private async processFollowReject(): Promise { const followReject = this.body as unknown as VersiaFollowReject; const author = await User.resolve(followReject.author); const follower = await User.resolve(followReject.follower); @@ -419,9 +416,7 @@ export class InboxProcessor { ); if (!foundRelationship.data.requested) { - return new Response("There is no follow request to reject", { - status: 200, - }); + return null; } await foundRelationship.update({ @@ -429,15 +424,15 @@ export class InboxProcessor { following: false, }); - return new Response("Follow request rejected", { status: 200 }); + return null; } /** * Handles Delete entity processing. * - * @returns {Promise} - The response. + * @returns {Promise} - The response. */ - public async processDelete(): Promise { + public async processDelete(): Promise { // JS doesn't allow the use of `delete` as a variable name const delete_ = this.body as unknown as VersiaDelete; const toDelete = delete_.deleted; @@ -463,7 +458,7 @@ export class InboxProcessor { } await note.delete(); - return new Response("Note deleted", { status: 200 }); + return null; } case "User": { const userToDelete = await User.resolve(toDelete); @@ -477,9 +472,7 @@ export class InboxProcessor { if (!author || userToDelete.id === author.id) { await userToDelete.delete(); - return new Response("Account deleted, goodbye 👋", { - status: 200, - }); + return null; } return Response.json( @@ -503,7 +496,7 @@ export class InboxProcessor { } await like.delete(); - return new Response("Like deleted", { status: 200 }); + return null; } default: { return Response.json( @@ -519,9 +512,9 @@ export class InboxProcessor { /** * Handles Like entity processing. * - * @returns {Promise} - The response. + * @returns {Promise} - The response. */ - private async processLikeRequest(): Promise { + private async processLikeRequest(): Promise { const like = this.body as unknown as VersiaLikeExtension; const author = await User.resolve(like.author); const likedNote = await Note.resolve(like.liked); @@ -542,15 +535,15 @@ export class InboxProcessor { await author.like(likedNote, like.uri); - return new Response("Like created", { status: 200 }); + return null; } /** * Handles User entity processing (profile edits). * - * @returns {Promise} - The response. + * @returns {Promise} - The response. */ - private async processUserRequest(): Promise { + private async processUserRequest(): Promise { const user = this.body as unknown as VersiaUser; // FIXME: Instead of refetching the remote user, we should read the incoming json and update from that const updatedAccount = await User.saveFromRemote(user.uri); @@ -562,7 +555,7 @@ export class InboxProcessor { ); } - return new Response("User updated", { status: 200 }); + return null; } /** diff --git a/classes/workers/inbox.ts b/classes/workers/inbox.ts index 3ce3af56..918e2236 100644 --- a/classes/workers/inbox.ts +++ b/classes/workers/inbox.ts @@ -1,7 +1,6 @@ import { getLogger } from "@logtape/logtape"; import { Instance, User } from "@versia/kit/db"; import { Worker } from "bullmq"; -import chalk from "chalk"; import { config } from "~/packages/config-manager/index.ts"; import { connection } from "~/utils/redis.ts"; import { InboxProcessor } from "../inbox/processor.ts"; @@ -11,88 +10,76 @@ import { inboxQueue, } from "../queues/inbox.ts"; -export const getInboxWorker = (): Worker< - InboxJobData, - Response, - InboxJobType -> => - new Worker( +export const getInboxWorker = (): Worker => + new Worker( inboxQueue.name, async (job) => { switch (job.name) { case InboxJobType.ProcessEntity: { - const { - data, - headers: { - "x-signature": signature, - "x-nonce": nonce, - "x-signed-by": signedBy, - authorization, - }, - request, - ip, - } = job.data; + const { data, headers, request, ip } = job.data; - const logger = getLogger(["federation", "inbox"]); + await job.log(`Processing entity [${data.id}]`); - logger.debug`Processing entity ${chalk.gray( - data.id, - )} from ${chalk.gray(signedBy)}`; - - if (authorization) { + if (headers.authorization) { const processor = new InboxProcessor( request, data, null, { - signature, - nonce, - authorization, + authorization: headers.authorization, }, - logger, + getLogger(["federation", "inbox"]), ip, ); - logger.debug`Entity ${chalk.gray( - data.id, - )} is potentially from a bridge`; - - return await processor.process(); - } - - // If not potentially from bridge, check for required headers - if (!(signature && nonce && signedBy)) { - return Response.json( - { - error: "Missing required headers: x-signature, x-nonce, or x-signed-by", - }, - { - status: 400, - }, + await job.log( + `Entity [${data.id}] is potentially from a bridge`, ); + + const output = await processor.process(); + + if (output instanceof Response) { + // Error occurred + const error = await output.json(); + await job.log(`Error during processing: ${error}`); + + await job.log( + `Failed processing entity [${data.id}]`, + ); + + return; + } + + await job.log( + `Finished processing entity [${data.id}]`, + ); + + return; } + const { + "x-signature": signature, + "x-nonce": nonce, + "x-signed-by": signedBy, + } = headers as { + "x-signature": string; + "x-nonce": string; + "x-signed-by": string; + }; + const sender = await User.resolve(signedBy); if (!(sender || signedBy.startsWith("instance "))) { - return Response.json( - { - error: `Couldn't resolve sender URI ${signedBy}`, - }, - { - status: 404, - }, + await job.log( + `Could not resolve sender URI [${signedBy}]`, ); + + return; } if (sender?.isLocal()) { - return Response.json( - { - error: "Cannot process federation requests from local users", - }, - { - status: 400, - }, + throw new Error( + "Cannot process federation requests from local users", ); } @@ -103,19 +90,14 @@ export const getInboxWorker = (): Worker< ); if (!remoteInstance) { - return Response.json( - { error: "Could not resolve the remote instance." }, - { - status: 500, - }, - ); + await job.log("Could not resolve the remote instance."); + + return; } - logger.debug`Entity ${chalk.gray( - data.id, - )} is from remote instance ${chalk.gray( - remoteInstance.data.baseUrl, - )}`; + await job.log( + `Entity [${data.id}] is from remote instance [${remoteInstance.data.baseUrl}]`, + ); if (!remoteInstance.data.publicKey?.key) { throw new Error( @@ -135,19 +117,27 @@ export const getInboxWorker = (): Worker< { signature, nonce, - authorization, + authorization: undefined, }, - logger, + getLogger(["federation", "inbox"]), ip, ); const output = await processor.process(); - logger.debug`${chalk.green( - "✔", - )} Finished processing entity ${chalk.gray(data.id)}`; + if (output instanceof Response) { + // Error occurred + const error = await output.json(); + await job.log(`Error during processing: ${error}`); - return output; + await job.log(`Failed processing entity [${data.id}]`); + + return; + } + + await job.log(`Finished processing entity [${data.id}]`); + + return; } default: {