diff --git a/context.go b/context.go index 04ef191..fb7960d 100644 --- a/context.go +++ b/context.go @@ -10,9 +10,11 @@ type AuthCtx struct { type QueryCtx struct { DB *gorm.DB AuthCtx *AuthCtx + Params map[string]interface{} } type MutationCtx struct { DB *gorm.DB AuthCtx *AuthCtx + Params map[string]interface{} } diff --git a/engine.go b/engine.go index 9455abd..b18c3a3 100644 --- a/engine.go +++ b/engine.go @@ -1,7 +1,6 @@ package tether import ( - "encoding/json" "log/slog" "net/http" @@ -53,17 +52,32 @@ func (e *Engine) OnDisconnect(clientID string) error { return nil } +func (e *Engine) ExecuteQuery(query string) (interface{}, error) { + /* + TODO: implement the logic to execute the query + Steps needed: + 1. Check which tables updated + 2. Get the queries that rely on the tables + 3. Get the subscriptions that need updating + 4. Calculate hash for every query + 5. Send the updated queries if hash changed + */ + return nil, nil +} + +func (e *Engine) ExecuteMutation(mutation string, params map[string]interface{}) (interface{}, error) { + result := e.mutations[mutation](&MutationCtx{DB: e.db, AuthCtx: &AuthCtx{UserID: "", IsLoggedIn: true}, Params: params}) + return result, nil +} + func (e *Engine) OnReceiveMessage(clientID string, msg map[string]interface{}) error { slog.Debug("Received message", "from", clientID, "message", msg) - message, err := json.Marshal(map[string]interface{}{ - "type": "query", - "location": msg["query"], - "data": "test", - }) - if err != nil { - slog.Error("Failed to marshal message", "error", err) - return err + switch msg["type"] { + case "query": + query := msg["location"].(string) + "?" + msg["params"].(string) + e.tracker.SubscribeToQuery(clientID, query) + case "mutation": + e.ExecuteMutation(msg["location"].(string), msg["params"].(map[string]interface{})) } - e.tracker.SendMessage(clientID, message) return nil } diff --git a/example/client/index.ts b/example/client/index.ts index 65a83d0..6b3a77e 100644 --- a/example/client/index.ts +++ b/example/client/index.ts @@ -1,8 +1,21 @@ import { TetherClient } from "tether-ts"; +import { createInterface } from "node:readline/promises"; const client = new TetherClient(); client.connect("ws://localhost:8080/tether"); -client.subscribe("messages", (message) => { +client.subscribe("messages", { room: "1" }, (message) => { console.log("Received message", message); -}); \ No newline at end of file +}); + +const rl = createInterface({ + input: process.stdin, + output: process.stdout, +}); + +while (true) { + const message = await rl.question("Enter a message"); + if (message) { + client.sendMutation("createMessage", { room: "1", message: message }); + } +} \ No newline at end of file diff --git a/example/client/package-lock.json b/example/client/package-lock.json index 1c741ff..a3a36bf 100644 --- a/example/client/package-lock.json +++ b/example/client/package-lock.json @@ -10,12 +10,47 @@ "license": "ISC", "dependencies": { "tether-ts": "github:wisplite/tether-ts" + }, + "devDependencies": { + "@types/node": "^22.0.0", + "typescript": "^5.0.0" + } + }, + "node_modules/@types/node": { + "version": "22.19.17", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.19.17.tgz", + "integrity": "sha512-wGdMcf+vPYM6jikpS/qhg6WiqSV/OhG+jeeHT/KlVqxYfD40iYJf9/AE1uQxVWFvU7MipKRkRv8NSHiCGgPr8Q==", + "dev": true, + "license": "MIT", + "dependencies": { + "undici-types": "~6.21.0" } }, "node_modules/tether-ts": { "version": "1.0.3", - "resolved": "git+ssh://git@github.com/wisplite/tether-ts.git#5202a7f844be4ad7f3a463352fe366c4718a2749", + "resolved": "git+ssh://git@github.com/wisplite/tether-ts.git#8945de00ece92ce1e939a3ff70c5312f80e56dda", "license": "ISC" + }, + "node_modules/typescript": { + "version": "5.9.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", + "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", + "dev": true, + "license": "Apache-2.0", + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=14.17" + } + }, + "node_modules/undici-types": { + "version": "6.21.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz", + "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==", + "dev": true, + "license": "MIT" } } } diff --git a/example/client/package.json b/example/client/package.json index 4425bb9..db803a9 100644 --- a/example/client/package.json +++ b/example/client/package.json @@ -11,5 +11,9 @@ }, "dependencies": { "tether-ts": "github:wisplite/tether-ts" + }, + "devDependencies": { + "@types/node": "^22.0.0", + "typescript": "^5.0.0" } } diff --git a/example/client/tsconfig.json b/example/client/tsconfig.json new file mode 100644 index 0000000..4b762ea --- /dev/null +++ b/example/client/tsconfig.json @@ -0,0 +1,12 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "strict": true, + "skipLibCheck": true, + "noEmit": true, + "types": ["node"] + }, + "include": ["*.ts"] +} diff --git a/example/main.go b/example/main.go index 4824d02..d7b18f7 100644 --- a/example/main.go +++ b/example/main.go @@ -6,6 +6,7 @@ import ( "net/http" "github.com/glebarez/sqlite" + "github.com/google/uuid" "github.com/wisplite/tether" "gorm.io/gorm" ) @@ -16,11 +17,11 @@ type User struct { } type Messages struct { - ID string `gorm:"primaryKey"` - Message string - SenderID string - ReceiverID string - CreatedAt time.Time + ID string `gorm:"primaryKey"` + Message string + SenderID string + RoomID string + CreatedAt time.Time } func main() { @@ -42,6 +43,11 @@ func main() { return nil }) + engine.RegisterMutation("createMessage", func(ctx *tether.MutationCtx) error { + ctx.DB.Create(&Messages{ID: uuid.NewString(), Message: ctx.Params["message"].(string), SenderID: ctx.AuthCtx.UserID, RoomID: ctx.Params["room"].(string)}) + return nil + }) + http.HandleFunc("/tether", engine.Handle) http.ListenAndServe(":8080", nil) } diff --git a/reactivity/tracker.go b/reactivity/tracker.go index 60dc426..e5907d0 100644 --- a/reactivity/tracker.go +++ b/reactivity/tracker.go @@ -32,25 +32,25 @@ func (t *Tracker) Untrack(c *Client) { delete(t.clients, c.ID) } -func (t *Tracker) SubscribeToQuery(clientID string, queryHash string) { +func (t *Tracker) SubscribeToQuery(clientID string, query string) { t.mu.Lock() defer t.mu.Unlock() - if t.subscriptions[queryHash] == nil { - t.subscriptions[queryHash] = make(map[string]bool) + if t.subscriptions[query] == nil { + t.subscriptions[query] = make(map[string]bool) } - t.subscriptions[queryHash][clientID] = true + t.subscriptions[query][clientID] = true } -func (t *Tracker) UnsubscribeFromQuery(clientID string, queryHash string) { +func (t *Tracker) UnsubscribeFromQuery(clientID string, query string) { t.mu.Lock() defer t.mu.Unlock() - delete(t.subscriptions[queryHash], clientID) + delete(t.subscriptions[query], clientID) } -func (t *Tracker) GetQuerySubscriptions(queryHash string) []string { +func (t *Tracker) GetQuerySubscriptions(query string) []string { t.mu.RLock() defer t.mu.RUnlock() - subscriptions := t.subscriptions[queryHash] + subscriptions := t.subscriptions[query] subscriptionIDs := make([]string, 0, len(subscriptions)) for clientID := range subscriptions { subscriptionIDs = append(subscriptionIDs, clientID)