diff --git a/bun.lockb b/bun.lockb index 20aad1e5..d1271c65 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/config/config.example.toml b/config/config.example.toml index c95b70d4..081f1f89 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -1,10 +1,16 @@ [database] host = "localhost" -port = 48654 +port = 5432 username = "lysand" password = "password123" database = "lysand" +[redis.queue] +host = "localhost" +post = 6379 +password = "" +database = 0 + [http] base_url = "https://lysand.social" bind = "http://localhost" diff --git a/database/datasource.ts b/database/datasource.ts index a12b19ea..58ac08a4 100644 --- a/database/datasource.ts +++ b/database/datasource.ts @@ -1,3 +1,4 @@ +import { Queue } from "bullmq"; import { getConfig } from "../utils/config"; import { PrismaClient } from "@prisma/client"; @@ -7,4 +8,13 @@ const client = new PrismaClient({ datasourceUrl: `postgresql://${config.database.username}:${config.database.password}@${config.database.host}:${config.database.port}/${config.database.database}`, }); -export { client }; +const federationQueue = new Queue("federation", { + connection: { + host: config.redis.queue.host, + port: config.redis.queue.port, + password: config.redis.queue.password, + db: config.redis.queue.database, + }, +}); + +export { client, federationQueue }; diff --git a/database/entities/Queue.ts b/database/entities/Queue.ts new file mode 100644 index 00000000..29e05d35 --- /dev/null +++ b/database/entities/Queue.ts @@ -0,0 +1,197 @@ +import { getConfig } from "@config"; +import { Worker } from "bullmq"; +import { client, federationQueue } from "~database/datasource"; +import { + statusAndUserRelations, + statusToLysand, + type StatusWithRelations, +} from "./Status"; +import type { User } from "@prisma/client"; + +const config = getConfig(); + +export const federationWorker = new Worker( + "federation", + async job => { + await job.updateProgress(0); + + switch (job.name) { + case "federation": { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const statusId = job.data.id as string; + + const status = await client.status.findUnique({ + where: { id: statusId }, + include: statusAndUserRelations, + }); + + if (!status) return; + + // Only get remote users that follow the author of the status, and the remote mentioned users + const peopleToSendTo = await client.user.findMany({ + where: { + OR: [ + ["public", "unlisted", "private"].includes( + status.visibility + ) + ? { + relationships: { + some: { + subjectId: status.authorId, + following: true, + }, + }, + instanceId: { + not: null, + }, + } + : {}, + // Mentioned users + { + id: { + in: status.mentions.map(m => m.id), + }, + instanceId: { + not: null, + }, + }, + ], + }, + }); + + let peopleDone = 0; + + // Spawn sendToServer job for each user + for (const person of peopleToSendTo) { + await federationQueue.add("sendToServer", { + id: statusId, + user: person, + }); + + peopleDone++; + + await job.updateProgress( + Math.round((peopleDone / peopleToSendTo.length) * 100) + ); + } + break; + } + case "sendToServer": { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const statusId = job.data.id as string; + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const user = job.data.user as User; + + const status = await client.status.findUnique({ + where: { id: statusId }, + include: statusAndUserRelations, + }); + + if (!status) return; + + const response = await federateStatusTo( + status, + status.author, + user + ); + + if (response.status !== 200) { + throw new Error( + `Federation error: ${response.status} ${response.statusText}` + ); + } + + break; + } + } + + await job.updateProgress(100); + + return true; + }, + { + connection: { + host: config.redis.queue.host, + port: config.redis.queue.port, + password: config.redis.queue.password, + db: config.redis.queue.database, + }, + removeOnComplete: { + count: 400, + }, + removeOnFail: { + count: 3000, + }, + } +); + +/** + * Convert a string into an ArrayBuffer + * from https://developers.google.com/web/updates/2012/06/How-to-convert-ArrayBuffer-to-and-from-String + */ +export const str2ab = (str: string) => { + const buf = new ArrayBuffer(str.length); + const bufView = new Uint8Array(buf); + for (let i = 0, strLen = str.length; i < strLen; i++) { + bufView[i] = str.charCodeAt(i); + } + return buf; +}; + +export const federateStatusTo = async ( + status: StatusWithRelations, + sender: User, + user: User +) => { + const privateKey = await crypto.subtle.importKey( + "pkcs8", + str2ab(atob(user.privateKey ?? "")), + "Ed25519", + false, + ["sign"] + ); + + const digest = await crypto.subtle.digest( + "SHA-256", + new TextEncoder().encode("request_body") + ); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const userInbox = new URL((user.endpoints as any).inbox); + + const date = new Date(); + + const signature = await crypto.subtle.sign( + "Ed25519", + privateKey, + new TextEncoder().encode( + `(request-target): post ${userInbox.pathname}\n` + + `host: ${userInbox.host}\n` + + `date: ${date.toUTCString()}\n` + + `digest: SHA-256=${btoa( + String.fromCharCode(...new Uint8Array(digest)) + )}\n` + ) + ); + + const signatureBase64 = btoa( + String.fromCharCode(...new Uint8Array(signature)) + ); + + return fetch(userInbox, { + method: "POST", + headers: { + "Content-Type": "application/json", + Date: date.toUTCString(), + Origin: config.http.base_url, + Signature: `keyId="${sender.uri}",algorithm="ed25519",headers="(request-target) host date digest",signature="${signatureBase64}"`, + }, + body: JSON.stringify(statusToLysand(status)), + }); +}; + +export const addStatusFederationJob = async (statusId: string) => { + await federationQueue.add("federation", { + id: statusId, + }); +}; diff --git a/database/entities/Status.ts b/database/entities/Status.ts index 10fec1b6..7731c7f3 100644 --- a/database/entities/Status.ts +++ b/database/entities/Status.ts @@ -422,6 +422,7 @@ export const statusToAPI = async ( id: status.id, in_reply_to_id: status.inReplyToPostId || null, in_reply_to_account_id: status.inReplyToPost?.authorId || null, + // @ts-expect-error Prisma TypeScript types dont include relations account: userToAPI(status.author), created_at: new Date(status.createdAt).toISOString(), application: status.application diff --git a/package.json b/package.json index 44457c58..85e165d3 100644 --- a/package.json +++ b/package.json @@ -70,6 +70,7 @@ "@aws-sdk/client-s3": "^3.429.0", "@prisma/client": "^5.6.0", "blurhash": "^2.0.5", + "bullmq": "^4.14.2", "chalk": "^5.3.0", "eventemitter3": "^5.0.1", "html-to-text": "^9.0.5", diff --git a/prisma/migrations/20231127010521_add_notifications/migration.sql b/prisma/migrations/20231127010521_add_notifications/migration.sql new file mode 100644 index 00000000..77d5b111 --- /dev/null +++ b/prisma/migrations/20231127010521_add_notifications/migration.sql @@ -0,0 +1,20 @@ +-- CreateTable +CREATE TABLE "Notification" ( + "id" UUID NOT NULL DEFAULT uuid_generate_v7(), + "type" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "notifiedId" UUID NOT NULL, + "accountId" UUID NOT NULL, + "statusId" UUID, + + CONSTRAINT "Notification_pkey" PRIMARY KEY ("id") +); + +-- AddForeignKey +ALTER TABLE "Notification" ADD CONSTRAINT "Notification_notifiedId_fkey" FOREIGN KEY ("notifiedId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "Notification" ADD CONSTRAINT "Notification_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "Notification" ADD CONSTRAINT "Notification_statusId_fkey" FOREIGN KEY ("statusId") REFERENCES "Status"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/utils/config.ts b/utils/config.ts index f48498f4..522e6e74 100644 --- a/utils/config.ts +++ b/utils/config.ts @@ -9,6 +9,15 @@ export interface ConfigType { database: string; }; + redis: { + queue: { + host: string; + port: number; + password: string; + database: number; + }; + }; + http: { base_url: string; bind: string; @@ -145,6 +154,14 @@ export const configDefaults: ConfigType = { password: "postgres", database: "lysand", }, + redis: { + queue: { + host: "localhost", + port: 6379, + password: "", + database: 0, + }, + }, instance: { banner: "", description: "",