From c9a158193225747f5807f672f3f8447507a239df Mon Sep 17 00:00:00 2001 From: Jesse Wierzbinski Date: Sun, 30 Mar 2025 20:54:47 +0200 Subject: [PATCH] feat(api): :sparkles: Implement duration controls on mutes --- CHANGELOG.md | 1 + api/api/v1/accounts/[id]/mute.test.ts | 27 +++- api/api/v1/accounts/[id]/mute.ts | 21 ++- classes/queues/delivery.ts | 55 ++++++++ classes/queues/fetch.ts | 55 ++++++++ classes/queues/inbox.ts | 171 +++++++++++++++++++++++++ classes/queues/media.ts | 110 ++++++++++++++++ classes/queues/push.ts | 137 ++++++++++++++++++++ classes/queues/relationships.ts | 67 ++++++++++ classes/workers/delivery.ts | 61 --------- classes/workers/fetch.ts | 61 --------- classes/workers/inbox.ts | 177 -------------------------- classes/workers/media.ts | 116 ----------------- classes/workers/push.ts | 143 --------------------- entrypoints/worker/index.ts | 17 ++- tests/setup.ts | 3 + utils/bull-board.ts | 2 + 17 files changed, 652 insertions(+), 572 deletions(-) create mode 100644 classes/queues/relationships.ts delete mode 100644 classes/workers/delivery.ts delete mode 100644 classes/workers/fetch.ts delete mode 100644 classes/workers/inbox.ts delete mode 100644 classes/workers/media.ts delete mode 100644 classes/workers/push.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c8f88e2..849afc41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ Please see [Database Changes](#database-changes) and [New Configuration](#new-co - [x] 🔥 Removed nonstandard `/api/v1/accounts/id` endpoint (the same functionality was already possible with other endpoints). - [x] ✨️ Implemented rate limiting support for API endpoints. - [x] 🔒 Implemented `is_indexable` and `is_hiding_collections` fields to the [**Accounts API**](https://docs.joinmastodon.org/methods/accounts/#update_credentials). +- [x] ✨️ Muting other users now lets you specify a duration, after which the mute will be automatically removed. ### CLI diff --git a/api/api/v1/accounts/[id]/mute.test.ts b/api/api/v1/accounts/[id]/mute.test.ts index 1ec02ca6..92816afe 100644 --- a/api/api/v1/accounts/[id]/mute.test.ts +++ b/api/api/v1/accounts/[id]/mute.test.ts @@ -32,10 +32,9 @@ describe("/api/v1/accounts/:id/mute", () => { test("should mute user", async () => { await using client = await generateClient(users[0]); - const { data, ok, raw } = await client.muteAccount(users[1].id); + const { data, ok } = await client.muteAccount(users[1].id); expect(ok).toBe(true); - expect(raw.status).toBe(200); expect(data.muting).toBe(true); }); @@ -43,11 +42,31 @@ describe("/api/v1/accounts/:id/mute", () => { test("should return 200 if user already muted", async () => { await using client = await generateClient(users[0]); - const { data, ok, raw } = await client.muteAccount(users[1].id); + const { data, ok } = await client.muteAccount(users[1].id); expect(ok).toBe(true); - expect(raw.status).toBe(200); expect(data.muting).toBe(true); }); + + test("should unmute user after duration", async () => { + await using client = await generateClient(users[0]); + + const { data, ok } = await client.muteAccount(users[1].id, { + duration: 1, + }); + + expect(ok).toBe(true); + + expect(data.muting).toBe(true); + + await new Promise((resolve) => setTimeout(resolve, 1500)); + + const { data: data2, ok: ok2 } = await client.getRelationship( + users[1].id, + ); + + expect(ok2).toBe(true); + expect(data2.muting).toBe(false); + }); }); diff --git a/api/api/v1/accounts/[id]/mute.ts b/api/api/v1/accounts/[id]/mute.ts index c40ee190..a5d555c0 100644 --- a/api/api/v1/accounts/[id]/mute.ts +++ b/api/api/v1/accounts/[id]/mute.ts @@ -6,6 +6,8 @@ import { describeRoute } from "hono-openapi"; import { resolver, validator } from "hono-openapi/zod"; import { z } from "zod"; import { ApiError } from "~/classes/errors/api-error"; +import { RelationshipJobType } from "~/classes/queues/relationships"; +import { relationshipQueue } from "~/classes/queues/relationships"; export default apiRoute((app) => app.post( @@ -56,15 +58,14 @@ export default apiRoute((app) => .default(0) .openapi({ description: - "How long the mute should last, in seconds.", + "How long the mute should last, in seconds. 0 means indefinite.", }), }), handleZodError, ), async (context) => { const { user } = context.get("auth"); - // TODO: Add duration support - const { notifications } = context.req.valid("json"); + const { notifications, duration } = context.req.valid("json"); const otherUser = context.get("user"); const foundRelationship = await Relationship.fromOwnerAndSubject( @@ -72,12 +73,24 @@ export default apiRoute((app) => otherUser, ); - // TODO: Implement duration await foundRelationship.update({ muting: true, mutingNotifications: notifications, }); + if (duration > 0) { + await relationshipQueue.add( + RelationshipJobType.Unmute, + { + ownerId: user.id, + subjectId: otherUser.id, + }, + { + delay: duration * 1000, + }, + ); + } + return context.json(foundRelationship.toApi(), 200); }, ), diff --git a/classes/queues/delivery.ts b/classes/queues/delivery.ts index 85aebafc..0ddc9f8a 100644 --- a/classes/queues/delivery.ts +++ b/classes/queues/delivery.ts @@ -1,4 +1,8 @@ +import { User } from "@versia/kit/db"; import { Queue } from "bullmq"; +import { Worker } from "bullmq"; +import chalk from "chalk"; +import { config } from "~/config.ts"; import type { KnownEntity } from "~/types/api"; import { connection } from "~/utils/redis.ts"; @@ -18,3 +22,54 @@ export const deliveryQueue = new Queue( connection, }, ); + +export const getDeliveryWorker = (): Worker< + DeliveryJobData, + void, + DeliveryJobType +> => + new Worker( + deliveryQueue.name, + async (job) => { + switch (job.name) { + case DeliveryJobType.FederateEntity: { + const { entity, recipientId, senderId } = job.data; + + const sender = await User.fromId(senderId); + + if (!sender) { + throw new Error( + `Could not resolve sender ID ${chalk.gray(senderId)}`, + ); + } + + const recipient = await User.fromId(recipientId); + + if (!recipient) { + throw new Error( + `Could not resolve recipient ID ${chalk.gray(recipientId)}`, + ); + } + + await job.log( + `Federating entity [${entity.id}] from @${sender.getAcct()} to @${recipient.getAcct()}`, + ); + + await sender.federateToUser(entity, recipient); + + await job.log( + `✔ Finished federating entity [${entity.id}]`, + ); + } + } + }, + { + connection, + removeOnComplete: { + age: config.queues.delivery?.remove_after_complete_seconds, + }, + removeOnFail: { + age: config.queues.delivery?.remove_after_failure_seconds, + }, + }, + ); diff --git a/classes/queues/fetch.ts b/classes/queues/fetch.ts index 41e98ab3..8554bc52 100644 --- a/classes/queues/fetch.ts +++ b/classes/queues/fetch.ts @@ -1,4 +1,9 @@ +import { Instance } from "@versia/kit/db"; +import { Instances } from "@versia/kit/tables"; import { Queue } from "bullmq"; +import { Worker } from "bullmq"; +import { eq } from "drizzle-orm"; +import { config } from "~/config.ts"; import { connection } from "~/utils/redis.ts"; export enum FetchJobType { @@ -15,3 +20,53 @@ export type FetchJobData = { export const fetchQueue = new Queue("fetch", { connection, }); + +export const getFetchWorker = (): Worker => + new Worker( + fetchQueue.name, + async (job) => { + switch (job.name) { + case FetchJobType.Instance: { + const { uri } = job.data; + + await job.log(`Fetching instance metadata from [${uri}]`); + + // Check if exists + const host = new URL(uri).host; + + const existingInstance = await Instance.fromSql( + eq(Instances.baseUrl, host), + ); + + if (existingInstance) { + await job.log( + "Instance is known, refetching remote data.", + ); + + await existingInstance.updateFromRemote(); + + await job.log( + `Instance [${uri}] successfully refetched`, + ); + + return; + } + + await Instance.resolve(new URL(uri)); + + await job.log( + `✔ Finished fetching instance metadata from [${uri}]`, + ); + } + } + }, + { + connection, + removeOnComplete: { + age: config.queues.fetch?.remove_after_complete_seconds, + }, + removeOnFail: { + age: config.queues.fetch?.remove_after_failure_seconds, + }, + }, + ); diff --git a/classes/queues/inbox.ts b/classes/queues/inbox.ts index 543f2db8..ea7561dd 100644 --- a/classes/queues/inbox.ts +++ b/classes/queues/inbox.ts @@ -1,7 +1,12 @@ +import { getLogger } from "@logtape/logtape"; import type { Entity } from "@versia/federation/types"; +import { Instance, User } from "@versia/kit/db"; import { Queue } from "bullmq"; +import { Worker } from "bullmq"; import type { SocketAddress } from "bun"; +import { config } from "~/config.ts"; import { connection } from "~/utils/redis.ts"; +import { InboxProcessor } from "../inbox/processor.ts"; export enum InboxJobType { ProcessEntity = "processEntity", @@ -29,3 +34,169 @@ export const inboxQueue = new Queue( connection, }, ); + +export const getInboxWorker = (): Worker => + new Worker( + inboxQueue.name, + async (job) => { + switch (job.name) { + case InboxJobType.ProcessEntity: { + const { data, headers, request, ip } = job.data; + + await job.log(`Processing entity [${data.id}]`); + + if (headers.authorization) { + const processor = new InboxProcessor( + { + ...request, + url: new URL(request.url), + }, + data, + null, + { + authorization: headers.authorization, + }, + getLogger(["federation", "inbox"]), + ip, + ); + + await job.log( + `Entity [${data.id}] is potentially from a bridge`, + ); + + const output = await processor.process(); + + if (output instanceof Response) { + // Error occurred + const error = await output.json(); + await job.log(`Error during processing: ${error}`); + + await job.log( + `Failed processing entity [${data.id}]`, + ); + + return; + } + + await job.log( + `✔ Finished processing entity [${data.id}]`, + ); + + return; + } + + const { + "versia-signature": signature, + "versia-signed-at": signedAt, + "versia-signed-by": signedBy, + } = headers as { + "versia-signature": string; + "versia-signed-at": number; + "versia-signed-by": string; + }; + + const sender = await User.resolve(new URL(signedBy)); + + if (!(sender || signedBy.startsWith("instance "))) { + await job.log( + `Could not resolve sender URI [${signedBy}]`, + ); + + return; + } + + if (sender?.isLocal()) { + throw new Error( + "Cannot process federation requests from local users", + ); + } + + const remoteInstance = sender + ? await Instance.fromUser(sender) + : await Instance.resolveFromHost( + signedBy.split(" ")[1], + ); + + if (!remoteInstance) { + await job.log("Could not resolve the remote instance."); + + return; + } + + await job.log( + `Entity [${data.id}] is from remote instance [${remoteInstance.data.baseUrl}]`, + ); + + if (!remoteInstance.data.publicKey?.key) { + throw new Error( + `Instance ${remoteInstance.data.baseUrl} has no public key stored in database`, + ); + } + + const processor = new InboxProcessor( + { + ...request, + url: new URL(request.url), + }, + data, + { + instance: remoteInstance, + key: + sender?.data.publicKey ?? + remoteInstance.data.publicKey.key, + }, + { + signature, + signedAt: new Date(signedAt * 1000), + authorization: undefined, + }, + getLogger(["federation", "inbox"]), + ip, + ); + + const output = await processor.process(); + + if (output instanceof Response) { + // Error occurred + const error = await output.json(); + await job.log(`Error during processing: ${error}`); + + await job.log(`Failed processing entity [${data.id}]`); + + await job.log( + `Sending error message to instance [${remoteInstance.data.baseUrl}]`, + ); + + await remoteInstance.sendMessage( + `Failed processing entity [${data.uri}] delivered to inbox. Returned error:\n\n${JSON.stringify( + error, + null, + 4, + )}`, + ); + + await job.log("Message sent"); + + return; + } + + await job.log(`Finished processing entity [${data.id}]`); + + return; + } + + default: { + throw new Error(`Unknown job type: ${job.name}`); + } + } + }, + { + connection, + removeOnComplete: { + age: config.queues.inbox?.remove_after_complete_seconds, + }, + removeOnFail: { + age: config.queues.inbox?.remove_after_failure_seconds, + }, + }, + ); diff --git a/classes/queues/media.ts b/classes/queues/media.ts index e3177bf1..cca9c894 100644 --- a/classes/queues/media.ts +++ b/classes/queues/media.ts @@ -1,5 +1,10 @@ +import { Media } from "@versia/kit/db"; import { Queue } from "bullmq"; +import { Worker } from "bullmq"; +import { config } from "~/config.ts"; import { connection } from "~/utils/redis.ts"; +import { calculateBlurhash } from "../media/preprocessors/blurhash.ts"; +import { convertImage } from "../media/preprocessors/image-conversion.ts"; export enum MediaJobType { ConvertMedia = "convertMedia", @@ -14,3 +19,108 @@ export type MediaJobData = { export const mediaQueue = new Queue("media", { connection, }); + +export const getMediaWorker = (): Worker => + new Worker( + mediaQueue.name, + async (job) => { + switch (job.name) { + case MediaJobType.ConvertMedia: { + const { attachmentId, filename } = job.data; + + await job.log(`Fetching attachment ID [${attachmentId}]`); + + const attachment = await Media.fromId(attachmentId); + + if (!attachment) { + throw new Error( + `Attachment not found: [${attachmentId}]`, + ); + } + + await job.log(`Processing attachment [${attachmentId}]`); + await job.log( + `Fetching file from [${attachment.getUrl()}]`, + ); + + // Download the file and process it. + const blob = await ( + await fetch(attachment.getUrl()) + ).blob(); + + const file = new File([blob], filename); + + await job.log(`Converting attachment [${attachmentId}]`); + + const processedFile = await convertImage( + file, + config.media.conversion.convert_to, + { + convertVectors: + config.media.conversion.convert_vectors, + }, + ); + + await job.log(`Uploading attachment [${attachmentId}]`); + + await attachment.updateFromFile(processedFile); + + await job.log( + `✔ Finished processing attachment [${attachmentId}]`, + ); + + break; + } + case MediaJobType.CalculateMetadata: { + // Calculate blurhash + const { attachmentId } = job.data; + + await job.log(`Fetching attachment ID [${attachmentId}]`); + + const attachment = await Media.fromId(attachmentId); + + if (!attachment) { + throw new Error( + `Attachment not found: [${attachmentId}]`, + ); + } + + await job.log(`Processing attachment [${attachmentId}]`); + await job.log( + `Fetching file from [${attachment.getUrl()}]`, + ); + + // Download the file and process it. + const blob = await ( + await fetch(attachment.getUrl()) + ).blob(); + + // Filename is not important for blurhash + const file = new File([blob], ""); + + await job.log(`Generating blurhash for [${attachmentId}]`); + + const blurhash = await calculateBlurhash(file); + + await attachment.update({ + blurhash, + }); + + await job.log( + `✔ Finished processing attachment [${attachmentId}]`, + ); + + break; + } + } + }, + { + connection, + removeOnComplete: { + age: config.queues.media?.remove_after_complete_seconds, + }, + removeOnFail: { + age: config.queues.media?.remove_after_failure_seconds, + }, + }, + ); diff --git a/classes/queues/push.ts b/classes/queues/push.ts index cd804c13..4c2a75ce 100644 --- a/classes/queues/push.ts +++ b/classes/queues/push.ts @@ -1,4 +1,9 @@ +import { htmlToText } from "@/content_types.ts"; +import { Note, PushSubscription, Token, User } from "@versia/kit/db"; import { Queue } from "bullmq"; +import { Worker } from "bullmq"; +import { sendNotification } from "web-push"; +import { config } from "~/config.ts"; import { connection } from "~/utils/redis.ts"; export enum PushJobType { @@ -16,3 +21,135 @@ export type PushJobData = { export const pushQueue = new Queue("push", { connection, }); + +export const getPushWorker = (): Worker => + new Worker( + pushQueue.name, + async (job) => { + const { + data: { psId, relatedUserId, type, noteId, notificationId }, + } = job; + + if (!config.notifications.push) { + await job.log("Push notifications are disabled"); + return; + } + + await job.log( + `Sending push notification for note [${notificationId}]`, + ); + + const ps = await PushSubscription.fromId(psId); + + if (!ps) { + throw new Error( + `Could not resolve push subscription ID ${psId}`, + ); + } + + const token = await Token.fromId(ps.data.tokenId); + + if (!token) { + throw new Error( + `Could not resolve token ID ${ps.data.tokenId}`, + ); + } + + const relatedUser = await User.fromId(relatedUserId); + + if (!relatedUser) { + throw new Error( + `Could not resolve related user ID ${relatedUserId}`, + ); + } + + const note = noteId ? await Note.fromId(noteId) : null; + + const truncate = (str: string, len: number): string => { + if (str.length <= len) { + return str; + } + + return `${str.slice(0, len)}...`; + }; + + const name = truncate( + relatedUser.data.displayName || relatedUser.data.username, + 50, + ); + + let title = name; + + switch (type) { + case "mention": + title = `${name} mentioned you`; + break; + case "reply": + title = `${name} replied to you`; + break; + case "favourite": + title = `${name} liked your note`; + break; + case "reblog": + title = `${name} reblogged your note`; + break; + case "follow": + title = `${name} followed you`; + break; + case "follow_request": + title = `${name} requested to follow you`; + break; + case "poll": + title = "Poll ended"; + break; + } + + const body = note + ? htmlToText(note.data.spoilerText || note.data.content) + : htmlToText(relatedUser.data.note); + + await sendNotification( + { + endpoint: ps.data.endpoint, + keys: { + auth: ps.data.authSecret, + p256dh: ps.data.publicKey, + }, + }, + JSON.stringify({ + access_token: token.data.accessToken, + // FIXME + preferred_locale: "en-US", + notification_id: notificationId, + notification_type: type, + icon: relatedUser.getAvatarUrl(), + title, + body: truncate(body, 140), + }), + { + vapidDetails: { + subject: + config.notifications.push.subject || + config.http.base_url.origin, + privateKey: + config.notifications.push.vapid_keys.private, + publicKey: config.notifications.push.vapid_keys.public, + }, + contentEncoding: "aesgcm", + }, + ); + + await job.log( + `✔ Finished delivering push notification for note [${notificationId}]`, + ); + }, + { + connection, + removeOnComplete: { + age: config.queues.push?.remove_after_complete_seconds, + }, + removeOnFail: { + age: config.queues.push?.remove_after_failure_seconds, + }, + }, + ); diff --git a/classes/queues/relationships.ts b/classes/queues/relationships.ts new file mode 100644 index 00000000..06fa0999 --- /dev/null +++ b/classes/queues/relationships.ts @@ -0,0 +1,67 @@ +import { Relationship, User } from "@versia/kit/db"; +import { Queue } from "bullmq"; +import { Worker } from "bullmq"; +import { config } from "~/config.ts"; +import { connection } from "~/utils/redis.ts"; + +export enum RelationshipJobType { + Unmute = "unmute", +} + +export type RelationshipJobData = { + ownerId: string; + subjectId: string; +}; + +export const relationshipQueue = new Queue< + RelationshipJobData, + void, + RelationshipJobType +>("relationships", { + connection, +}); + +export const getRelationshipWorker = (): Worker< + RelationshipJobData, + void, + RelationshipJobType +> => + new Worker( + relationshipQueue.name, + async (job) => { + switch (job.name) { + case RelationshipJobType.Unmute: { + const { ownerId, subjectId } = job.data; + + const owner = await User.fromId(ownerId); + const subject = await User.fromId(subjectId); + + if (!(owner && subject)) { + await job.log("Users not found"); + return; + } + + const foundRelationship = + await Relationship.fromOwnerAndSubject(owner, subject); + + if (foundRelationship.data.muting) { + await foundRelationship.update({ + muting: false, + mutingNotifications: false, + }); + } + + await job.log(`✔ Finished unmuting [${subjectId}]`); + } + } + }, + { + connection, + removeOnComplete: { + age: config.queues.fetch?.remove_after_complete_seconds, + }, + removeOnFail: { + age: config.queues.fetch?.remove_after_failure_seconds, + }, + }, + ); diff --git a/classes/workers/delivery.ts b/classes/workers/delivery.ts deleted file mode 100644 index 2764e655..00000000 --- a/classes/workers/delivery.ts +++ /dev/null @@ -1,61 +0,0 @@ -import { User } from "@versia/kit/db"; -import { Worker } from "bullmq"; -import chalk from "chalk"; -import { config } from "~/config.ts"; -import { connection } from "~/utils/redis.ts"; -import { - type DeliveryJobData, - DeliveryJobType, - deliveryQueue, -} from "../queues/delivery.ts"; - -export const getDeliveryWorker = (): Worker< - DeliveryJobData, - void, - DeliveryJobType -> => - new Worker( - deliveryQueue.name, - async (job) => { - switch (job.name) { - case DeliveryJobType.FederateEntity: { - const { entity, recipientId, senderId } = job.data; - - const sender = await User.fromId(senderId); - - if (!sender) { - throw new Error( - `Could not resolve sender ID ${chalk.gray(senderId)}`, - ); - } - - const recipient = await User.fromId(recipientId); - - if (!recipient) { - throw new Error( - `Could not resolve recipient ID ${chalk.gray(recipientId)}`, - ); - } - - await job.log( - `Federating entity [${entity.id}] from @${sender.getAcct()} to @${recipient.getAcct()}`, - ); - - await sender.federateToUser(entity, recipient); - - await job.log( - `✔ Finished federating entity [${entity.id}]`, - ); - } - } - }, - { - connection, - removeOnComplete: { - age: config.queues.delivery?.remove_after_complete_seconds, - }, - removeOnFail: { - age: config.queues.delivery?.remove_after_failure_seconds, - }, - }, - ); diff --git a/classes/workers/fetch.ts b/classes/workers/fetch.ts deleted file mode 100644 index da834227..00000000 --- a/classes/workers/fetch.ts +++ /dev/null @@ -1,61 +0,0 @@ -import { Instance } from "@versia/kit/db"; -import { Instances } from "@versia/kit/tables"; -import { Worker } from "bullmq"; -import { eq } from "drizzle-orm"; -import { config } from "~/config.ts"; -import { connection } from "~/utils/redis.ts"; -import { - type FetchJobData, - FetchJobType, - fetchQueue, -} from "../queues/fetch.ts"; - -export const getFetchWorker = (): Worker => - new Worker( - fetchQueue.name, - async (job) => { - switch (job.name) { - case FetchJobType.Instance: { - const { uri } = job.data; - - await job.log(`Fetching instance metadata from [${uri}]`); - - // Check if exists - const host = new URL(uri).host; - - const existingInstance = await Instance.fromSql( - eq(Instances.baseUrl, host), - ); - - if (existingInstance) { - await job.log( - "Instance is known, refetching remote data.", - ); - - await existingInstance.updateFromRemote(); - - await job.log( - `Instance [${uri}] successfully refetched`, - ); - - return; - } - - await Instance.resolve(new URL(uri)); - - await job.log( - `✔ Finished fetching instance metadata from [${uri}]`, - ); - } - } - }, - { - connection, - removeOnComplete: { - age: config.queues.fetch?.remove_after_complete_seconds, - }, - removeOnFail: { - age: config.queues.fetch?.remove_after_failure_seconds, - }, - }, - ); diff --git a/classes/workers/inbox.ts b/classes/workers/inbox.ts deleted file mode 100644 index 6c086d9b..00000000 --- a/classes/workers/inbox.ts +++ /dev/null @@ -1,177 +0,0 @@ -import { getLogger } from "@logtape/logtape"; -import { Instance, User } from "@versia/kit/db"; -import { Worker } from "bullmq"; -import { config } from "~/config.ts"; -import { connection } from "~/utils/redis.ts"; -import { InboxProcessor } from "../inbox/processor.ts"; -import { - type InboxJobData, - InboxJobType, - inboxQueue, -} from "../queues/inbox.ts"; - -export const getInboxWorker = (): Worker => - new Worker( - inboxQueue.name, - async (job) => { - switch (job.name) { - case InboxJobType.ProcessEntity: { - const { data, headers, request, ip } = job.data; - - await job.log(`Processing entity [${data.id}]`); - - if (headers.authorization) { - const processor = new InboxProcessor( - { - ...request, - url: new URL(request.url), - }, - data, - null, - { - authorization: headers.authorization, - }, - getLogger(["federation", "inbox"]), - ip, - ); - - await job.log( - `Entity [${data.id}] is potentially from a bridge`, - ); - - const output = await processor.process(); - - if (output instanceof Response) { - // Error occurred - const error = await output.json(); - await job.log(`Error during processing: ${error}`); - - await job.log( - `Failed processing entity [${data.id}]`, - ); - - return; - } - - await job.log( - `✔ Finished processing entity [${data.id}]`, - ); - - return; - } - - const { - "versia-signature": signature, - "versia-signed-at": signedAt, - "versia-signed-by": signedBy, - } = headers as { - "versia-signature": string; - "versia-signed-at": number; - "versia-signed-by": string; - }; - - const sender = await User.resolve(new URL(signedBy)); - - if (!(sender || signedBy.startsWith("instance "))) { - await job.log( - `Could not resolve sender URI [${signedBy}]`, - ); - - return; - } - - if (sender?.isLocal()) { - throw new Error( - "Cannot process federation requests from local users", - ); - } - - const remoteInstance = sender - ? await Instance.fromUser(sender) - : await Instance.resolveFromHost( - signedBy.split(" ")[1], - ); - - if (!remoteInstance) { - await job.log("Could not resolve the remote instance."); - - return; - } - - await job.log( - `Entity [${data.id}] is from remote instance [${remoteInstance.data.baseUrl}]`, - ); - - if (!remoteInstance.data.publicKey?.key) { - throw new Error( - `Instance ${remoteInstance.data.baseUrl} has no public key stored in database`, - ); - } - - const processor = new InboxProcessor( - { - ...request, - url: new URL(request.url), - }, - data, - { - instance: remoteInstance, - key: - sender?.data.publicKey ?? - remoteInstance.data.publicKey.key, - }, - { - signature, - signedAt: new Date(signedAt * 1000), - authorization: undefined, - }, - getLogger(["federation", "inbox"]), - ip, - ); - - const output = await processor.process(); - - if (output instanceof Response) { - // Error occurred - const error = await output.json(); - await job.log(`Error during processing: ${error}`); - - await job.log(`Failed processing entity [${data.id}]`); - - await job.log( - `Sending error message to instance [${remoteInstance.data.baseUrl}]`, - ); - - await remoteInstance.sendMessage( - `Failed processing entity [${data.uri}] delivered to inbox. Returned error:\n\n${JSON.stringify( - error, - null, - 4, - )}`, - ); - - await job.log("Message sent"); - - return; - } - - await job.log(`Finished processing entity [${data.id}]`); - - return; - } - - default: { - throw new Error(`Unknown job type: ${job.name}`); - } - } - }, - { - connection, - removeOnComplete: { - age: config.queues.inbox?.remove_after_complete_seconds, - }, - removeOnFail: { - age: config.queues.inbox?.remove_after_failure_seconds, - }, - }, - ); diff --git a/classes/workers/media.ts b/classes/workers/media.ts deleted file mode 100644 index 03b2daec..00000000 --- a/classes/workers/media.ts +++ /dev/null @@ -1,116 +0,0 @@ -import { Media } from "@versia/kit/db"; -import { Worker } from "bullmq"; -import { config } from "~/config.ts"; -import { connection } from "~/utils/redis.ts"; -import { calculateBlurhash } from "../media/preprocessors/blurhash.ts"; -import { convertImage } from "../media/preprocessors/image-conversion.ts"; -import { - type MediaJobData, - MediaJobType, - mediaQueue, -} from "../queues/media.ts"; - -export const getMediaWorker = (): Worker => - new Worker( - mediaQueue.name, - async (job) => { - switch (job.name) { - case MediaJobType.ConvertMedia: { - const { attachmentId, filename } = job.data; - - await job.log(`Fetching attachment ID [${attachmentId}]`); - - const attachment = await Media.fromId(attachmentId); - - if (!attachment) { - throw new Error( - `Attachment not found: [${attachmentId}]`, - ); - } - - await job.log(`Processing attachment [${attachmentId}]`); - await job.log( - `Fetching file from [${attachment.getUrl()}]`, - ); - - // Download the file and process it. - const blob = await ( - await fetch(attachment.getUrl()) - ).blob(); - - const file = new File([blob], filename); - - await job.log(`Converting attachment [${attachmentId}]`); - - const processedFile = await convertImage( - file, - config.media.conversion.convert_to, - { - convertVectors: - config.media.conversion.convert_vectors, - }, - ); - - await job.log(`Uploading attachment [${attachmentId}]`); - - await attachment.updateFromFile(processedFile); - - await job.log( - `✔ Finished processing attachment [${attachmentId}]`, - ); - - break; - } - case MediaJobType.CalculateMetadata: { - // Calculate blurhash - const { attachmentId } = job.data; - - await job.log(`Fetching attachment ID [${attachmentId}]`); - - const attachment = await Media.fromId(attachmentId); - - if (!attachment) { - throw new Error( - `Attachment not found: [${attachmentId}]`, - ); - } - - await job.log(`Processing attachment [${attachmentId}]`); - await job.log( - `Fetching file from [${attachment.getUrl()}]`, - ); - - // Download the file and process it. - const blob = await ( - await fetch(attachment.getUrl()) - ).blob(); - - // Filename is not important for blurhash - const file = new File([blob], ""); - - await job.log(`Generating blurhash for [${attachmentId}]`); - - const blurhash = await calculateBlurhash(file); - - await attachment.update({ - blurhash, - }); - - await job.log( - `✔ Finished processing attachment [${attachmentId}]`, - ); - - break; - } - } - }, - { - connection, - removeOnComplete: { - age: config.queues.media?.remove_after_complete_seconds, - }, - removeOnFail: { - age: config.queues.media?.remove_after_failure_seconds, - }, - }, - ); diff --git a/classes/workers/push.ts b/classes/workers/push.ts deleted file mode 100644 index 09b05f57..00000000 --- a/classes/workers/push.ts +++ /dev/null @@ -1,143 +0,0 @@ -import { htmlToText } from "@/content_types.ts"; -import { Note, PushSubscription, Token, User } from "@versia/kit/db"; -import { Worker } from "bullmq"; -import { sendNotification } from "web-push"; -import { config } from "~/config.ts"; -import { connection } from "~/utils/redis.ts"; -import { - type PushJobData, - type PushJobType, - pushQueue, -} from "../queues/push.ts"; - -export const getPushWorker = (): Worker => - new Worker( - pushQueue.name, - async (job) => { - const { - data: { psId, relatedUserId, type, noteId, notificationId }, - } = job; - - if (!config.notifications.push) { - await job.log("Push notifications are disabled"); - return; - } - - await job.log( - `Sending push notification for note [${notificationId}]`, - ); - - const ps = await PushSubscription.fromId(psId); - - if (!ps) { - throw new Error( - `Could not resolve push subscription ID ${psId}`, - ); - } - - const token = await Token.fromId(ps.data.tokenId); - - if (!token) { - throw new Error( - `Could not resolve token ID ${ps.data.tokenId}`, - ); - } - - const relatedUser = await User.fromId(relatedUserId); - - if (!relatedUser) { - throw new Error( - `Could not resolve related user ID ${relatedUserId}`, - ); - } - - const note = noteId ? await Note.fromId(noteId) : null; - - const truncate = (str: string, len: number): string => { - if (str.length <= len) { - return str; - } - - return `${str.slice(0, len)}...`; - }; - - const name = truncate( - relatedUser.data.displayName || relatedUser.data.username, - 50, - ); - - let title = name; - - switch (type) { - case "mention": - title = `${name} mentioned you`; - break; - case "reply": - title = `${name} replied to you`; - break; - case "favourite": - title = `${name} liked your note`; - break; - case "reblog": - title = `${name} reblogged your note`; - break; - case "follow": - title = `${name} followed you`; - break; - case "follow_request": - title = `${name} requested to follow you`; - break; - case "poll": - title = "Poll ended"; - break; - } - - const body = note - ? htmlToText(note.data.spoilerText || note.data.content) - : htmlToText(relatedUser.data.note); - - await sendNotification( - { - endpoint: ps.data.endpoint, - keys: { - auth: ps.data.authSecret, - p256dh: ps.data.publicKey, - }, - }, - JSON.stringify({ - access_token: token.data.accessToken, - // FIXME - preferred_locale: "en-US", - notification_id: notificationId, - notification_type: type, - icon: relatedUser.getAvatarUrl(), - title, - body: truncate(body, 140), - }), - { - vapidDetails: { - subject: - config.notifications.push.subject || - config.http.base_url.origin, - privateKey: - config.notifications.push.vapid_keys.private, - publicKey: config.notifications.push.vapid_keys.public, - }, - contentEncoding: "aesgcm", - }, - ); - - await job.log( - `✔ Finished delivering push notification for note [${notificationId}]`, - ); - }, - { - connection, - removeOnComplete: { - age: config.queues.push?.remove_after_complete_seconds, - }, - removeOnFail: { - age: config.queues.push?.remove_after_failure_seconds, - }, - }, - ); diff --git a/entrypoints/worker/index.ts b/entrypoints/worker/index.ts index 0795bd26..9f12c701 100644 --- a/entrypoints/worker/index.ts +++ b/entrypoints/worker/index.ts @@ -1,11 +1,12 @@ import { sentry } from "@/sentry"; import { getLogger } from "@logtape/logtape"; import chalk from "chalk"; -import { getDeliveryWorker } from "~/classes/workers/delivery"; -import { getFetchWorker } from "~/classes/workers/fetch"; -import { getInboxWorker } from "~/classes/workers/inbox"; -import { getMediaWorker } from "~/classes/workers/media"; -import { getPushWorker } from "~/classes/workers/push"; +import { getDeliveryWorker } from "~/classes/queues/delivery"; +import { getFetchWorker } from "~/classes/queues/fetch"; +import { getInboxWorker } from "~/classes/queues/inbox"; +import { getMediaWorker } from "~/classes/queues/media"; +import { getPushWorker } from "~/classes/queues/push"; +import { getRelationshipWorker } from "~/classes/queues/relationships"; process.on("SIGINT", () => { process.exit(); @@ -36,4 +37,8 @@ serverLogger.info`Starting Media Worker...`; getMediaWorker(); serverLogger.info`${chalk.green("✔")} Media Worker started`; -serverLogger.info`${chalk.green("✔✔✔✔✔")} All workers started`; +serverLogger.info`Starting Relationship Worker...`; +getRelationshipWorker(); +serverLogger.info`${chalk.green("✔")} Relationship Worker started`; + +serverLogger.info`${chalk.green("✔✔✔✔✔✔")} All workers started`; diff --git a/tests/setup.ts b/tests/setup.ts index a9e64947..469d5f3d 100644 --- a/tests/setup.ts +++ b/tests/setup.ts @@ -3,3 +3,6 @@ import { deleteOldTestUsers } from "./utils.ts"; await setupDatabase(); await deleteOldTestUsers(); + +// Start workers +await import("~/entrypoints/worker/index.ts"); diff --git a/utils/bull-board.ts b/utils/bull-board.ts index e4ae1675..0e87884e 100644 --- a/utils/bull-board.ts +++ b/utils/bull-board.ts @@ -8,6 +8,7 @@ import { fetchQueue } from "~/classes/queues/fetch"; import { inboxQueue } from "~/classes/queues/inbox"; import { mediaQueue } from "~/classes/queues/media"; import { pushQueue } from "~/classes/queues/push"; +import { relationshipQueue } from "~/classes/queues/relationships"; import { config } from "~/config.ts"; import pkg from "~/package.json"; import type { HonoEnv } from "~/types/api"; @@ -22,6 +23,7 @@ export const applyToHono = (app: Hono): void => { new BullMQAdapter(fetchQueue), new BullMQAdapter(pushQueue), new BullMQAdapter(mediaQueue), + new BullMQAdapter(relationshipQueue), ], serverAdapter, options: {