refactor(worker): 🚚 Move queue code to plugin-kit package
Some checks failed
Mirror to Codeberg / Mirror (push) Failing after 1s
Test Publish / build (client) (push) Failing after 1s
Test Publish / build (sdk) (push) Failing after 1s

This commit is contained in:
Jesse Wierzbinski 2025-06-29 22:56:52 +02:00
parent dc802ff5f6
commit 7de4b573e3
No known key found for this signature in database
27 changed files with 68 additions and 57 deletions

View file

@ -23,7 +23,7 @@ import sharp from "sharp";
import type { z } from "zod";
import { mimeLookup } from "@/content_types.ts";
import { getMediaHash } from "../../../classes/media/media-hasher.ts";
import { MediaJobType, mediaQueue } from "../../../classes/queues/media.ts";
import { MediaJobType, mediaQueue } from "../queues/media.ts";
import { BaseInterface } from "./base.ts";
type MediaType = InferSelectModel<typeof Medias>;

View file

@ -30,10 +30,7 @@ import { createRegExp, exactly, global } from "magic-regexp";
import type { z } from "zod";
import { mergeAndDeduplicate } from "@/lib.ts";
import { sanitizedHtmlStrip } from "@/sanitization";
import {
DeliveryJobType,
deliveryQueue,
} from "../../../classes/queues/delivery.ts";
import { DeliveryJobType, deliveryQueue } from "../queues/delivery.ts";
import { Application } from "./application.ts";
import { BaseInterface } from "./base.ts";
import { Emoji } from "./emoji.ts";

View file

@ -54,11 +54,8 @@ import { getBestContentType } from "@/content_types";
import { randomString } from "@/math";
import { searchManager } from "~/classes/search/search-manager";
import type { HttpVerb, KnownEntity } from "~/types/api.ts";
import {
DeliveryJobType,
deliveryQueue,
} from "../../../classes/queues/delivery.ts";
import { PushJobType, pushQueue } from "../../../classes/queues/push.ts";
import { DeliveryJobType, deliveryQueue } from "../queues/delivery.ts";
import { PushJobType, pushQueue } from "../queues/push.ts";
import { BaseInterface } from "./base.ts";
import { Emoji } from "./emoji.ts";
import { Instance } from "./instance.ts";

View file

