From c834c11bc8846371e83b032c54202cc382c499e9 Mon Sep 17 00:00:00 2001 From: wisplite Date: Tue, 21 Apr 2026 22:41:27 -0500 Subject: [PATCH] messages are being received on both ends now! --- engine.go | 12 +++++++++++- example/client/package-lock.json | 2 +- reactivity/tracker.go | 16 ++++++++++++++++ reactivity/websocket.go | 6 +++++- 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/engine.go b/engine.go index 22a14b5..9455abd 100644 --- a/engine.go +++ b/engine.go @@ -1,6 +1,7 @@ package tether import ( + "encoding/json" "log/slog" "net/http" @@ -54,6 +55,15 @@ func (e *Engine) OnDisconnect(clientID string) error { func (e *Engine) OnReceiveMessage(clientID string, msg map[string]interface{}) error { slog.Debug("Received message", "from", clientID, "message", msg) - // TODO: implement the logic to handle the message + message, err := json.Marshal(map[string]interface{}{ + "type": "query", + "location": msg["query"], + "data": "test", + }) + if err != nil { + slog.Error("Failed to marshal message", "error", err) + return err + } + e.tracker.SendMessage(clientID, message) return nil } diff --git a/example/client/package-lock.json b/example/client/package-lock.json index bf5f69a..1c741ff 100644 --- a/example/client/package-lock.json +++ b/example/client/package-lock.json @@ -14,7 +14,7 @@ }, "node_modules/tether-ts": { "version": "1.0.3", - "resolved": "git+ssh://git@github.com/wisplite/tether-ts.git#00eeff128298d5d79d76c22ba609f83c09f5817f", + "resolved": "git+ssh://git@github.com/wisplite/tether-ts.git#5202a7f844be4ad7f3a463352fe366c4718a2749", "license": "ISC" } } diff --git a/reactivity/tracker.go b/reactivity/tracker.go index 8b67518..60dc426 100644 --- a/reactivity/tracker.go +++ b/reactivity/tracker.go @@ -1,6 +1,7 @@ package reactivity import ( + "log/slog" "sync" ) @@ -56,3 +57,18 @@ func (t *Tracker) GetQuerySubscriptions(queryHash string) []string { } return subscriptionIDs } + +func (t *Tracker) SendMessage(clientID string, message []byte) { + t.mu.RLock() + defer t.mu.RUnlock() + client := t.clients[clientID] + if client == nil { + slog.Error("Tracker: Client not found", "clientID", clientID) + return + } + select { + case client.Send <- message: + default: + slog.Error("Tracker: Client send channel is full", "clientID", clientID) + } +} diff --git a/reactivity/websocket.go b/reactivity/websocket.go index a6d55a2..7150ff8 100644 --- a/reactivity/websocket.go +++ b/reactivity/websocket.go @@ -23,7 +23,6 @@ func Handle(w http.ResponseWriter, r *http.Request, e EngineHandler, tracker *Tr slog.Error("WS: Failed to upgrade to websocket", "error", err) return } - defer ws.Close() client := NewClient(ws) err = e.OnConnect(client.ID) @@ -33,6 +32,11 @@ func Handle(w http.ResponseWriter, r *http.Request, e EngineHandler, tracker *Tr } defer e.OnDisconnect(client.ID) tracker.Track(client) + defer func() { + tracker.Untrack(client) + close(client.Send) + }() + go client.WritePump() for { _, message, err := ws.ReadMessage() if err != nil {