refactor(federation): ♻️ Make user inbox use the delayed processing

This commit is contained in:
Jesse Wierzbinski 2024-11-25 21:17:52 +01:00
parent a6574249df
commit 0b3e74107e
No known key found for this signature in database
2 changed files with 7 additions and 35 deletions

View file

@ -66,7 +66,7 @@ export default apiRoute((app) =>
}); });
return context.newResponse( return context.newResponse(
"Request processing initiated.\nImplement the Instance Messaging Extension to receive any eventual errors", "Request processing initiated.\nImplement the Instance Messaging Extension to receive any eventual feedback (errors, etc.)",
200, 200,
); );
}), }),

View file

@ -1,15 +1,9 @@
import { apiRoute, applyConfig } from "@/api"; import { apiRoute, applyConfig } from "@/api";
import { createRoute } from "@hono/zod-openapi"; import { createRoute } from "@hono/zod-openapi";
import type { Entity } from "@versia/federation/types"; import type { Entity } from "@versia/federation/types";
import type { Job } from "bullmq";
import { z } from "zod"; import { z } from "zod";
import { ErrorSchema } from "~/types/api"; import { ErrorSchema } from "~/types/api";
import { import { InboxJobType, inboxQueue } from "~/worker";
type InboxJobData,
InboxJobType,
inboxQueue,
inboxWorker,
} from "~/worker";
export const meta = applyConfig({ export const meta = applyConfig({
auth: { auth: {
@ -111,7 +105,7 @@ export default apiRoute((app) =>
app.openapi(route, async (context) => { app.openapi(route, async (context) => {
const body: Entity = await context.req.valid("json"); const body: Entity = await context.req.valid("json");
const result = await inboxQueue.add(InboxJobType.ProcessEntity, { await inboxQueue.add(InboxJobType.ProcessEntity, {
data: body, data: body,
headers: context.req.valid("header"), headers: context.req.valid("header"),
request: { request: {
@ -122,31 +116,9 @@ export default apiRoute((app) =>
ip: context.env.ip ?? null, ip: context.env.ip ?? null,
}); });
return new Promise<Response>((resolve, reject) => { return context.newResponse(
const successCallback = ( "Request processing initiated.\nImplement the Instance Messaging Extension to receive any eventual feedback (errors, etc.)",
job: Job<InboxJobData, Response, InboxJobType>, 200,
): void => { );
if (job.id === result.id) {
inboxWorker.off("completed", successCallback);
inboxWorker.off("failed", failureCallback);
resolve(job.returnvalue);
}
};
const failureCallback = (
job: Job<InboxJobData, Response, InboxJobType> | undefined,
error: Error,
): void => {
if (job && job.id === result.id) {
inboxWorker.off("completed", successCallback);
inboxWorker.off("failed", failureCallback);
reject(error);
}
};
inboxWorker.on("completed", successCallback);
inboxWorker.on("failed", failureCallback);
});
}), }),
); );