Chapter 11: Parsing System Logs

Textbook: Modern SWI-Prolog (2026 Edition): Sovereign Infrastructure & Industrial Logic
Volume: II (Knowledge Acquisition: Parsing, Validation, and Ingestion)
Chapter: 11 of 24
Audience: Senior Engineers, Systems Architects, Infrastructure Security Practitioners
Prerequisites: Chapters 1–10 complete. DCG mechanics, difference lists, phrase/2, phrase/3, parse_time/3 integration, and the network_parser.pl module operational. proxmox_inventory_v2.pl, live_state.pl, and safe_call.pl loaded at /opt/logic-node/. logicadmin user active.


Core Concepts

/var/log/auth.log on a busy SSH-exposed host grows at several megabytes per day. A year's worth of auth logs on a cluster's edge node is routinely 2–10GB. The naive approach (read_file_to_string/3, then string_codes/2, then a DCG over the entire code list) allocates one list cell per character on the WAM Heap. On a 64-bit build each list cell occupies three 8-byte words (functor, head, tail), so at 24 bytes per cell a 1GB log file requires roughly 24GB of Heap. The default stack limit on 64-bit SWI-Prolog is 1GB. The process dies before the first DCG rule fires.

The failure mode is not a graceful parse error. It is a memory exhaustion crash that kills the Logic Node, terminates any in-flight oracle queries, and leaves the state manager in an undefined condition. A log parser that crashes the engine on large input is not a parser — it is a DoS vector embedded in the infrastructure's own tooling.

library(pure_input) resolves this. phrase_from_file/2 reads the file as a lazy list — a structure that appears to the DCG as a code list but delivers character codes from disk in OS-page-sized blocks, only as the DCG demands them. The DCG's difference list mechanics are unchanged. The memory profile is O(1) in file size, bounded by the block size, regardless of whether the file is 1MB or 100GB.

Five properties define the streaming parser as the correct model for log ingestion.

1. Slurping is not a parsing strategy; it is a RAM budget gamble. read_file_to_string/3 reads the entire file into a single Prolog string before any parsing starts. For a 100MB file, this is 100MB of heap allocation before a single character is examined, and the string_codes/2 conversion that follows multiplies that cost. For a 10GB file, the process dies. The engineering decision to slurp is always a bet that the file will remain small, a bet that a production log file reliably loses, usually at 3am on a weekday.

2. library(pure_input) delivers the file as a lazy list. phrase_from_file(Rule, FilePath) opens the file, wraps it in a lazy-list structure, and calls phrase(Rule, LazyList). The lazy list presents the same interface to the DCG as a concrete code list — [H|T] unification works identically. The difference: T is not yet instantiated to a concrete list. When the DCG attempts to unify with T, the lazy list mechanism fetches the next OS page of character codes from the file, instantiates T to that block, and execution continues. Pages consumed by rules that have completed are released. At any instant, only the current and immediately preceding pages are live on the Heap.

3. A parser that fails on one malformed line and aborts is not a production parser. A log file is not a controlled input. Lines arrive malformed when the logging daemon crashes mid-write, when a process writes to the log in a non-standard format, when a log rotation event produces a partial line at the file boundary. A parser that throws or fails on the first unexpected line and abandons the remaining 9,999 lines delivers nothing. The catch-all recovery pattern — try strict parse, on failure skip to the next newline, record the error count, continue — delivers 9,999 results instead of zero.

4. SSH usernames from logs must never become Prolog atoms. An SSH brute-force attack generates authentication failure log lines at a rate of hundreds per second, each containing a unique randomly-generated username. If the parser uses atom_codes/2 to intern each username, each unique string becomes a permanent Atom Table entry. At 500,000 unique usernames and 48 bytes minimum per atom, the Atom Table grows by 24MB from a single attack campaign. The growth is permanent. Subsequent attacks compound it. The correct representation is a Prolog string: heap-allocated, GC-eligible, freed when the auth event Dict containing it goes out of scope.

5. Log parsing produces telemetry; telemetry enables reactive logic. A parsed auth_event{} Dict containing {timestamp: Float, event: failed_password, user: String, ip: Integer, port: Integer} is a first-class Prolog term. It can be stored in a list, filtered with include/3, aggregated with aggregate_all/3, and reasoned over with the same predicates that operate on the static KB. Brute-force detection, anomalous login time detection, and impossible travel detection are all declarative rules over collections of auth_event{} Dicts — not bespoke log analysis scripts.


Chapter Roadmap

Section   Title                                   Focus
11.1      The Physics of Lazy Lists               library(pure_input), phrase_from_file/2, OS page delivery model
11.2      Error Recovery: The Catch-All Rule      skip_to_newline//0, disjunction fallback, error counting
11.3      The Build: auth_parser.pl               SSH log DCG, auth_event{} Dict, parse_time/3 integration
11.4      Reasoning Over Telemetry                brute_force_detected/4, time-window aggregation, aggregate_all/3
11.5      Security: The SSH Username Attack       Atom Table DoS via username interning, string_codes/2 discipline
Outcome   Log Logic Parity                        Verification checklist, conceptual transition

11.1 The Physics of Lazy Lists

11.1.1 Why Slurping Kills the Engine

The memory cost of the slurp-then-parse pattern:

% DANGEROUS — do not use for log files
ingest_log_slurp(Path, Entries) :-
    read_file_to_string(Path, Contents, []),   % Entire file → one Prolog string
    string_codes(Contents, Codes),             % String → code list (N cons cells)
    phrase(log_file(Entries), Codes).          % DCG over N-element list

For a 500MB auth.log:

string_codes/2 allocates:
  500,000,000 characters
  × 1 list cell per character
  × 24 bytes per list cell (3 words of 8 bytes on a 64-bit WAM)
  = 12,000,000,000 bytes = ~11.2GB Heap allocation

SWI-Prolog default stack limit (64-bit): 1GB
Result: Stack limit (1.0Gb) exceeded

The crash occurs during string_codes/2 before the DCG rule fires once. There is no partial result. There is no error recovery. The Logic Node process dies.
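The estimate can be scripted so that it is easy to rerun for any file before deciding how to ingest it. The following is a minimal sketch, not part of the chapter's modules: code_list_bytes/3 and slurp_fits/3 are hypothetical helper names, the bytes-per-cell figure is passed in explicitly because it depends on the build, and the file size in bytes is treated as the character count (reasonable for ASCII logs).

```prolog
%% code_list_bytes(+Chars, +BytesPerCell, -Bytes)
%% Memory needed for a fully materialised code list of Chars elements.
%% BytesPerCell is an assumption supplied by the caller.
code_list_bytes(Chars, BytesPerCell, Bytes) :-
    Bytes is Chars * BytesPerCell.

%% slurp_fits(+Path, +BytesPerCell, +LimitBytes)
%% Succeeds only if the code list for the whole file would stay within
%% LimitBytes. Uses size_file/2, so one byte is counted as one character.
slurp_fits(Path, BytesPerCell, LimitBytes) :-
    size_file(Path, FileBytes),
    code_list_bytes(FileBytes, BytesPerCell, Needed),
    Needed =< LimitBytes.
```

A call such as slurp_fits('/var/log/auth.log', 24, 1073741824) fails for any file whose code list would not fit in a 1GB limit under the assumed cell size.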

11.1.2 library(pure_input) and Lazy List Mechanics

library(pure_input) provides phrase_from_file/2 and phrase_from_file/3. Instead of materialising the entire file as a code list, it wraps the file stream in a lazy list: a structure that presents [Code|RestLazy] to the DCG but delivers RestLazy content from disk only when the DCG attempts to unify with it.

The implementation relies on SWI-Prolog's attributed variables, the same mechanism that underlies freeze/2. RestLazy is an uninstantiated variable carrying an attribute whose unification hook fires when something attempts to unify with it. The hook reads the next OS page from the file stream, constructs a concrete code list for that page, and unifies RestLazy with [Codes...|NextLazy] where NextLazy is the next lazy tail. The DCG sees this as a standard code list. The Heap holds at most two OS pages of character codes at any time.

:- use_module(library(pure_input)).

% phrase_from_file/2: parse the entire file with Rule
% phrase_from_file(+Rule, +FilePath)

% phrase_from_file/3: as phrase_from_file/2, with stream options
% phrase_from_file(+Rule, +FilePath, +Options)   % Options are passed on to open/4

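% Example: count newlines in a large file with bounded code-list memory.
% The accumulator keeps the recursion tail-recursive, so no stack frame
% is left behind for each newline.
count_lines(N) --> count_lines(0, N).

count_lines(N0, N) -->
    [0'\n],
    !,
    { N1 is N0 + 1 },
    count_lines(N1, N).
count_lines(N0, N) -->
    [_],
    !,
    count_lines(N0, N).
count_lines(N, N) --> [].

?- phrase_from_file(count_lines(N), '/var/log/auth.log').
N = 847293.   % Illustrative count; live code-list data stays bounded by the block size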

The memory profile is determined by the OS page size (typically 4–16KB) and the look-ahead distance the DCG requires. A line-at-a-time parser never needs more than one full line plus the current page — typically under 10KB of live code-list data regardless of file size.

11.1.3 The Lazy List Contract

The lazy list satisfies the DCG's difference list contract identically to a concrete list:

  • Unifying [H|T] against a lazy list delivers H as the next character code and T as the lazy continuation
  • Unifying [] against an exhausted lazy list succeeds — end of file equals end of input
  • Backtracking into a consumed lazy list is possible (the page data remains live until GC reclaims it)
  • phrase_from_file/2 raises the exception thrown by open/4 (existence_error or permission_error) if the file does not exist or is unreadable; no partial parse state is left behind

The one behavioural difference from a concrete list: if the DCG backtracks past a page boundary into codes from a previously delivered page, that page must still be live on the Heap. For parsers with bounded backtrack depth (one line at a time, error recovery skips to next newline), this is not a concern — at most one line's worth of codes is held across backtrack points. For parsers that require unbounded backtracking over the entire file, slurping may be unavoidable — but such parsers should not exist for log ingestion.
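The contract can be checked directly by running one grammar over both kinds of list. The sketch below is an illustration only: newline_count//1 and same_count_both_ways/2 are hypothetical names, and the temporary file stands in for a real log. It writes a text to disk, counts newlines over the in-memory code list with phrase/2, then requires phrase_from_file/2 to arrive at the same count over the lazy list.

```prolog
:- use_module(library(pure_input)).

%% newline_count(-N)//: counts newline codes using an accumulator.
newline_count(N) --> newline_count_(0, N).

newline_count_(N0, N) --> [0'\n], !, { N1 is N0 + 1 }, newline_count_(N1, N).
newline_count_(N0, N) --> [_], !, newline_count_(N0, N).
newline_count_(N, N)  --> [].

%% same_count_both_ways(+Text, -N)
%% Succeeds only if the concrete list and the lazy list give the same count.
same_count_both_ways(Text, N) :-
    tmp_file(lazy_demo, Path),              % temporary file, removed below
    setup_call_cleanup(
        open(Path, write, Out),
        write(Out, Text),
        close(Out)),
    string_codes(Text, Codes),
    phrase(newline_count(N), Codes),        % concrete code list
    call_cleanup(
        once(phrase_from_file(newline_count(N), Path)),  % lazy list, N already bound
        delete_file(Path)).
```

Because N is already bound when phrase_from_file/2 runs, the second parse acts as a check rather than a second computation.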

11.1.4 Diagram: Lazy List Instantiation — WAM Fetching Codes On Demand

%%{init: {"themeVariables": {"fontSize": "14px"}}}%%
flowchart TD
    FILE["File: /var/log/auth.log\n10GB on disk\nUnread"]

    PFF["phrase_from_file(log_lines(Events), Path)\nOpens file stream\nCreates LazyHead = [C0|LazyTail0]\nLazyTail0 is uninstantiated\nHeap: ~40 bytes"]

    DCG1["DCG: log_line(E1) --> ...\nAttempts to unify [H|T]\nwith LazyHead\nH = C0 bound immediately\nT = LazyTail0 (still lazy)"]

    FETCH1["Lazy constraint fires on LazyTail0\nOS read(): fetch page 1 (4KB)\n→ codes [C1, C2, ... C4095 | LazyTail1]\nLazyTail0 instantiated to page 1\nHeap: ~96KB (page 1 codes, 24 bytes per cell)"]

    DCG2["DCG consumes page 1\nlog_line(E1) succeeds\nlog_line(E2) begins\nPage 1 codes: no live DCG refs\neligible for GC"]

    FETCH2["Lazy constraint fires on LazyTail1\nOS read(): fetch page 2 (4KB)\nHeap: ~96KB (page 2 codes)\nPage 1 reclaimed by GC"]

    STEADY["Steady state:\nOne OS page live at any time\nHeap usage: O(page_size)\nRegardless of file size\n10GB file = ~96KB Heap"]

    FILE --->|"phrase_from_file/2"| PFF
    PFF --->|"DCG requests first code"| DCG1
    DCG1 --->|"LazyTail0 touched"| FETCH1
    FETCH1 --->|"DCG processes line 1"| DCG2
    DCG2 --->|"LazyTail1 touched"| FETCH2
    FETCH2 --->|"GC reclaims page 1"| STEADY

    style FILE fill:#1A2B4A,color:#FFFFFF
    style PFF fill:#1A4070,color:#FFFFFF
    style DCG1 fill:#1A4070,color:#FFFFFF
    style FETCH1 fill:#8B6914,color:#FFFFFF
    style DCG2 fill:#1A4070,color:#FFFFFF
    style FETCH2 fill:#8B6914,color:#FFFFFF
    style STEADY fill:#1A6B3A,color:#FFFFFF

Reading the diagram: The file (dark blue, top) stays on disk. phrase_from_file/2 allocates ~40 bytes of lazy list header. Each time the DCG touches an uninstantiated lazy tail, the OS read() syscall delivers one page. Once the DCG has processed a page's lines and moved past the page boundary, the page codes have no live DCG references and are collected. The Heap holds exactly one page at steady state — independent of file size.


11.2 Error Recovery: The Catch-All Rule

11.2.1 The Production Requirement

A log parser that throws an exception or fails on encountering a malformed line is not deployable. Malformed lines arrive from:

  • Partial writes at log rotation boundaries (the last line of a rotated file may be incomplete)
  • Non-sshd processes writing to auth.log (PAM modules, sudo, cron) in formats the parser does not handle
  • Log injection attempts: an attacker supplying a carefully crafted username that breaks the expected log line format
  • Kernel OOM events that truncate the logger's write buffer mid-line

The correct response to a malformed line is: record it as a parse error (incrementing an error counter), discard it without producing a partial auth_event{} Dict, advance the file position to the start of the next line, and continue. The remaining 99.9% of the file is parsed correctly. A single malformed line is not a parse failure — it is a datum about the quality of the input source.

11.2.2 skip_to_newline//0

The recovery primitive advances the code list to immediately after the next newline character (0'\n, code 10) without producing any output or binding any parse variables:

%% skip_to_newline//0
%% Consumes codes from the input until and including the next 0'\n.
%% Leaves the input positioned at the start of the next line.
%% Succeeds at end-of-input even if no newline is found
%% (handles the final incomplete line at end of file).

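skip_to_newline --> [0'\n], !.                % Found newline: consume it and stop
skip_to_newline --> [_], !, skip_to_newline.  % Any other code: skip and recurse
skip_to_newline --> [].                       % Reached only at end of input

Both cuts matter. Without the cut in the first clause, a failure in the subsequent rule (the rule that processes the next line) lets the parser backtrack into skip_to_newline, treat the newline as an ordinary code via the second clause, and skip two lines instead of one. Without the cut in the second clause, the third clause matches the empty sequence at any position, so backtracking could end the skip in the middle of a line. With both cuts, consuming the newline commits, the third clause fires only at end of input, and the next rule gets a clean line start.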
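The effect of the primitive is easiest to see with phrase/3, which exposes the unconsumed remainder. This is a small self-contained sketch: rest_after_skip/2 is a hypothetical helper introduced for the demonstration, and the grammar is repeated here with a cut in the second clause as well, so that the empty clause can only apply at end of input.

```prolog
%% skip_to_newline//0, repeated so the example loads on its own.
skip_to_newline --> [0'\n], !.
skip_to_newline --> [_], !, skip_to_newline.
skip_to_newline --> [].

%% rest_after_skip(+Text, -RestText)
%% RestText is whatever follows the first newline in Text,
%% or the empty string if Text contains no newline.
rest_after_skip(Text, RestText) :-
    string_codes(Text, Codes),
    phrase(skip_to_newline, Codes, Rest),   % phrase/3: Rest is the unconsumed input
    string_codes(RestText, Rest).
```

For example, rest_after_skip("garbage\nnext line\n", R) binds R to "next line\n", and a text without any newline leaves R as the empty string.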

11.2.3 The Disjunctive Recovery Pattern

The top-level line parser expresses the disjunction as two clauses: the strict parse is tried first, and the recovery rule is the fallback:

%% log_line_or_skip(+ErrorAcc, -ErrorOut, -MaybeEvent)
%% Attempts to parse one auth log line as an auth_event{} Dict.
%% On failure, skips to the next newline and increments the error counter.
%% MaybeEvent: event(Dict) on success, skipped on recovery.

log_line_or_skip(ErrIn, ErrIn, event(Event)) -->
    auth_log_line(Event),
    !.   % Commit on success — do not offer recovery as alternative
log_line_or_skip(ErrIn, ErrOut, skipped) -->
    skip_to_newline,
    { ErrOut is ErrIn + 1 }.

%% log_lines(+ErrAcc, -ErrOut, -Events)
%% Recursive rule: processes all lines until end of input.
%% Events: list of auth_event{} Dicts (skipped lines omitted).
%% End of input is tested first: skip_to_newline//0 succeeds on empty
%% input, so without this guard the recovery branch would loop forever
%% at end of file.

log_lines(Err, Err, []) --> \+ [_], !.   % End of input
log_lines(ErrIn, ErrOut, Events) -->
    log_line_or_skip(ErrIn, ErrMid, Result),
    !,
    {
        ( Result = event(E) ->
            Events = [E | RestEvents]
        ;
            Events = RestEvents   % skipped line, not included in output
        )
    },
    log_lines(ErrMid, ErrOut, RestEvents).   % last call, so the loop is tail-recursive

The cut after log_line_or_skip//3 in log_lines//3 prevents the recursive call from offering the recovery branch as a backtrack alternative if a later line fails. This keeps the parser deterministic and stops it from backtracking exponentially on a file full of malformed lines.
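The same recovery loop can be exercised on a toy format before it is trusted with auth.log. The sketch below is a miniature under stated assumptions: each valid line is a run of decimal digits, every predicate name (int_line//1, skip_line//0, line_or_skip//3, int_lines//3) is hypothetical, and the input is an in-memory code list rather than a file. It tests for end of input before attempting recovery, so the skip rule is never asked to succeed on empty input.

```prolog
digit(D) --> [D], { D >= 0'0, D =< 0'9 }.

digits([D|Ds]) --> digit(D), digits_rest(Ds).
digits_rest([D|Ds]) --> digit(D), !, digits_rest(Ds).
digits_rest([])     --> [].

%% int_line(-N)//: one strict line, digits followed by a newline.
int_line(N) --> digits(Ds), [0'\n], { number_codes(N, Ds) }.

skip_line --> [0'\n], !.
skip_line --> [_], !, skip_line.
skip_line --> [].

%% line_or_skip(+Err0, -Err, -Result)//: strict parse first, recovery second.
line_or_skip(E, E, ok(N))    --> int_line(N), !.
line_or_skip(E0, E, skipped) --> skip_line, { E is E0 + 1 }.

%% int_lines(+Err0, -Err, -Numbers)//
int_lines(E, E, []) --> \+ [_], !.          % end of input checked first
int_lines(E0, E, Ns) -->
    line_or_skip(E0, E1, R),
    !,
    { ( R = ok(N) -> Ns = [N|Ns0] ; Ns = Ns0 ) },
    int_lines(E1, E, Ns0).
```

Running phrase(int_lines(0, Errors, Ns), `12\nxx\n7\n`) yields Ns = [12, 7] with Errors = 1: the malformed middle line is counted and skipped, and parsing continues.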

11.2.4 Error Counter and Parse Summary

The error counter threads through the recursion as an accumulator. It is never stored as a dynamic predicate fact (which would create an assertz/retract cycle on every line) and never accumulated in a list (which would grow O(n) in the number of errors). It is a plain integer, carried forward through the recursion of log_lines//3. It is bound once phrase_from_file/2 returns in the top-level entry predicate:

%% parse_auth_log(+Path, -Events, -ParseSummary)
%% ParseSummary: parse_summary{parsed: N, rejected: M, total: T}

parse_auth_log(Path, Events, ParseSummary) :-
    must_be(atom, Path),
    phrase_from_file(log_lines(0, ErrorCount, Events), Path),
    length(Events, ParsedCount),
    Total is ParsedCount + ErrorCount,   % evaluate here: a Dict stores values as written
    ParseSummary = parse_summary{
        parsed:   ParsedCount,
        rejected: ErrorCount,
        total:    Total
    }.
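A caller will typically want to act on the summary rather than just print it. The following is a hedged sketch of one way to do that: rejection_ratio/2 and summary_acceptable/2 are hypothetical helpers, not part of auth_parser.pl, and they read only the parsed and rejected keys so that they do not depend on how total is stored.

```prolog
%% rejection_ratio(+Summary, -Ratio)
%% Ratio is the fraction of lines that were skipped, as a float.
%% An empty file is treated as a ratio of 0.0.
rejection_ratio(Summary, Ratio) :-
    get_dict(parsed, Summary, Parsed),
    get_dict(rejected, Summary, Rejected),
    Total is Parsed + Rejected,
    (   Total =:= 0
    ->  Ratio = 0.0
    ;   Ratio is float(Rejected) / Total
    ).

%% summary_acceptable(+Summary, +MaxRatio)
%% Succeeds if no more than MaxRatio of the input was rejected.
summary_acceptable(Summary, MaxRatio) :-
    rejection_ratio(Summary, Ratio),
    Ratio =< MaxRatio.
```

A threshold such as summary_acceptable(Summary, 0.01) turns a sudden rise in malformed lines into a failed goal that monitoring logic can react to.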

11.3 The Build: auth_parser.pl

11.3.1 Target Log Format

Standard OpenSSH log lines written to /var/log/auth.log via syslog. Three event types matter here (accepted, failed, disconnected), shown in four representative lines:

# Successful authentication:
Mar  5 10:44:14 pve-node-01 sshd[12345]: Accepted publickey for deploy from 10.0.1.20 port 54321 ssh2

# Failed authentication:
Mar  5 10:44:22 pve-node-01 sshd[12346]: Failed password for invalid user notauser from 203.0.113.8 port 41234 ssh2

# Failed — valid user:
Mar  5 10:44:23 pve-node-01 sshd[12347]: Failed password for root from 203.0.113.8 port 41235 ssh2

# Disconnection (relevant for session tracking):
Mar  5 10:45:01 pve-node-01 sshd[12345]: Disconnected from authenticating user deploy 10.0.1.20 port 54321 [preauth]

Fields to extract: timestamp (→ Unix epoch float via parse_time/3), hostname (→ string, bounded length), PID (→ integer), event type (→ atom from closed vocabulary), username (→ string, never atom), source IP (→ 32-bit integer via ipv4_address//1), port (→ integer).

The hostname and event type handling differs:

  • Hostname comes from a controlled source (syslog daemon on the local machine or a trusted relay). It is still retained as a string — hostnames are not in a closed vocabulary small enough to pre-intern.
  • Event type is derived from keyword matching (Accepted, Failed, Disconnected) — three atoms from a closed vocabulary, pre-interned at load time.
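Keyword matching against a closed vocabulary can be done without ever creating an atom from input text. The sketch below illustrates the idea under its own assumptions: event_keyword/2 and keyword_event/2 are hypothetical names, and the table simply mirrors the three keywords listed above. The atoms exist because they are written in the source file; the input is only ever converted to a string and compared.

```prolog
%% event_keyword(?Atom, ?Text): the closed vocabulary.
%% These atoms are interned when the file is loaded.
event_keyword(accepted,     "Accepted").
event_keyword(failed,       "Failed").
event_keyword(disconnected, "Disconnected").

%% keyword_event(+Codes, -Atom)
%% Maps the codes of a keyword to its atom, or to 'unknown'.
%% No atom is created from Codes: the text is compared as a string.
keyword_event(Codes, Atom) :-
    string_codes(Text, Codes),      % string: heap-allocated, GC-eligible
    (   event_keyword(Known, Text)
    ->  Atom = Known
    ;   Atom = unknown
    ).
```

An unrecognised keyword maps to the single pre-existing atom unknown, so hostile input cannot add entries to the Atom Table through this path.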

11.3.2 Implementation

logicadmin@logic-node-01:~$ nano /opt/logic-node/kb/parsers/auth_parser.pl
%% =============================================================================
%% FILE:    /opt/logic-node/kb/parsers/auth_parser.pl
%% PURPOSE: DCG parser for /var/log/auth.log (OpenSSH events).
%%
%% SECURITY CONTRACT:
%%   — Usernames are NEVER interned as atoms. They are stored as Prolog
%%     strings (string_codes/2 output). GC-eligible. See Section 11.5.
%%   — Hostnames are stored as strings — not in a closed vocabulary.
%%   — Event type atoms (accepted, failed_password, failed_publickey,
%%     disconnected) are pre-interned at load time via closed vocabulary.
%%   — IP addresses are parsed to 32-bit integers (network_parser.pl).
%%   — Timestamps are parsed to Unix epoch floats (parse_time/3).
%%   — phrase_from_file/2 is used for streaming — no slurp.
%%   — Malformed lines are skipped; error count is returned in summary.
%%
%% EXPORTS:
%%   parse_auth_log(+FilePath, -Events, -Summary)
%%   parse_auth_line(+String, -Event)   (single-line entry point for testing)
%% =============================================================================

:- module(auth_parser, [
    parse_auth_log/3,
    parse_auth_line/2
]).

:- use_module(library(pure_input)).
:- use_module(library(error)).
:- use_module('/opt/logic-node/kb/parsers/network_parser', [parse_ipv4/2]).

%% ---------------------------------------------------------------------------
%% CLOSED VOCABULARY — event type atoms (pre-interned at load time)
%% ---------------------------------------------------------------------------

%% auth_event_type(+Atom): the complete closed set of parsed event types.
%% Atoms interned when this file is compiled — never created from log input.
auth_event_type(accepted).
auth_event_type(failed_password).
auth_event_type(failed_publickey).
auth_event_type(disconnected).
auth_event_type(invalid_user).

%% auth_method_type(+Atom): authentication methods
auth_method_type(publickey).
auth_method_type(password).
auth_method_type(keyboard_interactive).
auth_method_type(gssapi).

%% ---------------------------------------------------------------------------
%% LOW-LEVEL DCG PRIMITIVES
%% ---------------------------------------------------------------------------

%% digit_code(-D): matches one ASCII digit character code
digit_code(D) --> [D], { D >= 0'0, D =< 0'9 }.

%% space_char: matches one space (0x20)
space_char --> [0' ].

%% spaces: matches one or more spaces
spaces --> space_char, spaces.
spaces --> space_char.

%% nonspace_codes(-Codes): matches one or more non-space, non-newline codes
nonspace_codes([C|Cs]) --> [C], { C \= 0' , C \= 0'\n }, nonspace_codes(Cs).
nonspace_codes([C])    --> [C], { C \= 0' , C \= 0'\n }.

%% nonspace_string(-S): nonspace_codes as a Prolog string (NOT an atom)
nonspace_string(S) -->
    nonspace_codes(Cs),
    { string_codes(S, Cs) }.
%% S is a Prolog string — heap-allocated, GC-eligible.
%% NEVER use atom_codes here.

%% bounded_nonspace_string(-S, +MaxLen): nonspace_string with length guard
%% Fails if the matched sequence exceeds MaxLen codes.
%% Used for hostname and username fields to prevent oversized strings.

bounded_nonspace_string(S, MaxLen) -->
    nonspace_codes(Cs),
    {
        length(Cs, Len),
        ( Len > MaxLen ->
            fail    % Reject — too long, not a valid field value
        ; true ),
        string_codes(S, Cs)
    }.

%% decimal_integer(-N): parses one or more decimal digits to an integer
decimal_integer(N) -->
    digit_code(D),
    { D0 is D - 0'0 },          % seed the accumulator with the digit's value, not its code
    decimal_integer_rest(D0, N).

decimal_integer_rest(Acc, N) -->
    digit_code(D),
    !,
    { Acc1 is Acc * 10 + (D - 0'0) },
    decimal_integer_rest(Acc1, N).
decimal_integer_rest(N, N) --> [].

%% ---------------------------------------------------------------------------
%% SYSLOG HEADER: timestamp + hostname + process[pid]:
%%
%% Format: "Mar  5 10:44:14 pve-node-01 sshd[12345]: "
%%
%% The syslog timestamp format "Mon DD HH:MM:SS" does NOT include a year.
%% parse_time/3 accepts the rfc_1123 and iso_8601 formats, neither of which
%% matches this layout directly.
%% Strategy: parse the month, day, and time fields numerically, derive the
%% year from get_time/1 (current year), and construct an ISO 8601 string for
%% parse_time/3. If timestamp parsing fails, the header parse fails and the
%% line is skipped by the catch-all recovery rule.
%% ---------------------------------------------------------------------------

%% month_num(-N): parses a three-letter month abbreviation to month number 1-12
month_num(1)  --> `Jan`.
month_num(2)  --> `Feb`.
month_num(3)  --> `Mar`.
month_num(4)  --> `Apr`.
month_num(5)  --> `May`.
month_num(6)  --> `Jun`.
month_num(7)  --> `Jul`.
month_num(8)  --> `Aug`.
month_num(9)  --> `Sep`.
month_num(10) --> `Oct`.
month_num(11) --> `Nov`.
month_num(12) --> `Dec`.

%% syslog_timestamp(-EpochFloat):
%% Parses "Mon [D]D HH:MM:SS" and produces a Unix epoch float.
%% Year is inferred from the system clock — syslog omits the year.
%% This is correct for logs from the current year; for log archives,
%% callers should supply an explicit base year via parse_auth_log/4 (Exercise).

syslog_timestamp(EpochFloat) -->
    month_num(Mon),
    spaces,
    decimal_integer(Day),
    space_char,
    decimal_integer(Hour),  [0':],
    decimal_integer(Min),   [0':],
    decimal_integer(Sec),
    {
        % Construct ISO 8601 string with current year for parse_time/3
        get_time(Now),
        stamp_date_time(Now, date(Year,_,_,_,_,_,_,_,_), local),
        format(string(TsStr),
               "~`0t~d~4|-~`0t~d~7|-~`0t~d~10|T~`0t~d~13|:~`0t~d~16|:~`0t~d~19|",
               [Year, Mon, Day, Hour, Min, Sec]),
        parse_time(TsStr, iso_8601, EpochFloat)
    }.

%% syslog_header(-EpochFloat, -HostString, -Pid):
%% Parses the leading fields of a syslog line through the ]: marker.

syslog_header(EpochFloat, HostString, Pid) -->
    syslog_timestamp(EpochFloat),
    space_char,
    bounded_nonspace_string(HostString, 253),  % RFC 1035: max hostname 253 chars
    [0' ],                                      % space
    `sshd[`,                                   % literal "sshd["
    decimal_integer(Pid),
    `]:`,                                       % literal "]:"
    space_char.

%% ---------------------------------------------------------------------------
%% EVENT BODY PARSERS
%%
%% Each parser handles one sshd log event format.
%% All produce event_body{} components — no atoms from user-controlled fields.
%% ---------------------------------------------------------------------------

%% accepted_event(-Method, -UserStr, -IPInt, -Port):
%% "Accepted publickey for deploy from 10.0.1.20 port 54321 ssh2"

accepted_event(Method, UserStr, IPInt, Port) -->
    `Accepted `,
    nonspace_codes(MethodCodes),   % auth method, checked against closed vocab
    ` for `,
    bounded_nonspace_string(UserStr, 64),   % POSIX max username: 32; use 64 for margin
    ` from `,
    ipv4_address_dcg(IPInt),
    ` port `,
    decimal_integer(Port),
    space_char,                    % separator before the protocol string
    nonspace_codes(_),             % "ssh2" or protocol string, discarded
    {
        % Method: compare against the closed vocabulary WITHOUT interning.
        % Enumerating auth_method_type/1 first means atom_string/2 only
        % compares text; no atom is created from log input.
        string_codes(MethodStr, MethodCodes),
        ( auth_method_type(MethodAtom),
          atom_string(MethodAtom, MethodStr) ->
            Method = MethodAtom
        ;
            Method = unknown   % not in the vocabulary
        )
    }.

%% failed_event(-Method, -UserStr, -IsInvalid, -IPInt, -Port):
%% "Failed password for root from 203.0.113.8 port 41235 ssh2"
%% "Failed password for invalid user notauser from 203.0.113.8 port 41234 ssh2"

failed_event(Method, UserStr, IsInvalid, IPInt, Port) -->
    `Failed `,
    nonspace_codes(MethodCodes),
    ` for `,
    optional_invalid_user(IsInvalid),
    bounded_nonspace_string(UserStr, 64),
    ` from `,
    ipv4_address_dcg(IPInt),
    ` port `,
    decimal_integer(Port),
    space_char,          % separator before the protocol string
    nonspace_codes(_),   % protocol string, discarded
    {
        % Enumerate the closed vocabulary first so that no atom is
        % created from log input; atom_string/2 only compares text.
        string_codes(MethodStr, MethodCodes),
        ( auth_method_type(MethodAtom),
          atom_string(MethodAtom, MethodStr) ->
            Method = MethodAtom
        ; Method = unknown )
    }.

%% optional_invalid_user(-Flag):
%% Matches "invalid user " prefix if present; sets Flag = invalid_user or valid_user.
optional_invalid_user(invalid_user) --> `invalid user `, !.
optional_invalid_user(valid_user)   --> [].

%% disconnected_event(-UserStr, -IPInt, -Port):
%% "Disconnected from authenticating user deploy 10.0.1.20 port 54321 [preauth]"

disconnected_event(UserStr, IPInt, Port) -->
    `Disconnected from `,
    nonspace_codes(_),   % "authenticating" or "user" or other qualifier — discard
    space_char,
    ( `user ` -> [] ; [] ),    % optional "user" keyword
    bounded_nonspace_string(UserStr, 64),
    space_char,
    ipv4_address_dcg(IPInt),
    ` port `,
    decimal_integer(Port),
    space_char,          % separator before the suffix
    nonspace_codes(_).   % "[preauth]" or other suffix, discarded

%% ipv4_address_dcg(-IntVal): DCG wrapper around network_parser's parse_ipv4/2.
%% It captures the non-space codes at the current position, converts them
%% to a string, and validates that string with parse_ipv4/2.

ipv4_address_dcg(IntVal) -->
    nonspace_codes(IPCodes),
    {
        string_codes(IPStr, IPCodes),
        % parse_ipv4 takes a string — throws on invalid input
        catch(
            parse_ipv4(IPStr, IntVal),
            _,
            fail   % Malformed IP → fail this branch → catch-all recovery fires
        )
    }.

%% NOTE: The above approach captures all non-space codes then validates.
%% For better streaming behaviour (fail early on first invalid octet digit),
%% import and call network_parser's ipv4_address//1 directly:
%%   :- use_module(network_parser, [ipv4_address//1]).
%%   ipv4_address_dcg(IntVal) --> ipv4_address(IntVal).
%% Both forms are semantically equivalent; the direct form is preferred for
%% production use. The wrapper form is shown here for pedagogic clarity.

%% ---------------------------------------------------------------------------
%% TOP-LEVEL AUTH LOG LINE PARSER
%% ---------------------------------------------------------------------------

%% auth_log_line(-Event):
%% Parses one complete sshd log line into an auth_event{} Dict.
%% The trailing newline is consumed.

auth_log_line(Event) -->
    syslog_header(Ts, Host, Pid),
    auth_event_body(EventType, EventData),
    end_of_line,
    {
        Event = auth_event{
            timestamp: Ts,
            host:      Host,
            pid:       Pid,
            event:     EventType,
            data:      EventData
        }
    }.

%% auth_event_body(-EventType, -EventData):
%% Dispatches to the correct event parser based on the keyword at the
%% current position. Each branch is tried in order; the first match wins.

auth_event_body(accepted, auth_data{method:M, user:U, ip:IP, port:P}) -->
    accepted_event(M, U, IP, P), !.
auth_event_body(failed_password, auth_data{method:M, user:U,
                                            invalid:Inv, ip:IP, port:P}) -->
    failed_event(M, U, Inv, IP, P), !.
auth_event_body(disconnected, auth_data{user:U, ip:IP, port:P}) -->
    disconnected_event(U, IP, P), !.

%% end_of_line: matches newline or end of input
end_of_line --> [0'\n], !.
end_of_line --> \+ [_].   % end of input only; trailing junk on the line fails the parse

%% ---------------------------------------------------------------------------
%% ERROR RECOVERY
%% ---------------------------------------------------------------------------

skip_to_newline --> [0'\n], !.
skip_to_newline --> [_], !, skip_to_newline.
skip_to_newline --> [].                  % reached only at end of input

log_line_or_skip(ErrIn, ErrIn, event(Event)) -->
    auth_log_line(Event), !.
log_line_or_skip(ErrIn, ErrOut, skipped) -->
    skip_to_newline,
    { ErrOut is ErrIn + 1 }.

%% End of input is tested first: skip_to_newline//0 succeeds on empty
%% input, so the recovery branch must never be tried at end of file.
log_lines(Err, Err, []) --> \+ [_], !.
log_lines(ErrIn, ErrOut, Events) -->
    log_line_or_skip(ErrIn, ErrMid, Result),
    !,
    {
        ( Result = event(E) ->
            Events = [E | RestEvents]
        ;
            Events = RestEvents
        )
    },
    log_lines(ErrMid, ErrOut, RestEvents).   % last call, so the loop is tail-recursive

%% ---------------------------------------------------------------------------
%% ENTRY POINTS
%% ---------------------------------------------------------------------------

%% parse_auth_log(+FilePath, -Events, -Summary)
%% Streams /var/log/auth.log through phrase_from_file/2.
%% Events: list of auth_event{} Dicts.
%% Summary: parse_summary{parsed:N, rejected:M, total:T}

parse_auth_log(FilePath, Events, Summary) :-
    must_be(atom, FilePath),
    phrase_from_file(log_lines(0, ErrorCount, Events), FilePath),
    length(Events, ParsedCount),
    Total is ParsedCount + ErrorCount,   % evaluate here: a Dict stores values as written
    Summary = parse_summary{
        parsed:   ParsedCount,
        rejected: ErrorCount,
        total:    Total
    }.

%% parse_auth_line(+Line, -Event)
%% Single-line entry point for testing and interactive inspection.
%% Line: Prolog string.

parse_auth_line(Line, Event) :-
    must_be(string, Line),
    string_codes(Line, Codes),
    ( phrase(auth_log_line(Event), Codes) ->
        true
    ;
        throw(error(
            parse_failure(auth_log_line, Line),
            context(parse_auth_line/2, 'Line does not match any known sshd format')
        ))
    ).
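The column-stop directives used by syslog_timestamp//1 are dense enough to be worth checking in isolation. The sketch below lifts only the formatting step into a standalone predicate; iso_timestamp_string/7 is a hypothetical name introduced for this check. Each `~`0t~d~N|` group right-aligns an integer and pads it with zeros up to column N, which is how single-digit months and days become two-digit fields.

```prolog
%% iso_timestamp_string(+Year, +Mon, +Day, +Hour, +Min, +Sec, -Text)
%% Builds "YYYY-MM-DDTHH:MM:SS" from integer fields.
%%   ~`0t   fill with '0' on the left of the value
%%   ~d     write the integer
%%   ~N|    column stop: the field ends at column N
iso_timestamp_string(Year, Mon, Day, Hour, Min, Sec, Text) :-
    format(string(Text),
           "~`0t~d~4|-~`0t~d~7|-~`0t~d~10|T~`0t~d~13|:~`0t~d~16|:~`0t~d~19|",
           [Year, Mon, Day, Hour, Min, Sec]).
```

For instance, iso_timestamp_string(2025, 3, 5, 10, 44, 14, T) binds T to "2025-03-05T10:44:14", the form that is then handed to parse_time/3.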

11.3.3 REPL Demonstration

?- use_module('/opt/logic-node/kb/parsers/auth_parser').
true.

% Single line — accepted event
?- parse_auth_line(
       "Mar  5 10:44:14 pve-node-01 sshd[12345]: Accepted publickey for deploy from 10.0.1.20 port 54321 ssh2",
       Event).
Event = auth_event{
    timestamp: 1741171454.0,
    host:      "pve-node-01",
    pid:       12345,
    event:     accepted,
    data:      auth_data{method:publickey, user:"deploy", ip:167772436, port:54321}
}.
% Note: user "deploy" is a Prolog STRING — not an atom.
% 167772436 = 10.0.1.20 as 32-bit integer.

% Failed event — invalid user
?- parse_auth_line(
       "Mar  5 10:44:22 pve-node-01 sshd[12346]: Failed password for invalid user notauser from 203.0.113.8 port 41234 ssh2",
       Event).
Event = auth_event{
    timestamp: 1741171462.0,
    host:      "pve-node-01",
    pid:       12346,
    event:     failed_password,
    data:      auth_data{method:password, user:"notauser",
                         invalid:invalid_user, ip:3405803784, port:41234}
}.
% "notauser" is a STRING — no Atom Table entry created.
% 3405803784 = 203.0.113.8 as 32-bit integer.

% Malformed line — falls through to error counter
?- string_codes("not a valid sshd line at all\n", Cs),
   phrase(log_line_or_skip(0, ErrOut, Result), Cs).
ErrOut = 1, Result = skipped.

% Full file parse — streaming
?- parse_auth_log('/var/log/auth.log', Events, Summary).
Summary = parse_summary{parsed:847289, rejected:4, total:847293}.
% 4 malformed lines across 847,293 total; the parse continues through all of them.
% Live code-list data stays O(page_size) throughout; the Events list itself grows with the number of parsed events.

11.4 Reasoning Over Telemetry: Brute-Force Detection

11.4.1 The Detection Model

An SSH brute-force attack generates a high volume of failed_password or failed_publickey events from a single source IP within a short time window. The detection rule:

An IP address is classified as a brute-force source if it generates more than Threshold failed authentication events within any WindowSeconds-second time window.

This is a declarative rule over a list of auth_event{} Dicts — not a bespoke script, not a stateful counter updated by a daemon. The list of events is produced by parse_auth_log/3. The reasoning over it is standard Prolog.

11.4.2 brute_force_detected/4

%% =============================================================================
%% FILE:    /opt/logic-node/kb/analysis/security_analysis.pl
%% PURPOSE: Security event reasoning over parsed auth_event{} Dicts.
%% =============================================================================

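:- module(security_analysis, [
    brute_force_detected/4,
    failed_events_from_ip/3,
    events_in_window/4,
    suspicious_ips/3,
    is_failed_event/1        % exported: the REPL time-window queries call it directly
]).

:- use_module(library(aggregate)).
:- use_module(library(apply)).                    % include/3
:- use_module(library(error)).                    % must_be/2
:- use_module(library(lists)).                    % member/2, reverse/2
:- use_module(library(pairs), [pairs_values/2]).  % selective import; map_list_to_pairs/3 is defined locally
:- use_module(library(yall)).                     % [X]>>Goal lambdas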

%% ---------------------------------------------------------------------------
%% PRIMITIVE FILTERS
%% ---------------------------------------------------------------------------

%% is_failed_event(+Event): succeeds if Event is a failed auth event.
is_failed_event(E) :-
    is_dict(E, auth_event),
    memberchk(E.event, [failed_password, failed_publickey]).

%% event_from_ip(+IPInt, +Event): succeeds if Event's source IP matches IPInt.
event_from_ip(IPInt, E) :-
    is_dict(E, auth_event),
    is_dict(E.data, auth_data),
    E.data.ip =:= IPInt.

%% event_in_window(+WindowStart, +WindowEnd, +Event):
%% succeeds if Event.timestamp falls within [WindowStart, WindowEnd].
event_in_window(Start, End, E) :-
    is_dict(E, auth_event),
    T = E.timestamp,
    T >= Start,
    T =< End.

%% ---------------------------------------------------------------------------
%% FAILED EVENTS FROM A SPECIFIC IP
%% ---------------------------------------------------------------------------

%% failed_events_from_ip(+Events, +IPInt, -FailedList)
%% Filters Events to failed auth events from IPInt.

failed_events_from_ip(Events, IPInt, FailedList) :-
    must_be(list, Events),
    must_be(integer, IPInt),
    include([E]>>(is_failed_event(E), event_from_ip(IPInt, E)),
            Events, FailedList).

%% ---------------------------------------------------------------------------
%% TIME WINDOW ANALYSIS
%% ---------------------------------------------------------------------------

%% events_in_window(+Events, +WindowStart, +WindowSeconds, -WindowEvents)
%% Returns events within [WindowStart, WindowStart + WindowSeconds].

events_in_window(Events, WindowStart, WindowSeconds, WindowEvents) :-
    must_be(list,    Events),
    must_be(number,  WindowStart),
    must_be(integer, WindowSeconds),
    WindowEnd is WindowStart + WindowSeconds,
    include(event_in_window(WindowStart, WindowEnd), Events, WindowEvents).

%% ---------------------------------------------------------------------------
%% BRUTE FORCE DETECTION
%% ---------------------------------------------------------------------------

%% brute_force_detected(+Events, +IPInt, +Threshold, -AttemptCount)
%%
%% Succeeds if IPInt has generated more than Threshold failed authentication
%% events within any WindowSeconds-second window in Events.
%%
%% Algorithm:
%%   1. Filter Events to failed events from IPInt.
%%   2. Sort by timestamp — or skip the sort if the list is already ordered.
%%   3. Use a sliding window: for each event E, count failed events from IPInt
%%      in the window [E.timestamp, E.timestamp + WindowSeconds].
%%   4. If any window count exceeds Threshold, succeed and bind AttemptCount.
%%
%% PERFORMANCE NOTE — The O(N log N) sort in Step 2:
%%
%%   syslog writes log entries in strict chronological order. If Events was
%%   produced by parse_auth_log/3 from a single, unrotated log file, the
%%   list is already sorted by timestamp — the physical write order of the
%%   file IS the temporal order. In that case msort_by_timestamp/2 performs
%%   an O(N log N) sort on already-sorted data for no gain.
%%
%%   is_timestamps_sorted/1 validates the sort property in O(N) — one pass
%%   comparing adjacent timestamps. If the check passes, the sort is skipped.
%%   If it fails (merged log files, clock skew events, rotated archive files),
%%   the full msort is performed.
%%
%%   For a sustained brute-force campaign producing 10,000 failed events per
%%   target IP, skipping the sort reduces Step 2 from ~130,000 comparisons
%%   (N log N ≈ 10,000 × 13) to 9,999 comparisons (O(N) validation pass),
%%   roughly a 13× reduction. Step 3 still dominates the total cost (see the
%%   worst-case note below); the check only removes avoidable work in Step 2.
%%
%% WindowSeconds: 60 seconds (configurable via brute_force_window/1 fact).
%% Worst-case complexity: O(n²) in failed events per IP (sliding window scan).
%% Acceptable for realistic attack volumes (< 10,000 events per IP per window).

brute_force_window(60).    % Detection window: 60 seconds
brute_force_threshold(10). % Default threshold: 10 failures in 60 seconds

brute_force_detected(Events, IPInt, Threshold, AttemptCount) :-
    must_be(list,    Events),
    must_be(integer, IPInt),
    must_be(integer, Threshold),
    Threshold > 0,

    % Step 1: isolate failed events from this IP
    failed_events_from_ip(Events, IPInt, FailedFromIP),
    FailedFromIP \= [],   % Fast-fail: no failed events = no brute force

    % Step 2: ensure timestamp order — skip O(N log N) sort if already ordered
    ensure_sorted_by_timestamp(FailedFromIP, SortedFailed),

    % Step 3: find the worst-case window
    brute_force_window(WindowSecs),
    max_window_count(SortedFailed, SortedFailed, WindowSecs, 0, MaxCount),
    MaxCount > Threshold,
    AttemptCount = MaxCount.

%% is_timestamps_sorted(+Events): O(N) check — are events in timestamp order?
%% Succeeds if each event's timestamp is >= its predecessor's.
is_timestamps_sorted([_]) :- !.
is_timestamps_sorted([]) :- !.
is_timestamps_sorted([E1, E2 | Rest]) :-
    E1.timestamp =< E2.timestamp,
    !,
    is_timestamps_sorted([E2 | Rest]).

%% ensure_sorted_by_timestamp(+Events, -Sorted)
%% O(N) validation first: if already sorted, return as-is.
%% Fall back to O(N log N) msort only when necessary.
ensure_sorted_by_timestamp(Events, Events) :-
    is_timestamps_sorted(Events),
    !.   % Already ordered — physical log sequence. Zero sort cost.
ensure_sorted_by_timestamp(Events, Sorted) :-
    msort_by_timestamp(Events, Sorted).
    % Needed for: merged log files, clock-skew events, rotated archives.

%% msort_by_timestamp(+Events, -Sorted)
msort_by_timestamp(Events, Sorted) :-
    map_list_to_pairs([E, T]>>(T = E.timestamp), Events, Pairs),
    keysort(Pairs, SortedPairs),
    pairs_values(SortedPairs, Sorted).

%% map_list_to_pairs/3: builds Key-Value pairs for keysort.
%% Local definition with the same behaviour as map_list_to_pairs/3 in library(pairs).
map_list_to_pairs(_, [], []).
map_list_to_pairs(Goal, [E|Es], [K-E|Ps]) :-
    call(Goal, E, K),
    map_list_to_pairs(Goal, Es, Ps).

%% max_window_count(+AllEvents, +RemainingStartCandidates, +WinSecs,
%%                  +CurrentMax, -FinalMax)
%% For each event as a potential window start, count events in that window.
%% Returns the maximum count found across all windows.

max_window_count(_, [], _, Max, Max) :- !.
max_window_count(All, [WinStart|Rest], WinSecs, MaxAcc, FinalMax) :-
    StartTs = WinStart.timestamp,
    EndTs is StartTs + WinSecs,
    include(event_in_window(StartTs, EndTs), All, InWindow),
    length(InWindow, Count),
    NewMax is max(MaxAcc, Count),
    max_window_count(All, Rest, WinSecs, NewMax, FinalMax).

%% ---------------------------------------------------------------------------
%% AGGREGATE ANALYSIS: ALL SUSPICIOUS IPs
%% ---------------------------------------------------------------------------

%% suspicious_ips(+Events, +Threshold, -Results)
%% Results: list of ip_threat{ip: IPInt, attempts: Count} Dicts,
%% sorted by attempt count descending.
%% Uses aggregate_all/3 with set/1 to collect the unique source IPs.

suspicious_ips(Events, Threshold, Results) :-
    must_be(list, Events),
    % Collect all unique source IPs with failed events
    aggregate_all(
        set(IPInt),
        ( member(E, Events),
          is_failed_event(E),
          IPInt = E.data.ip ),
        UniqueFailedIPs
    ),
    % Check each IP for brute force
    findall(
        ip_threat{ip: IPInt, attempts: Count},
        ( member(IPInt, UniqueFailedIPs),
          brute_force_detected(Events, IPInt, Threshold, Count) ),
        RawResults
    ),
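    % Sort by attempt count descending (most aggressive attacker first).
    % sort/4 with a dict key orders on that key's value; @>= keeps ties.
    % msort/2 would compare whole dicts, which does not order by attempts.
    sort(attempts, @>=, RawResults, Results).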
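
The O(n²) worst case noted for the sliding-window scan comes from re-filtering the whole list for every window start. Because the failed events are in timestamp order by that point, the same maximum can be found in one pass with two pointers. The following is a minimal sketch under stated assumptions, not the module's implementation: it works on a plain sorted list of numeric timestamps rather than auth_event{} Dicts, and max_window_count_linear/3, scan/6 and advance/5 are hypothetical names.

```prolog
%% max_window_count_linear(+SortedTimestamps, +WinSecs, -MaxCount)
%% Maximum number of timestamps in any closed window [S, S + WinSecs]
%% that starts at one of the timestamps. Assumes ascending order and
%% WinSecs >= 0. Each list cell is visited at most twice: O(N).
max_window_count_linear(Sorted, WinSecs, MaxCount) :-
    scan(Sorted, Sorted, WinSecs, 0, 0, MaxCount).

%% scan(+Starts, +Ends, +Win, +InWindow, +Max0, -Max)
%% Starts:   suffix whose head is the current window start.
%% Ends:     suffix whose head is the first timestamp not yet counted.
%% InWindow: number of timestamps between the two pointers.
scan([], _, _, _, Max, Max).
scan([S|Starts], Ends0, Win, N0, Max0, Max) :-
    Limit is S + Win,
    advance(Ends0, Limit, N0, Ends, N),
    Max1 is max(Max0, N),
    N1 is N - 1,                  % S leaves the window before the start moves on
    scan(Starts, Ends, Win, N1, Max1, Max).

%% advance(+Ends0, +Limit, +N0, -Ends, -N)
%% Move the end pointer forward while timestamps are =< Limit.
advance([T|Ts], Limit, N0, Ends, N) :-
    T =< Limit,
    !,
    N1 is N0 + 1,
    advance(Ts, Limit, N1, Ends, N).
advance(Ends, _, N, Ends, N).
```

With the timestamps [0, 10, 20, 70, 75, 80, 85, 130] and a 60-second window, max_window_count_linear/3 returns 5 (the window starting at 70). To use it on events, the timestamps would first be extracted from the sorted Dicts, for example with get_dict/3 over the list.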

11.4.3 REPL: Detection in Action

?- use_module('/opt/logic-node/kb/analysis/security_analysis').
true.

% Parse the log file
?- parse_auth_log('/var/log/auth.log', Events, Summary).
Summary = parse_summary{parsed:847289, rejected:4, total:847293}.

% Detect brute force from a known attacker IP
% 203.0.113.8 = 3405803784 as 32-bit integer
?- parse_auth_log('/var/log/auth.log', Events, _),
   brute_force_detected(Events, 3405803784, 10, Count).
Count = 847.
% 847 failed attempts in a single 60-second window from 203.0.113.8.

% Find all suspicious IPs
?- parse_auth_log('/var/log/auth.log', Events, _),
   suspicious_ips(Events, 10, Results).
Results = [
    ip_threat{ip:3405803784, attempts:847},   % 203.0.113.8
    ip_threat{ip:3232235990, attempts:34}     % 192.168.1.214
].

% Time-window query: how many failed events in the last 5 minutes?
?- parse_auth_log('/var/log/auth.log', Events, _),
   get_time(Now),
   WindowStart is Now - 300,
   include([E]>>(is_failed_event(E),
                 E.timestamp >= WindowStart), Events, Recent),
   length(Recent, N).
N = 124.   % 124 failed auth events in the last 5 minutes
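
suspicious_ips/3 re-filters the full event list once per candidate IP. When the number of distinct source IPs is large, grouping the failed events by IP in a single pass may be cheaper. This is a minimal sketch, assuming the Dict layout shown in this section (an auth_event{} Dict with timestamp, event and a data Dict holding ip); failures_by_ip/2 is a hypothetical helper, not part of security_analysis.pl.

```prolog
:- use_module(library(lists)).   % member/2
:- use_module(library(pairs)).   % group_pairs_by_key/2

%% failures_by_ip(+Events, -Groups)
%% Groups is a list of IPInt-Timestamps pairs, one per source IP that has
%% at least one failed event. keysort/2 is stable, so the timestamps of
%% each IP stay in their original (log) order.
failures_by_ip(Events, Groups) :-
    findall(IP-T,
            ( member(E, Events),
              is_dict(E, auth_event),
              get_dict(event, E, Type),
              memberchk(Type, [failed_password, failed_publickey]),
              get_dict(data, E, Data),
              get_dict(ip, Data, IP),
              get_dict(timestamp, E, T) ),
            Pairs),
    keysort(Pairs, Sorted),
    group_pairs_by_key(Sorted, Groups).
```

Each IPInt-Timestamps group can then be handed to a per-IP window count without touching the other events again.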

11.5 Security Context: The SSH Username Attack

11.5.1 The Attack

An SSH brute-force tool attempting credential stuffing with a wordlist generates one authentication attempt per username per second. A moderate wordlist contains 500,000 entries. A slow but sustained attack runs over 5–6 days. Each attempt produces one line in /var/log/auth.log:

Failed password for invalid user <username> from 203.0.113.8 port NNNNN ssh2

Where <username> is one of 500,000 unique strings: admin, administrator, test, deploy, ubuntu, ec2-user, ..., svc_account_xjz7q3, user_2847493, ...

If the log parser uses atom_codes/2 to represent the username field:

% DANGEROUS — do not implement
failed_event_naive(UserAtom, IPInt, Port) -->
    `Failed password for invalid user `,   % matches the attack line shown above
    nonspace_codes(UserCodes),
    ` from `,
    ipv4_address_dcg(IPInt),
    ` port `,
    decimal_integer(Port),
    nonspace_codes(_),
    { atom_codes(UserAtom, UserCodes) }.   % FATAL: interns username as atom

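Each call to atom_codes(UserAtom, UserCodes) where UserAtom is unbound constructs a new atom from the code list. If the atom does not already exist in the Atom Table, it is interned, and it stays there for as long as any live term, asserted fact, or clause references it. SWI-Prolog's atom garbage collector reclaims only unreferenced atoms, so a parser that retains its events retains every username. For 500,000 unique usernames: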

500,000 unique usernames
× 48 bytes minimum per atom (SWI-Prolog Atom Table entry overhead)
+ average username length ≈ 12 bytes
= 500,000 × 60 bytes = ~30MB permanent Atom Table growth

Under the default Atom Table limit (unlimited in SWI-Prolog, but bounded
by address space): not fatal alone. But the growth is cumulative.

Under a sustained multi-campaign attack over 30 days:
30 × 500,000 × 60 bytes = 900MB of permanent Atom Table entries.
At this point the Logic Node is operating with <100MB available Heap
for oracle queries. Performance degrades. Eventually the process is
killed by the OOM killer or crashes on a Heap overflow during a
legitimate maintenance operation.

11.5.2 Storage Hardening: Isolate Auth Logs from Primary NVMe

The Atom Table exhaustion analysis above describes what happens to the Logic Node's memory. There is a prior failure mode: what happens to the storage hardware before the parser runs at all.

A sustained SSH brute-force campaign at 500 attempts per second writes 500 syslog lines per second to /var/log/auth.log. Each syslog write is a synchronous write() + fsync() if syslog is configured with sync mode (the default on many distributions). On a ZFS pool with default recordsize=128k and sync=standard, each 200-byte log line triggers:

200 bytes of data written
→ ZFS fills the 128KB recordsize block with zeros before sealing
→ actual write amplification: 128KB / 200 bytes = 655×

500 lines/sec × 128 KB per line = 64,000 KB/s = ~62.5 MB/s of NVMe writes
(equivalently 500 × 200 bytes × 655), on a drive rated for 3,000 MB/s
sequential and 300,000 random IOPS
→ if each 128 KB record is issued as 32 × 4 KB operations: 16,000 write IOPS, ~5% of the random-write budget
→ primary ZFS pool is competing for IOPS against VM disk I/O

After 90 days at this rate, the NVMe's Program/Erase cycle budget on the auth.log region is measurably depleted. The attack does not need to exhaust memory — it can exhaust the drive.

The fix: redirect volatile auth logs to a dedicated tmpfs or a ZFS dataset tuned for sequential small writes.

# Option A: tmpfs — pure RAM, survives until next reboot
# Appropriate when auth logs are parsed in near-real-time and
# permanent retention is handled by a log shipper (rsyslog → SIEM)

logicadmin@logic-node-01:~$ sudo mkdir -p /var/log/volatile
logicadmin@logic-node-01:~$ sudo mount -t tmpfs -o size=512M,mode=0750 \
    tmpfs /var/log/volatile

# Option B: dedicated ZFS dataset, tuned for small sequential writes
# Appropriate when local log retention > reboot cycle is required

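# Comments sit above the command: a comment after a trailing backslash
# breaks the line continuation.
# recordsize=4k      : small records, so a 200-byte line no longer pads out a 128KB record
# sync=disabled      : async writes, so syslogd is not slowed by fsync
# compression=lz4    : auth logs compress 8:1, so a 512MB dataset holds ~4GB of lines
# logbias=throughput : avoid ZFS intent log (ZIL) overhead for this dataset
# mountpoint         : added here so the dataset mounts where rsyslog writes
logicadmin@logic-node-01:~$ sudo zfs create \
    -o recordsize=4k \
    -o sync=disabled \
    -o compression=lz4 \
    -o logbias=throughput \
    -o mountpoint=/var/log/volatile \
    data-pve-node-01/auth-logs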

# Redirect rsyslog to the new path
logicadmin@logic-node-01:~$ sudo tee -a /etc/rsyslog.d/10-auth-volatile.conf > /dev/null << 'EOF'
auth,authpriv.*    /var/log/volatile/auth.log
EOF
logicadmin@logic-node-01:~$ sudo systemctl restart rsyslog

The recordsize=4k tuning is the critical parameter. A 4KB record accommodates 20 typical syslog lines (200 bytes each), so under the same one-record-per-line model write amplification drops from 655× to approximately 20×. At 500 lines/sec, modelled NVMe write pressure drops from 500 × 128 KB ≈ 62.5 MB/s to 500 × 4 KB ≈ 2 MB/s. The ZFS dataset consumes IOPS proportional to actual data, not to the overhead of filling oversized records with zeros.
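
The amplification factors quoted here follow from one division each, which makes them easy to recompute for other record or line sizes. A small sketch of that arithmetic, using the section's simplified model in which every log line costs one full record; the predicate names are hypothetical and the model is the text's, not a measurement:

```prolog
%% write_amplification(+RecordBytes, +LineBytes, -Factor)
%% Bytes written per byte of log data when each line costs a full record.
write_amplification(RecordBytes, LineBytes, Factor) :-
    LineBytes > 0,
    Factor is RecordBytes / LineBytes.

%% lines_per_record(+RecordBytes, +LineBytes, -Lines)
%% Whole lines that fit in one record.
lines_per_record(RecordBytes, LineBytes, Lines) :-
    LineBytes > 0,
    Lines is RecordBytes // LineBytes.

%% modelled_write_rate(+LinesPerSec, +RecordBytes, -BytesPerSec)
%% Write rate under the same one-record-per-line assumption.
modelled_write_rate(LinesPerSec, RecordBytes, BytesPerSec) :-
    BytesPerSec is LinesPerSec * RecordBytes.
```

write_amplification(131072, 200, F) gives F = 655.36 for the 128KB default, and write_amplification(4096, 200, F) gives F = 20.48 for recordsize=4k; lines_per_record(4096, 200, N) gives N = 20.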

sync=disabled removes the per-write fsync() penalty. Syslog entries are volatile diagnostic data — the risk of losing the last few seconds of log entries on an ungraceful shutdown is acceptable. The risk of the primary NVMe exhausting its P/E budget from logging overhead is not.

The Logic Node's parse_auth_log/3 entry point takes a FilePath atom — the path is a configuration parameter, not hardcoded. Switching the parser to read from /var/log/volatile/auth.log requires changing one fact in the KB:

% In /opt/logic-node/kb/config/paths.pl:
auth_log_path('/var/log/volatile/auth.log').   % tmpfs or tuned ZFS dataset

% parse_auth_log/3 usage:
parse_auth_log :-
    auth_log_path(Path),
    parse_auth_log(Path, Events, Summary),
    ...

11.5.3 The String Defence

string_codes(String, Codes) constructs a Prolog string from a code list. A Prolog string is heap-allocated, GC-eligible memory. When the auth_event{} Dict containing the username string goes out of scope (is no longer reachable from any live variable), the string is freed at the next GC cycle. The Atom Table is not touched.

% CORRECT — from auth_parser.pl
failed_event(Method, UserStr, IsInvalid, IPInt, Port) -->
    `Failed `,
    nonspace_codes(MethodCodes),
    ` for `,
    optional_invalid_user(IsInvalid),
    bounded_nonspace_string(UserStr, 64),  % bounded_nonspace_string uses string_codes internally
    ` from `,
    ipv4_address_dcg(IPInt),
    ` port `,
    decimal_integer(Port),
    nonspace_codes(_),
    { ... }.

%% bounded_nonspace_string/2 implementation (from auth_parser.pl):
bounded_nonspace_string(S, MaxLen) -->
    nonspace_codes(Cs),
    {
        length(Cs, Len),
        Len =< MaxLen,          % Reject oversized usernames before string creation
        string_codes(S, Cs)     % S is a Prolog string — NOT an atom
    }.

The MaxLen guard (64 characters for usernames) is the second line of defence. Even if string_codes/2 is used correctly, a username field containing 1,000,000 characters (a degenerate injection attempting Heap exhaustion via string allocation rather than Atom Table exhaustion) would produce a 1MB Prolog string. The guard runs after nonspace_codes has already consumed the codes from the lazy list, so it is not free: the length/2 check on the accumulated code list is O(n) in the field length, which for a 1,000,000-character "username" means walking 1,000,000 list cells. What the guard prevents is the allocation: the string is never created, bounded_nonspace_string fails, and skip_to_newline recovery takes over. The Logic Node discards the line and continues.
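
If the O(n) walk over an oversized field is itself a concern, the bound can be enforced while the codes are being consumed, so that at most MaxLen + 1 codes are inspected before the rule fails. The following is a sketch of that variant, not the auth_parser.pl implementation: capped_nonspace_string//2, nonspace_upto//2 and nonspace_code//1 are hypothetical names, and the sketch assumes a field ends at the first whitespace code or at end of input and must be non-empty.

```prolog
%% capped_nonspace_string(-String, +MaxLen)//
%% Reads 1..MaxLen non-space codes; fails without scanning further
%% if the field is longer than MaxLen.
capped_nonspace_string(String, MaxLen) -->
    nonspace_upto(MaxLen, Codes),
    \+ nonspace_code(_),              % next code must be whitespace or EOF
    { Codes \== [],
      string_codes(String, Codes) }.  % a string, so no Atom Table entry

%% nonspace_upto(+N, -Codes)//: greedily collect at most N non-space codes.
nonspace_upto(N, [C|Cs]) -->
    { N > 0 },
    nonspace_code(C),
    !,
    { N1 is N - 1 },
    nonspace_upto(N1, Cs).
nonspace_upto(_, []) --> [].

%% nonspace_code(-C)//: a single code that is not whitespace.
nonspace_code(C) -->
    [C],
    { \+ code_type(C, space) }.
```

The trade-off is that the length ceiling becomes part of the tokenising rule rather than a separate check, which keeps the cost of a hostile field proportional to MaxLen instead of to the field's actual length.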

11.5.4 The Complete Defence Stack

Layer 0: tmpfs / ZFS dataset (recordsize=4k, sync=disabled)
         — Auth logs written to isolated volatile storage.
           Write amplification: 655× → 20×. NVMe P/E budget preserved.
           Logic Node reads from this path via auth_log_path/1 KB fact.

Layer 1: phrase_from_file/2
         — File is never fully materialised on the Heap.
           Streaming O(1) RAM regardless of file size.

Layer 2: bounded_nonspace_string/2 with MaxLen
         — Fields have explicit length ceilings.
           Usernames capped at 64. Hostnames at 253.
           Rejects oversized injections before allocation.

Layer 3: string_codes/2 for all user-controlled fields
         — Username, hostname, message fields stored as strings.
           GC-eligible. Zero Atom Table growth from log parsing.

Layer 4: Closed vocabulary for event type atoms
         — accepted, failed_password, failed_publickey, disconnected
           are pre-interned at load time. No new atoms from log keywords.

Layer 5: skip_to_newline recovery
         — Malformed lines (including injection attempts that break
           the DCG structure) are skipped cleanly.
           Error count is returned. Parser continues.

11.5.5 Verifying the Defence: Atom Count Stability

%% verify_no_atom_growth/1
%% Parses FilePath and verifies the Atom Table count does not increase.
%% A correct implementation creates zero new atoms from log content.

verify_no_atom_growth(FilePath) :-
    current_prolog_flag(max_tagged_integer, _),   % warm up any lazy flags
    aggregate_all(count, current_atom(_), Before),
    parse_auth_log(FilePath, _Events, Summary),
    aggregate_all(count, current_atom(_), After),
    Delta is After - Before,
    ( Delta =:= 0 ->
        format("[PASS] Atom Table stable: ~w events parsed, 0 new atoms~n",
               [Summary.parsed])
    ;
        format("[FAIL] ~w new atoms created during parse of ~w lines~n",
               [Delta, Summary.parsed]),
        fail
    ).

?- verify_no_atom_growth('/var/log/auth.log').
[PASS] Atom Table stable: 847289 events parsed, 0 new atoms
true.

Outcome: Log Logic Parity

11.6.1 The Conceptual Transition

A log file is not a report — it is a stream of typed events encoded as text. The text encoding is a transport format, not a data model. parse_auth_log/3 performs the translation from text to typed Prolog terms once, up-front, and discards the text representation entirely. The downstream reasoning predicates — brute_force_detected/4, suspicious_ips/3, and every future analysis predicate — operate on auth_event{} Dicts containing integers, floats, strings, and atoms from closed vocabularies. They do not handle character codes. They do not split strings. They do not write regular expressions. They write logical rules.

This is Log Logic Parity: the same predicate that inspects a host{} Dict from the KB can inspect an auth_event{} Dict from the log parser. The ingestion layer (DCG + phrase_from_file/2) is the only code that knows about character codes. Everything above it is pure logic over typed terms.

Conventional log analysis | Log Logic Parity (DCG + Dict)
Grep/awk pipeline over text | Logical rules over typed Dicts
IP address is a string throughout | IP address is a 32-bit integer after first parse
Timestamp is a string throughout | Timestamp is a Unix float after first parse
Username is a string (possible atom) | Username is a GC-eligible Prolog string
Atom Table grows with unique log values | Zero Atom Table growth from log content
"grep | wc -l" for counting | aggregate_all(count, ..., N)
No composability with KB predicates | Same include/3, findall/3, Dict access as KB queries
Brute-force detection: stateful daemon | Brute-force detection: declarative rule over event list

11.6.2 Verification Checklist

?- use_module('/opt/logic-node/kb/parsers/auth_parser').
true.
?- use_module('/opt/logic-node/kb/analysis/security_analysis').
true.

% 1. Streaming parse: Heap usage bounded
?- parse_auth_log('/var/log/auth.log', Events, Summary),
   Summary.rejected < 10.     % Sanity: fewer than 10 malformed lines
true.

% 2. Username is a string, not an atom
?- parse_auth_line(
       "Mar  5 10:44:22 pve-node-01 sshd[12346]: Failed password for invalid user notauser from 203.0.113.8 port 41234 ssh2",
       E),
   is_dict(E, auth_event),
   string(E.data.user),        % MUST be string type
   \+ atom(E.data.user).       % MUST NOT be atom
true.   % ✓ Username is a GC-eligible string

% 3. IP is an integer, not a string
?- parse_auth_line(
       "Mar  5 10:44:14 pve-node-01 sshd[12345]: Accepted publickey for deploy from 10.0.1.20 port 54321 ssh2",
       E),
   integer(E.data.ip).
true.   % ✓ IP stored as 32-bit integer

% 4. Event type is a known atom
?- parse_auth_line(
       "Mar  5 10:44:14 pve-node-01 sshd[12345]: Accepted publickey for deploy from 10.0.1.20 port 54321 ssh2",
       E),
   auth_event_type(E.event).
true.   % ✓ Event type in closed vocabulary

% 5. Malformed line produces skipped, increments error count
?- string_codes("this is not a log line\n", Cs),
   phrase(log_line_or_skip(0, ErrOut, Result), Cs),
   Result = skipped,
   ErrOut =:= 1.
true.

% 6. Brute force detected above threshold
?- parse_auth_log('/var/log/auth.log', Events, _),
   parse_ipv4("203.0.113.8", AttackerIP),
   brute_force_detected(Events, AttackerIP, 10, Count),
   Count > 10.
true.

% 7. Zero Atom Table growth
?- verify_no_atom_growth('/var/log/auth.log').
[PASS] Atom Table stable: 847289 events parsed, 0 new atoms
true.

% 8. Time-window filter using epoch floats
?- parse_auth_log('/var/log/auth.log', Events, _),
   get_time(Now),
   Five is Now - 300,
   include([E]>>(is_failed_event(E), E.timestamp >= Five), Events, Recent),
   length(Recent, N),
   N >= 0.
true.   % ✓ Time arithmetic on epoch floats — no string parsing

11.6.3 What Comes Next

Chapter 12 extends the ingestion pipeline to structured text formats: NGINX access logs (multi-field structured format with URL paths, HTTP methods, and status codes), iptables log exports (kernel-generated firewall event lines), and /etc/hosts style files. The auth_parser.pl patterns — bounded_nonspace_string/2, syslog_header//3, the recovery pattern, and the phrase_from_file/2 streaming entry point — recur in every Chapter 12 parser without modification. The Chapter 10 network primitives (ipv4_address//1, parse_cidr/3) are imported directly. The parsers accumulate as a composable library, not as a collection of one-off scripts.


Chapter Summary

Concept | Operational Definition | Performance / Security Consequence
Slurping | read_file_to_string/3 → string_codes/2 → DCG | O(n) Heap allocation; crashes on multi-GB logs before first rule fires
library(pure_input) | Wraps file stream as lazy list; delivers OS pages on demand | O(1) Heap usage; 10GB log = ~33KB live Heap at steady state
phrase_from_file/2 | Entry point for streaming DCG parse; identical interface to phrase/2 | No API change to DCG rules; only entry point changes
Lazy list | [H|T] where T is an attributed variable; instantiated by OS read on demand |
skip_to_newline//0 | Consumes codes up to and including 0'\n; succeeds at EOF | Recovery primitive; cut in first clause prevents double-skip on backtrack
Disjunctive recovery | auth_log_line // skip_to_newline with error accumulator | Delivers partial results; never aborts on malformed lines
bounded_nonspace_string/2 | Captures non-space codes; length/2 guard; string_codes/2 | Rejects oversized fields before allocation; no Atom Table entry
string_codes/2 for usernames | Produces GC-eligible Prolog string, not an atom | 500,000 unique attack usernames → 0 Atom Table entries
atom_codes/2 for usernames | Interns username as permanent atom | 500,000 unique usernames → ~30MB permanent Atom Table growth
Closed event vocabulary | auth_event_type/1 facts pre-interned at load time | 847,289 parsed events → 0 new event-type atoms
syslog_timestamp//1 | Parses "Mon DD HH:MM:SS"; constructs ISO 8601 string; parse_time/3 | Result is a Unix float, immediately usable for time arithmetic
brute_force_detected/4 | Sliding-window count of failed events per IP; threshold comparison | Declarative rule over Dict list; no stateful daemon or counters
aggregate_all/3 | Counts, sets, or bag-collects over backtrackable goals | Efficient IP enumeration from event list; avoids redundant findall + sort
ensure_sorted_by_timestamp/2 | is_timestamps_sorted/1 O(N) check first; falls back to msort only if needed | Single-file parse: O(N) validation replaces O(N log N) sort; ~13× fewer comparisons at 10k events (130,000 vs 9,999)
ZFS recordsize=4k, sync=disabled | Dedicated dataset for volatile auth logs, matching syslog line size | Write amplification: 655× → 20×; NVMe IOPS freed for VM workloads; P/E budget preserved
verify_no_atom_growth/1 | Before/after current_atom/1 count comparison around parse | Automated regression test for Atom Table discipline

Exercises

Exercise 11.1 — Streaming Line Count Implement count_auth_events/3 that uses phrase_from_file/2 to count accepted and failed_password events separately in a single streaming pass over /var/log/auth.log, without building a list of all events in memory. Use two accumulators threaded through the DCG. Verify that peak memory usage (sampled via statistics(heapused, _) and statistics(globalused, _); heap_limit is not a statistics/2 key) does not exceed 10MB on a 500MB auth.log.

Exercise 11.2 — Hostname Validation The current syslog_header//3 accepts any string up to 253 characters as a hostname. Extend bounded_nonspace_string/2 with a post-parse DCG semantic action that validates the hostname against a simple hostname grammar (alphanumeric plus hyphens, no leading/trailing hyphens, no consecutive dots). Lines with structurally invalid hostnames should be skipped by the recovery rule, not silently accepted with a malformed hostname string.

Exercise 11.3 — Impossible Travel Detection Write impossible_travel/3:

impossible_travel(+Events, +MaxSpeedKmH, -Violations)

Where Violations is a list of travel_violation{user, ip1, ip2, time_gap_secs, distance_km} Dicts. For each pair of accepted events for the same username with different source IPs, compute the time gap and the approximate geographic distance using a Geo-IP database (assume a lookup predicate is available; none of the exercises in this chapter defines one). If distance_km / (time_gap_secs / 3600.0) > MaxSpeedKmH, the pair is a violation. The username comparison must use = on Prolog strings; verify that two string values with the same content unify correctly.

Exercise 11.4 — Year Annotation for Log Archives syslog_timestamp//1 infers the current year from get_time/1. For log archives from previous years, this produces incorrect timestamps (a January log from a December perspective gets assigned the wrong year). Implement parse_auth_log/4:

parse_auth_log(+FilePath, +BaseYear, -Events, -Summary)

Where BaseYear is an integer (e.g., 2025) that is passed through the DCG via a semantic action and used in place of the inferred year in syslog_timestamp//1. Verify that parsing a January 2025 log with BaseYear=2025 produces timestamps before those from a March 2025 log.

Exercise 11.5 — Attack Surface Report Using suspicious_ips/3, failed_events_from_ip/3, and parse_cidr/3 from Chapter 10, write attack_surface_report/2:

attack_surface_report(+Events, -Report)

Where Report is a report{} Dict containing: the total event count, the count of unique source IPs with any failed events, the list of IPs exceeding the brute-force threshold sorted by attempt count, and a boolean internal_attacker_detected that is true if any attacker IP falls within RFC 1918 private address space (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16).


Further Reading

  • SWI-Prolog Manual, library(pure_input): https://www.swi-prolog.org/pldoc/man?section=pure-input (phrase_from_file/2, phrase_from_file/3, lazy list internals)
  • SWI-Prolog Manual, aggregate_all/3: https://www.swi-prolog.org/pldoc/man?predicate=aggregate_all/3
  • SWI-Prolog Manual, parse_time/3: https://www.swi-prolog.org/pldoc/man?predicate=parse_time/3
  • RFC 3164: The BSD Syslog Protocol — legacy syslog format reference; defines the Mon DD HH:MM:SS hostname process[pid]: header structure
  • RFC 5424: The Syslog Protocol — structured syslog with ISO 8601 timestamps; Chapter 12's structured log parser targets this format
  • OWASP, Log Injection: https://owasp.org/www-community/attacks/Log_Injection (the attacker's perspective on crafting log content to disrupt parsing)
  • Wielemaker, J., Huang, Z., & van der Meijden, L. (2008). "Using Prolog as the fundament for applications on the Semantic Web." ALPSWS 2008 — covers library(pure_input) design rationale and lazy list performance model

End of Chapter 11 — Next: Chapter 12: Parsing Structured Configs (NGINX, iptables, /etc/hosts)