From c366164a9a852a35ad042f4286b08f4f25725a20 Mon Sep 17 00:00:00 2001 From: wisplite Date: Thu, 23 Apr 2026 09:39:15 -0500 Subject: [PATCH] fix the bugs that were causing mutation response errors, and add duplicate subscription checks MUTATIONS WORK BTW!!! --- engine.go | 11 ++++++++-- example/client/index.ts | 3 ++- example/client/package-lock.json | 2 +- example/main.go | 7 +++++-- reactivity/tracker.go | 35 ++++++++++++++++++++++++++++++-- 5 files changed, 50 insertions(+), 8 deletions(-) diff --git a/engine.go b/engine.go index 82c1e2a..324d7da 100644 --- a/engine.go +++ b/engine.go @@ -176,8 +176,15 @@ func (e *Engine) ExecuteQuery(query string, params map[string]interface{}, clien return result, nil } -func (e *Engine) ExecuteMutation(mutation string, params map[string]interface{}, clientID string) (interface{}, error) { +func (e *Engine) ExecuteMutation(mutation string, params map[string]interface{}, clientID string, mutationID string) (interface{}, error) { result := e.mutations[mutation](&MutationCtx{DB: e.db, AuthCtx: &AuthCtx{UserID: "", IsLoggedIn: true}, Params: params}) + slog.Debug("Executing mutation", "mutation", mutation, "params", params, "result", result) + responseJSON, err := json.Marshal(map[string]interface{}{"type": "mutation", "location": mutation, "data": result, "mutation_id": mutationID}) + if err != nil { + slog.Error("Failed to encode mutation result", "mutation", mutation, "error", err) + return nil, err + } + e.tracker.SendMessage(clientID, responseJSON) return result, nil } @@ -192,7 +199,7 @@ func (e *Engine) OnReceiveMessage(clientID string, msg map[string]interface{}) e e.tracker.SubscribeToQuery(clientID, msg["location"].(string), string(paramsJSON)) e.ExecuteQuery(msg["location"].(string), msg["params"].(map[string]interface{}), clientID, true) case "mutation": - e.ExecuteMutation(msg["location"].(string), msg["params"].(map[string]interface{}), clientID) + e.ExecuteMutation(msg["location"].(string), msg["params"].(map[string]interface{}), clientID, msg["mutation_id"].(string)) } return nil } diff --git a/example/client/index.ts b/example/client/index.ts index dcdecb3..352afa8 100644 --- a/example/client/index.ts +++ b/example/client/index.ts @@ -16,6 +16,7 @@ const rl = createInterface({ while (true) { const message = await rl.question("Enter a message"); if (message) { - client.sendMutation("createMessage", { room: "1", message: message }); + const result = await client.sendMutation("createMessage", { room: "1", message: message }); + console.log("Mutation result", result); } } \ No newline at end of file diff --git a/example/client/package-lock.json b/example/client/package-lock.json index a3a36bf..18d57e9 100644 --- a/example/client/package-lock.json +++ b/example/client/package-lock.json @@ -28,7 +28,7 @@ }, "node_modules/tether-ts": { "version": "1.0.3", - "resolved": "git+ssh://git@github.com/wisplite/tether-ts.git#8945de00ece92ce1e939a3ff70c5312f80e56dda", + "resolved": "git+ssh://git@github.com/wisplite/tether-ts.git#82a462ee116789dd6832fe5ff079a63fcb91afc4", "license": "ISC" }, "node_modules/typescript": { diff --git a/example/main.go b/example/main.go index 28af5a7..c4f9611 100644 --- a/example/main.go +++ b/example/main.go @@ -42,8 +42,11 @@ func main() { }, []string{"messages"}) engine.RegisterMutation("createMessage", func(ctx *tether.MutationCtx) interface{} { - ctx.DB.Create(&Messages{ID: uuid.NewString(), Message: ctx.Params["message"].(string), SenderID: ctx.AuthCtx.UserID, RoomID: ctx.Params["room"].(string)}) - return nil + msg := &Messages{ID: uuid.NewString(), Message: ctx.Params["message"].(string), SenderID: ctx.AuthCtx.UserID, RoomID: ctx.Params["room"].(string)} + if err := ctx.DB.Create(msg).Error; err != nil { + return map[string]interface{}{"error": err.Error()} + } + return msg }) http.HandleFunc("/tether", engine.Handle) diff --git a/reactivity/tracker.go b/reactivity/tracker.go index 2453d63..e01161e 100644 --- a/reactivity/tracker.go +++ b/reactivity/tracker.go @@ -2,6 +2,7 @@ package reactivity import ( "log/slog" + "slices" "sync" ) @@ -13,11 +14,16 @@ type Tracker struct { clients map[string]*Client // Maps a Query Hash (e.g. "getUser?id=1") to a Set of Client IDs - subscriptions map[string][]map[string]string + subscriptions map[string][]map[string]string + clientSubscriptions map[string][]string } func NewTracker() *Tracker { - return &Tracker{clients: make(map[string]*Client), subscriptions: make(map[string][]map[string]string)} + return &Tracker{ + clients: make(map[string]*Client), + subscriptions: make(map[string][]map[string]string), + clientSubscriptions: make(map[string][]string), + } } func (t *Tracker) Track(c *Client) { @@ -30,6 +36,20 @@ func (t *Tracker) Untrack(c *Client) { t.mu.Lock() defer t.mu.Unlock() delete(t.clients, c.ID) + delete(t.clientSubscriptions, c.ID) + for query, subs := range t.subscriptions { + kept := subs[:0] + for _, sub := range subs { + if sub["clientID"] != c.ID { + kept = append(kept, sub) + } + } + if len(kept) == 0 { + delete(t.subscriptions, query) + } else { + t.subscriptions[query] = kept + } + } } func (t *Tracker) SetAuthID(clientID string, authID string) { @@ -39,6 +59,10 @@ func (t *Tracker) SetAuthID(clientID string, authID string) { } func (t *Tracker) GetAuthID(clientID string) string { + if _, ok := t.clients[clientID]; !ok { + slog.Error("Tracker: Client not found", "clientID", clientID) + return "" + } return t.clients[clientID].GetAuthID() } @@ -48,6 +72,13 @@ func (t *Tracker) SubscribeToQuery(clientID string, query string, params string) if t.subscriptions[query] == nil { t.subscriptions[query] = make([]map[string]string, 0) } + if t.clientSubscriptions[clientID] == nil { + t.clientSubscriptions[clientID] = make([]string, 0) + } + if slices.Contains(t.clientSubscriptions[clientID], query) { + return // avoid duplicate subscriptions + } + t.clientSubscriptions[clientID] = append(t.clientSubscriptions[clientID], query) // set t.subscriptions[query] to a map of client IDs and their params t.subscriptions[query] = append(t.subscriptions[query], map[string]string{"clientID": clientID, "params": params}) slog.Debug("Tracker: Subscribed to query", "query", query, "clientID", clientID, "params", params)