refactor(api): Move media processing to background job

This commit is contained in:
Jesse Wierzbinski 2025-01-06 19:21:57 +01:00
parent dcdc8c7365
commit 80b874e5fb
No known key found for this signature in database
12 changed files with 242 additions and 119 deletions

View file

@ -8,6 +8,7 @@ Versia Server `0.8.0` is fully backwards compatible with `0.7.0`.
- Outbound federation, inbox processing and data fetching are now handled by a queue system (like most federated software). - Outbound federation, inbox processing and data fetching are now handled by a queue system (like most federated software).
- Added an administration UI for managing the queue. - Added an administration UI for managing the queue.
- Media processing is now also handled by a queue system.
- Added [Push Notifications](https://docs.joinmastodon.org/methods/push) support. - Added [Push Notifications](https://docs.joinmastodon.org/methods/push) support.
- Upgraded Bun to `1.1.42`. - Upgraded Bun to `1.1.42`.
- Implemented support for the [**Instance Messaging Extension**](https://versia.pub/extensions/instance-messaging) - Implemented support for the [**Instance Messaging Extension**](https://versia.pub/extensions/instance-messaging)
@ -62,6 +63,13 @@ remove_on_complete = 31536000
# Time in seconds to remove failed jobs # Time in seconds to remove failed jobs
remove_on_failure = 31536000 remove_on_failure = 31536000
# Controls the media queue (for media processing)
[queues.media]
# Time in seconds to remove completed jobs
remove_on_complete = 31536000
# Time in seconds to remove failed jobs
remove_on_failure = 31536000
[validation] [validation]
max_emoji_size = 1000000 max_emoji_size = 1000000
max_emoji_shortcode_size = 100 max_emoji_shortcode_size = 100

View file

@ -2,10 +2,7 @@ import { apiRoute, auth } from "@/api";
import { createRoute } from "@hono/zod-openapi"; import { createRoute } from "@hono/zod-openapi";
import { Attachment } from "@versia/kit/db"; import { Attachment } from "@versia/kit/db";
import { RolePermissions } from "@versia/kit/tables"; import { RolePermissions } from "@versia/kit/tables";
import sharp from "sharp";
import { z } from "zod"; import { z } from "zod";
import { ApiError } from "~/classes/errors/api-error";
import { MediaManager } from "~/classes/media/media-manager";
import { config } from "~/packages/config-manager/index.ts"; import { config } from "~/packages/config-manager/index.ts";
import { ErrorSchema } from "~/types/api"; import { ErrorSchema } from "~/types/api";
@ -74,60 +71,11 @@ export default apiRoute((app) =>
app.openapi(route, async (context) => { app.openapi(route, async (context) => {
const { file, thumbnail, description } = context.req.valid("form"); const { file, thumbnail, description } = context.req.valid("form");
if (file.size > config.validation.max_media_size) { const attachment = await Attachment.fromFile(file, {
throw new ApiError( thumbnail,
413, description,
`File too large, max size is ${config.validation.max_media_size} bytes`,
);
}
if (
config.validation.enforce_mime_types &&
!config.validation.allowed_mime_types.includes(file.type)
) {
throw new ApiError(
415,
`File type ${file.type} is not allowed`,
`Allowed types: ${config.validation.allowed_mime_types.join(", ")}`,
);
}
const sha256 = new Bun.SHA256();
const isImage = file.type.startsWith("image/");
const metadata = isImage
? await sharp(await file.arrayBuffer()).metadata()
: null;
const mediaManager = new MediaManager(config);
const { path, blurhash } = await mediaManager.addFile(file);
const url = Attachment.getUrl(path);
let thumbnailUrl = "";
if (thumbnail) {
const { path } = await mediaManager.addFile(thumbnail);
thumbnailUrl = Attachment.getUrl(path);
}
const newAttachment = await Attachment.insert({
url,
thumbnailUrl,
sha256: sha256.update(await file.arrayBuffer()).digest("hex"),
mimeType: file.type,
description: description ?? "",
size: file.size,
blurhash: blurhash ?? undefined,
width: metadata?.width ?? undefined,
height: metadata?.height ?? undefined,
}); });
// TODO: Add job to process videos and other media return context.json(attachment.toApi(), 200);
return context.json(newAttachment.toApi(), 200);
}), }),
); );

View file

@ -2,10 +2,7 @@ import { apiRoute, auth } from "@/api";
import { createRoute } from "@hono/zod-openapi"; import { createRoute } from "@hono/zod-openapi";
import { Attachment } from "@versia/kit/db"; import { Attachment } from "@versia/kit/db";
import { RolePermissions } from "@versia/kit/tables"; import { RolePermissions } from "@versia/kit/tables";
import sharp from "sharp";
import { z } from "zod"; import { z } from "zod";
import { ApiError } from "~/classes/errors/api-error";
import { MediaManager } from "~/classes/media/media-manager";
import { config } from "~/packages/config-manager/index.ts"; import { config } from "~/packages/config-manager/index.ts";
import { ErrorSchema } from "~/types/api"; import { ErrorSchema } from "~/types/api";
@ -73,60 +70,11 @@ export default apiRoute((app) =>
app.openapi(route, async (context) => { app.openapi(route, async (context) => {
const { file, thumbnail, description } = context.req.valid("form"); const { file, thumbnail, description } = context.req.valid("form");
if (file.size > config.validation.max_media_size) { const attachment = await Attachment.fromFile(file, {
throw new ApiError( thumbnail,
413, description,
`File too large, max size is ${config.validation.max_media_size} bytes`,
);
}
if (
config.validation.enforce_mime_types &&
!config.validation.allowed_mime_types.includes(file.type)
) {
throw new ApiError(
415,
`File type ${file.type} is not allowed`,
`Allowed types: ${config.validation.allowed_mime_types.join(", ")}`,
);
}
const sha256 = new Bun.SHA256();
const isImage = file.type.startsWith("image/");
const metadata = isImage
? await sharp(await file.arrayBuffer()).metadata()
: null;
const mediaManager = new MediaManager(config);
const { path, blurhash } = await mediaManager.addFile(file);
const url = Attachment.getUrl(path);
let thumbnailUrl = "";
if (thumbnail) {
const { path } = await mediaManager.addFile(thumbnail);
thumbnailUrl = Attachment.getUrl(path);
}
const newAttachment = await Attachment.insert({
url,
thumbnailUrl,
sha256: sha256.update(await file.arrayBuffer()).digest("hex"),
mimeType: file.type,
description: description ?? "",
size: file.size,
blurhash: blurhash ?? undefined,
width: metadata?.width ?? undefined,
height: metadata?.height ?? undefined,
}); });
// TODO: Add job to process videos and other media return context.json(attachment.toApi(), 200);
return context.json(newAttachment.toApi(), 200);
}), }),
); );

