How-to: Run the Ingest Pipeline
Recipes for pulling threat data from common sources — plain-text blocklists, TAXII collections, CSV files, and Splunk searches — and writing it to a platform or workspace.
Blocklist → ThreatQ
# Example: ingest a plain-text IP blocklist into ThreatQ.
from gnat.ingest import IngestPipeline
from gnat.ingest.sources.readers import PlainTextReader
from gnat.ingest.mappers.mappers import FlatIOCMapper

result = (
    IngestPipeline("blocklist-daily")
    # One indicator value per line, fetched over HTTP(S).
    .read_from(PlainTextReader("https://blocklist.example.com/ips.txt"))
    # Every record gets the same fixed confidence and TLP marking.
    .map_with(FlatIOCMapper(confidence=70, tlp_marking="white"))
    # Drop records that repeat an earlier record's "name" field.
    .deduplicate(key_fields=["name"])
    # NOTE(review): `threatq_client` is assumed to be constructed earlier — confirm.
    .write_to(threatq_client)
).run()

print(result)  # IngestResult: 1247 records → 1247 mapped → 1201 written
TAXII feed → ThreatQ (incremental)
# Example: incrementally pull a TAXII collection into ThreatQ.
from gnat.ingest import IngestPipeline
from gnat.ingest.sources.readers import TAXIICollectionReader
# Was missing from the original snippet; module path inferred from the
# other mappers in this guide — TODO confirm.
from gnat.ingest.mappers.mappers import STIXPassthroughMapper

result = (
    IngestPipeline("taxii-daily")
    .read_from(TAXIICollectionReader(
        # `collection` is a previously obtained TAXII collection object.
        collection,
        # Incremental run: only fetch objects added after this timestamp.
        added_after="2024-03-20T00:00:00Z",
    ))
    # Pass STIX objects through unchanged; the client resolves references.
    .map_with(STIXPassthroughMapper(client=threatq_client))
    .write_to(threatq_client)
).run()
CSV file → ThreatQ
# Example: import indicators from a local CSV file.
from gnat.ingest import IngestPipeline
from gnat.ingest.sources.readers import CSVReader
from gnat.ingest.mappers.mappers import CSVIndicatorMapper

result = (
    IngestPipeline("csv-import")
    .read_from(CSVReader("threat_intel.csv"))
    .map_with(CSVIndicatorMapper(
        # CSV column names holding each indicator attribute.
        # (PEP 8: no spaces around '=' in keyword arguments.)
        value_field="ioc_value",
        type_field="ioc_type",
        confidence_field="score",
    ))
    .write_to(threatq_client)
).run()
Splunk alerts → indicators (incremental)
# Example: turn recent Splunk alert results into indicators.
from gnat.ingest import IngestPipeline
from gnat.ingest.sources.readers import SplunkReader
# Was missing from the original snippet; module path inferred from the
# other mappers in this guide — TODO confirm.
from gnat.ingest.mappers.mappers import SplunkResultMapper

result = (
    IngestPipeline("splunk-alerts")
    .read_from(SplunkReader(
        # `splunk_client` is a previously authenticated Splunk connection.
        splunk_client,
        # Incremental run: alerts from the last 24 hours only.
        search='search index=security sourcetype=alerts earliest=-24h',
    ))
    .map_with(SplunkResultMapper(
        # PEP 8: no spaces around '=' in keyword arguments.
        indicator_field="dest_ip",  # Splunk result field holding the value
        indicator_type="ipv4",
        confidence=65,
    ))
    .write_to(threatq_client)
).run()
See Also
Licensed under the Apache License, Version 2.0