From 167b2ff66530d4390fa4b8b7aea395e854553173 Mon Sep 17 00:00:00 2001 From: wisplite Date: Thu, 23 Apr 2026 08:11:39 -0500 Subject: [PATCH] fixed an n+1 bug with the query engine, and made it significantly more efficient --- engine.go | 70 ++++++++++++++++++++++++++++++++----------- reactivity/client.go | 17 ++++++++--- reactivity/tracker.go | 10 +++++++ 3 files changed, 76 insertions(+), 21 deletions(-) diff --git a/engine.go b/engine.go index 44230d4..c17aa71 100644 --- a/engine.go +++ b/engine.go @@ -82,39 +82,75 @@ func (e *Engine) InvalidateTable(tableName string) { for _, query := range dependentQueries { slog.Debug("Invalidating query", "query", query) subscriptions := e.tracker.GetQuerySubscriptions(query) - slog.Debug("Subscriptions", "subscriptions", subscriptions) + + groupedExecutions := make(map[string][]string) + + type executionData struct { + Params map[string]interface{} + AuthID string + } + executionParams := make(map[string]executionData) + for _, subscription := range subscriptions { - slog.Debug("Invalidating subscription", "subscription", subscription["clientID"]) params := map[string]interface{}{} err := json.Unmarshal([]byte(subscription["params"]), ¶ms) if err != nil { slog.Error("Failed to unmarshal params", "error", err) continue } - _, err = e.ExecuteQuery(query, params, subscription["clientID"], false) - if err != nil { - slog.Error("Failed to execute query", "error", err) - continue - } + authID := e.tracker.GetAuthID(subscription["clientID"]) + cacheKey := query + "?" + string(subscription["params"]) + "?" + authID + groupedExecutions[cacheKey] = append(groupedExecutions[cacheKey], subscription["clientID"]) + executionParams[cacheKey] = executionData{Params: params, AuthID: authID} + } + + for cacheKey, clients := range groupedExecutions { + data := executionParams[cacheKey] + go func() { + _, err := e.ExecuteQueryGroup(query, data.Params, data.AuthID, clients, cacheKey) + if err != nil { + slog.Error("Failed to execute query", "error", err) + return + } + }() } } } +func (e *Engine) ExecuteQueryGroup(query string, params map[string]interface{}, authID string, clientIDs []string, cacheKey string) (interface{}, error) { + e.hashMu.RLock() + lastHash := e.queryHashes[cacheKey] + e.hashMu.RUnlock() + + authCtx := &AuthCtx{UserID: authID, IsLoggedIn: authID != ""} + queryCtx := &QueryCtx{DB: e.db, AuthCtx: authCtx, Params: params} + result := e.queries[query](queryCtx) + responseJSON, err := json.Marshal(map[string]interface{}{"type": "query", "location": query, "data": result}) + if err != nil { + return nil, err + } + queryHash := xxhash.Sum64(responseJSON) + + if lastHash == queryHash { + return nil, nil + } + + e.hashMu.Lock() + e.queryHashes[cacheKey] = queryHash + e.hashMu.Unlock() + + for _, clientID := range clientIDs { // send the response to all clients in the group + e.tracker.SendMessage(clientID, responseJSON) + } + return result, nil +} + func (e *Engine) ExecuteQuery(query string, params map[string]interface{}, clientID string, forceSend bool) (interface{}, error) { - /* - TODO: implement the logic to execute the query - Steps needed: - 1. Check which tables updated ✅ - 2. Get the queries that rely on the tables ✅ - 3. Get the subscriptions that need updating ✅ - 4. Calculate hash for every query - 5. Send the updated queries if hash changed - */ paramsJSON, err := json.Marshal(params) if err != nil { return nil, err } - cacheKey := query + "?" + string(paramsJSON) + cacheKey := query + "?" + string(paramsJSON) + "?" + clientID e.hashMu.Lock() lastHash := e.queryHashes[cacheKey] e.hashMu.Unlock() diff --git a/reactivity/client.go b/reactivity/client.go index 662b353..b2464ed 100644 --- a/reactivity/client.go +++ b/reactivity/client.go @@ -8,13 +8,22 @@ import ( ) type Client struct { - ID string - Conn *websocket.Conn - Send chan []byte + ID string + Conn *websocket.Conn + Send chan []byte + AuthID string } func NewClient(conn *websocket.Conn) *Client { - return &Client{ID: uuid.NewString(), Conn: conn, Send: make(chan []byte, 256)} + return &Client{ID: uuid.NewString(), Conn: conn, Send: make(chan []byte, 256), AuthID: ""} +} + +func (c *Client) SetAuthID(authID string) { + c.AuthID = authID +} + +func (c *Client) GetAuthID() string { + return c.AuthID } func (c *Client) WritePump() { diff --git a/reactivity/tracker.go b/reactivity/tracker.go index 07955e9..2453d63 100644 --- a/reactivity/tracker.go +++ b/reactivity/tracker.go @@ -32,6 +32,16 @@ func (t *Tracker) Untrack(c *Client) { delete(t.clients, c.ID) } +func (t *Tracker) SetAuthID(clientID string, authID string) { + t.mu.Lock() + defer t.mu.Unlock() + t.clients[clientID].SetAuthID(authID) +} + +func (t *Tracker) GetAuthID(clientID string) string { + return t.clients[clientID].GetAuthID() +} + func (t *Tracker) SubscribeToQuery(clientID string, query string, params string) { t.mu.Lock() defer t.mu.Unlock()