fix(federation): 🐛 Correctly remove listeners after job processing finishes

This commit is contained in:
Jesse Wierzbinski 2024-11-25 08:59:48 +01:00
parent 3e19b11609
commit 3ef361f521
No known key found for this signature in database
2 changed files with 25 additions and 6 deletions

View file

@ -1,9 +1,15 @@
import { apiRoute, applyConfig } from "@/api"; import { apiRoute, applyConfig } from "@/api";
import { createRoute } from "@hono/zod-openapi"; import { createRoute } from "@hono/zod-openapi";
import type { Entity } from "@versia/federation/types"; import type { Entity } from "@versia/federation/types";
import type { Job } from "bullmq";
import { z } from "zod"; import { z } from "zod";
import { ErrorSchema } from "~/types/api"; import { ErrorSchema } from "~/types/api";
import { InboxJobType, inboxQueue, inboxWorker } from "~/worker"; import {
type InboxJobData,
InboxJobType,
inboxQueue,
inboxWorker,
} from "~/worker";
export const meta = applyConfig({ export const meta = applyConfig({
auth: { auth: {
@ -117,17 +123,30 @@ export default apiRoute((app) =>
}); });
return new Promise<Response>((resolve, reject) => { return new Promise<Response>((resolve, reject) => {
inboxWorker.on("completed", (job) => { const successCallback = (
job: Job<InboxJobData, Response, InboxJobType>,
): void => {
if (job.id === result.id) { if (job.id === result.id) {
inboxWorker.off("completed", successCallback);
inboxWorker.off("failed", failureCallback);
resolve(job.returnvalue); resolve(job.returnvalue);
} }
}); };
inboxWorker.on("failed", (job, error) => { const failureCallback = (
job: Job<InboxJobData, Response, InboxJobType> | undefined,
error: Error,
): void => {
if (job && job.id === result.id) { if (job && job.id === result.id) {
inboxWorker.off("completed", successCallback);
inboxWorker.off("failed", failureCallback);
reject(error); reject(error);
} }
}); };
inboxWorker.on("completed", successCallback);
inboxWorker.on("failed", failureCallback);
}); });
}), }),
); );

View file

@ -24,7 +24,7 @@ export enum InboxJobType {
ProcessEntity = "processEntity", ProcessEntity = "processEntity",
} }
type InboxJobData = { export type InboxJobData = {
data: Entity; data: Entity;
headers: { headers: {
"x-signature"?: string; "x-signature"?: string;