Chapter 5: DCGs — Parsing System Streams

Overview

Every serious computing environment generates text. Log files, configuration files, system status output, network packet captures, audit trails — all of these are streams of structured text that carry meaning in their format. The challenge for any infrastructure management system is not accessing this text; it is understanding it. A raw line from /var/log/auth.log like Jun 15 14:32:01 mint-logic-lab sshd[1234]: Accepted publickey for logicdev from 192.168.10.5 port 52341 ssh2 contains a timestamp, a hostname, a process name, a process ID, an event type, a username, a source IP address, and a port number. All of it is semantically rich, and all of it is locked inside a character string.

In most languages, extracting this information means writing regular expressions — patterns that are powerful but opaque, difficult to maintain, and essentially impossible to run in reverse. In Prolog, there is a better mechanism. Definite Clause Grammars, or DCGs, are a notation built into SWI-Prolog that allows us to write parsers in a form that reads almost like the grammar of the language being parsed. A DCG rule for a log timestamp looks like a description of a timestamp, not like a sequence of string manipulation operations. The engine handles the mechanics of consuming characters, tracking position, and backtracking on failed matches. We describe the structure; the engine does the parsing.

This chapter introduces DCGs from first principles, building up from the simplest possible grammar to a complete, working parser for real Linux system log entries. By the end of the chapter, we will have a module that can read a live /var/log/auth.log or /var/log/syslog file from the Mint VM, parse every line into a structured Dict, and make the resulting data available for the kind of logic-based querying we have been building toward since Chapter 1. We will also demonstrate DCG reversibility — the ability to use the same grammar to both parse input and generate output — and we will build a configuration file reader and writer that uses a single DCG for both directions.

DCGs are one of the features of SWI-Prolog that most clearly separates it from both toy Prolog implementations and from conventional programming languages. By the end of this chapter, the gap between the development environment on the Mint VM and a production-grade log analysis system will have narrowed considerably.

5.1 What a DCG Is and How It Works

A Definite Clause Grammar rule looks superficially like a Prolog rule, but uses --> instead of :- as the neck operator, and its body describes a sequence of things to be consumed from an input list rather than a conjunction of goals to be proved.

The simplest possible DCG to understand the mechanism:

greeting --> [hello], [world].

This rule says: the non-terminal greeting is recognised when the input sequence contains the element hello followed by the element world. Load this into the REPL and use phrase/2 to test it:

?- phrase(greeting, [hello, world]).
true.

?- phrase(greeting, [hello, prolog]).
false.

?- phrase(greeting, [hello, world, extra]).
false.

The phrase/2 predicate is the standard way to invoke a DCG. Its first argument is the non-terminal to try, and its second argument is the input list. A third argument form, phrase/3, takes a remainder list — the unconsumed portion of the input after the grammar rule has matched:

?- phrase(greeting, [hello, world, extra], Rest).
Rest = [extra].

The DCG notation is syntactic sugar over standard Prolog. When SWI-Prolog loads a DCG rule, it automatically translates it into a regular Prolog predicate with two extra arguments representing the input list and the remainder. The rule greeting --> [hello], [world]. becomes:

greeting([hello, world | T], T).

DCG TRANSLATION: THE DIFFERENCE LIST
─────────────────────────────────────────────────────────────────
  Rule:  greeting --> [hello], [world].

  Becomes:
  greeting(InputList, Remainder) :-
      InputList = [hello | T1],   ← consumes 'hello', leaves T1
      T1        = [world | Remainder].  ← consumes 'world', leaves Remainder

  Usage:
  ?- greeting([hello, world, extra], Rest).
  InputList = [hello, world, extra]
  T1        = [world, extra]
  Remainder = [extra]
─────────────────────────────────────────────────────────────────
  DCGs do not use hidden state. They thread the "remaining input"
  invisibly through the predicate arguments. Each rule consumes
  some prefix of the list and passes the rest to the next rule.
  phrase/2 simply calls the translated predicate with [] as the
  expected remainder, enforcing that the entire input is consumed.

Understanding this translation is important because it demystifies what the engine is actually doing during parsing, and because it explains why regular Prolog calls can be embedded inside DCG rules using the {} notation (goals inside curly braces are passed through unchanged to the translated predicate, without the extra list arguments). We will use this extensively when building the log parser.
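As a small illustration of the {} notation, the sketch below accepts one list element only if an ordinary Prolog test on it succeeds. The names small_number//1 and small_number_dl/3 are hypothetical and not part of the chapter's modules, and the hand-written clause only approximates what the system generates.

```prolog
% small_number(-N): matches one element N if it is an integer below 10.
% The goals inside {} are ordinary Prolog and consume no input.
small_number(N) --> [N], { integer(N), N < 10 }.

% Roughly what the rule above is translated into (hand-written sketch;
% the clause the system actually generates may differ in detail).
small_number_dl(N, S0, S) :-
    S0 = [N|S],
    integer(N),
    N < 10.

% ?- phrase(small_number(N), [7]).
% N = 7.
%
% ?- phrase(small_number(N), [42]).
% false.
%
% ?- small_number_dl(N, [7, x], Rest).
% N = 7,
% Rest = [x].
```

The test on N sits next to the terminal it constrains, yet it never touches the two list arguments that carry the input.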

Terminal symbols in a DCG rule are written inside square brackets. Non-terminal symbols, which are references to other DCG rules, are written without brackets. The comma operator sequences two items that must match consecutively. The semicolon operator provides alternatives. This is exactly the vocabulary of a Backus-Naur Form grammar, and the correspondence is intentional: a DCG that uses only these elements is a directly executable context-free grammar. Rule arguments and embedded {} goals then take it beyond what a context-free grammar can express.
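To see sequencing, alternatives, and non-terminals working together, here is a minimal sketch over atom lists. The rule names utterance//0, salutation//0, and subject//0 are made up for this illustration.

```prolog
% utterance//0: a salutation followed by a subject.
utterance  --> salutation, subject.

% Alternatives written with the semicolon operator.
salutation --> [hello] ; [goodbye].

% Alternatives written as separate clauses; the effect is the same.
subject    --> [world].
subject    --> [prolog].

% ?- phrase(utterance, [goodbye, prolog]).
% true.
%
% ?- phrase(utterance, [hello]).
% false.
%
% ?- findall(L, phrase(utterance, L), Ls).
% Ls = [[hello, world], [hello, prolog], [goodbye, world], [goodbye, prolog]].
```

The last query leaves the input list unbound, so the engine enumerates every sentence the grammar accepts. That is a first glimpse of the reversibility discussed in section 5.6.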

5.2 DCGs Over Character Codes and Strings

The examples above used lists of atoms as the input sequence, which works but is not how real text parsing operates. When parsing log files, the input is a string or a list of character codes. SWI-Prolog provides two approaches for character-level parsing: working directly with character code lists, or using the higher-level string_codes/2 and atom_codes/2 conversions to bridge between strings and code lists.

For our log parser, we will use a pragmatic hybrid approach. We read each log line as a string using read_line_to_string/2 (or a whole file at once with read_file_to_string/3), convert it to a list of character codes with string_codes/2, and then apply a code-level DCG. SWI-Prolog lets us write a literal such as "Jun" in a rule body and matches it against the corresponding codes, so the grammar rules read naturally and we never have to spell out [74, 117, 110], while still operating at the character level where the full flexibility of DCG backtracking is available.

To introduce this, let us build a simple integer parser before tackling the full log grammar:

% integer_parser.pl
% A character-level DCG for parsing integers.
% Part II, Chapter 5 - Modern SWI-Prolog (2026 Edition)

:- module(integer_parser, [parse_integer/2]).

% digit(-C) -- matches a single digit and returns its character code
digit(C) --> [C], { code_type(C, digit) }.

% digits(-Cs) -- matches one or more digits, returning their codes
digits([C|Cs]) --> digit(C), digits_rest(Cs).

digits_rest([C|Cs]) --> digit(C), !, digits_rest(Cs).
digits_rest([])     --> [].

% integer(-N) -- matches a complete integer, possibly with a leading minus
integer(N) --> "-", !, digits(Cs), { number_codes(N0, Cs), N is -N0 }.
integer(N) --> digits(Cs), { number_codes(N, Cs) }.

% parse_integer(+Str, -N)
% Parses a string containing an integer into a numeric value.
parse_integer(Str, N) :-
    string_codes(Str, Codes),
    phrase(integer(N), Codes).

?- parse_integer("42", N).
N = 42.

?- parse_integer("-17", N).
N = -17.

?- parse_integer("65536", N).
N = 65536.

The code_type/2 predicate is SWI-Prolog's built-in character classification predicate. code_type(C, digit) succeeds when C is the character code of a decimal digit. A second form, code_type(C, digit(W)), also binds W to the numeric value of that digit (so the code for '5' gives W = 5); we will use it in the log grammar. Here we keep the codes themselves, because number_codes/2 converts a list of digit codes, not digit values, to a number. The minus sign is written as the double-quoted literal "-" so that it matches a character code; the atom '-' would never match an element of a code list. The cut in digits_rest//1 commits to consuming a digit whenever one is available, so the rule always takes the longest run of digits and leaves no choice point that would retry with a shorter one.

This small grammar already demonstrates the core DCG pattern: terminal rules that match individual characters and extract semantic values, and non-terminal rules that compose terminals into larger structures. The same pattern, scaled up, handles an entire log file format.
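As one way of scaling the pattern up a step, the sketch below parses a comma-separated list of unsigned integers such as a list of port numbers. All names here (natural//1, naturals//1, parse_naturals/2) are hypothetical, and the digit rules are repeated so the block loads on its own.

```prolog
% digit_code(-C): one decimal digit, returned as its character code.
digit_code(C) --> [C], { code_type(C, digit) }.

% digit_codes(-Cs): one or more digit codes, as many as are available.
digit_codes([C|Cs]) --> digit_code(C), more_digit_codes(Cs).

more_digit_codes([C|Cs]) --> digit_code(C), !, more_digit_codes(Cs).
more_digit_codes([])     --> [].

% natural(-N): an unsigned decimal integer.
natural(N) --> digit_codes(Cs), { number_codes(N, Cs) }.

% naturals(-Ns): one or more naturals separated by single commas.
naturals([N|Ns]) --> natural(N), more_naturals(Ns).

% After a comma we commit: a number must follow.
more_naturals([N|Ns]) --> ",", !, natural(N), more_naturals(Ns).
more_naturals([])     --> [].

% parse_naturals(+Str, -Ns)
parse_naturals(Str, Ns) :-
    string_codes(Str, Codes),
    phrase(naturals(Ns), Codes).

% ?- parse_naturals("22,80,443", Ns).
% Ns = [22, 80, 443].
%
% ?- parse_naturals("22,,80", Ns).
% false.
```

The list rule has the same shape as digits//1: one mandatory item, then a helper that either consumes another item or stops.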

5.3 Building the Log Line Grammar

Before writing a single grammar rule that touches external data, a security standard must be established and understood. In SWI-Prolog, atoms are interned symbols held in a global atom table shared by the whole process. SWI-Prolog does run an atom garbage collector, but it reclaims only atoms that nothing references any longer, and an atom that appears in a clause or an asserted fact stays in the table for as long as that clause exists. This makes atoms well suited to static identifiers such as online, sshd, and development, because they are known at compile time and finite in number. It makes them a poor choice for text parsed from external sources.

If we were to convert parsed log hostnames, message bodies, or usernames to atoms using atom_codes/2, an attacker who controls a log source (by spoofing syslog entries, injecting into a monitored file, or exploiting a log injection vulnerability) could cause the engine to intern an arbitrarily large number of unique atoms. Every one that ends up referenced from stored data is pinned in the atom table, and the rest must wait for the next atom garbage collection. Over time this can produce an atom table exhaustion condition: a denial-of-service attack that does not crash the engine immediately but lets its memory use grow with the volume of hostile input.

The defence is straightforward and must be applied consistently throughout the parser: all text derived from external streams is stored as a garbage-collected string, not an atom. We use string_codes/2 instead of atom_codes/2 for hostname, process name, and message fields. The engine can reclaim string memory when the parsed entry falls out of scope. The atom table remains bounded to the static identifiers we declare explicitly.

This is not a theoretical concern. Log injection attacks are a documented, real-world vulnerability class, and any system that ingests log data from a network source must treat that data as potentially adversarial. We will apply this principle consistently throughout every parser in this book.
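In practice the rule comes down to two small habits, sketched below with hypothetical helper names (field_text/2 and field_is/2 do not appear in the chapter's modules). Parsed codes become a string, and when a parsed field has to be compared with a static identifier, the known atom is converted to a string rather than the untrusted text being converted to an atom.

```prolog
% field_text(+Codes, -Text)
% Stores parsed codes as a string; no atom is created.
field_text(Codes, Text) :-
    string_codes(Text, Codes).

% field_is(+Field, +Known)
% True if the parsed string Field spells the same text as the static
% atom Known. The atom is turned into a string for the comparison,
% so the untrusted text is never interned.
field_is(Field, Known) :-
    atom_string(Known, KnownStr),
    Field == KnownStr.

% ?- string_codes("sshd", Cs), field_text(Cs, T), field_is(T, sshd).
% succeeds, with T = "sshd".
%
% ?- "sshd" = sshd.
% false.
```

The second query is the reason field_is/2 exists: a string and an atom with the same characters do not unify, so comparisons against parsed fields must be explicit about the conversion.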

Linux system log files in syslog format follow a consistent structure. A typical line from /var/log/auth.log looks like:

Jun 15 14:32:01 mint-logic-lab sshd[1234]: Accepted publickey for logicdev from 192.168.10.5 port 52341 ssh2

The components are: a three-letter month abbreviation, a day number, a time in HH:MM:SS format, a hostname, a process name, an optional process ID in brackets, a colon, and then a free-form message. We will parse all of these into a structured Dict.

Create ~/logic-lab/prolog/log_parser.pl:

% log_parser.pl
% DCG-based parser for Linux syslog format log entries.
% Produces structured Dicts for querying and analysis.
% Part II, Chapter 5 - Modern SWI-Prolog (2026 Edition)

:- module(log_parser, [
    parse_log_line/2,
    parse_log_file/2,
    log_events_by_type/3,
    log_events_by_host/3,
    log_events_by_process/3,
    security_events/2,
    failed_login_ips/2,
    format_log_entry/2
]).

:- use_module(library(dcg/basics)).
:- use_module(library(readutil)).
:- use_module(library(aggregate)).

The library(dcg/basics) module is part of SWI-Prolog's standard distribution and provides a collection of useful DCG primitives: integer//1 for matching integers, string_without//2 for matching strings up to a delimiter, whites//0 for matching whitespace, and several others. Loading it gives us a foundation of tested, correct primitives to compose into our grammar.
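Before composing these primitives into the full grammar, it may help to see three of them in a rule of their own. The sketch below parses the tail of an sshd message, "port 52341 ssh2"; the non-terminal port_field//2 is a hypothetical name used only here.

```prolog
:- use_module(library(dcg/basics)).   % whites//0, integer//1, string_without//2

% port_field(-Port, -Proto): the text "port <number> <protocol>".
% Proto is returned as a string, in line with the rule for external text.
port_field(Port, Proto) -->
    "port", whites,
    integer(Port), whites,
    string_without(` `, ProtoCodes),
    { string_codes(Proto, ProtoCodes) }.

% ?- string_codes("port 52341 ssh2", Cs), phrase(port_field(P, Proto), Cs).
% binds P = 52341 and Proto = "ssh2".
```

whites//0 skips any run of blank characters, so the rule tolerates extra spacing between the fields without further work.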

Now the grammar itself:

% --- Month abbreviation ---
month(1)  --> "Jan".
month(2)  --> "Feb".
month(3)  --> "Mar".
month(4)  --> "Apr".
month(5)  --> "May".
month(6)  --> "Jun".
month(7)  --> "Jul".
month(8)  --> "Aug".
month(9)  --> "Sep".
month(10) --> "Oct".
month(11) --> "Nov".
month(12) --> "Dec".

% --- Day number (1-31, possibly space-padded) ---
day(D) --> " ", integer(D), { D >= 1, D =< 31 }.
day(D) --> integer(D),      { D >= 1, D =< 31 }.

% --- Two-digit zero-padded number ---
two_digit(N) --> digit_char(D1), digit_char(D2),
    { N is D1 * 10 + D2 }.

digit_char(D) --> [C], { code_type(C, digit(D)) }.

% --- Time in HH:MM:SS format ---
time(H, M, S) --> two_digit(H), ":", two_digit(M), ":", two_digit(S).

% --- Hostname: sequence of non-space characters ---
hostname(Host) --> string_without(` `, Codes),
    { Codes \= [], string_codes(Host, Codes) }.

% --- Process name: letters, digits, hyphens, underscores ---
process_name(Name) --> string_without(`[ :`, Codes),
    { Codes \= [], string_codes(Name, Codes) }.

% --- Optional process ID in brackets ---
process_id(PID) --> "[", integer(PID), "]".
process_id(none) --> [].

% --- Message body: everything to end of line ---
message(MsgStr) --> string_without(`\n`, Codes),
    { string_codes(MsgStr, Codes) }.

% --- Complete syslog line ---
syslog_line(log_entry{
    month:    Month,
    day:      Day,
    hour:     Hour,
    minute:   Minute,
    second:   Second,
    host:     Host,
    process:  Process,
    pid:      PID,
    message:  Msg
}) -->
    month(Month), " ", day(Day), " ",
    time(Hour, Minute, Second), " ",
    hostname(Host), " ",
    process_name(Process),
    process_id(PID),
    ": ",
    message(Msg).

The grammar reads almost like a prose description of the log format. A syslog_line is a month, a space, a day, a space, a time, a space, a hostname, a space, a process_name, an optional process_id, a colon-space, and then the message. The semantic actions in the curly braces — string_codes(Host, Codes), Codes \= [] — extract and validate the values without cluttering the structural description. Notice that string_codes/2 is used throughout for the text fields rather than atom_codes/2, consistent with the security principle established at the start of this section.

Now the predicates that use this grammar:

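% parse_log_line(+LineStr, -EntryDict)
% Parses a single log line string into a structured Dict.
% once/1 keeps the first parse and discards the choice points left
% behind by the optional parts of the grammar.
parse_log_line(LineStr, Entry) :-
    string_codes(LineStr, Codes),
    once(phrase(syslog_line(Entry), Codes)).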

% parse_log_file(+FilePath, -Entries)
% Reads a log file and parses every line, collecting successful parses.
parse_log_file(FilePath, Entries) :-
    read_file_to_string(FilePath, Content, []),
    split_string(Content, "\n", "", Lines),
    include(non_empty_string, Lines, NonEmptyLines),
    findall(Entry,
        (member(Line, NonEmptyLines), parse_log_line(Line, Entry)),
        Entries).

non_empty_string(S) :- S \= "".

The parse_log_file/2 predicate reads the entire file into a string, splits it on newlines, filters out empty lines, and then attempts to parse each line. Lines that fail to parse — malformed entries, continuation lines, lines from non-syslog sources — are silently skipped by the findall/3. This is intentional. A real log file contains lines that do not conform to the standard format, and a parser that throws an error on the first unexpected line is useless for production log analysis. The findall approach collects every successfully parsed entry and discards the rest, which is exactly the right behaviour for a best-effort analysis tool.
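For large files, or when it is useful to know how many lines were discarded, the same best-effort policy can be applied one line at a time. The following is a sketch under stated assumptions: parse_stream/4 and parse_file_streaming/4 are hypothetical names, and the line parser is passed in as a closure so that the block does not depend on the rest of log_parser.pl.

```prolog
:- use_module(library(readutil)).   % read_line_to_string/2

% parse_stream(+Stream, +LineParser, -Entries, -Skipped)
% Calls call(LineParser, Line, Entry) on every non-empty line.
% Lines that fail to parse are counted in Skipped instead of being
% dropped without trace.
parse_stream(Stream, LineParser, Entries, Skipped) :-
    read_line_to_string(Stream, Line),
    parse_stream_(Line, Stream, LineParser, Entries, 0, Skipped).

parse_stream_(end_of_file, _, _, [], Skipped, Skipped) :- !.
parse_stream_(Line, Stream, LineParser, Entries, Skipped0, Skipped) :-
    (   Line == ""                              % blank line: ignore
    ->  Entries = Entries1, Skipped1 = Skipped0
    ;   call(LineParser, Line, Entry)           % parsed: keep the entry
    ->  Entries = [Entry|Entries1], Skipped1 = Skipped0
    ;   Entries = Entries1,                     % unparsable: count it
        Skipped1 is Skipped0 + 1
    ),
    read_line_to_string(Stream, Next),
    parse_stream_(Next, Stream, LineParser, Entries1, Skipped1, Skipped).

% parse_file_streaming(+Path, +LineParser, -Entries, -Skipped)
% Opens the file and guarantees that the stream is closed again.
parse_file_streaming(Path, LineParser, Entries, Skipped) :-
    setup_call_cleanup(
        open(Path, read, Stream),
        parse_stream(Stream, LineParser, Entries, Skipped),
        close(Stream)).
```

With the chapter's parser loaded, a call such as parse_file_streaming(Path, parse_log_line, Entries, Skipped) would be the intended use. A non-zero Skipped on a file that should be pure syslog is a cheap signal that the grammar is missing a case.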

5.4 Testing the Parser on Real Log Data

Before testing against a real system log file, let us create a small test corpus so the results are reproducible regardless of what has happened on the specific Mint VM since installation. Create ~/logic-lab/logs/test_auth.log:

Jun 15 14:32:01 mint-logic-lab sshd[1234]: Accepted publickey for logicdev from 192.168.10.5 port 52341 ssh2
Jun 15 14:32:01 mint-logic-lab sshd[1234]: pam_unix(sshd:session): session opened for user logicdev by (uid=0)
Jun 15 14:35:17 mint-logic-lab sudo[1456]: logicdev : TTY=pts/0 ; PWD=/home/logicdev ; USER=root ; COMMAND=/usr/bin/apt update
Jun 15 14:40:03 mint-logic-lab sshd[1789]: Failed password for invalid user admin from 203.0.113.42 port 44821 ssh2
Jun 15 14:40:05 mint-logic-lab sshd[1789]: Failed password for invalid user admin from 203.0.113.42 port 44822 ssh2
Jun 15 14:40:07 mint-logic-lab sshd[1789]: Failed password for invalid user admin from 203.0.113.42 port 44823 ssh2
Jun 15 14:40:09 mint-logic-lab sshd[1790]: Failed password for invalid user root from 203.0.113.42 port 44824 ssh2
Jun 15 14:45:22 mint-logic-lab sshd[2001]: Accepted publickey for logicdev from 192.168.10.5 port 52342 ssh2
Jun 15 14:47:55 mint-logic-lab CRON[2134]: pam_unix(cron:session): session opened for user root by (uid=0)
Jun 15 15:00:01 mint-logic-lab CRON[2135]: pam_unix(cron:session): session opened for user logicdev by (uid=0)
Jun 15 15:01:33 mint-logic-lab sudo[2201]: logicdev : TTY=pts/1 ; PWD=/home/logicdev/logic-lab ; USER=root ; COMMAND=/usr/bin/swipl
Jun 15 15:15:44 mint-logic-lab sshd[2301]: Disconnected from user logicdev 192.168.10.5 port 52341

Load the parser and test it:

?- parse_log_line("Jun 15 14:32:01 mint-logic-lab sshd[1234]: Accepted publickey for logicdev from 192.168.10.5 port 52341 ssh2", Entry).
Entry = log_entry{
    day: 15,
    host: "mint-logic-lab",
    hour: 14,
    message: "Accepted publickey for logicdev from 192.168.10.5 port 52341 ssh2",
    minute: 32,
    month: 6,
    pid: 1234,
    process: "sshd",
    second: 1
}.

The raw text has been transformed into a structured Dict with named, typed fields. The month string "Jun" has been converted to the integer 6. The time components are separate numeric fields. The process ID has been extracted from its brackets. The hostname, the process name, and the message body are strings rather than atoms, exactly as the security principle from section 5.3 requires.

Now parse the entire test file:

?- parse_log_file('/home/logicdev/logic-lab/logs/test_auth.log', Entries),
   length(Entries, Count).
Count = 12.

All twelve lines parse successfully. Now we can query the resulting data with the same logic tools we have been building throughout Parts I and II.
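Because the time fields are already numbers, arithmetic on parsed entries needs no further conversion. The sketch below uses get_dict/3 to read the fields; entry_seconds/2 and seconds_between/3 are hypothetical helpers, not predicates exported by log_parser.pl.

```prolog
% entry_seconds(+Entry, -Seconds)
% Seconds since midnight for a parsed log entry.
entry_seconds(Entry, Seconds) :-
    get_dict(hour,   Entry, H),
    get_dict(minute, Entry, M),
    get_dict(second, Entry, S),
    Seconds is H * 3600 + M * 60 + S.

% seconds_between(+Earlier, +Later, -Delta)
% Elapsed seconds between two entries logged on the same day.
seconds_between(Earlier, Later, Delta) :-
    entry_seconds(Earlier, T0),
    entry_seconds(Later, T1),
    Delta is T1 - T0.

% ?- entry_seconds(log_entry{hour: 14, minute: 32, second: 1}, S).
% S = 52321.
```

A helper like this is what turns "four failures" into "four failures within six seconds", which is the more useful statement when judging whether a burst of failed logins is automated.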

5.5 Querying Parsed Log Data

The power of converting log entries to structured Dicts is that we can immediately apply Prolog's reasoning capabilities to the data. Add the following query predicates to log_parser.pl:

% log_events_by_process(+Entries, +ProcessName, -Filtered)
% Filters a list of parsed log entries by process name.
% ProcessName may be an atom or a string; entries store it as a string.
log_events_by_process(Entries, Process, Filtered) :-
    text_to_string(Process, ProcessStr),
    include([E]>>(E.process = ProcessStr), Entries, Filtered).

% log_events_by_host(+Entries, +Host, -Filtered)
% Filters a list of parsed log entries by originating host.
% Host may be an atom or a string; entries store it as a string.
log_events_by_host(Entries, Host, Filtered) :-
    text_to_string(Host, HostStr),
    include([E]>>(E.host = HostStr), Entries, Filtered).

% log_events_by_type(+Entries, +Type, -Filtered)
% Filters entries by a keyword in the message body.
% Type is an atom that must appear in the message string.
log_events_by_type(Entries, Type, Filtered) :-
    atom_string(Type, TypeStr),
    include([E]>>(
        MsgStr = E.message,
        sub_string(MsgStr, _, _, _, TypeStr)
    ), Entries, Filtered).

% security_events(+Entries, -SecurityEntries)
% Extracts entries that represent security-relevant events.
security_events(Entries, SecurityEntries) :-
    include(is_security_event, Entries, SecurityEntries).

% An entry is security-relevant if it was logged by sudo (sudo messages
% do not contain the word "sudo" themselves) or if its message contains
% one of the authentication keywords.
is_security_event(E) :-
    E.process = "sudo", !.
is_security_event(E) :-
    Msg = E.message,
    (   sub_string(Msg, _, _, _, "Failed password")
    ;   sub_string(Msg, _, _, _, "Accepted publickey")
    ;   sub_string(Msg, _, _, _, "Invalid user")
    ;   sub_string(Msg, _, _, _, "authentication failure")
    ), !.

% failed_login_ips(+Entries, -IPCounts)
% Analyses failed login attempts and counts them per source IP.
% Returns a list of count(IP, Count) terms sorted by count descending.
failed_login_ips(Entries, IPCounts) :-
    include([E]>>(
        Msg = E.message,
        sub_string(Msg, _, _, _, "Failed password")
    ), Entries, FailedEntries),
    findall(IP,
        (member(E, FailedEntries),
         Msg = E.message,
         extract_ip_from_message(Msg, IP)),
        IPs),
    msort(IPs, Sorted),
    count_runs(Sorted, IPCounts0),
    sort(2, @>=, IPCounts0, IPCounts).   % key 2 is the Count argument

% extract_ip_from_message(+MsgStr, -IP)
% Extracts the source IP address, as a string, from a failed login message.
% The address is attacker-controlled text, so it is never made into an atom.
extract_ip_from_message(Msg, IP) :-
    split_string(Msg, " ", "", Parts),
    nth0(Idx, Parts, "from"),
    succ(Idx, IPIdx),
    nth0(IPIdx, Parts, IP).

% count_runs(+SortedList, -Counts)
% Converts a sorted list with duplicates into a list of count(Value, N) terms.
count_runs([], []).
count_runs([H|T], [count(H,N)|Rest]) :-
    count_prefix(H, T, N0, Remaining),
    N is N0 + 1,
    count_runs(Remaining, Rest).

count_prefix(_, [], 0, []).
count_prefix(H, [H|T], N, Remaining) :-
    !,
    count_prefix(H, T, N0, Remaining),
    N is N0 + 1.
count_prefix(_, Rest, 0, Rest).

Test these against the parsed log file:

?- parse_log_file('/home/logicdev/logic-lab/logs/test_auth.log', Entries),
   security_events(Entries, SecEvents),
   length(SecEvents, Count).
Count = 8.

?- parse_log_file('/home/logicdev/logic-lab/logs/test_auth.log', Entries),
   failed_login_ips(Entries, IPCounts).
IPCounts = [count("203.0.113.42", 4)].

The system has identified that 203.0.113.42 made four failed login attempts — four lines in the test log confirm this, three attempts for admin and one for root. In a real deployment, a count of four from a single IP within a short time window would be a strong indicator of a brute-force attempt. In Chapter 9, we will add a Prolog rule that monitors this count against a threshold and triggers an alert through the Go process when it is exceeded.

The sub_string/5 predicate is used for message content matching. Its five arguments are: the string to search in, the number of characters before the match (which we leave unbound with _ to search anywhere), the length of the matched substring, the number of characters after the match (also _), and the substring to find. When used in this "containment check" mode with the second, third, and fourth arguments as don't-care variables, it functions as a simple substring test.
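The containment idiom is short enough to wrap once and reuse. The helper name contains_text/2 below is hypothetical; the sketch assumes both arguments are strings, as they are for parsed message fields.

```prolog
% contains_text(+Text, +Needle)
% True, exactly once, if the string Needle occurs anywhere in Text.
% once/1 stops sub_string/5 from retrying at later match positions.
contains_text(Text, Needle) :-
    once(sub_string(Text, _Before, _Length, _After, Needle)).

% ?- contains_text("Failed password for root", "Failed password").
% true.
%
% The positional arguments are filled in when they are left as named
% variables instead of being ignored:
%
% ?- sub_string("abcde", Before, 2, After, "cd").
% Before = 2,
% After = 1.
```

Without once/1, a message that contains the needle twice would make the test succeed twice, which matters as soon as the helper is called inside findall/3 or aggregate_all/3.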

5.6 DCG Reversibility: The Configuration File Writer

One of the most intellectually elegant properties of Prolog DCGs is that a well-written grammar is reversible. The same set of rules that parses an input string into a structured term can, with the same phrase/2 predicate, generate a string from a structured term. This is not possible with regular expressions or most parser generator frameworks. It is a direct consequence of Prolog's unification-based execution model: the grammar rules describe a relationship between a string and a structure, and that relationship can be traversed in either direction.

We will demonstrate this by building a grammar for a simple configuration file format — the kind used by many Linux services — and showing that the same grammar reads an existing config file and writes a modified one back.

Create ~/logic-lab/prolog/config_parser.pl:

% config_parser.pl
% Reversible DCG for reading and writing INI-style configuration files.
% Part II, Chapter 5 - Modern SWI-Prolog (2026 Edition)

:- module(config_parser, [
    parse_config/2,
    generate_config/2,
    read_config_file/2,
    write_config_file/3,
    update_config_value/5
]).

:- use_module(library(dcg/basics)).
:- use_module(library(readutil)).

% --- Grammar for INI-style config files ---

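% A config file is a sequence of entries (sections, key-value pairs, comments, blanks).
config_file([]) --> [].
config_file([Entry|Rest]) -->
    config_entry(Entry), !,
    config_file(Rest).

% A config entry is one of: a section header, a comment, a key-value pair, or a blank line.
% Comments are tried before key-value pairs so that "# a = b" is read as a comment.
config_entry(section(Name)) -->
    "[", text(`]\n`, Name), { Name \== '' }, "]", eol.

config_entry(comment(Text)) -->
    comment_mark, text(`\n`, Text), eol.

config_entry(kv(Key, Value)) -->
    text(`=\n`, Key), { Key \== '' },
    equals,
    text(`\n`, Value), eol.

% A blank entry needs a real line break. If it also matched end of input,
% backtracking into the parser would append blank entries without limit.
config_entry(blank) --> newline.

% text(+Stops, ?Atom) -- a run of text ending before any code in Stops.
% Generating (Atom is known): emit the atom's codes.
% Parsing (Atom is unbound): consume the codes, trim surrounding
% whitespace, and return the result as an atom.
text(_, Atom) -->
    { atomic(Atom) }, !,
    { atom_codes(Atom, Codes) },
    string(Codes).
text(Stops, Atom) -->
    string_without(Stops, Codes),
    {   string_codes(Str, Codes),
        split_string(Str, "", " \t\r", [Trimmed]),
        atom_string(Atom, Trimmed)
    }.

% In the next two rules the first alternative is the form written when
% generating; parsing accepts all of them.
comment_mark --> "# ".
comment_mark --> "#".
comment_mark --> ";".

equals --> " = ".
equals --> "=".

% newline matches a line break; eol also accepts end of input, so the
% last line of a file needs no trailing newline.
% eos//0 is provided by library(dcg/basics).
newline --> "\n".
newline --> "\r\n".

eol --> newline.
eol --> eos.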

The kv/2 entry parsing involves some string trimming to handle the common case of config files with spaces around the = sign (key = value rather than key=value). Configuration file keys and values are stored as atoms here — unlike the log parser, a configuration file is a trusted, finite, admin-controlled artefact. The number of distinct keys and section names in a config file is bounded by the file's own size, making atom interning safe and beneficial for fast symbolic matching later. The atom_codes/2 calls in the config grammar are therefore appropriate. The distinction to internalise is: atoms for trusted, bounded, structural identifiers; strings for untrusted, unbounded, external text.

% Alternative kv rule with explicit whitespace handling.
% This version only parses; it cannot be used to generate text.
% Use it in place of the kv rule above, not in addition to it.
config_entry(kv(Key, Value)) -->
    whites,
    string_without(` \t=\n`, KCodes),
    whites, "=", whites,
    string_without(`\n`, VCodes), eol,
    {   KCodes \= [],
        string_codes(KeyStr, KCodes),
        atom_string(Key, KeyStr), % whitespace around the key is skipped by whites//0
        string_codes(ValStr, VCodes),
        split_string(ValStr, "", " \t\r\n", [ValTrimmed]), % trim trailing whitespace
        atom_string(Value, ValTrimmed)
    }.

Now the generation direction. To generate a config file string from a list of entry terms, we call phrase/2 with the entry list bound and the code list unbound:

% parse_config(+ConfigString, -Entries)
% Parses a config file string into a list of entry terms.
parse_config(ConfigStr, Entries) :-
    string_codes(ConfigStr, Codes),
    phrase(config_file(Entries), Codes).

% generate_config(+Entries, -ConfigString)
% Generates a config file string from a list of entry terms.
generate_config(Entries, ConfigStr) :-
    phrase(config_file(Entries), Codes),
    string_codes(ConfigStr, Codes).

% read_config_file(+FilePath, -Entries)
% Reads and parses a configuration file from disk.
read_config_file(FilePath, Entries) :-
    read_file_to_string(FilePath, Content, []),
    parse_config(Content, Entries).

% write_config_file(+FilePath, +Entries, +Options)
% Generates and writes a configuration file to disk.
% With backup(true) in Options, an existing file is first copied to FilePath.bak.
write_config_file(FilePath, Entries, Options) :-
    generate_config(Entries, ConfigStr),
    (   memberchk(backup(true), Options),
        exists_file(FilePath)
    ->  atom_concat(FilePath, '.bak', BackupPath),
        copy_file(FilePath, BackupPath)
    ;   true
    ),
    string_to_file(ConfigStr, FilePath).

% string_to_file(+Str, +Path)
% Writes Str to Path, closing the stream even if writing raises an error.
string_to_file(Str, Path) :-
    setup_call_cleanup(
        open(Path, write, Stream),
        write(Stream, Str),
        close(Stream)).

% update_config_value(+Entries, +Section, +Key, +NewValue, -UpdatedEntries)
% Returns a new entry list with the specified key updated.
% If the key does not exist in the section, it is added after the section header.
update_config_value(Entries, Section, Key, NewValue, Updated) :-
    (   select_kv_in_section(Entries, Section, Key, NewValue, Updated)
    ->  true
    ;   insert_kv_after_section(Entries, Section, Key, NewValue, Updated)
    ).

% select_kv_in_section/5 -- replaces an existing kv entry.
% Fails if the section or the key is missing, so that the caller
% falls through to insert_kv_after_section/5.
select_kv_in_section([section(S)|Rest], S, Key, Val, [section(S)|Updated]) :-
    !,
    replace_kv_in_section(Rest, Key, Val, Updated).
select_kv_in_section([H|T], Section, Key, Val, [H|Updated]) :-
    select_kv_in_section(T, Section, Key, Val, Updated).

% replace_kv_in_section/4 -- replaces Key within the current section only.
% Fails on reaching the next section header or the end of the list.
replace_kv_in_section([kv(K,_)|T], K, Val, [kv(K,Val)|T]) :- !.
replace_kv_in_section([H|T], Key, Val, [H|Updated]) :-
    H \= section(_),
    replace_kv_in_section(T, Key, Val, Updated).

% insert_kv_after_section/5 -- adds a new kv entry after the section header.
% If the section itself is missing, it is appended together with the entry.
insert_kv_after_section([], Section, Key, Val, [section(Section), kv(Key, Val)]).
insert_kv_after_section([section(S)|Rest], S, Key, Val,
                         [section(S), kv(Key,Val)|Rest]) :- !.
insert_kv_after_section([H|T], Section, Key, Val, [H|Updated]) :-
    insert_kv_after_section(T, Section, Key, Val, Updated).

Test the round-trip with a representative config file. Create ~/logic-lab/shared/test.conf:

# Test configuration file
# Generated for logic-lab testing

[network]
interface = eth0
ip_address = 192.168.10.20
gateway = 192.168.10.1
dns = 8.8.8.8

[logging]
level = info
output = /var/log/logic-lab.log
max_size_mb = 100

[prolog]
stack_limit = 1g
table_space = 512m

Load and test the parser:

?- read_config_file('/home/logicdev/logic-lab/shared/test.conf', Entries),
   member(kv(ip_address, IP), Entries).
IP = '192.168.10.20'.

?- read_config_file('/home/logicdev/logic-lab/shared/test.conf', Entries),
   member(section(S), Entries).
S = network ;
S = logging ;
S = prolog.

Now test the update. Change the logging level from info to debug and check the result; passing Updated to write_config_file/3 would then write it back to disk:

?- read_config_file('/home/logicdev/logic-lab/shared/test.conf', Entries),
   update_config_value(Entries, logging, level, debug, Updated),
   member(kv(level, L), Updated).
L = debug.

The generate_config/2 direction works by running the grammar with the roles of its arguments swapped. When phrase(config_file(Entries), Codes) is called with Entries bound and Codes unbound, each terminal unifies its codes with the still-open output list, so terminals produce character codes rather than consuming them. No separate generator has to be written, because the translated rules are ordinary Prolog relations over a pair of lists, and unification works in either direction.

It is important to be honest about the limits of reversibility. Not every DCG is fully reversible, and the translation alone does not guarantee it. Semantic actions inside {} that call bidirectional predicates work in both directions; atom_codes/2, for example, converts an atom to codes or codes to an atom depending on which argument is bound. Semantic actions that need their inputs bound, such as split_string/4, sub_string/5 used as a test, or arithmetic like N is X + 1, work only once the parse has supplied those inputs. Greedy primitives such as string_without//2 have the same limitation: given an unbound list they have nothing to consume and no way of knowing what to emit. A rule meant for both directions therefore has to turn a known term into codes before it reaches the terminals that emit them. When writing DCGs intended for both directions, the discipline is to test phrase/2 with both argument orderings early in development.
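The advice to test both orderings early can itself be written down as a predicate. The sketch below is a deliberately tiny grammar for a single key=value line, with hypothetical names (pair//2, field//2, round_trips/1); it illustrates the mode-aware style and is not a drop-in replacement for the config grammar.

```prolog
:- use_module(library(dcg/basics)).   % string//1, string_without//2

% pair(?Key, ?Value): the text "key=value" over character codes.
pair(Key, Value) --> field(`=`, Key), "=", field(`\n`, Value).

% field(+Stops, ?Atom)
% Generating: Atom is known, so convert it to codes and emit them.
% Parsing: consume codes up to a stop code and convert them to an atom
% (acceptable here because config files are trusted input).
field(Stops, Atom) -->
    (   { atom(Atom) }
    ->  { atom_codes(Atom, Codes) },
        string(Codes)
    ;   string_without(Stops, Codes),
        { Codes \== [], atom_codes(Atom, Codes) }
    ).

% round_trips(+Str)
% True if parsing Str and generating from the result gives Str back.
round_trips(Str) :-
    string_codes(Str, Codes),
    phrase(pair(Key, Value), Codes),       % parsing direction
    phrase(pair(Key, Value), Generated),   % generating direction
    Generated == Codes.

% ?- string_codes("level=debug", Cs), phrase(pair(K, V), Cs).
% binds K = level and V = debug.
%
% ?- phrase(pair(level, debug), Cs), string_codes(S, Cs).
% binds S = "level=debug".
```

A predicate like round_trips/1 belongs in the test suite of any grammar that claims to be reversible, since a rule that quietly stops generating will still parse perfectly well.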

5.7 A Production Log Analysis Pipeline

With the parser, the query predicates, and the understanding of how to connect them, we can now build a complete log analysis pipeline. This is the module that ties together everything from Part II: the Dict-based data model from Chapter 4, the string processing tools from section 4.6, and the DCG parser from this chapter.

Create ~/logic-lab/prolog/log_analysis.pl:

% log_analysis.pl
% Production log analysis pipeline combining DCG parsing with
% Dict-based querying and rule-based security analysis.
% Part II, Chapter 5 - Modern SWI-Prolog (2026 Edition)

:- module(log_analysis, [
    analyse_log_file/2,
    security_report/1,
    brute_force_detection/2,
    session_timeline/2,
    anomaly_report/0
]).

:- use_module(log_parser).
:- use_module(infrastructure).
:- use_module(library(aggregate)).

% Brute force threshold: this many or more failed attempts
% from a single IP in a single log analysis run is flagged.
:- dynamic brute_force_threshold/1.
brute_force_threshold(3).

% analyse_log_file(+FilePath, -Report)
% Full analysis pipeline for a single log file.
analyse_log_file(FilePath, report{
    file:           FilePath,
    total_lines:    TotalLines,
    parsed_lines:   ParsedCount,
    security_count: SecCount,
    brute_forces:   BruteForces,
    sudo_events:    SudoEvents
}) :-
    read_file_to_string(FilePath, Content, []),
    split_string(Content, "\n", "", AllLines),
    include([L]>>(L \= ""), AllLines, Lines),
    length(Lines, TotalLines),
    findall(E, (member(L, Lines), parse_log_line(L, E)), Entries),
    length(Entries, ParsedCount),
    security_events(Entries, SecEvents),
    length(SecEvents, SecCount),
    brute_force_detection(Entries, BruteForces),
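    % sudo lines carry "sudo" as the process name; the message text
    % (user, TTY, COMMAND) does not contain it.
    log_events_by_process(Entries, "sudo", SudoEvents).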

% brute_force_detection(+Entries, -Alerts)
% Identifies IPs with more failed login attempts than the threshold.
brute_force_detection(Entries, Alerts) :-
    failed_login_ips(Entries, IPCounts),
    brute_force_threshold(Threshold),
    findall(
        alert{
            type:    brute_force,
            ip:      IP,
            count:   Count,
            level:   critical
        },
        (   member(count(IP, Count), IPCounts),
            Count > Threshold
        ),
        Alerts
    ).

% session_timeline(+Entries, -Timeline)
% Collects accepted SSH public-key logins as timeline events.
session_timeline(Entries, Timeline) :-
    findall(
        session{
            user:    User,
            from_ip: IP,
            hour:    H,
            minute:  M,
            second:  S,
            event:   accepted
        },
        (   member(E, Entries),
            E.process = "sshd",   % process names are parsed as strings
            Msg = E.message,
            sub_string(Msg, _, _, _, "Accepted publickey for "),
            extract_session_fields(Msg, User, IP),
            H = E.hour, M = E.minute, S = E.second
        ),
        Timeline
    ).

% extract_session_fields(+MsgStr, -User, -IP)
% Extracts username and source IP from an Accepted publickey message.
extract_session_fields(Msg, User, IP) :-
    split_string(Msg, " ", "", Parts),
    nth0(3, Parts, UserStr),
    nth0(5, Parts, IPStr),
    atom_string(User, UserStr),
    atom_string(IP, IPStr).

% security_report(+FilePath)
% Prints a formatted security analysis report for the given log file.
security_report(FilePath) :-
    analyse_log_file(FilePath, Report),
    format("~n╔══════════════════════════════════════════════╗~n"),
    format("║           LOG SECURITY ANALYSIS              ║~n"),
    format("╠══════════════════════════════════════════════╣~n"),
    format("║ File          : ~w~n", [Report.file]),
    format("║ Total Lines   : ~w~n", [Report.total_lines]),
    format("║ Parsed Lines  : ~w~n", [Report.parsed_lines]),
    format("║ Security Events: ~w~n", [Report.security_count]),
    format("╠══════════════════════════════════════════════╣~n"),
    (   Report.brute_forces \= []
    ->  format("║ ⚠ BRUTE FORCE ATTEMPTS DETECTED~n"),
        forall(
            member(Alert, Report.brute_forces),
            format("║   IP: ~w  Attempts: ~w~n", [Alert.ip, Alert.count])
        )
    ;   format("║ ✓ No brute force attempts detected~n")
    ),
    format("╠══════════════════════════════════════════════╣~n"),
    length(Report.sudo_events, SudoCount),
    format("║ SUDO EVENTS (~w)~n", [SudoCount]),
    forall(
        member(E, Report.sudo_events),
        format("║   ~w:~w:~w ~w~n", [E.hour, E.minute, E.second, E.message])
    ),
    format("╚══════════════════════════════════════════════╝~n~n").

% anomaly_report/0
% Cross-references log data with the infrastructure KB to find anomalies.
% Flags login attempts from IPs not in the known VM network range.
anomaly_report :-
    parse_log_file('/home/logicdev/logic-lab/logs/test_auth.log', Entries),
    findall(
        anomaly{ip: IP, process: Proc, time: time(H,M,S)},
        (   member(E, Entries),
            Msg = E.message,
            sub_string(Msg, _, _, _, "from "),
            extract_ip_from_message(Msg, IP),
            \+ known_infrastructure_ip(IP),
            Proc = E.process,
            H = E.hour, M = E.minute, S = E.second
        ),
        Anomalies
    ),
    length(Anomalies, Count),
    format("~nAnomaly Report: ~w external IP events detected.~n", [Count]),
    format("Total anomalous events: ~w~n", [Count]),
    forall(
        member(A, Anomalies),
        (   % A.time is a plain time/3 term, not a Dict, so unpack it
            time(H, M, S) = A.time,
            format("  [~w:~w:~w] ~w from ~w~n",
                   [H, M, S, A.process, A.ip])
        )
    ).

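% known_infrastructure_ip(+IP)
% True if the IP belongs to a known VM in the infrastructure KB.
% get_dict/3 is used because a partial Dict such as vm{ip: IP} does not
% unify with a record that carries additional keys.
known_infrastructure_ip(IP) :-
    vm_record(_, VM),
    get_dict(ip, VM, IP).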

Run the full security report:

?- security_report('/home/logicdev/logic-lab/logs/test_auth.log').

╔══════════════════════════════════════════════╗
║           LOG SECURITY ANALYSIS              ║
╠══════════════════════════════════════════════╣
║ File          : /home/logicdev/logic-lab/logs/test_auth.log
║ Total Lines   : 12
║ Parsed Lines  : 12
║ Security Events: 7
╠══════════════════════════════════════════════╣
║ ⚠ BRUTE FORCE ATTEMPTS DETECTED
║   IP: 203.0.113.42  Attempts: 4
╠══════════════════════════════════════════════╣
║ SUDO EVENTS (...)
║   14:35:17 logicdev : TTY=pts/0 ; ... ; COMMAND=/usr/bin/apt update
║   15:01:33 logicdev : TTY=pts/1 ; ... ; COMMAND=/usr/bin/swipl
╚══════════════════════════════════════════════╝

The pipeline has correctly identified the brute force attempt from 203.0.113.42, counted the sudo events, and separated security-relevant entries from the background noise. The anomaly_report/0 predicate cross-references the log data with the infrastructure knowledge base from Chapter 4 — any IP address that appears in a login message but is not registered as a known VM is flagged as an anomaly. The address 203.0.113.42 is not in our VM records, so it would be reported.
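Because brute_force_threshold/1 is declared dynamic, the threshold can be changed at runtime without reloading the module. The following is a minimal sketch of that idea and is not part of log_analysis.pl: demo_threshold/1, set_demo_threshold/1 and flagged_ips/2 are hypothetical names, and the count(IP, N) terms are assumed to have the shape produced by failed_login_ips/2.

```prolog
% Hypothetical stand-in for brute_force_threshold/1.
:- dynamic demo_threshold/1.
demo_threshold(3).

% set_demo_threshold(+N)
% Replaces the current threshold with N.
set_demo_threshold(N) :-
    must_be(positive_integer, N),
    retractall(demo_threshold(_)),
    assertz(demo_threshold(N)).

% flagged_ips(+IPCounts, -Flagged)
% Flagged lists the IPs whose count is greater than the threshold.
flagged_ips(IPCounts, Flagged) :-
    demo_threshold(T),
    findall(IP,
            (   member(count(IP, N), IPCounts),
                N > T
            ),
            Flagged).
```

With the default threshold of 3, an address with four failed attempts is flagged. After set_demo_threshold(5), the same address is no longer flagged. A stricter or looser policy then becomes a one-line change at the prompt rather than an edit to the source file.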

This cross-referencing between the live log data and the static infrastructure knowledge base is the first example in this book of a genuinely hybrid reasoning system: dynamic data from the filesystem combined with declarative knowledge from the knowledge base, with Prolog providing the glue. In Part III, the Go process will take this pattern further — it will not just analyse historical logs but will tail live log files, feeding new entries to the Prolog engine as they arrive, allowing the brute-force detection to operate in near-real-time.

5.8 Using DCGs for Structured Data Extraction

Beyond log files and config files, DCGs are a natural fit for any structured text format where the grammar is well-defined. In this section we build a compact but complete parser for the output of the ip addr show command — the standard Linux tool for inspecting network interface configuration. This demonstrates that DCG parsing is not limited to file-based input but can be applied to the output of any system command whose format is consistent.

The output of ip addr show for a typical interface looks like:

2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP group default qlen 1000
    link/ether 52:54:00:ab:cd:ef brd ff:ff:ff:ff:ff:ff
    inet 192.168.10.20/24 brd 192.168.10.255 scope global dynamic eth0
       valid_lft 86352sec preferred_lft 86352sec
    inet6 fe80::5054:ff:feab:cdef/64 scope link
       valid_lft forever preferred_lft forever

We want to extract the interface name, the MAC address, the IPv4 address with prefix length, and the interface state. Add this to log_parser.pl or create a separate network_parser.pl:

% network_parser.pl
% DCG parser for `ip addr show` output.
% Part II, Chapter 5 - Modern SWI-Prolog (2026 Edition)

:- module(network_parser, [
    parse_ip_addr_output/2,
    get_interface_info/3
]).

:- use_module(library(dcg/basics)).
:- use_module(library(process)).

% interface_block(-InterfaceDict)
% Parses one interface block from ip addr show output.
interface_block(iface{
    index:   Index,
    name:    Name,
    state:   State,
    mac:     MAC,
    ipv4:    IPv4,
    prefix:  Prefix
}) -->
    integer(Index), ": ",
    string_without(`: `, NameCodes), ": ",
    string_without(`\n`, FlagCodes), "\n",   % rest of header line, includes "state UP"
    "    link/ether ",
    mac_address(MAC), " ",
    string_without(`\n`, _), "\n",
    "    inet ",
    ipv4_cidr(IPv4, Prefix),
    string_without(`\n`, _), "\n",           % skip brd/scope details
    { atom_codes(Name, NameCodes),
      atom_codes(FlagAtom, FlagCodes),
      % the state is reported on the header line, not on the inet line
      (sub_atom(FlagAtom, _, _, _, 'state UP') -> State = up ; State = down)
    }.

% mac_address(-MAC)
% Parses a MAC address in the form xx:xx:xx:xx:xx:xx.
% The ~N| column stops make each ~`0t fill zero-pad its byte to two
% hex digits; without them a byte such as 00 would print as 0.
mac_address(MAC) -->
    hex_byte(B1), ":", hex_byte(B2), ":", hex_byte(B3), ":",
    hex_byte(B4), ":", hex_byte(B5), ":", hex_byte(B6),
    { format(atom(MAC),
             "~`0t~16r~2|:~`0t~16r~5|:~`0t~16r~8|:~`0t~16r~11|:~`0t~16r~14|:~`0t~16r~17|",
             [B1, B2, B3, B4, B5, B6]) }.

hex_byte(B) --> hex_digit(H1), hex_digit(H2), { B is H1 * 16 + H2 }.

hex_digit(D) --> [C], { code_type(C, xdigit(D)) }.

% ipv4_cidr(-IP, -Prefix)
% Parses an IPv4 address with CIDR prefix length (e.g., 192.168.10.20/24).
ipv4_cidr(IP, Prefix) -->
    integer(A), ".", integer(B), ".", integer(C), ".", integer(D), "/",
    integer(Prefix),
    { format(atom(IP), "~w.~w.~w.~w", [A, B, C, D]) }.

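% parse_ip_addr_output(+OutputStr, -Interfaces)
% Parses the full output of `ip addr show` into a list of interface Dicts,
% skipping lines that do not start a parseable block (loopback, inet6, ...).
parse_ip_addr_output(OutputStr, Interfaces) :-
    string_codes(OutputStr, Codes),
    phrase(interface_blocks(Interfaces), Codes).

interface_blocks([I|Is]) --> interface_block(I), !, interface_blocks(Is).
interface_blocks(Is)     --> string_without(`\n`, _), "\n", !, interface_blocks(Is).
interface_blocks([])     --> remainder(_).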

% get_interface_info(+InterfaceName, -IPv4, -State)
% Queries the live system for a named interface's current IP and state.
get_interface_info(IfaceName, IPv4, State) :-
    setup_call_cleanup(
        process_create(path(ip), [addr, show, IfaceName],
                       [stdout(pipe(Out))]),
        read_string(Out, _, Output),
        close(Out)),
    parse_ip_addr_output(Output, Interfaces),
    member(Iface, Interfaces),
    % :< matches a sub-Dict; plain unification would need all six keys
    iface{name: IfaceName, ipv4: IPv4, state: State} :< Iface.

The process_create/3 predicate from library(process) runs a system command and captures its output as a string, which is then fed directly into the parser. This connects the static DCG grammar to the live state of the running system. On the Mint VM:

?- get_interface_info('ens18', IP, State).
IP = '192.168.10.20',
State = up.

(The interface name ens18 is the typical name assigned to the VirtIO network adapter in a Proxmox KVM guest — it may differ slightly depending on the specific Proxmox and Linux Mint version configuration.)

The combination of process_create/3 and a DCG parser is a powerful pattern for building "system-aware" knowledge bases that query the live state of the operating system rather than relying entirely on statically declared facts. We will use this pattern in Chapter 6 to have the Prolog knowledge base automatically discover the current network configuration of the Mint VM and verify it against the declared infrastructure facts.
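Before wiring a grammar to a live command, it helps to test it against captured text, so that a parsing failure is not confused with a failure to run the command. The sketch below does this for a single inet line. It is a stand-alone illustration, not part of network_parser.pl: sample_inet_line/1, inet_line//2 and parse_sample/2 are hypothetical names, and the sample text is copied from the ip addr show output shown earlier in this section.

```prolog
:- use_module(library(dcg/basics)).

% Hypothetical captured sample: one inet line from `ip addr show`.
sample_inet_line("    inet 192.168.10.20/24 brd 192.168.10.255 scope global dynamic eth0").

% inet_line(-IP, -Prefix)
% Skips leading blanks, reads the four octets and the prefix length,
% and ignores whatever follows on the line.
inet_line(ip(A, B, C, D), Prefix) -->
    blanks, "inet ",
    integer(A), ".", integer(B), ".", integer(C), ".", integer(D),
    "/", integer(Prefix),
    remainder(_).

% parse_sample(-IP, -Prefix)
% Runs the rule against the captured text instead of a live command.
parse_sample(IP, Prefix) :-
    sample_inet_line(Text),
    string_codes(Text, Codes),
    once(phrase(inet_line(IP, Prefix), Codes)).
```

Once the grammar behaves correctly on captured samples, the only remaining variable in the live version is the command invocation itself.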

5.9 Performance Considerations for Large Log Files

The parse_log_file/2 predicate as written reads the entire file into memory as a string before parsing. For the log files generated by a development VM over a day or a week, this is entirely adequate — those files are rarely more than a few megabytes. For long-running production servers where /var/log/syslog might be tens or hundreds of megabytes, a line-by-line streaming approach is more appropriate.

MEMORY MODEL: BATCH vs STREAMING PARSE
─────────────────────────────────────────────────────────────────
  BATCH (parse_log_file/2)
  ┌────────────────────────────────────────┐
  │ read_file_to_string                    │  ← entire file in RAM
  │ [line1, line2, ..., lineN] in memory   │  ← all lines as list
  │ parse each → [entry1, ..., entryN]     │  ← all entries in RAM
  └────────────────────────────────────────┘
  Memory: O(N) where N = file size
  Use when: file fits comfortably in RAM, repeated queries needed

  STREAMING (parse_log_file_streaming/2)
  ┌──────────┐     ┌──────────┐     ┌─────────┐
  │  open    │────▶│ readline │────▶│  parse  │────▶ call(Goal, Entry)
  │  stream  │     │  1 line  │     │  1 line │      ↓ GC reclaims entry
  └──────────┘     └──────────┘     └─────────┘
                       ↑ repeats until end_of_file
  Memory: O(1) — one line and one entry live in RAM at a time
  Use when: file is large, only one pass needed, or tailing live logs
─────────────────────────────────────────────────────────────────
  setup_call_cleanup/3 guarantees the stream is closed even if
  Goal throws an exception — the Prolog equivalent of Go's defer.

SWI-Prolog provides read_line_to_string/2 for reading a file one line at a time from an open stream. The streaming version of the parser:

% parse_log_file_streaming(+FilePath, :Goal)
% Applies Goal to each successfully parsed log entry, one line at a time.
% Memory usage is O(1) with respect to file size.
:- meta_predicate parse_log_file_streaming(+, 1).

parse_log_file_streaming(FilePath, Goal) :-
    setup_call_cleanup(
        open(FilePath, read, Stream),
        stream_parse_loop(Stream, Goal),
        close(Stream)
    ).

stream_parse_loop(Stream, Goal) :-
    read_line_to_string(Stream, Line),
    (   Line == end_of_file
    ->  true
    ;   (   parse_log_line(Line, Entry)
        ->  call(Goal, Entry)
        ;   true    % skip unparseable lines
        ),
        stream_parse_loop(Stream, Goal)
    ).

The setup_call_cleanup/3 predicate guarantees that the file stream is closed even if Goal throws an exception. This is the Prolog equivalent of a try-finally block or Go's defer statement — it is the correct way to handle resources that must be released regardless of whether the computation succeeds or fails. We will use setup_call_cleanup/3 extensively in the Go integration chapters, where it becomes the mechanism for ensuring that Prolog engine instances are properly returned to the pool after each query.
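The guarantee can be observed directly with a goal that always throws. This is a minimal sketch under stated assumptions: cleanup_demo/2 and the global variable demo_closed are hypothetical names, and a null stream stands in for the log file so that the example needs no file on disk.

```prolog
% cleanup_demo(-Outcome, -Closed)
% Outcome is caught if the exception reached the caller; Closed is
% true if the cleanup goal ran before that happened.
cleanup_demo(Outcome, Closed) :-
    nb_setval(demo_closed, false),
    catch(
        (   setup_call_cleanup(
                open_null_stream(S),              % stands in for open/3
                throw(demo_failure),              % simulates Goal throwing
                (   close(S),
                    nb_setval(demo_closed, true)  % record that cleanup ran
                )
            ),
            Outcome = completed
        ),
        demo_failure,
        Outcome = caught
    ),
    nb_getval(demo_closed, Closed).
```

The query cleanup_demo(Outcome, Closed) should give Outcome = caught and Closed = true: the exception still propagates to the caller, but only after the stream has been closed.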

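The streaming parser can count security events in a large log file without loading the whole file into memory. Two details matter here. First, parse_log_file_streaming/2 is deterministic: it succeeds once for the whole file, so wrapping it in aggregate_all(count, ...) would count that single success rather than the matching entries. Second, stream_parse_loop/2 stops with failure as soon as Goal fails, so Goal must succeed for every entry. The goal below therefore keeps a running count in a global variable and always succeeds:

?- nb_setval(security_count, 0),
   parse_log_file_streaming(
       '/var/log/auth.log',
       [E]>>(   is_security_event(E)
            ->  nb_getval(security_count, N0),
                N is N0 + 1,
                nb_setval(security_count, N)
            ;   true
            )
   ),
   nb_getval(security_count, Count).

This query processes /var/log/auth.log one line at a time, tests each parsed entry with is_security_event/1, and increments the counter for each match. The memory footprint is proportional to the size of a single log entry, not the size of the file.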
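A different design, sketched here as an alternative rather than as the chapter's API, exposes lines through backtracking instead of a callback, so that aggregate_all/3 can count solutions directly. The names stream_line/2 and count_lines_containing/3 are hypothetical, and open_string/2 stands in for a file so that the example is self-contained.

```prolog
:- use_module(library(readutil)).
:- use_module(library(aggregate)).

% stream_line(+Stream, -Line)
% Yields each line of Stream on backtracking; fails at end of file.
stream_line(Stream, Line) :-
    repeat,
    read_line_to_string(Stream, L),
    (   L == end_of_file
    ->  !, fail              % remove the repeat choice point and stop
    ;   Line = L
    ).

% count_lines_containing(+Text, +Needle, -Count)
% Counts the lines of Text that contain Needle at least once.
count_lines_containing(Text, Needle, Count) :-
    setup_call_cleanup(
        open_string(Text, Stream),
        aggregate_all(count,
            (   stream_line(Stream, Line),
                once(sub_string(Line, _, _, _, Needle))
            ),
            Count),
        close(Stream)
    ).
```

Only one line is live at a time here as well, because aggregate_all/3 discards each solution before asking for the next. The once/1 wrapper matters: without it, a line containing the needle twice would be counted twice.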

The streaming approach can also feed a cache. When we want to analyse the same log file several times with different queries without re-reading it, loading the parsed entries into the dynamic database with assertz/1 during the first pass allows subsequent queries to work against the in-memory copy, at the cost of giving up the constant memory footprint. This also composes naturally with SWI-Prolog's tabling mechanism (introduced in Chapter 7). We will return to this pattern when we discuss persistent knowledge bases in Chapter 10.
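A first-pass cache of this kind might look like the following. This is a hedged sketch, not the implementation used later in the book: cached_entry/1, cache_entry/1, clear_cache/0 and cached_count/2 are hypothetical names, and entries are assumed to be Dicts with a message key, as produced by parse_log_line/2.

```prolog
:- use_module(library(aggregate)).

% Hypothetical store for parsed entries.
:- dynamic cached_entry/1.

% cache_entry(+Entry)
% Callback that stores one parsed entry in the dynamic database.
cache_entry(Entry) :-
    assertz(cached_entry(Entry)).

% clear_cache
% Drops all stored entries, for example before re-reading a file.
clear_cache :-
    retractall(cached_entry(_)).

% cached_count(+Needle, -Count)
% Counts stored entries whose message contains Needle.
cached_count(Needle, Count) :-
    aggregate_all(count,
        (   cached_entry(E),
            get_dict(message, E, Msg),
            once(sub_string(Msg, _, _, _, Needle))
        ),
        Count).
```

Under these assumptions, a call such as parse_log_file_streaming(File, cache_entry) would fill the cache in a single pass, after which cached_count/2 and similar queries run without touching the file again.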

5.10 Chapter Summary: The End of Part II

This chapter has covered a substantial amount of ground. We began with the mechanics of DCG notation — the --> operator, the phrase/2 invocation, and the automatic translation to regular Prolog predicates with difference lists. We built a character-level integer parser to understand how terminals and non-terminals compose. We then constructed a complete syslog format parser that transforms raw log lines into queryable Dicts. We built a security analysis pipeline that uses those Dicts in combination with the infrastructure knowledge base from Chapter 4 to detect brute-force login attempts and flag activity from unknown IP addresses. We demonstrated DCG reversibility through a config file grammar that reads and writes the same format. And we built a network interface parser that queries the live operating system using process_create/3.

This brings Part II to a close. The knowledge base we have assembled across Chapters 4 and 5 is now a genuinely capable infrastructure intelligence system. It models the VM environment as structured Dicts, monitors hardware sensors, validates backup job specifications against live infrastructure state, parses log files into queryable data, and detects security anomalies by cross-referencing dynamic log data with static infrastructure knowledge.

Part III turns to the Go integration. Chapter 6 introduces the SWI-Go interface — the mechanism by which a Go process can call Prolog predicates directly in memory without spawning a subprocess or writing to a file. Chapter 7 follows with tabling, the SWI-Prolog feature that makes complex recursive reasoning safe over large, potentially cyclic data structures. Chapter 8 takes the logic engine to the browser via WebAssembly. And Chapter 9 brings concurrency into the picture, building the multi-threaded reasoning architecture that allows a Go web server to handle hundreds of simultaneous Prolog queries against a shared knowledge base.

The foundation is complete. Part III is where it becomes a system.


Appendix 5A: The Complete log_parser.pl

% log_parser.pl
% DCG-based parser for Linux syslog format log entries.
% Part II, Chapter 5 - Modern SWI-Prolog (2026 Edition)

:- module(log_parser, [
    parse_log_line/2,
    parse_log_file/2,
    parse_log_file_streaming/2,
    log_events_by_process/3,
    log_events_by_host/3,
    log_events_by_type/3,
    security_events/2,
    failed_login_ips/2,
    extract_ip_from_message/2,
    is_security_event/1
]).

:- use_module(library(dcg/basics)).
:- use_module(library(readutil)).
:- use_module(library(aggregate)).
:- meta_predicate parse_log_file_streaming(+, 1).

% --- Month abbreviations ---
month(1)  --> "Jan". month(2)  --> "Feb". month(3)  --> "Mar".
month(4)  --> "Apr". month(5)  --> "May". month(6)  --> "Jun".
month(7)  --> "Jul". month(8)  --> "Aug". month(9)  --> "Sep".
month(10) --> "Oct". month(11) --> "Nov". month(12) --> "Dec".

% --- Day number ---
day(D) --> " ", integer(D), { D >= 1, D =< 31 }.
day(D) --> integer(D),      { D >= 1, D =< 31 }.

% --- Two-digit zero-padded number ---
two_digit(N) --> digit_char(D1), digit_char(D2), { N is D1 * 10 + D2 }.
digit_char(D) --> [C], { code_type(C, digit(D)) }.

% --- Time ---
time(H, M, S) --> two_digit(H), ":", two_digit(M), ":", two_digit(S).

% --- Hostname ---
hostname(HostStr) --> string_without(` `, Codes),
    { Codes \= [], string_codes(HostStr, Codes) }.

% --- Process name ---
process_name(NameStr) --> string_without(`[ :`, Codes),
    { Codes \= [], string_codes(NameStr, Codes) }.

% --- Optional PID ---
process_id(PID) --> "[", integer(PID), "]".
process_id(none) --> [].

% --- Message body ---
message(Msg) --> string_without(`\n`, Codes), { string_codes(Msg, Codes) }.

% --- Complete syslog line ---
syslog_line(log_entry{
    month: Month, day: Day, hour: Hour, minute: Minute, second: Second,
    host: Host, process: Process, pid: PID, message: Msg
}) -->
    month(Month), " ", day(Day), " ",
    time(Hour, Minute, Second), " ",
    hostname(Host), " ",
    process_name(Process), process_id(PID), ": ",
    message(Msg).

% --- Top-level parse predicates ---
parse_log_line(LineStr, Entry) :-
    string_codes(LineStr, Codes),
    phrase(syslog_line(Entry), Codes).

parse_log_file(FilePath, Entries) :-
    read_file_to_string(FilePath, Content, []),
    split_string(Content, "\n", "", Lines),
    include([L]>>(L \= ""), Lines, NonEmpty),
    findall(E, (member(L, NonEmpty), parse_log_line(L, E)), Entries).

parse_log_file_streaming(FilePath, Goal) :-
    setup_call_cleanup(
        open(FilePath, read, Stream),
        stream_parse_loop(Stream, Goal),
        close(Stream)
    ).

stream_parse_loop(Stream, Goal) :-
    read_line_to_string(Stream, Line),
    (   Line == end_of_file -> true
    ;   (parse_log_line(Line, Entry) -> call(Goal, Entry) ; true),
        stream_parse_loop(Stream, Goal)
    ).

% --- Query predicates ---
log_events_by_process(Entries, Process, Filtered) :-
    include([E]>>(E.process = Process), Entries, Filtered).

log_events_by_host(Entries, Host, Filtered) :-
    include([E]>>(E.host = Host), Entries, Filtered).

log_events_by_type(Entries, Type, Filtered) :-
    atom_string(Type, TypeStr),
    include([E]>>(M = E.message, sub_string(M,_,_,_,TypeStr)),
            Entries, Filtered).

security_events(Entries, SecEntries) :-
    include(is_security_event, Entries, SecEntries).

is_security_event(E) :-
    Msg = E.message,
    (   sub_string(Msg, _, _, _, "Failed password")
    ;   sub_string(Msg, _, _, _, "Accepted publickey")
    ;   sub_string(Msg, _, _, _, "Invalid user")
    ;   sub_string(Msg, _, _, _, "authentication failure")
    ;   sub_string(Msg, _, _, _, "sudo")
    ), !.

failed_login_ips(Entries, IPCounts) :-
    include([E]>>(M = E.message,
                  sub_string(M,_,_,_,"Failed password")),
            Entries, Failed),
    findall(IP,
        (member(E, Failed),
         M = E.message,
         extract_ip_from_message(M, IP)),
        IPs),
    msort(IPs, Sorted),
    count_runs(Sorted, IPCounts0),
    sort(2, @>=, IPCounts0, IPCounts).   % order by count, highest first

extract_ip_from_message(Msg, IP) :-
    split_string(Msg, " ", "", Parts),
    nth0(Idx, Parts, "from"),
    succ(Idx, IPIdx),
    nth0(IPIdx, Parts, IPStr),
    atom_string(IP, IPStr).

count_runs([], []).
count_runs([H|T], [count(H,N)|Rest]) :-
    count_prefix(H, T, N0, Remaining),
    N is N0 + 1,
    count_runs(Remaining, Rest).

count_prefix(_, [], 0, []).
count_prefix(H, [H|T], N, R) :- !, count_prefix(H, T, N0, R), N is N0 + 1.
count_prefix(_, R, 0, R).

Appendix 5B: DCG Quick Reference

SYNTAX OVERVIEW
─────────────────────────────────────────────────────────────────
  head --> body.              DCG rule
  [terminal]                  Match a terminal in the input list
  nonterminal                 Call another DCG rule
  A, B                        A followed by B (sequence)
  A ; B                       A or B (alternative)
  { goal }                    Prolog goal (no list consumption)
  \+ terminal                 Negative lookahead
  phrase(Rule, List)          Invoke a DCG rule on a list
  phrase(Rule, List, Rest)    Invoke and return remainder

TRANSLATION (automatic by the engine)
  greeting --> [hello], [world].
  becomes:
  greeting([hello,world|T], T).

  header(H) --> [h], { H = header }.
  becomes:
  header(H, [h|T], T) :- H = header.

DIRECTIONALITY
  phrase(rule(X), Input)      Parsing:    Input → X
  phrase(rule(x), Codes)      Generation: x  → Codes (if rule is bidirectional)

COMMON LIBRARY PRIMITIVES (library(dcg/basics))
  integer(-N)                 Match and extract an integer
  float(-F)                   Match and extract a float
  whites                      Match zero or more whitespace chars
  white                       Match exactly one whitespace char
  string_without(+Seps, -Cs)  Match chars until a separator is found
  string(-Cs)                 Match any sequence of chars
  eol                         Match end of line
─────────────────────────────────────────────────────────────────
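Two entries in the table, negative lookahead and phrase/3, are easiest to understand together. The sketch below is an illustration only: setting_line//1 and first_line/3 are hypothetical names. The rule accepts a line only if it does not begin with "#", and phrase/3 returns whatever input the rule did not consume.

```prolog
:- use_module(library(dcg/basics)).

% setting_line(-Codes)
% Matches up to the end of the current line, unless the line starts
% with "#". The lookahead \+ "#" consumes no input.
setting_line(Codes) --> \+ "#", string_without(`\n`, Codes).

% first_line(+Text, -Line, -Rest)
% Line is the first line of Text and Rest is the unconsumed remainder.
% Fails if Text starts with a comment marker.
first_line(Text, Line, Rest) :-
    string_codes(Text, Codes),
    phrase(setting_line(LineCodes), Codes, RestCodes),
    string_codes(Line, LineCodes),
    string_codes(Rest, RestCodes).
```

For the text "port=22\nuser=x", first_line/3 binds Line to "port=22" and Rest to "\nuser=x". For "# note" it fails, because the lookahead rejects the leading "#".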

Appendix 5C: Snapshot Checkpoint

Snapshot name: 06-part-two-complete
Description:   Full Part II complete. Dict-based infrastructure KB,
               sensor and health monitor, DCG log parser, config
               file round-tripper, security analysis pipeline,
               and network interface parser all operational.
               Files: infrastructure.pl, health_monitor.pl,
                      log_parser.pl, config_parser.pl,
                      log_analysis.pl, network_parser.pl
               Test data: logs/test_auth.log, shared/test.conf