@ -0,0 +1,601 @@
import { EntitySorter, type JSONObject } from "@versia/sdk";
import { verify } from "@versia/sdk/crypto";
import * as VersiaEntities from "@versia/sdk/entities";
import { config } from "@versia-server/config";
import { ApiError } from "@versia-server/kit";
import {
type Instance,
Like,
Note,
Reaction,
Relationship,
User,
} from "@versia-server/kit/db";
import { Likes, Notes } from "@versia-server/kit/tables";
import { federationInboxLogger } from "@versia-server/logging";
import type { SocketAddress } from "bun";
import { Glob } from "bun";
import chalk from "chalk";
import { and, eq } from "drizzle-orm";
import { matches } from "ip-matching";
import { isValidationError } from "zod-validation-error";
/**
* Checks if the hostname is defederated using glob matching.
* @param {string} hostname - The hostname to check. Can contain glob patterns.
* @returns {boolean} - True if defederated, false otherwise.
*/
function isDefederated(hostname: string): boolean {
const pattern = new Glob(hostname);
return (
config.federation.blocked.find(
(blocked) => pattern.match(blocked.toString()) !== null,
) !== undefined
);
}
/**
* Processes incoming federation inbox messages.
*
* @example
* ```typescript
* const processor = new InboxProcessor(context, body, sender, headers);
*
* await processor.process();
* ```
*/
export class InboxProcessor {
/**
* Creates a new InboxProcessor instance.
*
* @param request Request object.
* @param body Entity JSON body.
* @param sender Sender of the request's instance and key (from Versia-Signed-By header). Null if request is from a bridge.
* @param headers Various request headers.
* @param logger LogTape logger instance.
* @param requestIp Request IP address. Grabs it from the Hono context if not provided.
*/
public constructor(
private request: Request,
private body: JSONObject,
private sender: {
instance: Instance;
key: CryptoKey;
} | null,
private authorizationHeader?: string,
private requestIp: SocketAddress | null = null,
) {}
/**
* Verifies the request signature.
*
* @returns {Promise<boolean>} - Whether the signature is valid.
*/
private isSignatureValid(): Promise<boolean> {
if (!this.sender) {
throw new Error("Sender is not defined");
}
return verify(this.sender.key, this.request);
}
/**
* Determines if signature checks can be skipped.
* Useful for requests from federation bridges.
*
* @returns {boolean} - Whether to skip signature checks.
*/
private shouldCheckSignature(): boolean {
if (config.federation.bridge) {
const token = this.authorizationHeader?.split("Bearer ")[1];
if (token) {
return this.isRequestFromBridge(token);
}
}
return true;
}
/**
* Checks if a request is from a federation bridge.
*
* @param token - Authorization token to check.
* @returns {boolean} - Whether the request is from a federation bridge.
*/
private isRequestFromBridge(token: string): boolean {
if (!config.federation.bridge) {
throw new ApiError(
500,
"Bridge is not configured.",
"Please remove the Authorization header.",
);
}
if (token !== config.federation.bridge.token) {
throw new ApiError(
401,
"Invalid token.",
"Please use the correct token, or remove the Authorization header.",
);
}
if (config.federation.bridge.allowed_ips.length === 0) {
return true;
}
if (!this.requestIp) {
throw new ApiError(
500,
"The request IP address could not be determined.",
"This may be due to an incorrectly configured reverse proxy.",
);
}
for (const ip of config.federation.bridge.allowed_ips) {
if (matches(ip, this.requestIp.address)) {
return true;
}
}
throw new ApiError(
403,
"The request is not from a trusted bridge IP address.",
"Remove the Authorization header if you are not trying to access this API as a bridge.",
);
}
/**
* Performs request processing.
*
* @returns {Promise<void>}
* @throws {ApiError} - If there is an error processing the request.
*/
public async process(): Promise<void> {
!this.sender &&
federationInboxLogger.debug`Processing request from potential bridge`;
if (this.sender && isDefederated(this.sender.instance.data.baseUrl)) {
// Return 201 to avoid
// 1. Leaking defederated instance information
// 2. Preventing the sender from thinking the message was not delivered and retrying
return;
}
federationInboxLogger.debug`Instance ${chalk.gray(
this.sender?.instance.data.baseUrl,
)} is not defederated`;
const shouldCheckSignature = this.shouldCheckSignature();
shouldCheckSignature
? federationInboxLogger.debug`Checking signature`
: federationInboxLogger.debug`Skipping signature check`;
if (shouldCheckSignature) {
const isValid = await this.isSignatureValid();
if (!isValid) {
throw new ApiError(401, "Signature is not valid");
}
}
shouldCheckSignature && federationInboxLogger.debug`Signature is valid`;
try {
await new EntitySorter(this.body)
.on(VersiaEntities.Note, (n) => InboxProcessor.processNote(n))
.on(VersiaEntities.Follow, (f) =>
InboxProcessor.processFollowRequest(f),
)
.on(VersiaEntities.FollowAccept, (f) =>
InboxProcessor.processFollowAccept(f),
)
.on(VersiaEntities.FollowReject, (f) =>
InboxProcessor.processFollowReject(f),
)
.on(VersiaEntities.Like, (l) =>
InboxProcessor.processLikeRequest(l),
)
.on(VersiaEntities.Delete, (d) =>
InboxProcessor.processDelete(d),
)
.on(VersiaEntities.User, (u) => InboxProcessor.processUser(u))
.on(VersiaEntities.Share, (s) => InboxProcessor.processShare(s))
.on(VersiaEntities.Reaction, (r) =>
InboxProcessor.processReaction(r),
)
.sort(() => {
throw new ApiError(400, "Unknown entity type");
});
} catch (e) {
return this.handleError(e as Error);
}
}
/**
* Handles Reaction entity processing
*
* @param {VersiaEntities.Reaction} reaction - The Reaction entity to process.
* @returns {Promise<void>}
*/
private static async processReaction(
reaction: VersiaEntities.Reaction,
): Promise<void> {
const author = await User.resolve(new URL(reaction.data.author));
const note = await Note.resolve(new URL(reaction.data.object));
if (!author) {
throw new ApiError(404, "Author not found");
}
if (!note) {
throw new ApiError(404, "Note not found");
}
await Reaction.fromVersia(reaction, author, note);
}
/**
* Handles Note entity processing
*
* @param {VersiaNote} note - The Note entity to process.
* @returns {Promise<void>}
*/
private static async processNote(note: VersiaEntities.Note): Promise<void> {
// If note has a blocked word
if (
Object.values(note.content?.data ?? {})
.flatMap((c) => c.content)
.some((content) =>
config.validation.filters.note_content.some((filter) =>
filter.test(content),
),
)
) {
// Drop silently
return;
}
await Note.fromVersia(note);
}
/**
* Handles User entity processing.
*
* @param {VersiaUser} user - The User entity to process.
* @returns {Promise<void>}
*/
private static async processUser(user: VersiaEntities.User): Promise<void> {
if (
config.validation.filters.username.some((filter) =>
filter.test(user.data.username),
) ||
(user.data.display_name &&
config.validation.filters.displayname.some((filter) =>
filter.test(user.data.display_name ?? ""),
))
) {
// Drop silently
return;
}
if (
Object.values(user.bio?.data ?? {})
.flatMap((c) => c.content)
.some((content) =>
config.validation.filters.bio.some((filter) =>
filter.test(content),
),
)
) {
// Drop silently
return;
}
await User.fromVersia(user);
}
/**
* Handles Follow entity processing.
*
* @param {VersiaFollow} follow - The Follow entity to process.
* @returns {Promise<void>}
*/
private static async processFollowRequest(
follow: VersiaEntities.Follow,
): Promise<void> {
const author = await User.resolve(new URL(follow.data.author));
const followee = await User.resolve(new URL(follow.data.followee));
if (!author) {
throw new ApiError(404, "Author not found");
}
if (!followee) {
throw new ApiError(404, "Followee not found");
}
const foundRelationship = await Relationship.fromOwnerAndSubject(
author,
followee,
);
if (foundRelationship.data.following) {
return;
}
await foundRelationship.update({
// If followee is not "locked" (doesn't manually approves follow requests), set following to true
following: !followee.data.isLocked,
requested: followee.data.isLocked,
showingReblogs: true,
notifying: true,
languages: [],
});
await followee.notify(
followee.data.isLocked ? "follow_request" : "follow",
author,
);
if (!followee.data.isLocked) {
await followee.acceptFollowRequest(author);
}
}
/**
* Handles FollowAccept entity processing
*
* @param {VersiaFollowAccept} followAccept - The FollowAccept entity to process.
* @returns {Promise<void>}
*/
private static async processFollowAccept(
followAccept: VersiaEntities.FollowAccept,
): Promise<void> {
const author = await User.resolve(new URL(followAccept.data.author));
const follower = await User.resolve(
new URL(followAccept.data.follower),
);
if (!author) {
throw new ApiError(404, "Author not found");
}
if (!follower) {
throw new ApiError(404, "Follower not found");
}
const foundRelationship = await Relationship.fromOwnerAndSubject(
follower,
author,
);
if (!foundRelationship.data.requested) {
return;
}
await foundRelationship.update({
requested: false,
following: true,
});
}
/**
* Handles FollowReject entity processing
*
* @param {VersiaFollowReject} followReject - The FollowReject entity to process.
* @returns {Promise<void>}
*/
private static async processFollowReject(
followReject: VersiaEntities.FollowReject,
): Promise<void> {
const author = await User.resolve(new URL(followReject.data.author));
const follower = await User.resolve(
new URL(followReject.data.follower),
);
if (!author) {
throw new ApiError(404, "Author not found");
}
if (!follower) {
throw new ApiError(404, "Follower not found");
}
const foundRelationship = await Relationship.fromOwnerAndSubject(
follower,
author,
);
if (!foundRelationship.data.requested) {
return;
}
await foundRelationship.update({
requested: false,
following: false,
});
}
/**
* Handles Share entity processing.
*
* @param {VersiaShare} share - The Share entity to process.
* @returns {Promise<void>}
*/
private static async processShare(
share: VersiaEntities.Share,
): Promise<void> {
const author = await User.resolve(new URL(share.data.author));
const sharedNote = await Note.resolve(new URL(share.data.shared));
if (!author) {
throw new ApiError(404, "Author not found");
}
if (!sharedNote) {
throw new ApiError(404, "Shared Note not found");
}
await author.reblog(sharedNote, "public", new URL(share.data.uri));
}
/**
* Handles Delete entity processing.
*
* @param {VersiaDelete} delete_ - The Delete entity to process.
* @returns {Promise<void>}
*/ // JS doesn't allow the use of `delete` as a variable name
public static async processDelete(
delete_: VersiaEntities.Delete,
): Promise<void> {
const toDelete = delete_.data.deleted;
const author = delete_.data.author
? await User.resolve(new URL(delete_.data.author))
: null;
switch (delete_.data.deleted_type) {
case "Note": {
const note = await Note.fromSql(
eq(Notes.uri, toDelete),
author ? eq(Notes.authorId, author.id) : undefined,
);
if (!note) {
throw new ApiError(
404,
"Note to delete not found or not owned by sender",
);
}
await note.delete();
return;
}
case "User": {
const userToDelete = await User.resolve(new URL(toDelete));
if (!userToDelete) {
throw new ApiError(404, "User to delete not found");
}
if (!author || userToDelete.id === author.id) {
await userToDelete.delete();
return;
}
throw new ApiError(400, "Cannot delete other users than self");
}
case "pub.versia:likes/Like": {
const like = await Like.fromSql(
eq(Likes.uri, toDelete),
author ? eq(Likes.likerId, author.id) : undefined,
);
if (!like) {
throw new ApiError(
404,
"Like not found or not owned by sender",
);
}
const likeAuthor = await User.fromId(like.data.likerId);
const liked = await Note.fromId(like.data.likedId);
if (!liked) {
throw new ApiError(
404,
"Liked Note not found or not owned by sender",
);
}
if (!likeAuthor) {
throw new ApiError(404, "Like author not found");
}
await likeAuthor.unlike(liked);
return;
}
case "pub.versia:shares/Share": {
if (!author) {
throw new ApiError(404, "Author not found");
}
const reblog = await Note.fromSql(
and(eq(Notes.uri, toDelete), eq(Notes.authorId, author.id)),
);
if (!reblog) {
throw new ApiError(
404,
"Share not found or not owned by sender",
);
}
const reblogged = await Note.fromId(
reblog.data.reblogId,
author.id,
);
if (!reblogged) {
throw new ApiError(
404,
"Share not found or not owned by sender",
);
}
await author.unreblog(reblogged);
return;
}
default: {
throw new ApiError(
400,
`Deletion of object ${toDelete} not implemented`,
);
}
}
}
/**
* Handles Like entity processing.
*
* @param {VersiaLikeExtension} like - The Like entity to process.
* @returns {Promise<void>}
*/
private static async processLikeRequest(
like: VersiaEntities.Like,
): Promise<void> {
const author = await User.resolve(new URL(like.data.author));
const likedNote = await Note.resolve(new URL(like.data.liked));
if (!author) {
throw new ApiError(404, "Author not found");
}
if (!likedNote) {
throw new ApiError(404, "Liked Note not found");
}
await author.like(likedNote, new URL(like.data.uri));
}
/**
* Processes Errors into the appropriate HTTP response.
*
* @param {Error} e - The error object.
* @returns {void}
* @throws {ApiError} - The error response.
*/
private handleError(e: Error): void {
if (isValidationError(e)) {
throw new ApiError(400, "Failed to process request", e.message);
}
federationInboxLogger.error`${e}`;
throw new ApiError(500, "Failed to process request", e.message);
}
}

