From 3504f8c16dd53d29b62579a40aab874286d43a62 Mon Sep 17 00:00:00 2001 From: wisplite Date: Thu, 23 Apr 2026 03:10:09 -0500 Subject: [PATCH] Working sync engine! --- engine.go | 57 +++++++++++++++++++++++++++++++++-------- example/client/index.ts | 4 +-- example/main.go | 14 +++++----- go.mod | 1 + go.sum | 4 +++ reactivity/tracker.go | 11 +++----- 6 files changed, 62 insertions(+), 29 deletions(-) diff --git a/engine.go b/engine.go index 4b81120..734ebd7 100644 --- a/engine.go +++ b/engine.go @@ -4,7 +4,9 @@ import ( "encoding/json" "log/slog" "net/http" + "sync" + "github.com/cespare/xxhash" "github.com/wisplite/tether/reactivity" "gorm.io/gorm" ) @@ -12,9 +14,11 @@ import ( type Engine struct { db *gorm.DB dbType string // sqlite or postgres - mutations map[string]func(ctx *MutationCtx) error - queries map[string]func(ctx *QueryCtx) error + mutations map[string]func(ctx *MutationCtx) interface{} + queries map[string]func(ctx *QueryCtx) interface{} dependencies map[string][]string + hashMu sync.RWMutex + queryHashes map[string]uint64 tracker *reactivity.Tracker } @@ -24,7 +28,7 @@ func NewEngine(db *gorm.DB, dbType string) *Engine { if dbType != "sqlite" && dbType != "postgres" { panic("Invalid database type") } - e := &Engine{db: db, dbType: dbType, mutations: make(map[string]func(ctx *MutationCtx) error), queries: make(map[string]func(ctx *QueryCtx) error), dependencies: make(map[string][]string), tracker: tracker} + e := &Engine{db: db, dbType: dbType, mutations: make(map[string]func(ctx *MutationCtx) interface{}), queries: make(map[string]func(ctx *QueryCtx) interface{}), dependencies: make(map[string][]string), queryHashes: make(map[string]uint64), tracker: tracker} db.Callback().Create().After("gorm:create").Register("tether:after_create", func(tx *gorm.DB) { if dbType == "postgres" { return @@ -34,12 +38,12 @@ func NewEngine(db *gorm.DB, dbType string) *Engine { return e } -func (e *Engine) RegisterMutation(name string, mutation func(ctx *MutationCtx) error) { +func (e *Engine) RegisterMutation(name string, mutation func(ctx *MutationCtx) interface{}) { e.mutations[name] = mutation // stores the mutation in the list of valid mutations slog.Debug("Registered mutation", "name", name) } -func (e *Engine) RegisterQuery(name string, query func(ctx *QueryCtx) error, dependencies []string) { +func (e *Engine) RegisterQuery(name string, query func(ctx *QueryCtx) interface{}, dependencies []string) { e.queries[name] = query // stores the query in the list of valid queries for _, dependency := range dependencies { e.dependencies[dependency] = append(e.dependencies[dependency], name) @@ -78,11 +82,16 @@ func (e *Engine) InvalidateTable(tableName string) { for _, query := range dependentQueries { slog.Debug("Invalidating query", "query", query) subscriptions := e.tracker.GetQuerySubscriptions(query) + slog.Debug("Subscriptions", "subscriptions", subscriptions) for _, subscription := range subscriptions { slog.Debug("Invalidating subscription", "subscription", subscription["clientID"]) params := map[string]interface{}{} - json.Unmarshal([]byte(subscription["params"]), ¶ms) - _, err := e.ExecuteQuery(query, params, subscription["clientID"]) + err := json.Unmarshal([]byte(subscription["params"]), ¶ms) + if err != nil { + slog.Error("Failed to unmarshal params", "error", err) + continue + } + _, err = e.ExecuteQuery(query, params, subscription["clientID"]) if err != nil { slog.Error("Failed to execute query", "error", err) continue @@ -101,9 +110,31 @@ func (e *Engine) ExecuteQuery(query string, params map[string]interface{}, clien 4. Calculate hash for every query 5. Send the updated queries if hash changed */ + paramsJSON, err := json.Marshal(params) + if err != nil { + return nil, err + } + cacheKey := query + "?" + string(paramsJSON) + e.hashMu.Lock() + lastHash := e.queryHashes[cacheKey] + e.hashMu.Unlock() slog.Debug("Executing query", "query", query, "params", params) - e.queries[query](&QueryCtx{DB: e.db, AuthCtx: &AuthCtx{UserID: "", IsLoggedIn: true}, Params: params}) - return nil, nil + result := e.queries[query](&QueryCtx{DB: e.db, AuthCtx: &AuthCtx{UserID: "", IsLoggedIn: true}, Params: params}) + responseJSON, err := json.Marshal(map[string]interface{}{"type": "query", "location": query, "data": result}) + if err != nil { + return nil, err + } + queryHash := xxhash.Sum64(responseJSON) + if lastHash == queryHash { + return result, nil + } + + e.hashMu.Lock() + e.queryHashes[cacheKey] = queryHash + e.hashMu.Unlock() + + e.tracker.SendMessage(clientID, responseJSON) + return result, nil } func (e *Engine) ExecuteMutation(mutation string, params map[string]interface{}, clientID string) (interface{}, error) { @@ -114,8 +145,12 @@ func (e *Engine) ExecuteMutation(mutation string, params map[string]interface{}, func (e *Engine) OnReceiveMessage(clientID string, msg map[string]interface{}) error { slog.Debug("Received message", "from", clientID, "message", msg) switch msg["type"] { - case "query": - e.tracker.SubscribeToQuery(clientID, msg["location"].(string), msg["params"].(map[string]string)) + case "subscribe": + paramsJSON, err := json.Marshal(msg["params"]) + if err != nil { + return err + } + e.tracker.SubscribeToQuery(clientID, msg["location"].(string), string(paramsJSON)) case "mutation": e.ExecuteMutation(msg["location"].(string), msg["params"].(map[string]interface{}), clientID) } diff --git a/example/client/index.ts b/example/client/index.ts index 6b3a77e..dcdecb3 100644 --- a/example/client/index.ts +++ b/example/client/index.ts @@ -4,8 +4,8 @@ import { createInterface } from "node:readline/promises"; const client = new TetherClient(); client.connect("ws://localhost:8080/tether"); -client.subscribe("messages", { room: "1" }, (message) => { - console.log("Received message", message); +client.subscribe("getMessages", { room: "1" }, (messages) => { + console.log("Received messages", messages); }); const rl = createInterface({ diff --git a/example/main.go b/example/main.go index 564c76e..28af5a7 100644 --- a/example/main.go +++ b/example/main.go @@ -35,15 +35,13 @@ func main() { engine.CreateTable("users", &User{}) engine.CreateTable("messages", &Messages{}) - engine.RegisterMutation("createUser", func(ctx *tether.MutationCtx) error { - return nil - }) + engine.RegisterQuery("getMessages", func(ctx *tether.QueryCtx) interface{} { + var messages []Messages + ctx.DB.Where("room_id = ?", ctx.Params["room"].(string)).Find(&messages) + return messages + }, []string{"messages"}) - engine.RegisterQuery("getUser", func(ctx *tether.QueryCtx) error { - return nil - }, []string{"users"}) - - engine.RegisterMutation("createMessage", func(ctx *tether.MutationCtx) error { + engine.RegisterMutation("createMessage", func(ctx *tether.MutationCtx) interface{} { ctx.DB.Create(&Messages{ID: uuid.NewString(), Message: ctx.Params["message"].(string), SenderID: ctx.AuthCtx.UserID, RoomID: ctx.Params["room"].(string)}) return nil }) diff --git a/go.mod b/go.mod index 186a04f..c426fd0 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/wisplite/tether go 1.25.6 require ( + github.com/cespare/xxhash v1.1.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/glebarez/go-sqlite v1.21.2 // indirect github.com/glebarez/sqlite v1.11.0 // indirect diff --git a/go.sum b/go.sum index 68043fc..c8b4869 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,6 @@ +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo= @@ -17,6 +20,7 @@ github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/reactivity/tracker.go b/reactivity/tracker.go index 658fcca..07955e9 100644 --- a/reactivity/tracker.go +++ b/reactivity/tracker.go @@ -1,7 +1,6 @@ package reactivity import ( - "encoding/json" "log/slog" "sync" ) @@ -33,19 +32,15 @@ func (t *Tracker) Untrack(c *Client) { delete(t.clients, c.ID) } -func (t *Tracker) SubscribeToQuery(clientID string, query string, params map[string]string) { +func (t *Tracker) SubscribeToQuery(clientID string, query string, params string) { t.mu.Lock() defer t.mu.Unlock() if t.subscriptions[query] == nil { t.subscriptions[query] = make([]map[string]string, 0) } // set t.subscriptions[query] to a map of client IDs and their params - paramsJSON, err := json.Marshal(params) - if err != nil { - slog.Error("Tracker: Failed to marshal params", "error", err) - return - } - t.subscriptions[query] = append(t.subscriptions[query], map[string]string{"clientID": clientID, "params": string(paramsJSON)}) + t.subscriptions[query] = append(t.subscriptions[query], map[string]string{"clientID": clientID, "params": params}) + slog.Debug("Tracker: Subscribed to query", "query", query, "clientID", clientID, "params", params) } func (t *Tracker) UnsubscribeFromQuery(clientID string, query string) {