Chapter 23: Distributed Prolog (Pengines)
The Chapter 22 Ingestor centralises both the computation and the network traffic on a single WAM, making the orchestrator's per-cycle work proportional to the size of the entire cluster — a ceiling that a 14-node deployment approaches but a 140-node deployment would shatter. The Pengines architecture inverts this relationship: each hypervisor runs a local Prolog engine that evaluates health rules against its own telemetry and returns proofs rather than numbers, so the network carries a categorical verdict where it previously carried a raw metric stream. The Master node aggregates 14 proofs, not 84 floats, and the cluster becomes a distributed inference fabric that the orchestrator interrogates rather than a data warehouse it consumes.
23.1 Distributed Reasoning: The Pengines Architecture
23.1.1 The Pengines Protocol
Pengines (Prolog Engines) is a library that exposes a SWI-Prolog WAM engine as a stateful HTTP service. A client creates a Pengine on a remote server, asks it a Prolog query, and receives the solutions as JSON events over a persistent HTTP connection. The protocol is non-deterministic: a query that has multiple solutions returns them one at a time or in batches, with the client controlling backtracking by sending next requests. When the query is exhausted or the client issues stop, the Pengine is destroyed and its engine is released.
The protocol's defining characteristic is that the client sends logic, not just parameters. A call to pengine_ask(PengineId, local_health_check(Node, Status, Anomalies), []) executes the goal local_health_check(Node, Status, Anomalies) in the Pengine's WAM, evaluated against the KB that the Pengine server loaded at startup and using the local node's live telemetry as its database. The result is a bound Node atom, a bound Status atom, and a bound Anomalies list produced by Prolog unification on the hypervisor, not a raw metric value that the Master must interpret.
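As a rough illustration of the client driving backtracking, library(pengines) also provides pengine_rpc/3, which presents a remote query as an ordinary non-deterministic goal. The sketch below assumes a Harvester is listening at the given URL and exposes local_health_check/3 with the signature from §23.3.2; remote_health/4 is a hypothetical wrapper name and is not part of the chapter's code.

```prolog
:- use_module(library(pengines)).

% remote_health(+URL, -Node, -Status, -Anomalies)
% Hypothetical wrapper: runs local_health_check/3 on the Pengine server at URL.
% The goal is evaluated remotely; further solutions are requested from the
% remote engine only if the caller backtracks into this goal.
remote_health(URL, Node, Status, Anomalies) :-
    pengine_rpc(URL,
                local_health_check(Node, Status, Anomalies),
                [ application(harvester) ]).
```

A call such as node_pengine_url(pve1, URL), remote_health(URL, Node, Status, Anomalies) would then bind the three result variables from the hypervisor's own proof, under the assumption that the server is reachable and the application name matches.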
This shifts the computation boundary. For a 14-node cluster with 6 metric families per node:
Pull-all-metrics model (Chapter 22):
Network: 84 float values per cycle at 4 bytes each = 336 bytes per cycle, ≈ 22 bytes/s cluster-wide at a 15s interval (raw values only, before labels and HTTP framing)
Central WAM: evaluates all 84 health rules per cycle, 14 nodes × 6 metrics
Bottleneck: central WAM CPU, central network ingress
Push-proofs model (Chapter 23):
Network: 14 verdict terms per cycle (avg ~40 bytes each) = 560 bytes per cycle, ≈ 37 bytes/s cluster-wide
Local WAM: each hypervisor evaluates its own 6 health rules
Master WAM: aggregates 14 proofs, no metric arithmetic
Bottleneck: none — work is distributed proportionally
Bandwidth is not the deciding factor: at this scale either model moves well under a kilobyte of payload per second. The primary advantage is that the hypervisor's local KB can express health rules that reference facts only available locally (ZFS pool status from zpool status, IPMI sensor readings, Proxmox task queue depth) without requiring those facts to be transmitted to and asserted into the central WAM.
23.1.2 Why Proofs, Not Metrics
The architectural principle is the separation between data and inference. VictoriaMetrics stores data. The Pengine servers hold the inference rules: they load node_health.pl and evaluate it against the local metric store on demand. A query from the Master that asks degraded_workloads(X) does not retrieve a metric; it causes the hypervisor's WAM to search its local KB, evaluate node_health_compound/2, apply the hysteresis state from live_state, and return only the workloads satisfying the predicate. The Master receives a proof term: either X = [vm_id_5, vm_id_12] (two degraded workloads) or false (none found), computed entirely on the hypervisor.
This is the pull-based alert model forward-referenced in Chapter 20 §20.5.4: the Orchestrator's shortest_path/3 guard acts on asserted node_alert/3 facts the moment they arrive, without polling lag, because the hypervisor's Pengine server evaluated the condition locally and transmitted only the transition event.
23.1.3 Distributed Harvester Topology
%%{init: {"themeVariables": {"fontSize": "14px"}}}%%
flowchart TD
MASTER["Master Logic Node\nlogic-node-01\n192.168.100.10 (mgmt VLAN 10)\ncluster_aggregator.pl\npengine_create/1 per hypervisor\ncall_with_time_limit: 8s\ncatch/3: partition → state(partitioned)"]
VLAN["Metrics VLAN 40\n10.40.0.0/24\nIsolated from cluster data plane\nnftables: port 3030 ingress\nfrom 192.168.100.10 only\nno route to management VPN"]
PVE1["pve1 — Pengine Server\n10.40.0.11:3030\nharvester_daemon.pl\nnode_health.pl\nlive_state module\nsandbox: deny shell/assertz\nbind: 10.40.0.11 only"]
PVE2["pve2 — Pengine Server\n10.40.0.12:3030\nharvester_daemon.pl\nIdentical configuration"]
PVEN["pve3…pve14\n10.40.0.13…10.40.0.24:3030\nharvester_daemon.pl\nIdentical configuration"]
RESULT["Aggregated Proof\ncluster_health([\nnode_state(pve1, nominal, []),\nnode_state(pve3, critical,\n [cpu_steal_critical]),\nnode_state(pve7, partitioned, [])\n...])\nAsserted to central WAM\nStreamed via SSE to dashboard"]
MASTER --->|"pengine_create/1\nHTTP over VLAN 40"| VLAN
VLAN --->|"port 3030"| PVE1
VLAN --->|"port 3030"| PVE2
VLAN --->|"port 3030"| PVEN
PVE1 --->|"proof term\nJSON event"| MASTER
PVE2 --->|"proof term"| MASTER
PVEN --->|"proof term"| MASTER
MASTER --->|"asserts state"| RESULT
style MASTER fill:#1A2B4A,color:#FFFFFF
style VLAN fill:#8B6914,color:#FFFFFF
style PVE1 fill:#1A4070,color:#FFFFFF
style PVE2 fill:#1A4070,color:#FFFFFF
style PVEN fill:#1A4070,color:#FFFFFF
style RESULT fill:#1A6B3A,color:#FFFFFF
The Master connects to each Pengine server over VLAN 40 using the hypervisor's VLAN address. The Pengine servers bind exclusively to their VLAN 40 interface — they are not reachable from the cluster data plane, from the management VPN, or from any other interface. Each Pengine server's nftables ruleset permits port 3030 ingress only from 192.168.100.10 (the Master's management interface, which has a route to VLAN 40 via the management VM host).
23.2 Sovereign Security: The RPC Sandbox
23.2.1 The RCE Surface Without Sandboxing
A Pengine server that accepts arbitrary Prolog code over HTTP and executes it without restriction is an unauthenticated remote code execution service. The Pengines protocol includes a src_text option that allows the client to send Prolog clauses to be loaded into the Pengine's engine before the query runs. Without sandboxing, a client sending:
src_text(':- shell("rm -rf /opt/logic-node").')
or
src_text(':- open("/etc/passwd", read, S), read_term(S, T, []), writeln(T).')
would execute those goals with the Pengine server process's OS privileges. On a hypervisor running as root (standard Proxmox configuration), this is full host compromise via a single HTTP POST.
library(sandbox) is the defence. Before a sandboxed Pengine runs a query, the library statically analyses the goal and every predicate reachable from it, and checks each call against a whitelist of safe primitives. If the analysis finds a call that is not on the whitelist, it throws error(permission_error(call, sandboxed, Goal), _) and the query is rejected without executing any part of it.
23.2.2 How library(sandbox) Works
The sandbox operates through three extensible predicates:
sandbox:safe_primitive(+Goal) declares that a specific built-in or library predicate is safe to call with any arguments. The default whitelist covers pure arithmetic, list operations, term manipulation, and string operations. Filesystem access, process control, network operations, and database mutation are not on the default whitelist.
sandbox:safe_meta(+Goal, -Called) declares that a higher-order predicate is safe if all goals it calls are themselves safe. findall/3, aggregate_all/3, and maplist/2-5 are covered by default meta-safety rules: they are safe if their goal argument is safe.
sandbox:safe_global_variable(+Name) declares that reading a specific global variable is permitted. Writing to global variables is never permitted in the default configuration.
The sandbox is invoked automatically when a Pengine is created with sandboxed(true) — the default — or explicitly via safe_goal/1 for testing.
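To make the safe_goal/1 behaviour concrete, the following is a minimal sketch of a helper that classifies a goal without ever calling it. sandbox_verdict/2 is a hypothetical name introduced here for illustration; the only library call it relies on is safe_goal/1.

```prolog
:- use_module(library(sandbox)).

% sandbox_verdict(+Goal, -Verdict)
% Verdict is `permitted` if safe_goal/1 accepts Goal, or denied(Formal) if the
% analysis raises an error, where Formal is the formal part of the error term.
% Goal is only analysed; it is never executed.
sandbox_verdict(Goal, Verdict) :-
    catch(( safe_goal(Goal), Verdict = permitted ),
          error(Formal, _),
          Verdict = denied(Formal)).
```

Under the default whitelist, sandbox_verdict(X is 1 + 2, V) should report permitted, while sandbox_verdict(shell(id), V) should report a denied(...) term carrying a permission_error.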
23.2.3 Sandbox Configuration for the Harvester
The Harvester Pengine server requires three non-default permissions: the ability to query live_state module predicates (which are dynamic, not built-in), the ability to call node_health.pl predicates, and the ability to read the get_time/1 wall clock (needed for the timestamp staleness check in cpu_steal_valid/2). All three are granted via safe_primitive/1 overrides. Everything else — shell access, file I/O, network calls, clause database mutation — is denied by the default sandbox.
% File: /opt/logic-harvester/kb/harvester_sandbox.pl
%
% Sandbox extension for the Hypervisor Harvester Pengine server.
% Loaded after library(sandbox) by harvester_daemon.pl.
% Grants the minimum permissions required for local health queries.
% Every predicate not listed here is denied by the default sandbox.
:- module(harvester_sandbox, []).
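:- use_module(library(sandbox)).
% The sandbox hooks are multifile; declare that before adding clauses from this file.
:- multifile sandbox:safe_primitive/1.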
% ── Permitted: live_state read access ────────────────────────────────────────
%
% The sandbox checks goals by functor and arity after module resolution.
% node_metric/4 is a dynamic predicate in live_state — it is not a built-in,
% so it does not appear on the default safe_primitive list. We grant read
% access explicitly. assertz, retract, and retractall into live_state are
% NOT granted — the Pengine client can query facts but cannot mutate them.
sandbox:safe_primitive(live_state:node_metric(_,_,_,_)).
sandbox:safe_primitive(live_state:node_health(_,_)).
sandbox:safe_primitive(live_state:node_health_compound(_,_)).
sandbox:safe_primitive(live_state:healthy_node(_)).
sandbox:safe_primitive(live_state:storage_degraded(_,_)).
sandbox:safe_primitive(live_state:memory_critical(_)).
sandbox:safe_primitive(live_state:cpu_steal_valid(_,_)).
% ── Permitted: topology read access ──────────────────────────────────────────
%
% The Master may ask the local Pengine about known topology facts.
% known_node/1 and known_link/3 are static facts, not dynamic — they are
% loaded at startup and not modified. Still requires explicit permission
% because they are module-qualified predicates, not built-ins.
sandbox:safe_primitive(proxmox_topology:known_node(_)).
sandbox:safe_primitive(proxmox_topology:known_link(_,_,_)).
% ── Permitted: alert state read access ───────────────────────────────────────
sandbox:safe_primitive(alert_dispatcher:alert_active(_,_,_,_)).
sandbox:safe_primitive(alert_dispatcher:active_alerts(_,_)).
% ── Permitted: wall clock read ───────────────────────────────────────────────
%
% get_time/1 is a built-in. Depending on the SWI-Prolog version it may already
% be on the default safe_primitive list; it is declared here so the staleness
% check in cpu_steal_valid/2 does not depend on the library default.
% Granting get_time/1 does not permit any write operation.
sandbox:safe_primitive(system:get_time(_)).
% ── Explicitly denied: state mutation ────────────────────────────────────────
%
% These are redundant — they are denied by default — but are listed explicitly
% as documentation of the threat model and to prevent accidental removal of
% the library(sandbox) import from causing silent permission expansion.
% sandbox:safe_primitive(system:assertz(_)). % NEVER grant
% sandbox:safe_primitive(system:retract(_)). % NEVER grant
% sandbox:safe_primitive(system:shell(_)). % NEVER grant
% sandbox:safe_primitive(system:open(_,_,_)). % NEVER grant
% sandbox:safe_primitive(system:process_create(_,_,_)). % NEVER grant
23.2.4 Verifying the Sandbox
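# Verify that shell/1 is blocked before deploying the Pengine server.
# Blocked becomes true only when safe_goal/1 raises the permission error;
# if safe_goal/1 simply succeeds, the goal is permitted and the check fails.
root@pve1:~# swipl \
-l /opt/logic-harvester/kb/harvester_sandbox.pl \
-g "
use_module(library(sandbox)),
( catch(
( safe_goal(shell('id')), Blocked = false ),
error(permission_error(call, sandboxed, _), _),
Blocked = true
),
Blocked == true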
-> writeln('PASS: shell/1 sandboxed')
; writeln('FAIL: shell/1 permitted — do not deploy')
),
halt
"
PASS: shell/1 sandboxed
# Verify that open/3 is blocked (Blocked is true only if safe_goal/1 throws):
root@pve1:~# swipl \
-l /opt/logic-harvester/kb/harvester_sandbox.pl \
-g "
use_module(library(sandbox)),
( catch(
( safe_goal(open('/etc/passwd', read, _)), Blocked = false ),
error(permission_error(call, sandboxed, _), _),
Blocked = true
),
Blocked == true
-> writeln('PASS: open/3 sandboxed')
; writeln('FAIL: open/3 permitted — do not deploy')
),
halt
"
PASS: open/3 sandboxed
# Verify that assertz/1 into live_state is blocked (Blocked is true only if safe_goal/1 throws):
root@pve1:~# swipl \
-l /opt/logic-harvester/kb/harvester_sandbox.pl \
-g "
use_module(library(sandbox)),
( catch(
( safe_goal(assertz(live_state:node_metric(pve1, cpu_steal, 99.0, 0))), Blocked = false ),
error(permission_error(call, sandboxed, _), _),
Blocked = true
),
Blocked == true
-> writeln('PASS: assertz into live_state sandboxed')
; writeln('FAIL: assertz permitted — live_state mutable by remote client')
),
halt
"
PASS: assertz into live_state sandboxed
23.3 The Build: The Hypervisor Harvester Agent
23.3.1 Deployment Layout on Each Hypervisor
/opt/logic-harvester/
├── bin/
│ └── start.sh — startup script
├── kb/
│ ├── proxmox_topology.pl — copy of Master topology KB (static)
│ ├── node_health.pl — local health rules (§22.3)
│ ├── live_state.pl — live metric facts (§22.3.1)
│ ├── alert_dispatcher.pl — alert condition registry (§22.4.1)
│ └── harvester_sandbox.pl — sandbox extensions (§23.2.3)
└── harvester_daemon.pl — HTTP server entry point
The proxmox_topology.pl on each hypervisor is a stripped read-only copy: it contains known_node/1 facts for all 14 hypervisors (for compound health queries that reference cluster-wide topology) but no known_link/3 facts (those are managed exclusively by the Master). The live_state.pl on each hypervisor contains only facts about that node — node_metric/4 clauses for pveN, never for any other node. The Go ingestor on the hypervisor (a per-node instance of the Chapter 22 Ingestor) writes to the local live_state module; the Pengine server reads from it.
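The single-node invariant on live_state can be checked mechanically. The following is a minimal sketch, assuming node_metric/4 has the argument order used in this chapter (Node, Type, Value, Timestamp). foreign_metric_nodes/2 is a hypothetical helper, and the sample facts are invented purely so the block runs on its own.

```prolog
% Sample facts standing in for the local live_state module (illustrative only).
:- dynamic live_state:node_metric/4.

live_state:node_metric(pve3, cpu_steal,    2.5,  1700000000).
live_state:node_metric(pve3, disk_latency, 0.1,  1700000000).
live_state:node_metric(pve7, cpu_steal,    11.0, 1700000000).  % stray fact

% foreign_metric_nodes(+LocalNode, -Foreign)
% Foreign is the sorted, duplicate-free list of node names other than
% LocalNode that own at least one node_metric/4 fact.
% An empty list means the single-node invariant holds.
foreign_metric_nodes(LocalNode, Foreign) :-
    findall(N,
            ( live_state:node_metric(N, _, _, _), N \== LocalNode ),
            Ns),
    sort(Ns, Foreign).
```

With the sample facts above, foreign_metric_nodes(pve3, F) yields F = [pve7], which a startup check could treat as a reason to refuse to serve queries.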
23.3.2 harvester_daemon.pl
% File: /opt/logic-harvester/harvester_daemon.pl
%
% SWI-Prolog HTTP server that hosts the Pengine endpoint for the local hypervisor.
% Binds exclusively to the Metrics VLAN interface (10.40.0.X:3030).
% Loads the local health KB and exposes it via the Pengines HTTP API.
%
% Start:
% swipl -l /opt/logic-harvester/harvester_daemon.pl -g "start_harvester(pve1)" -t halt
%
% The Node argument is used to label health-check results and verify that
% the local live_state contains only facts for this node.
:- use_module(library(http/thread_httpd)).
:- use_module(library(http/http_dispatch)).
:- use_module(library(pengines)).
:- use_module(library(sandbox)).
% Load the local KB stack.
:- use_module('/opt/logic-harvester/kb/proxmox_topology').
:- use_module('/opt/logic-harvester/kb/live_state').
:- use_module('/opt/logic-harvester/kb/alert_dispatcher').
:- use_module('/opt/logic-harvester/kb/node_health').
% Load sandbox extensions AFTER library(sandbox) has been loaded.
:- use_module('/opt/logic-harvester/kb/harvester_sandbox').
% ── Server configuration ──────────────────────────────────────────────────────
% harvester_node/1 and harvester_address/1 are asserted at startup by start_harvester/1.
:- dynamic harvester_node/1.
:- dynamic harvester_address/1.
% start_harvester(+NodeAtom)
% Reads the VLAN 40 address for NodeAtom from the topology KB,
% starts the HTTP server on that address, port 3030, and blocks.
start_harvester(Node) :-
must_be(atom, Node),
proxmox_topology:known_node(Node),
vlan40_address(Node, Address),
assertz(harvester_node(Node)),
assertz(harvester_address(Address)),
format(atom(AddrPort), "~w:3030", [Address]),
format("[Harvester] Starting on ~w for node ~w~n", [AddrPort, Node]),
% The Pengines HTTP endpoint is registered automatically by library(pengines)
% under /pengine/. The http_dispatch table handles it.
% port(Host:Port) binds the listening socket to a single interface;
% a bare port(3030) would listen on every interface.
http_server(http_dispatch,
[ port(Address:3030), % bind to VLAN 40 only, not 0.0.0.0
workers(4), % 4 HTTP worker threads
timeout(30) % 30s idle connection timeout
]),
format("[Harvester] Server running. KB loaded for ~w.~n", [Node]),
thread_get_message(_). % block main thread; server runs in background
% vlan40_address(+Node, -Address)
% Returns the VLAN 40 IP address string for a known hypervisor.
% The numbering follows Chapter 20: pve1 = 10.40.0.11, pve14 = 10.40.0.24.
vlan40_address(Node, Address) :-
atom_concat('pve', NumAtom, Node),
atom_number(NumAtom, Num),
Num >= 1, Num =< 14,
LastOctet is Num + 10,
format(atom(Address), "10.40.0.~w", [LastOctet]).
% ── Pengine application configuration ────────────────────────────────────────
% pengine_application/1 declares the application module 'harvester'. Pengines
% created with application(harvester) resolve their goals through that module.
% - Sandboxing is on unless the pengines:not_sandboxed/2 hook succeeds for the
%   caller; this file deliberately does not define that hook.
% - src_text sent by a client is loaded into the Pengine's temporary module and
%   is subject to the same sandbox checks as the query. library(pengines) has no
%   pengine_option/3 table, so there is no per-application switch that rejects
%   src_text outright; the sandbox whitelist is the control.
:- pengine_application(harvester).
% ── Local health check predicate ─────────────────────────────────────────────
%
% This is the primary entry point the Master node queries via Pengine.
% It is declared in the 'harvester' module space so Pengines can find it.
% local_health_check(-Node, -Status, -Anomalies)
%
% Returns the current health status of this hypervisor and a list of
% active anomaly terms describing why the status is non-nominal.
%
% Node: the atom identifying this hypervisor (from harvester_node/1)
% Status: one of: nominal | degraded | critical
% Anomalies: list of anomaly(Type, Value, Threshold) terms for each breach
local_health_check(Node, Status, Anomalies) :-
harvester_node(Node),
live_state:node_health_compound(Node, Status),
collect_anomalies(Node, Anomalies).
% collect_anomalies(+Node, -Anomalies)
% Builds the list of metric breaches for the node's current state.
% Anomalies is empty if the node is nominal.
collect_anomalies(Node, Anomalies) :-
findall(Anomaly,
anomaly_check(Node, Anomaly),
Anomalies).
anomaly_check(Node, anomaly(cpu_steal_critical, V, 40.0)) :-
live_state:node_metric(Node, cpu_steal, V, _), V >= 40.0.
anomaly_check(Node, anomaly(cpu_steal_degraded, V, 10.0)) :-
live_state:node_metric(Node, cpu_steal, V, _), V >= 10.0, V < 40.0.
anomaly_check(Node, anomaly(disk_latency_critical, V, 5.0)) :-
live_state:node_metric(Node, disk_latency, V, _), V >= 5.0.
anomaly_check(Node, anomaly(disk_latency_degraded, V, 0.5)) :-
live_state:node_metric(Node, disk_latency, V, _), V >= 0.5, V < 5.0.
anomaly_check(Node, anomaly(arc_miss_critical, V, 20.0)) :-
live_state:node_metric(Node, arc_miss_rate, V, _), V >= 20.0.
anomaly_check(Node, anomaly(io_saturated, V, 95.0)) :-
live_state:node_metric(Node, disk_io_util, V, _), V >= 95.0.
% Compound condition: the Value slot carries the pair StealValue-LatencyValue.
anomaly_check(Node, anomaly(io_stressed_plus_steal, SV-LV, combined)) :-
live_state:node_metric(Node, cpu_steal, SV, _), SV >= 10.0,
live_state:node_metric(Node, disk_latency, LV, _), LV >= 0.5.
% local_alert_summary(-Node, -ActiveAlerts)
% Returns the list of currently active alert condition IDs on this node.
% Used by the Master's alert aggregator to build a cluster-wide alert index.
local_alert_summary(Node, ActiveAlerts) :-
harvester_node(Node),
alert_dispatcher:active_alerts(Node, ActiveAlerts).
% metric_snapshot(-Node, -Snapshot)
% Returns all current node_metric/4 facts as a snapshot list.
% Snapshot = [metric(Type, Value, Ts), ...] sorted by Type.
% Used by the Master when it needs raw values, not just categorical status.
metric_snapshot(Node, Snapshot) :-
harvester_node(Node),
findall(metric(Type, Value, Ts),
live_state:node_metric(Node, Type, Value, Ts),
UnsortedSnapshot),
sort(1, @=<, UnsortedSnapshot, Snapshot).
23.3.3 Systemd Unit
# /etc/systemd/system/logic-harvester.service
[Unit]
Description=Logic Harvester Pengine Agent
Documentation=man:swipl(1)
After=network-online.target
Wants=network-online.target
# Start after the local Go telemetry ingestor has populated live_state:
After=logic-ingestor.service
[Service]
Type=simple
User=harvester
Group=harvester
WorkingDirectory=/opt/logic-harvester
# NODE_ID is set per-hypervisor during provisioning (e.g. pve3).
# VLAN40_ADDR is the 10.40.0.X address for this specific hypervisor.
EnvironmentFile=/etc/logic-harvester/node.env
ExecStart=/usr/bin/swipl \
--stack-limit=64M \
--table-space=32M \
-l /opt/logic-harvester/harvester_daemon.pl \
-g "start_harvester(${NODE_ID})" \
-t halt
Restart=on-failure
RestartSec=10s
TimeoutStartSec=30s
# Network isolation: the Harvester exchanges traffic only with VLAN 40 peers,
# loopback, and the Master. The 192.168.100.10 entry permits the Master's
# management interface (which routes to VLAN 40 via the Proxmox VM host)
# to reach port 3030.
IPAddressAllow=10.40.0.0/24 192.168.100.10 127.0.0.0/8 ::1/128
IPAddressDeny=any
# Socket bind restriction (systemd v249+, enforced by a cgroup BPF program).
# SocketBindAllow permits binding TCP port 3030 only; SocketBindDeny=any
# rejects every other bind(2) call in the kernel. The restriction survives
# a sandbox bypass because it is enforced outside the SWI-Prolog process:
# no code running inside the process can lift it. A compromised Harvester
# cannot open a rogue bind-shell listener on any port other than 3030.
SocketBindAllow=tcp:3030
SocketBindDeny=any
# Memory and CPU limits — Harvester is not latency-critical.
MemoryMax=256M
CPUQuota=25%
# No new privileges; no root filesystem writes.
NoNewPrivileges=yes
ProtectSystem=strict
ProtectHome=yes
ReadOnlyPaths=/opt/logic-harvester
[Install]
WantedBy=multi-user.target
23.3.4 Provisioning Script
#!/bin/bash
# /opt/logic-harvester/bin/provision.sh
# Runs once per hypervisor during initial deployment.
# Sets NODE_ID and VLAN40_ADDR in /etc/logic-harvester/node.env
set -euo pipefail
HOSTNAME=$(hostname -s) # e.g. "pve3"
if ! [[ "$HOSTNAME" =~ ^pve([0-9]+)$ ]]; then
echo "ERROR: hostname $HOSTNAME does not match pveN pattern" >&2
exit 1
fi
NODE_NUM="${BASH_REMATCH[1]}"
if (( NODE_NUM < 1 || NODE_NUM > 14 )); then
echo "ERROR: node number $NODE_NUM outside 1..14" >&2
exit 1
fi
LAST_OCTET=$(( NODE_NUM + 10 ))
VLAN40_ADDR="10.40.0.${LAST_OCTET}"
install -d -m 750 -o harvester -g harvester /etc/logic-harvester
cat > /etc/logic-harvester/node.env <<EOF
NODE_ID=${HOSTNAME}
VLAN40_ADDR=${VLAN40_ADDR}
EOF
chmod 640 /etc/logic-harvester/node.env
chown harvester:harvester /etc/logic-harvester/node.env
echo "[Provision] ${HOSTNAME}: VLAN40_ADDR=${VLAN40_ADDR}"
echo "[Provision] Created /etc/logic-harvester/node.env"
23.4 The Build: Aggregating Distributed Proofs
23.4.1 The Pengines Client API
The SWI-Prolog Pengines client library provides four predicates used in the Master aggregator:
pengine_create(+Options) creates a new Pengine on a remote server (or locally). The Options list controls the server URL, the initial query (ask), the solution template (template), whether the Pengine is sandboxed on the remote end, and whether it auto-destroys after the query completes. pengine_create/1 is asynchronous: it returns immediately and sends a create event to the calling thread's event queue.
pengine_ask(+PengineId, +Goal, +Options) sends a query to an already-created Pengine. Used when the Pengine is reused across multiple queries within a session.
pengine_event(?Event) and pengine_event(?Event, +Options) receive the next event from any active Pengine associated with the calling thread. Events are terms: create(Id, _), success(Id, Solutions, Projection, Time, More), failure(Id, Time), error(Id, Error), destroy(Id). More is true if additional solutions exist and false if the query is exhausted.
pengine_event_loop(:Closure, +Options) dispatches events to Closure until all Pengines associated with the calling thread are destroyed. Used for multi-Pengine aggregation where events from multiple concurrent queries interleave in the thread's event queue.
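% Minimal Pengines round-trip: single query, single solution.
:- use_module(library(pengines)).
query_single_node(NodeURL, Goal, Template, Solution) :-
pengine_create([
server(NodeURL),
ask(Goal),
template(Template),
application(harvester),
destroy(true)
]),
pengine_event(Event),
first_answer(Event, success(_, [Solution], _, _, _)).
% first_answer(+Event, ?Answer): the first queued event is create(Id, Features).
% The answer may be embedded there as answer(A), follow as the next event,
% or arrive wrapped as destroy(Id, A).
first_answer(create(_, Features), Answer) :-
memberchk(answer(Embedded), Features), !,
first_answer(Embedded, Answer).
first_answer(create(_, _), Answer) :- !,
pengine_event(Next), first_answer(Next, Answer).
first_answer(destroy(_, Inner), Answer) :- !, first_answer(Inner, Answer).
first_answer(Answer, Answer).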
The destroy(true) option above is appropriate for infrequent or one-shot queries. For the 15-second polling cycle that drives global_cluster_health/2, spawning and destroying a WAM engine on the remote hypervisor on every cycle incurs initialisation overhead — SWI-Prolog starts a fresh Prolog engine, loads the sandbox, and registers the application on each pengine_create/1 call. On a lightly-loaded hypervisor this overhead is approximately 1–2ms per cycle; across 14 nodes at 15-second intervals it is negligible. At sub-second polling frequencies — as used by Chapter 25's bin-packer when checking placement feasibility during an active migration — it becomes the dominant latency.
The alternative is a long-lived Pengine: create the Pengine once (with destroy(false) instead of destroy(true)) and reuse it across multiple queries by calling pengine_ask/3 in a loop. The Pengine persists on the hypervisor between queries, its KB already loaded, its sandbox already active, and the HTTP connection optionally kept alive:
% Long-lived Pengine pattern — for high-frequency polling:
:- use_module(library(pengines)).
% open_node_pengine(+NodeURL, -PengineId)
% Creates a persistent Pengine on NodeURL without an initial query.
% The caller retains PengineId and passes it to query_persistent_pengine/3.
open_node_pengine(NodeURL, PengineId) :-
pengine_create([
server(NodeURL),
application(harvester),
destroy(false) % keep alive after each query completes
]),
pengine_event(create(PengineId, _)). % wait for creation confirmation
% query_persistent_pengine(+PengineId, +Goal, -Result)
% Sends a query to an already-open Pengine and returns the first solution.
% The Pengine remains alive after this call, ready for the next query.
query_persistent_pengine(PengineId, Goal, Result) :-
pengine_ask(PengineId, Goal, [template(Goal), chunk(1)]),
pengine_event(Event),
interpret_event(Event, Result).
% close_node_pengine(+PengineId)
% Destroys the Pengine when the polling session ends or on graceful shutdown.
close_node_pengine(PengineId) :-
pengine_destroy(PengineId).
Long-lived Pengines require the Master to manage Pengine lifecycle explicitly: tracking open PengineId handles in a per-node state map, detecting when a remote destroy event arrives (indicating the server restarted), and reopening the Pengine automatically. The query_node_safe/5 wrapper must be extended to detect the destroy(Id) event as a transient failure and re-issue open_node_pengine/2. For the 15-second cycle the per-call destroy(true) pattern is simpler and sufficient; the long-lived pattern is a Chapter 25 optimisation item.
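The per-node state map described above could be kept as a small table of dynamic facts. The sketch below is one possible shape, not the chapter's implementation: node_pengine/2, ensure_node_pengine/3, forget_pengine/1 and stub_open/2 are hypothetical names, the opener is passed in as a closure so that a wrapper around open_node_pengine/2 can be supplied later, and a single polling thread is assumed to own all handles.

```prolog
:- use_module(library(gensym)).

:- dynamic node_pengine/2.                      % node_pengine(Node, PengineId)
:- meta_predicate ensure_node_pengine(+, 2, -).

% ensure_node_pengine(+Node, :Open, -Id)
% Reuses the recorded handle for Node if there is one; otherwise calls
% Open(Node, Id) to create a handle and records it.
ensure_node_pengine(Node, _Open, Id) :-
    node_pengine(Node, Known), !,
    Id = Known.
ensure_node_pengine(Node, Open, Id) :-
    call(Open, Node, Id),
    assertz(node_pengine(Node, Id)).

% forget_pengine(+Id)
% Drops a handle, for example after a destroy(Id) event, so that the next
% ensure_node_pengine/3 call for that node opens a fresh one.
forget_pengine(Id) :-
    retractall(node_pengine(_, Id)).

% stub_open(+Node, -Id)
% Stand-in opener for exercising the bookkeeping without a network:
% it just generates a unique atom per call.
stub_open(Node, Id) :-
    gensym(Node, Id).
```

Two consecutive calls to ensure_node_pengine(pve1, stub_open, Id) return the same handle; after forget_pengine/1 the next call produces a new one. In a real deployment the closure would wrap node_pengine_url/2 and open_node_pengine/2.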
23.4.2 Network Resilience: Timeout and Partition Handling
A Pengine query to an offline or partitioned hypervisor will either fail immediately (TCP connection refused) or hang (TCP SYN sent, no response, waiting for the OS connection timeout, roughly 127 seconds with the Linux default of tcp_syn_retries=6). Neither is acceptable: a connection refused should map to state(unreachable), and a hung connection should never block the Master's aggregation cycle.
The defence is two-layered. call_with_time_limit(+Seconds, :Goal) from library(time) terminates the goal if it has not completed within the deadline by throwing the bare atom time_limit_exceeded (it is not wrapped in error/2). The catch/3 wrapper catches this exception and the two TCP error terms that pengine_create/1 can throw on connection failure.
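The timeout layer can be tried in isolation before any network is involved. The sketch below is a simplified stand-in for the wrapper that follows, under stated assumptions: with_deadline/3, timeout_exception/1 and spin/0 are hypothetical names, and the handler accepts both the bare atom and an error/2-wrapped form so that it does not depend on which shape the installed SWI-Prolog raises.

```prolog
:- use_module(library(time)).

% timeout_exception(+E)
% True if E signals an expired time limit, in either shape.
timeout_exception(time_limit_exceeded).
timeout_exception(error(time_limit_exceeded, _)).

% with_deadline(+Seconds, :Goal, -Outcome)
% Outcome is completed if Goal succeeds in time, failed if it fails in time,
% and state(partitioned) if the deadline expires first.
% Any other exception is re-thrown unchanged.
:- meta_predicate with_deadline(+, 0, -).
with_deadline(Seconds, Goal, Outcome) :-
    catch(( call_with_time_limit(Seconds, Goal)
          ->  Outcome = completed
          ;   Outcome = failed
          ),
          E,
          (   timeout_exception(E)
          ->  Outcome = state(partitioned)
          ;   throw(E)
          )).

% spin/0 never terminates; it stands in for a query to a silent node.
spin :- repeat, fail.
```

Here with_deadline(0.2, spin, O) binds O to state(partitioned) after the deadline, while with_deadline(1, true, O) binds O to completed.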
% query_node_safe(+NodeURL, +Goal, +Template, +TimeoutSecs, -Result)
%
% Executes Goal on the Pengine server at NodeURL with a timeout.
% Result is one of:
% solution(Solution) — query succeeded, Solution is the bound template
% no_solution — query completed with failure (no matching facts)
% state(partitioned) — timeout: node is unreachable or too slow
% state(unreachable) — TCP connection refused: node is down
% error(Error) — Pengine returned an error term
query_node_safe(NodeURL, Goal, Template, TimeoutSecs, Result) :-
catch(
call_with_time_limit(TimeoutSecs,
query_node_inner(NodeURL, Goal, Template, Result)
),
Error,
handle_query_error(Error, Result)
).
query_node_inner(NodeURL, Goal, Template, Result) :-
pengine_create([
server(NodeURL),
ask(Goal),
template(Template),
application(harvester),
destroy(true),
chunk(1) % request solutions one at a time
]),
collect_first_result(Result).
collect_first_result(Result) :-
pengine_event(Event),
interpret_event(Event, Result).
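% interpret_event(+Event, -Result)
% Maps a Pengine event to the Result vocabulary.
% Only the first solution is used: the health-check predicates return at most one.
% pengine_create/1 queues create(Id, Features) first; with ask(Goal) the first
% answer may be embedded there as answer(Event) or follow as the next event.
% A final answer may arrive wrapped as destroy(Id, Event).
interpret_event(create(_, Features), Result) :-
    memberchk(answer(Answer), Features), !,
    interpret_event(Answer, Result).
interpret_event(create(_, _), Result) :- !,
    collect_first_result(Result).
interpret_event(destroy(_, Inner), Result) :- !,
    interpret_event(Inner, Result).
interpret_event(success(_, [Solution|_], _, _, _), solution(Solution)) :- !.
interpret_event(failure(_, _), no_solution) :- !.
interpret_event(error(_, Error), error(Error)) :- !.
interpret_event(destroy(_), no_solution) :- !.
interpret_event(Other, error(unexpected_event(Other))).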
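% call_with_time_limit/2 throws the bare atom; the wrapped form is kept as a fallback.
handle_query_error(time_limit_exceeded, state(partitioned)) :- !.
handle_query_error(error(time_limit_exceeded, _), state(partitioned)) :- !.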
handle_query_error(error(existence_error(pengine, _), _), state(partitioned)) :- !.
handle_query_error(error(socket_error(_, _), _), state(unreachable)) :- !.
handle_query_error(error(tcp_error(_, _), _), state(unreachable)) :- !.
handle_query_error(Other, error(Other)).
23.4.3 cluster_aggregator.pl
% File: /opt/logic-node/kb/cluster_aggregator.pl
%
% Master node distributed proof aggregator.
% Queries all 14 hypervisor Pengine servers in parallel and assembles
% the results into a single cluster_health/1 term.
:- module(cluster_aggregator, [
global_cluster_health/2,
query_all_nodes/2,
node_pengine_url/2,
node_state_summary/1
]).
:- use_module(library(pengines)).
:- use_module(library(thread)).
:- use_module(library(aggregate)).
:- use_module(library(time)).
:- use_module(proxmox_topology, [known_node/1]).
:- use_module(live_state, [assert_node_metric/4]).
% ── URL construction ──────────────────────────────────────────────────────────
% node_pengine_url(+Node, -URL)
% Constructs the Pengine server base URL for a given hypervisor.
% The client library appends the pengine/<action> path itself, so the base
% URL carries no path component.
% Uses the VLAN 40 addressing scheme: pveN → 10.40.0.(N+10):3030.
node_pengine_url(Node, URL) :-
atom_concat('pve', NumAtom, Node),
atom_number(NumAtom, Num),
Num >= 1, Num =< 14,
LastOctet is Num + 10,
format(atom(URL), "http://10.40.0.~w:3030", [LastOctet]).
% ── Core aggregation predicate ────────────────────────────────────────────────
% global_cluster_health(+Timestamp, -ClusterHealth)
%
% Queries all known hypervisors for their current health state.
% Executes the queries concurrently using concurrent_maplist/3 from library(thread).
% Assembles results into:
% ClusterHealth = cluster_health(Timestamp, NodeStates)
% where NodeStates is a list of node_state(Node, Status, Anomalies) terms,
% one per known_node/1, in node-name sorted order.
%
% Nodes that are offline or partitioned appear as:
% node_state(Node, partitioned, [])
% node_state(Node, unreachable, [])
% rather than being absent from the list: the Master always produces
% a complete 14-element list regardless of how many nodes responded.
global_cluster_health(Timestamp, cluster_health(Timestamp, NodeStates)) :-
must_be(integer, Timestamp),
findall(Node, proxmox_topology:known_node(Node), Nodes),
sort(Nodes, SortedNodes), % deterministic ordering for SSE diffs
% Parallel query with an 8s per-node timeout. concurrent_maplist/3 joins
% all workers before returning. It starts min(CPU cores, list length)
% workers, so on a Master with fewer than 14 cores the queries run in waves.
concurrent_maplist(query_single_node_health, SortedNodes, NodeStates).
% query_single_node_health(+Node, -NodeState)
%
% Queries the Pengine server on Node for its local_health_check result.
% Returns node_state(Node, Status, Anomalies) on success, or
% node_state(Node, partitioned, []) / node_state(Node, unreachable, [])
% on network failure.
%
% The 8-second timeout is chosen to fit within the 15-second scrape cycle.
% While all 14 queries are in flight at once, total aggregation time is
% bounded by the slowest responding node, not by the sum of the timeouts.
% In practice, well-connected nodes respond in 50–200ms.
query_single_node_health(Node, NodeState) :-
node_pengine_url(Node, URL),
Goal = local_health_check(Node, Status, Anomalies),
Template = local_health_check(Node, Status, Anomalies),
query_node_safe(URL, Goal, Template, 8, QueryResult),
interpret_node_result(Node, QueryResult, NodeState).
interpret_node_result(Node, solution(local_health_check(Node, Status, Anomalies)),
node_state(Node, Status, Anomalies)) :- !.
interpret_node_result(Node, no_solution,
node_state(Node, nominal, [])) :- !.
interpret_node_result(Node, state(partitioned),
node_state(Node, partitioned, [])) :- !.
interpret_node_result(Node, state(unreachable),
node_state(Node, unreachable, [])) :- !.
interpret_node_result(Node, error(_E),
node_state(Node, error, [])).
% ── Convenience predicates ────────────────────────────────────────────────────
% query_all_nodes(+Goal, -Results)
%
% Generic parallel query: runs Goal on every known_node/1 Pengine server
% and returns a list of Node-Result pairs, where Result is the raw
% query_node_safe result (solution(_), no_solution, state(_) or error(_)).
% Goal must be callable as a local_health_check-style predicate on each node.
% Used by the Chapter 25 bin-packer for distributed capacity queries.
query_all_nodes(GoalTemplate, NodeResults) :-
findall(Node, proxmox_topology:known_node(Node), Nodes),
thread_maplist(
{GoalTemplate}/[Node, Node-Result]>>(
node_pengine_url(Node, URL),
query_node_safe(URL, GoalTemplate, GoalTemplate, 8, QR),
Result = QR
),
Nodes,
NodeResults
).
% node_state_summary(-Summary)
%
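% Returns counts of nodes in each health category.
% Summary = summary(Nominal, Degraded, Critical, Partitioned, Unreachable)
% Nodes reported with status error are not counted, so the five counts
% can sum to fewer than the number of known nodes.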
node_state_summary(summary(Nominal, Degraded, Critical, Partitioned, Unreachable)) :-
get_time(Now), Ts is round(Now),
global_cluster_health(Ts, cluster_health(_, NodeStates)),
aggregate_all(count, member(node_state(_, nominal, _), NodeStates), Nominal),
aggregate_all(count, member(node_state(_, degraded, _), NodeStates), Degraded),
aggregate_all(count, member(node_state(_, critical, _), NodeStates), Critical),
aggregate_all(count, member(node_state(_, partitioned, _), NodeStates), Partitioned),
aggregate_all(count, member(node_state(_, unreachable, _), NodeStates), Unreachable).
23.4.4 Aggregation Verification
# With harvester agents running on all 14 hypervisors,
# query the full cluster health from logic-node-01:
root@logic-node-01:~# swipl \
-l /opt/logic-node/kb/proxmox_topology.pl \
-l /opt/logic-node/kb/live_state.pl \
-l /opt/logic-node/kb/alert_dispatcher.pl \
-l /opt/logic-node/kb/cluster_aggregator.pl \
-g "
get_time(Now), Ts is round(Now),
cluster_aggregator:global_cluster_health(Ts, Health),
Health = cluster_health(_, NodeStates),
length(NodeStates, N),
format('Nodes queried: ~w~n', [N]),
maplist([node_state(Node,Status,Anomalies)]>>(
length(Anomalies, AC),
format(' ~w: ~w (~w anomalies)~n', [Node, Status, AC])
), NodeStates),
halt
"
Nodes queried: 14
 pve1: nominal (0 anomalies)
 pve10: nominal (0 anomalies)
 pve11: degraded (1 anomalies)
 pve12: nominal (0 anomalies)
 pve13: nominal (0 anomalies)
 pve14: nominal (0 anomalies)
 pve2: nominal (0 anomalies)
 pve3: critical (2 anomalies)
 pve4: nominal (0 anomalies)
 pve5: degraded (1 anomalies)
 pve6: nominal (0 anomalies)
 pve7: partitioned (0 anomalies)
 pve8: nominal (0 anomalies)
 pve9: nominal (0 anomalies)
pve7 returned state(partitioned): its Pengine server did not respond within 8 seconds. pve3 returned two anomalies (cpu_steal_critical and arc_miss_critical). The Master node produced a complete 14-element list without hanging on pve7. The 8-second timeout fired, the catch/3 handler converted it to state(partitioned), interpret_node_result/3 mapped that to node_state(pve7, partitioned, []), and the remaining results completed normally. Total aggregation time was bounded by the pve7 timeout plus the inter-thread synchronisation overhead of thread_maplist/3.
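The timeout behaviour described above (one slow node delays the result by at most the per-node timeout, and the list is always complete) can be sketched outside Prolog as well. The following Go sketch is an illustration only, not part of the chapter's code: `NodeState`, `QueryFunc` and `CollectHealth` are hypothetical names, and `QueryFunc` stands in for whatever performs one Pengine round trip.

```go
package main

import (
	"sort"
	"sync"
	"time"
)

// NodeState mirrors the Prolog term node_state(Node, Status, Anomalies).
type NodeState struct {
	Node      string
	Status    string
	Anomalies []string
}

// QueryFunc is a hypothetical stand-in for one Pengine round trip.
type QueryFunc func(node string) (NodeState, error)

// CollectHealth queries every node concurrently. A node that does not answer
// within perNode is reported as "partitioned"; a node whose query returns an
// error is reported as "error". The result always has one entry per input
// node, in sorted node-name order.
func CollectHealth(nodes []string, perNode time.Duration, query QueryFunc) []NodeState {
	sorted := append([]string(nil), nodes...)
	sort.Strings(sorted)

	type reply struct {
		state NodeState
		err   error
	}

	out := make([]NodeState, len(sorted))
	var wg sync.WaitGroup
	for i, node := range sorted {
		wg.Add(1)
		go func(i int, node string) {
			defer wg.Done()
			ch := make(chan reply, 1) // buffered so a late reply never blocks
			go func() {
				st, err := query(node)
				ch <- reply{st, err}
			}()
			select {
			case r := <-ch:
				if r.err != nil {
					out[i] = NodeState{Node: node, Status: "error", Anomalies: []string{}}
					return
				}
				out[i] = r.state
			case <-time.After(perNode):
				out[i] = NodeState{Node: node, Status: "partitioned", Anomalies: []string{}}
			}
		}(i, node)
	}
	wg.Wait() // total wait is bounded by the slowest node, at most perNode
	return out
}
```

Each goroutine writes only its own slot of `out`, so no lock is needed, and the call returns after roughly `perNode` even if several nodes are silent at once.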
23.5 The Global Dashboard Integration
23.5.1 Go Dispatch: WAM to SSE
The Chapter 19 server architecture provides all the plumbing for this final connection. cluster_aggregator:global_cluster_health/2 is a standard Prolog goal — the Go orchestrator dispatches it to the WAM worker pool using the same WorkItem{Goal: goal} pattern used for firewall checks and topology mutations. The goal runs in a Worker 0 engine (the designated long-running query worker, as established for the Chapter 22 ingestor) and returns the serialised cluster_health/2 term in the result.
// File: /opt/logic-node/go/orchestrator/cluster_health_handler.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
)
// ClusterNodeState is the Go-side representation of a Prolog
// node_state(Node, Status, Anomalies) term.
type ClusterNodeState struct {
Node string `json:"node"`
Status string `json:"status"`
Anomalies []string `json:"anomalies"`
}
// ClusterHealth is the Go-side representation of a Prolog
// cluster_health(Timestamp, NodeStates) term.
type ClusterHealth struct {
Timestamp int64 `json:"ts"`
NodeStates []ClusterNodeState `json:"nodes"`
}
// handleClusterHealth is the HTTP handler for GET /api/v1/cluster/health.
// It dispatches the global_cluster_health/2 goal to the WAM pool and returns
// the result as JSON. The handler uses a 20-second context deadline — long
// enough for the 14-node Pengine aggregation (max 8s per node, parallel) plus
// WAM dispatch overhead.
func (s *Server) handleClusterHealth(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 20*time.Second)
defer cancel()
ts := time.Now().Unix()
goal := fmt.Sprintf(
"cluster_aggregator:global_cluster_health(%d, Health), term_to_atom(Health, HealthAtom)",
ts,
)
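result, err := s.pool.Dispatch(WorkItem{Goal: goal}, 20*time.Second)
if err == nil {
// Dispatch enforces its own timeout; also honour the request context so
// that a cancelled or expired request is reported as a dispatch failure.
err = ctx.Err()
}
if err != nil {
writeJSON(w, http.StatusServiceUnavailable,
APIResponse{Error: fmt.Sprintf("pool dispatch: %v", err)})
return
}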
if result.Err != nil {
writeJSON(w, http.StatusInternalServerError,
APIResponse{Error: fmt.Sprintf("WAM: %v", result.Err)})
return
}
// The WAM returns the HealthAtom binding — the cluster_health/2 term
// serialised as a Prolog atom via term_to_atom/2. Parse it into Go structs.
health, err := parseClusterHealthAtom(result.HealthAtom)
if err != nil {
writeJSON(w, http.StatusInternalServerError,
APIResponse{Error: fmt.Sprintf("parse: %v", err)})
return
}
writeJSON(w, http.StatusOK, APIResponse{OK: true, Data: health})
// Push a cluster_health_updated SSE event so any connected dashboards
// refresh without polling. The event payload is the full health JSON.
healthJSON, err := json.Marshal(health)
if err != nil {
log.Printf("cluster health: marshal SSE payload: %v", err)
return
}
s.broker.Publish(fmt.Sprintf(
"event: cluster_health_updated\ndata: %s\n\n",
healthJSON,
))
}
// parseClusterHealthAtom parses the Prolog term atom returned by the WAM
// into a Go ClusterHealth struct. The atom has the form:
// cluster_health(1741267200,[node_state(pve1,nominal,[]),node_state(pve3,critical,[cpu_steal_critical]),…])
// This function is a placeholder: it does not parse the atom. Parsing Prolog
// term syntax in Go is fragile, so in production the WAM goal returns a
// JSON-serialised form via with_output_to(atom(J), json_write(…)).
func parseClusterHealthAtom(atom string) (*ClusterHealth, error) {
// In the full implementation, the WAM goal is extended to serialise
// the result directly to JSON using library(http/json):
//
// global_cluster_health(Ts, Health),
// Health = cluster_health(_, NodeStates),
// maplist(node_state_to_json, NodeStates, JSONTerms),
// with_output_to(atom(JSON), json_write(current_output, JSONTerms, []))
//
// For clarity, the JSON serialisation predicate is shown in §23.5.2.
// This function is a placeholder showing the contract.
_ = atom
return nil, fmt.Errorf("use JSON serialisation form from §23.5.2")
}
23.5.2 WAM-Side JSON Serialisation
Rather than parsing Prolog term atoms in Go, the WAM goal serialises the result to JSON directly before returning it. This eliminates the fragile atom-parsing step and produces a typed Go struct with no string manipulation:
% File: /opt/logic-node/kb/cluster_aggregator.pl (addition)
%
% JSON serialisation support for the Go HTTP handler.
% Converts a cluster_health/2 term to a JSON atom for Go consumption.
:- use_module(library(http/json)).
% cluster_health_json(+Timestamp, -JSONAtom)
% Calls global_cluster_health/2 and serialises the result to a JSON atom.
% The Go handler calls this goal and reads the JSONAtom binding.
cluster_health_json(Timestamp, JSONAtom) :-
global_cluster_health(Timestamp, cluster_health(Ts, NodeStates)),
maplist(node_state_to_dict, NodeStates, NodeDicts),
JSONTerm = json([ts=Ts, nodes=NodeDicts]),
with_output_to(atom(JSONAtom),
json_write(current_output, JSONTerm, [width(0)])).
node_state_to_dict(node_state(Node, Status, Anomalies),
json([node=Node, status=Status, anomalies=AnomalyStrings])) :-
maplist(anomaly_to_atom, Anomalies, AnomalyStrings).
anomaly_to_atom(anomaly(Type, Value, Threshold), AnomalyAtom) :-
!, % commit: do not fall through to the generic clause on backtracking
format(atom(AnomalyAtom), "~w:~w/~w", [Type, Value, Threshold]).
anomaly_to_atom(Other, OtherAtom) :-
term_to_atom(Other, OtherAtom).
// Updated Go handler goal — returns JSON atom directly:
goal := fmt.Sprintf(
"cluster_aggregator:cluster_health_json(%d, JSONAtom)",
ts,
)
// result.JSONAtom is a valid JSON string. Unmarshal directly:
var health ClusterHealth
if err := json.Unmarshal([]byte(result.JSONAtom), &health); err != nil {
// handle error
}
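To make the JSON contract concrete, here is a minimal Go sketch that decodes a payload of the shape cluster_health_json/2 is described as producing, then tallies nodes by status. The struct definitions repeat those from §23.5.1 so the block stands alone; `DecodeAndTally` is a hypothetical helper, and any payload passed to it here would be hand-written for illustration, not captured from a running cluster.

```go
package main

import "encoding/json"

// ClusterNodeState and ClusterHealth repeat the §23.5.1 definitions.
type ClusterNodeState struct {
	Node      string   `json:"node"`
	Status    string   `json:"status"`
	Anomalies []string `json:"anomalies"`
}

type ClusterHealth struct {
	Timestamp  int64              `json:"ts"`
	NodeStates []ClusterNodeState `json:"nodes"`
}

// DecodeAndTally (hypothetical helper) unmarshals the JSON atom returned by
// the WAM and counts how many nodes report each status string.
func DecodeAndTally(raw string) (ClusterHealth, map[string]int, error) {
	var health ClusterHealth
	if err := json.Unmarshal([]byte(raw), &health); err != nil {
		return ClusterHealth{}, nil, err
	}
	counts := make(map[string]int)
	for _, n := range health.NodeStates {
		counts[n.Status]++
	}
	return health, counts, nil
}
```

Keying the tally by the status string means a status the Go side has never seen (for example a new category added on the Prolog side) still shows up in the counts instead of being silently dropped.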
23.5.3 SSE Event and Dashboard Consumption
The cluster_health_updated SSE event carries the full cluster state as a JSON payload. The Chapter 19 dashboard's initSSE function handles it with a single additional event listener:
// Addition to dashboard.js — in initSSE() after the existing kb_updated handler:
es.addEventListener('cluster_health_updated', async (e) => {
const health = JSON.parse(e.data);
// Update the cluster health panel with the new node states.
renderClusterHealth(health.nodes);
// Any node marked 'critical' or 'partitioned' triggers a banner. This
// listener flags every such node; it does not itself check whether the
// node is in an active firewall allow-list.
const dangerNodes = health.nodes
.filter(n => n.status === 'critical' || n.status === 'partitioned')
.map(n => n.node);
if (dangerNodes.length > 0) {
showDangerBanner(
`Routing risk: ${dangerNodes.join(', ')} (status critical or partitioned)`
);
}
});
function renderClusterHealth(nodes) {
const container = document.getElementById('cluster-health-grid');
if (!container) return;
container.innerHTML = nodes.map(n => {
const statusClass = {
nominal: 'health-nominal',
degraded: 'health-degraded',
critical: 'health-critical',
partitioned: 'health-partitioned',
unreachable: 'health-unreachable',
error: 'health-error'
}[n.status] || 'health-unknown';
const anomalyText = n.anomalies.length > 0
? n.anomalies.join(', ')
: 'no anomalies';
return `<div class="node-card ${h(statusClass)}">
<span class="node-name">${h(n.node)}</span>
<span class="node-status">${h(n.status)}</span>
<span class="node-anomalies">${h(anomalyText)}</span>
</div>`;
}).join('');
}
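The listener calls JSON.parse on e.data, so it depends on the server framing each event correctly. In the Server-Sent Events format a message ends at a blank line, and a payload containing a newline must be sent as several data: lines. The handler's fmt.Sprintf framing is safe because json.Marshal emits compact, single-line output. A more defensive framing helper might look like the sketch below; `FormatSSE` is a hypothetical name, not part of the Chapter 19 broker.

```go
package main

import "strings"

// FormatSSE frames payload as a single Server-Sent Events message with the
// given event name. Each line of the payload gets its own "data: " prefix,
// and the message is terminated by a blank line. The browser rejoins the
// data lines with "\n" before handing them to the listener as e.data.
func FormatSSE(event, payload string) string {
	var b strings.Builder
	b.WriteString("event: ")
	b.WriteString(event)
	b.WriteString("\n")
	for _, line := range strings.Split(payload, "\n") {
		b.WriteString("data: ")
		b.WriteString(line)
		b.WriteString("\n")
	}
	b.WriteString("\n")
	return b.String()
}
```

For a compact JSON payload this produces exactly the string the handler builds by hand. The difference only matters if a pretty-printed or otherwise multi-line payload is ever published.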
23.5.4 The Architectural Outcome
The system built across Volumes III and IV is no longer a collection of isolated machines that a central poller interrogates for metric values. Each hypervisor runs a Prolog engine that knows the health rules, has access to its own live telemetry, and can evaluate arbitrary logical queries posed to it from the network. The Master node does not compute health; it collects proofs of health, already evaluated at source, expressed in a well-defined vocabulary (nominal, degraded, critical, partitioned, unreachable). Every component in the architecture, from the firewall guard in §22.3.4 to the bin-packer in Chapter 25, reads that vocabulary from the same live_state interface.
The network carries verdicts, not numbers. The WAM carries topology, not state machines. The Pengine protocol carries goals, not schemas. Three design decisions — CLP(FD) bounded domains for the search phases, retractall-before-assertz for ephemeral facts, and sandboxed Pengines for distributed evaluation — make the system correct, bounded, and safe to extend. A new health rule added to node_health.pl on a single hypervisor is immediately queryable from the Master without redeployment, without schema migration, and without the Master knowing the rule exists — it simply becomes part of what local_health_check/3 returns.
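A cluster of 14 Proxmox hypervisors configured as a sovereign inference fabric is queryable as a single logical entity: global_cluster_health(Ts, H) unifies H with a proof that encodes the physical state of every server in the cluster, derived from first principles. When every node responds, the proof is produced in under a second (well-connected nodes answer in 50–200ms); when a node does not, the 8-second per-node timeout bounds the wait.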