View file

@ -55,7 +55,10 @@
"markdown-it": "catalog:",
"markdown-it-toc-done-right": "catalog:",
"markdown-it-container": "catalog:",
"@hackmd/markdown-it-task-lists": "catalog:"
"@hackmd/markdown-it-task-lists": "catalog:",
"bullmq": "catalog:",
"web-push": "catalog:",
"ip-matching": "catalog:"
},
"files": [
"tables/migrations"
@ -85,6 +88,10 @@
"import": "./regex.ts",
"default": "./regex.ts"
},
"./queues/*": {
"import": "./queues/*.ts",
"default": "./queues/*.ts"
},
"./markdown": {
"import": "./markdown.ts",
"default": "./markdown.ts"

View file

@ -0,0 +1,97 @@
import type { JSONObject } from "@versia/sdk";
import * as VersiaEntities from "@versia/sdk/entities";
import { config } from "@versia-server/config";
import { Queue, Worker } from "bullmq";
import chalk from "chalk";
import { User } from "../db/user.ts";
import { connection } from "../redis.ts";
export enum DeliveryJobType {
FederateEntity = "federateEntity",
}
export type DeliveryJobData = {
entity: JSONObject;
recipientId: string;
senderId: string;
};
export const deliveryQueue = new Queue<DeliveryJobData, void, DeliveryJobType>(
"delivery",
{
connection,
},
);
export const getDeliveryWorker = (): Worker<
DeliveryJobData,
void,
DeliveryJobType
> =>
new Worker<DeliveryJobData, void, DeliveryJobType>(
deliveryQueue.name,
async (job) => {
switch (job.name) {
case DeliveryJobType.FederateEntity: {
const { entity, recipientId, senderId } = job.data;
const sender = await User.fromId(senderId);
if (!sender) {
throw new Error(
`Could not resolve sender ID ${chalk.gray(
senderId,
)}`,
);
}
const recipient = await User.fromId(recipientId);
if (!recipient) {
throw new Error(
`Could not resolve recipient ID ${chalk.gray(
recipientId,
)}`,
);
}
await job.log(
`Federating entity [${
entity.id
}] from @${sender.getAcct()} to @${recipient.getAcct()}`,
);
const type = entity.type;
const entityCtor = Object.values(VersiaEntities).find(
(ctor) => ctor.name === type,
) as typeof VersiaEntities.Entity | undefined;
if (!entityCtor) {
throw new Error(
`Could not resolve entity type ${chalk.gray(
type,
)} for entity [${entity.id}]`,
);
}
await sender.federateToUser(
await entityCtor.fromJSON(entity),
recipient,
);
await job.log(
`✔ Finished federating entity [${entity.id}]`,
);
}
}
},
{
connection,
removeOnComplete: {
age: config.queues.delivery?.remove_after_complete_seconds,
},
removeOnFail: {
age: config.queues.delivery?.remove_after_failure_seconds,
},
},
);

View file

@ -0,0 +1,71 @@
import { config } from "@versia-server/config";
import { Queue, Worker } from "bullmq";
import { eq } from "drizzle-orm";
import { Instance } from "../db/instance.ts";
import { connection } from "../redis.ts";
import { Instances } from "../tables/schema.ts";
export enum FetchJobType {
Instance = "instance",
User = "user",
Note = "user",
}
export type FetchJobData = {
uri: string;
refetcher?: string;
};
export const fetchQueue = new Queue<FetchJobData, void, FetchJobType>("fetch", {
connection,
});
export const getFetchWorker = (): Worker<FetchJobData, void, FetchJobType> =>
new Worker<FetchJobData, void, FetchJobType>(
fetchQueue.name,
async (job) => {
switch (job.name) {
case FetchJobType.Instance: {
const { uri } = job.data;
await job.log(`Fetching instance metadata from [${uri}]`);
// Check if exists
const host = new URL(uri).host;
const existingInstance = await Instance.fromSql(
eq(Instances.baseUrl, host),
);
if (existingInstance) {
await job.log(
"Instance is known, refetching remote data.",
);
await existingInstance.updateFromRemote();
await job.log(
`Instance [${uri}] successfully refetched`,
);
return;
}
await Instance.resolve(new URL(uri));
await job.log(
`✔ Finished fetching instance metadata from [${uri}]`,
);
}
}
},
{
connection,
removeOnComplete: {
age: config.queues.fetch?.remove_after_complete_seconds,
},
removeOnFail: {
age: config.queues.fetch?.remove_after_failure_seconds,
},
},
);

View file

@ -0,0 +1,218 @@
import type { JSONObject } from "@versia/sdk";
import { config } from "@versia-server/config";
import { Queue, Worker } from "bullmq";
import type { SocketAddress } from "bun";
import { ApiError } from "../api-error.ts";
import { Instance } from "../db/instance.ts";
import { User } from "../db/user.ts";
import { InboxProcessor } from "../inbox-processor.ts";
import { connection } from "../redis.ts";
export enum InboxJobType {
ProcessEntity = "processEntity",
}
export type InboxJobData = {
data: JSONObject;
headers: {
"versia-signature"?: string;
"versia-signed-at"?: number;
"versia-signed-by"?: string;
authorization?: string;
};
request: {
url: string;
method: string;
body: string;
};
ip: SocketAddress | null;
};
export const inboxQueue = new Queue<InboxJobData, Response, InboxJobType>(
"inbox",
{
connection,
},
);
export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
new Worker<InboxJobData, void, InboxJobType>(
inboxQueue.name,
async (job) => {
switch (job.name) {
case InboxJobType.ProcessEntity: {
const { data, headers, request, ip } = job.data;
await job.log(`Processing entity [${data.id}]`);
const req = new Request(request.url, {
method: request.method,
headers: new Headers(
Object.entries(headers)
.map(([k, v]) => [k, String(v)])
.concat([
["content-type", "application/json"],
]) as [string, string][],
),
body: request.body,
});
if (headers.authorization) {
try {
const processor = new InboxProcessor(
req,
data,
null,
headers.authorization,
ip,
);
await job.log(
`Entity [${data.id}] is potentially from a bridge`,
);
await processor.process();
} catch (e) {
if (e instanceof ApiError) {
// Error occurred
await job.log(
`Error during processing: ${e.message}`,
);
await job.log(
`Failed processing entity [${data.id}]`,
);
return;
}
throw e;
}
await job.log(
`✔ Finished processing entity [${data.id}]`,
);
return;
}
const { "versia-signed-by": signedBy } = headers as {
"versia-signed-by": string;
};
const sender = await User.resolve(new URL(signedBy));
if (!(sender || signedBy.startsWith("instance "))) {
await job.log(
`Could not resolve sender URI [${signedBy}]`,
);
return;
}
if (sender?.local) {
throw new Error(
"Cannot process federation requests from local users",
);
}
const remoteInstance = sender
? await Instance.fromUser(sender)
: await Instance.resolveFromHost(
signedBy.split(" ")[1],
);
if (!remoteInstance) {
await job.log("Could not resolve the remote instance.");
return;
}
await job.log(
`Entity [${data.id}] is from remote instance [${remoteInstance.data.baseUrl}]`,
);
if (!remoteInstance.data.publicKey?.key) {
throw new Error(
`Instance ${remoteInstance.data.baseUrl} has no public key stored in database`,
);
}
const key = await crypto.subtle.importKey(
"spki",
Buffer.from(
sender?.data.publicKey ??
remoteInstance.data.publicKey.key,
"base64",
),
"Ed25519",
false,
["verify"],
);
try {
const processor = new InboxProcessor(
req,
data,
{
instance: remoteInstance,
key,
},
undefined,
ip,
);
await processor.process();
} catch (e) {
if (e instanceof ApiError) {
// Error occurred
await job.log(
`Error during processing: ${e.message}`,
);
await job.log(
`Failed processing entity [${data.id}]`,
);
await job.log(
`Sending error message to instance [${remoteInstance.data.baseUrl}]`,
);
await remoteInstance.sendMessage(
`Failed processing entity [${
data.uri
}] delivered to inbox. Returned error:\n\n${JSON.stringify(
e.message,
null,
4,
)}`,
);
await job.log("Message sent");
return;
}
throw e;
}
await job.log(`Finished processing entity [${data.id}]`);
return;
}
default: {
throw new Error(`Unknown job type: ${job.name}`);
}
}
},
{
connection,
removeOnComplete: {
age: config.queues.inbox?.remove_after_complete_seconds,
},
removeOnFail: {
age: config.queues.inbox?.remove_after_failure_seconds,
},
},
);

View file

@ -0,0 +1,125 @@
import { config } from "@versia-server/config";
import { Queue, Worker } from "bullmq";
import { calculateBlurhash } from "../../../classes/media/preprocessors/blurhash.ts";
import { convertImage } from "../../../classes/media/preprocessors/image-conversion.ts";
import { Media } from "../db/media.ts";
import { connection } from "../redis.ts";
export enum MediaJobType {
ConvertMedia = "convertMedia",
CalculateMetadata = "calculateMetadata",
}
export type MediaJobData = {
attachmentId: string;
filename: string;
};
export const mediaQueue = new Queue<MediaJobData, void, MediaJobType>("media", {
connection,
});
export const getMediaWorker = (): Worker<MediaJobData, void, MediaJobType> =>
new Worker<MediaJobData, void, MediaJobType>(
mediaQueue.name,
async (job) => {
switch (job.name) {
case MediaJobType.ConvertMedia: {
const { attachmentId, filename } = job.data;
await job.log(`Fetching attachment ID [${attachmentId}]`);
const attachment = await Media.fromId(attachmentId);
if (!attachment) {
throw new Error(
`Attachment not found: [${attachmentId}]`,
);
}
await job.log(`Processing attachment [${attachmentId}]`);
await job.log(
`Fetching file from [${attachment.getUrl()}]`,
);
// Download the file and process it.
const blob = await (
await fetch(attachment.getUrl())
).blob();
const file = new File([blob], filename);
await job.log(`Converting attachment [${attachmentId}]`);
const processedFile = await convertImage(
file,
config.media.conversion.convert_to,
{
convertVectors:
config.media.conversion.convert_vectors,
},
);
await job.log(`Uploading attachment [${attachmentId}]`);
await attachment.updateFromFile(processedFile);
await job.log(
`✔ Finished processing attachment [${attachmentId}]`,
);
break;
}
case MediaJobType.CalculateMetadata: {
// Calculate blurhash
const { attachmentId } = job.data;
await job.log(`Fetching attachment ID [${attachmentId}]`);
const attachment = await Media.fromId(attachmentId);
if (!attachment) {
throw new Error(
`Attachment not found: [${attachmentId}]`,
);
}
await job.log(`Processing attachment [${attachmentId}]`);
await job.log(
`Fetching file from [${attachment.getUrl()}]`,
);
// Download the file and process it.
const blob = await (
await fetch(attachment.getUrl())
).blob();
// Filename is not important for blurhash
const file = new File([blob], "");
await job.log(`Generating blurhash for [${attachmentId}]`);
const blurhash = await calculateBlurhash(file);
await attachment.update({
blurhash,
});
await job.log(
`✔ Finished processing attachment [${attachmentId}]`,
);
break;
}
}
},
{
connection,
removeOnComplete: {
age: config.queues.media?.remove_after_complete_seconds,
},
removeOnFail: {
age: config.queues.media?.remove_after_failure_seconds,
},
},
);

View file

@ -0,0 +1,168 @@
import { config } from "@versia-server/config";
import { Queue, Worker } from "bullmq";
import { sendNotification } from "web-push";
import { htmlToText } from "@/content_types.ts";
import { Note } from "../db/note.ts";
import { PushSubscription } from "../db/pushsubscription.ts";
import { Token } from "../db/token.ts";
import { User } from "../db/user.ts";
import { connection } from "../redis.ts";
export enum PushJobType {
Notify = "notify",
}
export type PushJobData = {
psId: string;
type: string;
relatedUserId: string;
noteId?: string;
notificationId: string;
};
export const pushQueue = new Queue<PushJobData, void, PushJobType>("push", {
connection,
});
export const getPushWorker = (): Worker<PushJobData, void, PushJobType> =>
new Worker<PushJobData, void, PushJobType>(
pushQueue.name,
async (job) => {
const {
data: { psId, relatedUserId, type, noteId, notificationId },
} = job;
if (!config.notifications.push) {
await job.log("Push notifications are disabled");
return;
}
if (
!(
config.notifications.push.vapid_keys.private ||
config.notifications.push.vapid_keys.public
)
) {
await job.log("Push notifications are not configured");
return;
}
await job.log(
`Sending push notification for note [${notificationId}]`,
);
const ps = await PushSubscription.fromId(psId);
if (!ps) {
throw new Error(
`Could not resolve push subscription ID ${psId}`,
);
}
const token = await Token.fromId(ps.data.tokenId);
if (!token) {
throw new Error(
`Could not resolve token ID ${ps.data.tokenId}`,
);
}
const relatedUser = await User.fromId(relatedUserId);
if (!relatedUser) {
throw new Error(
`Could not resolve related user ID ${relatedUserId}`,
);
}
const note = noteId ? await Note.fromId(noteId) : null;
const truncate = (str: string, len: number): string => {
if (str.length <= len) {
return str;
}
return `${str.slice(0, len)}...`;
};
const name = truncate(
relatedUser.data.displayName || relatedUser.data.username,
50,
);
let title = name;
switch (type) {
case "mention":
title = `${name} mentioned you`;
break;
case "reply":
title = `${name} replied to you`;
break;
case "favourite":
title = `${name} liked your note`;
break;
case "reblog":
title = `${name} reblogged your note`;
break;
case "follow":
title = `${name} followed you`;
break;
case "follow_request":
title = `${name} requested to follow you`;
break;
case "poll":
title = "Poll ended";
break;
}
const body = note
? htmlToText(note.data.spoilerText || note.data.content)
: htmlToText(relatedUser.data.note);
await sendNotification(
{
endpoint: ps.data.endpoint,
keys: {
auth: ps.data.authSecret,
p256dh: ps.data.publicKey,
},
},
JSON.stringify({
access_token: token.data.accessToken,
// FIXME
preferred_locale: "en-US",
notification_id: notificationId,
notification_type: type,
icon: relatedUser.getAvatarUrl(),
title,
body: truncate(body, 140),
}),
{
vapidDetails: {
subject:
config.notifications.push.subject ||
config.http.base_url.origin,
privateKey:
config.notifications.push.vapid_keys.private ?? "",
publicKey:
config.notifications.push.vapid_keys.public ?? "",
},
contentEncoding: "aesgcm",
},
);
await job.log(
`✔ Finished delivering push notification for note [${notificationId}]`,
);
},
{
connection,
removeOnComplete: {
age: config.queues.push?.remove_after_complete_seconds,
},
removeOnFail: {
age: config.queues.push?.remove_after_failure_seconds,
},
},
);

View file

@ -0,0 +1,67 @@
import { config } from "@versia-server/config";
import { Queue, Worker } from "bullmq";
import { Relationship } from "../db/relationship.ts";
import { User } from "../db/user.ts";
import { connection } from "../redis.ts";
export enum RelationshipJobType {
Unmute = "unmute",
}
export type RelationshipJobData = {
ownerId: string;
subjectId: string;
};
export const relationshipQueue = new Queue<
RelationshipJobData,
void,
RelationshipJobType
>("relationships", {
connection,
});
export const getRelationshipWorker = (): Worker<
RelationshipJobData,
void,
RelationshipJobType
> =>
new Worker<RelationshipJobData, void, RelationshipJobType>(
relationshipQueue.name,
async (job) => {
switch (job.name) {
case RelationshipJobType.Unmute: {
const { ownerId, subjectId } = job.data;
const owner = await User.fromId(ownerId);
const subject = await User.fromId(subjectId);
if (!(owner && subject)) {
await job.log("Users not found");
return;
}
const foundRelationship =
await Relationship.fromOwnerAndSubject(owner, subject);
if (foundRelationship.data.muting) {
await foundRelationship.update({
muting: false,
mutingNotifications: false,
});
}
await job.log(`✔ Finished unmuting [${subjectId}]`);
}
}
},
{
connection,
removeOnComplete: {
age: config.queues.fetch?.remove_after_complete_seconds,
},
removeOnFail: {
age: config.queues.fetch?.remove_after_failure_seconds,
},
},
);