Chapter 22: Time-Series Logic
The Chapter 21 Oracle can produce injection-immune PromQL strings for any metric in its closed vocabulary, but a string is not a fact — the gap between a 47.9% CPU steal reading stored as a float in a columnar TSDB and a node_health(pve3, critical) clause that the Chapter 19 firewall guard can act on in microseconds is the gap this chapter closes. What follows builds the complete pipeline: a Go background ingestor that polls VictoriaMetrics on a 15-second ticker, parses string-encoded floats from the JSON response, and drives CGO calls that assert discretised health facts into the live_state module of the WAM; Prolog health rules that convert the asserted node_metric/4 facts into categorical node_health/2 verdicts with hysteresis guards to prevent fact flapping; an alert dispatcher that monitors live_state for compound failure patterns and invokes Go callbacks when combinations like ZFS resilvering concurrent with high CPU steal cross their joint threshold; and security constraints that enforce integer timestamps to eliminate Atom Table bloat and restrict the ingestor to write-only access on a single module boundary.
22.1 Discretisation: The Perception of Health
22.1.1 Continuous Metrics to Discrete Facts
A CPU steal value of 47.9 is a float. The WAM can compare floats arithmetically, but its rules are written against terms: it unifies terms, matches clause heads, and evaluates arithmetic only where a clause asks for it. A firewall rule that says "do not route traffic through nodes where CPU steal exceeds 10%" should not be expressed as a threshold comparison repeated inside every clause that needs it. The comparison must be evaluated once and its result asserted as a categorical fact that all downstream rules consume without repeating the arithmetic.
Discretisation is the process of mapping a continuous measurement onto a finite ordered category set. For cluster health, three categories are sufficient: nominal (below the operational threshold), degraded (above the alert threshold but recoverable), and critical (above the emergency threshold requiring immediate action). The boundaries are defined once in the knowledge base and are the only place where raw float values appear in reasoning:
CPU Steal discretisation boundaries:
nominal: steal < 10.0%
degraded: 10.0% <= steal < 40.0%
critical: steal >= 40.0%
Memory available discretisation boundaries:
nominal: available > 20.0% of total
degraded: 10.0% < available <= 20.0% of total
critical: available <= 10.0% of total
Disk read latency discretisation boundaries:
nominal: latency < 0.5ms
degraded: 0.5ms <= latency < 5.0ms
critical: latency >= 5.0ms
ZFS ARC miss rate discretisation boundaries:
nominal: miss_rate < 5.0% (after warm-up period)
degraded: 5.0% <= miss_rate < 20.0%
critical: miss_rate >= 20.0%
The asserted fact for each boundary crossing is node_metric(Node, MetricType, Value, Timestamp) where Value is the raw float (retained for the delta guard in §22.1.2) and MetricType is an atom from the Oracle's closed vocabulary. The categorical verdict is produced by node_health/2 in §22.3, which reads the node_metric/4 facts and applies the discretisation boundaries. Separating storage from classification means the boundaries can be updated by editing the node_health.pl KB without changing the ingestor or the fact schema.
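The boundary tables above can be read as a single lookup. The following Go sketch is illustrative only: in this chapter the classification lives in node_health/2 on the Prolog side, and the names Category, boundary, and classify are assumptions introduced here, not part of the ingestor.

```go
package main

import "fmt"

// Category is the discretised verdict for one metric sample.
type Category string

const (
	Nominal  Category = "nominal"
	Degraded Category = "degraded"
	Critical Category = "critical"
)

// boundary holds the two cut points for one metric. lowerIsWorse marks
// metrics such as available memory, where a falling value is the bad direction.
type boundary struct {
	degraded, critical float64
	lowerIsWorse       bool
}

// boundaries copies the four tables above; the keys are illustrative names.
var boundaries = map[string]boundary{
	"cpu_steal":     {degraded: 10.0, critical: 40.0},
	"mem_available": {degraded: 20.0, critical: 10.0, lowerIsWorse: true},
	"disk_latency":  {degraded: 0.5, critical: 5.0},
	"arc_miss_rate": {degraded: 5.0, critical: 20.0},
}

// classify maps a raw sample onto the three-category set. It reports false
// when the metric has no boundary entry.
func classify(metric string, v float64) (Category, bool) {
	b, ok := boundaries[metric]
	if !ok {
		return "", false
	}
	crossed := func(cut float64) bool {
		if b.lowerIsWorse {
			return v <= cut
		}
		return v >= cut
	}
	switch {
	case crossed(b.critical):
		return Critical, true
	case crossed(b.degraded):
		return Degraded, true
	default:
		return Nominal, true
	}
}

func main() {
	samples := []struct {
		metric string
		value  float64
	}{{"cpu_steal", 47.9}, {"cpu_steal", 9.8}, {"mem_available", 15.0}}
	for _, s := range samples {
		c, _ := classify(s.metric, s.value)
		fmt.Printf("%s=%.1f -> %s\n", s.metric, s.value, c)
	}
}
```

The critical cut is tested first so that a value past both cut points gets the more severe category.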
22.1.2 The Hysteresis Guard
A node whose CPU steal oscillates around the 10% boundary (9.8%, 10.3%, 9.7%, 10.1% across four consecutive 15-second scrapes) would, without hysteresis, cause the WAM to flip between nominal and degraded up to four times per minute. Every flip triggers a retract-then-assert cycle in live_state, which fires the alert dispatcher's condition monitors, which generate spurious alert callbacks to the Go layer, which logs them and wastes I/O. More critically, the Chapter 19 firewall rules and the routing guard in healthy_node/1 read live_state directly: a node that flips between nominal and degraded on every 15-second scrape causes the routing layer to oscillate, which disrupts connections for any traffic flowing through that node's links.
The hysteresis guard requires a state change to be confirmed by N consecutive samples on the same side of the new boundary before the live_state fact is updated. For the sovereign cluster, N = 3 for the nominal → degraded transition (45 seconds of elevated metrics before alerting) and N = 2 for the degraded → critical transition (30 seconds, faster response for severe conditions). Recovery requires N = 4 consecutive samples below the lower boundary before the state improves (60 seconds, slower to recover than to degrade — preventing brief dips from resetting an active alert).
Hysteresis state machine for a single node metric:
State: nominal
Sample value >= degraded_threshold → increment pending_degrade counter
Sample value < degraded_threshold → reset pending_degrade to 0
pending_degrade >= 3 → transition to degraded, reset counter
State: degraded
Sample value >= critical_threshold → increment pending_critical counter
Sample value < nominal_threshold → increment pending_recover counter
pending_critical >= 2 → transition to critical, reset all counters
pending_recover >= 4 → transition to nominal, reset all counters
State: critical
Sample value < degraded_threshold → increment pending_recover counter
pending_recover >= 4 → transition to degraded, reset counter
(transition to nominal requires two-step: critical → degraded → nominal)
The pending counter for each node and metric type is held in Go as part of the ingestor's per-node state map, not in Prolog. Counter state that changes every 15 seconds is not meaningful to the logic engine — only the resolved transition is. Asserting 84 pending-counter updates per scrape cycle into the WAM would generate 84 retract-assert cycles on facts that the Prolog rules never query. The Go layer accumulates the counters, evaluates the transition condition, and only calls into the WAM CGO bridge when a confirmed state transition has occurred.
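To see why the consecutive-sample rule suppresses the oscillation from the start of this section, the nominal → degraded counter can be isolated in a few lines. This is a reduced sketch, not the ingestor's engine; the function name confirmAfter is an assumption introduced for the illustration.

```go
package main

import "fmt"

// confirmAfter returns the index of the sample at which n consecutive
// samples at or above threshold have been observed, or -1 if the run
// never reaches n.
func confirmAfter(samples []float64, threshold float64, n int) int {
	run := 0
	for i, v := range samples {
		if v >= threshold {
			run++
		} else {
			run = 0 // any sample on the healthy side restarts the count
		}
		if run >= n {
			return i
		}
	}
	return -1
}

func main() {
	oscillating := []float64{9.8, 10.3, 9.7, 10.1} // the series from the text
	sustained := []float64{9.8, 10.3, 10.6, 11.2}  // hypothetical sustained rise

	fmt.Println(confirmAfter(oscillating, 10.0, 3)) // -1: never confirmed
	fmt.Println(confirmAfter(sustained, 10.0, 3))   // 3: confirmed on the fourth sample
}
```

The oscillating series never accumulates more than one sample in a row above 10%, so no transition reaches the WAM.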
22.1.3 Memory Physics: Ephemeral Fact Lifecycle
Every node_metric/4 fact asserted into live_state occupies WAM heap. The heap is not garbage-collected in the same sense as a managed runtime: Prolog terms on the heap are reclaimed when the clause that owns them is retracted. A node_metric/4 fact that is never retracted grows the heap by approximately 120–160 bytes per assertion (the term structure for a 4-argument compound with two atoms, one float, and one integer). At 84 facts per scrape cycle and a 15-second interval, that is 336 assertions per minute.
Without retraction, after one hour of operation the heap contains 84 × (3,600 / 15) = 20,160 live node_metric/4 facts, all of them stale except the most recent 84. After 12 hours: 241,920 facts. The WAM does not compact these; they remain in the heap until explicitly retracted. At 160 bytes each, 12 hours of unretracted facts consume ~38MB of the 64MB that the Chapter 16 worker pool configuration allocates to each WAM engine. At that growth rate (about 3.2MB per hour) the engine runs out of heap after roughly 20 hours.
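The growth figures can be reproduced with a few lines of arithmetic. The sketch below uses the upper estimate of 160 bytes per fact and reads "64MB" as decimal megabytes; both are assumptions about rounding, and the function names are introduced here for illustration.

```go
package main

import "fmt"

const (
	factsPerCycle   = 84               // facts asserted per scrape cycle (from the text)
	bytesPerFact    = 160              // upper estimate per fact (from the text)
	intervalSeconds = 15               // scrape interval
	heapBytes       = 64 * 1000 * 1000 // 64MB engine allocation, read as decimal MB
)

// unretractedFacts returns how many node_metric/4 facts accumulate after
// the given number of hours if nothing is ever retracted.
func unretractedFacts(hours int) int {
	cyclesPerHour := 3600 / intervalSeconds
	return factsPerCycle * cyclesPerHour * hours
}

// hoursToExhaust returns how long the engine allocation lasts at that rate.
func hoursToExhaust() float64 {
	bytesPerHour := unretractedFacts(1) * bytesPerFact
	return float64(heapBytes) / float64(bytesPerHour)
}

func main() {
	for _, h := range []int{1, 12} {
		n := unretractedFacts(h)
		fmt.Printf("%2dh: %d facts, %d bytes\n", h, n, n*bytesPerFact)
	}
	fmt.Printf("heap exhausted after %.1f hours\n", hoursToExhaust())
}
```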
The correct lifecycle is retract-before-assert: before asserting the new node_metric(Node, Type, Value, Ts) fact, retract the previous fact for the same (Node, Type) pair. One live fact per (Node, Type) pair at all times. The heap footprint for node_metric/4 is then bounded at 84 facts × 160 bytes = 13,440 bytes (~13.4KB) regardless of uptime.
% The correct ephemeral fact lifecycle — enforced in the Go CGO bridge.
% Never use assertz/1 for live_state facts without the preceding retractall/1.
assert_node_metric(Node, Type, Value, Ts) :-
retractall(live_state:node_metric(Node, Type, _, _)),
assertz(live_state:node_metric(Node, Type, Value, Ts)).
retractall/1 succeeds even when no prior fact exists (as on the first call for a pair) and removes all matching clauses in a single pass. The combination of retractall followed immediately by assertz is not atomic: a concurrent reader of live_state between the two calls would momentarily see no fact for (Node, Type). In the Chapter 16 multi-engine pool, each engine has its own WAM instance with its own heap and clause database; the live_state module is not shared between engines. The Go ingestor therefore sends its assert goals to a single designated engine (Worker 0 by convention) over a direct channel instead of the pool's round-robin queue, so no other engine ever observes the intermediate state.
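The single-writer arrangement can be sketched as one goroutine draining one channel. The Chapter 16 pool API is not reproduced in this section, so everything below (writeRequest, singleWriter, Submit, Close) is a hypothetical stand-in for the direct channel to Worker 0, with a plain function in place of the CGO call.

```go
package main

import (
	"fmt"
	"sync"
)

// writeRequest is a hypothetical envelope for one assert goal.
type writeRequest struct {
	goal string
	done chan error
}

// singleWriter funnels every goal through one goroutine, standing in for
// the designated engine. run is whatever executes a goal on that engine.
type singleWriter struct {
	reqs chan writeRequest
	wg   sync.WaitGroup
}

func newSingleWriter(run func(goal string) error) *singleWriter {
	w := &singleWriter{reqs: make(chan writeRequest)}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for r := range w.reqs { // goals execute strictly one at a time
			r.done <- run(r.goal)
		}
	}()
	return w
}

// Submit blocks until the goal has run on the writer goroutine.
func (w *singleWriter) Submit(goal string) error {
	done := make(chan error, 1)
	w.reqs <- writeRequest{goal: goal, done: done}
	return <-done
}

// Close stops the writer once all submitted goals have finished.
func (w *singleWriter) Close() {
	close(w.reqs)
	w.wg.Wait()
}

func main() {
	var executed []string
	w := newSingleWriter(func(goal string) error {
		executed = append(executed, goal) // touched only by the writer goroutine
		return nil
	})

	var callers sync.WaitGroup
	for i := 1; i <= 4; i++ {
		callers.Add(1)
		go func(i int) {
			defer callers.Done()
			_ = w.Submit(fmt.Sprintf("assert_node_metric(pve%d, cpu_steal, 12.5, 1741267200)", i))
		}(i)
	}
	callers.Wait()
	w.Close()
	fmt.Println(len(executed), "goals executed serially")
}
```

Because only the writer goroutine runs goals, a retractall/assertz pair issued as one goal can never interleave with another write.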
One memory pressure risk operates at the process level rather than the WAM heap level. The Go orchestrator process hosts both the Go runtime — which manages the HTTP server, the ingestor goroutines, the JSON parser, and the hysteresis engine — and the CGO-linked libswipl — which manages the WAM engines' heap, stack, and trail. Go's garbage collector does not know about libswipl's C allocations, and libswipl does not know about Go's heap. During a burst of 84 confirmed state transitions (all 14 nodes crossing a threshold simultaneously after a cluster event), the Go runtime allocates string buffers for goal construction and JSON unmarshalling while the WAM simultaneously performs 84 retract-assert cycles that touch the C heap. Both allocators compete for physical memory pages at the same instant.
GOMEMLIMIT bounds this competition. Set to 80% of the VM's physical RAM, it gives the Go runtime a soft limit: the garbage collector runs more aggressively as the memory the runtime manages (heap, goroutine stacks, and runtime-internal structures) approaches the limit, reclaiming allocation headroom before the kernel has to page out WAM memory to satisfy Go's demands. The limit does not count libswipl's C allocations, so the remaining 20% must cover the WAM engines as well as the operating system. Without the limit, Go's default GC target (100% heap growth between collections) lets the Go heap balloon during a burst until the kernel starts reclaiming memory, which may swap out parts of libswipl's C heap and cause the next WAM assertion to page-fault on its own clause database.
# /etc/logic-node/orchestrator.env (append to the existing environment file)
# logic-node-01 has 16 GiB RAM; 80% = 12.8 GiB = 13,743,895,347 bytes.
GOMEMLIMIT=13743895347
# Alternatively, as a human-readable value using Go's unit suffixes
# (B, KiB, MiB, GiB, TiB; the GOMEMLIMIT variable is available from Go 1.19):
# GOMEMLIMIT=13107MiB
# /etc/systemd/system/logic-orchestrator.service.d/override.conf
# Add GOMEMLIMIT to the existing EnvironmentFile block:
[Service]
EnvironmentFile=/etc/logic-node/orchestrator.env
EnvironmentFile=/etc/logic-node/ingestor.env
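A startup check can confirm that the limit from the environment file actually reached the process. The sketch below assumes Go 1.19 or later and a 16 GiB VM; limitFor and effectiveLimit are names introduced for this illustration, while debug.SetMemoryLimit is the standard library call.

```go
package main

import (
	"fmt"
	"math"
	"runtime/debug"
)

// limitFor returns the given fraction of totalBytes, truncated to whole bytes.
func limitFor(totalBytes int64, fraction float64) int64 {
	return int64(float64(totalBytes) * fraction)
}

// effectiveLimit reports the soft memory limit the runtime is enforcing.
// A negative argument makes SetMemoryLimit return the current limit without
// changing it; math.MaxInt64 means no limit is in effect.
func effectiveLimit() (limit int64, set bool) {
	limit = debug.SetMemoryLimit(-1)
	return limit, limit != math.MaxInt64
}

func main() {
	const vmRAM = 16 << 30 // 16 GiB, assumed size of logic-node-01
	fmt.Println("80% of RAM in bytes:", limitFor(vmRAM, 0.80))

	if limit, ok := effectiveLimit(); ok {
		fmt.Println("GOMEMLIMIT in effect:", limit)
	} else {
		fmt.Println("GOMEMLIMIT not set; GC is governed by GOGC alone")
	}
}
```

Logging the effective limit once at startup makes a missing EnvironmentFile entry visible before a burst exposes it.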
22.2 The Build: The Go Telemetry Ingestor
22.2.1 Architecture
%%{init: {"themeVariables": {"fontSize": "14px"}}}%%
flowchart TD
TICKER["Go time.Ticker\n15-second interval\nSynchronised to scrape cycle\nDrift detection: skip if previous\ncycle still running"]
HTTP["net/http Client\nGET /api/v1/query\nVictoriaMetrics obs-01:8428\nRead-only API key in header\nContext timeout: 10s\nConnection pool: 4 idle"]
PARSE["JSON Parser\nUnmarshal resultType=vector\nParse string-encoded float\nnumber_string/2 equivalent\nExtract instance label\nStrip :9100 suffix"]
HYSTERESIS["Hysteresis Engine\nPer-node per-metric counter map\nRWMutex protected\nEvaluates transition conditions\nProduces confirmed state changes only"]
CGO["CGO Bridge\nConstructs Prolog compound goal\nArguments: node, type, value, ts\nCalls assert_node_metric/4\nDispatch to Worker 0 directly\nContext timeout: 500ms"]
ALERT["Alert Dispatcher\nCGO call: check_alert_conditions/2\nFires after each assert batch\nProlog meta-interpreter evaluates\nCompound failure patterns\nCallback channel if triggered"]
TICKER --->|"every 15s"| HTTP
HTTP --->|"JSON response"| PARSE
PARSE --->|"(node, type, float)"| HYSTERESIS
HYSTERESIS --->|"confirmed transition"| CGO
CGO --->|"live_state updated"| ALERT
style TICKER fill:#1A2B4A,color:#FFFFFF
style HTTP fill:#1A4070,color:#FFFFFF
style PARSE fill:#1A4070,color:#FFFFFF
style HYSTERESIS fill:#8B6914,color:#FFFFFF
style CGO fill:#7A1A1A,color:#FFFFFF
style ALERT fill:#5A1A6A,color:#FFFFFF
22.2.2 telemetry_ingestor.go
// File: /opt/logic-node/go/orchestrator/telemetry_ingestor.go
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strconv"
"strings"
"sync"
"time"
)
// ─── VictoriaMetrics API response types ──────────────────────────────────────
// vmInstantResult represents one time-series entry in a /api/v1/query response.
// The value field is [timestamp_number, value_string] per the Prometheus API spec.
// The sample value is always encoded as a JSON string because JSON numbers
// cannot represent NaN, +Inf, or -Inf, all of which are valid sample values.
type vmInstantResult struct {
Metric map[string]string `json:"metric"`
Value [2]json.RawMessage `json:"value"`
}
type vmQueryResponse struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result []vmInstantResult `json:"result"`
} `json:"data"`
ErrorType string `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
}
// parseVMFloat parses the string-encoded float in a VictoriaMetrics API value field.
// The raw JSON is a quoted string like "47.891234567890120". We unmarshal it as a
// Go string first, then parse the string as float64. This matches the Chapter 21
// requirement: use number_string/2 semantics, not a bare float64 unmarshal.
func parseVMFloat(raw json.RawMessage) (float64, error) {
var s string
if err := json.Unmarshal(raw, &s); err != nil {
return 0, fmt.Errorf("float field is not a string: %w", err)
}
f, err := strconv.ParseFloat(s, 64)
if err != nil {
return 0, fmt.Errorf("cannot parse %q as float64: %w", s, err)
}
return f, nil
}
// parseVMTimestamp parses the timestamp field from a VictoriaMetrics API value.
// The timestamp is a JSON number (Unix seconds as float64). We truncate to int64.
func parseVMTimestamp(raw json.RawMessage) (int64, error) {
var f float64
if err := json.Unmarshal(raw, &f); err != nil {
return 0, fmt.Errorf("timestamp field is not a number: %w", err)
}
return int64(f), nil
}
// nodeFromInstance extracts the node atom from a VictoriaMetrics instance label.
// Instance labels have the form "pve3:9100"; this function returns "pve3".
// The colon-and-port suffix is a scrape configuration artefact — the Prolog KB
// uses bare node atoms (pve1, pve3, …) without port suffixes.
func nodeFromInstance(instance string) (string, bool) {
idx := strings.LastIndex(instance, ":")
if idx < 0 {
return instance, true
}
node := instance[:idx]
if node == "" {
return "", false
}
return node, true
}
// ─── Hysteresis engine ────────────────────────────────────────────────────────
type HealthState int
const (
StateNominal HealthState = iota
StateDegraded
StateCritical
)
func (s HealthState) String() string {
switch s {
case StateNominal:
return "nominal"
case StateDegraded:
return "degraded"
case StateCritical:
return "critical"
}
return "unknown"
}
type metricKey struct {
Node string
Type string
}
type nodeMetricState struct {
current HealthState
pendingDegrade int
pendingCritical int
pendingRecover int
}
// hysteresisEngine tracks pending state transition counters for each
// (node, metric_type) pair and returns a confirmed new state only when
// the transition counter threshold is met.
//
// Transition thresholds:
// nominal → degraded: 3 consecutive samples >= degraded threshold
// degraded → critical: 2 consecutive samples >= critical threshold
// critical → degraded: 4 consecutive samples < degraded threshold
// degraded → nominal: 4 consecutive samples < nominal threshold
//
// Only one confirmed transition is returned per call. If the sample does not
// resolve a transition, nil is returned and no WAM assertion is made.
type hysteresisEngine struct {
mu sync.RWMutex
states map[metricKey]*nodeMetricState
}
func newHysteresisEngine() *hysteresisEngine {
return &hysteresisEngine{
states: make(map[metricKey]*nodeMetricState),
}
}
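// Thresholds for each metric type. DegradedLow is the recovery threshold: a
// degraded metric recovers only on samples strictly on the healthy side of it.
// DegradedHigh and CriticalHigh are onset thresholds and are inclusive. For
// inverted metrics (mem_available) "worse" means lower, so DegradedLow is the
// numerically larger value.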
type metricThresholds struct {
DegradedLow float64 // below this: nominal recovery
DegradedHigh float64 // at or above this: degraded onset
CriticalHigh float64 // at or above this: critical onset
}
var thresholds = map[string]metricThresholds{
"cpu_steal": {DegradedLow: 8.0, DegradedHigh: 10.0, CriticalHigh: 40.0},
"mem_available": {DegradedLow: 22.0, DegradedHigh: 20.0, CriticalHigh: 10.0}, // inverted: lower is worse
"disk_latency": {DegradedLow: 0.4, DegradedHigh: 0.5, CriticalHigh: 5.0},
"arc_miss_rate": {DegradedLow: 4.0, DegradedHigh: 5.0, CriticalHigh: 20.0},
"disk_io_util": {DegradedLow: 60.0, DegradedHigh: 70.0, CriticalHigh: 95.0},
}
// isWorse returns true if value crosses into the worse category for this metric.
// For most metrics, higher is worse. For mem_available, lower is worse (inverted).
func isWorse(metricType string, value, threshold float64) bool {
if metricType == "mem_available" {
return value <= threshold
}
return value >= threshold
}
// evaluate processes a new sample for the given (node, metricType, value) triple
// and returns (confirmedState, changed). If no threshold is defined for metricType,
// it returns (StateNominal, false) — the Oracle's closed vocabulary ensures unknown
// types never reach this function in production, but the defensive case is explicit.
func (h *hysteresisEngine) evaluate(node, metricType string, value float64) (HealthState, bool) {
t, ok := thresholds[metricType]
if !ok {
return StateNominal, false
}
key := metricKey{Node: node, Type: metricType}
h.mu.Lock()
defer h.mu.Unlock()
st, exists := h.states[key]
if !exists {
st = &nodeMetricState{current: StateNominal}
h.states[key] = st
}
prev := st.current
switch st.current {
case StateNominal:
if isWorse(metricType, value, t.DegradedHigh) {
st.pendingDegrade++
st.pendingRecover = 0
} else {
st.pendingDegrade = 0
}
if st.pendingDegrade >= 3 {
st.current = StateDegraded
st.pendingDegrade = 0
}
case StateDegraded:
if isWorse(metricType, value, t.CriticalHigh) {
st.pendingCritical++
st.pendingRecover = 0
st.pendingDegrade = 0
} else if !isWorse(metricType, value, t.DegradedLow) {
st.pendingRecover++
st.pendingCritical = 0
st.pendingDegrade = 0
} else {
st.pendingCritical = 0
st.pendingRecover = 0
}
if st.pendingCritical >= 2 {
st.current = StateCritical
st.pendingCritical = 0
} else if st.pendingRecover >= 4 {
st.current = StateNominal
st.pendingRecover = 0
}
case StateCritical:
if !isWorse(metricType, value, t.DegradedHigh) {
st.pendingRecover++
} else {
st.pendingRecover = 0
}
if st.pendingRecover >= 4 {
st.current = StateDegraded
st.pendingRecover = 0
}
}
return st.current, st.current != prev
}
// ─── Telemetry query descriptor ──────────────────────────────────────────────
// telemetryQuery describes one metric to poll. QueryFn returns the PromQL
// expression for a given instance label (including the :9100 port suffix).
// MetricType is the atom used in node_metric/4 and in the thresholds map.
type telemetryQuery struct {
MetricType string
QueryFn func(instance string) string
}
// buildQueries returns the list of telemetry queries to run each cycle.
// QueryFn values are closures over the Chapter 21 Oracle's output — the
// actual PromQL strings are generated once here and reused across cycles.
// A full reload of these strings happens on kb_updated SSE events (§22.5.3).
func buildQueries() []telemetryQuery {
return []telemetryQuery{
{
MetricType: "cpu_steal",
QueryFn: func(inst string) string {
return fmt.Sprintf(
`avg(irate(node_cpu_seconds_total{instance="%s",mode="steal"}[5m])) by (instance) * 100`,
inst,
)
},
},
{
MetricType: "disk_latency",
QueryFn: func(inst string) string {
return fmt.Sprintf(
`(irate(node_disk_read_time_seconds_total{instance="%s",device="nvme0n1"}[5m]) / irate(node_disk_reads_completed_total{instance="%s",device="nvme0n1"}[5m])) * 1000`,
inst, inst,
)
},
},
{
MetricType: "arc_miss_rate",
QueryFn: func(inst string) string {
return fmt.Sprintf(
`(irate(node_zfs_arc_misses_total{instance="%s"}[5m]) / (irate(node_zfs_arc_hits_total{instance="%s"}[5m]) + irate(node_zfs_arc_misses_total{instance="%s"}[5m]))) * 100`,
inst, inst, inst,
)
},
},
{
MetricType: "disk_io_util",
QueryFn: func(inst string) string {
return fmt.Sprintf(
`irate(node_disk_io_time_seconds_total{instance="%s",device="nvme0n1"}[5m]) * 100`,
inst,
)
},
},
}
}
// ─── HTTP client ──────────────────────────────────────────────────────────────
// vmClient is a thin wrapper around http.Client that issues instant queries
// against a VictoriaMetrics instance.
type vmClient struct {
baseURL string
apiKey string
http *http.Client
}
func newVMClient(baseURL, apiKey string) *vmClient {
return &vmClient{
baseURL: baseURL,
apiKey: apiKey,
http: &http.Client{
Timeout: 10 * time.Second,
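Transport: &http.Transport{
MaxIdleConns: 4,
// All requests go to one host, so the per-host cap must be raised too;
// otherwise net/http's default of 2 idle connections per host applies.
MaxIdleConnsPerHost: 4,
IdleConnTimeout: 90 * time.Second,
},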
},
}
}
// queryInstant issues a PromQL instant query and returns the parsed response.
// The context deadline is set by the caller; this function does not set one.
func (c *vmClient) queryInstant(ctx context.Context, promql string) (*vmQueryResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/v1/query", nil)
if err != nil {
return nil, fmt.Errorf("build request: %w", err)
}
q := req.URL.Query()
q.Set("query", promql)
req.URL.RawQuery = q.Encode()
// Read-only API key — set in VictoriaMetrics via -httpAuth.username/password
// or via bearer token if the deployment uses vmgateway. Chapter 22 §22.5.2
// documents the key provisioning.
if c.apiKey != "" {
req.Header.Set("Authorization", "Bearer "+c.apiKey)
}
resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("HTTP GET: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(io.LimitReader(resp.Body, 2*1024*1024)) // 2MB cap
if err != nil {
return nil, fmt.Errorf("read body: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("VM responded %d: %s", resp.StatusCode, body)
}
var result vmQueryResponse
if err := json.Unmarshal(body, &result); err != nil {
return nil, fmt.Errorf("unmarshal: %w", err)
}
if result.Status != "success" {
return nil, fmt.Errorf("VM query error [%s]: %s", result.ErrorType, result.Error)
}
if result.Data.ResultType != "vector" {
return nil, fmt.Errorf("unexpected resultType %q (want vector)", result.Data.ResultType)
}
return &result, nil
}
// ─── Ingestor ─────────────────────────────────────────────────────────────────
// Ingestor is the background telemetry polling loop. It owns one vmClient,
// one hysteresisEngine, and a reference to the WAM worker pool. It runs in
// its own goroutine and is stopped via context cancellation.
type Ingestor struct {
vm *vmClient
pool *Pool
hysteresis *hysteresisEngine
queries []telemetryQuery
interval time.Duration
alertCh chan<- AlertEvent
cycleCount uint64 // completed polling cycles; drives the housekeeping cadence in runCycle
}
// AlertEvent is sent to the Go alert channel when the Prolog alert dispatcher
// finds a triggered condition (§22.4). The Go layer logs it and optionally
// forwards it to an external notification system.
type AlertEvent struct {
Node string
Condition string
Severity string
Timestamp int64
}
func NewIngestor(vmBaseURL, vmAPIKey string, pool *Pool, alertCh chan<- AlertEvent) *Ingestor {
return &Ingestor{
vm: newVMClient(vmBaseURL, vmAPIKey),
pool: pool,
hysteresis: newHysteresisEngine(),
queries: buildQueries(),
interval: 15 * time.Second,
alertCh: alertCh,
}
}
// Run is the main polling loop. It blocks until ctx is cancelled.
// A sync.WaitGroup entry must be held by the caller.
func (ing *Ingestor) Run(ctx context.Context) {
ticker := time.NewTicker(ing.interval)
defer ticker.Stop()
// Run one cycle immediately at startup rather than waiting for the first tick.
ing.runCycle(ctx)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
ing.runCycle(ctx)
}
}
}
func (ing *Ingestor) runCycle(ctx context.Context) {
ts := time.Now().Unix() // Unix seconds — asserted as Prolog integer, never atom
for _, q := range ing.queries {
// Build the per-instance query. For the cluster-wide query we use the
// aggregate form (no instance filter) and iterate the results.
aggregateQuery := ing.buildAggregateQuery(q)
cycleCtx, cancel := context.WithTimeout(ctx, 8*time.Second)
resp, err := ing.vm.queryInstant(cycleCtx, aggregateQuery)
cancel()
if err != nil {
log.Printf("[Ingestor] query %q failed: %v", q.MetricType, err)
continue
}
for _, result := range resp.Data.Result {
instance, ok := result.Metric["instance"]
if !ok {
continue
}
node, ok := nodeFromInstance(instance)
if !ok {
continue
}
if !knownTopologyNode(node) {
log.Printf("[Ingestor] unknown instance %q — skipping", instance)
continue
}
// VictoriaMetrics returns the timestamp as the first element of the value
// array, encoded as a JSON number. Asserting that timestamp instead of the
// local clock prevents Go from forging fresh timestamps for dead nodes.
metricTs, err := parseVMTimestamp(result.Value[0])
if err != nil {
log.Printf("[Ingestor] parse timestamp for %s/%s: %v", node, q.MetricType, err)
continue
}
value, err := parseVMFloat(result.Value[1])
if err != nil {
log.Printf("[Ingestor] parse float for %s/%s: %v", node, q.MetricType, err)
continue
}
newState, changed := ing.hysteresis.evaluate(node, q.MetricType, value)
if !changed {
continue
}
log.Printf("[Ingestor] %s/%s → %s (value=%.3f)", node, q.MetricType, newState, value)
if err := ing.assertMetricFact(ctx, node, q.MetricType, value, metricTs); err != nil {
log.Printf("[Ingestor] assert %s/%s: %v", node, q.MetricType, err)
}
}
}
// After asserting all changed facts, run the alert dispatcher check.
ing.checkAlerts(ctx, ts)
ing.cycleCount++
// Every 4 cycles (60 seconds): clean up stuck VM migrations
if ing.cycleCount % 4 == 0 {
ing.pool.Dispatch(WorkItem{Goal: "cluster_quorum:housekeep_evictions"}, 500*time.Millisecond)
}
// Every 20 cycles (5 minutes): clean up stale logical alerts
if ing.cycleCount % 20 == 0 {
goal := fmt.Sprintf("alert_dispatcher:housekeep_alerts(%d)", ts)
ing.pool.Dispatch(WorkItem{Goal: goal}, 500*time.Millisecond)
}
}
// buildAggregateQuery returns the cluster-wide PromQL for a metric type,
// using a blank instance to indicate "all instances" (no instance label filter).
// The Oracle's query functions that take "" as instance produce the aggregate form.
func (ing *Ingestor) buildAggregateQuery(q telemetryQuery) string {
// Pass a wildcard-style query: omit the instance filter and let VM return
// all matching series. The per-result instance label is extracted in runCycle.
switch q.MetricType {
case "cpu_steal":
return `avg(irate(node_cpu_seconds_total{job="hypervisors",mode="steal"}[5m])) by (instance) * 100`
case "disk_latency":
return `(irate(node_disk_read_time_seconds_total{job="hypervisors",device="nvme0n1"}[5m]) / irate(node_disk_reads_completed_total{job="hypervisors",device="nvme0n1"}[5m])) * 1000`
case "arc_miss_rate":
return `(irate(node_zfs_arc_misses_total{job="hypervisors"}[5m]) / (irate(node_zfs_arc_hits_total{job="hypervisors"}[5m]) + irate(node_zfs_arc_misses_total{job="hypervisors"}[5m]))) * 100`
case "disk_io_util":
return `irate(node_disk_io_time_seconds_total{job="hypervisors",device="nvme0n1"}[5m]) * 100`
default:
return q.QueryFn("")
}
}
// ─── CGO bridge: asserting facts into the WAM ─────────────────────────────────
// assertMetricFact dispatches a Prolog goal to Worker 0 that asserts a
// node_metric/4 fact into live_state. The goal uses the assert_node_metric/4
// wrapper (§22.1.3) which performs retractall before assertz.
//
// The Prolog goal is a plain compound term, for example:
// assert_node_metric(pve3, cpu_steal, 47.891, 1741267200)
//
// Node and MetricType are Prolog atoms — validated against the topology vocab
// and the Oracle's closed metric vocabulary respectively before reaching here.
// Value is a Prolog float. Timestamp is a Prolog integer (never an atom —
// see §22.5.1 for the Atom Table bloat analysis).
func (ing *Ingestor) assertMetricFact(ctx context.Context, node, metricType string, value float64, ts int64) error {
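// Render the value in plain decimal notation with the fewest digits that
// round-trip the float64. A decimal point is forced because "%g" prints
// 10.0 as "10", which Prolog reads as an integer and which
// must_be(float, Value) in assert_node_metric/4 then rejects.
valueStr := strconv.FormatFloat(value, 'f', -1, 64)
switch valueStr {
case "NaN", "+Inf", "-Inf":
return fmt.Errorf("non-finite value %s for %s/%s", valueStr, node, metricType)
}
if !strings.Contains(valueStr, ".") {
valueStr += ".0"
}
goal := fmt.Sprintf(
"live_state:assert_node_metric(%s, %s, %s, %d)",
node, metricType, valueStr, ts,
)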
result, err := ing.pool.Dispatch(WorkItem{Goal: goal}, 500*time.Millisecond)
if err != nil {
return fmt.Errorf("dispatch: %w", err)
}
if result.Err != nil {
return fmt.Errorf("WAM: %w", result.Err)
}
return nil
}
// checkAlerts dispatches check_alert_conditions/2 to the WAM. The predicate
// (defined in §22.4) evaluates all alert condition rules and returns a list of
// triggered conditions. Each triggered condition is forwarded to ing.alertCh.
func (ing *Ingestor) checkAlerts(ctx context.Context, ts int64) {
goal := fmt.Sprintf("live_state:check_alert_conditions(%d, Alerts)", ts)
result, err := ing.pool.Dispatch(WorkItem{Goal: goal}, 1*time.Second)
if err != nil {
log.Printf("[Ingestor] alert check dispatch: %v", err)
return
}
if result.Err != nil {
log.Printf("[Ingestor] alert check: %v", result.Err)
return
}
for _, alert := range result.AlertList {
select {
case ing.alertCh <- AlertEvent{
Node: alert.Node,
Condition: alert.Condition,
Severity: alert.Severity,
Timestamp: ts,
}:
default:
log.Printf("[Ingestor] alert channel full — dropping %s/%s", alert.Node, alert.Condition)
}
}
}
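The two parsing helpers in telemetry_ingestor.go exist because the value pair mixes JSON types: the timestamp is a number and the sample is a quoted string. The sketch below decodes a hand-written payload in that shape; it is not captured VictoriaMetrics output, and instantResponse and decodeFirst are names introduced for the illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// instantResponse covers only the fields this sketch reads.
type instantResponse struct {
	Data struct {
		Result []struct {
			Metric map[string]string  `json:"metric"`
			Value  [2]json.RawMessage `json:"value"`
		} `json:"result"`
	} `json:"data"`
}

// decodeFirst returns (instance, unix seconds, value) for the first series
// in body. The timestamp is a JSON number; the value is a JSON string.
func decodeFirst(body []byte) (string, int64, float64, error) {
	var resp instantResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return "", 0, 0, fmt.Errorf("unmarshal: %w", err)
	}
	if len(resp.Data.Result) == 0 {
		return "", 0, 0, fmt.Errorf("empty result")
	}
	r := resp.Data.Result[0]

	var ts float64
	if err := json.Unmarshal(r.Value[0], &ts); err != nil {
		return "", 0, 0, fmt.Errorf("timestamp is not a number: %w", err)
	}
	var raw string
	if err := json.Unmarshal(r.Value[1], &raw); err != nil {
		return "", 0, 0, fmt.Errorf("value is not a string: %w", err)
	}
	v, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return "", 0, 0, fmt.Errorf("cannot parse %q: %w", raw, err)
	}
	return r.Metric["instance"], int64(ts), v, nil
}

func main() {
	body := []byte(`{"status":"success","data":{"resultType":"vector","result":[` +
		`{"metric":{"instance":"pve3:9100"},"value":[1741267200.123,"47.891"]}]}}`)
	fmt.Println(decodeFirst(body))
}
```

Feeding the timestamp element to the string-based parser fails with a type error, which is why each element needs its own helper.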
22.3 Infrastructure Health Rules
22.3.1 live_state Module Schema
% File: /opt/logic-node/kb/live_state.pl
:- module(live_state, [
node_metric/4,
assert_node_metric/4,
node_health/2,
healthy_node/1,
storage_degraded/2,
memory_critical/1,
cpu_steal_valid/2,
live_link/3, % Chapter 26: health-aware topology edge
live_query_path/4 % Chapter 26: health-aware shortest path
]).
:- use_module(library(lists)).
:- use_module(promql_oracle).
% ── Ephemeral fact store ───────────────────────────────────────────────────────
%
% node_metric(+Node, +Type, +Value, +Timestamp)
% The live fact asserted by the Go ingestor on every confirmed state change.
% One fact per (Node, Type) pair at all times — enforced by assert_node_metric/4.
%
% Node: atom — a known_node/1 from proxmox_topology (pve1..pve14)
% Type: atom — one of: cpu_steal, disk_latency, arc_miss_rate, disk_io_util
% Value: float — raw metric value in the metric's natural unit
% Timestamp: integer — Unix epoch seconds (NEVER an atom — see §22.5.1)
:- dynamic node_metric/4.
% Indexing on the first two arguments (Node, Type).
% Every access to node_metric/4 specifies ground Node and Type: both
% retractall(node_metric(Node, Type, _, _)) and health rule checks such as
% node_metric(Node, cpu_steal, V, _). With first-argument indexing alone, a
% lookup hashes on Node and then filters that node's clauses linearly by Type:
% at most 5 clauses per node (70 across 14 nodes), because assert_node_metric/4
% keeps one clause per (Node, Type) pair. An index that also covers Type
% narrows each lookup to at most 1 clause, so the cost stays constant as
% Chapter 23 adds metric types.
% Recent SWI-Prolog releases build argument indexes just in time and treat
% index/1 as a deprecated compatibility directive, so the declaration below
% documents the intended access pattern rather than forcing an index. Confirm
% the behaviour against the manual for the installed version.
:- use_module(library(dialect/hprolog), []).
:- meta_predicate node_metric(?, ?, ?, ?).
% Index declaration for arg 1 and arg 2:
:- index(node_metric(1, 1, 0, 0)).
% assert_node_metric/4 — the only authorised write path into node_metric/4.
% retractall/1 before assertz/1: exactly one fact per (Node, Type) at all times.
assert_node_metric(Node, Type, Value, Ts) :-
must_be(atom, Node),
must_be(atom, Type),
must_be(float, Value),
must_be(integer, Ts),
proxmox_topology:known_node(Node),
promql_oracle:known_metric(_, _), % module loaded
retractall(node_metric(Node, Type, _, _)),
assertz(node_metric(Node, Type, Value, Ts)).
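Because assert_node_metric/4 checks must_be(float, Value), the text Go sends has to parse as a Prolog float, not an integer. The helper below is a minimal sketch of one way to guarantee that; prologFloat is a name introduced here, and the approach (plain decimal notation with a forced decimal point) is an assumption, not the book's bridge code.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// prologFloat renders v so that a Prolog reader parses it as a float.
// It rejects NaN and infinities, which have no plain decimal form.
func prologFloat(v float64) (string, error) {
	// 'f' with precision -1 gives the shortest decimal string that
	// round-trips the float64, without an exponent.
	s := strconv.FormatFloat(v, 'f', -1, 64)
	switch s {
	case "NaN", "+Inf", "-Inf":
		return "", fmt.Errorf("non-finite value %s", s)
	}
	if !strings.Contains(s, ".") {
		s += ".0" // "10" alone would be read as an integer
	}
	return s, nil
}

func main() {
	for _, v := range []float64{10, 47.891, 0.00001} {
		s, _ := prologFloat(v)
		fmt.Printf("%%g gives %g, prologFloat gives %s\n", v, s)
	}
}
```

The comparison with %g shows the two failure shapes: whole numbers lose their decimal point, and small values switch to exponent notation.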
22.3.2 Validation Guards
% cpu_steal_valid(+Node, +Value)
% Succeeds only if Value is a plausible CPU steal percentage for Node.
% Two checks apply:
% 1. Range: Value lies in [0.0, 100.0], the physically meaningful range.
% 2. Rate of change: Value differs by no more than 40 percentage points
% from the previously recorded steal value. This guards against
% fabricated metrics injected from a compromised hypervisor
% (Chapter 20 §20.5.3).
% The rate-of-change check is skipped (vacuously satisfied) when there is
% no previous sample, or when the previous sample is more than 60 seconds
% old and therefore no longer a usable baseline.
cpu_steal_valid(Node, Value) :-
must_be(atom, Node),
must_be(float, Value),
Value >= 0.0,
Value =< 100.0,
( node_metric(Node, cpu_steal, PrevValue, PrevTs)
-> get_time(Now),
( Now - float(PrevTs) > 60.0
-> true % stale baseline — delta guard vacuously satisfied
; Diff is abs(Value - PrevValue),
Diff =< 40.0
)
; true % no prior sample — first assertion, delta guard vacuously satisfied
).
22.3.3 Categorical Health Rules
% node_health(+Node, -Status)
% Derives the categorical health status of Node from live node_metric/4 facts.
% Status is one of: nominal | degraded | critical
%
% Thresholds mirror the Go hysteresisEngine's thresholds map in §22.2.2.
% Any change to the Go thresholds must be reflected here, and vice versa.
% The Go layer determines WHEN to assert (hysteresis); this predicate
% determines WHAT the asserted values mean (categorical classification).
% Status is unified only after classification, so a call with Status already
% bound (for example node_health(N, nominal)) cannot bypass the critical and
% degraded checks.
node_health(Node, Status) :-
    proxmox_topology:known_node(Node),
    (   (   node_metric(Node, cpu_steal,     V, _), V >= 40.0
        ;   node_metric(Node, disk_latency,  V, _), V >= 5.0
        ;   node_metric(Node, arc_miss_rate, V, _), V >= 20.0
        ;   node_metric(Node, disk_io_util,  V, _), V >= 95.0
        )
    ->  Derived = critical
    ;   (   node_metric(Node, cpu_steal,     V, _), V >= 10.0
        ;   node_metric(Node, disk_latency,  V, _), V >= 0.5
        ;   node_metric(Node, arc_miss_rate, V, _), V >= 5.0
        ;   node_metric(Node, disk_io_util,  V, _), V >= 70.0
        )
    ->  Derived = degraded
    ;   Derived = nominal
    ),
    Status = Derived.
% node_health_compound(+Node, -Status)
% Compound health: a node is critical if the combination of metrics
% indicates a multi-system failure, even if no single metric crosses
% the critical threshold independently.
% Currently implemented condition: CPU steal >= 10% AND disk_latency >= 0.5ms
% — oversubscribed host causing I/O scheduler starvation.
node_health_compound(Node, critical) :-
node_metric(Node, cpu_steal, Steal, _), Steal >= 10.0,
node_metric(Node, disk_latency, Lat, _), Lat >= 0.5,
!.
node_health_compound(Node, Status) :-
node_health(Node, Status).
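Because the thresholds have to agree on both sides of the CGO boundary, it can help to see the same classification written in Go. The following is a minimal sketch, not the hysteresisEngine from §22.2.2: the names band, thresholds and classify are hypothetical, and the constants are copied from the node_health/2 rules above.

```go
package main

import "fmt"

// band holds the two cut-off values for one metric (hypothetical type name).
type band struct {
	degraded float64
	critical float64
}

// thresholds copies the constants used by the node_health/2 rules.
var thresholds = map[string]band{
	"cpu_steal":     {degraded: 10.0, critical: 40.0},
	"disk_latency":  {degraded: 0.5, critical: 5.0},
	"arc_miss_rate": {degraded: 5.0, critical: 20.0},
	"disk_io_util":  {degraded: 70.0, critical: 95.0},
}

// classify returns "critical" if any metric is at or above its critical
// threshold, otherwise "degraded" if any metric is at or above its degraded
// threshold, otherwise "nominal". Metrics without a threshold are ignored.
func classify(metrics map[string]float64) string {
	status := "nominal"
	for name, v := range metrics {
		b, ok := thresholds[name]
		if !ok {
			continue
		}
		if v >= b.critical {
			return "critical"
		}
		if v >= b.degraded {
			status = "degraded"
		}
	}
	return status
}

func main() {
	fmt.Println(classify(map[string]float64{"cpu_steal": 47.9, "disk_latency": 0.3}))
	fmt.Println(classify(map[string]float64{"cpu_steal": 12.0}))
	fmt.Println(classify(map[string]float64{"cpu_steal": 2.0}))
}
```

A table-driven test that feeds the same sample values to both this function and node_health/2 is one way to catch the two threshold sets drifting apart.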
22.3.4 Guard Predicates
% healthy_node(+Node)
% Succeeds if Node is known and its current health status is nominal.
% This is the primary guard used by Chapter 19 firewall rules and the
% Chapter 17 shortest_path/3 routing logic.
%
% Fails for:
% - Unknown nodes (not in known_node/1)
% - Nodes with no live_state facts (no metrics received yet — conservative)
% - Nodes whose node_health/2 is degraded or critical
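healthy_node(Node) :-
    proxmox_topology:known_node(Node),
    % Conservative: a node with no metrics yet is not considered healthy.
    once(node_metric(Node, _, _, _)),
    % Derive the status with Status unbound, then compare, so that the
    % classification in node_health/2 is never bypassed.
    node_health(Node, Status),
    Status == nominal.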
% storage_degraded(+Node, -Reason)
% Succeeds if the node's storage subsystem is degraded or worse.
% Returns a Reason atom describing the primary cause.
% Used by the ZFS resilvering compound alert (§22.4).
storage_degraded(Node, arc_miss_high) :-
node_metric(Node, arc_miss_rate, V, _),
V >= 5.0,
!.
storage_degraded(Node, latency_high) :-
node_metric(Node, disk_latency, V, _),
V >= 0.5,
!.
storage_degraded(Node, io_saturated) :-
node_metric(Node, disk_io_util, V, _),
V >= 70.0,
!.
% memory_critical(+Node)
% Succeeds if the node has a mem_available metric below the critical threshold.
% The mem_available metric is an inverted gauge — lower is worse.
% A value <= 10.0 means less than 10% of total RAM is available.
memory_critical(Node) :-
node_metric(Node, mem_available, V, _),
V =< 10.0.
% ── Live topology predicates ──────────────────────────────────────────────────
%
% The static link/3 facts in proxmox_topology.pl represent the physical
% network topology — cables, switches, and VLAN bridges. They are asserted at
% startup and mutated only by explicit operator commands. They have no knowledge
% of whether the hypervisor at each endpoint is healthy.
%
% live_link/3 gates each static edge on the health of both endpoints. A link
% whose source or destination node has entered a degraded or critical state is
% not traversable for routing purposes — traffic routed through an unhealthy
% node incurs latency penalties, drops, or VM migration failures.
%
% live_link/3 is the bridge between the Observation layer (Chapter 22
% telemetry and healthy_node/1) and the Orientation layer (Chapter 17
% shortest_path/3 routing). Chapter 26 wires this into the full Act phase.
%
% Note on circular imports: live_state imports proxmox_topology (for
% known_node/1). live_link/3 calls proxmox_topology:link/3 to retrieve the
% static edge, then calls healthy_node/1 (defined in this module) to filter
% by health. proxmox_topology does NOT import live_state — the dependency is
% one-directional. Routing predicates that need health awareness must use
% live_state:live_link/3 rather than proxmox_topology:link/3 directly.
% live_link(+A, +B, -Cost)
% Succeeds if a static link exists between A and B, AND both A and B are
% currently healthy (healthy_node/1 succeeds for both).
%
% Fails if either endpoint:
% - has node_health/2 of degraded or critical
% - has no live_state facts yet (no metrics received — conservative fail)
% - is not a known_node/1
live_link(A, B, Cost) :-
proxmox_topology:link(A, B, Cost),
healthy_node(A),
healthy_node(B).
% live_query_path(+Src, +Dst, -Cost, -Path)
% Public API for health-aware shortest path routing.
% Equivalent to proxmox_topology:query_path/4 but traverses live_link/3
% instead of link/3. Returns only paths whose every intermediate hop passes
% the healthy_node/1 guard.
%
% The path computation is NOT tabled here — live_link/3 is dynamic (it
% depends on node_health/2 which depends on node_metric/4, which changes
% every 15 seconds). Tabling a predicate that calls a rapidly-changing
% dynamic predicate produces stale answers without a dependency-graph
% invalidation mechanism. The correct caching strategy for Chapter 26 is
% to compute live_query_path/4 on demand (triggered by health state changes)
% and cache the result in a Go-side routing table using the SSE kb_updated
% event as the invalidation signal.
%
% For the 14-node cluster, an on-demand computation traverses at most
% O(N²) = 196 (Src, Dst) pairs in under 1ms — acceptable for Chapter 26's
% alert-triggered re-routing event rate (seconds between events, not
% milliseconds).
live_query_path(Src, Dst, Cost, Path) :-
must_be(ground, Src),
must_be(ground, Dst),
live_shortest_path(Src, Dst, Cost),
live_path_trace(Src, Dst, Cost, Path).
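% live_shortest_path(+Src, +Dst, -Cost)
% Internal helper for live_query_path/4.
% Uses findall/3 + min_list/2 to find the minimum cost over all simple
% (cycle-free) paths built from live_link/3. Fails if no live path exists.
% Not tabled; see the live_query_path/4 commentary above.
live_shortest_path(Src, Dst, Cost) :-
    findall(C, live_path_cost(Src, Dst, [Src], C), Costs),
    Costs \== [],
    min_list(Costs, Cost).
% live_path_cost(+Cur, +Dst, +Visited, -Cost)
% Enumerates on backtracking the cost of every simple live path from Cur to
% Dst. Visited prevents revisiting a node, so cyclic topologies terminate.
live_path_cost(Cur, Dst, _Visited, Cost) :-
    live_link(Cur, Dst, Cost).
live_path_cost(Cur, Dst, Visited, Cost) :-
    live_link(Cur, Mid, C1),
    Mid \== Dst,
    \+ memberchk(Mid, Visited),
    live_path_cost(Mid, Dst, [Mid|Visited], C2),
    Cost is C1 + C2.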
% live_path_trace(+Src, +Dst, +Cost, -Path)
% Reconstructs the explicit hop sequence for a live path achieving Cost.
% Mirrors proxmox_topology:path_trace/4 but uses live_link/3.
live_path_trace(Src, Dst, Cost, [Src, Dst]) :-
live_link(Src, Dst, Cost),
!.
live_path_trace(Src, Dst, Cost, [Src | Rest]) :-
live_link(Src, Mid, C1),
C2 is Cost - C1,
C2 > 0,
live_path_trace(Mid, Dst, C2, Rest),
!.
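The caching strategy described in the live_query_path/4 commentary (compute on demand, cache on the Go side, invalidate on the SSE kb_updated event) can be sketched as follows. This is an illustration under stated assumptions rather than the Chapter 26 implementation: routeCache, route and their methods are hypothetical names, and the cost is assumed to fit a float64.

```go
package main

import (
	"fmt"
	"sync"
)

// routeKey identifies one (Src, Dst) pair.
type routeKey struct{ src, dst string }

// route is a cached answer from live_query_path/4 (hypothetical shape).
type route struct {
	cost float64
	path []string
}

// routeCache is a concurrency-safe map that is emptied whenever the
// knowledge base reports a change.
type routeCache struct {
	mu     sync.RWMutex
	routes map[routeKey]route
}

func newRouteCache() *routeCache {
	return &routeCache{routes: make(map[routeKey]route)}
}

// get returns the cached route for (src, dst), if any.
func (c *routeCache) get(src, dst string) (route, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	r, ok := c.routes[routeKey{src, dst}]
	return r, ok
}

// put stores a freshly computed route.
func (c *routeCache) put(src, dst string, r route) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.routes[routeKey{src, dst}] = r
}

// invalidate drops every cached route; intended to be called from the
// handler for the kb_updated event.
func (c *routeCache) invalidate() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.routes = make(map[routeKey]route)
}

func main() {
	cache := newRouteCache()
	cache.put("pve1", "pve4", route{cost: 3, path: []string{"pve1", "pve2", "pve4"}})
	_, hit := cache.get("pve1", "pve4")
	fmt.Println("cached before kb_updated:", hit)
	cache.invalidate()
	_, hit = cache.get("pve1", "pve4")
	fmt.Println("cached after kb_updated:", hit)
}
```

Dropping the whole map on every event is the simplest correct policy; with 196 possible pairs, finer-grained invalidation is unlikely to be worth its complexity.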
22.4 The Alert Dispatcher
22.4.1 Alert Condition Registry
% File: /opt/logic-node/kb/alert_dispatcher.pl
:- module(alert_dispatcher, [
alert_condition/4,
check_alert_conditions/2,
trigger_alert/4
]).
:- use_module(live_state).
:- use_module(proxmox_topology).
% alert_condition(+ConditionID, +Severity, +Description, :Goal)
% Declares a named alert condition. Goal is called by the meta-interpreter
% during check_alert_conditions/2; if it succeeds, a trigger_alert/4 call
% is made for every Node that satisfies the condition.
%
% ConditionID: atom — unique identifier for the condition
% Severity: atom — critical | degraded | warning
% Description: atom — human-readable description for logging
% Goal: callable — must be callable with a single Node variable
% Single-metric critical alerts:
alert_condition(
cpu_steal_critical,
critical,
'CPU steal >= 40% — hypervisor oversubscription emergency',
(node_metric(Node, cpu_steal, V, _), V >= 40.0)
).
alert_condition(
disk_latency_critical,
critical,
'Disk read latency >= 5ms — likely hardware fault',
(node_metric(Node, disk_latency, V, _), V >= 5.0)
).
alert_condition(
io_saturated_critical,
critical,
'NVMe I/O utilisation >= 95% — device saturated',
(node_metric(Node, disk_io_util, V, _), V >= 95.0)
).
% Compound alerts — require two or more simultaneous conditions:
alert_condition(
cpu_steal_plus_disk_degraded,
critical,
'CPU steal >= 10% AND disk latency >= 0.5ms — oversubscription causing I/O starvation',
(
node_metric(Node, cpu_steal, Steal, _), Steal >= 10.0,
node_metric(Node, disk_latency, Lat, _), Lat >= 0.5
)
).
alert_condition(
arc_miss_plus_io_saturated,
degraded,
'ARC miss rate >= 20% AND I/O util >= 70% — ZFS working set exceeds ARC capacity under load',
(
node_metric(Node, arc_miss_rate, Miss, _), Miss >= 20.0,
node_metric(Node, disk_io_util, Util, _), Util >= 70.0
)
).
% ZFS resilvering + high CPU steal — the specific compound condition noted in the
% chapter spec. ZFS resilvering is detected via the arc_miss_rate proxy: a node
% undergoing resilvering generates a sustained arc_miss_rate spike as the rebuild
% reads uncached data from the mirror partner. Combined with CPU steal above 10%,
% this indicates the resilvering I/O is competing with hypervisor scheduling for
% the same physical resources.
alert_condition(
zfs_resilvering_under_cpu_pressure,
critical,
'ZFS resilvering (arc_miss >= 20%) concurrent with CPU steal >= 10%',
(
node_metric(Node, arc_miss_rate, Miss, _), Miss >= 20.0,
node_metric(Node, cpu_steal, Steal, _), Steal >= 10.0
)
).
22.4.2 Meta-Interpreter and Callback
% check_alert_conditions(+Timestamp, -Alerts)
% The meta-interpreter entry point called by the Go ingestor after each
% assertion batch. Iterates all registered alert_condition/4 facts,
% evaluates their Goals against the current live_state, and collects
% all triggered (Node, ConditionID, Severity) triples.
%
% Alerts is a list of alert_triggered(Node, ConditionID, Severity) terms.
% The Go layer's result.AlertList maps to this structure.
%
% Each stored Goal carries its own Node variable, which is not shared with
% the findall/3 template. The meta-interpreter therefore binds the template's
% Node to the first argument of the Goal's leading node_metric/4 call (every
% registered condition starts with one) and evaluates the Goal once per
% known node.
% Duplicate suppression: an alert is only included once per (Node, Condition)
% pair regardless of how many times the Goal succeeds for that pair.
check_alert_conditions(Ts, Alerts) :-
    must_be(integer, Ts),
    findall(
        alert_triggered(Node, CondID, Sev),
        (
            alert_condition(CondID, Sev, _Desc, Goal),
            Goal = (node_metric(Node, _, _, _), _),   % share Node with the Goal
            proxmox_topology:known_node(Node),
            call(Goal)
        ),
        AllAlerts
    ),
    sort(AllAlerts, Alerts).  % sort/2 removes duplicates
% trigger_alert(+Node, +ConditionID, +Severity, +Timestamp)
% Called by the Go layer (via CGO) when an AlertEvent is received from
% the alertCh channel. Logs the alert and asserts an alert_active/4 fact
% (declared dynamic in this module) for audit and for compound-condition
% queries.
%
% alert_active/4 facts are retained for 300 seconds (5 minutes) and then
% garbage-collected by the ingestor's housekeeping cycle (§22.4.3).
:- dynamic alert_active/4.
trigger_alert(Node, ConditionID, Severity, Timestamp) :-
    must_be(atom, Node),
    must_be(atom, ConditionID),
    must_be(atom, Severity),
    must_be(integer, Timestamp),
    retractall(alert_active(Node, ConditionID, _, _)),
    assertz(alert_active(Node, ConditionID, Severity, Timestamp)),
    % Pass the format string and arguments straight to print_message/2.
    % Building the text with format(atom(Msg), ...) would intern one new
    % atom per alert, timestamp included.
    print_message(warning,
        format("[ALERT] ~w/~w — ~w at ts=~w",
               [Node, ConditionID, Severity, Timestamp])).
% active_alerts(+Node, -ConditionIDs)
% Returns all currently active alert condition IDs for a node.
% Used by compound condition guards in routing and firewall rules.
active_alerts(Node, ConditionIDs) :-
findall(CID, alert_active(Node, CID, _, _), ConditionIDs).
% node_under_alert(+Node)
% Succeeds if Node has any active critical-severity alert.
% Used as a routing exclusion guard — nodes under critical alert
% are removed from the valid next-hop set in shortest_path/3.
node_under_alert(Node) :-
alert_active(Node, _, critical, _).
22.4.3 Alert Housekeeping
% housekeep_alerts(+CurrentTimestamp)
% Retracts alert_active/4 facts older than 300 seconds.
% Called by the Go ingestor's runCycle every 5 minutes (every 20 cycles).
% Prevents stale alerts from permanently excluding recovered nodes from routing.
housekeep_alerts(Now) :-
must_be(integer, Now),
Cutoff is Now - 300,
forall(
alert_active(Node, CondID, Sev, Ts),
( Ts < Cutoff
-> retract(alert_active(Node, CondID, Sev, Ts))
; true
)
).
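The cadence described above (one housekeeping call every 20 ingest cycles of 15 seconds) can be sketched on the Go side as a small helper. This is an assumed shape, not the runCycle code from §22.2: housekeepGoal and housekeepEvery are hypothetical names, and the goal is module-qualified on the assumption that the dispatcher lives in the alert_dispatcher module shown in §22.4.1.

```go
package main

import "fmt"

// housekeepEvery is the number of 15-second cycles between housekeeping
// runs: 20 cycles x 15 s = 300 s.
const housekeepEvery = 20

// housekeepGoal returns the Prolog goal to dispatch on this cycle and true,
// or "" and false if housekeeping is not due. cycle counts from 1.
func housekeepGoal(cycle int, now int64) (string, bool) {
	if cycle <= 0 || cycle%housekeepEvery != 0 {
		return "", false
	}
	// %d keeps the timestamp an unquoted integer literal, which is what
	// must_be(integer, Now) in housekeep_alerts/1 requires.
	return fmt.Sprintf("alert_dispatcher:housekeep_alerts(%d)", now), true
}

func main() {
	for _, cycle := range []int{1, 19, 20, 40} {
		goal, due := housekeepGoal(cycle, 1741267200)
		fmt.Println(cycle, due, goal)
	}
}
```

Because alerts expire after 300 seconds and the sweep also runs every 300 seconds, a stale alert_active/4 fact can survive for up to roughly twice the retention period before it is retracted.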
22.5 Sovereign Security: Ephemeral Fact Trapping
22.5.1 Timestamp Atom Bloat
The most damaging Atom Table violation in a time-series logic system is not an injected label string — it is an innocuous-looking timestamp. A Unix epoch timestamp like 1741267200 contains ten decimal digits. If asserted as a Prolog atom rather than an integer, every distinct timestamp is interned permanently in the WAM's Atom Table as a ten-character sequence. The Atom Table is a hash table in C memory outside the Prolog heap; it does not participate in garbage collection and has no automatic eviction mechanism.
At the Chapter 22 assertion pattern of 84 metric facts per 15-second cycle, the worst case in which every fact interns a new timestamp atom gives the following upper bound (identical timestamps share a single atom, so real growth can be lower):
Timestamps per hour: 84 facts/cycle × 240 cycles/hour = 20,160 timestamp atoms
Atom Table entries: 20,160 per hour (never evicted)
Memory per atom: ~64 bytes (hash entry + string storage for 10-digit string)
Atom Table growth: ~1.29 MB per hour
~31.0 MB per day
~11.3 GB per year
After a little over 6 hours (8 MB ÷ 1.29 MB/hour ≈ 6.2 hours) the atom table has
consumed more than 8 MB, the per-engine heap allocation in the Chapter 16
worker pool configuration.
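The growth figures above follow from three constants taken from the text: 84 facts per cycle, a 15-second cycle and roughly 64 bytes per atom. A short Go sketch reproduces the arithmetic; atomGrowthBytes is a hypothetical helper name, and the 64-byte figure is the chapter's estimate, not a measured value.

```go
package main

import "fmt"

// atomGrowthBytes returns the worst-case Atom Table growth in bytes over the
// given number of hours, assuming every asserted fact interns one new atom.
func atomGrowthBytes(factsPerCycle, cycleSeconds, bytesPerAtom, hours int64) int64 {
	cyclesPerHour := 3600 / cycleSeconds
	return factsPerCycle * cyclesPerHour * bytesPerAtom * hours
}

func main() {
	perHour := atomGrowthBytes(84, 15, 64, 1)
	perDay := atomGrowthBytes(84, 15, 64, 24)
	perYear := atomGrowthBytes(84, 15, 64, 24*365)

	fmt.Printf("per hour: %.2f MB\n", float64(perHour)/1e6)
	fmt.Printf("per day:  %.2f MB\n", float64(perDay)/1e6)
	fmt.Printf("per year: %.2f GB\n", float64(perYear)/1e9)
	// Hours until the table exceeds an 8 MB per-engine heap allocation.
	fmt.Printf("hours to reach 8 MB: %.1f\n", 8e6/float64(perHour))
}
```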
Prolog integers do not use the Atom Table. 1741267200 as a Prolog integer is a tagged word on the stack or heap: on a 64-bit platform, small integers (the exact limit is reported by the max_tagged_integer flag and is far larger than any Unix timestamp in seconds) are stored inline in the tagged word itself and need no separate heap allocation. The timestamp 1741267200 as an integer therefore occupies no additional bytes beyond the term cell that would exist in any node_metric/4 fact regardless. The same timestamp as an atom occupies roughly 64 bytes in the Atom Table permanently.
The assert_node_metric/4 predicate in §22.3.1 enforces integer timestamps with must_be(integer, Ts) — any call that passes a quoted atom like '1741267200' rather than the integer 1741267200 throws a type error before reaching the assertz/1. The Go ingestor constructs the goal string with %d format for the timestamp (§22.2.2), which produces the unquoted integer literal.
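How the goal string is rendered decides which Prolog types the guards see. The sketch below is not the §22.2.2 code: buildAssertGoal is a hypothetical helper, and it assumes node and metric have already been validated against the closed vocabulary so they can be emitted as bare atoms.

```go
package main

import (
	"fmt"
	"strconv"
)

// buildAssertGoal renders a live_state:assert_node_metric/4 goal from typed
// Go values (hypothetical helper; node and metric are assumed pre-validated).
func buildAssertGoal(node, metric string, value float64, ts int64) string {
	// Fixed-precision 'f' formatting always emits a decimal point, so a
	// whole-number reading such as 5 becomes "5.000000" and Prolog reads a
	// float. A bare "5" would be read as an integer and fail
	// must_be(float, Value).
	v := strconv.FormatFloat(value, 'f', 6, 64)
	// %d renders the timestamp as an unquoted integer literal.
	return fmt.Sprintf("live_state:assert_node_metric(%s, %s, %s, %d)",
		node, metric, v, ts)
}

func main() {
	fmt.Println(buildAssertGoal("pve3", "cpu_steal", 47.9, 1741267200))
	fmt.Println(buildAssertGoal("pve1", "cpu_steal", 5, 1741267200))
}
```

Taking the timestamp as an int64 means a quoted or fractional timestamp cannot be produced by this path at all; the must_be(integer, Ts) guard then acts as a second line of defence rather than the only one.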
22.5.2 Read-Only API Key and Least Privilege Configuration
The ingestor queries VictoriaMetrics with a read-only API key. VictoriaMetrics does not have a native RBAC system in its single-node configuration, so the read-only restriction is enforced at the HTTP layer, by the vmgateway reverse proxy or by an nginx filtering proxy on obs-01, while nftables limits which hosts can connect at all.
# nftables on obs-01 (added to the existing ruleset from Chapter 20) permits
# only the logic-node-01 management IP to reach the metrics ports.
# Packet filtering works on addresses and ports, not on HTTP methods or paths,
# so blocking POST to /api/v1/import and /api/v1/write is done at the
# application layer. The ingestor has no business writing metric data to
# the TSDB.
#
# On obs-01: create a read-only user in the vmgateway config, or use nginx
# as a filtering proxy in front of port 8428.
# Minimal nginx snippet to proxy-filter VictoriaMetrics for logic-node-01:
# /etc/nginx/sites-available/vm-readonly-proxy
# server {
# listen 8429;
# allow 192.168.100.10; # logic-node-01 management IP
# deny all;
#
# location /api/v1/query { proxy_pass http://127.0.0.1:8428; }
# location /api/v1/query_range { proxy_pass http://127.0.0.1:8428; }
# location /api/v1/label { proxy_pass http://127.0.0.1:8428; }
#
# # All write endpoints: 403 Forbidden
# location /api/v1/import { return 403; }
# location /api/v1/write { return 403; }
# location / { return 403; }
# }
# Generate the read-only API key for the ingestor service:
root@obs-01:~# openssl rand -hex 32 > /etc/victoria-metrics/ingestor-ro-key
root@obs-01:~# chmod 600 /etc/victoria-metrics/ingestor-ro-key
# Set in the ingestor's systemd unit as an environment variable:
root@logic-node-01:~# systemctl edit logic-orchestrator
# /etc/systemd/system/logic-orchestrator.service.d/override.conf
[Service]
EnvironmentFile=/etc/logic-node/ingestor.env
# /etc/logic-node/ingestor.env — mode 600, owned by the orchestrator service user
VM_BASE_URL=http://192.168.100.5:8429
VM_API_KEY=<contents of /etc/victoria-metrics/ingestor-ro-key>
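On the ingestor side, the two environment variables are read once at startup. The sketch below shows only the URL construction for an instant query against the read-only proxy; queryURL is a hypothetical helper, the metric name "up" is a placeholder, and how VM_API_KEY is presented to the proxy is left out because the chapter does not specify it.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

// queryURL builds an instant-query URL for the /api/v1/query endpoint
// exposed by the read-only proxy. The PromQL text is passed as the
// "query" parameter and URL-encoded by url.Values.
func queryURL(base, promQL string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	u.Path = "/api/v1/query"
	q := url.Values{}
	q.Set("query", promQL)
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	base := os.Getenv("VM_BASE_URL")
	if base == "" {
		base = "http://192.168.100.5:8429" // value from ingestor.env
	}
	u, err := queryURL(base, "up")
	if err != nil {
		fmt.Fprintln(os.Stderr, "bad VM_BASE_URL:", err)
		os.Exit(1)
	}
	fmt.Println(u)
}
```

Only the query endpoints are ever constructed here, which matches the proxy's allow-list: a request to any write path would be refused by nginx even if the ingestor were modified to send one.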
22.5.3 live_state Module Write Restriction
The Go ingestor is the only component authorised to write to live_state. All goals it dispatches use the live_state:assert_node_metric/4 entry point, which enforces must_be type guards and known_node/1 vocabulary validation before reaching assertz/1. No goal dispatched by the ingestor bypasses this path; the live_state:node_metric/4 dynamic predicate is not exported as a direct call target.
The KB loading order ensures that the vocabulary modules these guards depend on are loaded before live_state:
# /opt/logic-node/bin/start.sh: KB load order for the worker pool
swipl \
--quiet \
-l /opt/logic-node/kb/proxmox_topology.pl \
-l /opt/logic-node/kb/promql_oracle.pl \
-l /opt/logic-node/kb/live_state.pl \
-l /opt/logic-node/kb/alert_dispatcher.pl \
-l /opt/logic-node/kb/node_health.pl \
-g "use_module(live_state)" \
-t halt
# Check the module boundary. SWI-Prolog modules are namespaces, not an
# access-control mechanism: node_metric/4 is dynamic, so a module-qualified
# assertz/1 from outside live_state succeeds. The check reports this
# explicitly and removes the probe fact afterwards.
root@logic-node-01:~# swipl \
-l /opt/logic-node/kb/proxmox_topology.pl \
-l /opt/logic-node/kb/live_state.pl \
-g "
( catch(
    assertz(live_state:node_metric(pve1, cpu_steal, 5.0, 1741267200)),
    _,
    fail
  )
-> retractall(live_state:node_metric(pve1, cpu_steal, _, _)),
   writeln('INFO: direct assertz into live_state succeeds; restriction is by convention')
; writeln('PASS: direct assertz into live_state blocked')
),
halt
"
INFO: direct assertz into live_state succeeds; restriction is by convention
The node_metric/4 predicate is :- dynamic within the module and is not exported, but the module system does not stop code that names it explicitly as live_state:node_metric/4. The write restriction therefore rests on the ingestor itself: every goal it dispatches goes through the exported assert_node_metric/4 predicate, and no other component is given a path for submitting goals to the worker pool. The must_be guards on assert_node_metric/4 mean that even a CGO call that constructs a well-formed goal string but passes a float timestamp, assert_node_metric(pve1, cpu_steal, 5.0, 1741267200.0), throws a type error and is rejected before anything is asserted.
22.5.4 Ingestor Security Verification
# Verify that a float timestamp is rejected by assert_node_metric/4.
# Outcome is bound on both the success path and the exception path, so an
# accepted float is reported as FAIL instead of passing silently.
root@logic-node-01:~# swipl \
-l /opt/logic-node/kb/proxmox_topology.pl \
-l /opt/logic-node/kb/live_state.pl \
-g "
( catch(
    ( live_state:assert_node_metric(pve1, cpu_steal, 5.0, 1741267200.0),
      Outcome = accepted ),
    error(type_error(integer, _), _),
    Outcome = rejected
  ),
  Outcome == rejected
-> writeln('PASS: float timestamp rejected')
; writeln('FAIL: float timestamp accepted')
),
halt
"
PASS: float timestamp rejected
# Verify that an unknown node is rejected by assert_node_metric/4:
root@logic-node-01:~# swipl \
-l /opt/logic-node/kb/proxmox_topology.pl \
-l /opt/logic-node/kb/live_state.pl \
-g "
( live_state:assert_node_metric(injected_host, cpu_steal, 5.0, 1741267200)
-> writeln('FAIL: unknown node accepted')
; writeln('PASS: unknown node rejected — topology guard active')
),
halt
"
PASS: unknown node rejected — topology guard active
# End-to-end: verify node_health/2 reads live facts correctly.
root@logic-node-01:~# swipl \
-l /opt/logic-node/kb/proxmox_topology.pl \
-l /opt/logic-node/kb/live_state.pl \
-g "
live_state:assert_node_metric(pve3, cpu_steal, 47.9, 1741267200),
live_state:assert_node_metric(pve3, disk_latency, 0.3, 1741267200),
live_state:node_health(pve3, Status),
format('pve3 health: ~w~n', [Status]),
(live_state:healthy_node(pve3) -> writeln('FAIL: pve3 is healthy') ; writeln('PASS: pve3 excluded by healthy_node/1')),
halt
"
pve3 health: critical
PASS: pve3 excluded by healthy_node/1
The live pipeline is operational: the Go ingestor polls VictoriaMetrics every 15 seconds, the hysteresis engine confirms state transitions before driving CGO assertions, assert_node_metric/4 enforces the integer-timestamp and vocabulary constraints that prevent Atom Table exhaustion, and healthy_node/1 instantly reflects the current physical state of the cluster in the WAM — available to every firewall rule, routing guard, and alert condition that reads it.