feat(federation): Prioritize delivery to instance inbox, and use delivery queue in more places

This commit is contained in:
Jesse Wierzbinski 2024-11-25 20:50:55 +01:00
parent 7a73a1a24e
commit c899f12893
No known key found for this signature in database
2 changed files with 88 additions and 123 deletions

View file

@ -17,6 +17,8 @@ import {
import type { import type {
Collection, Collection,
Unfollow, Unfollow,
FollowAccept as VersiaFollowAccept,
FollowReject as VersiaFollowReject,
User as VersiaUser, User as VersiaUser,
} from "@versia/federation/types"; } from "@versia/federation/types";
import { Notification, db } from "@versia/kit/db"; import { Notification, db } from "@versia/kit/db";
@ -47,15 +49,11 @@ import {
} from "drizzle-orm"; } from "drizzle-orm";
import { htmlToText } from "html-to-text"; import { htmlToText } from "html-to-text";
import { z } from "zod"; import { z } from "zod";
import { import { findManyUsers } from "~/classes/functions/user";
findManyUsers,
followAcceptToVersia,
followRejectToVersia,
followRequestToVersia,
} from "~/classes/functions/user";
import { searchManager } from "~/classes/search/search-manager"; import { searchManager } from "~/classes/search/search-manager";
import { type Config, config } from "~/packages/config-manager"; import { type Config, config } from "~/packages/config-manager";
import type { KnownEntity } from "~/types/api.ts"; import type { KnownEntity } from "~/types/api.ts";
import { DeliveryJobType, deliveryQueue } from "~/worker.ts";
import { BaseInterface } from "./base.ts"; import { BaseInterface } from "./base.ts";
import { Emoji } from "./emoji.ts"; import { Emoji } from "./emoji.ts";
import { Instance } from "./instance.ts"; import { Instance } from "./instance.ts";
@ -264,19 +262,17 @@ export class User extends BaseInterface<typeof Users, UserWithRelations> {
}); });
if (otherUser.isRemote()) { if (otherUser.isRemote()) {
const { ok } = await this.federateToUser( await deliveryQueue.add(DeliveryJobType.FederateEntity, {
followRequestToVersia(this, otherUser), entity: {
otherUser, type: "Follow",
); id: crypto.randomUUID(),
author: this.getUri(),
if (!ok) { followee: otherUser.getUri(),
await foundRelationship.update({ created_at: new Date().toISOString(),
requested: false, },
following: false, recipientId: otherUser.id,
senderId: this.id,
}); });
return foundRelationship;
}
} else { } else {
await Notification.insert({ await Notification.insert({
accountId: this.id, accountId: this.id,
@ -293,15 +289,11 @@ export class User extends BaseInterface<typeof Users, UserWithRelations> {
relationship: Relationship, relationship: Relationship,
): Promise<boolean> { ): Promise<boolean> {
if (followee.isRemote()) { if (followee.isRemote()) {
// TODO: This should reschedule for a later time and maybe notify the server admin if it fails too often await deliveryQueue.add(DeliveryJobType.FederateEntity, {
const { ok } = await this.federateToUser( entity: this.unfollowToVersia(followee),
this.unfollowToVersia(followee), recipientId: followee.id,
followee, senderId: this.id,
); });
if (!ok) {
return false;
}
} else if (!this.data.isLocked) { } else if (!this.data.isLocked) {
if (relationship.data.following) { if (relationship.data.following) {
await Notification.insert({ await Notification.insert({
@ -337,17 +329,51 @@ export class User extends BaseInterface<typeof Users, UserWithRelations> {
} }
public async sendFollowAccept(follower: User): Promise<void> { public async sendFollowAccept(follower: User): Promise<void> {
await this.federateToUser( if (!follower.isRemote()) {
followAcceptToVersia(follower, this), throw new Error("Follower must be a remote user");
follower, }
);
if (this.isRemote()) {
throw new Error("Followee must be a local user");
}
const entity: VersiaFollowAccept = {
type: "FollowAccept",
id: crypto.randomUUID(),
author: this.getUri(),
created_at: new Date().toISOString(),
follower: follower.getUri(),
};
await deliveryQueue.add(DeliveryJobType.FederateEntity, {
entity,
recipientId: follower.id,
senderId: this.id,
});
} }
public async sendFollowReject(follower: User): Promise<void> { public async sendFollowReject(follower: User): Promise<void> {
await this.federateToUser( if (!follower.isRemote()) {
followRejectToVersia(follower, this), throw new Error("Follower must be a remote user");
follower, }
);
if (this.isRemote()) {
throw new Error("Followee must be a local user");
}
const entity: VersiaFollowReject = {
type: "FollowReject",
id: crypto.randomUUID(),
author: this.getUri(),
created_at: new Date().toISOString(),
follower: follower.getUri(),
};
await deliveryQueue.add(DeliveryJobType.FederateEntity, {
entity,
recipientId: follower.id,
senderId: this.id,
});
} }
public static async webFinger( public static async webFinger(
@ -980,9 +1006,16 @@ export class User extends BaseInterface<typeof Users, UserWithRelations> {
), ),
); );
for (const follower of followers) { await deliveryQueue.addBulk(
await this.federateToUser(entity, follower); followers.map((follower) => ({
} name: DeliveryJobType.FederateEntity,
data: {
entity,
recipientId: follower.id,
senderId: this.id,
},
})),
);
} }
/** /**
@ -996,24 +1029,25 @@ export class User extends BaseInterface<typeof Users, UserWithRelations> {
entity: KnownEntity, entity: KnownEntity,
user: User, user: User,
): Promise<{ ok: boolean }> { ): Promise<{ ok: boolean }> {
const { headers } = await this.sign( const inbox = user.data.instance?.inbox || user.data.endpoints?.inbox;
entity,
user.data.endpoints?.inbox ?? "", if (!inbox) {
throw new Error(
`User ${chalk.gray(user.getUri())} does not have an inbox endpoint`,
); );
}
const { headers } = await this.sign(entity, inbox);
try { try {
await new FederationRequester().post( await new FederationRequester().post(inbox, entity, {
user.data.endpoints?.inbox ?? "",
entity,
{
// @ts-expect-error Bun extension // @ts-expect-error Bun extension
proxy: config.http.proxy.address, proxy: config.http.proxy.address,
headers: { headers: {
...headers.toJSON(), ...headers.toJSON(),
"Content-Type": "application/json; charset=utf-8", "Content-Type": "application/json; charset=utf-8",
}, },
}, });
);
} catch (e) { } catch (e) {
getLogger(["federation", "delivery"]) getLogger(["federation", "delivery"])
.error`Federating ${chalk.gray(entity.type)} to ${user.getUri()} ${chalk.bold.red("failed")}`; .error`Federating ${chalk.gray(entity.type)} to ${user.getUri()} ${chalk.bold.red("failed")}`;

View file

@ -1,8 +1,3 @@
import type {
Follow,
FollowAccept,
FollowReject,
} from "@versia/federation/types";
import { import {
type Application, type Application,
type Emoji, type Emoji,
@ -138,67 +133,3 @@ export const findManyUsers = async (
return output.map((user) => transformOutputToUserWithRelations(user)); return output.map((user) => transformOutputToUserWithRelations(user));
}; };
export const followRequestToVersia = (
follower: User,
followee: User,
): Follow => {
if (follower.isRemote()) {
throw new Error("Follower must be a local user");
}
if (!followee.isRemote()) {
throw new Error("Followee must be a remote user");
}
if (!followee.data.uri) {
throw new Error("Followee must have a URI in database");
}
const id = crypto.randomUUID();
return {
type: "Follow",
id,
author: follower.getUri(),
followee: followee.getUri(),
created_at: new Date().toISOString(),
};
};
export const followAcceptToVersia = (
follower: User,
followee: User,
): FollowAccept => {
if (!follower.isRemote()) {
throw new Error("Follower must be a remote user");
}
if (followee.isRemote()) {
throw new Error("Followee must be a local user");
}
if (!follower.data.uri) {
throw new Error("Follower must have a URI in database");
}
const id = crypto.randomUUID();
return {
type: "FollowAccept",
id,
author: followee.getUri(),
created_at: new Date().toISOString(),
follower: follower.getUri(),
};
};
export const followRejectToVersia = (
follower: User,
followee: User,
): FollowReject => {
return {
...followAcceptToVersia(follower, followee),
type: "FollowReject",
};
};