Chapter 19: Building the Orchestrator UI

The Sovereign Orchestrator UI is a Single Page Application that operates across three simultaneous layers: the WASM WAM executing in the browser tab provides sub-millisecond logic feedback as the operator types; the Go server-sent events pipeline pushes authoritative KB state changes to every connected browser the moment a topology mutation completes; and the Go CGO worker pool re-validates every state-mutating request from scratch before any cluster change is permitted — because the browser is not a trust boundary, the edge KB can be stale, and neither condition should require a page reload to resolve. This chapter wires orchestrator_server.go to the Chapter 16 Pool, implements a live SSE channel that drives automatic WASM VFS resynchronisation, and closes with the double-verification security contract that makes the edge layer a performance optimisation and an availability convenience, never a security primitive.


19.1 The Double-Verification Contract and SPA Architecture

19.1.1 Why Vanilla JS

A React application has a build-time dependency graph of 800–1,200 npm packages. Any one of those packages can introduce a supply-chain compromise, a breaking API change, or an incompatible peer dependency on a timeline with no relation to the operator's schedule. The compiled bundle is opaque JavaScript that requires a build toolchain to modify, a CI pipeline to deploy, and a browser devtools session to debug in production. Three years from now, a node_modules tree pinned today may no longer build against the Node.js version that ships with the host OS. The framework becomes a maintenance liability on a cadence controlled by the npm ecosystem, not by the operator.

Vanilla JavaScript with direct DOM manipulation has no build step, no dependency graph, no package.json, and no version drift. The source file that serves the UI today serves it in 2036. Every API used in this chapter (fetch, addEventListener, querySelector, classList, EventSource, template literals, async/await) is specified in a WHATWG living standard (HTML, DOM, Fetch) or in ECMAScript, and all have been stable across major browsers since 2020. The WASM runtime is the single external binary, versioned explicitly in build.sh and served as an immutable cached asset. An operator on an air-gapped network with no npm registry and no CDN can modify, deploy, and debug the entire UI with a text editor and a browser inspector.

19.1.2 The Verification Loop

%%{init: {"themeVariables": {"fontSize": "14px"}}}%%
flowchart TD
    INPUT["Operator Input\nIP address, port, protocol\nFree-text DOM fields\nNo server contact yet"]

    WASM["WASM WAM — Browser Tab\nfirewall_verdict/4 executes locally\nmust_be(ground, Src) guard fires\nparse_ipv4/2 validates semantically\nLatency: 15-30us\nResult: instant green/red UI feedback\nZero network round trips"]

    DECISION{"Deploy to Cluster\nclicked?"}

    FEEDBACK["Visual Feedback Only\nDOM class: allowed / denied\nNo state mutation\nNo server contact\nOperator continues editing"]

    FETCH["fetch() POST to Go API\n/api/v1/firewall/check\nJSON: source_ip, dest_port, protocol\nAuthorization: Bearer token\nRaw DOM values only\nWASM verdict NOT forwarded"]

    GOPOOL["Go CGO Worker Pool\nChapter 16: locked OS threads\nEach: independent WAM engine\nRe-parses JSON payload from scratch\nRe-runs firewall_verdict/4\nNo trust in any client assertion"]

    MUTATE["Cluster State Mutation\nretract_link / assert_link\nControlFlushTables broadcast\nSSE event pushed to all clients\nAudit log written"]

    REJECT["Reject 400 / 403\nStructured JSON error\nNo cluster mutation"]

    SSE["SSE Pipeline\nGo pushes kb_updated event\nAll connected browsers receive\nJS fetches updated .pl from server\nWrites to Emscripten MEMFS\nconsult() reloads edge KB\nNo page reload required"]

    INPUT --->|"on input event"| WASM
    WASM --->|"result"| DECISION
    DECISION --->|"still editing"| FEEDBACK
    FEEDBACK --->|"back to typing"| INPUT
    DECISION --->|"Deploy clicked"| FETCH
    FETCH --->|"HTTP POST"| GOPOOL
    GOPOOL --->|"valid"| MUTATE
    GOPOOL --->|"invalid"| REJECT
    MUTATE --->|"server event"| SSE
    SSE --->|"edge KB current"| WASM

    style INPUT fill:#1A2B4A,color:#FFFFFF
    style WASM fill:#7A1A1A,color:#FFFFFF
    style DECISION fill:#8B6914,color:#FFFFFF
    style FEEDBACK fill:#2A4A2A,color:#FFFFFF
    style FETCH fill:#1A4070,color:#FFFFFF
    style GOPOOL fill:#1A4070,color:#FFFFFF
    style MUTATE fill:#1A6B3A,color:#FFFFFF
    style REJECT fill:#7A1A1A,color:#FFFFFF
    style SSE fill:#5A1A6A,color:#FFFFFF

The SSE node closes the loop. The edge WAM's value is its latency — 15–30μs versus a 5–50ms round-trip to the Go API. That value disappears if the edge KB is stale. The SSE pipeline restores it: every topology mutation on the server triggers a kb_updated event that drives an automatic MEMFS write and consult() in every connected browser, without a page reload and without the operator taking any action.
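
The server half of the double-verification contract can be sketched in a few lines of Go. This is a minimal illustration under stated assumptions, not the chapter's handler: revalidate and checkRequest are hypothetical names, the net.ParseIP and port-range checks stand in for the real firewall_verdict/4 dispatch to the worker pool, and rejecting unknown JSON fields is an assumption added here to show that a forwarded WASM verdict has nowhere to land.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"strings"
)

// checkRequest holds only the raw DOM values. It has no field for a
// client-side verdict, so the server has nothing to trust.
type checkRequest struct {
	SourceIP string `json:"source_ip"`
	DestPort int    `json:"dest_port"`
	Protocol string `json:"protocol"`
}

// revalidate re-parses the raw request body from scratch. It is a
// hypothetical stand-in for the real path, which dispatches
// firewall_verdict/4 to the worker pool.
func revalidate(body string) (checkRequest, error) {
	dec := json.NewDecoder(strings.NewReader(body))
	// Assumption for this sketch: unknown fields (such as a forwarded
	// "allowed" verdict) are treated as a protocol error.
	dec.DisallowUnknownFields()

	var req checkRequest
	if err := dec.Decode(&req); err != nil {
		return checkRequest{}, fmt.Errorf("malformed JSON: %w", err)
	}
	if net.ParseIP(req.SourceIP) == nil {
		return checkRequest{}, fmt.Errorf("invalid source_ip %q", req.SourceIP)
	}
	if req.DestPort < 0 || req.DestPort > 65535 {
		return checkRequest{}, fmt.Errorf("dest_port out of range: %d", req.DestPort)
	}
	switch req.Protocol {
	case "tcp", "udp", "icmp":
		// Known protocol; nothing more to check in this sketch.
	default:
		return checkRequest{}, fmt.Errorf("unknown protocol %q", req.Protocol)
	}
	return req, nil
}
```

A body such as {"source_ip":"10.0.0.5","dest_port":443,"protocol":"tcp"} passes, while the same body with an extra "allowed":true field is rejected before any policy logic runs.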


19.2 The Go Event Horizon: SSE and Dynamic Topology

19.2.1 Architecture

The Go server maintains two structures that did not exist in the prototype: an SSE broker that fans out kb_updated events to all connected browser clients, and a dynamically updatable node vocabulary that allows the operator to approve new hypervisor nodes without restarting the server. The Chapter 16 worker pool, the LRU denial cache from the previous iteration, and the secureHeaders/requireJSON/requestSizeLimit middleware chain are unchanged.

The topology node vocabulary problem is architectural. A frozen map built once at startup means adding pve15 to the cluster requires a server restart, which drops all SSE connections, which causes every connected browser to fall back to server-only validation mode for the reconnection window. The correct design is an RWMutex-protected map with a dedicated HTTP handler for node approval — the operator POSTs a new node name to /api/v1/topology/approve, the server validates it against the live WAM clause database, and if known_node/1 succeeds, the name is added to the live vocabulary. No restart. No SSE disruption.

19.2.2 SSE Broker

// File: /opt/logic-node/go/orchestrator/sse.go
package main

import (
	"fmt"
	"log"
	"net/http"
	"sync"
)

// SSEBroker manages a set of connected browser clients and fans out
// server-sent events to all of them. Each client is represented by
// a channel of strings; the broker goroutine receives events on the
// broadcast channel and copies them to every registered client channel.
//
// Design constraints:
//   - A slow or disconnected client must not block event delivery to
//     other clients. Each client channel is buffered (32 events); if
//     a client's buffer is full when a broadcast arrives, that client
//     receives a disconnect signal rather than blocking the broker.
//   - The broker runs in its own goroutine for its full lifetime and
//     is the only writer to client channels, eliminating the need for
//     per-client locks.
type SSEBroker struct {
	broadcast  chan string
	register   chan chan string
	unregister chan chan string

	mu      sync.RWMutex
	clients map[chan string]struct{}
}

func NewSSEBroker() *SSEBroker {
	b := &SSEBroker{
		broadcast:  make(chan string, 64),
		register:   make(chan chan string),
		unregister: make(chan chan string),
		clients:    make(map[chan string]struct{}),
	}
	go b.run()
	return b
}

func (b *SSEBroker) run() {
	for {
		select {
		case ch := <-b.register:
			b.mu.Lock()
			b.clients[ch] = struct{}{}
			b.mu.Unlock()
			log.Printf("[SSE] client connected — total: %d", len(b.clients))

		case ch := <-b.unregister:
			// A client dropped for a full buffer has already been removed
			// and its channel closed, so check membership first: closing
			// a channel twice panics.
			b.mu.Lock()
			if _, ok := b.clients[ch]; ok {
				delete(b.clients, ch)
				close(ch)
			}
			b.mu.Unlock()
			log.Printf("[SSE] client disconnected — total: %d", len(b.clients))

		case msg := <-b.broadcast:
			b.mu.Lock()
			for ch := range b.clients {
				select {
				case ch <- msg:
				default:
					// Client buffer full: drop this client in place.
					// Sending on b.unregister here would deadlock, because
					// this goroutine is that channel's only receiver.
					// Closing ch makes ServeHTTP return; the browser then
					// reconnects via EventSource's built-in reconnection
					// logic and receives the next event after reconnecting.
					log.Printf("[SSE] client buffer full — dropping")
					delete(b.clients, ch)
					close(ch)
				}
			}
			b.mu.Unlock()
		}
	}
}

// Publish sends an SSE event string to all connected clients.
// The event string must be a complete SSE-formatted payload including
// the trailing double newline.
func (b *SSEBroker) Publish(event string) {
	b.broadcast <- event
}

// ServeHTTP handles an SSE connection for one client.
// It registers the client, streams events until the client disconnects,
// and unregisters on return.
func (b *SSEBroker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no") // disable nginx buffering if proxied

	ch := make(chan string, 32)
	b.register <- ch
	defer func() { b.unregister <- ch }()

	// Send an initial connection confirmation event so the browser
	// can distinguish "connected and waiting" from "not connected".
	fmt.Fprintf(w, "event: connected\ndata: {\"status\":\"ok\"}\n\n")
	flusher.Flush()

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case msg, ok := <-ch:
			if !ok {
				return
			}
			fmt.Fprint(w, msg)
			flusher.Flush()
		}
	}
}
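
Publish expects a complete SSE payload, including the trailing blank line. A small helper can build that payload so callers do not assemble the framing by hand. The sketch below is an assumption layered on the broker, not part of the chapter's code: formatSSE and kbUpdated are hypothetical names, and encoding the data line with encoding/json is a choice made here so that escaping follows JSON rules.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// kbUpdated mirrors the fields carried by the kb_updated event.
// The type name is hypothetical; the field names follow the event payload.
type kbUpdated struct {
	TS     int64  `json:"ts"`
	Action string `json:"action"`
	Node1  string `json:"node1"`
	Node2  string `json:"node2"`
}

// formatSSE returns one complete server-sent event: an "event:" line,
// a single "data:" line holding the JSON-encoded payload, and the blank
// line that terminates the event.
func formatSSE(event string, payload interface{}) (string, error) {
	// A line break inside the event name would end the field early.
	if event == "" || strings.ContainsAny(event, "\r\n") {
		return "", fmt.Errorf("invalid SSE event name %q", event)
	}
	// json.Marshal escapes line breaks inside strings, so the encoded
	// payload always fits on one data line.
	data, err := json.Marshal(payload)
	if err != nil {
		return "", fmt.Errorf("marshal payload: %w", err)
	}
	return fmt.Sprintf("event: %s\ndata: %s\n\n", event, data), nil
}
```

With this helper, a mutation handler could call formatSSE("kb_updated", kbUpdated{...}) and pass the result straight to Publish.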

19.2.3 TCP Keep-Alive and Ghost Connection Hygiene

The IdleTimeout: 120 * time.Second in NewServer handles idle HTTP/1.1 connections at the Go layer. It does not cover the case of a browser tab that has been frozen by a mobile OS battery manager or a desktop OS sleep cycle: the underlying TCP connection remains half-open on the server's routing fabric with no data flowing, the Go runtime sees no EOF and fires no r.Context().Done(), and the client channel registered in the broker's clients map accumulates indefinitely. On a management station that is suspended overnight with 12 browser tabs open, the server accrues 12 ghost client channels consuming goroutine stack, channel memory, and broker iteration overhead for every subsequent Publish call.

The correct defence is OS-level TCP keep-alive tuning on logic-node-01. With the default Linux tcp_keepalive_time of 7,200 seconds, the kernel waits two hours before sending the first keep-alive probe to a silent TCP connection. At a sovereign infrastructure scale of 20–50 operator sessions, this is a bounded but real nuisance; at 200+ sessions it becomes a memory pressure issue.

# /etc/sysctl.d/99-observability.conf (already established in Chapter 20 —
# append these three values to the existing file):

# SSE ghost connection hygiene.
# Send first TCP keep-alive probe after 60 seconds of silence.
net.ipv4.tcp_keepalive_time = 60
# Retry keep-alive probes every 10 seconds.
net.ipv4.tcp_keepalive_intvl = 10
# Drop connection after 5 unanswered probes (total: 110 seconds to detect a dead tab).
net.ipv4.tcp_keepalive_probes = 5

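Apply with sysctl --system. The SSE ServeHTTP handler does not need changes, because TCP keep-alive operates below the HTTP layer. When the kernel's keep-alive probes go unanswered, the TCP stack closes the connection, the Go HTTP server cancels the request context so that r.Context().Done() fires in ServeHTTP, and the deferred b.unregister <- ch runs, removing the ghost channel from the broker's clients map and closing it. The 110-second detection window (60s idle + 5 × 10s probes) is appropriate for sovereign infrastructure: fast enough to prevent ghost accumulation across a normal overnight cycle, slow enough to survive a mobile network handoff where the TCP connection is momentarily dark. One caveat: these sysctl values are system defaults that apply only to sockets with SO_KEEPALIVE enabled and no per-socket override. Go's net package enables keep-alive on accepted TCP connections and sets its own per-socket timers, so confirm the effective values on the running listener (for example with ss -o) rather than assuming the sysctl settings are the ones in force.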
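
The detection window is simple arithmetic over the three keep-alive parameters, and writing it down makes it easy to compare candidate settings. The function below is an illustrative helper with a hypothetical name (detectionWindow), not part of the server.

```go
package main

import "time"

// detectionWindow returns the worst-case time before the kernel gives up
// on a silent peer: the idle period before the first probe, plus one
// interval for each probe that must go unanswered.
func detectionWindow(idleSec, intervalSec, probes int) time.Duration {
	return time.Duration(idleSec+probes*intervalSec) * time.Second
}
```

For the values above, detectionWindow(60, 10, 5) is 110 seconds. For the stock Linux defaults of 7,200 seconds idle, 75-second intervals, and 9 probes, it is 7,875 seconds, or a little over two hours and eleven minutes.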

19.2.4 Dynamic Node Vocabulary

// File: /opt/logic-node/go/orchestrator/topology.go
package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

// nodeVocabulary is the live set of approved topology node names.
// It is initialised at startup by querying the WAM clause database
// and extended at runtime via handleApproveNode without a server restart.
//
// Read path (knownTopologyNode): called on every topology mutation request.
// Write path (approveNode): called only by handleApproveNode under the
// operator's explicit action. No automatic promotion of runtime assertions.
type nodeVocabulary struct {
	mu    sync.RWMutex
	nodes map[string]struct{}
}

var vocab = &nodeVocabulary{
	nodes: make(map[string]struct{}),
}

// buildKnownNodes populates vocab from the live WAM clause database.
// Called once after pool init at server startup.
func buildKnownNodes(pool *Pool) error {
	result, err := pool.Dispatch(WorkItem{
		Goal: "findall(N,proxmox_topology:known_node(N),Nodes)",
	}, 2*time.Second)
	if err != nil {
		return fmt.Errorf("dispatch: %w", err)
	}
	if result.Err != nil {
		return fmt.Errorf("WAM: %w", result.Err)
	}

	vocab.mu.Lock()
	defer vocab.mu.Unlock()
	for _, node := range result.NodeList {
		vocab.nodes[node] = struct{}{}
	}
	log.Printf("[Topology] startup vocabulary: %d nodes", len(vocab.nodes))
	return nil
}

// approveNode adds a node name to the live vocabulary after verifying
// it exists in the WAM clause database. Returns an error if the WAM
// does not recognise the node — preventing arbitrary string injection
// into the vocabulary map.
//
// This is the only write path other than buildKnownNodes. The map is
// never written by topology mutation handlers; mutation handlers only
// read the map to validate their input.
func approveNode(pool *Pool, name string) error {
	// Validate against live WAM — not just the startup snapshot.
	goal := fmt.Sprintf("proxmox_topology:known_node(%s)", name)
	result, err := pool.Dispatch(WorkItem{Goal: goal}, 1*time.Second)
	if err != nil {
		return fmt.Errorf("dispatch: %w", err)
	}
	if result.Err != nil {
		return fmt.Errorf("node %q not in WAM clause database: %w", name, result.Err)
	}

	vocab.mu.Lock()
	vocab.nodes[name] = struct{}{}
	size := len(vocab.nodes) // read the size while the write lock is still held
	vocab.mu.Unlock()

	log.Printf("[Topology] approved new node: %q (vocabulary size: %d)", name, size)
	return nil
}

// knownTopologyNode reports whether name is in the approved vocabulary.
func knownTopologyNode(name string) bool {
	vocab.mu.RLock()
	defer vocab.mu.RUnlock()
	_, ok := vocab.nodes[name]
	return ok
}

19.2.5 orchestrator_server.go

// File: /opt/logic-node/go/orchestrator/orchestrator_server.go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"runtime"
	"strings"
	"syscall"
	"time"
)

// ─── Request / Response types ────────────────────────────────────────────────

type FirewallCheckReq struct {
	SourceIP string `json:"source_ip"`
	DestPort int    `json:"dest_port"`
	Protocol string `json:"protocol"`
}

type TopologyMutateReq struct {
	Action string `json:"action"` // "add_link" | "remove_link"
	Node1  string `json:"node1"`
	Node2  string `json:"node2"`
	Cost   int    `json:"cost"`
}

type ApproveNodeReq struct {
	Node string `json:"node"`
}

type APIResponse struct {
	OK      bool        `json:"ok"`
	Data    interface{} `json:"data,omitempty"`
	Error   string      `json:"error,omitempty"`
	Latency int64       `json:"latency_us,omitempty"`
}

func writeJSON(w http.ResponseWriter, status int, body APIResponse) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if err := json.NewEncoder(w).Encode(body); err != nil {
		log.Printf("[API] encode error: %v", err)
	}
}

// ─── Server ───────────────────────────────────────────────────────────────────

type Server struct {
	pool      *Pool
	broker    *SSEBroker
	http      *http.Server
	staticDir string
	kbPath    string
}

func NewServer(pool *Pool, broker *SSEBroker, staticDir, kbPath, addr string) *Server {
	s := &Server{
		pool:      pool,
		broker:    broker,
		staticDir: staticDir,
		kbPath:    kbPath,
	}

	mux := http.NewServeMux()

	apiGuards := func(h http.HandlerFunc) http.Handler {
		return chain(h, requireJSON, requestSizeLimit(64*1024))
	}

	mux.Handle("/api/v1/firewall/check",    apiGuards(s.handleFirewallCheck))
	mux.Handle("/api/v1/topology/mutate",   apiGuards(s.handleTopologyMutate))
	mux.Handle("/api/v1/topology/approve",  apiGuards(s.handleApproveNode))
	mux.HandleFunc("/api/v1/pool/status",   s.handlePoolStatus)
	mux.Handle("/api/v1/events",            broker)
	mux.HandleFunc("/api/v1/kb/current",    s.handleKBCurrent)
	mux.Handle("/", staticHandler(staticDir))

	s.http = &http.Server{
		Addr:              addr,
		Handler:           secureHeaders(mux),
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      0, // SSE connections are long-lived; no write timeout
		IdleTimeout:       120 * time.Second,
	}
	return s
}

// WriteTimeout is set to 0 on the http.Server to accommodate SSE connections,
// which are indefinitely long-lived. The SSEBroker's per-client channel buffer
// and the client disconnect detection via r.Context().Done() provide the
// equivalent resource bound: a gone client is detected and cleaned up within
// one event cycle, not held open by a write deadline.

func (s *Server) ListenAndServe() error {
	log.Printf("[Server] listening on %s", s.http.Addr)
	if err := s.http.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}

func (s *Server) Shutdown(ctx context.Context) {
	log.Printf("[Server] draining HTTP…")
	if err := s.http.Shutdown(ctx); err != nil {
		log.Printf("[Server] HTTP shutdown: %v", err)
	}
	log.Printf("[Server] draining pool…")
	if err := s.pool.Stop(10 * time.Second); err != nil {
		log.Printf("[Server] pool stop: %v", err)
	}
}

// ─── Handlers ────────────────────────────────────────────────────────────────

func (s *Server) handleFirewallCheck(w http.ResponseWriter, r *http.Request) {
	var req FirewallCheckReq
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeJSON(w, http.StatusBadRequest, APIResponse{
			Error: fmt.Sprintf("malformed JSON: %v", err),
		})
		return
	}

	cacheKey := fmt.Sprintf("%s:%d:%s", req.SourceIP, req.DestPort, req.Protocol)
	if cached, ok := firewallCache.Get(cacheKey); ok {
		writeJSON(w, http.StatusOK, APIResponse{
			OK: true,
			Data: map[string]interface{}{
				"allowed": false,
				"reason":  cached.Reason,
				"rule_id": cached.RuleID,
				"source":  "lru_cache",
			},
		})
		return
	}

	goal := fmt.Sprintf(
		`firewall_verdict(request{source_ip:"%s",dest_port:%d,protocol:"%s"},Verdict,Reason,RuleID)`,
		escapeProlog(req.SourceIP),
		req.DestPort,
		escapeProlog(req.Protocol),
	)

	result, err := s.pool.Dispatch(WorkItem{Goal: goal}, 500*time.Millisecond)
	if err != nil {
		writeJSON(w, http.StatusServiceUnavailable, APIResponse{Error: "worker pool unavailable"})
		return
	}
	if result.Err != nil {
		writeJSON(w, http.StatusBadRequest, APIResponse{Error: result.Err.Error()})
		return
	}

	verdict := result.Bindings["Verdict"] == "allowed"
	if !verdict {
		firewallCache.Add(cacheKey, CachedVerdict{
			Reason: result.Bindings["Reason"],
			RuleID: result.Bindings["RuleID"],
		})
	}

	writeJSON(w, http.StatusOK, APIResponse{
		OK: true,
		Data: map[string]interface{}{
			"allowed": verdict,
			"reason":  result.Bindings["Reason"],
			"rule_id": result.Bindings["RuleID"],
		},
		Latency: result.Duration.Microseconds(),
	})
}

// handleTopologyMutate: POST /api/v1/topology/mutate
//
// On successful mutation: purges the denial cache, broadcasts ControlFlushTables
// to all worker engines, and publishes a kb_updated SSE event to all connected
// browsers. The SSE event carries the Unix timestamp of the mutation so that
// browsers can detect if they missed events during a reconnection window.
func (s *Server) handleTopologyMutate(w http.ResponseWriter, r *http.Request) {
	var req TopologyMutateReq
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeJSON(w, http.StatusBadRequest, APIResponse{
			Error: fmt.Sprintf("malformed JSON: %v", err),
		})
		return
	}

	if !knownTopologyNode(req.Node1) || !knownTopologyNode(req.Node2) {
		writeJSON(w, http.StatusBadRequest, APIResponse{
			Error: fmt.Sprintf("unknown topology node: %q or %q", req.Node1, req.Node2),
		})
		return
	}
	if req.Cost < 0 || req.Cost > 1_000_000 {
		writeJSON(w, http.StatusBadRequest, APIResponse{
			Error: fmt.Sprintf("cost out of range: %d", req.Cost),
		})
		return
	}

	var goal string
	switch req.Action {
	case "add_link":
		goal = fmt.Sprintf("proxmox_topology:assert_link(%s,%s,%d)", req.Node1, req.Node2, req.Cost)
	case "remove_link":
		goal = fmt.Sprintf("proxmox_topology:retract_link(%s,%s,%d)", req.Node1, req.Node2, req.Cost)
	default:
		writeJSON(w, http.StatusBadRequest, APIResponse{
			Error: fmt.Sprintf("unknown action: %q", req.Action),
		})
		return
	}

	result, err := s.pool.Dispatch(WorkItem{Goal: goal}, 2*time.Second)
	if err != nil || result.Err != nil {
		msg := ""
		if err != nil {
			msg = err.Error()
		} else {
			msg = result.Err.Error()
		}
		writeJSON(w, http.StatusInternalServerError, APIResponse{Error: msg})
		return
	}

	// Mutation succeeded. Invalidate caches and notify clients.
	firewallCache.Purge()
	s.pool.Broadcast(ControlMsg{Kind: ControlFlushTables})

	// Publish SSE event. The data payload includes the mutation timestamp
	// so that browsers can compare it against their local KB version and
	// detect missed events during a reconnection window.
	ts := time.Now().UnixMilli()
	sseEvent := fmt.Sprintf(
		"event: kb_updated\ndata: {\"ts\":%d,\"action\":%q,\"node1\":%q,\"node2\":%q}\n\n",
		ts, req.Action, req.Node1, req.Node2,
	)
	s.broker.Publish(sseEvent)

	writeJSON(w, http.StatusOK, APIResponse{
		OK: true,
		Data: map[string]interface{}{
			"status": "topology_updated",
			"ts":     ts,
		},
	})
}

// handleApproveNode: POST /api/v1/topology/approve
//
// Adds a node name to the live vocabulary after WAM validation.
// Does not add the node to the Prolog KB — that must be done via
// the standard Prolog KB management tools and a subsequent assert_link
// mutation. This handler only extends the Go-level vocabulary so that
// the new node can be used as a mutation target without a server restart.
func (s *Server) handleApproveNode(w http.ResponseWriter, r *http.Request) {
	var req ApproveNodeReq
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeJSON(w, http.StatusBadRequest, APIResponse{
			Error: fmt.Sprintf("malformed JSON: %v", err),
		})
		return
	}

	name := strings.TrimSpace(req.Node)
	if name == "" {
		writeJSON(w, http.StatusBadRequest, APIResponse{Error: "node name is empty"})
		return
	}
	// Reject names that would not read as a plain Prolog atom. Topology node
	// identifiers are lowercase alphanumeric with underscores and must start
	// with a lowercase letter: a leading underscore would make the name a
	// Prolog variable (so known_node/1 would match any node), and a leading
	// digit would not read as an atom.
	for i, c := range name {
		isLower := c >= 'a' && c <= 'z'
		isRest := (c >= '0' && c <= '9') || c == '_'
		if !(isLower || (i > 0 && isRest)) {
			writeJSON(w, http.StatusBadRequest, APIResponse{
				Error: fmt.Sprintf("invalid node name %q: must match [a-z][a-z0-9_]*", name),
			})
			return
		}
	}

	if err := approveNode(s.pool, name); err != nil {
		writeJSON(w, http.StatusBadRequest, APIResponse{
			Error: fmt.Sprintf("node approval failed: %v", err),
		})
		return
	}

	writeJSON(w, http.StatusOK, APIResponse{
		OK:   true,
		Data: map[string]string{"approved": name},
	})
}

// handleKBCurrent: GET /api/v1/kb/current
//
// Returns the current contents of the firewall policy KB file as plain text.
// Called by the browser's SSE handler after receiving a kb_updated event to
// fetch the updated policy and write it into the Emscripten MEMFS.
// No authentication is required beyond the network perimeter — the KB file
// is not secret; it is the same file served as a static asset. This handler
// ensures the browser always fetches the live on-disk version rather than
// a potentially stale cached static asset.
func (s *Server) handleKBCurrent(w http.ResponseWriter, r *http.Request) {
	data, err := os.ReadFile(s.kbPath)
	if err != nil {
		http.Error(w, "KB file unavailable", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	w.Header().Set("Cache-Control", "no-store")
	if _, err := w.Write(data); err != nil {
		log.Printf("[API] KB write error: %v", err)
	}
}

func (s *Server) handlePoolStatus(w http.ResponseWriter, r *http.Request) {
	writeJSON(w, http.StatusOK, APIResponse{OK: true, Data: s.pool.Status()})
}

// ─── Input sanitisation ───────────────────────────────────────────────────────

func escapeProlog(s string) string {
	out := make([]byte, 0, len(s)+4)
	for i := 0; i < len(s); i++ {
		c := s[i]
		if c == '\\' || c == '"' {
			out = append(out, '\\')
		}
		out = append(out, c)
	}
	return string(out)
}

// ─── Main ─────────────────────────────────────────────────────────────────────

func main() {
	staticDir := envOr("STATIC_DIR",  "/opt/logic-node/wasm-ui/dist")
	kbPath    := envOr("KB_PATH",     "/opt/logic-node/kb/firewall_policy.pl")
	listenOn  := envOr("LISTEN_ADDR", ":8080")

	poolSize := runtime.GOMAXPROCS(0) - 2
	if poolSize < 1 {
		poolSize = 1
	}

	log.Printf("[Main] pool: %d workers  KB: %s", poolSize, kbPath)
	pool, err := NewPool(poolSize, kbPath)
	if err != nil {
		log.Fatalf("[Main] pool init: %v", err)
	}

	firewallCache = newFirewallCache()
	log.Printf("[Main] denial cache ready: size=%d TTL=%s", cacheSize, cacheTTL)

	if err := buildKnownNodes(pool); err != nil {
		log.Printf("[Main] WARNING: known_node vocabulary: %v", err)
	}

	broker := NewSSEBroker()
	srv := NewServer(pool, broker, staticDir, kbPath, listenOn)

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGTERM, syscall.SIGINT)

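	go func() {
		if err := srv.ListenAndServe(); err != nil {
			log.Printf("[Main] server: %v", err)
			// A listener that failed to start (for example, port in use)
			// should trigger shutdown rather than leave main blocked.
			quit <- syscall.SIGTERM
		}
	}()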

	log.Printf("[Main] ready — http://%s/", listenOn)
	<-quit

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	srv.Shutdown(ctx)
}

func envOr(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}
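
The node-name rule applied before a name is interpolated into a Prolog goal can be isolated into a small predicate, which makes it easy to test on its own. The sketch below uses a hypothetical name, isValidNodeAtom, and requires a leading lowercase letter, on the reasoning that a token beginning with an underscore reads as a Prolog variable and one beginning with a digit does not read as an atom.

```go
package main

// isValidNodeAtom reports whether name can be written unquoted as a
// Prolog atom under the topology naming rule [a-z][a-z0-9_]*.
func isValidNodeAtom(name string) bool {
	if name == "" {
		return false
	}
	for i := 0; i < len(name); i++ {
		c := name[i]
		switch {
		case c >= 'a' && c <= 'z':
			// Lowercase letters are valid in any position.
		case i > 0 && ((c >= '0' && c <= '9') || c == '_'):
			// Digits and underscores are valid after the first character.
		default:
			return false
		}
	}
	return true
}
```

Under this rule pve15 and logic_node_01 are accepted, while _x, 1abc, Pve1, and any name containing punctuation are rejected.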

19.3 The Reality of CGO Timeouts

19.3.1 What Go's Context Actually Stops

Go contexts stop the waiting goroutine, not the C thread. When the 500ms deadline expires, the goroutine dispatching to the pool receives a context.DeadlineExceeded error, writes a 503 response, and returns — freeing the HTTP goroutine and the connection resources associated with it.

What the Go context does not stop is the underlying CGO C thread. A (*C.swipl_t) WAM engine executing a Prolog goal runs on a locked OS thread: a native POSIX thread, not a Go goroutine. Go's scheduler has no visibility into it. Cancelling a context (by calling the CancelFunc returned by context.WithTimeout or context.WithCancel, or by letting the deadline expire) signals only the Go side of the CGO boundary; it does not deliver a POSIX signal to the C thread, does not call PL_action(PL_ACTION_ABORT, 0), and does not unwind the C call stack. The locked OS thread continues executing whatever Prolog goal it was given until the WAM either succeeds, fails, throws an error, or the process exits.

The practical consequence: if a malformed goal causes the WAM to enter an infinite loop — a pathological query against a misconfigured knowledge base that has no base case — the 500ms Go context timeout will free the HTTP layer while the C thread continues spinning at 100% on one core indefinitely. With 14 concurrent WAM workers on a 16-core Proxmox node, fourteen such spinning C threads would saturate the CPU without any Go-layer visibility into what is consuming it.
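
The gap between "the caller stopped waiting" and "the worker stopped working" can be reproduced in pure Go, without CGO. In the sketch below, a goroutine blocked on a channel stands in for the C thread; dispatch and demo are hypothetical names, and this is an illustration of the behaviour rather than the Chapter 16 Pool API.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// dispatch hands work to a worker goroutine and waits for it or for ctx.
// The returned channel is closed only when the worker actually finishes,
// which may be long after dispatch itself has returned.
func dispatch(ctx context.Context, work func()) (<-chan struct{}, error) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		work() // stands in for the blocking CGO call; ctx cannot interrupt it
	}()
	select {
	case <-done:
		return done, nil
	case <-ctx.Done():
		return done, ctx.Err()
	}
}

// demo reports whether the caller timed out and whether the worker was
// still busy at that moment.
func demo() (timedOut, workerStillBusy bool) {
	release := make(chan struct{})
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	// The work blocks until release is closed, like a goal that has not
	// yet terminated.
	done, err := dispatch(ctx, func() { <-release })
	timedOut = errors.Is(err, context.DeadlineExceeded)

	select {
	case <-done:
		// Worker already finished (not expected here).
	default:
		workerStillBusy = true
	}

	close(release) // only now does the worker get to finish
	<-done
	return timedOut, workerStillBusy
}
```

Calling demo returns true for both values: the caller sees the deadline error while the worker is still occupied, which is the situation a timed-out WAM query leaves behind.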

19.3.2 Why Tabling Is the Correct Defence

The true protection against WAM engine lockup is not the Go context timeout. It is the :- table directive applied to every graph traversal predicate in Chapter 17. SLG resolution with tabling guarantees termination on a finite graph, provided the tabled predicate has finitely many distinct subgoals and answers, by maintaining a table of subgoal calls and their answers: if a subgoal has been called before and its answer set is complete, the engine returns the tabled answer rather than re-entering the goal. The engine cannot re-enter shortest_path(pve1, pve1, _) infinitely, because the first call registers the subgoal and any recursive call to the same subgoal suspends, waiting on the answer set that is being computed. When the computation completes, the suspended calls resume with the complete answer. Termination is a mathematical consequence of the finite graph and the complete answer set property of SLG, not a timeout policy. The finiteness condition matters for the cost argument: if every distinct path cost around a cycle counted as a new answer, the answer set would be unbounded, so the cost must be aggregated (for example to a minimum) rather than enumerated.

The Go context timeout is a latency bound and a resource protection mechanism for the HTTP layer. It guarantees that a slow WAM query does not hold a Go goroutine and a browser connection open for unbounded time. It does not guarantee that the underlying C thread terminates. The WAM worker that timed out on the Go side will eventually complete its query and return its worker slot to the pool — it is not lost — but it is unavailable for the duration. Under tabling, that duration is bounded by the size of the graph; without tabling, it is unbounded.

The deployment architecture reinforces this at the OS level: the VictoriaMetrics scrape in Chapter 20 monitors the Proxmox hypervisor's CPU steal, and the Chapter 21 logic engine's healthy_node/1 guard will mark the logic-node VM as degraded if its CPU steal exceeds the configured threshold for more than two consecutive 15-second windows. A spinning C thread that escapes the tabling guarantee due to a KB misconfiguration is visible as a CPU anomaly before it becomes a service outage.

19.3.3 Incremental Tabling and the Mutation Cost Floor

Chapter 17 applied :- table shortest_path/3 with abolish_all_tables/0 as the post-mutation flush. That is the correct starting point for a read-heavy, write-rare topology: full table abolition is a single predicate call, its overhead is proportional to the number of tabled answers, and it happens once per mutation event. In Chapter 19's architecture, however, mutations arrive over the HTTP API, fan out to browsers over SSE, and can come in rapid succession: an operator automating a failover sequence may issue six remove_link and add_link mutations within a two-second window. Each mutation triggers abolish_all_tables/0 across all 14 worker WAM engines via ControlFlushTables, producing 14 × 6 = 84 full table flushes in two seconds, each of which forces that engine to rebuild its answers on the next queries. On a 14-node graph with 40–60 edges, each rebuild executes shortest_path/3 for O(N²) node pairs; the aggregate CPU spike is brief but measurable on a Proxmox host shared with live VM workloads.

SWI-Prolog's incremental tabling mechanism eliminates this spike. Declared with the incremental property:

:- table shortest_path/3 as incremental.

the WAM maintains a dependency graph between tabled subgoal answers and the base facts they depend on. When link(pve1, pve5, 10) is asserted or retracted, the WAM marks only the tabled answers that transitively depended on that specific link/3 fact as invalid. On the next call to any of those answers, the WAM re-computes them. Answers that did not depend on the changed fact are served from the table unchanged — zero recomputation for the unaffected subgraph.

The consequence for the SSE pipeline is architectural: ControlFlushTables is no longer needed for topology mutations. The mutation goal assert_link(pve1, pve5, 10) runs, the WAM's incremental dependency graph marks the affected shortest_path answers as stale, and the next query against those paths triggers selective recomputation. The full table is never abolished; the worker pool remains available for queries during the update window rather than rebuilding from scratch.

Incremental tabling has two deployment constraints. First, the invalidation chain must be declared end to end: the dynamic base predicate link/3 must itself be declared incremental (for example with :- dynamic([link/3], [incremental(true)]).), and every tabled predicate between it and shortest_path/3 must also carry the incremental property. If connected/2 is tabled, calls link/3, and is called by shortest_path/3, then connected/2 must be tabled as incremental as well, or the WAM will not propagate the invalidation correctly. Second, incremental tabling does not combine with every other table property. Note that variant is SWI-Prolog's default tabling mode rather than a special mode for cyclic graphs, so the properties actually used in the Chapter 17 KB should be checked against the SWI-Prolog tabling documentation before the switch. The Chapter 17 KB changes required to enable incremental tabling on the existing shortest_path/3 predicate are a Chapter 23 build item; the Go SSE and worker pool architecture in this chapter is the prerequisite.


19.4 The JavaScript Coordination Layer

19.4.1 Coordination Contract

WASM WAM (browser):  fires on every 'input' event — read-only, no server
                     contact, paints the UI green or red in under 5ms.
                     Edge KB is kept current by the SSE sync pipeline.

Go API (server):     fires on 'submit' — receives raw DOM values, re-validates
                     everything from scratch, returns the authoritative verdict.
                     The JS never forwards the WASM result to the server.

SSE pipeline:        EventSource on /api/v1/events. On kb_updated event:
                     fetch /api/v1/kb/current -> FS.writeFile -> consult().
                     No page reload. No setTimeout. No stale edge logic.

Atom Table contract: all user-controlled values cross the WASM boundary as
                     double-quoted Prolog strings. Never atoms. Never bare
                     unquoted values. double_quotes=string verified on init.
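
The Atom Table contract can be backed by a small helper that renders any user-controlled value as a double-quoted Prolog string literal. This is a sketch under assumptions: toPrologString and its 64-character default limit are hypothetical and not part of dashboard.js, which instead relies on whitelist validation before interpolation. Escaping is a second line of defence for values that cannot be whitelisted so tightly.

```javascript
'use strict';

// Render a JavaScript string as a double-quoted Prolog string literal.
// Returns null for input that should never reach the WAM.
function toPrologString(value, maxLen = 64) {
    if (typeof value !== 'string' || value.length === 0 || value.length > maxLen) {
        return null;
    }
    // Reject control characters outright rather than trying to escape them.
    if (/[\u0000-\u001f\u007f]/.test(value)) return null;
    // Backslash first, then the double quote, so escapes are not re-escaped.
    const escaped = value.replace(/\\/g, '\\\\').replace(/"/g, '\\"');
    return `"${escaped}"`;
}

console.log(toPrologString('10.0.0.7'));   // "10.0.0.7"
console.log(toPrologString('a"b'));        // "a\"b"
console.log(toPrologString('x\ny'));       // null
```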

19.4.2 dashboard.js

// File: /opt/logic-node/wasm-ui/src/dashboard.js
'use strict';

const API       = '/api/v1';
const PROTOCOLS = new Set(['tcp', 'udp', 'icmp']);
const MAX_IP    = 45;

// ─── State ───────────────────────────────────────────────────────────────────

const wasm = {
    module:      null,
    ready:       false,
    failed:      false,
    syncing:     false,    // true while a KB reload is in progress
    lastVerdict: null,
    kbVersion:   0,        // Unix ms timestamp of last confirmed KB sync
};

// ─── WASM Init ───────────────────────────────────────────────────────────────

async function initWasm() {
    setStatus('Loading WAM engine…', 'neutral');
    try {
        wasm.module = await SWIPL({
            arguments: [
                'swipl', '--quiet',
                '--stack-limit=8M',
                '--table-space=4M',
                '-g', 'true', '-t', 'halt',
            ],
            locateFile: (f) => `./${f}`,
        });

        // Verify double_quotes=string before any user query.
        // If this flag is 'atom', every IP typed by the operator is interned
        // permanently in the Atom Table. 100,000 keystrokes produce ~9MB of
        // permanent Atom Table growth and eventual WASM OOM.
        const flagResult = wasm.module.Prolog.query(
            'current_prolog_flag(double_quotes,F)'
        ).once();
        if (!flagResult || flagResult.F !== 'string') {
            throw new Error(
                `double_quotes='${flagResult?.F}', must be 'string' — Atom Table at risk`
            );
        }

        const ok = wasm.module.Prolog.call("consult('/prolog/firewall_policy.pl')");
        if (!ok) throw new Error('initial consult failed — check VFS packaging');

        wasm.ready = true;
        wasm.kbVersion = Date.now();
        setStatus('WAM engine ready — edge validation active.', 'ok');

    } catch (err) {
        wasm.failed = true;
        setStatus(`WAM unavailable: ${err.message}. Server validation only.`, 'warn');
        console.error('[WASM] init failed:', err);
    }
}

// ─── SSE Pipeline ────────────────────────────────────────────────────────────

// initSSE establishes the EventSource connection to the Go SSE endpoint.
// EventSource reconnects automatically after a network interruption: the
// browser waits for its reconnection delay (a few seconds by default, or the
// value of the server's SSE retry: field) and then re-registers with the
// broker. The server sends a 'connected' event on registration that
// triggers a KB version check: if the server's current KB is newer than the
// browser's last sync, a full reload is performed to close any gap that
// opened during the disconnection window.
function initSSE() {
    const es = new EventSource(`${API}/events`);

    es.addEventListener('connected', async () => {
        console.log('[SSE] connected');
        // Check whether we missed a kb_updated event while disconnected.
        // The /api/v1/pool/status response carries the last mutation timestamp;
        // if it is newer than wasm.kbVersion, reload unconditionally.
        try {
            const resp = await fetch(`${API}/pool/status`);
            const body = await resp.json();
            const serverTs = body.data?.last_mutation_ts ?? 0;
            if (serverTs > wasm.kbVersion) {
                console.log('[SSE] missed update detected — resyncing KB');
                await syncWasmKB(serverTs);
            }
        } catch (err) {
            console.warn('[SSE] reconnect version check failed:', err.message);
        }
    });

    es.addEventListener('kb_updated', async (e) => {
        let payload;
        try {
            payload = JSON.parse(e.data);
        } catch {
            console.warn('[SSE] malformed kb_updated payload:', e.data);
            return;
        }
        console.log('[SSE] kb_updated ts=%d action=%s', payload.ts, payload.action);
        await syncWasmKB(payload.ts);
    });

    es.onerror = (err) => {
        console.warn('[SSE] connection error — EventSource will reconnect:', err);
        setStatus('SSE reconnecting — edge KB may be stale.', 'warn');
    };
}

// syncWasmKB fetches the current KB from the server and reloads it into the
// WASM instance's MEMFS. This is the correct VFS sync mechanism: it does not
// rely on setTimeout, does not assume the MEMFS already contains a valid file,
// and does not use a stale cached static asset.
//
// The sequence is:
//   1. Fetch /api/v1/kb/current (Cache-Control: no-store guaranteed server-side).
//   2. Decode the response as text.
//   3. Encode as UTF-8 bytes.
//   4. Write into the WASM MEMFS via Module.FS.writeFile, overwriting any
//      previously consulted version.
//   5. Call consult('/prolog/firewall_policy.pl') to reload the KB into the
//      running WAM instance.
//   6. Update wasm.kbVersion to the server-provided timestamp so that the
//      next reconnect check can detect any further missed events.
//
// The wasm.syncing flag prevents concurrent syncs from a burst of SSE events
// (e.g., multiple rapid topology mutations) from issuing concurrent fetch+consult
// cycles that would interleave and leave the WAM in an inconsistent state.
async function syncWasmKB(serverTs) {
    if (!wasm.module || wasm.failed) return;
    if (wasm.syncing) {
        console.log('[WASM] sync already in progress — skipping duplicate');
        return;
    }
    if (serverTs <= wasm.kbVersion) {
        console.log('[WASM] KB already current (local=%d server=%d)', wasm.kbVersion, serverTs);
        return;
    }

    wasm.syncing = true;
    wasm.ready   = false;
    setStatus('Resyncing edge KB…', 'neutral');

    try {
        const resp = await fetch(`${API}/kb/current`, { cache: 'no-store' });
        if (!resp.ok) throw new Error(`KB fetch failed: ${resp.status}`);

        const text    = await resp.text();
        const encoder = new TextEncoder();
        const bytes   = encoder.encode(text);

        // Write into MEMFS. FS.writeFile overwrites the existing file at the
        // given path; the MEMFS path must match the path used in consult() and
        // in the initial VFS packaging performed by build.sh (Chapter 18).
        wasm.module.FS.writeFile('/prolog/firewall_policy.pl', bytes);

        const ok = wasm.module.Prolog.call("consult('/prolog/firewall_policy.pl')");
        if (!ok) throw new Error('consult failed after MEMFS write');

        wasm.kbVersion = serverTs;
        wasm.ready     = true;
        wasm.lastVerdict = null; // invalidate — previous verdict may be stale
        setStatus('Edge KB synchronised.', 'ok');
        console.log('[WASM] KB reloaded — version ts=%d', serverTs);

    } catch (err) {
        wasm.failed = true;
        setStatus(`Edge KB sync failed: ${err.message}. Server validation only.`, 'warn');
        console.error('[WASM] KB sync error:', err);
    } finally {
        wasm.syncing = false;
    }
}

// ─── WASM Live Check — fires on every input event ────────────────────────────

function runWasmCheck(ip, port, protocol) {
    if (!wasm.ready || wasm.failed || wasm.syncing) { clearVerdict(); return; }

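    // Number.isInteger rejects NaN (an empty port field), which would otherwise
    // pass both range comparisons and reach the goal as the Prolog variable NaN.
    if (!ipv4Valid(ip) || !Number.isInteger(port) || port < 1 || port > 65535 ||
        !PROTOCOLS.has(protocol)) {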
        clearVerdict();
        return;
    }

    // ip and protocol are embedded as double-quoted Prolog strings — not atoms.
    // double_quotes=string (verified on init) means these values live on the
    // WAM heap and are garbage-collected between queries. They are never interned
    // in the Atom Table regardless of how many keystrokes the operator makes.
    const goal =
        `firewall_verdict(` +
        `request{source_ip:"${ip}",dest_port:${port},protocol:"${protocol}"},` +
        `Verdict,Reason,_)`;

    try {
        const t0  = performance.now();
        const res = wasm.module.Prolog.query(goal).once();
        const lat = Math.round((performance.now() - t0) * 1000);

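        // No solution or a Prolog error: nothing to paint. The extra checks
        // cover result objects that flag failure instead of being null.
        if (!res || res.error || res.success === false) { clearVerdict(); return; }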

        wasm.lastVerdict = {
            allowed:   res.Verdict === 'allowed',
            reason:    res.Reason,
            latencyUs: lat,
        };
        paintVerdict(wasm.lastVerdict, 'edge');

    } catch (err) {
        console.warn('[WASM] live check error:', err.message);
        if (isUnrecoverable(err)) {
            wasm.failed = true;
            setStatus('Edge WAM crashed — server validation only.', 'warn');
        }
        clearVerdict();
    }
}

function isUnrecoverable(err) {
    const m = err.message || '';
    return m.includes('RuntimeError') ||
           m.includes('memory access out of bounds') ||
           m.includes('out_of_stack') ||
           m.includes('unreachable');
}

// ─── Go API Submission — fires only on Deploy button click ───────────────────

// submitToServer constructs a fresh JSON payload from raw DOM values.
// It does NOT read wasm.lastVerdict or forward it to the server.
// The server has no interest in what the browser computed.
async function submitToServer(ip, port, protocol) {
    setDeployState('loading');
    try {
        const resp = await fetch(`${API}/firewall/check`, {
            method:  'POST',
            headers: {
                'Content-Type':  'application/json',
                'Authorization': `Bearer ${getToken()}`,
            },
            body:   JSON.stringify({ source_ip: ip, dest_port: port, protocol }),
            signal: AbortSignal.timeout(5_000),
        });

        const body = await resp.json();
        if (!resp.ok || !body.ok) {
            setDeployResult('error', body.error || `Server error ${resp.status}`);
            return;
        }

        const sv = {
            allowed:   body.data.allowed,
            reason:    body.data.reason,
            latencyUs: body.latency_us,
        };

        if (wasm.lastVerdict !== null &&
            wasm.lastVerdict.allowed !== sv.allowed) {
            console.warn(
                `[Dashboard] discrepancy — edge:${wasm.lastVerdict.allowed}` +
                ` server:${sv.allowed} kbVersion:${wasm.kbVersion}`
            );
            showDiscrepancy(wasm.lastVerdict, sv);
        }

        paintVerdict(sv, 'server');
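        // setDeployResult assigns textContent, so the reason is passed raw;
        // HTML-escaping it here would show the entities literally.
        setDeployResult('success',
            `${sv.allowed ? 'ALLOWED' : 'DENIED'} — ${sv.reason} [${sv.latencyUs}us server]`
        );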

    } catch (err) {
        const msg = err.name === 'TimeoutError'
            ? 'Server timeout — pool may be saturated.'
            : `Network error: ${err.message}`;
        setDeployResult('error', msg);
    } finally {
        setDeployState('idle');
    }
}

// ─── Topology Mutation ───────────────────────────────────────────────────────

async function submitTopologyMutation() {
    const action = qs('#mutate-action')?.value;
    const node1  = qs('#node1')?.value.trim()  ?? '';
    const node2  = qs('#node2')?.value.trim()  ?? '';
    const cost   = parseInt(qs('#link-cost')?.value ?? '0', 10);

    try {
        const resp = await fetch(`${API}/topology/mutate`, {
            method:  'POST',
            headers: {
                'Content-Type':  'application/json',
                'Authorization': `Bearer ${getToken()}`,
            },
            body:   JSON.stringify({ action, node1, node2, cost }),
            signal: AbortSignal.timeout(10_000),
        });
        const body = await resp.json();
        if (!resp.ok || !body.ok) {
            setDeployResult('error', `Mutation failed: ${body.error}`);
            return;
        }
        // The server has already published the SSE event. The local SSE
        // listener will receive it and drive the KB resync independently.
        // No explicit syncWasmKB call is needed here — doing so would race
        // with the SSE-driven sync and risk a double-consult.
        setDeployResult('success',
            `Topology updated — SSE pipeline will resync edge KB (ts=${body.data.ts}).`
        );
    } catch (err) {
        setDeployResult('error', `Mutation error: ${err.message}`);
    }
}

// ─── Node Approval ───────────────────────────────────────────────────────────

async function submitNodeApproval() {
    const node = qs('#approve-node')?.value.trim() ?? '';
    if (!node) { setDeployResult('error', 'Node name is empty.'); return; }

    try {
        const resp = await fetch(`${API}/topology/approve`, {
            method:  'POST',
            headers: {
                'Content-Type':  'application/json',
                'Authorization': `Bearer ${getToken()}`,
            },
            body:   JSON.stringify({ node }),
            signal: AbortSignal.timeout(5_000),
        });
        const body = await resp.json();
        if (!resp.ok || !body.ok) {
            setDeployResult('error', `Approval failed: ${body.error}`);
            return;
        }
        setDeployResult('success', `Node ${body.data.approved} approved for topology mutations.`);
    } catch (err) {
        setDeployResult('error', `Approval error: ${err.message}`);
    }
}

// ─── DOM Helpers ─────────────────────────────────────────────────────────────

const qs = (sel) => document.querySelector(sel);

function paintVerdict(v, source) {
    const el = qs('#verdict');
    if (!el) return;
    el.className = `verdict ${v.allowed ? 'allowed' : 'denied'}`;
    // textContent never parses markup, so the reason is inserted unescaped.
    el.textContent =
        `${v.allowed ? 'ALLOWED' : 'DENIED'} — ${v.reason} [${source} ${v.latencyUs}us]`;
}

function clearVerdict() {
    const el = qs('#verdict');
    if (el) { el.className = 'verdict'; el.textContent = ''; }
}

function setStatus(msg, level) {
    const el = qs('#status');
    if (el) { el.textContent = msg; el.className = `status ${level}`; }
}

function setDeployState(st) {
    const btn = qs('#deploy-btn');
    if (!btn) return;
    btn.disabled    = st === 'loading';
    btn.textContent = st === 'loading' ? 'Deploying…' : 'Deploy to Cluster';
}

function setDeployResult(level, msg) {
    const el = qs('#deploy-result');
    if (el) { el.className = `deploy-result ${level}`; el.textContent = msg; }
}

function showDiscrepancy(edgeV, serverV) {
    const el = qs('#discrepancy-banner');
    if (!el) return;
    el.hidden = false;
    el.textContent =
        `Edge KB returned ${edgeV.allowed ? 'ALLOWED' : 'DENIED'} ` +
        `but server returned ${serverV.allowed ? 'ALLOWED' : 'DENIED'}. ` +
        `SSE pipeline is syncing — server verdict is authoritative.`;
}

function getToken() {
    return sessionStorage.getItem('auth_token') ?? '';
}

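function ipv4Valid(ip) {
    if (ip.length === 0 || ip.length > MAX_IP) return false;
    if (!/^(\d{1,3}\.){3}\d{1,3}$/.test(ip)) return false;
    // The pattern alone accepts 999.999.999.999; bound each octet to 0..255.
    return ip.split('.').every((octet) => Number(octet) <= 255);
}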

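// h escapes HTML metacharacters for strings interpolated into innerHTML.
// textContent does not parse markup, so escaped text assigned to it would be
// displayed with the entities visible.
function h(s) {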
    return String(s).replace(/[&<>"']/g, (c) =>
        ({'&':'&amp;','<':'&lt;','>':'&gt;','"':'&quot;',"'":'&#39;'}[c])
    );
}

// ─── Event Wiring ─────────────────────────────────────────────────────────────

document.addEventListener('DOMContentLoaded', () => {
    initWasm();
    initSSE();

    const ipEl    = qs('#src-ip');
    const portEl  = qs('#dest-port');
    const protoEl = qs('#protocol');

    function onAnyChange() {
        const ip    = ipEl?.value.trim()  ?? '';
        const port  = parseInt(portEl?.value ?? '0', 10);
        const proto = protoEl?.value       ?? 'tcp';
        runWasmCheck(ip, port, proto);
    }

    ipEl?.addEventListener('input',     onAnyChange);
    portEl?.addEventListener('input',   onAnyChange);
    protoEl?.addEventListener('change', onAnyChange);

    qs('#check-form')?.addEventListener('submit', async (e) => {
        e.preventDefault();
        await submitToServer(
            ipEl?.value.trim()       ?? '',
            parseInt(portEl?.value   ?? '0', 10),
            protoEl?.value           ?? 'tcp',
        );
    });

    qs('#mutate-form')?.addEventListener('submit', async (e) => {
        e.preventDefault();
        await submitTopologyMutation();
    });

    qs('#approve-form')?.addEventListener('submit', async (e) => {
        e.preventDefault();
        await submitNodeApproval();
    });
});
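
The comment above syncWasmKB describes a burst of SSE events. Skipping every event that arrives while a sync is running keeps the WAM consistent, but an event that lands after the in-flight fetch has been served can be dropped, leaving the edge KB one version behind until the next event. One way to close that gap is to coalesce: remember the newest timestamp seen during a sync and run exactly one follow-up. The sketch below is an assumption-laden illustration; makeCoalescingSync and the stub loader are hypothetical names, not part of dashboard.js.

```javascript
'use strict';

// Wrap an async "load version ts" function so that overlapping requests are
// coalesced: at most one load runs at a time, and the newest timestamp seen
// while it was running triggers one follow-up load.
function makeCoalescingSync(loadFn) {
    let running   = false;
    let current   = 0;   // newest version successfully loaded
    let pendingTs = 0;   // newest version requested while a load was running

    return async function sync(ts) {
        if (ts <= current) return;
        if (running) {
            pendingTs = Math.max(pendingTs, ts);
            return;
        }
        running = true;
        try {
            let next = ts;
            while (next > current) {
                await loadFn(next);
                current   = next;
                next      = Math.max(pendingTs, current);
                pendingTs = 0;
            }
        } finally {
            running = false;
        }
    };
}

// Demo with a stub loader that records which versions were fetched.
const loaded = [];
const sync = makeCoalescingSync(async (ts) => {
    await Promise.resolve();   // stands in for fetch + FS.writeFile + consult
    loaded.push(ts);
});

const done = Promise.all([sync(1), sync(2), sync(3)]).then(() => loaded);
done.then((versions) => console.log(versions));   // [ 1, 3 ]
```

Version 2 is never loaded on its own: by the time the first load finishes, version 3 supersedes it, so a burst of three events costs two fetch and consult cycles.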

19.5 Verification and Outcome

19.5.1 Verifying the SSE Pipeline

# Terminal 1: watch the SSE stream live.
# The -N flag disables curl's output buffering.
curl -N \
    -H "Accept: text/event-stream" \
    http://logic-node-01.infra.internal:8080/api/v1/events
event: connected
data: {"status":"ok"}

# Terminal 2: submit a topology mutation.
curl -X POST http://logic-node-01.infra.internal:8080/api/v1/topology/mutate \
     -H "Authorization: Bearer $(cat ~/.sovereign_token)" \
     -H "Content-Type: application/json" \
     -d '{"action":"add_link","node1":"pve1","node2":"pve5","cost":10}'
{"ok":true,"data":{"status":"topology_updated","ts":1741267200000}}
# Terminal 1 immediately receives:
event: kb_updated
data: {"ts":1741267200000,"action":"add_link","node1":"pve1","node2":"pve5"}

The SSE event arrives in Terminal 1 within the same RTT as the mutation API response. The Go handler publishes to the broker before writing the HTTP response; the broker fans it out synchronously to the client channel; and the http.Flusher.Flush() call in SSEBroker.ServeHTTP sends the buffered bytes to the client immediately instead of leaving them in the response buffer until it fills.
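
The frames in the transcript above follow the text/event-stream layout: an event: line, a data: line, and a blank line as terminator. A minimal parser makes that framing concrete. This sketch handles only those two fields and ignores id:, retry:, and comment lines; parseSSE is a hypothetical helper, since the browser's EventSource does this work natively.

```javascript
'use strict';

// Parse text/event-stream data into { event, data } objects.
function parseSSE(text) {
    const events = [];
    for (const frame of text.split(/\r?\n\r?\n/)) {
        let event = 'message';            // default event type
        const data = [];
        for (const line of frame.split(/\r?\n/)) {
            if (line.startsWith('event:')) {
                event = line.slice(6).trim();
            } else if (line.startsWith('data:')) {
                data.push(line.slice(5).replace(/^ /, ''));
            }
        }
        // A frame without data is not dispatched.
        if (data.length > 0) events.push({ event, data: data.join('\n') });
    }
    return events;
}

const wire =
    'event: connected\ndata: {"status":"ok"}\n\n' +
    'event: kb_updated\ndata: {"ts":1741267200000,"action":"add_link",' +
    '"node1":"pve1","node2":"pve5"}\n\n';

const frames = parseSSE(wire);
const update = JSON.parse(frames[1].data);
console.log(frames.map((f) => f.event), update.ts);
```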

19.5.2 Verifying the Dynamic Node Vocabulary

# Before approval — the new node is not in the vocabulary.
curl -X POST http://logic-node-01.infra.internal:8080/api/v1/topology/mutate \
     -H "Authorization: Bearer $(cat ~/.sovereign_token)" \
     -H "Content-Type: application/json" \
     -d '{"action":"add_link","node1":"pve1","node2":"pve15","cost":5}'
{"ok":false,"error":"unknown topology node: \"pve1\" or \"pve15\""}
# Approve the new node — WAM validates it exists in known_node/1.
curl -X POST http://logic-node-01.infra.internal:8080/api/v1/topology/approve \
     -H "Authorization: Bearer $(cat ~/.sovereign_token)" \
     -H "Content-Type: application/json" \
     -d '{"node":"pve15"}'
{"ok":true,"data":{"approved":"pve15"}}
# After approval — mutation succeeds. No server restart required.
curl -X POST http://logic-node-01.infra.internal:8080/api/v1/topology/mutate \
     -H "Authorization: Bearer $(cat ~/.sovereign_token)" \
     -H "Content-Type: application/json" \
     -d '{"action":"add_link","node1":"pve1","node2":"pve15","cost":5}'
{"ok":true,"data":{"status":"topology_updated","ts":1741267210000}}

19.5.3 Verifying the WASM VFS Resync

Open the browser developer console before submitting the topology mutation. The SSE-driven sync produces the following console sequence without a page reload:

[SSE] connected
[SSE] kb_updated ts=1741267200000 action=add_link
[WASM] KB reloaded — version ts=1741267200000

After the sync completes, wasm.ready is true and wasm.kbVersion equals the server's mutation timestamp. A firewall check query typed immediately after the sync executes against the updated KB: the new pve1-pve5 link is visible to shortest_path/3 in the edge WAM without any page reload, without any setTimeout, and without re-initialising the WASM module.

The KB fetch sequence visible in the Network panel:

GET /api/v1/kb/current   200  text/plain   Cache-Control: no-store

The no-store directive on the KB endpoint is enforced server-side in handleKBCurrent — the browser never serves a cached version of the KB file to the sync pipeline regardless of the browser's cache policy for the static assets served from the same origin.

19.5.4 Verifying Missed-Event Recovery

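Interrupt the SSE stream while a topology mutation is submitted (for the curl client: kill Terminal 1, wait 5 seconds, reconnect; for the browser: drop and restore its network connection). On reconnection, the connected event triggers the version check in initSSE: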

# Reconnect:
curl -N -H "Accept: text/event-stream" \
    http://logic-node-01.infra.internal:8080/api/v1/events
event: connected
data: {"status":"ok"}

# Browser console:
[SSE] connected
[SSE] missed update detected — resyncing KB
[WASM] KB reloaded — version ts=1741267210000

The pool/status response carries last_mutation_ts, which the reconnect handler compares against wasm.kbVersion. Any mutation that occurred during the disconnection window is detected and the KB is reloaded to the current server state. The browser does not need to receive the specific SSE event it missed — it only needs to know that a newer version exists.
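
The comparison at the heart of the reconnect handler can be isolated as a pure function, which makes it easy to test without a browser. This is a sketch: needsResync is a hypothetical name, and the body shape (data.last_mutation_ts) is taken from the initSSE listing.

```javascript
'use strict';

// Decide whether the edge KB is behind the server, given the parsed body of
// the pool/status response. A missing timestamp counts as "no newer version
// known" (0), matching the ?? 0 fallback in initSSE.
function needsResync(localVersion, statusBody) {
    const serverTs = Number(statusBody?.data?.last_mutation_ts ?? 0);
    return Number.isFinite(serverTs) && serverTs > localVersion;
}

console.log(needsResync(1741267200000, { data: { last_mutation_ts: 1741267210000 } })); // true
console.log(needsResync(1741267210000, { data: { last_mutation_ts: 1741267210000 } })); // false
console.log(needsResync(1741267210000, {}));                                            // false
```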

19.5.5 The Security Invariant

None of the above changes the security model. The SSE pipeline, the dynamic vocabulary, and the WASM VFS sync are operational conveniences. The security invariant stated at the end of the original chapter design remains:

The server CGO worker pool is the sole authority on whether a cluster mutation is permitted. The browser's WASM verdict is never forwarded to the server and is never accepted as input. The handleApproveNode handler validates every new node name against the live WAM clause database — a name that does not exist in known_node/1 is rejected regardless of what the operator's browser computes. A direct curl POST to /api/v1/topology/mutate bypassing the browser, the WASM, and the SSE layer is subject to the same five re-validation layers defined in the original security contract: requestSizeLimit, typed Go struct decode, knownTopologyNode vocabulary check, closed action switch, and WAM execution with must_be guards.
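
The ordering of the first four layers can be modelled in a few lines. The sketch below is a JavaScript illustration of the sequence only, not the Go handler: MAX_BODY_BYTES, the action set, and validateMutation are assumptions made for the example, and the fifth layer (WAM execution with must_be guards) exists only on the server.

```javascript
'use strict';

const MAX_BODY_BYTES = 4096;                                 // hypothetical limit
const ACTIONS        = new Set(['add_link', 'remove_link']); // assumed closed set

// Returns { ok, layer }, where layer names the first check that rejected.
function validateMutation(rawBody, knownNodes) {
    // Layer 1: size limit before any parsing.
    if (new TextEncoder().encode(rawBody).length > MAX_BODY_BYTES) {
        return { ok: false, layer: 'size' };
    }
    // Layer 2: typed decode; every field must have the expected type.
    let p;
    try { p = JSON.parse(rawBody); } catch { return { ok: false, layer: 'decode' }; }
    if (p === null || typeof p !== 'object' ||
        typeof p.action !== 'string' || typeof p.node1 !== 'string' ||
        typeof p.node2 !== 'string' || !Number.isInteger(p.cost)) {
        return { ok: false, layer: 'decode' };
    }
    // Layer 3: vocabulary check against approved node names.
    if (!knownNodes.has(p.node1) || !knownNodes.has(p.node2)) {
        return { ok: false, layer: 'vocabulary' };
    }
    // Layer 4: closed action set.
    if (!ACTIONS.has(p.action)) return { ok: false, layer: 'action' };
    // Layer 5 (WAM execution) is server-side only and is not modelled here.
    return { ok: true, layer: null };
}

const nodes = new Set(['pve1', 'pve5']);
const unknownNode = validateMutation(
    '{"action":"add_link","node1":"pve1","node2":"pve15","cost":5}', nodes);
const accepted = validateMutation(
    '{"action":"add_link","node1":"pve1","node2":"pve5","cost":10}', nodes);
console.log(unknownNode.layer, accepted.ok);   // vocabulary true
```

Because each layer returns on the first failure, a request is never decoded past the point where it is known to be invalid, whether it arrives from the browser or from a direct curl call.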