diff --git a/packages/kit/timelines/streaming.test.ts b/packages/kit/timelines/streaming.test.ts deleted file mode 100644 index 1e56ff67..00000000 --- a/packages/kit/timelines/streaming.test.ts +++ /dev/null @@ -1,465 +0,0 @@ -import { afterAll, beforeAll, describe, expect, test } from "bun:test"; -import { randomUUIDv7 } from "bun"; -import { User } from "../db/user.ts"; -import { connection } from "../redis.ts"; -import { setupDatabase } from "../tables/db.ts"; -import { - type EventTypes, - StreamingTimeline, - type TimelineTypes, -} from "./streaming.ts"; - -// Set up database and create test users -await setupDatabase(); - -const testUsers: User[] = []; - -beforeAll(async (): Promise => { - // Ensure Redis is connected - await connection.ping(); - - // Create test users* - // Can't use stuff from @versia-server/tests because it depends on this package - for (let i = 0; i < 2; i++) { - const user = await User.register( - `test-streaming-${randomUUIDv7().slice(0, 8)}`, - { - email: `test-streaming-${i}@example.com`, - }, - ); - testUsers.push(user); - } -}); - -afterAll(async (): Promise => { - // Delete test users - for (const user of testUsers) { - await user.delete(); - } -}); - -describe("StreamingTimeline", (): void => { - test("should create timeline with user", (): void => { - const timeline = new StreamingTimeline("user", testUsers[0]); - - expect(timeline.timeline).toBe("user"); - expect(timeline.user).toBe(testUsers[0]); - expect(timeline.emitter).toBeDefined(); - - timeline.close(); - }); - - test("should create timeline without user", (): void => { - const timeline = new StreamingTimeline("public"); - - expect(timeline.timeline).toBe("public"); - expect(timeline.user).toBe(null); - expect(timeline.emitter).toBeDefined(); - - timeline.close(); - }); - - test("should generate correct channel name for user timeline", (): void => { - const timeline = new StreamingTimeline("user", testUsers[0]); - - // Access private property for testing - const channelName = (timeline as unknown as { channelName: string }) - .channelName; - expect(channelName).toBe(`timeline:user:${testUsers[0].id}`); - - timeline.close(); - }); - - test("should generate correct channel name for public timeline", (): void => { - const timeline = new StreamingTimeline("public"); - - // Access private property for testing - const channelName = (timeline as unknown as { channelName: string }) - .channelName; - expect(channelName).toBe("timeline:public"); - - timeline.close(); - }); - - test("should emit event when publishing to same channel", async (): Promise => { - const emitterTimeline = new StreamingTimeline("public"); - const receiverTimeline = new StreamingTimeline("public"); - let receivedType = ""; - let receivedPayload: unknown = null; - - // Listen for update events - receiverTimeline.emitter.on("update", (payload): void => { - receivedType = "update"; - receivedPayload = payload; - }); - - // Wait a bit for subscription to be established - await new Promise((resolve): void => { - setTimeout(resolve, 10); - }); - - // Emit an event - emitterTimeline.emitEvent("update", { id: "test-status-123" }); - - // Wait for event to be processed - await new Promise((resolve): void => { - setTimeout(resolve, 50); - }); - - expect(receivedType).toBe("update"); - expect(receivedPayload).toEqual({ id: "test-status-123" }); - - emitterTimeline.close(); - receiverTimeline.close(); - }); - - test("should not emit event for different channel", async (): Promise => { - const timeline1 = new StreamingTimeline("public"); - const timeline2 = new StreamingTimeline("user", testUsers[0]); - - let receivedEvent = false; - - // Listen for events on timeline1 - timeline1.emitter.on("update", (): void => { - receivedEvent = true; - }); - - // Wait a bit for subscription to be established - await new Promise((resolve): void => { - setTimeout(resolve, 10); - }); - - // Emit event on timeline2 (different channel) - timeline2.emitEvent("update", { id: "test-status-123" }); - - // Wait for potential event processing - await new Promise((resolve): void => { - setTimeout(resolve, 50); - }); - - expect(receivedEvent).toBe(false); - - timeline1.close(); - timeline2.close(); - }); - - test("should handle different event types", async (): Promise => { - const timeline = new StreamingTimeline("public"); - const receivedEvents: Array<{ type: string; payload: unknown }> = []; - - // Listen for different event types - timeline.emitter.on("update", (payload): void => { - receivedEvents.push({ type: "update", payload }); - }); - timeline.emitter.on("delete", (payload): void => { - receivedEvents.push({ type: "delete", payload }); - }); - timeline.emitter.on("notification", (payload): void => { - receivedEvents.push({ type: "notification", payload }); - }); - - // Wait for subscription - await new Promise((resolve): void => { - setTimeout(resolve, 10); - }); - - // Emit different events - timeline.emitEvent("update", { id: "status-1" }); - timeline.emitEvent("delete", { id: "status-2" }); - timeline.emitEvent("notification", { id: "notif-1" }); - - // Wait for events - await new Promise((resolve): void => { - setTimeout(resolve, 100); - }); - - expect(receivedEvents).toHaveLength(3); - expect(receivedEvents[0]).toEqual({ - type: "update", - payload: { id: "status-1" }, - }); - expect(receivedEvents[1]).toEqual({ - type: "delete", - payload: { id: "status-2" }, - }); - expect(receivedEvents[2]).toEqual({ - type: "notification", - payload: { id: "notif-1" }, - }); - - timeline.close(); - }); - - test("should handle malformed JSON messages gracefully", async (): Promise => { - const timeline = new StreamingTimeline("public"); - - let receivedEvent = false; - let warningLogged = false; - - // Mock console.warn to capture warnings - const originalWarn = console.warn; - console.warn = (): void => { - warningLogged = true; - }; - - timeline.emitter.on("update", (): void => { - receivedEvent = true; - }); - - // Wait for subscription - await new Promise((resolve): void => { - setTimeout(resolve, 10); - }); - - // Publish malformed JSON directly to Redis - const channelName = (timeline as unknown as { channelName: string }) - .channelName; - await connection.publish(channelName, "invalid json"); - - // Wait for message processing - await new Promise((resolve): void => { - setTimeout(resolve, 50); - }); - - expect(receivedEvent).toBe(false); - expect(warningLogged).toBe(true); - - // Restore console.warn - console.warn = originalWarn; - timeline.close(); - }); - - test("should handle messages with missing type or payload", async (): Promise => { - const timeline = new StreamingTimeline("public"); - - let receivedEvent = false; - - timeline.emitter.on("update", (): void => { - receivedEvent = true; - }); - - // Wait for subscription - await new Promise((resolve): void => { - setTimeout(resolve, 10); - }); - - // Publish message with missing type - const channelName = (timeline as unknown as { channelName: string }) - .channelName; - await connection.publish( - channelName, - JSON.stringify({ payload: { id: "test" } }), - ); - - // Publish message with missing payload - await connection.publish( - channelName, - JSON.stringify({ type: "update" }), - ); - - // Wait for message processing - await new Promise((resolve): void => { - setTimeout(resolve, 50); - }); - - expect(receivedEvent).toBe(false); - - timeline.close(); - }); - - test("should properly clean up resources on close", async (): Promise => { - const timeline = new StreamingTimeline("public"); - - // Wait for subscription - await new Promise((resolve): void => { - setTimeout(resolve, 10); - }); - - // Verify subscription is active - const channelName = (timeline as unknown as { channelName: string }) - .channelName; - expect( - timeline.redisConnection.listenerCount("message"), - ).toBeGreaterThan(0); - - // Close timeline - timeline.close(); - - // Wait a bit - await new Promise((resolve): void => { - setTimeout(resolve, 10); - }); - - // Try to emit event - should not be received - let receivedEvent = false; - timeline.emitter.on("update", (): void => { - receivedEvent = true; - }); - - await connection.publish( - channelName, - JSON.stringify({ type: "update", payload: { id: "test" } }), - ); - - await new Promise((resolve): void => { - setTimeout(resolve, 50); - }); - - expect(receivedEvent).toBe(false); - }); - - test("should handle multiple timelines with different channels", async (): Promise => { - const timeline1 = new StreamingTimeline("public"); - const timeline2 = new StreamingTimeline("user", testUsers[0]); - const timeline3 = new StreamingTimeline("user", testUsers[1]); - - const events1: unknown[] = []; - const events2: unknown[] = []; - const events3: unknown[] = []; - - timeline1.emitter.on("update", (payload): void => { - events1.push(payload); - }); - timeline2.emitter.on("update", (payload): void => { - events2.push(payload); - }); - timeline3.emitter.on("update", (payload): void => { - events3.push(payload); - }); - - // Wait for subscriptions - await new Promise((resolve): void => { - setTimeout(resolve, 20); - }); - - // Emit events to different timelines - timeline1.emitEvent("update", { id: "public-1" }); - timeline2.emitEvent("update", { id: "user1-1" }); - timeline3.emitEvent("update", { id: "user2-1" }); - - // Wait for events - await new Promise((resolve): void => { - setTimeout(resolve, 100); - }); - - expect(events1).toEqual([{ id: "public-1" }]); - expect(events2).toEqual([{ id: "user1-1" }]); - expect(events3).toEqual([{ id: "user2-1" }]); - - timeline1.close(); - timeline2.close(); - timeline3.close(); - }); - - test("should handle filters_changed event with null payload", async (): Promise => { - const timeline = new StreamingTimeline("public"); - let receivedType = ""; - let receivedPayload: unknown; - - timeline.emitter.on("filters_changed", (payload): void => { - receivedType = "filters_changed"; - receivedPayload = payload; - }); - - await new Promise((resolve): void => { - setTimeout(resolve, 10); - }); - - timeline.emitEvent("filters_changed", null); - - await new Promise((resolve): void => { - setTimeout(resolve, 50); - }); - - expect(receivedType).toBe("filters_changed"); - expect(receivedPayload).toBeNull(); - - timeline.close(); - }); - - test("should handle all supported timeline types", (): void => { - const timelineTypes: TimelineTypes[] = [ - "public", - "public:media", - "public:local", - "public:local:media", - "public:remote", - "public:remote:media", - "hashtag", - "hashtag:local", - "user", - "user:notification", - "list", - ]; - - const timelines = timelineTypes.map( - (type) => new StreamingTimeline(type), - ); - - for (const [index, timeline] of timelines.entries()) { - expect(timeline.timeline).toBe(timelineTypes[index]); - const channelName = (timeline as unknown as { channelName: string }) - .channelName; - expect(channelName).toBe(`timeline:${timelineTypes[index]}`); - } - - // Clean up - for (const timeline of timelines) { - timeline.close(); - } - }); - - test("should handle all supported event types", async (): Promise => { - const timeline = new StreamingTimeline("public"); - const eventTypes: EventTypes[] = [ - "update", - "delete", - "notification", - "filters_changed", - "announcement", - "announcement.reaction", - "announcement.delete", - "status.update", - ]; - - const receivedEvents: Array<{ type: string; payload: unknown }> = []; - - // Set up listeners for all event types - for (const eventType of eventTypes) { - timeline.emitter.on(eventType, (payload): void => { - receivedEvents.push({ type: eventType, payload }); - }); - } - - await new Promise((resolve): void => { - setTimeout(resolve, 10); - }); - - // Emit all event types - for (const [index, eventType] of eventTypes.entries()) { - const payload = - eventType === "filters_changed" - ? null - : { id: `${eventType}-${index}` }; - timeline.emitEvent(eventType, payload as never); - } - - await new Promise((resolve): void => { - setTimeout(resolve, 100); - }); - - expect(receivedEvents).toHaveLength(eventTypes.length); - for (const [index, eventType] of eventTypes.entries()) { - const expectedPayload = - eventType === "filters_changed" - ? null - : { id: `${eventType}-${index}` }; - expect(receivedEvents[index]).toEqual({ - type: eventType, - payload: expectedPayload, - }); - } - - timeline.close(); - }); -});