From c899f12893e1f07e8d1039ee367f9451183dda64 Mon Sep 17 00:00:00 2001 From: Jesse Wierzbinski Date: Mon, 25 Nov 2024 20:50:55 +0100 Subject: [PATCH] feat(federation): :sparkles: Prioritize delivery to instance inbox, and use delivery queue in more places --- classes/database/user.ts | 142 +++++++++++++++++++++++--------------- classes/functions/user.ts | 69 ------------------ 2 files changed, 88 insertions(+), 123 deletions(-) diff --git a/classes/database/user.ts b/classes/database/user.ts index 6ec91ef1..abd5c5b9 100644 --- a/classes/database/user.ts +++ b/classes/database/user.ts @@ -17,6 +17,8 @@ import { import type { Collection, Unfollow, + FollowAccept as VersiaFollowAccept, + FollowReject as VersiaFollowReject, User as VersiaUser, } from "@versia/federation/types"; import { Notification, db } from "@versia/kit/db"; @@ -47,15 +49,11 @@ import { } from "drizzle-orm"; import { htmlToText } from "html-to-text"; import { z } from "zod"; -import { - findManyUsers, - followAcceptToVersia, - followRejectToVersia, - followRequestToVersia, -} from "~/classes/functions/user"; +import { findManyUsers } from "~/classes/functions/user"; import { searchManager } from "~/classes/search/search-manager"; import { type Config, config } from "~/packages/config-manager"; import type { KnownEntity } from "~/types/api.ts"; +import { DeliveryJobType, deliveryQueue } from "~/worker.ts"; import { BaseInterface } from "./base.ts"; import { Emoji } from "./emoji.ts"; import { Instance } from "./instance.ts"; @@ -264,19 +262,17 @@ export class User extends BaseInterface { }); if (otherUser.isRemote()) { - const { ok } = await this.federateToUser( - followRequestToVersia(this, otherUser), - otherUser, - ); - - if (!ok) { - await foundRelationship.update({ - requested: false, - following: false, - }); - - return foundRelationship; - } + await deliveryQueue.add(DeliveryJobType.FederateEntity, { + entity: { + type: "Follow", + id: crypto.randomUUID(), + author: this.getUri(), + followee: otherUser.getUri(), + created_at: new Date().toISOString(), + }, + recipientId: otherUser.id, + senderId: this.id, + }); } else { await Notification.insert({ accountId: this.id, @@ -293,15 +289,11 @@ export class User extends BaseInterface { relationship: Relationship, ): Promise { if (followee.isRemote()) { - // TODO: This should reschedule for a later time and maybe notify the server admin if it fails too often - const { ok } = await this.federateToUser( - this.unfollowToVersia(followee), - followee, - ); - - if (!ok) { - return false; - } + await deliveryQueue.add(DeliveryJobType.FederateEntity, { + entity: this.unfollowToVersia(followee), + recipientId: followee.id, + senderId: this.id, + }); } else if (!this.data.isLocked) { if (relationship.data.following) { await Notification.insert({ @@ -337,17 +329,51 @@ export class User extends BaseInterface { } public async sendFollowAccept(follower: User): Promise { - await this.federateToUser( - followAcceptToVersia(follower, this), - follower, - ); + if (!follower.isRemote()) { + throw new Error("Follower must be a remote user"); + } + + if (this.isRemote()) { + throw new Error("Followee must be a local user"); + } + + const entity: VersiaFollowAccept = { + type: "FollowAccept", + id: crypto.randomUUID(), + author: this.getUri(), + created_at: new Date().toISOString(), + follower: follower.getUri(), + }; + + await deliveryQueue.add(DeliveryJobType.FederateEntity, { + entity, + recipientId: follower.id, + senderId: this.id, + }); } public async sendFollowReject(follower: User): Promise { - await this.federateToUser( - followRejectToVersia(follower, this), - follower, - ); + if (!follower.isRemote()) { + throw new Error("Follower must be a remote user"); + } + + if (this.isRemote()) { + throw new Error("Followee must be a local user"); + } + + const entity: VersiaFollowReject = { + type: "FollowReject", + id: crypto.randomUUID(), + author: this.getUri(), + created_at: new Date().toISOString(), + follower: follower.getUri(), + }; + + await deliveryQueue.add(DeliveryJobType.FederateEntity, { + entity, + recipientId: follower.id, + senderId: this.id, + }); } public static async webFinger( @@ -980,9 +1006,16 @@ export class User extends BaseInterface { ), ); - for (const follower of followers) { - await this.federateToUser(entity, follower); - } + await deliveryQueue.addBulk( + followers.map((follower) => ({ + name: DeliveryJobType.FederateEntity, + data: { + entity, + recipientId: follower.id, + senderId: this.id, + }, + })), + ); } /** @@ -996,24 +1029,25 @@ export class User extends BaseInterface { entity: KnownEntity, user: User, ): Promise<{ ok: boolean }> { - const { headers } = await this.sign( - entity, - user.data.endpoints?.inbox ?? "", - ); + const inbox = user.data.instance?.inbox || user.data.endpoints?.inbox; + + if (!inbox) { + throw new Error( + `User ${chalk.gray(user.getUri())} does not have an inbox endpoint`, + ); + } + + const { headers } = await this.sign(entity, inbox); try { - await new FederationRequester().post( - user.data.endpoints?.inbox ?? "", - entity, - { - // @ts-expect-error Bun extension - proxy: config.http.proxy.address, - headers: { - ...headers.toJSON(), - "Content-Type": "application/json; charset=utf-8", - }, + await new FederationRequester().post(inbox, entity, { + // @ts-expect-error Bun extension + proxy: config.http.proxy.address, + headers: { + ...headers.toJSON(), + "Content-Type": "application/json; charset=utf-8", }, - ); + }); } catch (e) { getLogger(["federation", "delivery"]) .error`Federating ${chalk.gray(entity.type)} to ${user.getUri()} ${chalk.bold.red("failed")}`; diff --git a/classes/functions/user.ts b/classes/functions/user.ts index 0f126996..476daa5f 100644 --- a/classes/functions/user.ts +++ b/classes/functions/user.ts @@ -1,8 +1,3 @@ -import type { - Follow, - FollowAccept, - FollowReject, -} from "@versia/federation/types"; import { type Application, type Emoji, @@ -138,67 +133,3 @@ export const findManyUsers = async ( return output.map((user) => transformOutputToUserWithRelations(user)); }; - -export const followRequestToVersia = ( - follower: User, - followee: User, -): Follow => { - if (follower.isRemote()) { - throw new Error("Follower must be a local user"); - } - - if (!followee.isRemote()) { - throw new Error("Followee must be a remote user"); - } - - if (!followee.data.uri) { - throw new Error("Followee must have a URI in database"); - } - - const id = crypto.randomUUID(); - - return { - type: "Follow", - id, - author: follower.getUri(), - followee: followee.getUri(), - created_at: new Date().toISOString(), - }; -}; - -export const followAcceptToVersia = ( - follower: User, - followee: User, -): FollowAccept => { - if (!follower.isRemote()) { - throw new Error("Follower must be a remote user"); - } - - if (followee.isRemote()) { - throw new Error("Followee must be a local user"); - } - - if (!follower.data.uri) { - throw new Error("Follower must have a URI in database"); - } - - const id = crypto.randomUUID(); - - return { - type: "FollowAccept", - id, - author: followee.getUri(), - created_at: new Date().toISOString(), - follower: follower.getUri(), - }; -}; - -export const followRejectToVersia = ( - follower: User, - followee: User, -): FollowReject => { - return { - ...followAcceptToVersia(follower, followee), - type: "FollowReject", - }; -};