mutation proof of concept

client can now send mutations and server properly executes them
TODO: implement query execution, add auth, add storage
This commit is contained in:
2026-04-21 23:07:55 -05:00
parent c834c11bc8
commit 8092488b29
8 changed files with 112 additions and 26 deletions
+2
View File
@@ -10,9 +10,11 @@ type AuthCtx struct {
type QueryCtx struct { type QueryCtx struct {
DB *gorm.DB DB *gorm.DB
AuthCtx *AuthCtx AuthCtx *AuthCtx
Params map[string]interface{}
} }
type MutationCtx struct { type MutationCtx struct {
DB *gorm.DB DB *gorm.DB
AuthCtx *AuthCtx AuthCtx *AuthCtx
Params map[string]interface{}
} }
+24 -10
View File
@@ -1,7 +1,6 @@
package tether package tether
import ( import (
"encoding/json"
"log/slog" "log/slog"
"net/http" "net/http"
@@ -53,17 +52,32 @@ func (e *Engine) OnDisconnect(clientID string) error {
return nil return nil
} }
func (e *Engine) ExecuteQuery(query string) (interface{}, error) {
/*
TODO: implement the logic to execute the query
Steps needed:
1. Check which tables updated
2. Get the queries that rely on the tables
3. Get the subscriptions that need updating
4. Calculate hash for every query
5. Send the updated queries if hash changed
*/
return nil, nil
}
func (e *Engine) ExecuteMutation(mutation string, params map[string]interface{}) (interface{}, error) {
result := e.mutations[mutation](&MutationCtx{DB: e.db, AuthCtx: &AuthCtx{UserID: "", IsLoggedIn: true}, Params: params})
return result, nil
}
func (e *Engine) OnReceiveMessage(clientID string, msg map[string]interface{}) error { func (e *Engine) OnReceiveMessage(clientID string, msg map[string]interface{}) error {
slog.Debug("Received message", "from", clientID, "message", msg) slog.Debug("Received message", "from", clientID, "message", msg)
message, err := json.Marshal(map[string]interface{}{ switch msg["type"] {
"type": "query", case "query":
"location": msg["query"], query := msg["location"].(string) + "?" + msg["params"].(string)
"data": "test", e.tracker.SubscribeToQuery(clientID, query)
}) case "mutation":
if err != nil { e.ExecuteMutation(msg["location"].(string), msg["params"].(map[string]interface{}))
slog.Error("Failed to marshal message", "error", err)
return err
} }
e.tracker.SendMessage(clientID, message)
return nil return nil
} }
+14 -1
View File
@@ -1,8 +1,21 @@
import { TetherClient } from "tether-ts"; import { TetherClient } from "tether-ts";
import { createInterface } from "node:readline/promises";
const client = new TetherClient(); const client = new TetherClient();
client.connect("ws://localhost:8080/tether"); client.connect("ws://localhost:8080/tether");
client.subscribe("messages", (message) => { client.subscribe("messages", { room: "1" }, (message) => {
console.log("Received message", message); console.log("Received message", message);
}); });
const rl = createInterface({
input: process.stdin,
output: process.stdout,
});
while (true) {
const message = await rl.question("Enter a message");
if (message) {
client.sendMutation("createMessage", { room: "1", message: message });
}
}
+36 -1
View File
@@ -10,12 +10,47 @@
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"tether-ts": "github:wisplite/tether-ts" "tether-ts": "github:wisplite/tether-ts"
},
"devDependencies": {
"@types/node": "^22.0.0",
"typescript": "^5.0.0"
}
},
"node_modules/@types/node": {
"version": "22.19.17",
"resolved": "https://registry.npmjs.org/@types/node/-/node-22.19.17.tgz",
"integrity": "sha512-wGdMcf+vPYM6jikpS/qhg6WiqSV/OhG+jeeHT/KlVqxYfD40iYJf9/AE1uQxVWFvU7MipKRkRv8NSHiCGgPr8Q==",
"dev": true,
"license": "MIT",
"dependencies": {
"undici-types": "~6.21.0"
} }
}, },
"node_modules/tether-ts": { "node_modules/tether-ts": {
"version": "1.0.3", "version": "1.0.3",
"resolved": "git+ssh://git@github.com/wisplite/tether-ts.git#5202a7f844be4ad7f3a463352fe366c4718a2749", "resolved": "git+ssh://git@github.com/wisplite/tether-ts.git#8945de00ece92ce1e939a3ff70c5312f80e56dda",
"license": "ISC" "license": "ISC"
},
"node_modules/typescript": {
"version": "5.9.3",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz",
"integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==",
"dev": true,
"license": "Apache-2.0",
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
},
"engines": {
"node": ">=14.17"
}
},
"node_modules/undici-types": {
"version": "6.21.0",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz",
"integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==",
"dev": true,
"license": "MIT"
} }
} }
} }
+4
View File
@@ -11,5 +11,9 @@
}, },
"dependencies": { "dependencies": {
"tether-ts": "github:wisplite/tether-ts" "tether-ts": "github:wisplite/tether-ts"
},
"devDependencies": {
"@types/node": "^22.0.0",
"typescript": "^5.0.0"
} }
} }
+12
View File
@@ -0,0 +1,12 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"strict": true,
"skipLibCheck": true,
"noEmit": true,
"types": ["node"]
},
"include": ["*.ts"]
}
+11 -5
View File
@@ -6,6 +6,7 @@ import (
"net/http" "net/http"
"github.com/glebarez/sqlite" "github.com/glebarez/sqlite"
"github.com/google/uuid"
"github.com/wisplite/tether" "github.com/wisplite/tether"
"gorm.io/gorm" "gorm.io/gorm"
) )
@@ -16,11 +17,11 @@ type User struct {
} }
type Messages struct { type Messages struct {
ID string `gorm:"primaryKey"` ID string `gorm:"primaryKey"`
Message string Message string
SenderID string SenderID string
ReceiverID string RoomID string
CreatedAt time.Time CreatedAt time.Time
} }
func main() { func main() {
@@ -42,6 +43,11 @@ func main() {
return nil return nil
}) })
engine.RegisterMutation("createMessage", func(ctx *tether.MutationCtx) error {
ctx.DB.Create(&Messages{ID: uuid.NewString(), Message: ctx.Params["message"].(string), SenderID: ctx.AuthCtx.UserID, RoomID: ctx.Params["room"].(string)})
return nil
})
http.HandleFunc("/tether", engine.Handle) http.HandleFunc("/tether", engine.Handle)
http.ListenAndServe(":8080", nil) http.ListenAndServe(":8080", nil)
} }
+8 -8
View File
@@ -32,25 +32,25 @@ func (t *Tracker) Untrack(c *Client) {
delete(t.clients, c.ID) delete(t.clients, c.ID)
} }
func (t *Tracker) SubscribeToQuery(clientID string, queryHash string) { func (t *Tracker) SubscribeToQuery(clientID string, query string) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
if t.subscriptions[queryHash] == nil { if t.subscriptions[query] == nil {
t.subscriptions[queryHash] = make(map[string]bool) t.subscriptions[query] = make(map[string]bool)
} }
t.subscriptions[queryHash][clientID] = true t.subscriptions[query][clientID] = true
} }
func (t *Tracker) UnsubscribeFromQuery(clientID string, queryHash string) { func (t *Tracker) UnsubscribeFromQuery(clientID string, query string) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
delete(t.subscriptions[queryHash], clientID) delete(t.subscriptions[query], clientID)
} }
func (t *Tracker) GetQuerySubscriptions(queryHash string) []string { func (t *Tracker) GetQuerySubscriptions(query string) []string {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()
subscriptions := t.subscriptions[queryHash] subscriptions := t.subscriptions[query]
subscriptionIDs := make([]string, 0, len(subscriptions)) subscriptionIDs := make([]string, 0, len(subscriptions))
for clientID := range subscriptions { for clientID := range subscriptions {
subscriptionIDs = append(subscriptionIDs, clientID) subscriptionIDs = append(subscriptionIDs, clientID)