refactor(federation): ♻️ Remove Response return semantics from inbox worker

This commit is contained in:
Jesse Wierzbinski 2024-11-25 23:11:17 +01:00
parent 025d5bea94
commit a037448ebb
No known key found for this signature in database
4 changed files with 133 additions and 130 deletions

View file

@ -391,6 +391,32 @@ export class Instance extends BaseInterface<typeof Instances> {
return this; return this;
} }
public async sendMessage(content: string): Promise<void> {
const logger = getLogger(["federation", "messaging"]);
if (
!this.data.extensions?.["pub.versia:instance_messaging"]?.endpoint
) {
logger.info`Instance ${chalk.gray(
this.data.baseUrl,
)} does not support Instance Messaging, skipping message`;
return;
}
const endpoint = new URL(
this.data.extensions["pub.versia:instance_messaging"].endpoint,
);
await fetch(endpoint.href, {
method: "POST",
headers: {
"Content-Type": "text/plain",
},
body: content,
});
}
public static getCount(): Promise<number> { public static getCount(): Promise<number> {
return db.$count(Instances); return db.$count(Instances);
} }

View file

@ -209,9 +209,7 @@ describe("InboxProcessor", () => {
expect(User.resolve).toHaveBeenCalledWith("test-author"); expect(User.resolve).toHaveBeenCalledWith("test-author");
expect(Note.fromVersia).toHaveBeenCalledWith(mockNote, mockAuthor); expect(Note.fromVersia).toHaveBeenCalledWith(mockNote, mockAuthor);
expect(result).toEqual( expect(result).toBeNull();
new Response("Note created", { status: 201 }),
);
}); });
test("returns 404 when author not found", async () => { test("returns 404 when author not found", async () => {
@ -296,7 +294,7 @@ describe("InboxProcessor", () => {
const result = await processor["processDelete"](); const result = await processor["processDelete"]();
expect(mockNote.delete).toHaveBeenCalled(); expect(mockNote.delete).toHaveBeenCalled();
expect(await result.text()).toBe("Note deleted"); expect(result).toBeNull();
}); });
test("returns 404 when note not found", async () => { test("returns 404 when note not found", async () => {
@ -344,9 +342,7 @@ describe("InboxProcessor", () => {
const result = await processor["processLikeRequest"](); const result = await processor["processLikeRequest"]();
expect(mockAuthor.like).toHaveBeenCalledWith(mockNote, "test-uri"); expect(mockAuthor.like).toHaveBeenCalledWith(mockNote, "test-uri");
expect(result).toEqual( expect(result).toBeNull();
new Response("Like created", { status: 200 }),
);
}); });
test("returns 404 when author not found", async () => { test("returns 404 when author not found", async () => {
@ -376,9 +372,7 @@ describe("InboxProcessor", () => {
const result = await processor["processUserRequest"](); const result = await processor["processUserRequest"]();
expect(User.saveFromRemote).toHaveBeenCalledWith("test-uri"); expect(User.saveFromRemote).toHaveBeenCalledWith("test-uri");
expect(result).toEqual( expect(result).toBeNull();
new Response("User updated", { status: 200 }),
);
}); });
test("returns 500 when update fails", async () => { test("returns 500 when update fails", async () => {

View file

@ -199,9 +199,9 @@ export class InboxProcessor {
/** /**
* Performs request processing. * Performs request processing.
* *
* @returns {Promise<Response>} - HTTP response to send back. * @returns {Promise<Response | null>} - HTTP response to send back. Null if no response is needed (no errors).
*/ */
public async process(): Promise<Response> { public async process(): Promise<Response | null> {
!this.sender && !this.sender &&
this.logger.debug`Processing request from potential bridge`; this.logger.debug`Processing request from potential bridge`;
@ -209,9 +209,7 @@ export class InboxProcessor {
// Return 201 to avoid // Return 201 to avoid
// 1. Leaking defederated instance information // 1. Leaking defederated instance information
// 2. Preventing the sender from thinking the message was not delivered and retrying // 2. Preventing the sender from thinking the message was not delivered and retrying
return new Response("", { return null;
status: 201,
});
} }
this.logger.debug`Instance ${chalk.gray( this.logger.debug`Instance ${chalk.gray(
@ -248,17 +246,18 @@ export class InboxProcessor {
const handler = new RequestParserHandler(this.body, validator); const handler = new RequestParserHandler(this.body, validator);
try { try {
return await handler.parseBody<Response>({ return await handler.parseBody<Response | null>({
note: (): Promise<Response> => this.processNote(), note: (): Promise<Response | null> => this.processNote(),
follow: (): Promise<Response> => this.processFollowRequest(), follow: (): Promise<Response | null> =>
followAccept: (): Promise<Response> => this.processFollowRequest(),
followAccept: (): Promise<Response | null> =>
this.processFollowAccept(), this.processFollowAccept(),
followReject: (): Promise<Response> => followReject: (): Promise<Response | null> =>
this.processFollowReject(), this.processFollowReject(),
"pub.versia:likes/Like": (): Promise<Response> => "pub.versia:likes/Like": (): Promise<Response | null> =>
this.processLikeRequest(), this.processLikeRequest(),
delete: (): Promise<Response> => this.processDelete(), delete: (): Promise<Response | null> => this.processDelete(),
user: (): Promise<Response> => this.processUserRequest(), user: (): Promise<Response | null> => this.processUserRequest(),
unknown: (): Response => unknown: (): Response =>
Response.json( Response.json(
{ error: "Unknown entity type" }, { error: "Unknown entity type" },
@ -273,9 +272,9 @@ export class InboxProcessor {
/** /**
* Handles Note entity processing. * Handles Note entity processing.
* *
* @returns {Promise<Response>} - The response. * @returns {Promise<Response | null>} - The response.
*/ */
private async processNote(): Promise<Response> { private async processNote(): Promise<Response | null> {
const note = this.body as VersiaNote; const note = this.body as VersiaNote;
const author = await User.resolve(note.author); const author = await User.resolve(note.author);
@ -288,15 +287,15 @@ export class InboxProcessor {
await Note.fromVersia(note, author); await Note.fromVersia(note, author);
return new Response("Note created", { status: 201 }); return null;
} }
/** /**
* Handles Follow entity processing. * Handles Follow entity processing.
* *
* @returns {Promise<Response>} - The response. * @returns {Promise<Response | null>} - The response.
*/ */
private async processFollowRequest(): Promise<Response> { private async processFollowRequest(): Promise<Response | null> {
const follow = this.body as unknown as VersiaFollow; const follow = this.body as unknown as VersiaFollow;
const author = await User.resolve(follow.author); const author = await User.resolve(follow.author);
const followee = await User.resolve(follow.followee); const followee = await User.resolve(follow.followee);
@ -321,7 +320,7 @@ export class InboxProcessor {
); );
if (foundRelationship.data.following) { if (foundRelationship.data.following) {
return new Response("Already following", { status: 200 }); return null;
} }
await foundRelationship.update({ await foundRelationship.update({
@ -343,15 +342,15 @@ export class InboxProcessor {
await followee.sendFollowAccept(author); await followee.sendFollowAccept(author);
} }
return new Response("Follow request sent", { status: 200 }); return null;
} }
/** /**
* Handles FollowAccept entity processing * Handles FollowAccept entity processing
* *
* @returns {Promise<Response>} - The response. * @returns {Promise<Response | null>} - The response.
*/ */
private async processFollowAccept(): Promise<Response> { private async processFollowAccept(): Promise<Response | null> {
const followAccept = this.body as unknown as VersiaFollowAccept; const followAccept = this.body as unknown as VersiaFollowAccept;
const author = await User.resolve(followAccept.author); const author = await User.resolve(followAccept.author);
const follower = await User.resolve(followAccept.follower); const follower = await User.resolve(followAccept.follower);
@ -376,9 +375,7 @@ export class InboxProcessor {
); );
if (!foundRelationship.data.requested) { if (!foundRelationship.data.requested) {
return new Response("There is no follow request to accept", { return null;
status: 200,
});
} }
await foundRelationship.update({ await foundRelationship.update({
@ -386,15 +383,15 @@ export class InboxProcessor {
following: true, following: true,
}); });
return new Response("Follow request accepted", { status: 200 }); return null;
} }
/** /**
* Handles FollowReject entity processing * Handles FollowReject entity processing
* *
* @returns {Promise<Response>} - The response. * @returns {Promise<Response | null>} - The response.
*/ */
private async processFollowReject(): Promise<Response> { private async processFollowReject(): Promise<Response | null> {
const followReject = this.body as unknown as VersiaFollowReject; const followReject = this.body as unknown as VersiaFollowReject;
const author = await User.resolve(followReject.author); const author = await User.resolve(followReject.author);
const follower = await User.resolve(followReject.follower); const follower = await User.resolve(followReject.follower);
@ -419,9 +416,7 @@ export class InboxProcessor {
); );
if (!foundRelationship.data.requested) { if (!foundRelationship.data.requested) {
return new Response("There is no follow request to reject", { return null;
status: 200,
});
} }
await foundRelationship.update({ await foundRelationship.update({
@ -429,15 +424,15 @@ export class InboxProcessor {
following: false, following: false,
}); });
return new Response("Follow request rejected", { status: 200 }); return null;
} }
/** /**
* Handles Delete entity processing. * Handles Delete entity processing.
* *
* @returns {Promise<Response>} - The response. * @returns {Promise<Response | null>} - The response.
*/ */
public async processDelete(): Promise<Response> { public async processDelete(): Promise<Response | null> {
// JS doesn't allow the use of `delete` as a variable name // JS doesn't allow the use of `delete` as a variable name
const delete_ = this.body as unknown as VersiaDelete; const delete_ = this.body as unknown as VersiaDelete;
const toDelete = delete_.deleted; const toDelete = delete_.deleted;
@ -463,7 +458,7 @@ export class InboxProcessor {
} }
await note.delete(); await note.delete();
return new Response("Note deleted", { status: 200 }); return null;
} }
case "User": { case "User": {
const userToDelete = await User.resolve(toDelete); const userToDelete = await User.resolve(toDelete);
@ -477,9 +472,7 @@ export class InboxProcessor {
if (!author || userToDelete.id === author.id) { if (!author || userToDelete.id === author.id) {
await userToDelete.delete(); await userToDelete.delete();
return new Response("Account deleted, goodbye 👋", { return null;
status: 200,
});
} }
return Response.json( return Response.json(
@ -503,7 +496,7 @@ export class InboxProcessor {
} }
await like.delete(); await like.delete();
return new Response("Like deleted", { status: 200 }); return null;
} }
default: { default: {
return Response.json( return Response.json(
@ -519,9 +512,9 @@ export class InboxProcessor {
/** /**
* Handles Like entity processing. * Handles Like entity processing.
* *
* @returns {Promise<Response>} - The response. * @returns {Promise<Response | null>} - The response.
*/ */
private async processLikeRequest(): Promise<Response> { private async processLikeRequest(): Promise<Response | null> {
const like = this.body as unknown as VersiaLikeExtension; const like = this.body as unknown as VersiaLikeExtension;
const author = await User.resolve(like.author); const author = await User.resolve(like.author);
const likedNote = await Note.resolve(like.liked); const likedNote = await Note.resolve(like.liked);
@ -542,15 +535,15 @@ export class InboxProcessor {
await author.like(likedNote, like.uri); await author.like(likedNote, like.uri);
return new Response("Like created", { status: 200 }); return null;
} }
/** /**
* Handles User entity processing (profile edits). * Handles User entity processing (profile edits).
* *
* @returns {Promise<Response>} - The response. * @returns {Promise<Response | null>} - The response.
*/ */
private async processUserRequest(): Promise<Response> { private async processUserRequest(): Promise<Response | null> {
const user = this.body as unknown as VersiaUser; const user = this.body as unknown as VersiaUser;
// FIXME: Instead of refetching the remote user, we should read the incoming json and update from that // FIXME: Instead of refetching the remote user, we should read the incoming json and update from that
const updatedAccount = await User.saveFromRemote(user.uri); const updatedAccount = await User.saveFromRemote(user.uri);
@ -562,7 +555,7 @@ export class InboxProcessor {
); );
} }
return new Response("User updated", { status: 200 }); return null;
} }
/** /**

View file

@ -1,7 +1,6 @@
import { getLogger } from "@logtape/logtape"; import { getLogger } from "@logtape/logtape";
import { Instance, User } from "@versia/kit/db"; import { Instance, User } from "@versia/kit/db";
import { Worker } from "bullmq"; import { Worker } from "bullmq";
import chalk from "chalk";
import { config } from "~/packages/config-manager/index.ts"; import { config } from "~/packages/config-manager/index.ts";
import { connection } from "~/utils/redis.ts"; import { connection } from "~/utils/redis.ts";
import { InboxProcessor } from "../inbox/processor.ts"; import { InboxProcessor } from "../inbox/processor.ts";
@ -11,88 +10,76 @@ import {
inboxQueue, inboxQueue,
} from "../queues/inbox.ts"; } from "../queues/inbox.ts";
export const getInboxWorker = (): Worker< export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
InboxJobData, new Worker<InboxJobData, void, InboxJobType>(
Response,
InboxJobType
> =>
new Worker<InboxJobData, Response, InboxJobType>(
inboxQueue.name, inboxQueue.name,
async (job) => { async (job) => {
switch (job.name) { switch (job.name) {
case InboxJobType.ProcessEntity: { case InboxJobType.ProcessEntity: {
const { const { data, headers, request, ip } = job.data;
data,
headers: {
"x-signature": signature,
"x-nonce": nonce,
"x-signed-by": signedBy,
authorization,
},
request,
ip,
} = job.data;
const logger = getLogger(["federation", "inbox"]); await job.log(`Processing entity [${data.id}]`);
logger.debug`Processing entity ${chalk.gray( if (headers.authorization) {
data.id,
)} from ${chalk.gray(signedBy)}`;
if (authorization) {
const processor = new InboxProcessor( const processor = new InboxProcessor(
request, request,
data, data,
null, null,
{ {
signature, authorization: headers.authorization,
nonce,
authorization,
}, },
logger, getLogger(["federation", "inbox"]),
ip, ip,
); );
logger.debug`Entity ${chalk.gray( await job.log(
data.id, `Entity [${data.id}] is potentially from a bridge`,
)} is potentially from a bridge`;
return await processor.process();
}
// If not potentially from bridge, check for required headers
if (!(signature && nonce && signedBy)) {
return Response.json(
{
error: "Missing required headers: x-signature, x-nonce, or x-signed-by",
},
{
status: 400,
},
); );
const output = await processor.process();
if (output instanceof Response) {
// Error occurred
const error = await output.json();
await job.log(`Error during processing: ${error}`);
await job.log(
`Failed processing entity [${data.id}]`,
);
return;
}
await job.log(
`Finished processing entity [${data.id}]`,
);
return;
} }
const {
"x-signature": signature,
"x-nonce": nonce,
"x-signed-by": signedBy,
} = headers as {
"x-signature": string;
"x-nonce": string;
"x-signed-by": string;
};
const sender = await User.resolve(signedBy); const sender = await User.resolve(signedBy);
if (!(sender || signedBy.startsWith("instance "))) { if (!(sender || signedBy.startsWith("instance "))) {
return Response.json( await job.log(
{ `Could not resolve sender URI [${signedBy}]`,
error: `Couldn't resolve sender URI ${signedBy}`,
},
{
status: 404,
},
); );
return;
} }
if (sender?.isLocal()) { if (sender?.isLocal()) {
return Response.json( throw new Error(
{ "Cannot process federation requests from local users",
error: "Cannot process federation requests from local users",
},
{
status: 400,
},
); );
} }
@ -103,19 +90,14 @@ export const getInboxWorker = (): Worker<
); );
if (!remoteInstance) { if (!remoteInstance) {
return Response.json( await job.log("Could not resolve the remote instance.");
{ error: "Could not resolve the remote instance." },
{ return;
status: 500,
},
);
} }
logger.debug`Entity ${chalk.gray( await job.log(
data.id, `Entity [${data.id}] is from remote instance [${remoteInstance.data.baseUrl}]`,
)} is from remote instance ${chalk.gray( );
remoteInstance.data.baseUrl,
)}`;
if (!remoteInstance.data.publicKey?.key) { if (!remoteInstance.data.publicKey?.key) {
throw new Error( throw new Error(
@ -135,19 +117,27 @@ export const getInboxWorker = (): Worker<
{ {
signature, signature,
nonce, nonce,
authorization, authorization: undefined,
}, },
logger, getLogger(["federation", "inbox"]),
ip, ip,
); );
const output = await processor.process(); const output = await processor.process();
logger.debug`${chalk.green( if (output instanceof Response) {
"✔", // Error occurred
)} Finished processing entity ${chalk.gray(data.id)}`; const error = await output.json();
await job.log(`Error during processing: ${error}`);
return output; await job.log(`Failed processing entity [${data.id}]`);
return;
}
await job.log(`Finished processing entity [${data.id}]`);
return;
} }
default: { default: {