View file

@ -11,9 +11,13 @@ import {
eq, eq,
inArray, inArray,
} from "drizzle-orm"; } from "drizzle-orm";
import sharp from "sharp";
import { z } from "zod"; import { z } from "zod";
import { MediaBackendType } from "~/packages/config-manager/config.type"; import { MediaBackendType } from "~/packages/config-manager/config.type";
import { config } from "~/packages/config-manager/index.ts"; import { config } from "~/packages/config-manager/index.ts";
import { ApiError } from "../errors/api-error.ts";
import { MediaManager } from "../media/media-manager.ts";
import { MediaJobType, mediaQueue } from "../queues/media.ts";
import { BaseInterface } from "./base.ts"; import { BaseInterface } from "./base.ts";
type AttachmentType = InferSelectModel<typeof Attachments>; type AttachmentType = InferSelectModel<typeof Attachments>;
@ -150,6 +154,75 @@ export class Attachment extends BaseInterface<typeof Attachments> {
return attachment; return attachment;
} }
public static async fromFile(
file: File,
options?: {
description?: string;
thumbnail?: File;
},
): Promise<Attachment> {
if (file.size > config.validation.max_media_size) {
throw new ApiError(
413,
`File too large, max size is ${config.validation.max_media_size} bytes`,
);
}
if (
config.validation.enforce_mime_types &&
!config.validation.allowed_mime_types.includes(file.type)
) {
throw new ApiError(
415,
`File type ${file.type} is not allowed`,
`Allowed types: ${config.validation.allowed_mime_types.join(", ")}`,
);
}
const sha256 = new Bun.SHA256();
const isImage = file.type.startsWith("image/");
const metadata = isImage
? await sharp(await file.arrayBuffer()).metadata()
: null;
const mediaManager = new MediaManager(config);
const { path, blurhash } = await mediaManager.addFile(file);
const url = Attachment.getUrl(path);
let thumbnailUrl = "";
if (options?.thumbnail) {
const { path } = await mediaManager.addFile(options.thumbnail);
thumbnailUrl = Attachment.getUrl(path);
}
const newAttachment = await Attachment.insert({
url,
thumbnailUrl: thumbnailUrl || undefined,
sha256: sha256.update(await file.arrayBuffer()).digest("hex"),
mimeType: file.type,
description: options?.description ?? "",
size: file.size,
blurhash: blurhash ?? undefined,
width: metadata?.width ?? undefined,
height: metadata?.height ?? undefined,
});
if (config.media.conversion.convert_images) {
await mediaQueue.add(MediaJobType.ConvertMedia, {
attachmentId: newAttachment.id,
filename: file.name,
});
}
return newAttachment;
}
public get id(): string { public get id(): string {
return this.data.id; return this.data.id;
} }

