diff --git a/api/users/:uuid/inbox/index.ts b/api/users/:uuid/inbox/index.ts index 5fba0c54..71c4f295 100644 --- a/api/users/:uuid/inbox/index.ts +++ b/api/users/:uuid/inbox/index.ts @@ -1,11 +1,9 @@ import { apiRoute, applyConfig } from "@/api"; import { createRoute } from "@hono/zod-openapi"; -import { getLogger } from "@logtape/logtape"; import type { Entity } from "@versia/federation/types"; -import { Instance, User } from "@versia/kit/db"; import { z } from "zod"; -import { InboxProcessor } from "~/classes/inbox/processor"; import { ErrorSchema } from "~/types/api"; +import { InboxJobType, inboxQueue, inboxWorker } from "~/worker"; export const meta = applyConfig({ auth: { @@ -105,83 +103,25 @@ const route = createRoute({ export default apiRoute((app) => app.openapi(route, async (context) => { - const { - "x-signature": signature, - "x-nonce": nonce, - "x-signed-by": signedBy, - authorization, - } = context.req.valid("header"); - - const logger = getLogger(["federation", "inbox"]); const body: Entity = await context.req.valid("json"); - if (authorization) { - const processor = new InboxProcessor( - context, - body, - null, - { - signature, - nonce, - authorization, - }, - logger, - ); - - return await processor.process(); - } - - // If not potentially from bridge, check for required headers - if (!(signature && nonce && signedBy)) { - return context.json( - { - error: "Missing required headers: x-signature, x-nonce, or x-signed-by", - }, - 400, - ); - } - - const sender = await User.resolve(signedBy); - - if (!(sender || signedBy.startsWith("instance "))) { - return context.json( - { error: `Couldn't resolve sender URI ${signedBy}` }, - 404, - ); - } - - if (sender?.isLocal()) { - return context.json( - { - error: "Cannot process federation requests from local users", - }, - 400, - ); - } - - const remoteInstance = sender - ? await Instance.fromUser(sender) - : await Instance.resolveFromHost(signedBy.split(" ")[1]); - - if (!remoteInstance) { - return context.json( - { error: "Could not resolve the remote instance." }, - 500, - ); - } - - const processor = new InboxProcessor( - context, - body, - remoteInstance, - { - signature, - nonce, - authorization, + const result = await inboxQueue.add(InboxJobType.ProcessEntity, { + data: body, + headers: context.req.valid("header"), + request: { + body: await context.req.text(), + method: context.req.method, + url: context.req.url, }, - logger, - ); + ip: context.env.ip ?? null, + }); - return await processor.process(); + return new Promise((resolve) => { + inboxWorker.on("completed", (job) => { + if (job.id === result.id) { + resolve(job.returnvalue); + } + }); + }); }), ); diff --git a/classes/inbox/processor.test.ts b/classes/inbox/processor.test.ts index 5d5459a2..7ec2b202 100644 --- a/classes/inbox/processor.test.ts +++ b/classes/inbox/processor.test.ts @@ -8,7 +8,7 @@ import { Relationship, User, } from "@versia/kit/db"; -import type { Context } from "hono"; +import type { SocketAddress } from "bun"; import { ValidationError } from "zod-validation-error"; import { config } from "~/packages/config-manager/index.ts"; import { InboxProcessor } from "./processor.ts"; @@ -75,7 +75,11 @@ mock.module("~/packages/config-manager/index.ts", () => ({ })); describe("InboxProcessor", () => { - let mockContext: Context; + let mockRequest: { + url: string; + method: string; + body: string; + }; let mockBody: Entity; let mockSenderInstance: Instance; let mockHeaders: { @@ -90,18 +94,11 @@ describe("InboxProcessor", () => { mock.restore(); // Setup basic mock context - mockContext = { - json: jest.fn(), - text: jest.fn(), - req: { - url: "https://test.com", - method: "POST", - text: jest.fn().mockResolvedValue("test-body"), - }, - env: { - ip: { address: "127.0.0.1" }, - }, - } as unknown as Context; + mockRequest = { + url: "https://test.com", + method: "POST", + body: "test-body", + }; // Setup basic mock sender mockSenderInstance = { @@ -124,10 +121,14 @@ describe("InboxProcessor", () => { // Create processor instance processor = new InboxProcessor( - mockContext, + mockRequest, mockBody, mockSenderInstance, mockHeaders, + undefined, + { + address: "127.0.0.1", + } as SocketAddress, ); }); @@ -197,28 +198,27 @@ describe("InboxProcessor", () => { User.resolve = jest.fn().mockResolvedValue(mockAuthor); Note.fromVersia = jest.fn().mockResolvedValue(true); - mockContext.text = jest.fn().mockReturnValue({ status: 201 }); // biome-ignore lint/complexity/useLiteralKeys: Private variable processor["body"] = mockNote as VersiaNote; // biome-ignore lint/complexity/useLiteralKeys: Private method - await processor["processNote"](); + const result = await processor["processNote"](); expect(User.resolve).toHaveBeenCalledWith("test-author"); expect(Note.fromVersia).toHaveBeenCalledWith(mockNote, mockAuthor); - expect(mockContext.text).toHaveBeenCalledWith("Note created", 201); + expect(result).toEqual( + new Response("Note created", { status: 201 }), + ); }); test("returns 404 when author not found", async () => { User.resolve = jest.fn().mockResolvedValue(null); - mockContext.json = jest.fn().mockReturnValue({ status: 404 }); // biome-ignore lint/complexity/useLiteralKeys: Private method - await processor["processNote"](); + const result = await processor["processNote"](); - expect(mockContext.json).toHaveBeenCalledWith( - { error: "Author not found" }, - 404, + expect(result).toEqual( + Response.json({ error: "Author not found" }, { status: 404 }), ); }); }); @@ -248,7 +248,6 @@ describe("InboxProcessor", () => { .fn() .mockResolvedValue(mockRelationship); Notification.insert = jest.fn(); - mockContext.text = jest.fn().mockReturnValue({ status: 200 }); // biome-ignore lint/complexity/useLiteralKeys: Private variable processor["body"] = mockFollow as unknown as Entity; @@ -266,14 +265,12 @@ describe("InboxProcessor", () => { test("returns 404 when author not found", async () => { User.resolve = jest.fn().mockResolvedValue(null); - mockContext.json = jest.fn().mockReturnValue({ status: 404 }); // biome-ignore lint/complexity/useLiteralKeys: Private method - await processor["processFollowRequest"](); + const result = await processor["processFollowRequest"](); - expect(mockContext.json).toHaveBeenCalledWith( - { error: "Author not found" }, - 404, + expect(result).toEqual( + Response.json({ error: "Author not found" }, { status: 404 }), ); }); }); @@ -289,15 +286,14 @@ describe("InboxProcessor", () => { }; Note.fromSql = jest.fn().mockResolvedValue(mockNote); - mockContext.text = jest.fn().mockReturnValue({ status: 200 }); // biome-ignore lint/complexity/useLiteralKeys: Private variable processor["body"] = mockDelete as unknown as Entity; // biome-ignore lint/complexity/useLiteralKeys: Private method - await processor["processDelete"](); + const result = await processor["processDelete"](); expect(mockNote.delete).toHaveBeenCalled(); - expect(mockContext.text).toHaveBeenCalledWith("Note deleted", 200); + expect(await result.text()).toBe("Note deleted"); }); test("returns 404 when note not found", async () => { @@ -307,16 +303,19 @@ describe("InboxProcessor", () => { }; Note.fromSql = jest.fn().mockResolvedValue(null); - mockContext.json = jest.fn().mockReturnValue({ status: 404 }); // biome-ignore lint/complexity/useLiteralKeys: Private variable processor["body"] = mockDelete as unknown as Entity; // biome-ignore lint/complexity/useLiteralKeys: Private method - await processor["processDelete"](); + const result = await processor["processDelete"](); - expect(mockContext.json).toHaveBeenCalledWith( - { error: "Note to delete not found or not owned by sender" }, - 404, + expect(result).toEqual( + Response.json( + { + error: "Note to delete not found or not owned by sender", + }, + { status: 404 }, + ), ); }); }); @@ -335,27 +334,26 @@ describe("InboxProcessor", () => { User.resolve = jest.fn().mockResolvedValue(mockAuthor); Note.resolve = jest.fn().mockResolvedValue(mockNote); - mockContext.text = jest.fn().mockReturnValue({ status: 200 }); // biome-ignore lint/complexity/useLiteralKeys: Private variable processor["body"] = mockLike as unknown as Entity; // biome-ignore lint/complexity/useLiteralKeys: Private method - await processor["processLikeRequest"](); + const result = await processor["processLikeRequest"](); expect(mockAuthor.like).toHaveBeenCalledWith(mockNote, "test-uri"); - expect(mockContext.text).toHaveBeenCalledWith("Like created", 200); + expect(result).toEqual( + new Response("Like created", { status: 200 }), + ); }); test("returns 404 when author not found", async () => { User.resolve = jest.fn().mockResolvedValue(null); - mockContext.json = jest.fn().mockReturnValue({ status: 404 }); // biome-ignore lint/complexity/useLiteralKeys: Private method - await processor["processLikeRequest"](); + const result = await processor["processLikeRequest"](); - expect(mockContext.json).toHaveBeenCalledWith( - { error: "Author not found" }, - 404, + expect(result).toEqual( + Response.json({ error: "Author not found" }, { status: 404 }), ); }); }); @@ -368,27 +366,29 @@ describe("InboxProcessor", () => { const mockUpdatedUser = { id: "user-id" }; User.saveFromRemote = jest.fn().mockResolvedValue(mockUpdatedUser); - mockContext.text = jest.fn().mockReturnValue({ status: 200 }); // biome-ignore lint/complexity/useLiteralKeys: Private variable processor["body"] = mockUser as unknown as Entity; // biome-ignore lint/complexity/useLiteralKeys: Private method - await processor["processUserRequest"](); + const result = await processor["processUserRequest"](); expect(User.saveFromRemote).toHaveBeenCalledWith("test-uri"); - expect(mockContext.text).toHaveBeenCalledWith("User updated", 200); + expect(result).toEqual( + new Response("User updated", { status: 200 }), + ); }); test("returns 500 when update fails", async () => { User.saveFromRemote = jest.fn().mockResolvedValue(null); - mockContext.json = jest.fn().mockReturnValue({ status: 500 }); // biome-ignore lint/complexity/useLiteralKeys: Private method - await processor["processUserRequest"](); + const result = await processor["processUserRequest"](); - expect(mockContext.json).toHaveBeenCalledWith( - { error: "Failed to update user" }, - 500, + expect(result).toEqual( + Response.json( + { error: "Failed to update user" }, + { status: 500 }, + ), ); }); }); @@ -396,33 +396,35 @@ describe("InboxProcessor", () => { describe("handleError", () => { test("handles validation errors", () => { const validationError = new ValidationError("Invalid data"); - mockContext.json = jest.fn().mockReturnValue({ status: 400 }); // biome-ignore lint/complexity/useLiteralKeys: Private method - processor["handleError"](validationError); + const result = processor["handleError"](validationError); - expect(mockContext.json).toHaveBeenCalledWith( - { - error: "Failed to process request", - error_description: "Invalid data", - }, - 400, + expect(result).toEqual( + Response.json( + { + error: "Failed to process request", + error_description: "Invalid data", + }, + { status: 400 }, + ), ); }); test("handles general errors", () => { const error = new Error("Something went wrong"); - mockContext.json = jest.fn().mockReturnValue({ status: 500 }); // biome-ignore lint/complexity/useLiteralKeys: Private method - processor["handleError"](error); + const result = processor["handleError"](error); - expect(mockContext.json).toHaveBeenCalledWith( - { - error: "Failed to process request", - message: "Something went wrong", - }, - 500, + expect(result).toEqual( + Response.json( + { + error: "Failed to process request", + message: "Something went wrong", + }, + { status: 500 }, + ), ); }); }); diff --git a/classes/inbox/processor.ts b/classes/inbox/processor.ts index cda1409d..f70b5860 100644 --- a/classes/inbox/processor.ts +++ b/classes/inbox/processor.ts @@ -26,7 +26,6 @@ import { import { Likes, Notes } from "@versia/kit/tables"; import type { SocketAddress } from "bun"; import { eq } from "drizzle-orm"; -import type { Context, TypedResponse } from "hono"; import type { StatusCode } from "hono/utils/http-status"; import { matches } from "ip-matching"; import { type ValidationError, isValidationError } from "zod-validation-error"; @@ -68,7 +67,7 @@ export class InboxProcessor { /** * Creates a new InboxProcessor instance. * - * @param context Hono request context. + * @param request Request object. * @param body Entity JSON body. * @param senderInstance Sender of the request's instance (from X-Signed-By header). Null if request is from a bridge. * @param headers Various request headers. @@ -76,7 +75,11 @@ export class InboxProcessor { * @param requestIp Request IP address. Grabs it from the Hono context if not provided. */ public constructor( - private context: Context, + private request: { + url: string; + method: string; + body: string; + }, private body: Entity, private senderInstance: Instance | null, private headers: { @@ -85,7 +88,7 @@ export class InboxProcessor { authorization?: string; }, private logger: Logger = getLogger(["federation", "inbox"]), - private requestIp: SocketAddress | null = context.env?.ip ?? null, + private requestIp: SocketAddress | null = null, ) {} /** @@ -115,13 +118,13 @@ export class InboxProcessor { // HACK: Making a fake Request object instead of passing the values directly is necessary because otherwise the validation breaks for some unknown reason const isValid = await validator.validate( - new Request(this.context.req.url, { - method: this.context.req.method, + new Request(this.request.url, { + method: this.request.method, headers: { "X-Signature": this.headers.signature, "X-Nonce": this.headers.nonce, }, - body: await this.context.req.text(), + body: this.request.body, }), ); @@ -195,9 +198,7 @@ export class InboxProcessor { * * @returns {Promise} - HTTP response to send back. */ - public async process(): Promise< - (Response & TypedResponse<{ error: string }, 500, "json">) | Response - > { + public async process(): Promise { if ( this.senderInstance && isDefederated(this.senderInstance.data.baseUrl) @@ -205,15 +206,17 @@ export class InboxProcessor { // Return 201 to avoid // 1. Leaking defederated instance information // 2. Preventing the sender from thinking the message was not delivered and retrying - return this.context.text("", 201); + return new Response("", { + status: 201, + }); } const shouldCheckSignature = this.shouldCheckSignature(); if (shouldCheckSignature !== true && shouldCheckSignature !== false) { - return this.context.json( + return Response.json( { error: shouldCheckSignature.message }, - shouldCheckSignature.code, + { status: shouldCheckSignature.code }, ); } @@ -221,9 +224,9 @@ export class InboxProcessor { const isValid = await this.isSignatureValid(); if (!isValid) { - return this.context.json( + return Response.json( { error: "Signature is not valid" }, - 401, + { status: 401 }, ); } } @@ -243,9 +246,11 @@ export class InboxProcessor { this.processLikeRequest(), delete: (): Promise => this.processDelete(), user: (): Promise => this.processUserRequest(), - unknown: (): Response & - TypedResponse<{ error: string }, 400, "json"> => - this.context.json({ error: "Unknown entity type" }, 400), + unknown: (): Response => + Response.json( + { error: "Unknown entity type" }, + { status: 400 }, + ), }); } catch (e) { return this.handleError(e as Error); @@ -257,27 +262,20 @@ export class InboxProcessor { * * @returns {Promise} - The response. */ - private async processNote(): Promise< - Response & - TypedResponse< - | { - error: string; - } - | string, - 404 | 500 | 201, - "json" | "text" - > - > { + private async processNote(): Promise { const note = this.body as VersiaNote; const author = await User.resolve(note.author); if (!author) { - return this.context.json({ error: "Author not found" }, 404); + return Response.json( + { error: "Author not found" }, + { status: 404 }, + ); } await Note.fromVersia(note, author); - return this.context.text("Note created", 201); + return new Response("Note created", { status: 201 }); } /** @@ -285,27 +283,23 @@ export class InboxProcessor { * * @returns {Promise} - The response. */ - private async processFollowRequest(): Promise< - Response & - TypedResponse< - | { - error: string; - } - | string, - 200 | 404, - "text" | "json" - > - > { + private async processFollowRequest(): Promise { const follow = this.body as unknown as VersiaFollow; const author = await User.resolve(follow.author); const followee = await User.resolve(follow.followee); if (!author) { - return this.context.json({ error: "Author not found" }, 404); + return Response.json( + { error: "Author not found" }, + { status: 404 }, + ); } if (!followee) { - return this.context.json({ error: "Followee not found" }, 404); + return Response.json( + { error: "Followee not found" }, + { status: 404 }, + ); } const foundRelationship = await Relationship.fromOwnerAndSubject( @@ -314,7 +308,7 @@ export class InboxProcessor { ); if (foundRelationship.data.following) { - return this.context.text("Already following", 200); + return new Response("Already following", { status: 200 }); } await foundRelationship.update({ @@ -336,7 +330,7 @@ export class InboxProcessor { await followee.sendFollowAccept(author); } - return this.context.text("Follow request sent", 200); + return new Response("Follow request sent", { status: 200 }); } /** @@ -344,24 +338,23 @@ export class InboxProcessor { * * @returns {Promise} - The response. */ - private async processFollowAccept(): Promise< - Response & - TypedResponse< - { error: string } | string, - 200 | 404, - "text" | "json" - > - > { + private async processFollowAccept(): Promise { const followAccept = this.body as unknown as VersiaFollowAccept; const author = await User.resolve(followAccept.author); const follower = await User.resolve(followAccept.follower); if (!author) { - return this.context.json({ error: "Author not found" }, 404); + return Response.json( + { error: "Author not found" }, + { status: 404 }, + ); } if (!follower) { - return this.context.json({ error: "Follower not found" }, 404); + return Response.json( + { error: "Follower not found" }, + { status: 404 }, + ); } const foundRelationship = await Relationship.fromOwnerAndSubject( @@ -370,10 +363,9 @@ export class InboxProcessor { ); if (!foundRelationship.data.requested) { - return this.context.text( - "There is no follow request to accept", - 200, - ); + return new Response("There is no follow request to accept", { + status: 200, + }); } await foundRelationship.update({ @@ -381,7 +373,7 @@ export class InboxProcessor { following: true, }); - return this.context.text("Follow request accepted", 200); + return new Response("Follow request accepted", { status: 200 }); } /** @@ -389,24 +381,23 @@ export class InboxProcessor { * * @returns {Promise} - The response. */ - private async processFollowReject(): Promise< - Response & - TypedResponse< - { error: string } | string, - 200 | 404, - "text" | "json" - > - > { + private async processFollowReject(): Promise { const followReject = this.body as unknown as VersiaFollowReject; const author = await User.resolve(followReject.author); const follower = await User.resolve(followReject.follower); if (!author) { - return this.context.json({ error: "Author not found" }, 404); + return Response.json( + { error: "Author not found" }, + { status: 404 }, + ); } if (!follower) { - return this.context.json({ error: "Follower not found" }, 404); + return Response.json( + { error: "Follower not found" }, + { status: 404 }, + ); } const foundRelationship = await Relationship.fromOwnerAndSubject( @@ -415,10 +406,9 @@ export class InboxProcessor { ); if (!foundRelationship.data.requested) { - return this.context.text( - "There is no follow request to reject", - 200, - ); + return new Response("There is no follow request to reject", { + status: 200, + }); } await foundRelationship.update({ @@ -426,7 +416,7 @@ export class InboxProcessor { following: false, }); - return this.context.text("Follow request rejected", 200); + return new Response("Follow request rejected", { status: 200 }); } /** @@ -434,14 +424,7 @@ export class InboxProcessor { * * @returns {Promise} - The response. */ - public async processDelete(): Promise< - Response & - TypedResponse< - { error: string } | string, - 200 | 400 | 404, - "text" | "json" - > - > { + public async processDelete(): Promise { // JS doesn't allow the use of `delete` as a variable name const delete_ = this.body as unknown as VersiaDelete; const toDelete = delete_.deleted; @@ -458,40 +441,39 @@ export class InboxProcessor { ); if (!note) { - return this.context.json( + return Response.json( { error: "Note to delete not found or not owned by sender", }, - 404, + { status: 404 }, ); } await note.delete(); - return this.context.text("Note deleted", 200); + return new Response("Note deleted", { status: 200 }); } case "User": { const userToDelete = await User.resolve(toDelete); if (!userToDelete) { - return this.context.json( + return Response.json( { error: "User to delete not found" }, - 404, + { status: 404 }, ); } if (!author || userToDelete.id === author.id) { await userToDelete.delete(); - return this.context.text( - "Account deleted, goodbye 👋", - 200, - ); + return new Response("Account deleted, goodbye 👋", { + status: 200, + }); } - return this.context.json( + return Response.json( { error: "Cannot delete other users than self", }, - 400, + { status: 400 }, ); } case "pub.versia:likes/Like": { @@ -501,21 +483,21 @@ export class InboxProcessor { ); if (!like) { - return this.context.json( + return Response.json( { error: "Like not found or not owned by sender" }, - 404, + { status: 404 }, ); } await like.delete(); - return this.context.text("Like deleted", 200); + return new Response("Like deleted", { status: 200 }); } default: { - return this.context.json( + return Response.json( { error: `Deletion of object ${toDelete} not implemented`, }, - 400, + { status: 400 }, ); } } @@ -526,29 +508,28 @@ export class InboxProcessor { * * @returns {Promise} - The response. */ - private async processLikeRequest(): Promise< - Response & - TypedResponse< - { error: string } | string, - 200 | 404, - "text" | "json" - > - > { + private async processLikeRequest(): Promise { const like = this.body as unknown as VersiaLikeExtension; const author = await User.resolve(like.author); const likedNote = await Note.resolve(like.liked); if (!author) { - return this.context.json({ error: "Author not found" }, 404); + return Response.json( + { error: "Author not found" }, + { status: 404 }, + ); } if (!likedNote) { - return this.context.json({ error: "Liked Note not found" }, 404); + return Response.json( + { error: "Liked Note not found" }, + { status: 404 }, + ); } await author.like(likedNote, like.uri); - return this.context.text("Like created", 200); + return new Response("Like created", { status: 200 }); } /** @@ -556,23 +537,19 @@ export class InboxProcessor { * * @returns {Promise} - The response. */ - private async processUserRequest(): Promise< - Response & - TypedResponse< - { error: string } | string, - 200 | 500, - "text" | "json" - > - > { + private async processUserRequest(): Promise { const user = this.body as unknown as VersiaUser; // FIXME: Instead of refetching the remote user, we should read the incoming json and update from that const updatedAccount = await User.saveFromRemote(user.uri); if (!updatedAccount) { - return this.context.json({ error: "Failed to update user" }, 500); + return Response.json( + { error: "Failed to update user" }, + { status: 500 }, + ); } - return this.context.text("User updated", 200); + return new Response("User updated", { status: 200 }); } /** @@ -581,44 +558,26 @@ export class InboxProcessor { * @param {Error} e - The error object. * @returns {Response} - The error response. */ - private handleError(e: Error): - | (Response & - TypedResponse< - { - error: string; - error_description: string; - }, - 400, - "json" - >) - | (Response & - TypedResponse< - { - error: string; - message: string; - }, - 500, - "json" - >) { + private handleError(e: Error): Response { if (isValidationError(e)) { - return this.context.json( + return Response.json( { error: "Failed to process request", error_description: (e as ValidationError).message, }, - 400, + { status: 400 }, ); } this.logger.error`${e}`; sentry?.captureException(e); - return this.context.json( + return Response.json( { error: "Failed to process request", message: (e as Error).message, }, - 500, + { status: 500 }, ); } } diff --git a/worker.ts b/worker.ts index d8743c6e..b346095a 100644 --- a/worker.ts +++ b/worker.ts @@ -1,7 +1,10 @@ +import { getLogger } from "@logtape/logtape"; import type { Entity } from "@versia/federation/types"; -import { Note } from "@versia/kit/db"; +import { Instance, Note, User } from "@versia/kit/db"; import { Queue, Worker } from "bullmq"; +import type { SocketAddress } from "bun"; import IORedis from "ioredis"; +import { InboxProcessor } from "./classes/inbox/processor.ts"; import { config } from "./packages/config-manager/index.ts"; const connection = new IORedis({ @@ -9,16 +12,33 @@ const connection = new IORedis({ port: config.redis.queue.port, password: config.redis.queue.password, db: config.redis.queue.database, + maxRetriesPerRequest: null, }); -enum DeliveryJobType { +export enum DeliveryJobType { FederateNote = "federateNote", } -enum InboxJobType { +export enum InboxJobType { ProcessEntity = "processEntity", } +type InboxJobData = { + data: Entity; + headers: { + "x-signature"?: string; + "x-nonce"?: string; + "x-signed-by"?: string; + authorization?: string; + }; + request: { + url: string; + method: string; + body: string; + }; + ip: SocketAddress | null; +}; + const deliveryQueue = new Queue<{ noteId: string }, void, DeliveryJobType>( "delivery", { @@ -26,14 +46,18 @@ const deliveryQueue = new Queue<{ noteId: string }, void, DeliveryJobType>( }, ); -export const inboxQueue = new Queue<{ data: Entity }, void, InboxJobType>( +export const inboxQueue = new Queue( "inbox", { connection, }, ); -export const worker = new Worker<{ noteId: string }, void, DeliveryJobType>( +export const deliveryWorker = new Worker< + { noteId: string }, + void, + DeliveryJobType +>( deliveryQueue.name, async (job) => { switch (job.name) { @@ -43,7 +67,9 @@ export const worker = new Worker<{ noteId: string }, void, DeliveryJobType>( const note = await Note.fromId(noteId); if (!note) { - throw new Error(`Note with ID ${noteId} not found`); + throw new Error( + `Note with ID ${noteId} not found in database`, + ); } await note.federateToUsers(); @@ -52,3 +78,110 @@ export const worker = new Worker<{ noteId: string }, void, DeliveryJobType>( }, { connection }, ); + +export const inboxWorker = new Worker( + inboxQueue.name, + async (job) => { + switch (job.name) { + case InboxJobType.ProcessEntity: { + const { + data, + headers: { + "x-signature": signature, + "x-nonce": nonce, + "x-signed-by": signedBy, + authorization, + }, + request, + ip, + } = job.data; + + const logger = getLogger(["federation", "inbox"]); + + if (authorization) { + const processor = new InboxProcessor( + request, + data, + null, + { + signature, + nonce, + authorization, + }, + logger, + ip, + ); + + return await processor.process(); + } + + // If not potentially from bridge, check for required headers + if (!(signature && nonce && signedBy)) { + return Response.json( + { + error: "Missing required headers: x-signature, x-nonce, or x-signed-by", + }, + { + status: 400, + }, + ); + } + + const sender = await User.resolve(signedBy); + + if (!(sender || signedBy.startsWith("instance "))) { + return Response.json( + { error: `Couldn't resolve sender URI ${signedBy}` }, + { + status: 404, + }, + ); + } + + if (sender?.isLocal()) { + return Response.json( + { + error: "Cannot process federation requests from local users", + }, + { + status: 400, + }, + ); + } + + const remoteInstance = sender + ? await Instance.fromUser(sender) + : await Instance.resolveFromHost(signedBy.split(" ")[1]); + + if (!remoteInstance) { + return Response.json( + { error: "Could not resolve the remote instance." }, + { + status: 500, + }, + ); + } + + const processor = new InboxProcessor( + request, + data, + remoteInstance, + { + signature, + nonce, + authorization, + }, + logger, + ip, + ); + + return await processor.process(); + } + + default: { + throw new Error(`Unknown job type: ${job.name}`); + } + } + }, + { connection }, +);