mirror of
https://github.com/versia-pub/server.git
synced 2026-04-27 12:49:16 +02:00
Some checks failed
CodeQL Scan / Analyze (push) Failing after 0s
Deploy Docs to GitHub Pages / build (push) Failing after 0s
Deploy Docs to GitHub Pages / Deploy (push) Has been skipped
Nix Build / check (push) Failing after 0s
Test Publish / build (client) (push) Failing after 0s
Test Publish / build (sdk) (push) Failing after 0s
Build Docker Images / lint (push) Has been cancelled
Build Docker Images / check (push) Has been cancelled
Build Docker Images / tests (push) Has been cancelled
Build Docker Images / detect-circular (push) Has been cancelled
Mirror to Codeberg / Mirror (push) Has been cancelled
Build Docker Images / build (server, Dockerfile, ${{ github.repository_owner }}/server) (push) Has been cancelled
Build Docker Images / build (worker, Worker.Dockerfile, ${{ github.repository_owner }}/worker) (push) Has been cancelled
87 lines
2.9 KiB
TypeScript
87 lines
2.9 KiB
TypeScript
import * as VersiaEntities from "@versia/sdk/entities";
|
|
import { config } from "@versia-server/config";
|
|
import { Worker } from "bullmq";
|
|
import chalk from "chalk";
|
|
import { User } from "../../db/user.ts";
|
|
import { connection } from "../../redis.ts";
|
|
import {
|
|
type DeliveryJobData,
|
|
DeliveryJobType,
|
|
deliveryQueue,
|
|
} from "./queue.ts";
|
|
|
|
/**
 * Builds the BullMQ worker that consumes jobs from the delivery queue.
 *
 * Only `DeliveryJobType.FederateEntity` jobs are acted upon; a job with any
 * other name resolves without doing any work. For a federation job the
 * processor:
 *
 * 1. loads the sender and then the recipient through `User.fromId`,
 * 2. writes a "Federating entity" line to the job log,
 * 3. looks up the export of `@versia/sdk/entities` whose `name` equals the
 *    payload's `type`,
 * 4. rebuilds the entity with that constructor's `fromJSON` and passes it,
 *    together with the recipient, to `sender.federateToUser`,
 * 5. writes a "Finished federating" line to the job log.
 *
 * The processor rejects with an `Error` when the sender, the recipient or the
 * entity constructor cannot be resolved.
 *
 * @returns A new worker bound to `deliveryQueue.name`, using the `connection`
 * imported from `redis.ts` and the retention ages found under
 * `config.queues.delivery`.
 */
export const getDeliveryWorker = (): Worker<
    DeliveryJobData,
    void,
    DeliveryJobType
> => {
    return new Worker<DeliveryJobData, void, DeliveryJobType>(
        deliveryQueue.name,
        async (job) => {
            // Guard clause: anything that is not a federation job is a no-op.
            if (job.name !== DeliveryJobType.FederateEntity) {
                return;
            }

            const { entity: payload, recipientId, senderId } = job.data;

            const fromUser = await User.fromId(senderId);
            if (!fromUser) {
                throw new Error(
                    `Could not resolve sender ID ${chalk.gray(senderId)}`,
                );
            }

            const toUser = await User.fromId(recipientId);
            if (!toUser) {
                throw new Error(
                    `Could not resolve recipient ID ${chalk.gray(recipientId)}`,
                );
            }

            await job.log(
                `Federating entity [${payload.id}] from @${fromUser.getAcct()} to @${toUser.getAcct()}`,
            );

            // The SDK constructor is selected by comparing its `name` with the
            // `type` field carried in the serialized entity.
            const entityType = payload.type;
            const EntityClass = Object.values(VersiaEntities).find(
                (candidate) => candidate.name === entityType,
            ) as typeof VersiaEntities.Entity | undefined;

            if (!EntityClass) {
                throw new Error(
                    `Could not resolve entity type ${chalk.gray(entityType)} for entity [${payload.id}]`,
                );
            }

            const hydrated = await EntityClass.fromJSON(
                payload,
                config.http.base_url.hostname,
            );
            await fromUser.federateToUser(hydrated, toUser);

            await job.log(`✔ Finished federating entity [${payload.id}]`);
        },
        {
            connection,
            removeOnComplete: {
                age: config.queues.delivery?.remove_after_complete_seconds,
            },
            removeOnFail: {
                age: config.queues.delivery?.remove_after_failure_seconds,
            },
        },
    );
};
|