diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index df164600..19149fab 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -40,7 +40,7 @@ const streamingController: AppController = (c) => { socket.addEventListener('close', () => { console.log('websocket: connection closed'); - ws.unsubscribe(conn, { name: stream! }); + ws.unsubscribeAll(socket); }); return response; diff --git a/src/sign.ts b/src/sign.ts index 6cc11540..d6587967 100644 --- a/src/sign.ts +++ b/src/sign.ts @@ -9,8 +9,6 @@ function getSignStream(c: AppContext): WebSocket | undefined { const pubkey = c.get('pubkey'); const session = c.get('session'); - console.log(`nostr:${pubkey}:${session}`); - if (pubkey && session) { const [socket] = ws.getSockets(`nostr:${pubkey}:${session}`); return socket; diff --git a/src/stream.ts b/src/stream.ts index dc52a812..a96e71cd 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,49 +1,118 @@ +/** Internal key for event subscriptions. */ type Topic = string; +/** Only the necessary metadata needed from the request. */ interface StreamConn { + /** Hex pubkey parsed from the `Sec-Websocket-Protocol` header. */ pubkey?: string; + /** Base62 session UUID parsed from the `Sec-Websocket-Protocol` header. */ session?: string; + /** The WebSocket stream. */ socket: WebSocket; } +/** Requested streaming channel, eg `user`, `notifications`. Some channels like `hashtag` have additional params. */ // TODO: Make this a discriminated union (needed for hashtags). interface Stream { + /** Name of the channel, eg `user`. */ name: string; + /** Additional query params, eg `tag`. */ params?: Record; } -const sockets = new Map>(); +/** Class to organize WebSocket connections by topic. */ +class WebSocketConnections { + /** Set of WebSockets by topic. */ + #sockets = new Map>(); + /** Set of topics by WebSocket. We need to track this so we can unsubscribe properly. */ + #topics = new WeakMap>(); -function addSocket(socket: WebSocket, topic: Topic): void { - let subscribers = sockets.get(topic); - if (!subscribers) { - subscribers = new Set(); - sockets.set(topic, subscribers); - } - subscribers.add(socket); -} + /** Add the WebSocket to the streaming channel. */ + subscribe(conn: StreamConn, stream: Stream): void { + const topic = getTopic(conn, stream); -function removeSocket(socket: WebSocket, topic: Topic): void { - const subscribers = sockets.get(topic); - if (subscribers) { - subscribers.delete(socket); - if (subscribers.size === 0) { - sockets.delete(topic); + if (topic) { + this.#addSocket(conn.socket, topic); + this.#addTopic(conn.socket, topic); } } -} -function subscribe(conn: StreamConn, stream: Stream): void { - const topic = getTopic(conn, stream); - if (topic) { - addSocket(conn.socket, topic); + /** Remove the WebSocket from the streaming channel. */ + unsubscribe(conn: StreamConn, stream: Stream): void { + const topic = getTopic(conn, stream); + + if (topic) { + this.#removeSocket(conn.socket, topic); + this.#removeTopic(conn.socket, topic); + } } -} -function unsubscribe(conn: StreamConn, stream: Stream): void { - const topic = getTopic(conn, stream); - if (topic) { - removeSocket(conn.socket, topic); + /** Remove the WebSocket from all its streaming channels. */ + unsubscribeAll(socket: WebSocket): void { + const topics = this.#topics.get(socket); + + if (topics) { + for (const topic of topics) { + this.#removeSocket(socket, topic); + } + } + + this.#topics.delete(socket); + } + + /** Get WebSockets for the given topic. */ + getSockets(topic: Topic): Set { + return this.#sockets.get(topic) ?? new Set(); + } + + /** Add a WebSocket to a topics set in the state. */ + #addSocket(socket: WebSocket, topic: Topic): void { + let subscribers = this.#sockets.get(topic); + + if (!subscribers) { + subscribers = new Set(); + this.#sockets.set(topic, subscribers); + } + + subscribers.add(socket); + } + + /** Remove a WebSocket from a topics set in the state. */ + #removeSocket(socket: WebSocket, topic: Topic): void { + const subscribers = this.#sockets.get(topic); + + if (subscribers) { + subscribers.delete(socket); + + if (subscribers.size === 0) { + this.#sockets.delete(topic); + } + } + } + + /** Add a topic to a WebSocket set in the state. */ + #addTopic(socket: WebSocket, topic: Topic): void { + let topics = this.#topics.get(socket); + + if (!topics) { + topics = new Set(); + this.#topics.set(socket, topics); + } + + topics.add(topic); + } + + /** Remove a topic from a WebSocket set in the state. */ + #removeTopic(socket: WebSocket, topic: Topic): void { + const topics = this.#topics.get(socket); + + if (topics) { + topics.delete(topic); + + if (topics.size === 0) { + this.#topics.delete(socket); + } + } } } @@ -67,14 +136,6 @@ function getTopic(conn: StreamConn, stream: Stream): Topic | undefined { } } -function getSockets(topic: Topic): Set { - return sockets.get(topic) ?? new Set(); -} - -const ws = { - subscribe, - unsubscribe, - getSockets, -}; +const ws = new WebSocketConnections(); export default ws;