From 80b874e5fbd9752b3b444e1fe543e6f8ffdeaef6 Mon Sep 17 00:00:00 2001 From: Jesse Wierzbinski Date: Mon, 6 Jan 2025 19:21:57 +0100 Subject: [PATCH] refactor(api): :zap: Move media processing to background job --- CHANGELOG.md | 8 +++ api/api/v1/media/index.ts | 60 ++------------------ api/api/v2/media/index.ts | 60 ++------------------ classes/database/attachment.ts | 73 ++++++++++++++++++++++++ classes/media/media-manager.ts | 7 +-- classes/queues/media.ts | 15 +++++ classes/workers/media.ts | 78 ++++++++++++++++++++++++++ config/config.example.toml | 7 +++ config/config.schema.json | 22 ++++++++ entrypoints/worker/index.ts | 7 ++- packages/config-manager/config.type.ts | 22 ++++++++ utils/bull-board.ts | 2 + 12 files changed, 242 insertions(+), 119 deletions(-) create mode 100644 classes/queues/media.ts create mode 100644 classes/workers/media.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 63202abf..61a2ecfe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Versia Server `0.8.0` is fully backwards compatible with `0.7.0`. - Outbound federation, inbox processing and data fetching are now handled by a queue system (like most federated software). - Added an administration UI for managing the queue. +- Media processing is now also handled by a queue system. - Added [Push Notifications](https://docs.joinmastodon.org/methods/push) support. - Upgraded Bun to `1.1.42`. - Implemented support for the [**Instance Messaging Extension**](https://versia.pub/extensions/instance-messaging) @@ -62,6 +63,13 @@ remove_on_complete = 31536000 # Time in seconds to remove failed jobs remove_on_failure = 31536000 +# Controls the media queue (for media processing) +[queues.media] +# Time in seconds to remove completed jobs +remove_on_complete = 31536000 +# Time in seconds to remove failed jobs +remove_on_failure = 31536000 + [validation] max_emoji_size = 1000000 max_emoji_shortcode_size = 100 diff --git a/api/api/v1/media/index.ts b/api/api/v1/media/index.ts index 3eefbff7..e292b09c 100644 --- a/api/api/v1/media/index.ts +++ b/api/api/v1/media/index.ts @@ -2,10 +2,7 @@ import { apiRoute, auth } from "@/api"; import { createRoute } from "@hono/zod-openapi"; import { Attachment } from "@versia/kit/db"; import { RolePermissions } from "@versia/kit/tables"; -import sharp from "sharp"; import { z } from "zod"; -import { ApiError } from "~/classes/errors/api-error"; -import { MediaManager } from "~/classes/media/media-manager"; import { config } from "~/packages/config-manager/index.ts"; import { ErrorSchema } from "~/types/api"; @@ -74,60 +71,11 @@ export default apiRoute((app) => app.openapi(route, async (context) => { const { file, thumbnail, description } = context.req.valid("form"); - if (file.size > config.validation.max_media_size) { - throw new ApiError( - 413, - `File too large, max size is ${config.validation.max_media_size} bytes`, - ); - } - - if ( - config.validation.enforce_mime_types && - !config.validation.allowed_mime_types.includes(file.type) - ) { - throw new ApiError( - 415, - `File type ${file.type} is not allowed`, - `Allowed types: ${config.validation.allowed_mime_types.join(", ")}`, - ); - } - - const sha256 = new Bun.SHA256(); - - const isImage = file.type.startsWith("image/"); - - const metadata = isImage - ? await sharp(await file.arrayBuffer()).metadata() - : null; - - const mediaManager = new MediaManager(config); - - const { path, blurhash } = await mediaManager.addFile(file); - - const url = Attachment.getUrl(path); - - let thumbnailUrl = ""; - - if (thumbnail) { - const { path } = await mediaManager.addFile(thumbnail); - - thumbnailUrl = Attachment.getUrl(path); - } - - const newAttachment = await Attachment.insert({ - url, - thumbnailUrl, - sha256: sha256.update(await file.arrayBuffer()).digest("hex"), - mimeType: file.type, - description: description ?? "", - size: file.size, - blurhash: blurhash ?? undefined, - width: metadata?.width ?? undefined, - height: metadata?.height ?? undefined, + const attachment = await Attachment.fromFile(file, { + thumbnail, + description, }); - // TODO: Add job to process videos and other media - - return context.json(newAttachment.toApi(), 200); + return context.json(attachment.toApi(), 200); }), ); diff --git a/api/api/v2/media/index.ts b/api/api/v2/media/index.ts index 7fb57a1b..98614866 100644 --- a/api/api/v2/media/index.ts +++ b/api/api/v2/media/index.ts @@ -2,10 +2,7 @@ import { apiRoute, auth } from "@/api"; import { createRoute } from "@hono/zod-openapi"; import { Attachment } from "@versia/kit/db"; import { RolePermissions } from "@versia/kit/tables"; -import sharp from "sharp"; import { z } from "zod"; -import { ApiError } from "~/classes/errors/api-error"; -import { MediaManager } from "~/classes/media/media-manager"; import { config } from "~/packages/config-manager/index.ts"; import { ErrorSchema } from "~/types/api"; @@ -73,60 +70,11 @@ export default apiRoute((app) => app.openapi(route, async (context) => { const { file, thumbnail, description } = context.req.valid("form"); - if (file.size > config.validation.max_media_size) { - throw new ApiError( - 413, - `File too large, max size is ${config.validation.max_media_size} bytes`, - ); - } - - if ( - config.validation.enforce_mime_types && - !config.validation.allowed_mime_types.includes(file.type) - ) { - throw new ApiError( - 415, - `File type ${file.type} is not allowed`, - `Allowed types: ${config.validation.allowed_mime_types.join(", ")}`, - ); - } - - const sha256 = new Bun.SHA256(); - - const isImage = file.type.startsWith("image/"); - - const metadata = isImage - ? await sharp(await file.arrayBuffer()).metadata() - : null; - - const mediaManager = new MediaManager(config); - - const { path, blurhash } = await mediaManager.addFile(file); - - const url = Attachment.getUrl(path); - - let thumbnailUrl = ""; - - if (thumbnail) { - const { path } = await mediaManager.addFile(thumbnail); - - thumbnailUrl = Attachment.getUrl(path); - } - - const newAttachment = await Attachment.insert({ - url, - thumbnailUrl, - sha256: sha256.update(await file.arrayBuffer()).digest("hex"), - mimeType: file.type, - description: description ?? "", - size: file.size, - blurhash: blurhash ?? undefined, - width: metadata?.width ?? undefined, - height: metadata?.height ?? undefined, + const attachment = await Attachment.fromFile(file, { + thumbnail, + description, }); - // TODO: Add job to process videos and other media - - return context.json(newAttachment.toApi(), 200); + return context.json(attachment.toApi(), 200); }), ); diff --git a/classes/database/attachment.ts b/classes/database/attachment.ts index 2d8dec03..8ae5e384 100644 --- a/classes/database/attachment.ts +++ b/classes/database/attachment.ts @@ -11,9 +11,13 @@ import { eq, inArray, } from "drizzle-orm"; +import sharp from "sharp"; import { z } from "zod"; import { MediaBackendType } from "~/packages/config-manager/config.type"; import { config } from "~/packages/config-manager/index.ts"; +import { ApiError } from "../errors/api-error.ts"; +import { MediaManager } from "../media/media-manager.ts"; +import { MediaJobType, mediaQueue } from "../queues/media.ts"; import { BaseInterface } from "./base.ts"; type AttachmentType = InferSelectModel; @@ -150,6 +154,75 @@ export class Attachment extends BaseInterface { return attachment; } + public static async fromFile( + file: File, + options?: { + description?: string; + thumbnail?: File; + }, + ): Promise { + if (file.size > config.validation.max_media_size) { + throw new ApiError( + 413, + `File too large, max size is ${config.validation.max_media_size} bytes`, + ); + } + + if ( + config.validation.enforce_mime_types && + !config.validation.allowed_mime_types.includes(file.type) + ) { + throw new ApiError( + 415, + `File type ${file.type} is not allowed`, + `Allowed types: ${config.validation.allowed_mime_types.join(", ")}`, + ); + } + + const sha256 = new Bun.SHA256(); + + const isImage = file.type.startsWith("image/"); + + const metadata = isImage + ? await sharp(await file.arrayBuffer()).metadata() + : null; + + const mediaManager = new MediaManager(config); + + const { path, blurhash } = await mediaManager.addFile(file); + + const url = Attachment.getUrl(path); + + let thumbnailUrl = ""; + + if (options?.thumbnail) { + const { path } = await mediaManager.addFile(options.thumbnail); + + thumbnailUrl = Attachment.getUrl(path); + } + + const newAttachment = await Attachment.insert({ + url, + thumbnailUrl: thumbnailUrl || undefined, + sha256: sha256.update(await file.arrayBuffer()).digest("hex"), + mimeType: file.type, + description: options?.description ?? "", + size: file.size, + blurhash: blurhash ?? undefined, + width: metadata?.width ?? undefined, + height: metadata?.height ?? undefined, + }); + + if (config.media.conversion.convert_images) { + await mediaQueue.add(MediaJobType.ConvertMedia, { + attachmentId: newAttachment.id, + filename: file.name, + }); + } + + return newAttachment; + } + public get id(): string { return this.data.id; } diff --git a/classes/media/media-manager.ts b/classes/media/media-manager.ts index f117bf1d..00cb7920 100644 --- a/classes/media/media-manager.ts +++ b/classes/media/media-manager.ts @@ -8,7 +8,6 @@ import { DiskMediaDriver } from "./drivers/disk.ts"; import type { MediaDriver } from "./drivers/media-driver.ts"; import { S3MediaDriver } from "./drivers/s3.ts"; import { BlurhashPreprocessor } from "./preprocessors/blurhash.ts"; -import { ImageConversionPreprocessor } from "./preprocessors/image-conversion.ts"; import type { MediaPreprocessor } from "./preprocessors/media-preprocessor.ts"; /** @@ -58,11 +57,6 @@ export class MediaManager { * Initializes the preprocessors based on the configuration. */ private initializePreprocessors(): void { - if (this.config.media.conversion.convert_images) { - this.preprocessors.push( - new ImageConversionPreprocessor(this.config), - ); - } this.preprocessors.push(new BlurhashPreprocessor()); // Add other preprocessors here as needed } @@ -87,6 +81,7 @@ export class MediaManager { } const uploadResult = await this.driver.addFile(processedFile); + return { ...uploadResult, blurhash }; } /** diff --git a/classes/queues/media.ts b/classes/queues/media.ts new file mode 100644 index 00000000..a9005eaf --- /dev/null +++ b/classes/queues/media.ts @@ -0,0 +1,15 @@ +import { Queue } from "bullmq"; +import { connection } from "~/utils/redis.ts"; + +export enum MediaJobType { + ConvertMedia = "convertMedia", +} + +export type MediaJobData = { + attachmentId: string; + filename: string; +}; + +export const mediaQueue = new Queue("media", { + connection, +}); diff --git a/classes/workers/media.ts b/classes/workers/media.ts new file mode 100644 index 00000000..06ff2c49 --- /dev/null +++ b/classes/workers/media.ts @@ -0,0 +1,78 @@ +import { Attachment } from "@versia/kit/db"; +import { Worker } from "bullmq"; +import { config } from "~/packages/config-manager"; +import { connection } from "~/utils/redis.ts"; +import { MediaManager } from "../media/media-manager.ts"; +import { ImageConversionPreprocessor } from "../media/preprocessors/image-conversion.ts"; +import { + type MediaJobData, + MediaJobType, + mediaQueue, +} from "../queues/media.ts"; + +export const getMediaWorker = (): Worker => + new Worker( + mediaQueue.name, + async (job) => { + switch (job.name) { + case MediaJobType.ConvertMedia: { + const { attachmentId, filename } = job.data; + + const attachment = await Attachment.fromId(attachmentId); + + if (!attachment) { + throw new Error( + `Attachment not found: [${attachmentId}]`, + ); + } + + const processor = new ImageConversionPreprocessor(config); + + const hash = attachment?.data.sha256; + + if (!hash) { + throw new Error( + `Attachment [${attachmentId}] has no hash, cannot process.`, + ); + } + + // Download the file and process it. + const blob = await ( + await fetch(attachment.data.url) + ).blob(); + + const file = new File([blob], filename); + + const { file: processedFile } = + await processor.process(file); + + const mediaManager = new MediaManager(config); + + const { path, uploadedFile } = + await mediaManager.addFile(processedFile); + + const url = Attachment.getUrl(path); + + const sha256 = new Bun.SHA256(); + + await attachment.update({ + url, + sha256: sha256 + .update(await uploadedFile.arrayBuffer()) + .digest("hex"), + mimeType: uploadedFile.type, + size: uploadedFile.size, + }); + } + } + }, + { + connection, + removeOnComplete: { + age: config.queues.media.remove_on_complete, + }, + removeOnFail: { + age: config.queues.media.remove_on_failure, + }, + }, + ); diff --git a/config/config.example.toml b/config/config.example.toml index 2e06a5a4..c28d7823 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -304,6 +304,13 @@ remove_on_complete = 31536000 # Time in seconds to remove failed jobs remove_on_failure = 31536000 +# Controls the media queue (for media processing) +[queues.media] +# Time in seconds to remove completed jobs +remove_on_complete = 31536000 +# Time in seconds to remove failed jobs +remove_on_failure = 31536000 + [federation] # This is a list of domain names, such as "mastodon.social" or "pleroma.site" # These changes will not retroactively apply to existing data before they were changed diff --git a/config/config.schema.json b/config/config.schema.json index 031508ac..6229101e 100644 --- a/config/config.schema.json +++ b/config/config.schema.json @@ -3425,6 +3425,24 @@ "remove_on_complete": 31536000, "remove_on_failure": 31536000 } + }, + "media": { + "type": "object", + "properties": { + "remove_on_complete": { + "type": "integer", + "default": 31536000 + }, + "remove_on_failure": { + "type": "integer", + "default": 31536000 + } + }, + "additionalProperties": false, + "default": { + "remove_on_complete": 31536000, + "remove_on_failure": 31536000 + } } }, "additionalProperties": false, @@ -3444,6 +3462,10 @@ "push": { "remove_on_complete": 31536000, "remove_on_failure": 31536000 + }, + "media": { + "remove_on_complete": 31536000, + "remove_on_failure": 31536000 } } }, diff --git a/entrypoints/worker/index.ts b/entrypoints/worker/index.ts index f77c87cd..0795bd26 100644 --- a/entrypoints/worker/index.ts +++ b/entrypoints/worker/index.ts @@ -4,6 +4,7 @@ import chalk from "chalk"; import { getDeliveryWorker } from "~/classes/workers/delivery"; import { getFetchWorker } from "~/classes/workers/fetch"; import { getInboxWorker } from "~/classes/workers/inbox"; +import { getMediaWorker } from "~/classes/workers/media"; import { getPushWorker } from "~/classes/workers/push"; process.on("SIGINT", () => { @@ -31,4 +32,8 @@ serverLogger.info`Starting Push Worker...`; getPushWorker(); serverLogger.info`${chalk.green("✔")} Push Worker started`; -serverLogger.info`${chalk.green("✔✔✔✔")} All workers started`; +serverLogger.info`Starting Media Worker...`; +getMediaWorker(); +serverLogger.info`${chalk.green("✔")} Media Worker started`; + +serverLogger.info`${chalk.green("✔✔✔✔✔")} All workers started`; diff --git a/packages/config-manager/config.type.ts b/packages/config-manager/config.type.ts index bd346c50..7156be24 100644 --- a/packages/config-manager/config.type.ts +++ b/packages/config-manager/config.type.ts @@ -630,6 +630,24 @@ export const configValidator = z remove_on_complete: 60 * 60 * 24 * 365, remove_on_failure: 60 * 60 * 24 * 365, }), + media: z + .object({ + remove_on_complete: z + .number() + .int() + // 1 year + .default(60 * 60 * 24 * 365), + remove_on_failure: z + .number() + .int() + // 1 year + .default(60 * 60 * 24 * 365), + }) + .strict() + .default({ + remove_on_complete: 60 * 60 * 24 * 365, + remove_on_failure: 60 * 60 * 24 * 365, + }), }) .strict() .default({ @@ -649,6 +667,10 @@ export const configValidator = z remove_on_complete: 60 * 60 * 24 * 365, remove_on_failure: 60 * 60 * 24 * 365, }, + media: { + remove_on_complete: 60 * 60 * 24 * 365, + remove_on_failure: 60 * 60 * 24 * 365, + }, }), instance: z .object({ diff --git a/utils/bull-board.ts b/utils/bull-board.ts index 2feba895..8ba1d845 100644 --- a/utils/bull-board.ts +++ b/utils/bull-board.ts @@ -6,6 +6,7 @@ import { serveStatic } from "hono/bun"; import { deliveryQueue } from "~/classes/queues/delivery"; import { fetchQueue } from "~/classes/queues/fetch"; import { inboxQueue } from "~/classes/queues/inbox"; +import { mediaQueue } from "~/classes/queues/media"; import { pushQueue } from "~/classes/queues/push"; import { config } from "~/packages/config-manager"; import type { HonoEnv } from "~/types/api"; @@ -19,6 +20,7 @@ export const applyToHono = (app: OpenAPIHono): void => { new BullMQAdapter(deliveryQueue), new BullMQAdapter(fetchQueue), new BullMQAdapter(pushQueue), + new BullMQAdapter(mediaQueue), ], serverAdapter, options: {