View file

@ -8,7 +8,6 @@ import { DiskMediaDriver } from "./drivers/disk.ts";
import type { MediaDriver } from "./drivers/media-driver.ts"; import type { MediaDriver } from "./drivers/media-driver.ts";
import { S3MediaDriver } from "./drivers/s3.ts"; import { S3MediaDriver } from "./drivers/s3.ts";
import { BlurhashPreprocessor } from "./preprocessors/blurhash.ts"; import { BlurhashPreprocessor } from "./preprocessors/blurhash.ts";
import { ImageConversionPreprocessor } from "./preprocessors/image-conversion.ts";
import type { MediaPreprocessor } from "./preprocessors/media-preprocessor.ts"; import type { MediaPreprocessor } from "./preprocessors/media-preprocessor.ts";
/** /**
@ -58,11 +57,6 @@ export class MediaManager {
* Initializes the preprocessors based on the configuration. * Initializes the preprocessors based on the configuration.
*/ */
private initializePreprocessors(): void { private initializePreprocessors(): void {
if (this.config.media.conversion.convert_images) {
this.preprocessors.push(
new ImageConversionPreprocessor(this.config),
);
}
this.preprocessors.push(new BlurhashPreprocessor()); this.preprocessors.push(new BlurhashPreprocessor());
// Add other preprocessors here as needed // Add other preprocessors here as needed
} }
@ -87,6 +81,7 @@ export class MediaManager {
} }
const uploadResult = await this.driver.addFile(processedFile); const uploadResult = await this.driver.addFile(processedFile);
return { ...uploadResult, blurhash }; return { ...uploadResult, blurhash };
} }
/** /**

15
classes/queues/media.ts Normal file
View file

@ -0,0 +1,15 @@
import { Queue } from "bullmq";
import { connection } from "~/utils/redis.ts";
export enum MediaJobType {
ConvertMedia = "convertMedia",
}
export type MediaJobData = {
attachmentId: string;
filename: string;
};
export const mediaQueue = new Queue<MediaJobData, void, MediaJobType>("media", {
connection,
});

78
classes/workers/media.ts Normal file
View file

@ -0,0 +1,78 @@
import { Attachment } from "@versia/kit/db";
import { Worker } from "bullmq";
import { config } from "~/packages/config-manager";
import { connection } from "~/utils/redis.ts";
import { MediaManager } from "../media/media-manager.ts";
import { ImageConversionPreprocessor } from "../media/preprocessors/image-conversion.ts";
import {
type MediaJobData,
MediaJobType,
mediaQueue,
} from "../queues/media.ts";
export const getMediaWorker = (): Worker<MediaJobData, void, MediaJobType> =>
new Worker<MediaJobData, void, MediaJobType>(
mediaQueue.name,
async (job) => {
switch (job.name) {
case MediaJobType.ConvertMedia: {
const { attachmentId, filename } = job.data;
const attachment = await Attachment.fromId(attachmentId);
if (!attachment) {
throw new Error(
`Attachment not found: [${attachmentId}]`,
);
}
const processor = new ImageConversionPreprocessor(config);
const hash = attachment?.data.sha256;
if (!hash) {
throw new Error(
`Attachment [${attachmentId}] has no hash, cannot process.`,
);
}
// Download the file and process it.
const blob = await (
await fetch(attachment.data.url)
).blob();
const file = new File([blob], filename);
const { file: processedFile } =
await processor.process(file);
const mediaManager = new MediaManager(config);
const { path, uploadedFile } =
await mediaManager.addFile(processedFile);
const url = Attachment.getUrl(path);
const sha256 = new Bun.SHA256();
await attachment.update({
url,
sha256: sha256
.update(await uploadedFile.arrayBuffer())
.digest("hex"),
mimeType: uploadedFile.type,
size: uploadedFile.size,
});
}
}
},
{
connection,
removeOnComplete: {
age: config.queues.media.remove_on_complete,
},
removeOnFail: {
age: config.queues.media.remove_on_failure,
},
},
);

View file

@ -304,6 +304,13 @@ remove_on_complete = 31536000
# Time in seconds to remove failed jobs # Time in seconds to remove failed jobs
remove_on_failure = 31536000 remove_on_failure = 31536000
# Controls the media queue (for media processing)
[queues.media]
# Time in seconds to remove completed jobs
remove_on_complete = 31536000
# Time in seconds to remove failed jobs
remove_on_failure = 31536000
[federation] [federation]
# This is a list of domain names, such as "mastodon.social" or "pleroma.site" # This is a list of domain names, such as "mastodon.social" or "pleroma.site"
# These changes will not retroactively apply to existing data before they were changed # These changes will not retroactively apply to existing data before they were changed

View file

@ -3425,6 +3425,24 @@
"remove_on_complete": 31536000, "remove_on_complete": 31536000,
"remove_on_failure": 31536000 "remove_on_failure": 31536000
} }
},
"media": {
"type": "object",
"properties": {
"remove_on_complete": {
"type": "integer",
"default": 31536000
},
"remove_on_failure": {
"type": "integer",
"default": 31536000
}
},
"additionalProperties": false,
"default": {
"remove_on_complete": 31536000,
"remove_on_failure": 31536000
}
} }
}, },
"additionalProperties": false, "additionalProperties": false,
@ -3444,6 +3462,10 @@
"push": { "push": {
"remove_on_complete": 31536000, "remove_on_complete": 31536000,
"remove_on_failure": 31536000 "remove_on_failure": 31536000
},
"media": {
"remove_on_complete": 31536000,
"remove_on_failure": 31536000
} }
} }
}, },

View file

@ -4,6 +4,7 @@ import chalk from "chalk";
import { getDeliveryWorker } from "~/classes/workers/delivery"; import { getDeliveryWorker } from "~/classes/workers/delivery";
import { getFetchWorker } from "~/classes/workers/fetch"; import { getFetchWorker } from "~/classes/workers/fetch";
import { getInboxWorker } from "~/classes/workers/inbox"; import { getInboxWorker } from "~/classes/workers/inbox";
import { getMediaWorker } from "~/classes/workers/media";
import { getPushWorker } from "~/classes/workers/push"; import { getPushWorker } from "~/classes/workers/push";
process.on("SIGINT", () => { process.on("SIGINT", () => {
@ -31,4 +32,8 @@ serverLogger.info`Starting Push Worker...`;
getPushWorker(); getPushWorker();
serverLogger.info`${chalk.green("✔")} Push Worker started`; serverLogger.info`${chalk.green("✔")} Push Worker started`;
serverLogger.info`${chalk.green("✔✔✔✔")} All workers started`; serverLogger.info`Starting Media Worker...`;
getMediaWorker();
serverLogger.info`${chalk.green("✔")} Media Worker started`;
serverLogger.info`${chalk.green("✔✔✔✔✔")} All workers started`;

View file

@ -630,6 +630,24 @@ export const configValidator = z
remove_on_complete: 60 * 60 * 24 * 365, remove_on_complete: 60 * 60 * 24 * 365,
remove_on_failure: 60 * 60 * 24 * 365, remove_on_failure: 60 * 60 * 24 * 365,
}), }),
media: z
.object({
remove_on_complete: z
.number()
.int()
// 1 year
.default(60 * 60 * 24 * 365),
remove_on_failure: z
.number()
.int()
// 1 year
.default(60 * 60 * 24 * 365),
})
.strict()
.default({
remove_on_complete: 60 * 60 * 24 * 365,
remove_on_failure: 60 * 60 * 24 * 365,
}),
}) })
.strict() .strict()
.default({ .default({
@ -649,6 +667,10 @@ export const configValidator = z
remove_on_complete: 60 * 60 * 24 * 365, remove_on_complete: 60 * 60 * 24 * 365,
remove_on_failure: 60 * 60 * 24 * 365, remove_on_failure: 60 * 60 * 24 * 365,
}, },
media: {
remove_on_complete: 60 * 60 * 24 * 365,
remove_on_failure: 60 * 60 * 24 * 365,
},
}), }),
instance: z instance: z
.object({ .object({

View file

@ -6,6 +6,7 @@ import { serveStatic } from "hono/bun";
import { deliveryQueue } from "~/classes/queues/delivery"; import { deliveryQueue } from "~/classes/queues/delivery";
import { fetchQueue } from "~/classes/queues/fetch"; import { fetchQueue } from "~/classes/queues/fetch";
import { inboxQueue } from "~/classes/queues/inbox"; import { inboxQueue } from "~/classes/queues/inbox";
import { mediaQueue } from "~/classes/queues/media";
import { pushQueue } from "~/classes/queues/push"; import { pushQueue } from "~/classes/queues/push";
import { config } from "~/packages/config-manager"; import { config } from "~/packages/config-manager";
import type { HonoEnv } from "~/types/api"; import type { HonoEnv } from "~/types/api";
@ -19,6 +20,7 @@ export const applyToHono = (app: OpenAPIHono<HonoEnv>): void => {
new BullMQAdapter(deliveryQueue), new BullMQAdapter(deliveryQueue),
new BullMQAdapter(fetchQueue), new BullMQAdapter(fetchQueue),
new BullMQAdapter(pushQueue), new BullMQAdapter(pushQueue),
new BullMQAdapter(mediaQueue),
], ],
serverAdapter, serverAdapter,
options: { options: {