
Tutorial 2: Write a Custom Adapter

In this tutorial you will write a new EventAdapter from scratch that reads network flow records from a JSON-lines log file. By the end you will have:

- a JsonLinesAdapter class that turns .jsonl flow records into NormalizedNetworkEvent objects
- two small sample data files that simulate a baseline batch and an anomalous batch
- a runner script that wires the adapter into SenseGNATService and prints the findings it publishes

This tutorial builds on Tutorial 1. You should already have SenseGNAT installed (pip install -e .) and have run the built-in example at least once.


What an adapter does

SenseGNAT’s detection pipeline starts with an EventAdapter. Its only job is to yield NormalizedNetworkEvent objects — it does not care where the data comes from. The rest of the pipeline (profiler, detectors, connector) never touches raw log files. This separation means you can add support for any log source — syslog, a REST API, a Kafka topic, a database query — by writing one class.

The abstract base class lives in sensegnat/ingestion/base.py:

from abc import ABC, abstractmethod
from collections.abc import Iterable
from sensegnat.models.events import NormalizedNetworkEvent

class EventAdapter(ABC):
    @abstractmethod
    def fetch_events(self) -> Iterable[NormalizedNetworkEvent]:
        raise NotImplementedError

One method, one responsibility: given a source, produce a sequence of normalized events. The rest is up to you.
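To see how little the contract demands, here is a hypothetical adapter (not part of SenseGNAT itself) whose "source" is a list already in memory. It is a minimal sketch of the interface, nothing more:

from collections.abc import Iterable

from sensegnat.ingestion.base import EventAdapter
from sensegnat.models.events import NormalizedNetworkEvent


class StaticListAdapter(EventAdapter):
    """Hypothetical adapter that yields events it was handed at construction."""

    def __init__(self, events: list[NormalizedNetworkEvent]) -> None:
        self._events = list(events)

    def fetch_events(self) -> Iterable[NormalizedNetworkEvent]:
        # No I/O at all: the "source" is the in-memory list.
        yield from self._events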


The event model

Every event you yield must be a NormalizedNetworkEvent. It is a frozen dataclass defined in sensegnat/models/events.py:

@dataclass(frozen=True)
class NormalizedNetworkEvent:
    event_id: str            # unique identifier for this log line
    seen_at: datetime        # timezone-aware timestamp
    source_host: str         # originating host name or IP
    source_user: str | None  # user identity, or None if not available
    destination: str         # destination IP or hostname
    destination_port: int    # destination TCP/UDP port
    protocol: str            # "tcp", "udp", "icmp", etc.
    bytes_out: int = 0       # bytes sent from source to destination
    bytes_in: int = 0        # bytes received from destination

A few important details:

- seen_at must be timezone-aware; if your source emits naive timestamps, tag them as UTC yourself.
- source_user is the only optional field. Use None when the source has no user identity; SenseGNAT then falls back to source_host as the subject ID.
- bytes_out and bytes_in default to 0, so sources that do not report byte counts can omit them.
- The dataclass is frozen: supply every field at construction time, because events are never mutated afterwards.
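For illustration, here is how the first flow from the sample data in Step 2 would look once normalized, constructed by hand:

from datetime import datetime, timezone

from sensegnat.models.events import NormalizedNetworkEvent

event = NormalizedNetworkEvent(
    event_id="flow-001",
    seen_at=datetime(2026, 4, 21, 8, 0, tzinfo=timezone.utc),  # timezone-aware
    source_host="workstation-7",
    source_user="bob",              # None if the source has no user identity
    destination="203.0.113.55",
    destination_port=443,
    protocol="tcp",
    bytes_out=4096,
    bytes_in=18200,
)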


Step 1 — Choose a log format

You are going to parse JSON-lines (.jsonl) files. Each line is a self-contained JSON object representing one network flow. This format is common in modern observability stacks (Vector, Fluent Bit, Elastic Beats).

Here is the schema you will support:

{
  "id": "unique-string",
  "ts": "2026-04-21T09:00:00Z",
  "src_host": "workstation-7",
  "src_user": "bob",
  "dst_ip": "203.0.113.55",
  "dst_port": 443,
  "proto": "tcp",
  "sent_bytes": 4096,
  "recv_bytes": 18200
}

All fields except src_user are required in every record. src_user may be absent or null.


Step 2 — Create the sample data file

Create a directory for your experiment:

mkdir -p ~/sensegnat-demo

Create the sample log file at ~/sensegnat-demo/flows.jsonl. Each line is one JSON object. Copy this exactly:

{"id": "flow-001", "ts": "2026-04-21T08:00:00Z", "src_host": "workstation-7", "src_user": "bob", "dst_ip": "203.0.113.55", "dst_port": 443, "proto": "tcp", "sent_bytes": 4096, "recv_bytes": 18200}
{"id": "flow-002", "ts": "2026-04-21T08:01:15Z", "src_host": "workstation-7", "src_user": "bob", "dst_ip": "203.0.113.55", "dst_port": 443, "proto": "tcp", "sent_bytes": 512, "recv_bytes": 900}
{"id": "flow-003", "ts": "2026-04-21T08:02:30Z", "src_host": "workstation-7", "src_user": "bob", "dst_ip": "198.51.100.1", "dst_port": 80, "proto": "tcp", "sent_bytes": 256, "recv_bytes": 4400}

These three flows establish bob’s baseline: two connections to 203.0.113.55:443 and one to 198.51.100.1:80. You will use them in the first run to build his profile.

Create a second file at ~/sensegnat-demo/flows2.jsonl for the second run:

{"id": "flow-004", "ts": "2026-04-21T09:00:00Z", "src_host": "workstation-7", "src_user": "bob", "dst_ip": "203.0.113.55", "dst_port": 443, "proto": "tcp", "sent_bytes": 1024, "recv_bytes": 500}
{"id": "flow-005", "ts": "2026-04-21T09:01:00Z", "src_host": "workstation-7", "src_user": "bob", "dst_ip": "192.0.2.200", "dst_port": 8443, "proto": "tcp", "sent_bytes": 12288, "recv_bytes": 300}

flow-004 revisits 203.0.113.55, which is already in bob's profile, so it produces no finding. flow-005 goes to 192.0.2.200, a destination bob has never contacted before; that will trigger a rare-destination finding.


Step 3 — Write the adapter

Create ~/sensegnat-demo/my_adapter.py with the following content:

from __future__ import annotations

import json
from datetime import datetime, timezone
from pathlib import Path
from collections.abc import Iterable
from uuid import uuid4

from sensegnat.ingestion.base import EventAdapter
from sensegnat.models.events import NormalizedNetworkEvent


class JsonLinesAdapter(EventAdapter):
    """Reads NormalizedNetworkEvent records from a JSON-lines (.jsonl) file.

    Expected record schema:
        {
            "id":         str,            # unique flow identifier
            "ts":         str,            # ISO 8601 timestamp (UTC assumed)
            "src_host":   str,            # originating host
            "src_user":   str | null,     # optional user identity
            "dst_ip":     str,            # destination IP or hostname
            "dst_port":   int,            # destination port
            "proto":      str,            # transport protocol
            "sent_bytes": int,            # bytes sent (optional, default 0)
            "recv_bytes": int             # bytes received (optional, default 0)
        }
    """

    def __init__(self, path: Path) -> None:
        self._path = Path(path)

    def fetch_events(self) -> Iterable[NormalizedNetworkEvent]:
        with self._path.open() as fh:
            for lineno, line in enumerate(fh, start=1):
                line = line.strip()
                if not line:
                    continue
                record = json.loads(line)
                yield self._parse_record(record, lineno)

    @staticmethod
    def _parse_record(record: dict, lineno: int) -> NormalizedNetworkEvent:
        # Parse the timestamp. fromisoformat handles "2026-04-21T08:00:00Z"
        # in Python 3.11+. Older Pythons need the Z replaced with +00:00.
        raw_ts = record["ts"].replace("Z", "+00:00")
        seen_at = datetime.fromisoformat(raw_ts)
        if seen_at.tzinfo is None:
            seen_at = seen_at.replace(tzinfo=timezone.utc)

        # source_user may be missing from the dict entirely, or explicitly null
        source_user = record.get("src_user") or None

        return NormalizedNetworkEvent(
            event_id=record.get("id") or str(uuid4()),
            seen_at=seen_at,
            source_host=record["src_host"],
            source_user=source_user,
            destination=record["dst_ip"],
            destination_port=int(record["dst_port"]),
            protocol=record["proto"].lower(),
            bytes_out=int(record.get("sent_bytes") or 0),
            bytes_in=int(record.get("recv_bytes") or 0),
        )

Walk through the key decisions in _parse_record:

- Timestamps: the trailing Z is rewritten to +00:00 so datetime.fromisoformat accepts it on older Pythons, and any naive result is tagged as UTC so seen_at is always timezone-aware.
- source_user: record.get("src_user") or None collapses a missing key, an explicit null, and an empty string all to None.
- event_id: if the record has no id, a random UUID is generated so every event still has a unique identifier.
- protocol is lowercased so "TCP" and "tcp" count as the same protocol, and the byte counters fall back to 0 when absent.
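Before wiring the adapter into the service, you can sanity-check it on its own. A quick sketch, assuming the sample file from Step 2 exists:

from pathlib import Path

from my_adapter import JsonLinesAdapter

adapter = JsonLinesAdapter(Path("~/sensegnat-demo/flows.jsonl").expanduser())
for event in adapter.fetch_events():
    # Each item is a NormalizedNetworkEvent with a timezone-aware timestamp.
    print(event.event_id, event.source_user, event.destination, event.destination_port)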


Step 4 — Wire the adapter into SenseGNATService

Create ~/sensegnat-demo/run_demo.py:

from __future__ import annotations

from pathlib import Path

from sensegnat.api.service import SenseGNATService
from my_adapter import JsonLinesAdapter


FLOWS_RUN1 = Path("~/sensegnat-demo/flows.jsonl").expanduser()
FLOWS_RUN2 = Path("~/sensegnat-demo/flows2.jsonl").expanduser()


def print_records(records: list[dict]) -> None:
    if not records:
        print("  (no records published)")
        return
    for r in records:
        rtype = r.get("type")
        if rtype == "indicator":
            print(f"  [indicator] {r['x_gnat_signature']}")
            print(f"    subject  : {r['x_sensegnat_subject_id']}")
            print(f"    severity : {r['x_sensegnat_severity']}")
            print(f"    score    : {r['x_sensegnat_score']}")
            print(f"    summary  : {r['x_sensegnat_summary']}")
            print(f"    evidence : {r['x_sensegnat_evidence']}")
        elif rtype == "note":
            print(f"  [note]      {r['content']}")
        print()


# --- Run 1: build bob's profile from flows.jsonl ---
print("=== Run 1 — building baseline ===")
service = SenseGNATService(adapter=JsonLinesAdapter(FLOWS_RUN1))
records = service.run_once()
print_records(records)

# --- Run 2: feed flows2.jsonl — flow-005 will fire a finding ---
print("=== Run 2 — detecting anomaly ===")
service.adapter = JsonLinesAdapter(FLOWS_RUN2)
records = service.run_once()
print_records(records)

Notice that you reuse the same service instance across both runs. The InMemoryProfileStore inside service accumulates profiles between runs, which is exactly what you need to simulate a real multi-batch deployment.
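The same pattern extends naturally beyond two batches. A sketch, assuming your batch files sort chronologically by name:

from pathlib import Path

for batch in sorted(Path("~/sensegnat-demo").expanduser().glob("flows*.jsonl")):
    # One service instance throughout, so profiles persist from batch to batch.
    service.adapter = JsonLinesAdapter(batch)
    print(f"=== {batch.name} ===")
    print_records(service.run_once())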


Step 5 — Run it

cd ~/sensegnat-demo
python run_demo.py

Expected output:

=== Run 1 — building baseline ===
  (no records published)

=== Run 2 — detecting anomaly ===
  [indicator] rare-destination
    subject  : bob
    severity : medium
    score    : 0.65
    summary  : bob contacted a rare destination 192.0.2.200
    evidence : {'destination': '192.0.2.200', 'port': '8443', 'protocol': 'tcp'}

  [note]      bob: 1 finding(s) — rare-destination. Severity: medium, peak score: 0.65.

Run 1 published nothing — bob had no prior profile, so RareDestinationDetector returned None for all three flows. The profiler recorded 203.0.113.55 and 198.51.100.1 as bob’s known destinations.

Run 2 published two records:

- an indicator for the rare-destination finding on flow-005: bob contacted 192.0.2.200:8443, a destination not in his profile, scored 0.65 with medium severity
- a note summarizing bob's findings for the run: one rare-destination finding, severity medium, peak score 0.65


Step 6 — Handle errors robustly

The adapter above will raise a json.JSONDecodeError if any line is malformed, and a KeyError if a required field is missing. For a production adapter you should handle these explicitly:

def fetch_events(self) -> Iterable[NormalizedNetworkEvent]:
    with self._path.open() as fh:
        for lineno, line in enumerate(fh, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(
                    f"{self._path}:{lineno}: invalid JSON — {exc}"
                ) from exc
            try:
                yield self._parse_record(record, lineno)
            except KeyError as exc:
                raise ValueError(
                    f"{self._path}:{lineno}: missing required field {exc}"
                ) from exc

This follows SenseGNAT’s convention: no bare except. Surface errors with context — file name and line number — so the caller knows exactly where to look.
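If you keep the adapter under test, a small pytest check pins this behaviour down. A sketch, assuming the Step 6 version of fetch_events lives in my_adapter.py:

import pytest

from my_adapter import JsonLinesAdapter


def test_malformed_line_reports_file_and_line_number(tmp_path):
    path = tmp_path / "bad.jsonl"
    path.write_text('{"id": "flow-001", "ts":\n')  # truncated JSON on line 1

    adapter = JsonLinesAdapter(path)
    with pytest.raises(ValueError, match="bad.jsonl:1"):
        list(adapter.fetch_events())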


Step 7 — Extend the adapter

Supporting multiple files

If your log rotation produces one file per hour, accept a list of paths:

class JsonLinesAdapter(EventAdapter):
    def __init__(self, paths: list[Path]) -> None:
        self._paths = [Path(p) for p in paths]

    def fetch_events(self) -> Iterable[NormalizedNetworkEvent]:
        for path in self._paths:
            with path.open() as fh:
                for lineno, line in enumerate(fh, start=1):
                    line = line.strip()
                    if not line:
                        continue
                    yield self._parse_record(json.loads(line), lineno)

Wire it in:

from pathlib import Path
service = SenseGNATService(
    adapter=JsonLinesAdapter(sorted(Path("/var/log/flows").glob("*.jsonl")))
)

Supporting a different timestamp format

If your source emits Unix epoch floats instead of ISO 8601 strings, swap the timestamp parsing line:

seen_at = datetime.fromtimestamp(float(record["ts"]), tz=timezone.utc)

The existing CsvEventAdapter (sensegnat/ingestion/csv_adapter.py) uses the same pattern — try ISO 8601 first and fall back to epoch float:

try:
    seen_at = datetime.fromisoformat(raw_ts).replace(tzinfo=timezone.utc)
except ValueError:
    seen_at = datetime.fromtimestamp(float(raw_ts), tz=timezone.utc)

Adding host-only events (no user field)

If your log source does not record user identities, set source_user=None consistently. SenseGNAT will then use source_host as the subject ID, and profiles will be per-host rather than per-user.
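In the Step 3 adapter that amounts to replacing the src_user lookup in _parse_record with a constant:

# In _parse_record, instead of reading src_user from the record:
source_user = None  # no user identity in this source; profiles are keyed by source_host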


Step 8 — Add a third run to watch the profile grow

Add a third run to run_demo.py to see how the profile accumulates:

# Path is already imported at the top of run_demo.py; no new imports are needed.

FLOWS_RUN3 = Path("~/sensegnat-demo/flows3.jsonl").expanduser()

# Write flows3.jsonl inline for the demo
FLOWS_RUN3.write_text(
    '{"id": "flow-006", "ts": "2026-04-21T10:00:00Z", '
    '"src_host": "workstation-7", "src_user": "bob", '
    '"dst_ip": "192.0.2.200", "dst_port": 8443, "proto": "tcp", '
    '"sent_bytes": 500, "recv_bytes": 100}\n'
)

print("=== Run 3 — same destination as run 2 finding ===")
service.adapter = JsonLinesAdapter(FLOWS_RUN3)
records = service.run_once()
print_records(records)

Expected output for run 3:

=== Run 3 — same destination as run 2 finding ===
  (no records published)

192.0.2.200 was added to bob’s profile at the end of run 2 (put_many merges via BehaviorProfile.merge()). On run 3 it is already known, so no finding fires. This is how SenseGNAT learns: each run that does not produce a finding expands the baseline so that the same connection is never flagged twice.


Checklist: what makes a good adapter

Before you use your adapter in production, verify:

- fetch_events yields only NormalizedNetworkEvent objects, never raw dicts or log lines.
- Every seen_at is timezone-aware.
- event_id values are unique, even when the source does not provide one.
- protocol is normalized to lowercase and missing byte counts default to 0.
- source_user is None, not an empty string, when the source has no user identity.
- Malformed or incomplete records raise errors that include the file name and line number, with no bare except.
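A short self-check script covers most of these points. A sketch that uses only the adapter and sample data from this tutorial:

from pathlib import Path

from my_adapter import JsonLinesAdapter

adapter = JsonLinesAdapter(Path("~/sensegnat-demo/flows.jsonl").expanduser())
events = list(adapter.fetch_events())

ids = [e.event_id for e in events]
assert len(ids) == len(set(ids)), "event IDs must be unique"
assert all(e.seen_at.tzinfo is not None for e in events), "timestamps must be timezone-aware"
assert all(e.protocol == e.protocol.lower() for e in events), "protocols should be lowercase"
assert all(e.source_user != "" for e in events), "use None, not an empty string, for missing users"
print(f"{len(events)} events passed the checks")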