import { afterAll, beforeAll, describe, expect, test } from "bun:test"; import { randomUUIDv7 } from "bun"; import { User } from "../db/user.ts"; import { connection } from "../redis.ts"; import { setupDatabase } from "../tables/db.ts"; import { type EventTypes, StreamingTimeline, type TimelineTypes, } from "./streaming.ts"; // Set up database and create test users await setupDatabase(); const testUsers: User[] = []; beforeAll(async (): Promise => { // Ensure Redis is connected await connection.ping(); // Create test users* // Can't use stuff from @versia-server/tests because it depends on this package for (let i = 0; i < 2; i++) { const user = await User.register( `test-streaming-${randomUUIDv7().slice(0, 8)}`, { email: `test-streaming-${i}@example.com`, }, ); testUsers.push(user); } }); afterAll(async (): Promise => { // Delete test users for (const user of testUsers) { await user.delete(); } }); describe("StreamingTimeline", (): void => { test("should create timeline with user", (): void => { const timeline = new StreamingTimeline("user", testUsers[0]); expect(timeline.timeline).toBe("user"); expect(timeline.user).toBe(testUsers[0]); expect(timeline.emitter).toBeDefined(); timeline.close(); }); test("should create timeline without user", (): void => { const timeline = new StreamingTimeline("public"); expect(timeline.timeline).toBe("public"); expect(timeline.user).toBe(null); expect(timeline.emitter).toBeDefined(); timeline.close(); }); test("should generate correct channel name for user timeline", (): void => { const timeline = new StreamingTimeline("user", testUsers[0]); // Access private property for testing const channelName = (timeline as unknown as { channelName: string }) .channelName; expect(channelName).toBe(`timeline:user:${testUsers[0].id}`); timeline.close(); }); test("should generate correct channel name for public timeline", (): void => { const timeline = new StreamingTimeline("public"); // Access private property for testing const channelName = (timeline as unknown as { channelName: string }) .channelName; expect(channelName).toBe("timeline:public"); timeline.close(); }); test("should emit event when publishing to same channel", async (): Promise => { const emitterTimeline = new StreamingTimeline("public"); const receiverTimeline = new StreamingTimeline("public"); let receivedType = ""; let receivedPayload: unknown = null; // Listen for update events receiverTimeline.emitter.on("update", (payload): void => { receivedType = "update"; receivedPayload = payload; }); // Wait a bit for subscription to be established await new Promise((resolve): void => { setTimeout(resolve, 10); }); // Emit an event emitterTimeline.emitEvent("update", { id: "test-status-123" }); // Wait for event to be processed await new Promise((resolve): void => { setTimeout(resolve, 50); }); expect(receivedType).toBe("update"); expect(receivedPayload).toEqual({ id: "test-status-123" }); emitterTimeline.close(); receiverTimeline.close(); }); test("should not emit event for different channel", async (): Promise => { const timeline1 = new StreamingTimeline("public"); const timeline2 = new StreamingTimeline("user", testUsers[0]); let receivedEvent = false; // Listen for events on timeline1 timeline1.emitter.on("update", (): void => { receivedEvent = true; }); // Wait a bit for subscription to be established await new Promise((resolve): void => { setTimeout(resolve, 10); }); // Emit event on timeline2 (different channel) timeline2.emitEvent("update", { id: "test-status-123" }); // Wait for potential event processing await new Promise((resolve): void => { setTimeout(resolve, 50); }); expect(receivedEvent).toBe(false); timeline1.close(); timeline2.close(); }); test("should handle different event types", async (): Promise => { const timeline = new StreamingTimeline("public"); const receivedEvents: Array<{ type: string; payload: unknown }> = []; // Listen for different event types timeline.emitter.on("update", (payload): void => { receivedEvents.push({ type: "update", payload }); }); timeline.emitter.on("delete", (payload): void => { receivedEvents.push({ type: "delete", payload }); }); timeline.emitter.on("notification", (payload): void => { receivedEvents.push({ type: "notification", payload }); }); // Wait for subscription await new Promise((resolve): void => { setTimeout(resolve, 10); }); // Emit different events timeline.emitEvent("update", { id: "status-1" }); timeline.emitEvent("delete", { id: "status-2" }); timeline.emitEvent("notification", { id: "notif-1" }); // Wait for events await new Promise((resolve): void => { setTimeout(resolve, 100); }); expect(receivedEvents).toHaveLength(3); expect(receivedEvents[0]).toEqual({ type: "update", payload: { id: "status-1" }, }); expect(receivedEvents[1]).toEqual({ type: "delete", payload: { id: "status-2" }, }); expect(receivedEvents[2]).toEqual({ type: "notification", payload: { id: "notif-1" }, }); timeline.close(); }); test("should handle malformed JSON messages gracefully", async (): Promise => { const timeline = new StreamingTimeline("public"); let receivedEvent = false; let warningLogged = false; // Mock console.warn to capture warnings const originalWarn = console.warn; console.warn = (): void => { warningLogged = true; }; timeline.emitter.on("update", (): void => { receivedEvent = true; }); // Wait for subscription await new Promise((resolve): void => { setTimeout(resolve, 10); }); // Publish malformed JSON directly to Redis const channelName = (timeline as unknown as { channelName: string }) .channelName; await connection.publish(channelName, "invalid json"); // Wait for message processing await new Promise((resolve): void => { setTimeout(resolve, 50); }); expect(receivedEvent).toBe(false); expect(warningLogged).toBe(true); // Restore console.warn console.warn = originalWarn; timeline.close(); }); test("should handle messages with missing type or payload", async (): Promise => { const timeline = new StreamingTimeline("public"); let receivedEvent = false; timeline.emitter.on("update", (): void => { receivedEvent = true; }); // Wait for subscription await new Promise((resolve): void => { setTimeout(resolve, 10); }); // Publish message with missing type const channelName = (timeline as unknown as { channelName: string }) .channelName; await connection.publish( channelName, JSON.stringify({ payload: { id: "test" } }), ); // Publish message with missing payload await connection.publish( channelName, JSON.stringify({ type: "update" }), ); // Wait for message processing await new Promise((resolve): void => { setTimeout(resolve, 50); }); expect(receivedEvent).toBe(false); timeline.close(); }); test("should properly clean up resources on close", async (): Promise => { const timeline = new StreamingTimeline("public"); // Wait for subscription await new Promise((resolve): void => { setTimeout(resolve, 10); }); // Verify subscription is active const channelName = (timeline as unknown as { channelName: string }) .channelName; expect( timeline.redisConnection.listenerCount("message"), ).toBeGreaterThan(0); // Close timeline timeline.close(); // Wait a bit await new Promise((resolve): void => { setTimeout(resolve, 10); }); // Try to emit event - should not be received let receivedEvent = false; timeline.emitter.on("update", (): void => { receivedEvent = true; }); await connection.publish( channelName, JSON.stringify({ type: "update", payload: { id: "test" } }), ); await new Promise((resolve): void => { setTimeout(resolve, 50); }); expect(receivedEvent).toBe(false); }); test("should handle multiple timelines with different channels", async (): Promise => { const timeline1 = new StreamingTimeline("public"); const timeline2 = new StreamingTimeline("user", testUsers[0]); const timeline3 = new StreamingTimeline("user", testUsers[1]); const events1: unknown[] = []; const events2: unknown[] = []; const events3: unknown[] = []; timeline1.emitter.on("update", (payload): void => { events1.push(payload); }); timeline2.emitter.on("update", (payload): void => { events2.push(payload); }); timeline3.emitter.on("update", (payload): void => { events3.push(payload); }); // Wait for subscriptions await new Promise((resolve): void => { setTimeout(resolve, 20); }); // Emit events to different timelines timeline1.emitEvent("update", { id: "public-1" }); timeline2.emitEvent("update", { id: "user1-1" }); timeline3.emitEvent("update", { id: "user2-1" }); // Wait for events await new Promise((resolve): void => { setTimeout(resolve, 100); }); expect(events1).toEqual([{ id: "public-1" }]); expect(events2).toEqual([{ id: "user1-1" }]); expect(events3).toEqual([{ id: "user2-1" }]); timeline1.close(); timeline2.close(); timeline3.close(); }); test("should handle filters_changed event with null payload", async (): Promise => { const timeline = new StreamingTimeline("public"); let receivedType = ""; let receivedPayload: unknown; timeline.emitter.on("filters_changed", (payload): void => { receivedType = "filters_changed"; receivedPayload = payload; }); await new Promise((resolve): void => { setTimeout(resolve, 10); }); timeline.emitEvent("filters_changed", null); await new Promise((resolve): void => { setTimeout(resolve, 50); }); expect(receivedType).toBe("filters_changed"); expect(receivedPayload).toBeNull(); timeline.close(); }); test("should handle all supported timeline types", (): void => { const timelineTypes: TimelineTypes[] = [ "public", "public:media", "public:local", "public:local:media", "public:remote", "public:remote:media", "hashtag", "hashtag:local", "user", "user:notification", "list", ]; const timelines = timelineTypes.map( (type) => new StreamingTimeline(type), ); for (const [index, timeline] of timelines.entries()) { expect(timeline.timeline).toBe(timelineTypes[index]); const channelName = (timeline as unknown as { channelName: string }) .channelName; expect(channelName).toBe(`timeline:${timelineTypes[index]}`); } // Clean up for (const timeline of timelines) { timeline.close(); } }); test("should handle all supported event types", async (): Promise => { const timeline = new StreamingTimeline("public"); const eventTypes: EventTypes[] = [ "update", "delete", "notification", "filters_changed", "announcement", "announcement.reaction", "announcement.delete", "status.update", ]; const receivedEvents: Array<{ type: string; payload: unknown }> = []; // Set up listeners for all event types for (const eventType of eventTypes) { timeline.emitter.on(eventType, (payload): void => { receivedEvents.push({ type: eventType, payload }); }); } await new Promise((resolve): void => { setTimeout(resolve, 10); }); // Emit all event types for (const [index, eventType] of eventTypes.entries()) { const payload = eventType === "filters_changed" ? null : { id: `${eventType}-${index}` }; timeline.emitEvent(eventType, payload as never); } await new Promise((resolve): void => { setTimeout(resolve, 100); }); expect(receivedEvents).toHaveLength(eventTypes.length); for (const [index, eventType] of eventTypes.entries()) { const expectedPayload = eventType === "filters_changed" ? null : { id: `${eventType}-${index}` }; expect(receivedEvents[index]).toEqual({ type: eventType, payload: expectedPayload, }); } timeline.close(); }); });