Chapter 14: Concurrent Logic & Message Passing
The single-threaded REPL is the correct environment for reasoning about static facts. It is the wrong environment for an infrastructure daemon that must simultaneously stream auth events from four hosts, respond to config mutation requests from the operator interface, write structured results to an append-only audit log, and serve KB lookups to the RAG pipeline — all without any one of those activities blocking the others.
SWI-Prolog's threading model is not a bolted-on afterthought. Every thread gets its own private Prolog stacks (local stack, global stack and trail), while the clause database, the atom table and the module system are shared across all threads. This architecture has one correct usage pattern and one catastrophic misuse pattern. The correct pattern: threads communicate exclusively by passing bounded, typed message terms through named queues. No thread reads or writes shared mutable state directly. No worker calls assertz or retract: those operations run on a dedicated serialisation thread that owns the mutable part of the clause database. The catastrophic pattern: threads sharing mutable state through a dynamic predicate used as a blackboard, or through an atom passed by name as an implicit rendezvous point. The first pattern composes. The second produces data races that manifest as intermittent incorrect query results rather than crashes, which is the worst possible failure mode for an infrastructure reasoning engine.
Five properties define the actor-model threading architecture as the correct concurrency primitive for the Logic Node.
1. Private stacks, shared clause database: the threading contract. thread_create(Goal, ThreadId, Options) gives the new thread its own local, global and trail stacks and begins executing Goal on them. The stacks start small and grow on demand up to one combined limit, set with the stack_limit(Bytes) option; when the option is omitted, the limit comes from the stack_limit flag (1GB by default on 64-bit builds). Terms built on a thread's stacks are private to that thread. The clause database (all tutorial_fact/3 facts, all nginx_reversible.pl rules, all loaded modules) is shared read-write across all threads. Reads are always safe. Each individual assertz/retract call is made atomic by SWI-Prolog's internal locking, but relying on that locking for application-level correctness is not a design: it is an accident waiting for the right sequence of operations to manifest as a lost update.
2. Message passing eliminates shared mutable state from the design. thread_send_message(+QueueOrThread, +Term) copies Term from the sender's stacks into the queue's internal storage. The copy is independent of the sender: the sender can backtrack, garbage-collect, or exit without affecting the queued message. thread_get_message(+QueueOrThread, ?Pattern) removes the first queued message that unifies with Pattern and copies it onto the receiver's stacks. Two threads communicating via a queue share no Prolog data. The queue performs its own internal locking, so application code has no lock to acquire, no condition variable to signal, no semaphore to decrement. The correctness guarantee is structural: a message is either in the queue or it is not. A partially transmitted message does not exist.
3. Unbounded queues are a DoS vector, not a feature. A message queue with no bound on its depth is a memory reservoir that any fast producer can fill. If a log ingestion thread produces events at 10,000 lines/second and a worker thread processes them at 8,000 lines/second, the queue accumulates 2,000 messages per second. After 500 seconds it holds 1,000,000 messages; at an average of 200 bytes per message that is 200MB, and it keeps growing (400MB after 1,000 seconds). The producer never slows down because nothing tells it to. A bounded queue (message_queue_create(Queue, [max_size(N)])) applies back-pressure: thread_send_message/2 blocks when the queue is full, forcing the producer to wait for the consumer to catch up. The bound is a latency contract, not a throughput limit: with max_size(500) and a consumer draining 8,000 messages per second, a newly queued message waits at most about 500 / 8,000 s = 62.5 ms before it is processed.
4. with_mutex/2 is a necessary evil strictly limited to disk I/O serialisation. with_mutex(+MutexId, +Goal) acquires a named mutex before executing Goal and releases it after. For any operation on shared mutable state that cannot be expressed as a message, with_mutex/2 is the correct primitive. For the Logic Node, the only legitimate application is serialising writes to shared append-only files — the audit log, the config write path in nginx_write_file/2. Using with_mutex/2 to protect shared Prolog terms is the wrong tool: it protects the operation but not the data invariant. A message queue protects both.
5. Message validation before dispatch is the security gate. A message queue that accepts arbitrary terms from arbitrary sources is an injection vector. A config mutation request carrying {upstream: "app_cluster } server 0.0.0.0", ip: "not_an_ip"} must be rejected at the queue entry point — not inside the worker, not after the message has been dequeued and partially processed. The validator runs in the caller's context, before thread_send_message/2 is called. If the message fails validation, the caller receives a typed error. The queue never sees the invalid message. Worker threads process only pre-validated, structurally correct messages.
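The back-pressure described in item 3 can be observed in a few lines. This is a minimal sketch, assuming SWI-Prolog 7 or later; backpressure_demo/1 is a hypothetical name, and max_size(3) is deliberately tiny so that the producer thread fills the queue and blocks almost immediately. The 0.2 s sleep only gives the producer time to run; the size observed is capped by the bound, not by the timing.

```prolog
% backpressure_demo(-SizeWhileBlocked)
% A producer thread tries to send 10 messages into a queue bounded at 3.
backpressure_demo(SizeWhileBlocked) :-
    message_queue_create(Queue, [max_size(3)]),
    thread_create(
        forall(between(1, 10, I), thread_send_message(Queue, item(I))),
        Producer, []),
    sleep(0.2),                                   % producer fills the queue, then blocks
    message_queue_property(Queue, size(SizeWhileBlocked)),
    % Draining the queue unblocks the producer until all 10 items have passed through
    forall(between(1, 10, _), thread_get_message(Queue, item(_))),
    thread_join(Producer),                        % raises if the producer did not succeed
    message_queue_destroy(Queue).
```

Calling `?- backpressure_demo(S).` should give `S = 3`: the producer is parked inside thread_send_message/2 instead of growing the queue.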
14.1 WAM Threading Model
14.1.1 What thread_create/3 Actually Allocates
thread_create(:Goal, -ThreadId, +Options)
Each thread_create/3 call sets up:
Per-thread memory (private to the thread):
Local stack: environment frames and choice points
Global stack: compound terms, strings, big integers built by this thread
Trail stack: bindings to undo on backtracking
Stack limit: one combined cap for the three stacks (stack_limit(Bytes) option; default from the stack_limit flag, 1GB on 64-bit builds)
VM registers: per-thread virtual machine state
Thread-local flags: copy of relevant Prolog flags at creation time
Shared across all threads (single copy):
Atom table: all interned atoms
Clause database: all loaded predicates, dynamic facts
Module table: all loaded modules and their exports
Message queues and mutexes: addressable from any thread by handle or alias
A thread's stacks are private. Its choice points, variable bindings, activation frames and the terms it builds are invisible to all other threads. When a thread calls thread_send_message/2, the term being sent is copied from the sender's stacks into the queue's storage with copy_term/2 semantics: the entire term structure is duplicated, with fresh variables. After the copy, the sender's original term and the queued copy share no memory.
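The copy semantics can be checked without a second thread, because the copy is made at send time regardless of who receives it. A minimal sketch; copy_demo/1 is a hypothetical name:

```prolog
% copy_demo(-Received): send a term containing unbound variables, bind the
% sender's variables afterwards, then read back the queued copy.
copy_demo(Received) :-
    message_queue_create(Queue),
    Sent = point(X, Y),
    thread_send_message(Queue, Sent),     % the queue stores an independent copy
    X = 1, Y = 2,                         % binding the originals after sending...
    thread_get_message(Queue, Received),  % ...leaves the queued copy untouched
    message_queue_destroy(Queue).
```

`?- copy_demo(R).` yields `R = point(_, _)` with fresh variables, even though the sender's own term is now `point(1, 2)`.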
The clause database is shared and mutable. Every assertz/1 or retract/1 call from any thread modifies the same shared structure. SWI-Prolog makes each individual clause database operation atomic, but atomicity of individual operations does not imply atomicity of sequences of operations. A predicate that reads counter(N), computes N1 is N + 1, and then replaces the stored fact with counter(N1) performs a read-modify-write cycle: between the read and the write, another thread can read the same N and store its own N + 1. The final value depends on scheduling order. This is a classic lost update. The runtime's internal locking keeps the database structurally sound; it does not prevent application-level data races.
14.1.2 The Shared Clause Database Race — Demonstrated
% DANGEROUS: ten threads incrementing a shared counter stored as a dynamic fact
:- dynamic counter/1.
counter(0).

increment_counter :-
    counter(N),                 % Thread A reads 5; Thread B can read 5 as well
    N1 is N + 1,
    retractall(counter(_)),     % each thread discards "the" old value
    assertz(counter(N1)).       % A asserts counter(6), B asserts counter(6): lost update
% With retract(counter(N)) as the read, the race surfaces differently: the slower
% thread finds no counter/1 fact to retract, its increment fails and is dropped.

% Run 1000 increments across 10 threads: expected counter(1000)
stress_test_counter :-
    retractall(counter(_)),
    assertz(counter(0)),
    findall(Id,
            ( between(1, 10, _),
              % ignore/1: a read that finds no fact (mid-update) drops the increment
              thread_create(forall(between(1, 100, _), ignore(increment_counter)),
                            Id, [])
            ),
            Ids),
    maplist(thread_join, Ids),              % wait for completion instead of sleep/1
    aggregate_all(max(V), counter(V), Final),
    format("Expected: 1000, Got: ~w~n", [Final]).

% ?- stress_test_counter.
% Expected: 1000, Got: <may be below 1000; the value differs between runs>
The correct approach uses a dedicated serialisation thread that owns the counter and processes increment requests from a queue. No thread ever calls retract(counter(_)) directly — the request to increment is sent as a message, the owner thread performs the operation, and the owner thread responds with the new value.
14.1.3 Thread Lifecycle and Cleanup
% Thread creation with explicit stack limit and detach mode
thread_create(
worker_loop(Queue, WorkerId),
ThreadId,
[
alias(worker_1), % Named alias: the thread can be addressed as worker_1
stack_limit(64_000_000), % 64MB cap on this thread's combined stacks
detached(false) % Joinable: allows thread_join/2 for cleanup
]
).
% Thread termination
thread_join(+ThreadId, -Status)
% Status: true, false, exception(Term), or exited(Term)
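% Graceful shutdown: send a poison pill message, then join.
% If several workers share Queue, send one pill per worker before joining:
% any worker may consume any pill.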
shutdown_worker(Queue, ThreadId) :-
thread_send_message(Queue, shutdown),
thread_join(ThreadId, Status),
( Status = true -> true
; format("[WARN] Worker ~w exited with status ~w~n", [ThreadId, Status]) ).
% thread_signal/2: deliver an exception to a running thread
% Used for forced abort — should not be needed in a well-designed system
% thread_signal(ThreadId, throw(abort))
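The create, message, poison-pill, join sequence above can be exercised as one runnable unit. A minimal sketch; echo_worker/1 and lifecycle_demo/2 are hypothetical names, and the worker reads from an anonymous bounded queue rather than an aliased one:

```prolog
% echo_worker(+Queue): replies to echo(Term, ReplyTo), exits on shutdown.
echo_worker(Queue) :-
    thread_get_message(Queue, Msg),
    (   Msg == shutdown
    ->  true                                  % poison pill: goal succeeds, thread exits
    ;   Msg = echo(Term, ReplyTo)
    ->  thread_send_message(ReplyTo, echoed(Term)),
        echo_worker(Queue)
    ;   echo_worker(Queue)                    % unknown message: ignore and keep looping
    ).

% lifecycle_demo(-Reply, -Status)
lifecycle_demo(Reply, Status) :-
    message_queue_create(Queue, [max_size(10)]),
    thread_create(echo_worker(Queue), Tid,
                  [ stack_limit(64_000_000), detached(false) ]),
    thread_self(Me),
    thread_send_message(Queue, echo(hello, Me)),
    thread_get_message(Me, echoed(Reply)),    % wait for the reply on our own queue
    thread_send_message(Queue, shutdown),
    thread_join(Tid, Status),                 % true when the worker exits cleanly
    message_queue_destroy(Queue).
```

`?- lifecycle_demo(R, S).` is expected to give `R = hello, S = true`.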
14.2 The Actor Model
14.2.1 Message Queue Primitives
% Create a named, bounded queue
message_queue_create(?Queue, +Options)
% Options:
% alias(Name): atom under which the queue can be addressed
% max_size(N): bounded queue; thread_send_message/2 blocks when full
% Create the orchestrator's central work queue
% Bounded at 500 messages — enforces back-pressure on producers
:- message_queue_create(_, [alias(work_queue), max_size(500)]).
% Create the logging thread's dedicated queue
% Bounded at 2000 — logging can absorb larger burst than work queue
:- message_queue_create(_, [alias(log_queue), max_size(2000)]).
% Send a message — blocks if queue is full (back-pressure)
thread_send_message(+QueueOrAlias, +Term)
% Receive a message — blocks until a matching message is available
thread_get_message(+QueueOrAlias, ?Pattern)
% Receive with timeout — does not block indefinitely
thread_get_message(+QueueOrAlias, ?Pattern, +Options)
% Options: timeout(Seconds): the call fails (it does not throw) if no matching message arrives in time
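Two details of these primitives are easy to miss: thread_get_message removes the first message that unifies with the pattern (not necessarily the oldest message), and the timeout form fails instead of raising. A small sketch; selective_receive_demo/3 is a hypothetical name:

```prolog
% selective_receive_demo(-B, -A, -TimeoutResult)
selective_receive_demo(B, A, TimeoutResult) :-
    message_queue_create(Queue, [max_size(5)]),
    thread_send_message(Queue, a(1)),
    thread_send_message(Queue, b(2)),
    thread_get_message(Queue, b(B)),      % skips a(1): first message unifying with b(_)
    thread_get_message(Queue, a(A)),      % a(1) is still waiting in the queue
    (   thread_get_message(Queue, _, [timeout(0.1)])
    ->  TimeoutResult = got_message
    ;   TimeoutResult = timed_out         % queue is empty: the call fails after 0.1 s
    ),
    message_queue_destroy(Queue).
```

`?- selective_receive_demo(B, A, T).` should give `B = 2, A = 1, T = timed_out`.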
14.2.2 Why Message Passing Prevents the Lost Update
The counter race from Section 14.1.2, rewritten as an actor:
% Counter owner thread: handles increment(ReplyTo), get(ReplyTo) and shutdown messages
counter_actor(N) :-
thread_get_message(counter_queue, Msg),
( Msg = increment(ReplyTo) ->
N1 is N + 1,
thread_send_message(ReplyTo, counter_value(N1)),
counter_actor(N1) % Tail-recursive loop — no stack growth
; Msg = shutdown ->
true % Clean exit
; Msg = get(ReplyTo) ->
thread_send_message(ReplyTo, counter_value(N)),
counter_actor(N)
;
counter_actor(N) % Unknown message — ignore, continue
).
% Any thread requesting an increment:
increment_and_get(NewValue) :-
thread_self(Me),
thread_send_message(counter_queue, increment(Me)),
thread_get_message(Me, counter_value(NewValue)).
The counter's state (N) exists only in counter_actor/1's argument on its own local stack. No other thread can read or modify it. The only way to affect the counter is via a message — which is processed sequentially by one thread, one at a time, in arrival order. There is no race condition because there is no shared state. The counter is serialised not by a mutex but by the queue's FIFO ordering and the single-threaded nature of the consumer.
This is correct under any amount of concurrency: any number of producer threads can call increment_and_get/1 at the same time. Each one blocks on thread_get_message(Me, counter_value(_)) until the counter actor has processed its request, and the actor processes one request at a time. The final counter value equals the number of increment messages sent, exactly, regardless of scheduling. The price is that every increment is serialised through one thread, so the actor's per-message cost bounds total throughput.
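The actor from this section can be run end to end. This self-contained sketch restates counter_actor/1 and increment_and_get/1, and adds hypothetical get_counter/1 and actor_stress_test/1 predicates that create counter_queue, run ten threads of 100 increments each, and read the final value:

```prolog
counter_actor(N) :-
    thread_get_message(counter_queue, Msg),
    (   Msg = increment(ReplyTo)
    ->  N1 is N + 1,
        thread_send_message(ReplyTo, counter_value(N1)),
        counter_actor(N1)
    ;   Msg = get(ReplyTo)
    ->  thread_send_message(ReplyTo, counter_value(N)),
        counter_actor(N)
    ;   Msg == shutdown
    ->  true                                   % clean exit
    ;   counter_actor(N)                       % unknown message: ignore
    ).

increment_and_get(NewValue) :-
    thread_self(Me),
    thread_send_message(counter_queue, increment(Me)),
    thread_get_message(Me, counter_value(NewValue)).

get_counter(Value) :-                          % hypothetical read-only request
    thread_self(Me),
    thread_send_message(counter_queue, get(Me)),
    thread_get_message(Me, counter_value(Value)).

actor_stress_test(Final) :-
    message_queue_create(Queue, [alias(counter_queue), max_size(100)]),
    thread_create(counter_actor(0), Actor, []),
    findall(Id,
            ( between(1, 10, _),
              thread_create(forall(between(1, 100, _), increment_and_get(_)),
                            Id, [])
            ),
            Ids),
    maplist(thread_join, Ids),                 % all 1000 increments acknowledged
    get_counter(Final),
    thread_send_message(counter_queue, shutdown),
    thread_join(Actor),
    message_queue_destroy(Queue).
```

Unlike the dynamic-fact version in Section 14.1.2, `?- actor_stress_test(F).` should give `F = 1000` on every run: every increment passes through the single actor thread.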
14.2.3 Queue Depth as the System Health Signal
% message_queue_property/2: inspect queue state
message_queue_property(+Queue, ?Property)
% Properties:
% size(N) — current number of messages waiting
% max_size(N) — queue capacity (if bounded)
% alias(Name) — queue alias atom
% Health check predicate
queue_health_check :-
forall(
member(Queue, [work_queue, log_queue]), % capacity is read from max_size/1 below
(
message_queue_property(Queue, size(Current)),
message_queue_property(Queue, max_size(Max)),
Pct is Current * 100 // Max,
( Pct > 80 ->
format("[WARN] Queue ~w at ~w% capacity (~w/~w)~n",
[Queue, Pct, Current, Max])
;
format("[OK] Queue ~w: ~w/~w messages (~w%)~n",
[Queue, Current, Max, Pct])
)
)
).
A queue at 80% capacity is not a performance issue — it is a signal that a worker is falling behind its input rate. The correct response is to examine the worker's processing time per message, not to increase the queue bound. Increasing the bound delays the symptom while growing the memory allocation.
14.3 Message Validation
14.3.1 The Queue Injection Vector
Without validation, thread_send_message(work_queue, Term) accepts any term from any caller, and the worker thread that processes the message has no guarantee about its structure. Consider a message config_mutation{upstream: X, ip: Y} where X is an unbound variable and Y is a 1,000,000-character string. It reaches the worker and makes parse_ipv4/2 throw parse_failure; the worker's exception handler retries or logs the failure; and the attacker moves on to the next variant.
More subtly: a message {action: Action} where Action was produced by atom_string/2 from operator input interns a new atom in the shared atom table. If the operator interface sends 10,000 distinct action names, the atom table grows by 10,000 entries, and those entries are reclaimed only if atom garbage collection later finds them unreferenced. A closed vocabulary of pre-interned atoms avoids the growth altogether.
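One way to keep operator input out of the atom table is to match the raw string against a fixed table of pre-interned atoms and never call atom_string/2 or atom_codes/2 on unvalidated input. This sketch assumes log levels as the closed vocabulary; log_level_string/2 and level_from_input/2 are hypothetical, and this is not the implementation of the chapter's infrastructure_topic_string/2, which is not shown here:

```prolog
:- use_module(library(error)).

% Closed vocabulary: external string -> atom interned once, at load time.
log_level_string("debug",    debug).
log_level_string("info",     info).
log_level_string("notice",   notice).
log_level_string("warning",  warning).
log_level_string("error",    error).
log_level_string("critical", critical).

% level_from_input(+Str, -Level)
% Unknown input stays a string (garbage-collectable) and is rejected.
level_from_input(Str, Level) :-
    must_be(string, Str),
    (   log_level_string(Str, Level)
    ->  true
    ;   domain_error(log_level, Str)
    ).
```

`?- level_from_input("info", L).` gives `L = info`; `level_from_input("bogus", _)` raises a domain_error without creating an atom `bogus`.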
14.3.2 The Validation Schema
Every message type has a validator predicate that runs in the sender's context, before thread_send_message/2:
%% validate_message(+MsgType, +Payload, -ValidatedMsg)
%% Succeeds iff Payload is a structurally and semantically valid message
%% of MsgType. Fails, or throws a typed error, on invalid input.
%% ValidatedMsg is the canonical, bounded-size message term to enqueue.
validate_message(config_mutation, Payload, ValidatedMsg) :-
is_dict(Payload),
% Upstream name: string, bounded, no structural characters
get_dict(upstream, Payload, UpstreamRaw),
must_be(string, UpstreamRaw),
string_length(UpstreamRaw, ULen),
( ULen > 128 -> throw(error(message_field_too_long(upstream, ULen, 128), _)) ; true ),
\+ sub_string(UpstreamRaw, _, _, _, "{"),
\+ sub_string(UpstreamRaw, _, _, _, "}"),
\+ sub_string(UpstreamRaw, _, _, _, ";"),
% Old IP: must parse as valid IPv4 (throws on failure)
get_dict(old_ip, Payload, OldIPStr),
must_be(string, OldIPStr),
parse_ipv4(OldIPStr, OldIPInt),
% New IP: must parse as valid IPv4
get_dict(new_ip, Payload, NewIPStr),
must_be(string, NewIPStr),
parse_ipv4(NewIPStr, NewIPInt),
OldIPInt \=:= NewIPInt,
% Reply-to: must be a valid thread alias
get_dict(reply_to, Payload, ReplyTo),
( thread_property(ReplyTo, status(running)) -> true
; throw(error(invalid_reply_thread(ReplyTo), _)) ),
% Canonical validated message — all values typed, no raw strings in hot fields
ValidatedMsg = config_mutation{
upstream: UpstreamRaw,
old_ip: OldIPInt,
new_ip: NewIPInt,
reply_to: ReplyTo
}.
validate_message(kb_query, Payload, ValidatedMsg) :-
is_dict(Payload),
get_dict(topic, Payload, TopicRaw),
must_be(string, TopicRaw),
% Topic: must be in closed infrastructure vocabulary
( infrastructure_topic_string(TopicRaw, TopicAtom) ->
true
; throw(error(unknown_topic(TopicRaw), _)) ),
get_dict(query_text, Payload, QueryText),
must_be(string, QueryText),
string_length(QueryText, QLen),
( QLen > 512 -> throw(error(query_too_long(QLen, 512), _)) ; true ),
get_dict(reply_to, Payload, ReplyTo),
ValidatedMsg = kb_query{
topic: TopicAtom, % PRE-INTERNED atom from closed vocabulary
query: QueryText, % Prolog string — GC-eligible
reply_to: ReplyTo
}.
validate_message(log_event, Payload, ValidatedMsg) :-
is_dict(Payload),
get_dict(level, Payload, LevelRaw),
must_be(atom, LevelRaw),
( memberchk(LevelRaw, [debug, info, notice, warning, error, critical]) -> true
; throw(error(invalid_log_level(LevelRaw), _)) ),
get_dict(message, Payload, Msg),
must_be(string, Msg),
string_length(Msg, MLen),
( MLen > 4096 -> sub_string(Msg, 0, 4096, _, Truncated)
; Truncated = Msg ),
get_dict(timestamp, Payload, Ts),
must_be(float, Ts),
ValidatedMsg = log_event{
level: LevelRaw, % Atom from closed vocabulary
message: Truncated, % String, bounded at 4096
timestamp: Ts
}.
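The log_event validator above can be exercised in isolation. This is a self-contained copy of that clause plus a hypothetical validation_demo/2 driver, which checks that an oversized message is truncated to 4096 characters and that an unknown level is rejected with the typed error:

```prolog
:- use_module(library(error)).

validate_message(log_event, Payload, Validated) :-
    is_dict(Payload),
    get_dict(level, Payload, Level),
    must_be(atom, Level),
    (   memberchk(Level, [debug, info, notice, warning, error, critical]) -> true
    ;   throw(error(invalid_log_level(Level), _))
    ),
    get_dict(message, Payload, Msg),
    must_be(string, Msg),
    string_length(Msg, MLen),
    (   MLen > 4096 -> sub_string(Msg, 0, 4096, _, Truncated)
    ;   Truncated = Msg
    ),
    get_dict(timestamp, Payload, Ts),
    must_be(float, Ts),
    Validated = log_event{level: Level, message: Truncated, timestamp: Ts}.

% validation_demo(-TruncatedLength, -Rejected)
validation_demo(TruncLen, Rejected) :-
    get_time(Now),
    length(Codes, 5000), maplist(=(0'x), Codes),
    string_codes(Big, Codes),                      % a 5000-character message
    validate_message(log_event, _{level: info, message: Big, timestamp: Now}, V),
    get_dict(message, V, M),
    string_length(M, TruncLen),
    catch(( validate_message(log_event,
                             _{level: panic, message: "x", timestamp: Now}, _),
            Rejected = false ),
          error(invalid_log_level(panic), _),
          Rejected = true).
```

`?- validation_demo(L, R).` should give `L = 4096, R = true`.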
14.3.3 Safe Dispatch Predicate
%% dispatch_message(+MsgType, +Payload, +Queue)
%% Validates Payload as MsgType, then enqueues if valid.
%% Throws typed error if validation fails — queue is never touched.
dispatch_message(MsgType, Payload, Queue) :-
validate_message(MsgType, Payload, ValidatedMsg),
% Back-pressure check: warn if queue is near capacity before blocking
( message_queue_property(Queue, size(S)),
message_queue_property(Queue, max_size(M)),
S * 100 // M > 90 ->
format("[WARN] Queue ~w at ~w/~w before dispatch~n", [Queue, S, M])
; true ),
thread_send_message(Queue, ValidatedMsg).
% thread_send_message/2 blocks here if queue is full — back-pressure applied
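The "queue is never touched" guarantee can be checked by inspecting the queue size after a rejected dispatch. This sketch uses a hypothetical ping message type with a single non-negative integer field, and a dispatch_message/3 reduced to the validate-then-send core (no near-capacity warning):

```prolog
:- use_module(library(error)).

% Hypothetical message type used only for this demo.
validate_message(ping, Payload, ping(Seq)) :-
    is_dict(Payload),
    get_dict(seq, Payload, Seq),
    must_be(nonneg, Seq).              % throws for a negative or non-integer seq

dispatch_message(MsgType, Payload, Queue) :-
    validate_message(MsgType, Payload, Validated),
    thread_send_message(Queue, Validated).

% dispatch_demo(-SizeAfterBad, -SizeAfterGood)
dispatch_demo(SizeAfterBad, SizeAfterGood) :-
    message_queue_create(Queue, [max_size(10)]),
    catch(dispatch_message(ping, _{seq: -1}, Queue), error(_, _), true),
    message_queue_property(Queue, size(SizeAfterBad)),
    dispatch_message(ping, _{seq: 7}, Queue),
    message_queue_property(Queue, size(SizeAfterGood)),
    message_queue_destroy(Queue).
```

`?- dispatch_demo(Bad, Good).` should give `Bad = 0, Good = 1`: the rejected payload raised in the caller and never reached the queue.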
14.4 with_mutex/2 and the Logging Thread
14.4.1 When a Mutex Is the Correct Tool
with_mutex(+MutexId, +Goal) serialises access to any resource that cannot be modelled as a message queue. For the Logic Node, exactly two operations require a mutex:
- Append-only file writes from the logging thread. The logging thread is itself single-threaded: it processes one log message at a time from log_queue. The mutex is required only if a second code path can write to the same file outside the logging thread (e.g., a crash handler that writes directly to the log). The mutex scope is the write + flush pair.
- nginx_write_file/2 from Chapter 12. Config file writes must be serialised because any worker in the pool can run a config mutation, and two temp-file + rename cycles on the same file must not overlap. The mutex identity is the config file path atom itself, so different config files get different mutexes.
Nothing else in the Logic Node requires with_mutex/2. Individual dynamic predicate operations are made atomic by SWI-Prolog's internal locking, and workers never perform them anyway. Queue operations need no application-level mutex because each message queue synchronises access internally. KB queries are read-only, and concurrent reads on the clause database are always safe.
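The per-path mutex identity can be sketched with a plain text file standing in for an nginx config. This is a hypothetical demo: bump_file/1 and per_file_mutex_demo/1 are illustrative names, the file holds a single count/1 term, and it rewrites the file in place rather than using the temp-file + rename cycle of nginx_write_file/2. The mutex, named by the path atom, covers the whole read-modify-write so that concurrent bumps cannot overwrite each other:

```prolog
:- use_module(library(readutil)).

% bump_file(+Path): read count(N), write count(N+1), all under the path's mutex.
bump_file(Path) :-
    with_mutex(Path,                           % an atom mutex is created on first use
        (   read_file_to_terms(Path, [count(N)], []),
            N1 is N + 1,
            setup_call_cleanup(open(Path, write, S),
                               format(S, "count(~w).~n", [N1]),
                               close(S))
        )).

% per_file_mutex_demo(-Final): 4 threads x 25 bumps on one file.
per_file_mutex_demo(Final) :-
    tmp_file(counter, Path),
    setup_call_cleanup(open(Path, write, S0),
                       format(S0, "count(0).~n", []),
                       close(S0)),
    findall(T,
            ( between(1, 4, _),
              thread_create(forall(between(1, 25, _), bump_file(Path)), T, []) ),
            Ts),
    maplist(thread_join, Ts),
    read_file_to_terms(Path, [count(Final)], []),
    delete_file(Path).
```

`?- per_file_mutex_demo(F).` should give `F = 100`. Moving the read outside with_mutex/2 would reintroduce the lost update from Section 14.1.2, this time on disk.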
14.4.2 Mutex Scope Rules
% CORRECT: mutex scope exactly covers the non-atomic file operation
write_to_audit_log(Stream, LogEntry) :-
with_mutex(audit_log_mutex,
( format(Stream, "~w~n", [LogEntry]),
flush_output(Stream) )
).
% WRONG: mutex scope too wide — blocks other operations unnecessarily
write_to_audit_log_bad(Stream, LogEntry) :-
with_mutex(audit_log_mutex,
( format(atom(Line), "~w~n", [LogEntry]), % format/3 needs no mutex
write(Stream, Line), % ← only this needs mutex
flush_output(Stream),
update_log_stats(LogEntry) % ← this definitely doesn't
)
).
% WRONG: mutex held across a blocking call. This deadlocks as soon as the
% thread that sends reply needs audit_log_mutex before it can send
write_and_wait_bad(Stream, Queue) :-
with_mutex(audit_log_mutex,
( write(Stream, pending),
thread_get_message(Queue, reply) % DEADLOCK VECTOR
)
).
The third pattern is the deadlock vector that takes down long-running systems: a mutex held across a blocking thread_get_message/2. If the thread that sends reply needs the same mutex before it can send, directly or through any call chain, both threads block forever. The rule is absolute: never call a blocking primitive inside with_mutex/2.
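A sketch of the safe shape of write_and_wait_bad/2: the mutex covers only the write + flush pair and is released before the blocking receive. replier/1, write_then_wait/3 and no_deadlock_demo/1 are hypothetical names; the replier deliberately takes audit_log_mutex before answering, which is exactly the situation that deadlocks the wrong version.

```prolog
% replier(+Queue): must take audit_log_mutex before it can reply.
replier(Queue) :-
    with_mutex(audit_log_mutex, true),
    thread_send_message(Queue, reply(done)).

% write_then_wait(+Stream, +Queue, -Reply): mutex scope is write + flush only;
% the blocking receive runs after the mutex has been released.
write_then_wait(Stream, Queue, Reply) :-
    with_mutex(audit_log_mutex,
               ( format(Stream, "pending~n", []),
                 flush_output(Stream) )),
    thread_get_message(Queue, reply(Reply)).

no_deadlock_demo(Reply) :-
    message_queue_create(Queue, [max_size(1)]),
    open_null_stream(Null),                    % discard the log line in this demo
    thread_create(replier(Queue), Tid, []),
    write_then_wait(Null, Queue, Reply),
    thread_join(Tid),
    close(Null),
    message_queue_destroy(Queue).
```

`?- no_deadlock_demo(R).` returns `R = done` whichever thread reaches the mutex first.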
14.4.3 Deadlock Prevention Rule
The Logic Node uses the following mutex hierarchy to prevent deadlock:
Mutex ordering (acquisition must always be in this order):
1. config file mutex (one per file, named by the path atom; lowest in hierarchy)
2. audit_log_mutex (audit log write serialisation)
3. [SWI internal clause DB] (managed by runtime, never explicitly acquired)
A thread may hold at most ONE of the above mutexes at any time.
No thread acquires mutex N while holding mutex M where M > N.
thread_get_message/1,2,3 and thread_send_message/2 (which blocks on a full
bounded queue) must NEVER be called while any application mutex is held.
This ordering is enforced by code review, not by the runtime. SWI-Prolog does not detect mutex ordering violations before the deadlock manifests.
14.5 The Build: query_orchestrator.pl
14.5.1 Architecture
%%{init: {"themeVariables": {"fontSize": "14px"}}}%%
graph TD
REPL["Main Thread / REPL\n`dispatch_message/3`\nValidates before enqueue\nNever touches queues directly"]
API["Operator Interface Thread\n`dispatch_message/3`\nSame validation gate\nProducer only"]
INGEST["Archive Ingestor Thread\n`dispatch_message(log_event, ...)`\nKB query results → log_queue\nProducer only"]
WQ["work_queue\nBounded: max_size(500)\nBack-pressure on producers\nAtom alias — pre-interned"]
W1["Worker Thread 1\n`worker_loop/2`\nProcesses config_mutation\nProcesses kb_query\nSends results to log_queue"]
W2["Worker Thread 2\n`worker_loop/2`\nIdentical to Worker 1\nNo shared state with W1"]
W3["Worker Thread N\n`worker_loop/2`\nPool size = CPU cores - 1\nAll read from same work_queue"]
LQ["log_queue\nBounded: max_size(2000)\nLarger buffer — absorbs bursts\nSingle consumer: logger thread"]
LOGGER["Logger Thread\n`logger_loop/2`\nAppend-only write\nwith_mutex(audit_log_mutex)\nSingle writer — no races"]
LOGFILE["Audit Log File\n/opt/logic-node/logs/audit.log\nAppend-only\nRotated by logrotate"]
REPL --->|"dispatch_message/3 → validated msg"| WQ
API --->|"dispatch_message/3 → validated msg"| WQ
INGEST --->|"dispatch_message/3 → log_event"| LQ
WQ --->|"thread_get_message/2"| W1
WQ --->|"thread_get_message/2"| W2
WQ --->|"thread_get_message/2"| W3
W1 --->|"result → log_queue"| LQ
W2 --->|"result → log_queue"| LQ
W3 --->|"result → log_queue"| LQ
LQ --->|"thread_get_message/2"| LOGGER
LOGGER --->|"with_mutex(audit_log_mutex)"| LOGFILE
style REPL fill:#1A2B4A,color:#FFFFFF
style API fill:#1A2B4A,color:#FFFFFF
style INGEST fill:#1A2B4A,color:#FFFFFF
style WQ fill:#7A1A1A,color:#FFFFFF
style W1 fill:#1A4070,color:#FFFFFF
style W2 fill:#1A4070,color:#FFFFFF
style W3 fill:#1A4070,color:#FFFFFF
style LQ fill:#7A1A1A,color:#FFFFFF
style LOGGER fill:#8B6914,color:#FFFFFF
style LOGFILE fill:#1A6B3A,color:#FFFFFF
Reading the diagram: Dark blue nodes are producers — they never read from queues, only write to them via dispatch_message/3. Red nodes are bounded queues — the back-pressure points. Blue nodes are workers — stateless consumers that read one message, process it, write a result to log_queue, and loop. Amber is the logger — the single writer, serialised by mutex. Green is the final output — an append-only audit file on disk.
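The worker-pool shape in the diagram (several stateless workers on one bounded queue, results on a second queue) can be sketched in isolation. This is an illustration, not the orchestrator itself: pool_worker/2, get_square/2 and pool_demo/1 are hypothetical, the pool size is fixed at 3 rather than derived from cpu_count, and the "work" is squaring integers. Shutdown sends one poison pill per worker, since any worker may dequeue any pill.

```prolog
% pool_worker(+WorkQueue, +ResultQueue): stateless consumer loop.
pool_worker(WorkQueue, ResultQueue) :-
    thread_get_message(WorkQueue, Msg),
    (   Msg == shutdown
    ->  true                                    % poison pill: exit
    ;   Msg = square(X)
    ->  Y is X * X,
        thread_send_message(ResultQueue, squared(X, Y)),
        pool_worker(WorkQueue, ResultQueue)
    ;   pool_worker(WorkQueue, ResultQueue)     % unknown message: ignore
    ).

get_square(Queue, Y) :-
    thread_get_message(Queue, squared(_, Y)).

% pool_demo(-Sum): sum of squares of 1..20 computed by a pool of 3 workers.
pool_demo(Sum) :-
    NWorkers = 3,
    message_queue_create(Work, [max_size(8)]),       % producer blocks when full
    message_queue_create(Results, [max_size(100)]),
    findall(T,
            ( between(1, NWorkers, _),
              thread_create(pool_worker(Work, Results), T, []) ),
            Ts),
    forall(between(1, 20, X), thread_send_message(Work, square(X))),
    forall(between(1, NWorkers, _), thread_send_message(Work, shutdown)),
    maplist(thread_join, Ts),                  % FIFO: all jobs precede the pills
    length(Ys, 20),
    maplist(get_square(Results), Ys),
    sum_list(Ys, Sum),
    message_queue_destroy(Work),
    message_queue_destroy(Results).
```

`?- pool_demo(S).` should give `S = 2870` (1² + 2² + ... + 20²), independent of which worker handled which job.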
14.5.2 Implementation
logicadmin@logic-node-01:~$ nano /opt/logic-node/query_orchestrator.pl
%% =============================================================================
%% FILE: /opt/logic-node/query_orchestrator.pl
%% PURPOSE: Concurrent query handler — worker pool, message queues, logger.
%%
%% ARCHITECTURE:
%% Producers → work_queue (bounded 500) → Worker Pool → log_queue (bounded 2000)
%% → Logger Thread → disk
%%
%% CONCURRENCY MODEL:
%% — All inter-thread communication via message queues (no shared mutable state).
%% — Clause database: READ-ONLY from worker threads.
%% — `assertz`/`retract` NEVER called from worker threads.
%% — `with_mutex/2` used ONLY for file write serialisation: logger_loop/2 (audit
%%   log) and execute_config_mutation/4 (per-file config writes).
%% — Queue bounds enforce back-pressure — producers block when queues full.
%% — Worker threads are stateless — each message is an independent transaction.
%%
%% SECURITY CONTRACT:
%% — All messages validated by validate_message/3 before dispatch.
%% — Invalid messages throw typed errors in the caller's context.
%% — Queue aliases are pre-interned atoms — no dynamic atom creation.
%% — Log messages bounded at 4096 chars — no unbounded heap growth from logging.
%% — Worker stack limit: 64MB — sufficient for KB queries, prevents runaway.
%%
%% EXPORTS:
%% start_orchestrator/0 — initialise queues, spawn worker pool and logger
%% stop_orchestrator/0 — graceful shutdown of all threads
%% dispatch_message/3 — validated enqueue (public API)
%% orchestrator_status/1 — runtime health report
%% =============================================================================
:- module(query_orchestrator, [
start_orchestrator/0,
stop_orchestrator/0,
dispatch_message/3,
orchestrator_status/1
]).
:- use_module(library(error)).
:- use_module(library(aggregate)).
:- use_module('/opt/logic-node/kb/parsers/network_parser', [parse_ipv4/2]).
:- use_module('/opt/logic-node/kb/config/nginx_mutations',
[replace_server_ip/5, verify_ha_compliance/1]).
:- use_module('/opt/logic-node/kb/config/nginx_reversible',
[nginx_parse_file/2, nginx_write_file/2]).
:- use_module('/opt/logic-node/kb/archive/archive_ingestor',
[tutorial_fact/3, infrastructure_topic_string/2]).
%% ---------------------------------------------------------------------------
%% CONFIGURATION
%% ---------------------------------------------------------------------------
%% orchestrator_config/2: tunable parameters (edit before start_orchestrator/0)
orchestrator_config(work_queue_bound, 500). % Max pending work items
orchestrator_config(log_queue_bound, 2000). % Max pending log events
orchestrator_config(worker_stack_mb, 64). % MB per worker thread local stack
orchestrator_config(audit_log_path, '/opt/logic-node/logs/audit.log').
%% worker_count/1: spawn one worker per physical CPU core minus one
%% (one core reserved for the logger thread and the main REPL).
worker_count(N) :-
current_prolog_flag(cpu_count, Total),
N is max(1, Total - 1).
%% ---------------------------------------------------------------------------
%% QUEUE AND MUTEX INITIALISATION
%% ---------------------------------------------------------------------------
:- mutex_create(audit_log_mutex).
%% audit_log_mutex: serialises writes to the audit log file.
%% Scope: STRICTLY limited to write(Stream, Line) + flush_output(Stream).
%% Never held across any blocking operation.
init_queues :-
orchestrator_config(work_queue_bound, WBound),
orchestrator_config(log_queue_bound, LBound),
% Create work queue if not already created
( catch(message_queue_property(work_queue, size(_)), _, fail) ->
true % Already exists
;
message_queue_create(_, [alias(work_queue), max_size(WBound)])
),
% Create log queue if not already created
( catch(message_queue_property(log_queue, size(_)), _, fail) ->
true
;
message_queue_create(_, [alias(log_queue), max_size(LBound)])
).
%% ---------------------------------------------------------------------------
%% MESSAGE VALIDATION
%% (Runs in caller's context — queue never sees invalid messages)
%% ---------------------------------------------------------------------------
%% validate_message(+Type, +Payload, -Validated)
validate_message(config_mutation, Payload, Validated) :-
is_dict(Payload),
get_dict(upstream, Payload, UpName),
must_be(string, UpName),
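    string_length(UpName, UL),
    (   UL > 128 -> throw(error(message_field_too_long(upstream, UL, 128), _)) ; true ),
    % Reject all structural characters, including "}" (cf. "app_cluster } server ...")
    \+ sub_string(UpName, _, _, _, "{"),
    \+ sub_string(UpName, _, _, _, "}"),
    \+ sub_string(UpName, _, _, _, ";"),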
get_dict(old_ip, Payload, OldStr),
must_be(string, OldStr),
parse_ipv4(OldStr, OldInt),
get_dict(new_ip, Payload, NewStr),
must_be(string, NewStr),
parse_ipv4(NewStr, NewInt),
OldInt \=:= NewInt,
get_dict(config_path, Payload, CPath),
must_be(atom, CPath),
get_dict(reply_to, Payload, ReplyTo),
( thread_property(ReplyTo, status(running)) -> true
; throw(error(invalid_reply_thread(ReplyTo),
context(validate_message/3, 'Reply thread not running')))
),
Validated = config_mutation{
upstream: UpName,
old_ip: OldInt,
new_ip: NewInt,
config_path: CPath,
reply_to: ReplyTo
}.
validate_message(kb_query, Payload, Validated) :-
is_dict(Payload),
get_dict(topic, Payload, TopicStr),
must_be(string, TopicStr),
( infrastructure_topic_string(TopicStr, TopicAtom) -> true
; throw(error(unknown_topic(TopicStr),
context(validate_message/3, 'Topic not in infrastructure vocabulary')))
),
get_dict(query, Payload, QText),
must_be(string, QText),
string_length(QText, QL),
( QL > 512 -> throw(error(query_too_long(QL, 512), _)) ; true ),
get_dict(reply_to, Payload, ReplyTo),
Validated = kb_query{
topic: TopicAtom, % Pre-interned from closed vocabulary
query: QText, % GC-eligible string
reply_to: ReplyTo
}.
validate_message(log_event, Payload, Validated) :-
is_dict(Payload),
get_dict(level, Payload, Level),
must_be(atom, Level),
( memberchk(Level, [debug, info, notice, warning, error, critical]) -> true
; throw(error(invalid_log_level(Level), _))
),
get_dict(message, Payload, Msg),
must_be(string, Msg),
( string_length(Msg, ML), ML > 4096 ->
sub_string(Msg, 0, 4096, _, Truncated)
; Truncated = Msg ),
get_dict(timestamp, Payload, Ts),
must_be(float, Ts),
Validated = log_event{
level: Level,
message: Truncated,
timestamp: Ts
}.
%% dispatch_message(+Type, +Payload, +Queue)
%% Validates, then enqueues. Throws on invalid payload.
%% Blocks if queue is full — back-pressure to caller.
dispatch_message(Type, Payload, Queue) :-
validate_message(Type, Payload, Validated),
thread_send_message(Queue, Validated).
%% If Queue is at max_size, this call blocks until a consumer dequeues.
%% The caller is the back-pressure recipient — intentional by design.
%% ---------------------------------------------------------------------------
%% WORKER THREAD
%% ---------------------------------------------------------------------------
%% worker_loop(+WorkerId, +WorkQueue)
%% Main loop for each worker thread.
%% Dequeues one message, dispatches, sends result to log_queue, repeats.
%% Exits cleanly on 'shutdown' message.
%% STATELESS: each iteration is an independent transaction.
%% No state carried between messages.
%% All reads from shared clause database are safe (read-only).
%% No assertz/retract calls here.
worker_loop(WorkerId, WorkQueue) :-
thread_get_message(WorkQueue, Msg),
( Msg = shutdown ->
format("[Worker ~w] Shutdown received — exiting~n", [WorkerId])
% No recursive call — thread exits cleanly
;
% Process the message; catch exceptions to prevent worker death
( catch(
process_message(WorkerId, Msg),
Error,
handle_worker_error(WorkerId, Msg, Error)
) -> true ; true ),
worker_loop(WorkerId, WorkQueue) % Tail-recursive — no stack growth
).
%% process_message(+WorkerId, +Msg)
%% Dispatches one validated message to the appropriate handler.
process_message(_WorkerId, config_mutation{
                     upstream: UpName,
                     old_ip: OldInt,
                     new_ip: NewInt,
                     config_path: CPath,
                     reply_to: ReplyTo }) :-
    !,
    get_time(Start),
    % Every outcome (success, failure, exception) sends exactly one reply;
    % otherwise the caller would block forever waiting for it.
    catch(( execute_config_mutation(CPath, UpName, OldInt, NewInt)
          ->  Outcome = ok
          ;   Outcome = error(mutation_failed)    % pipeline failed without throwing
          ),
          MutationError,
          Outcome = error(MutationError)),
    (   Outcome == ok
    ->  get_time(End),
        Elapsed is End - Start,
        emit_log(info, format_string(
            "Config mutation OK: upstream ~w ~w→~w in ~2fs",
            [UpName, OldInt, NewInt, Elapsed])),
        thread_send_message(ReplyTo, mutation_ok{
            upstream: UpName,
            elapsed: Elapsed
        })
    ;   Outcome = error(Reason),
        emit_log(error, format_string("Config mutation failed: ~w", [Reason])),
        thread_send_message(ReplyTo, mutation_error(Reason))
    ).
process_message(_WorkerId, kb_query{
topic: Topic,
query: QueryText,
reply_to: ReplyTo }) :-
!,
% Read-only KB lookup — safe from worker thread
findall(
kb_result{title: Title, snippet: Snippet},
(
tutorial_fact(Topic, Title, Text),
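        string_length(Text, TextLen),
        SnipLen is min(200, TextLen),       % a fixed length of 200 fails on short texts
        sub_string(Text, 0, SnipLen, _, Snippet),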
% Simple relevance filter: query text appears in title or snippet
( sub_string(Title, _, _, _, QueryText)
; sub_string(Snippet, _, _, _, QueryText)
)
),
Results
),
length(Results, N),
emit_log(debug, format_string("KB query '~w'/~w → ~w results", [Topic, QueryText, N])),
thread_send_message(ReplyTo, kb_results{
topic: Topic,
query: QueryText,
results: Results,
count: N
}).
process_message(WorkerId, UnknownMsg) :-
emit_log(warning,
format_string("Worker ~w received unknown message type: ~w",
[WorkerId, UnknownMsg])).
%% execute_config_mutation(+Path, +UpName, +OldIPInt, +NewIPInt)
%% Performs the full parse→mutate→verify→write pipeline inside the worker.
%% A per-file mutex covers the whole read-modify-write cycle, so two workers
%% mutating the same file cannot both parse the old version and then
%% overwrite each other's change (a lost update).
execute_config_mutation(CPath, UpName, OldIPInt, NewIPInt) :-
    % Decompose integers to IP strings for replace_server_ip/5
    ip_int_to_string(OldIPInt, OldStr),
    ip_int_to_string(NewIPInt, NewStr),
    with_mutex(CPath,
        (   nginx_parse_file(CPath, OldBlocks),
            replace_server_ip(OldBlocks, UpName, OldStr, NewStr, NewBlocks),
            verify_ha_compliance(NewBlocks),
            nginx_write_file(CPath, NewBlocks)
        )).
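A per-file mutex prevents lost updates only when its scope covers the read as well as the write. The pattern can be checked in isolation with a hypothetical file-backed counter; bump/1 and bump_demo/3 are illustrative names, not part of the Logic Node code. Because the mutex spans read, increment and write, four threads of 25 increments each should leave exactly 100 in the file:

```prolog
:- use_module(library(readutil)).

%% bump(+File): hypothetical counter update. Reads the integer term stored
%% in File, adds 1 and writes it back. The mutex, named by the file path,
%% covers the read, the arithmetic and the write as one unit.
bump(File) :-
    with_mutex(File,
        ( read_file_to_terms(File, [N0], []),
          N is N0 + 1,
          setup_call_cleanup(open(File, write, Out),
                             format(Out, "~q.~n", [N]),
                             close(Out)) )).

%% bump_demo(+Threads, +PerThread, -Final): runs Threads threads that each
%% call bump/1 PerThread times on one shared temporary file.
bump_demo(Threads, PerThread, Final) :-
    tmp_file_stream(text, File, Init),
    format(Init, "0.~n", []),
    close(Init),
    findall(Id,
            ( between(1, Threads, _),
              thread_create(forall(between(1, PerThread, _), bump(File)),
                            Id, []) ),
            Ids),
    maplist(thread_join, Ids),
    read_file_to_terms(File, [Final], []),
    delete_file(File).
```

Moving the read outside with_mutex/2 reintroduces the race: two threads can read the same value and both write value + 1.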
ip_int_to_string(Int, Str) :-
A is (Int >> 24) /\ 255,
B is (Int >> 16) /\ 255,
C is (Int >> 8) /\ 255,
D is Int /\ 255,
format(string(Str), "~w.~w.~w.~w", [A, B, C, D]).
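The REPL demonstration passes old_ip and new_ip as dotted-quad strings, while process_message/2 hands integers to execute_config_mutation/4, so the conversion has to happen at the dispatch boundary. That converter is not shown in this section. A sketch of a plausible inverse of ip_int_to_string/2, under the hypothetical name ip_string_to_int/2, that accepts only four decimal octets in 0..255:

```prolog
%% ip_string_to_int(+Str, -Int): hypothetical inverse of ip_int_to_string/2.
%% Accepts exactly four dot-separated decimal octets in 0..255 and packs
%% them big-endian into one integer; anything else fails.
ip_string_to_int(Str, Int) :-
    split_string(Str, ".", "", Parts),
    length(Parts, 4),
    maplist(octet, Parts, [A, B, C, D]),
    Int is (A << 24) \/ (B << 16) \/ (C << 8) \/ D.

%% octet(+S, -N): 1 to 3 ASCII digits with a value of at most 255.
%% Checking the characters first rules out forms that number_codes/2
%% would otherwise accept, such as "0x1f" or " 7".
octet(S, N) :-
    string_codes(S, Codes),
    length(Codes, Len),
    between(1, 3, Len),
    forall(member(Code, Codes), between(0'0, 0'9, Code)),
    number_codes(N, Codes),
    N =< 255.
```

With this sketch, ip_string_to_int("192.168.1.10", I) gives I = 3232235786, and "999.0.0.1" fails on its first octet.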
%% handle_worker_error(+WorkerId, +Msg, +Error)
%% Called when process_message/2 throws. Logs the error and continues the loop.
handle_worker_error(WorkerId, Msg, Error) :-
format(string(ErrStr), "Worker ~w error processing ~w: ~w",
[WorkerId, Msg, Error]),
emit_log(error, ErrStr).
%% ---------------------------------------------------------------------------
%% LOGGING THREAD
%% ---------------------------------------------------------------------------
%% logger_loop(+Stream, +LogQueue)
%% Single-threaded log consumer. Writes to Stream with mutex protection.
%% The mutex scope is exactly the write+flush pair — no wider.
logger_loop(Stream, LogQueue) :-
thread_get_message(LogQueue, Msg),
( Msg = shutdown ->
format("[Logger] Shutdown — flushing and closing~n"),
flush_output(Stream)
% Stream closed by setup_call_cleanup/3 in start_logger/1
;
Msg = log_event{level: Level, message: Text, timestamp: Ts},
format_time(string(TimeStr), "%Y-%m-%dT%H:%M:%S", Ts),
format(string(Line), "[~w] ~w ~w~n", [Level, TimeStr, Text]),
with_mutex(audit_log_mutex,
( write(Stream, Line),
flush_output(Stream) )
),
logger_loop(Stream, LogQueue)
).
%% start_logger/1: spawns the logger thread with setup_call_cleanup/3
%% for guaranteed stream closure on any exit condition.
start_logger(ThreadId) :-
orchestrator_config(audit_log_path, LogPath),
thread_create(
setup_call_cleanup(
open(LogPath, append, Stream, [encoding(utf8), buffer(line)]),
logger_loop(Stream, log_queue),
close(Stream)
),
ThreadId,
[
alias(logger_thread),
stack_limit(16_000_000), % Logger needs minimal stack
detached(false)
]
).
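setup_call_cleanup/3 closes the logger's stream on every exit path, including an uncaught exception inside logger_loop/2. A small self-contained check of that guarantee, with a hypothetical cleanup_demo/2 standing in for the logger thread and a simulated crash in place of a real fault:

```prolog
%% cleanup_demo(-Status, -StillOpen): hypothetical check. A thread opens a
%% temporary file under setup_call_cleanup/3, reports the stream to its
%% parent and then throws; the parent checks that the stream was closed.
cleanup_demo(Status, StillOpen) :-
    tmp_file_stream(text, File, Tmp),
    close(Tmp),
    thread_self(Parent),
    thread_create(
        setup_call_cleanup(
            open(File, append, S),
            ( thread_send_message(Parent, opened(S)),
              throw(simulated_logger_crash) ),
            close(S)),
        Id, []),
    thread_get_message(opened(Stream)),
    thread_join(Id, Status),
    (   is_stream(Stream) -> StillOpen = true ; StillOpen = false ),
    delete_file(File).
```

The joined status is exception(simulated_logger_crash), and the stream handle is no longer valid once the thread has exited.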
%% ---------------------------------------------------------------------------
%% EMIT LOG HELPER
%% (Called from worker threads — sends to log_queue asynchronously)
%% ---------------------------------------------------------------------------
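emit_log(Level, Message) :-
    get_time(Ts),
    log_text(Message, Text),
    % Non-blocking send: thread_send_message/2 would block on a full bounded
    % queue, so use thread_send_message/3 with timeout(0), which fails at once
    % instead. The entry is dropped rather than blocking the worker. Logging
    % must never slow down work processing.
    (   catch(thread_send_message(log_queue,
                  log_event{level:Level, message:Text, timestamp:Ts},
                  [timeout(0)]),
              _, fail)
    ->  true
    ;   true        % Queue full or logger down: drop this log entry
    ).

%% log_text(+Message, -Text)
%% Renders format_string(Fmt, Args) terms; passes ready-made text through.
log_text(format_string(Fmt, Args), Text) :- !,
    format_string(Fmt, Args, Text).
log_text(Text, Text).

format_string(Fmt, Args, Str) :-
    format(string(Str), Fmt, Args).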
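The drop-on-full behaviour rests on the timeout option of thread_send_message/3. A small self-contained check, using a hypothetical try_send/2 and a two-slot queue that nobody reads in place of log_queue:

```prolog
%% try_send(+Queue, +Term): hypothetical non-blocking send. Succeeds if
%% Term was enqueued; fails without waiting if Queue is full.
try_send(Queue, Term) :-
    thread_send_message(Queue, Term, [timeout(0)]).

%% drop_demo(-Delivered, -Dropped): offers 5 entries to a 2-slot queue with
%% no consumer and counts how many were accepted and how many dropped.
drop_demo(Delivered, Dropped) :-
    message_queue_create(Queue, [max_size(2)]),
    aggregate_all(count,
                  ( between(1, 5, I), try_send(Queue, entry(I)) ),
                  Delivered),
    Dropped is 5 - Delivered,
    message_queue_destroy(Queue).
```

Two entries fit and three are dropped; the caller never waits, which is the property a worker needs from its logging call.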
%% ---------------------------------------------------------------------------
%% ORCHESTRATOR LIFECYCLE
%% ---------------------------------------------------------------------------
:- dynamic orchestrator_thread/2.
%% orchestrator_thread(Role, ThreadId)
%% Role: worker(N) | logger
%% start_orchestrator/0
%% Initialises queues, spawns worker pool and logger.
%% Idempotent: safe to call again if already running.
start_orchestrator :-
( orchestrator_running ->
format("[Orchestrator] Already running~n")
;
init_queues,
worker_count(NWorkers),
start_workers(NWorkers),
start_logger(LoggerId),
assertz(orchestrator_thread(logger, LoggerId)),
format("[Orchestrator] Started: ~w workers + 1 logger~n", [NWorkers]),
orchestrator_status(_)
).
orchestrator_running :-
orchestrator_thread(logger, TId),
thread_property(TId, status(running)).
%% start_workers/1: spawns N worker threads, all reading from work_queue
start_workers(N) :-
orchestrator_config(worker_stack_mb, StackMB),
StackBytes is StackMB * 1_000_000,
forall(
between(1, N, I),
(
atom_concat(worker_, I, Alias),
thread_create(
worker_loop(I, work_queue),
ThreadId,
[
alias(Alias),
stack_limit(StackBytes),
detached(false)
]
),
assertz(orchestrator_thread(worker(I), ThreadId))
)
).
%% stop_orchestrator/0
%% Sends one shutdown per worker, joins the workers, then stops and joins
%% the logger. Blocks until all threads have exited.
stop_orchestrator :-
    % One shutdown sentinel per worker
    worker_count(NWorkers),
    forall(between(1, NWorkers, _),
           thread_send_message(work_queue, shutdown)),
    % Join the workers before stopping the logger, so that log events they
    % emit while finishing their last messages are written, not stranded
    forall(orchestrator_thread(worker(_), WorkerId),
           join_and_report(WorkerId)),
    thread_send_message(log_queue, shutdown),
    forall(orchestrator_thread(logger, LoggerId),
           join_and_report(LoggerId)),
    retractall(orchestrator_thread(_, _)),
    format("[Orchestrator] All threads stopped~n").

%% join_and_report(+ThreadId): joins one thread and reports abnormal exits
join_and_report(ThreadId) :-
    thread_join(ThreadId, Status),
    (   Status == true
    ->  true
    ;   format("[Orchestrator] Thread ~w exited: ~w~n", [ThreadId, Status])
    ).
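The shutdown protocol depends on two properties of a shared message queue: each worker consumes exactly one shutdown sentinel and then exits, and because the queue is FIFO, work enqueued ahead of the sentinels is still processed. A self-contained sketch checks both, using a hypothetical pool_worker/3 rather than the chapter's worker_loop/2; with four workers and 100 jobs, the per-worker counts should sum to 100:

```prolog
%% pool_worker(+Queue, +Parent, +Done): hypothetical worker. Counts the
%% jobs it processes and reports the count to Parent when it dequeues
%% the shutdown sentinel.
pool_worker(Queue, Parent, Done) :-
    thread_get_message(Queue, Msg),
    (   Msg == shutdown
    ->  thread_send_message(Parent, processed(Done))
    ;   Done1 is Done + 1,
        pool_worker(Queue, Parent, Done1)
    ).

%% pool_demo(+NWorkers, +NJobs, -Total): enqueues NJobs jobs, then one
%% shutdown per worker, joins the pool and sums the reported counts.
pool_demo(NWorkers, NJobs, Total) :-
    message_queue_create(Queue),
    thread_self(Me),
    findall(Id,
            ( between(1, NWorkers, _),
              thread_create(pool_worker(Queue, Me, 0), Id, []) ),
            Ids),
    forall(between(1, NJobs, J), thread_send_message(Queue, job(J))),
    forall(between(1, NWorkers, _), thread_send_message(Queue, shutdown)),
    maplist(thread_join, Ids),
    findall(N,
            ( between(1, NWorkers, _),
              thread_get_message(Me, processed(N)) ),
            Counts),
    sum_list(Counts, Total),
    message_queue_destroy(Queue).
```

No job is lost, because no worker can reach a sentinel until every job queued before it has been taken.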
%% orchestrator_status/1: runtime health report
orchestrator_status(Status) :-
findall(
thread_info{id:TId, role:Role, status:S},
(
orchestrator_thread(Role, TId),
thread_property(TId, status(S))
),
Threads
),
message_queue_property(work_queue, size(WSize)),
message_queue_property(work_queue, max_size(WMax)),
message_queue_property(log_queue, size(LSize)),
message_queue_property(log_queue, max_size(LMax)),
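    % Evaluate the percentages here: an arithmetic term stored in a dict
    % stays unevaluated (e.g. 0*100//500)
    WPct is WSize * 100 // WMax,
    LPct is LSize * 100 // LMax,
    Status = orchestrator_status{
        threads: Threads,
        work_queue: queue_status{size:WSize, max:WMax, pct:WPct},
        log_queue:  queue_status{size:LSize, max:LMax, pct:LPct}
    },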
format("Orchestrator status:~n"),
format(" Threads: ~w~n", [Threads]),
format(" work_queue: ~w/~w~n", [WSize, WMax]),
format(" log_queue: ~w/~w~n", [LSize, LMax]).
14.5.3 REPL Demonstration
?- use_module('/opt/logic-node/query_orchestrator').
true.
% Start the orchestrator — spawns 15 workers (16 cores - 1) + 1 logger
?- start_orchestrator.
[Orchestrator] Started: 15 workers + 1 logger
Orchestrator status:
Threads: [thread_info{id:<0x...>,role:worker(1),status:running}, ...]
work_queue: 0/500
log_queue: 0/2000
true.
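% Dispatch a KB query from the main thread and collect the reply in the same
% query (toplevel variables such as Me do not carry over between queries)
?- thread_self(Me),
   dispatch_message(kb_query,
       kb_query{topic: "zfs", query: "scrub", reply_to: Me},
       work_queue),
   thread_get_message(Me, Result).
Me = main,
Result = kb_results{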
topic: zfs,
query: "scrub",
results: [
kb_result{title:"ZFS", snippet:"A scrub traverses all data..."},
kb_result{title:"zpool", snippet:"zpool scrub initiates..."}
],
count: 2
}.
% Dispatch a config mutation and collect the reply
?- thread_self(Me),
   dispatch_message(config_mutation,
       config_mutation{
           upstream: "app_cluster",
           old_ip: "192.168.1.10",
           new_ip: "10.0.2.15",
           config_path: '/etc/nginx/sites-available/app.conf',
           reply_to: Me
       },
       work_queue),
   thread_get_message(Me, Result).
Me = main,
Result = mutation_ok{upstream:"app_cluster", elapsed:0.043}.
% Audit log entry written by logger thread while worker was processing.
% Invalid IP rejected at dispatch — queue never touched
?- thread_self(Me),
dispatch_message(config_mutation,
config_mutation{
upstream: "app_cluster",
old_ip: "999.0.0.1",
new_ip: "10.0.2.15",
config_path: '/etc/nginx/sites-available/app.conf',
reply_to: Me
},
work_queue).
ERROR: parse_failure(ipv4,"999.0.0.1")
% Thrown in the caller's context. Work queue untouched. Worker unaffected.
% Injection attempt rejected at dispatch
?- thread_self(Me),
dispatch_message(config_mutation,
config_mutation{
upstream: "app_cluster } server 0.0.0.0",
old_ip: "192.168.1.10",
new_ip: "10.0.2.15",
config_path: '/etc/nginx/sites-available/app.conf',
reply_to: Me
},
work_queue).
ERROR: type_error(valid_upstream_name, "app_cluster } server 0.0.0.0")
% Structural injection caught before the message reaches any queue.
% Graceful shutdown
?- stop_orchestrator.
[Worker 1] Shutdown received — exiting
[Worker 2] Shutdown received — exiting
...
[Logger] Shutdown — flushing and closing
[Orchestrator] All threads stopped
true.
Outcome: Concurrent Sovereign Infrastructure
14.6.1 The Conceptual Transition
The single-threaded REPL from Volumes I and II was the correct environment for building and validating the KB, the parsers, and the config management predicates. It was not the correct environment for running them continuously. In a daemon context the single-threaded model couples unrelated work: a slow KB query stalls config mutation processing, and a slow config mutation stalls log event ingestion. None of these activities should be able to delay the others.
The worker pool resolves this. Each worker is stateless: it takes one message from work_queue, processes it, hands any log events to the logger thread through log_queue, and dequeues the next. KB queries from the RAG pipeline and config mutations from the operator interface share the pool on equal terms; a slow message occupies one worker while the others keep draining the queue. The log queue absorbs bursts. The bounded queues keep memory use finite: when work_queue is full, thread_send_message/2 blocks the producer until a worker frees a slot (back-pressure), while emit_log/2 drops log entries rather than block a worker (load shedding).
The validation gate at dispatch_message/3 is the security boundary between the untrusted outside world and the trusted Logic Node execution environment. Provided every producer enqueues through it, no invalid, oversized, or injection-containing message reaches a worker thread, and the queue is a trusted channel: everything inside it has passed the schema validator.
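The per-field rules of the gate are not listed in this section. As an illustration only, here is a sketch of one such rule, assuming (hypothetically) that upstream names are limited to 1 to 64 characters of [A-Za-z0-9_.-]. check_upstream_name/1 is an invented name, and the real dispatch_message/3 may use a different character set and error term:

```prolog
%% check_upstream_name(+Name): hypothetical validator. Succeeds for a string
%% of 1..64 characters drawn from [A-Za-z0-9_.-]; otherwise throws
%% error(type_error(valid_upstream_name, Name), _). Braces, spaces and
%% semicolons, the characters needed to inject nginx structure, are rejected.
check_upstream_name(Name) :-
    (   string(Name),
        string_codes(Name, Codes),
        length(Codes, Len),
        between(1, 64, Len),
        forall(member(C, Codes), upstream_char(C))
    ->  true
    ;   type_error(valid_upstream_name, Name)
    ).

upstream_char(C) :- between(0'a, 0'z, C), !.
upstream_char(C) :- between(0'A, 0'Z, C), !.
upstream_char(C) :- between(0'0, 0'9, C), !.
upstream_char(C) :- memberchk(C, `_.-`).
```

An allow-list of characters is used rather than a deny-list of dangerous ones, so any syntax the validator's author did not anticipate is rejected by default.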
| Single-threaded REPL | Concurrent orchestrated daemon |
|---|---|
| KB query blocks log ingestion | KB queries and log ingestion run in parallel |
| One failure kills all pending work | Worker exception caught; worker continues |
| No back-pressure on producers | Bounded queues apply back-pressure automatically |
| Config mutation blocks KB queries | Config mutation is one work item among many |
| No isolation between work types | Message type dispatch separates concerns |
| Logging is synchronous and blocking | Logging is asynchronous via `log_queue` |
| No validation before processing | Full schema validation before queue entry |
14.6.2 Verification Checklist
?- use_module('/opt/logic-node/query_orchestrator').
true.
% 1. Orchestrator starts and reports correct thread count
?- start_orchestrator,
current_prolog_flag(cpu_count, Cores),
ExpectedWorkers is Cores - 1,
orchestrator_status(S),
length(S.threads, ThreadCount),
ThreadCount =:= ExpectedWorkers + 1. % Workers + logger
true.
% 2. Worker threads are running
?- thread_property(worker_1, status(running)).
true.
?- thread_property(logger_thread, status(running)).
true.
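% 3. KB query round-trips through the worker pool
%    (a dict template such as kb_results{count:N} only unifies with a dict
%    that has exactly that key set, so take the whole reply, then the key)
?- thread_self(Me),
   dispatch_message(kb_query,
       kb_query{topic:"zfs", query:"pool", reply_to:Me},
       work_queue),
   thread_get_message(Me, Reply),
   is_dict(Reply, kb_results),
   get_dict(count, Reply, N),
   N >= 0.
true.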
% 4. Invalid message rejected at dispatch — no queue side effect
?- message_queue_property(work_queue, size(S0)),
catch(
dispatch_message(config_mutation,
config_mutation{upstream:"x",old_ip:"999.0.0.1",
new_ip:"10.0.0.1",
config_path:'/tmp/t',reply_to:main},
work_queue),
error(parse_failure(ipv4,_),_), true),
message_queue_property(work_queue, size(S1)),
S1 =:= S0.
true. % ✓ Queue size unchanged after rejected message
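% 5. A failed mutation does not kill a worker
%    (test 4 was rejected at dispatch and never reached a worker, so force a
%    worker-side failure with a config file that does not exist; assumes
%    dispatch_message/3 checks the path's form but not its existence)
?- thread_self(Me),
   dispatch_message(config_mutation,
       config_mutation{upstream:"app_cluster", old_ip:"192.168.1.10",
                       new_ip:"10.0.2.15",
                       config_path:'/tmp/no_such_file.conf',
                       reply_to:Me},
       work_queue),
   thread_get_message(Me, mutation_error(_), [timeout(5)]),
   forall(orchestrator_thread(worker(_), TId),
          thread_property(TId, status(running))).
true. % Every worker is still running after the failure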
% 6. Back-pressure: a queue bounded like work_queue refuses the send that
%    would exceed its bound. Workers drain work_queue concurrently, so the
%    bound is tested on a private queue with the same max_size and no consumer.
?- message_queue_property(work_queue, max_size(Max)),
   message_queue_create(Q, [max_size(Max)]),
   forall(between(1, Max, I), thread_send_message(Q, item(I))),
   \+ thread_send_message(Q, item(overflow), [timeout(0.1)]),
   message_queue_property(Q, size(Max)),
   message_queue_destroy(Q).
true. % The send beyond Max timed out instead of growing the queue
% 7. Audit log file was written to
?- orchestrator_config(audit_log_path, P),
exists_file(P),
size_file(P, Sz),
Sz > 0.
true.
% 8. Graceful shutdown
?- stop_orchestrator,
\+ orchestrator_thread(_, _).
true. % ✓ All thread records retracted
Exercises
Exercise 14.1 — Actor-Model Counter Implement counter_actor_start/1 that spawns the counter actor from Section 14.2.2 and creates its counter_queue. Implement counter_increment/1 and counter_get/1 as the public API. Repeat the stress test from Section 14.1.2 (10 threads × 100 increments each) using the actor model, and verify that the final counter value is exactly 1,000. Measure the time overhead of the message-passing approach vs. the bare (racy) assertz/retract approach and report the ratio.
Exercise 14.2 — Bounded Queue Back-Pressure Test Write back_pressure_test/2 that takes a ProducerRate (messages/second) and a ConsumerRate (messages/second) and verifies that: (a) when ProducerRate > ConsumerRate, the work_queue reaches 80% capacity and thread_send_message/2 begins blocking the producer thread within a bounded time; (b) when ProducerRate < ConsumerRate, the queue depth stays near zero. Use thread_create/3 for the producer, a sleep/1-based consumer, and message_queue_property/2 to sample queue depth at 100ms intervals.
Exercise 14.3 — Worker Pool Hot Resize Implement resize_worker_pool/1 that takes a new worker count N and: (a) if N is larger than the current pool, spawns additional workers; (b) if N is smaller, sends shutdown to the surplus workers and joins them before returning. The existing workers must not be interrupted — only workers beyond the new target count receive shutdown. Verify that orchestrator_status/1 reports the correct thread count after resize.
Exercise 14.4 — Deadlock Detection Harness Write a test predicate deadlock_test/0 that deliberately creates a deadlock scenario: two threads, each holding one mutex, each waiting for the other's mutex. Verify that SWI-Prolog's with_mutex/2 does not detect the deadlock automatically; SWI-Prolog implements no deadlock detection for mutexes. Then write safe_with_mutex/3 that represents each lock as a message queue holding a single token and acquires it with thread_get_message/3 and timeout(5), as a proxy for detecting mutex starvation: if the token is not obtained within 5 seconds, safe_with_mutex/3 throws error(mutex_timeout(MutexId), _). Explain why this is not a general deadlock solution but is an acceptable operational safeguard.
Exercise 14.5 — Message Queue Persistence When stop_orchestrator/0 is called while producers are still sending, any message that lands in work_queue behind the shutdown sentinels is lost: every worker has already dequeued its shutdown and exited, so nothing remains to consume it. Implement drain_work_queue/1 that: reads all remaining messages from work_queue before sending shutdown to workers, serialises them to /opt/logic-node/logs/unprocessed_queue.pl as Prolog terms using write_canonical/1, and returns the count of drained messages. On start_orchestrator/0, if unprocessed_queue.pl exists, load and re-dispatch all messages before accepting new ones. Verify that a shutdown with 47 pending messages followed by a restart and drain results in all 47 messages being re-processed.
Further Reading
- SWI-Prolog Manual: Multithreading. https://www.swi-prolog.org/pldoc/man?section=threads
- SWI-Prolog Manual: thread_create/3. https://www.swi-prolog.org/pldoc/man?predicate=thread_create/3
- SWI-Prolog Manual: thread_send_message/2. https://www.swi-prolog.org/pldoc/man?predicate=thread_send_message/2
- SWI-Prolog Manual: message_queue_create/2. https://www.swi-prolog.org/pldoc/man?predicate=message_queue_create/2
- SWI-Prolog Manual: with_mutex/2. https://www.swi-prolog.org/pldoc/man?predicate=with_mutex/2
- Hewitt, C., Bishop, P., & Steiger, R. (1973). "A Universal Modular ACTOR Formalism for Artificial Intelligence." IJCAI 1973. The original actor model paper; the message-passing concurrency model implemented in this chapter is a direct descendant.
- Armstrong, J. (2003). Making Reliable Distributed Systems in the Presence of Software Errors. PhD Thesis, Royal Institute of Technology, Stockholm. Erlang's actor model and fault-isolation principles; the worker supervision pattern in Section 14.5 applies the same isolation principles to SWI-Prolog threads.
- Herlihy, M. & Shavit, N. (2008). The Art of Multiprocessor Programming. Morgan Kaufmann. Chapter 9: Linked Lists, Lazy Synchronisation. General background on synchronising linked data structures such as the queues that underlie message passing.
- Linux inotify(7) man page. https://man7.org/linux/man-pages/man7/inotify.7.html. The filesystem event API used in Chapter 15's hot-reload mechanism.
End of Chapter 14 — Next: Chapter 15: Reactive Infrastructure — inotify, Hot-Reload, and the File System Watcher