feat(api): Begin work on Streaming API

This commit is contained in:
Jesse Wierzbinski 2025-06-29 22:23:03 +02:00
parent 59cd519337
commit dc802ff5f6
No known key found for this signature in database
2 changed files with 583 additions and 0 deletions

View file

@ -0,0 +1,464 @@
import { afterAll, beforeAll, describe, expect, test } from "bun:test";
import { randomUUIDv7 } from "bun";
import { setupDatabase, User } from "../db/index.ts";
import { connection } from "../redis.ts";
import {
type EventTypes,
StreamingTimeline,
type TimelineTypes,
} from "./streaming.ts";
// Set up database and create test users
await setupDatabase();
const testUsers: User[] = [];
beforeAll(async (): Promise<void> => {
// Ensure Redis is connected
await connection.ping();
// Create test users*
// Can't use stuff from @versia-server/tests because it depends on this package
for (let i = 0; i < 2; i++) {
const user = await User.register(
`test-streaming-${randomUUIDv7().slice(0, 8)}`,
{
email: `test-streaming-${i}@example.com`,
},
);
testUsers.push(user);
}
});
afterAll(async (): Promise<void> => {
// Delete test users
for (const user of testUsers) {
await user.delete();
}
});
describe("StreamingTimeline", (): void => {
test("should create timeline with user", (): void => {
const timeline = new StreamingTimeline("user", testUsers[0]);
expect(timeline.timeline).toBe("user");
expect(timeline.user).toBe(testUsers[0]);
expect(timeline.emitter).toBeDefined();
timeline.close();
});
test("should create timeline without user", (): void => {
const timeline = new StreamingTimeline("public");
expect(timeline.timeline).toBe("public");
expect(timeline.user).toBe(null);
expect(timeline.emitter).toBeDefined();
timeline.close();
});
test("should generate correct channel name for user timeline", (): void => {
const timeline = new StreamingTimeline("user", testUsers[0]);
// Access private property for testing
const channelName = (timeline as unknown as { channelName: string })
.channelName;
expect(channelName).toBe(`timeline:user:${testUsers[0].id}`);
timeline.close();
});
test("should generate correct channel name for public timeline", (): void => {
const timeline = new StreamingTimeline("public");
// Access private property for testing
const channelName = (timeline as unknown as { channelName: string })
.channelName;
expect(channelName).toBe("timeline:public");
timeline.close();
});
test("should emit event when publishing to same channel", async (): Promise<void> => {
const emitterTimeline = new StreamingTimeline("public");
const receiverTimeline = new StreamingTimeline("public");
let receivedType = "";
let receivedPayload: unknown = null;
// Listen for update events
receiverTimeline.emitter.on("update", (payload): void => {
receivedType = "update";
receivedPayload = payload;
});
// Wait a bit for subscription to be established
await new Promise((resolve): void => {
setTimeout(resolve, 10);
});
// Emit an event
emitterTimeline.emitEvent("update", { id: "test-status-123" });
// Wait for event to be processed
await new Promise((resolve): void => {
setTimeout(resolve, 50);
});
expect(receivedType).toBe("update");
expect(receivedPayload).toEqual({ id: "test-status-123" });
emitterTimeline.close();
receiverTimeline.close();
});
test("should not emit event for different channel", async (): Promise<void> => {
const timeline1 = new StreamingTimeline("public");
const timeline2 = new StreamingTimeline("user", testUsers[0]);
let receivedEvent = false;
// Listen for events on timeline1
timeline1.emitter.on("update", (): void => {
receivedEvent = true;
});
// Wait a bit for subscription to be established
await new Promise((resolve): void => {
setTimeout(resolve, 10);
});
// Emit event on timeline2 (different channel)
timeline2.emitEvent("update", { id: "test-status-123" });
// Wait for potential event processing
await new Promise((resolve): void => {
setTimeout(resolve, 50);
});
expect(receivedEvent).toBe(false);
timeline1.close();
timeline2.close();
});
test("should handle different event types", async (): Promise<void> => {
const timeline = new StreamingTimeline("public");
const receivedEvents: Array<{ type: string; payload: unknown }> = [];
// Listen for different event types
timeline.emitter.on("update", (payload): void => {
receivedEvents.push({ type: "update", payload });
});
timeline.emitter.on("delete", (payload): void => {
receivedEvents.push({ type: "delete", payload });
});
timeline.emitter.on("notification", (payload): void => {
receivedEvents.push({ type: "notification", payload });
});
// Wait for subscription
await new Promise((resolve): void => {
setTimeout(resolve, 10);
});
// Emit different events
timeline.emitEvent("update", { id: "status-1" });
timeline.emitEvent("delete", { id: "status-2" });
timeline.emitEvent("notification", { id: "notif-1" });
// Wait for events
await new Promise((resolve): void => {
setTimeout(resolve, 100);
});
expect(receivedEvents).toHaveLength(3);
expect(receivedEvents[0]).toEqual({
type: "update",
payload: { id: "status-1" },
});
expect(receivedEvents[1]).toEqual({
type: "delete",
payload: { id: "status-2" },
});
expect(receivedEvents[2]).toEqual({
type: "notification",
payload: { id: "notif-1" },
});
timeline.close();
});
test("should handle malformed JSON messages gracefully", async (): Promise<void> => {
const timeline = new StreamingTimeline("public");
let receivedEvent = false;
let warningLogged = false;
// Mock console.warn to capture warnings
const originalWarn = console.warn;
console.warn = (): void => {
warningLogged = true;
};
timeline.emitter.on("update", (): void => {
receivedEvent = true;
});
// Wait for subscription
await new Promise((resolve): void => {
setTimeout(resolve, 10);
});
// Publish malformed JSON directly to Redis
const channelName = (timeline as unknown as { channelName: string })
.channelName;
await connection.publish(channelName, "invalid json");
// Wait for message processing
await new Promise((resolve): void => {
setTimeout(resolve, 50);
});
expect(receivedEvent).toBe(false);
expect(warningLogged).toBe(true);
// Restore console.warn
console.warn = originalWarn;
timeline.close();
});
test("should handle messages with missing type or payload", async (): Promise<void> => {
const timeline = new StreamingTimeline("public");
let receivedEvent = false;
timeline.emitter.on("update", (): void => {
receivedEvent = true;
});
// Wait for subscription
await new Promise((resolve): void => {
setTimeout(resolve, 10);
});
// Publish message with missing type
const channelName = (timeline as unknown as { channelName: string })
.channelName;
await connection.publish(
channelName,
JSON.stringify({ payload: { id: "test" } }),
);
// Publish message with missing payload
await connection.publish(
channelName,
JSON.stringify({ type: "update" }),
);
// Wait for message processing
await new Promise((resolve): void => {
setTimeout(resolve, 50);
});
expect(receivedEvent).toBe(false);
timeline.close();
});
test("should properly clean up resources on close", async (): Promise<void> => {
const timeline = new StreamingTimeline("public");
// Wait for subscription
await new Promise((resolve): void => {
setTimeout(resolve, 10);
});
// Verify subscription is active
const channelName = (timeline as unknown as { channelName: string })
.channelName;
expect(
timeline.redisConnection.listenerCount("message"),
).toBeGreaterThan(0);
// Close timeline
timeline.close();
// Wait a bit
await new Promise((resolve): void => {
setTimeout(resolve, 10);
});
// Try to emit event - should not be received
let receivedEvent = false;
timeline.emitter.on("update", (): void => {
receivedEvent = true;
});
await connection.publish(
channelName,
JSON.stringify({ type: "update", payload: { id: "test" } }),
);
await new Promise((resolve): void => {
setTimeout(resolve, 50);
});
expect(receivedEvent).toBe(false);
});
test("should handle multiple timelines with different channels", async (): Promise<void> => {
const timeline1 = new StreamingTimeline("public");
const timeline2 = new StreamingTimeline("user", testUsers[0]);
const timeline3 = new StreamingTimeline("user", testUsers[1]);
const events1: unknown[] = [];
const events2: unknown[] = [];
const events3: unknown[] = [];
timeline1.emitter.on("update", (payload): void => {
events1.push(payload);
});
timeline2.emitter.on("update", (payload): void => {
events2.push(payload);
});
timeline3.emitter.on("update", (payload): void => {
events3.push(payload);
});
// Wait for subscriptions
await new Promise((resolve): void => {
setTimeout(resolve, 20);
});
// Emit events to different timelines
timeline1.emitEvent("update", { id: "public-1" });
timeline2.emitEvent("update", { id: "user1-1" });
timeline3.emitEvent("update", { id: "user2-1" });
// Wait for events
await new Promise((resolve): void => {
setTimeout(resolve, 100);
});
expect(events1).toEqual([{ id: "public-1" }]);
expect(events2).toEqual([{ id: "user1-1" }]);
expect(events3).toEqual([{ id: "user2-1" }]);
timeline1.close();
timeline2.close();
timeline3.close();
});
test("should handle filters_changed event with null payload", async (): Promise<void> => {
const timeline = new StreamingTimeline("public");
let receivedType = "";
let receivedPayload: unknown;
timeline.emitter.on("filters_changed", (payload): void => {
receivedType = "filters_changed";
receivedPayload = payload;
});
await new Promise((resolve): void => {
setTimeout(resolve, 10);
});
timeline.emitEvent("filters_changed", null);
await new Promise((resolve): void => {
setTimeout(resolve, 50);
});
expect(receivedType).toBe("filters_changed");
expect(receivedPayload).toBeNull();
timeline.close();
});
test("should handle all supported timeline types", (): void => {
const timelineTypes: TimelineTypes[] = [
"public",
"public:media",
"public:local",
"public:local:media",
"public:remote",
"public:remote:media",
"hashtag",
"hashtag:local",
"user",
"user:notification",
"list",
];
const timelines = timelineTypes.map(
(type) => new StreamingTimeline(type),
);
for (const [index, timeline] of timelines.entries()) {
expect(timeline.timeline).toBe(timelineTypes[index]);
const channelName = (timeline as unknown as { channelName: string })
.channelName;
expect(channelName).toBe(`timeline:${timelineTypes[index]}`);
}
// Clean up
for (const timeline of timelines) {
timeline.close();
}
});
test("should handle all supported event types", async (): Promise<void> => {
const timeline = new StreamingTimeline("public");
const eventTypes: EventTypes[] = [
"update",
"delete",
"notification",
"filters_changed",
"announcement",
"announcement.reaction",
"announcement.delete",
"status.update",
];
const receivedEvents: Array<{ type: string; payload: unknown }> = [];
// Set up listeners for all event types
for (const eventType of eventTypes) {
timeline.emitter.on(eventType, (payload): void => {
receivedEvents.push({ type: eventType, payload });
});
}
await new Promise((resolve): void => {
setTimeout(resolve, 10);
});
// Emit all event types
for (const [index, eventType] of eventTypes.entries()) {
const payload =
eventType === "filters_changed"
? null
: { id: `${eventType}-${index}` };
timeline.emitEvent(eventType, payload as never);
}
await new Promise((resolve): void => {
setTimeout(resolve, 100);
});
expect(receivedEvents).toHaveLength(eventTypes.length);
for (const [index, eventType] of eventTypes.entries()) {
const expectedPayload =
eventType === "filters_changed"
? null
: { id: `${eventType}-${index}` };
expect(receivedEvents[index]).toEqual({
type: eventType,
payload: expectedPayload,
});
}
timeline.close();
});
});

View file

@ -0,0 +1,119 @@
/**
* Handles live updates to timelines using Redis to communicate between instances.
*
* Used in the Streaming API to push updates to clients in real-time.
* @see https://docs.joinmastodon.org/methods/streaming/#websocket
*/
import { config } from "@versia-server/config";
import IORedis from "ioredis";
import mitt from "mitt";
import type { User } from "../db/user.ts";
import { connection } from "../redis.ts";
export type TimelineTypes =
| "public"
| "public:media"
| "public:local"
| "public:local:media"
| "public:remote"
| "public:remote:media"
| "hashtag"
| "hashtag:local"
| "user"
| "user:notification"
| "list";
export type EventTypes =
| "update"
| "delete"
| "notification"
| "filters_changed"
| "announcement"
| "announcement.reaction"
| "announcement.delete"
| "status.update";
export type EventPayloads = {
update: { id: string };
delete: { id: string };
notification: { id: string };
filters_changed: null;
announcement: { id: string };
"announcement.reaction": { id: string };
"announcement.delete": { id: string };
"status.update": { id: string };
};
export class StreamingTimeline {
public readonly emitter =
mitt<{
[K in EventTypes]: EventPayloads[K];
}>();
public readonly redisConnection: IORedis;
public constructor(
public readonly timeline: TimelineTypes,
public readonly user: User | null = null,
) {
this.redisConnection = new IORedis({
host: config.redis.queue.host,
port: config.redis.queue.port,
password: config.redis.queue.password,
db: config.redis.queue.database,
maxRetriesPerRequest: null,
});
this.initializeRedisWatcher();
}
private get channelName(): string {
if (this.user) {
return `timeline:${this.timeline}:${this.user.id}`;
}
return `timeline:${this.timeline}`;
}
private messageHandler = (channel: string, message: string): void => {
if (channel === this.channelName) {
try {
const parsed = JSON.parse(message);
if (
typeof parsed === "object" &&
parsed !== null &&
"type" in parsed &&
"payload" in parsed
) {
const { type, payload } = parsed as {
type: EventTypes;
payload: unknown;
};
this.emitter.emit(
type,
payload as EventPayloads[typeof type],
);
}
} catch (error) {
// Silently ignore malformed messages
console.warn("Failed to parse streaming message:", error);
}
}
};
private initializeRedisWatcher(): void {
this.redisConnection.subscribe(this.channelName);
this.redisConnection.on("message", this.messageHandler);
}
public close(): void {
this.redisConnection.unsubscribe(this.channelName);
this.redisConnection.off("message", this.messageHandler);
}
public emitEvent<K extends EventTypes>(
type: K,
payload: EventPayloads[K],
): void {
connection.publish(this.channelName, JSON.stringify({ type, payload }));
}
}