mirror of
https://github.com/versia-pub/server.git
synced 2025-12-06 08:28:19 +01:00
refactor(worker): ⚡ Move blurhash processing to worker
This commit is contained in:
parent
8188a6ffc7
commit
b086e65404
3
.vscode/settings.json
vendored
3
.vscode/settings.json
vendored
|
|
@ -6,7 +6,8 @@
|
||||||
"cli",
|
"cli",
|
||||||
"federation",
|
"federation",
|
||||||
"config",
|
"config",
|
||||||
"plugin"
|
"plugin",
|
||||||
|
"worker"
|
||||||
],
|
],
|
||||||
"languageToolLinter.languageTool.ignoredWordsInWorkspace": ["versia"]
|
"languageToolLinter.languageTool.ignoredWordsInWorkspace": ["versia"]
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -189,7 +189,7 @@ export class Attachment extends BaseInterface<typeof Attachments> {
|
||||||
|
|
||||||
const mediaManager = new MediaManager(config);
|
const mediaManager = new MediaManager(config);
|
||||||
|
|
||||||
const { path, blurhash } = await mediaManager.addFile(file);
|
const { path } = await mediaManager.addFile(file);
|
||||||
|
|
||||||
const url = Attachment.getUrl(path);
|
const url = Attachment.getUrl(path);
|
||||||
|
|
||||||
|
|
@ -208,7 +208,6 @@ export class Attachment extends BaseInterface<typeof Attachments> {
|
||||||
mimeType: file.type,
|
mimeType: file.type,
|
||||||
description: options?.description ?? "",
|
description: options?.description ?? "",
|
||||||
size: file.size,
|
size: file.size,
|
||||||
blurhash: blurhash ?? undefined,
|
|
||||||
width: metadata?.width ?? undefined,
|
width: metadata?.width ?? undefined,
|
||||||
height: metadata?.height ?? undefined,
|
height: metadata?.height ?? undefined,
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -86,7 +86,6 @@ describe("MediaManager", () => {
|
||||||
uploadedFile: new File(["hey"], "test.webp"),
|
uploadedFile: new File(["hey"], "test.webp"),
|
||||||
path: "test/test.webp",
|
path: "test/test.webp",
|
||||||
hash: "testhash",
|
hash: "testhash",
|
||||||
blurhash: null,
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,6 @@ import type { Config } from "~/packages/config-manager/config.type";
|
||||||
import { DiskMediaDriver } from "./drivers/disk.ts";
|
import { DiskMediaDriver } from "./drivers/disk.ts";
|
||||||
import type { MediaDriver } from "./drivers/media-driver.ts";
|
import type { MediaDriver } from "./drivers/media-driver.ts";
|
||||||
import { S3MediaDriver } from "./drivers/s3.ts";
|
import { S3MediaDriver } from "./drivers/s3.ts";
|
||||||
import { BlurhashPreprocessor } from "./preprocessors/blurhash.ts";
|
|
||||||
import type { MediaPreprocessor } from "./preprocessors/media-preprocessor.ts";
|
import type { MediaPreprocessor } from "./preprocessors/media-preprocessor.ts";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -33,7 +32,6 @@ export class MediaManager {
|
||||||
*/
|
*/
|
||||||
public constructor(private config: Config) {
|
public constructor(private config: Config) {
|
||||||
this.driver = this.initializeDriver();
|
this.driver = this.initializeDriver();
|
||||||
this.initializePreprocessors();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -53,14 +51,6 @@ export class MediaManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Initializes the preprocessors based on the configuration.
|
|
||||||
*/
|
|
||||||
private initializePreprocessors(): void {
|
|
||||||
this.preprocessors.push(new BlurhashPreprocessor());
|
|
||||||
// Add other preprocessors here as needed
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds a file to the media storage.
|
* Adds a file to the media storage.
|
||||||
* @param file - The file to add.
|
* @param file - The file to add.
|
||||||
|
|
@ -68,21 +58,16 @@ export class MediaManager {
|
||||||
*/
|
*/
|
||||||
public async addFile(file: File): Promise<UploadedFileMetadata> {
|
public async addFile(file: File): Promise<UploadedFileMetadata> {
|
||||||
let processedFile = file;
|
let processedFile = file;
|
||||||
let blurhash: string | null = null;
|
|
||||||
|
|
||||||
for (const preprocessor of this.preprocessors) {
|
for (const preprocessor of this.preprocessors) {
|
||||||
const result = await preprocessor.process(processedFile);
|
const result = await preprocessor.process(processedFile);
|
||||||
|
|
||||||
if ("blurhash" in result) {
|
|
||||||
blurhash = result.blurhash as string;
|
|
||||||
}
|
|
||||||
|
|
||||||
processedFile = result.file;
|
processedFile = result.file;
|
||||||
}
|
}
|
||||||
|
|
||||||
const uploadResult = await this.driver.addFile(processedFile);
|
const uploadResult = await this.driver.addFile(processedFile);
|
||||||
|
|
||||||
return { ...uploadResult, blurhash };
|
return uploadResult;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Retrieves a file from the media storage by its hash.
|
* Retrieves a file from the media storage by its hash.
|
||||||
|
|
@ -123,5 +108,4 @@ export interface UploadedFileMetadata {
|
||||||
uploadedFile: File;
|
uploadedFile: File;
|
||||||
path: string;
|
path: string;
|
||||||
hash: string;
|
hash: string;
|
||||||
blurhash: string | null;
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,3 @@
|
||||||
import { getLogger } from "@logtape/logtape";
|
|
||||||
import { User } from "@versia/kit/db";
|
import { User } from "@versia/kit/db";
|
||||||
import { Worker } from "bullmq";
|
import { Worker } from "bullmq";
|
||||||
import chalk from "chalk";
|
import chalk from "chalk";
|
||||||
|
|
@ -22,8 +21,6 @@ export const getDeliveryWorker = (): Worker<
|
||||||
case DeliveryJobType.FederateEntity: {
|
case DeliveryJobType.FederateEntity: {
|
||||||
const { entity, recipientId, senderId } = job.data;
|
const { entity, recipientId, senderId } = job.data;
|
||||||
|
|
||||||
const logger = getLogger(["federation", "delivery"]);
|
|
||||||
|
|
||||||
const sender = await User.fromId(senderId);
|
const sender = await User.fromId(senderId);
|
||||||
|
|
||||||
if (!sender) {
|
if (!sender) {
|
||||||
|
|
@ -40,17 +37,15 @@ export const getDeliveryWorker = (): Worker<
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug`Federating entity ${chalk.gray(
|
await job.log(
|
||||||
entity.id,
|
`Federating entity [${entity.id}] from @${sender.getAcct()} to @${recipient.getAcct()}`,
|
||||||
)} from ${chalk.gray(`@${sender.getAcct()}`)} to ${chalk.gray(
|
);
|
||||||
recipient.getAcct(),
|
|
||||||
)}`;
|
|
||||||
|
|
||||||
await sender.federateToUser(entity, recipient);
|
await sender.federateToUser(entity, recipient);
|
||||||
|
|
||||||
logger.debug`${chalk.green(
|
await job.log(
|
||||||
"✔",
|
`✔ Finished federating entity [${entity.id}]`,
|
||||||
)} Finished federating entity ${chalk.gray(entity.id)}`;
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
import { Instance } from "@versia/kit/db";
|
import { Instance } from "@versia/kit/db";
|
||||||
import { Instances } from "@versia/kit/tables";
|
import { Instances } from "@versia/kit/tables";
|
||||||
import { Worker } from "bullmq";
|
import { Worker } from "bullmq";
|
||||||
import chalk from "chalk";
|
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { config } from "~/packages/config-manager";
|
import { config } from "~/packages/config-manager";
|
||||||
import { connection } from "~/utils/redis.ts";
|
import { connection } from "~/utils/redis.ts";
|
||||||
|
|
@ -45,9 +44,7 @@ export const getFetchWorker = (): Worker<FetchJobData, void, FetchJobType> =>
|
||||||
await Instance.resolve(uri);
|
await Instance.resolve(uri);
|
||||||
|
|
||||||
await job.log(
|
await job.log(
|
||||||
`${chalk.green(
|
`✔ Finished fetching instance metadata from [${uri}]`,
|
||||||
"✔",
|
|
||||||
)} Finished fetching instance metadata from [${uri}]`,
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ export const getInboxWorker = (): Worker<InboxJobData, void, InboxJobType> =>
|
||||||
}
|
}
|
||||||
|
|
||||||
await job.log(
|
await job.log(
|
||||||
`Finished processing entity [${data.id}]`,
|
`✔ Finished processing entity [${data.id}]`,
|
||||||
);
|
);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ import { Worker } from "bullmq";
|
||||||
import { config } from "~/packages/config-manager";
|
import { config } from "~/packages/config-manager";
|
||||||
import { connection } from "~/utils/redis.ts";
|
import { connection } from "~/utils/redis.ts";
|
||||||
import { MediaManager } from "../media/media-manager.ts";
|
import { MediaManager } from "../media/media-manager.ts";
|
||||||
|
import { BlurhashPreprocessor } from "../media/preprocessors/blurhash.ts";
|
||||||
import { ImageConversionPreprocessor } from "../media/preprocessors/image-conversion.ts";
|
import { ImageConversionPreprocessor } from "../media/preprocessors/image-conversion.ts";
|
||||||
import {
|
import {
|
||||||
type MediaJobData,
|
type MediaJobData,
|
||||||
|
|
@ -18,6 +19,8 @@ export const getMediaWorker = (): Worker<MediaJobData, void, MediaJobType> =>
|
||||||
case MediaJobType.ConvertMedia: {
|
case MediaJobType.ConvertMedia: {
|
||||||
const { attachmentId, filename } = job.data;
|
const { attachmentId, filename } = job.data;
|
||||||
|
|
||||||
|
await job.log(`Fetching attachment ID [${attachmentId}]`);
|
||||||
|
|
||||||
const attachment = await Attachment.fromId(attachmentId);
|
const attachment = await Attachment.fromId(attachmentId);
|
||||||
|
|
||||||
if (!attachment) {
|
if (!attachment) {
|
||||||
|
|
@ -27,6 +30,7 @@ export const getMediaWorker = (): Worker<MediaJobData, void, MediaJobType> =>
|
||||||
}
|
}
|
||||||
|
|
||||||
const processor = new ImageConversionPreprocessor(config);
|
const processor = new ImageConversionPreprocessor(config);
|
||||||
|
const blurhashProcessor = new BlurhashPreprocessor();
|
||||||
|
|
||||||
const hash = attachment?.data.sha256;
|
const hash = attachment?.data.sha256;
|
||||||
|
|
||||||
|
|
@ -36,6 +40,11 @@ export const getMediaWorker = (): Worker<MediaJobData, void, MediaJobType> =>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await job.log(`Processing attachment [${attachmentId}]`);
|
||||||
|
await job.log(
|
||||||
|
`Fetching file from [${attachment.data.url}]`,
|
||||||
|
);
|
||||||
|
|
||||||
// Download the file and process it.
|
// Download the file and process it.
|
||||||
const blob = await (
|
const blob = await (
|
||||||
await fetch(attachment.data.url)
|
await fetch(attachment.data.url)
|
||||||
|
|
@ -43,11 +52,19 @@ export const getMediaWorker = (): Worker<MediaJobData, void, MediaJobType> =>
|
||||||
|
|
||||||
const file = new File([blob], filename);
|
const file = new File([blob], filename);
|
||||||
|
|
||||||
|
await job.log(`Converting attachment [${attachmentId}]`);
|
||||||
|
|
||||||
const { file: processedFile } =
|
const { file: processedFile } =
|
||||||
await processor.process(file);
|
await processor.process(file);
|
||||||
|
|
||||||
|
await job.log(`Generating blurhash for [${attachmentId}]`);
|
||||||
|
|
||||||
|
const { blurhash } = await blurhashProcessor.process(file);
|
||||||
|
|
||||||
const mediaManager = new MediaManager(config);
|
const mediaManager = new MediaManager(config);
|
||||||
|
|
||||||
|
await job.log(`Uploading attachment [${attachmentId}]`);
|
||||||
|
|
||||||
const { path, uploadedFile } =
|
const { path, uploadedFile } =
|
||||||
await mediaManager.addFile(processedFile);
|
await mediaManager.addFile(processedFile);
|
||||||
|
|
||||||
|
|
@ -62,7 +79,12 @@ export const getMediaWorker = (): Worker<MediaJobData, void, MediaJobType> =>
|
||||||
.digest("hex"),
|
.digest("hex"),
|
||||||
mimeType: uploadedFile.type,
|
mimeType: uploadedFile.type,
|
||||||
size: uploadedFile.size,
|
size: uploadedFile.size,
|
||||||
|
blurhash,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
await job.log(
|
||||||
|
`✔ Finished processing attachment [${attachmentId}]`,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue