mirror of
https://github.com/wisplite/tether.git
synced 2026-05-01 06:22:41 -05:00
fixed an n+1 bug with the query engine, and made it significantly more efficient
This commit is contained in:
@@ -82,39 +82,75 @@ func (e *Engine) InvalidateTable(tableName string) {
|
|||||||
for _, query := range dependentQueries {
|
for _, query := range dependentQueries {
|
||||||
slog.Debug("Invalidating query", "query", query)
|
slog.Debug("Invalidating query", "query", query)
|
||||||
subscriptions := e.tracker.GetQuerySubscriptions(query)
|
subscriptions := e.tracker.GetQuerySubscriptions(query)
|
||||||
slog.Debug("Subscriptions", "subscriptions", subscriptions)
|
|
||||||
|
groupedExecutions := make(map[string][]string)
|
||||||
|
|
||||||
|
type executionData struct {
|
||||||
|
Params map[string]interface{}
|
||||||
|
AuthID string
|
||||||
|
}
|
||||||
|
executionParams := make(map[string]executionData)
|
||||||
|
|
||||||
for _, subscription := range subscriptions {
|
for _, subscription := range subscriptions {
|
||||||
slog.Debug("Invalidating subscription", "subscription", subscription["clientID"])
|
|
||||||
params := map[string]interface{}{}
|
params := map[string]interface{}{}
|
||||||
err := json.Unmarshal([]byte(subscription["params"]), ¶ms)
|
err := json.Unmarshal([]byte(subscription["params"]), ¶ms)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("Failed to unmarshal params", "error", err)
|
slog.Error("Failed to unmarshal params", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_, err = e.ExecuteQuery(query, params, subscription["clientID"], false)
|
authID := e.tracker.GetAuthID(subscription["clientID"])
|
||||||
|
cacheKey := query + "?" + string(subscription["params"]) + "?" + authID
|
||||||
|
groupedExecutions[cacheKey] = append(groupedExecutions[cacheKey], subscription["clientID"])
|
||||||
|
executionParams[cacheKey] = executionData{Params: params, AuthID: authID}
|
||||||
|
}
|
||||||
|
|
||||||
|
for cacheKey, clients := range groupedExecutions {
|
||||||
|
data := executionParams[cacheKey]
|
||||||
|
go func() {
|
||||||
|
_, err := e.ExecuteQueryGroup(query, data.Params, data.AuthID, clients, cacheKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("Failed to execute query", "error", err)
|
slog.Error("Failed to execute query", "error", err)
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *Engine) ExecuteQueryGroup(query string, params map[string]interface{}, authID string, clientIDs []string, cacheKey string) (interface{}, error) {
|
||||||
|
e.hashMu.RLock()
|
||||||
|
lastHash := e.queryHashes[cacheKey]
|
||||||
|
e.hashMu.RUnlock()
|
||||||
|
|
||||||
|
authCtx := &AuthCtx{UserID: authID, IsLoggedIn: authID != ""}
|
||||||
|
queryCtx := &QueryCtx{DB: e.db, AuthCtx: authCtx, Params: params}
|
||||||
|
result := e.queries[query](queryCtx)
|
||||||
|
responseJSON, err := json.Marshal(map[string]interface{}{"type": "query", "location": query, "data": result})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
queryHash := xxhash.Sum64(responseJSON)
|
||||||
|
|
||||||
|
if lastHash == queryHash {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
e.hashMu.Lock()
|
||||||
|
e.queryHashes[cacheKey] = queryHash
|
||||||
|
e.hashMu.Unlock()
|
||||||
|
|
||||||
|
for _, clientID := range clientIDs { // send the response to all clients in the group
|
||||||
|
e.tracker.SendMessage(clientID, responseJSON)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (e *Engine) ExecuteQuery(query string, params map[string]interface{}, clientID string, forceSend bool) (interface{}, error) {
|
func (e *Engine) ExecuteQuery(query string, params map[string]interface{}, clientID string, forceSend bool) (interface{}, error) {
|
||||||
/*
|
|
||||||
TODO: implement the logic to execute the query
|
|
||||||
Steps needed:
|
|
||||||
1. Check which tables updated ✅
|
|
||||||
2. Get the queries that rely on the tables ✅
|
|
||||||
3. Get the subscriptions that need updating ✅
|
|
||||||
4. Calculate hash for every query
|
|
||||||
5. Send the updated queries if hash changed
|
|
||||||
*/
|
|
||||||
paramsJSON, err := json.Marshal(params)
|
paramsJSON, err := json.Marshal(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
cacheKey := query + "?" + string(paramsJSON)
|
cacheKey := query + "?" + string(paramsJSON) + "?" + clientID
|
||||||
e.hashMu.Lock()
|
e.hashMu.Lock()
|
||||||
lastHash := e.queryHashes[cacheKey]
|
lastHash := e.queryHashes[cacheKey]
|
||||||
e.hashMu.Unlock()
|
e.hashMu.Unlock()
|
||||||
|
|||||||
+10
-1
@@ -11,10 +11,19 @@ type Client struct {
|
|||||||
ID string
|
ID string
|
||||||
Conn *websocket.Conn
|
Conn *websocket.Conn
|
||||||
Send chan []byte
|
Send chan []byte
|
||||||
|
AuthID string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(conn *websocket.Conn) *Client {
|
func NewClient(conn *websocket.Conn) *Client {
|
||||||
return &Client{ID: uuid.NewString(), Conn: conn, Send: make(chan []byte, 256)}
|
return &Client{ID: uuid.NewString(), Conn: conn, Send: make(chan []byte, 256), AuthID: ""}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) SetAuthID(authID string) {
|
||||||
|
c.AuthID = authID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) GetAuthID() string {
|
||||||
|
return c.AuthID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) WritePump() {
|
func (c *Client) WritePump() {
|
||||||
|
|||||||
@@ -32,6 +32,16 @@ func (t *Tracker) Untrack(c *Client) {
|
|||||||
delete(t.clients, c.ID)
|
delete(t.clients, c.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *Tracker) SetAuthID(clientID string, authID string) {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
t.clients[clientID].SetAuthID(authID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tracker) GetAuthID(clientID string) string {
|
||||||
|
return t.clients[clientID].GetAuthID()
|
||||||
|
}
|
||||||
|
|
||||||
func (t *Tracker) SubscribeToQuery(clientID string, query string, params string) {
|
func (t *Tracker) SubscribeToQuery(clientID string, query string, params string) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
|
|||||||
Reference in New Issue
Block a user