diff --git a/dist/index.d.ts b/dist/index.d.ts index 222dfc3..1bab2a2 100644 --- a/dist/index.d.ts +++ b/dist/index.d.ts @@ -1,6 +1,7 @@ export declare class TetherClient { private websocketHandler; private subscribedQueries; + private pendingMutations; connect: (url: string) => void; disconnect: () => void; subscribe: (queryName: string, params: any, callback: (data: any) => void) => void; diff --git a/dist/index.js b/dist/index.js index fbd2c96..bd5313c 100644 --- a/dist/index.js +++ b/dist/index.js @@ -2,6 +2,7 @@ import { WebSocketHandler } from './utils/websocket.js'; export class TetherClient { websocketHandler = new WebSocketHandler(); subscribedQueries = new Map(); + pendingMutations = new Map(); connect = (url) => { this.websocketHandler.startConnection(url); this.websocketHandler.onQuery = (location, data) => { @@ -10,6 +11,15 @@ export class TetherClient { callback?.(data); } }; + this.websocketHandler.onMutation = (incoming_id, data) => { + const pending = this.pendingMutations.get(incoming_id); + if (!pending) { + return; + } + clearTimeout(pending.timeoutId); + this.pendingMutations.delete(incoming_id); + pending.resolve(data); + }; this.websocketHandler.onOpen = () => { this.subscribedQueries.forEach(({ params }, queryName) => { this.websocketHandler.send(JSON.stringify({ @@ -40,22 +50,19 @@ export class TetherClient { }; sendMutation = (mutationName, params) => { const mutation_id = crypto.randomUUID(); + const promise = new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + this.pendingMutations.delete(mutation_id); + reject(new Error('Mutation timeout')); + }, 10000); + this.pendingMutations.set(mutation_id, { resolve, reject, timeoutId }); + }); this.websocketHandler.send(JSON.stringify({ type: 'mutation', location: mutationName, params: params, mutation_id: mutation_id })); - return new Promise((resolve, reject) => { - const timeoutId = setTimeout(() => { - reject(new Error('Mutation timeout')); - }, 10000); - this.websocketHandler.onMutation = (incoming_id, data) => { - if (incoming_id === mutation_id) { - clearTimeout(timeoutId); - resolve(data); - } - }; - }); + return promise; }; } diff --git a/dist/utils/websocket.js b/dist/utils/websocket.js index 6a89d86..6dad246 100644 --- a/dist/utils/websocket.js +++ b/dist/utils/websocket.js @@ -23,7 +23,14 @@ export class WebSocketHandler { this.reconnectAttempts = 0; }; ws.onmessage = (event) => { - const data = JSON.parse(String(event.data)); + let data; + try { + data = JSON.parse(String(event.data)); + } + catch (e) { + console.error('Tether: invalid JSON message', event.data, e); + return; + } if (data.type === 'query') { this.onQuery(data.location, data.data); } @@ -34,8 +41,8 @@ export class WebSocketHandler { console.error(data.error); } }; - ws.onclose = () => { - console.log('Disconnected from Tether'); + ws.onclose = (event) => { + console.log('Disconnected from Tether', 'code:', event.code, 'reason:', event.reason || '(none)', 'wasClean:', event.wasClean); this.attemptReconnect(); }; }; diff --git a/src/index.ts b/src/index.ts index 8c7a090..e5a110a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,14 @@ import { WebSocketHandler } from './utils/websocket.js'; +type PendingMutation = { + resolve: (value: unknown) => void; + reject: (reason: Error) => void; + timeoutId: ReturnType; +}; + export class TetherClient { private websocketHandler: WebSocketHandler = new WebSocketHandler(); private subscribedQueries = new Map void, params: any }>(); + private pendingMutations = new Map(); connect = (url: string) => { this.websocketHandler.startConnection(url); @@ -11,6 +18,15 @@ export class TetherClient { callback?.(data); } }; + this.websocketHandler.onMutation = (incoming_id, data) => { + const pending = this.pendingMutations.get(incoming_id); + if (!pending) { + return; + } + clearTimeout(pending.timeoutId); + this.pendingMutations.delete(incoming_id); + pending.resolve(data); + }; this.websocketHandler.onOpen = () => { this.subscribedQueries.forEach(({ params }, queryName) => { this.websocketHandler.send(JSON.stringify({ @@ -45,22 +61,19 @@ export class TetherClient { sendMutation = (mutationName: string, params: any) => { const mutation_id = crypto.randomUUID(); + const promise = new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + this.pendingMutations.delete(mutation_id); + reject(new Error('Mutation timeout')); + }, 10000); + this.pendingMutations.set(mutation_id, { resolve, reject, timeoutId }); + }); this.websocketHandler.send(JSON.stringify({ type: 'mutation', location: mutationName, params: params, mutation_id: mutation_id })); - return new Promise((resolve, reject) => { - const timeoutId = setTimeout(() => { - reject(new Error('Mutation timeout')); - }, 10000); - this.websocketHandler.onMutation = (incoming_id: string, data: any) => { - if (incoming_id === mutation_id) { - clearTimeout(timeoutId); - resolve(data); - } - }; - }); + return promise; }; } \ No newline at end of file diff --git a/src/utils/websocket.ts b/src/utils/websocket.ts index 12e3fe9..776f7ea 100644 --- a/src/utils/websocket.ts +++ b/src/utils/websocket.ts @@ -25,13 +25,19 @@ export class WebSocketHandler { }; ws.onmessage = (event: MessageEvent) => { - const data = JSON.parse(String(event.data)) as { + let data: { type: string; location?: string; data?: unknown; error?: string; mutation_id?: string; }; + try { + data = JSON.parse(String(event.data)); + } catch (e) { + console.error('Tether: invalid JSON message', event.data, e); + return; + } if (data.type === 'query') { this.onQuery(data.location, data.data); } else if (data.type === 'mutation') { @@ -41,8 +47,16 @@ export class WebSocketHandler { } }; - ws.onclose = () => { - console.log('Disconnected from Tether'); + ws.onclose = (event: CloseEvent) => { + console.log( + 'Disconnected from Tether', + 'code:', + event.code, + 'reason:', + event.reason || '(none)', + 'wasClean:', + event.wasClean + ); this.attemptReconnect(); }; };