
ADR-0016: Feed Scheduling

Decision: Built-in threading scheduler (FeedScheduler) as the primary path, with export adapters for APScheduler and Celery.

Why not just APScheduler

APScheduler is excellent but adds a dependency and a learning curve for a feature analysts need to configure on day one. The built-in scheduler covers the common case (interval + cron) in ~200 lines. Teams that already have APScheduler or Celery can use the export adapters.

Reader factory pattern — the key to correct incremental ingestion

The reader_factory callable receives a JobRunContext on every run. This is the correct place to construct time-windowed readers:

def make_taxii_reader(ctx):
    return TAXIICollectionReader(
        collection,
        added_after=ctx.last_success_iso or "2024-01-01T00:00:00Z",
    )

ctx.last_success_iso is None on the first run (full backfill) and the ISO timestamp of the last successful completion on all subsequent runs. This means incremental ingestion is automatic — readers only fetch new data.

Why factory and not instance: Readers are not reusable across runs — they may hold open connections, carry file pointers, or depend on the added_after computed from the previous run. A factory produces a fresh reader each time with the correct parameters.
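The factory contract can be sketched with stand-in classes (a minimal illustration; `JobRunContext` here is a hypothetical stand-in, not gnat's actual class, and `WindowedReader` is invented for the example):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class JobRunContext:
    # Stand-in for the context the scheduler passes to reader_factory.
    last_success_iso: Optional[str]

class WindowedReader:
    def __init__(self, added_after: str):
        # The window is frozen at construction time, which is exactly why
        # a shared instance would go stale across runs.
        self.added_after = added_after

def factory(ctx: JobRunContext) -> WindowedReader:
    # A fresh reader per run picks up the latest watermark.
    return WindowedReader(ctx.last_success_iso or "2024-01-01T00:00:00Z")

first = factory(JobRunContext(last_success_iso=None))
later = factory(JobRunContext(last_success_iso="2025-06-01T00:00:00Z"))
assert first.added_after == "2024-01-01T00:00:00Z"   # full backfill
assert later.added_after == "2025-06-01T00:00:00Z"   # incremental
```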

FeedJob state machine

enabled=False → always "skipped"
overlap=skip, lock unavailable → "skipped"
reader/pipeline raises → "failed", on_failure callback
result.errors non-empty → "partial", on_failure callback
all clear → "success", on_success callback, last_success_at updated
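The decision order above can be sketched as a small dispatch function (names and signature are illustrative, not FeedJob internals; the point is that the checks run in this exact order):

```python
def resolve_status(enabled, lock_acquired, overlap, raised, errors):
    # Checks mirror the state machine order: disabled wins, then overlap,
    # then exceptions, then partial errors, then success.
    if not enabled:
        return "skipped"
    if overlap == "skip" and not lock_acquired:
        return "skipped"
    if raised:
        return "failed"      # on_failure fires
    if errors:
        return "partial"     # on_failure fires
    return "success"         # on_success fires, last_success_at updated

assert resolve_status(False, True, "skip", False, []) == "skipped"
assert resolve_status(True, False, "skip", False, []) == "skipped"
assert resolve_status(True, True, "skip", True, []) == "failed"
assert resolve_status(True, True, "skip", False, ["parse error"]) == "partial"
assert resolve_status(True, True, "skip", False, []) == "success"
```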

consecutive_failures counts backwards through history and resets to 0 on the first “success” or “partial” result. Use it for alerting thresholds: fire a PagerDuty alert when job.consecutive_failures >= 3.
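The backwards-counting behavior can be illustrated as follows (a sketch, not FeedJob's implementation; the handling of "skipped" runs as neutral, neither failing nor resetting, is an assumption not stated above):

```python
def count_consecutive_failures(history):
    # history is oldest-first; walk backwards from the most recent run.
    count = 0
    for status in reversed(history):
        if status in ("success", "partial"):
            break            # either result resets the streak, per the text
        if status == "failed":
            count += 1
        # ASSUMPTION: "skipped" runs neither count nor reset; keep scanning.
    return count

assert count_consecutive_failures(["success", "failed", "failed"]) == 2
assert count_consecutive_failures(["failed", "partial"]) == 0

# Alerting threshold from the text above:
if count_consecutive_failures(["failed"] * 3) >= 3:
    pass  # fire the PagerDuty alert here
```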

Drift-corrected timing

The scheduler computes next_trigger = last_scheduled_at + interval rather than time.time() + interval. This means a 1-hour feed that takes 5 minutes to run still fires at the next hour mark, not 65 minutes after the last run. If the process was down and the next trigger is already in the past, it fires immediately without trying to backfill the missed runs.
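The two behaviors, anchoring to the schedule and firing missed triggers once without backfill, can be sketched in a few lines (variable names are illustrative, not FeedScheduler internals):

```python
def next_trigger(last_scheduled_at, interval, now):
    # Anchor to the schedule, not to the current time, so run duration
    # does not drift the schedule.
    nxt = last_scheduled_at + interval
    if nxt <= now:
        return now  # fire immediately; missed runs are not replayed
    return nxt

HOUR = 3600
t0 = 0  # job last triggered at the hour mark
# Run took 5 minutes; next fire is still t0 + 1h, not (t0 + 5min) + 1h:
assert next_trigger(t0, HOUR, now=t0 + 300) == t0 + HOUR
# Process was down for 3 hours; fire once now, do not replay 3 missed runs:
assert next_trigger(t0, HOUR, now=t0 + 3 * HOUR) == t0 + 3 * HOUR
```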

Threading model

One daemon thread per job. Threads sleep in 1-second increments (not one long sleep) so stop() responds within ~1 second regardless of interval. start(run_immediately=True) is the right choice for startup backfill — it fires all jobs once before entering the normal schedule loop.
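The responsive-sleep loop amounts to sleeping in one-second slices while checking a stop flag (a minimal sketch using a `threading.Event`; these names are illustrative, not the scheduler's actual internals):

```python
import threading
import time

stop_event = threading.Event()

def sleep_until(deadline):
    # Sleep in <=1s slices so a stop request takes effect within ~1 second
    # even when the deadline is an hour away.
    while not stop_event.is_set() and time.monotonic() < deadline:
        time.sleep(min(1.0, max(0.0, deadline - time.monotonic())))

t = threading.Thread(target=sleep_until,
                     args=(time.monotonic() + 3600,), daemon=True)
t.start()
stop_event.set()          # request shutdown, as stop() would
t.join(timeout=2.0)
assert not t.is_alive()   # thread exited well before the 1-hour deadline
```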

Overlap policy

"skip" (default): if a run is still executing when the next trigger fires, the new run is logged as “skipped”. Best for feeds where missing one run is acceptable and you don’t want a backlog of queued runs.

"queue": the new run waits for the current one to finish. Use for feeds where every run must complete, but beware that a slow source can cause unlimited queueing.

Config extras

pip install "gnat[schedule]" adds croniter for cron expression support. Interval-based jobs work with no extras. APScheduler and Celery adapters require those packages installed separately.

Quick-reference: adding a scheduled feed

from gnat.schedule import FeedJob, FeedScheduler
from gnat.ingest.sources.readers import PlainTextReader, TAXIICollectionReader
from gnat.ingest.mappers.mappers import FlatIOCMapper, STIXPassthroughMapper

# Stateless feed (blocklist)
blocklist = FeedJob(
    job_id="blocklist-hourly",
    reader_factory=lambda ctx: PlainTextReader("https://example.com/ips.txt"),
    mapper_factory=lambda ctx: FlatIOCMapper(confidence=70, tlp_marking="white"),
    interval_seconds=3600,
    client=tq_client,
)

# Incremental TAXII feed
taxii = FeedJob(
    job_id="taxii-daily",
    reader_factory=lambda ctx: TAXIICollectionReader(
        collection,
        added_after=ctx.last_success_iso or "2024-01-01T00:00:00Z",
    ),
    mapper_factory=lambda ctx: STIXPassthroughMapper(client=tq_client),
    cron="0 2 * * *",   # 02:00 daily — requires pip install "gnat[schedule]"
    client=tq_client,
    on_failure=lambda rec: logger.error("TAXII feed failed: %s", rec.error),
)

scheduler = FeedScheduler()
scheduler.add(blocklist)
scheduler.add(taxii)
scheduler.start(run_immediately=True)   # backfill on startup

# Health check
for status in scheduler.statuses():
    if not status["is_healthy"]:
        print(f"UNHEALTHY: {status['job_id']} (last run: {status['last_run_status']})")

Licensed under the Apache License, Version 2.0