Building Distributed Threat Intelligence at Scale
How we architected a CTI platform to process 100K+ daily events using Python, PostgreSQL, vector search, and AI/ML. A deep dive into distributed systems, performance optimization, and real-time threat analysis.
As part of a major telecommunications provider's security team, I took on the challenge of architecting a CTI (Cyber Threat Intelligence) platform: a distributed system designed to ingest, analyze, and correlate threats at massive scale.
This post shares the architecture decisions, technical challenges, and performance optimizations that went into building a production-grade threat intelligence platform processing 100K+ events per day with sub-second query performance.
The Challenge
Traditional threat intelligence systems struggle with three core problems:
- Volume - Processing hundreds of thousands of threat indicators daily from OSINT, dark web, and commercial feeds
- Speed - Analysts need real-time insights, not hours-old data
- Context - Finding related threats across millions of historical records
Our goal: Build a system that handles enterprise-scale data while delivering intelligence in under 1 second.
Architecture Overview
High-Level System Design
```
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  Data Sources   │────▶│ Ingestion Layer  │────▶│  Storage Layer  │
│  (OSINT, etc)   │     │  (Python APIs)   │     │  (PostgreSQL)   │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                                                          │
                        ┌──────────────────┐              │
                        │ Analysis Engine  │◀─────────────┘
                        │ (Multi-threaded) │
                        └────────┬─────────┘
                                 │
                        ┌────────▼─────────┐
                        │  AI/ML Services  │
                        │   (Vertex AI)    │
                        └────────┬─────────┘
                                 │
                        ┌────────▼─────────┐
                        │  Vector Search   │
                        │   (pgvector)     │
                        └──────────────────┘
```
Core Components
1. Distributed Data Ingestion Pipeline
We built a highly parallel ingestion system using Python microservices:
```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import asyncio


@dataclass
class ThreatIndicator:
    ioc_type: str
    value: str
    source: str
    confidence: float
    metadata: dict


class ThreatIngestor:
    """High-throughput threat data ingestion engine"""

    def __init__(self, max_workers: int = 10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.batch_size = 1000

    async def ingest_batch(self, indicators: list[ThreatIndicator]):
        """Process indicators in parallel batches"""
        # Validate indicators
        validated = await self.validate_batch(indicators)
        # Enrich with OSINT data
        enriched = await self.enrich_batch(validated)
        # Generate embeddings for semantic search
        embedded = await self.generate_embeddings(enriched)
        # Bulk insert to database
        await self.bulk_insert(embedded)

    async def process_feed(self, feed_url: str):
        """Multi-threaded feed processing"""
        raw_data = await self.fetch_feed(feed_url)
        # Process in batches for optimal throughput
        for batch in self.chunk(raw_data, self.batch_size):
            await self.ingest_batch(batch)
```
Key Design Decisions:
- Multi-threaded processing - Handles I/O-bound operations (API calls, database writes) in parallel
- Batch processing - Groups 1000 indicators per batch for optimal database performance
- Asynchronous I/O - Uses asyncio for non-blocking operations
- Circuit breakers - Fails gracefully when external APIs are down (a minimal sketch follows below)
Result: Processing speed increased from 500 events/hour to 100K+ events/day.
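The circuit-breaker logic is not shown in the ingestor above, so here is a minimal sketch of the pattern, assuming a simple consecutive-failure threshold and a cooldown window; the names (CircuitBreaker, CircuitOpenError) and thresholds are illustrative rather than the production implementation.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the breaker is open."""


class CircuitBreaker:
    """Minimal async circuit breaker: open after N consecutive failures, retry after a cooldown."""

    def __init__(self, max_failures: int = 5, reset_after: float = 60.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at: float | None = None

    async def call(self, func, *args, **kwargs):
        # While the breaker is open, reject calls until the cooldown has elapsed
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                raise CircuitOpenError("External API temporarily disabled")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        return result
```

In the ingestor, a feed fetch would then be wrapped as `await breaker.call(self.fetch_feed, feed_url)` so a dead feed fails fast instead of stalling the whole pipeline.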
Vector Search Implementation
The breakthrough feature of KingaSphere is semantic threat correlation using vector embeddings. Traditional keyword search can't find related threats with different wording—vector search solves this.
Why Vector Search?
Consider these related threats:
1. "Malicious PowerShell script detected on endpoint"
2. "Suspicious PS1 execution observed"
3. "Command-line obfuscation via encoded PowerShell"
Traditional keyword search would miss these connections; vector search recognizes that all three describe PowerShell-related activity.
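To make the intuition concrete, here is a tiny illustrative sketch: embed each description and compare the vectors with cosine similarity; semantically related texts score high even with no shared keywords. The `embed` call is a placeholder for the Vertex AI embedding model described later, not a real API.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


descriptions = [
    "Malicious PowerShell script detected on endpoint",
    "Suspicious PS1 execution observed",
    "Command-line obfuscation via encoded PowerShell",
]
# vectors = [embed(d) for d in descriptions]   # embed() = placeholder for the model call
# cosine_similarity(vectors[0], vectors[1])    # scores high despite different wording
```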
Architecture with pgvector
```sql
-- PostgreSQL schema with vector support
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE threat_intelligence (
    id SERIAL PRIMARY KEY,
    ioc_value TEXT NOT NULL,
    ioc_type VARCHAR(50),
    description TEXT,
    mitre_tactics TEXT[],
    confidence_score FLOAT,

    -- Vector embedding for semantic search
    embedding VECTOR(1536),

    -- Metadata
    first_seen TIMESTAMP DEFAULT NOW(),
    last_seen TIMESTAMP DEFAULT NOW(),
    source VARCHAR(100),

    -- Deduplication (the unique constraint is backed by an index)
    CONSTRAINT unique_ioc UNIQUE (ioc_value, ioc_type)
);

-- IVFFlat index for fast similarity search
CREATE INDEX ON threat_intelligence
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
```
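One tuning knob worth calling out: IVFFlat is an approximate index, and recall depends on how many of the 100 lists are probed per query (pgvector's default is 1). Below is a hedged sketch of per-session tuning using an asyncpg-style connection; the helper and its wiring are illustrative, not the platform's actual data layer.

```python
async def similarity_search(conn, query_embedding: list[float], limit: int = 10):
    """Approximate nearest-neighbour search with explicit IVFFlat recall tuning."""
    # ivfflat.probes controls how many lists are scanned per query:
    # higher values improve recall at the cost of latency (pgvector default is 1).
    await conn.execute("SET ivfflat.probes = 10")
    return await conn.fetch(
        """
        SELECT ioc_value, description,
               1 - (embedding <=> $1::vector) AS similarity
        FROM threat_intelligence
        ORDER BY embedding <=> $1::vector
        LIMIT $2
        """,
        query_embedding,
        limit,
    )
```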
Generating Embeddings
We use Vertex AI's text embedding model for high-quality vectors:
```python
import asyncio
from typing import List

import vertexai
from vertexai.language_models import TextEmbeddingModel


class EmbeddingGenerator:
    """Generate vector embeddings using Vertex AI"""

    def __init__(self):
        vertexai.init(project="enterprise-cti")
        self.model = TextEmbeddingModel.from_pretrained("textembedding-gecko@003")

    async def generate(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a batch of texts"""
        # Vertex AI supports batch requests; run the blocking call off the event loop
        response = await asyncio.to_thread(self.model.get_embeddings, texts)
        return [emb.values for emb in response]

    async def embed_threat(self, threat: dict) -> List[float]:
        """Create rich context for threat embedding"""
        # Combine multiple fields for better context
        context = f"""
        IOC Type: {threat['ioc_type']}
        Value: {threat['ioc_value']}
        Description: {threat['description']}
        MITRE Tactics: {', '.join(threat['mitre_tactics'])}
        """
        embeddings = await self.generate([context])
        return embeddings[0]
```
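To connect this back to the ingestion pipeline, the ingestor's generate_embeddings step might delegate to this class. The glue below is a hypothetical sketch, since that method was left undefined above.

```python
import asyncio

embedder = EmbeddingGenerator()


async def generate_embeddings(enriched: list[dict]) -> list[dict]:
    """Attach an embedding to each enriched threat before bulk insert (illustrative glue code)."""
    vectors = await asyncio.gather(*(embedder.embed_threat(t) for t in enriched))
    for threat, vector in zip(enriched, vectors):
        threat["embedding"] = vector
    return enriched
```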
Semantic Search Query
```python
async def find_similar_threats(
    query: str,
    limit: int = 10,
    similarity_threshold: float = 0.7
):
    """Find threats similar to query using vector search"""
    # Generate query embedding
    query_embedding = await embed_text(query)

    # Similarity search using cosine distance
    results = await db.execute("""
        SELECT
            id,
            ioc_value,
            description,
            mitre_tactics,
            1 - (embedding <=> $1::vector) AS similarity
        FROM threat_intelligence
        WHERE 1 - (embedding <=> $1::vector) > $2
        ORDER BY embedding <=> $1::vector
        LIMIT $3
    """, query_embedding, similarity_threshold, limit)

    return results
```
Performance Metrics:
- Query time: < 50ms for searches across 1M+ records
- Accuracy: 95% relevant results in top 10
- Index size: ~4GB for 1M vectors (1536 dimensions)
MITRE ATT&CK Mapping with AI
One of the most powerful features is automatic MITRE ATT&CK framework mapping using Gemini 2.5 Pro.
Automated TTP Classification
```python
from vertexai.preview.generative_models import GenerativeModel


class MITREMapper:
    """AI-powered MITRE ATT&CK technique extraction"""

    def __init__(self):
        self.model = GenerativeModel("gemini-2.5-pro-002")

    async def extract_ttps(self, threat_description: str) -> dict:
        """Extract tactics, techniques, and procedures using AI"""
        prompt = f"""
        Analyze this threat intelligence and extract MITRE ATT&CK information:

        Threat: {threat_description}

        Return JSON with:
        - tactics: List of MITRE tactics
        - techniques: List of technique IDs (e.g., T1059.001)
        - procedures: Specific procedures observed
        - severity: HIGH/MEDIUM/LOW
        - explanation: Brief reasoning
        """
        response = await self.model.generate_content_async(prompt)
        return self.parse_response(response.text)

    async def classify_threat(self, threat: dict) -> dict:
        """Full threat classification pipeline"""
        # Extract TTPs
        ttps = await self.extract_ttps(threat['description'])

        # Enrich with context
        threat['mitre_tactics'] = ttps['tactics']
        threat['mitre_techniques'] = ttps['techniques']
        threat['severity'] = ttps['severity']
        threat['ai_analysis'] = ttps['explanation']

        return threat
```
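The parse_response helper is omitted above. A hypothetical version has to tolerate the model wrapping its JSON in Markdown fences or surrounding prose; one defensive sketch (names and error handling are assumptions, not the production code):

```python
import json
import re


def parse_response(text: str) -> dict:
    """Extract a JSON object from an LLM response, tolerating code fences and extra prose."""
    # Grab the outermost {...} span so fences or commentary around it are ignored
    match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if not match:
        raise ValueError("No JSON object found in model response")
    return json.loads(match.group(0))
```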
Natural Language Threat Summarization
Analysts don't want raw data—they want actionable insights:
```python
async def generate_threat_summary(threats: List[dict]) -> str:
    """Generate executive summary of threat landscape"""
    prompt = f"""
    Analyze these {len(threats)} threat indicators:

    {format_threats(threats)}

    Create an executive summary covering:
    1. Most critical threats
    2. Common attack patterns
    3. Recommended actions
    4. Affected systems/services

    Format for security analysts, be concise.
    """

    model = GenerativeModel("gemini-2.5-pro-002")
    response = await model.generate_content_async(prompt)
    return response.text
```
Result: Threat analysis time reduced from 2 hours to 5 minutes per incident.
Performance Optimization Techniques
1. Database Query Optimization
```python
# Before: N+1 query problem
for threat in threats:
    related = db.query(
        "SELECT * FROM relationships WHERE threat_id = ?",
        threat.id
    )  # 1000 threats = 1000 queries!

# After: Batch loading
threat_ids = [t.id for t in threats]
all_relationships = db.query("""
    SELECT * FROM relationships
    WHERE threat_id = ANY($1)
""", threat_ids)  # 1 query for all threats!
```
2. Redis Caching Strategy
```python
import json
import redis
from functools import wraps

redis_client = redis.Redis()


def cache_result(ttl: int = 3600):
    """Cache expensive operations"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Generate cache key
            cache_key = f"{func.__name__}:{hash(args)}"

            # Check cache
            cached = redis_client.get(cache_key)
            if cached:
                return json.loads(cached)

            # Execute function
            result = await func(*args, **kwargs)

            # Store in cache
            redis_client.setex(
                cache_key,
                ttl,
                json.dumps(result)
            )
            return result
        return wrapper
    return decorator


@cache_result(ttl=3600)
async def get_threat_stats(time_range: str):
    """Expensive aggregation query - cache for 1 hour"""
    return await db.execute("""
        SELECT
            ioc_type,
            COUNT(*) as count,
            AVG(confidence_score) as avg_confidence
        FROM threat_intelligence
        WHERE first_seen > NOW() - $1::interval
        GROUP BY ioc_type
    """, time_range)
```
3. Connection Pooling
```python
from psycopg_pool import AsyncConnectionPool

# Connection pool for PostgreSQL
pool = AsyncConnectionPool(
    conninfo="postgresql://user:pass@host/db",
    min_size=10,
    max_size=50,
    timeout=30
)


async def execute_query(query: str, params: tuple):
    """Execute query with connection pooling"""
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(query, params)
            return await cur.fetchall()
```
Performance Improvements:
- API response time: 60% reduction (from 500ms to 200ms)
- Database connections: 50% reduction (from 100 to 50 concurrent)
- Memory usage: 40% reduction through efficient connection reuse
Integration with SIEM/EDR Platforms
KingaSphere integrates with enterprise security tools for automated response:
```python
from crowdstrike import FalconAPI


class IncidentOrchestrator:
    """Automated incident response orchestration"""

    def __init__(self):
        self.falcon = FalconAPI()

    async def respond_to_threat(self, threat_id: str):
        """Automated incident response workflow"""
        # Get threat details
        threat = await db.get_threat(threat_id)

        # Check if IOC exists in environment
        detections = await self.falcon.search_ioc(
            threat['ioc_value']
        )

        if detections:
            # Create incident
            incident = await self.create_incident(threat, detections)

            # Execute response playbook
            if threat['severity'] == 'HIGH':
                # Quarantine affected hosts
                await self.falcon.contain_hosts(
                    [d['host_id'] for d in detections]
                )

            # Alert SOC team
            await self.send_alert(incident)

            return incident
```
Result: Incident response time improved by 95% (from hours to minutes).
Monitoring and Observability
We use Prometheus and Grafana for real-time monitoring:
```python
from prometheus_client import Counter, Histogram

# Metrics
threats_processed = Counter(
    'threats_processed_total',
    'Total threats processed',
    ['source', 'ioc_type']
)

processing_time = Histogram(
    'threat_processing_seconds',
    'Time to process threat',
    buckets=[.1, .5, 1.0, 2.5, 5.0, 10.0]
)


@processing_time.time()
async def process_threat(threat: dict):
    """Process threat with metric collection"""
    try:
        result = await analyze_threat(threat)

        # Record success
        threats_processed.labels(
            source=threat['source'],
            ioc_type=threat['ioc_type']
        ).inc()

        return result
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        raise
```
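For completeness, exposing these metrics to the Prometheus scraper only takes the client library's built-in HTTP server; a minimal sketch (the port is arbitrary):

```python
from prometheus_client import start_http_server

# Expose /metrics on port 8000 so Prometheus can scrape the counters and histograms above
start_http_server(8000)
```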
Lessons Learned
1. Start with PostgreSQL, Not a "Specialized" Database
We initially considered dedicated vector databases (Pinecone, Weaviate), but PostgreSQL with pgvector proved superior:
✅ Single database for both relational and vector data
✅ ACID compliance for critical threat intelligence
✅ Mature ecosystem with extensive tooling
✅ Lower operational complexity
2. AI/ML Isn't Magic—It Needs Good Data
Our first AI models had 60% accuracy. After improving data quality:
- Cleaned duplicate indicators
- Enriched context with multiple sources
- Added confidence scores
- Implemented human-in-the-loop validation
Result: Accuracy improved to 95%.
3. Optimize for P99, Not Average
Average response time looked great (200ms), but P99 was 5 seconds. We found:
- Slow queries on large threat campaigns
- Occasional database connection timeouts
- Unoptimized vector searches
After optimization, P99 dropped to < 1 second.
Key Metrics
After 6 months of operation:
| Metric                         | Value      |
| ------------------------------ | ---------- |
| Daily Events Processed         | 100K+      |
| Query Response Time (P50)      | 50ms       |
| Query Response Time (P99)      | < 1s       |
| AI Classification Accuracy     | 95%        |
| Incident Response Time         | 95% faster |
| System Uptime                  | 99.9%      |
| Database Size                  | 10TB+      |
| Active Threat Indicators       | 5M+        |
Future Roadmap
We're continuing to evolve the CTI platform:
- Graph database integration - Model complex threat relationships
- Predictive threat modeling - ML models to predict attack patterns
- Automated threat hunting - AI-driven proactive threat discovery
- Multi-tenancy - Support for multiple business units
- Real-time streaming - Sub-second threat propagation
Conclusion
Building distributed threat intelligence at scale requires:
✅ Strong architectural foundations - Microservices, async processing, connection pooling
✅ Modern AI/ML - Vector search, LLM integration, automated classification
✅ Performance obsession - Caching, query optimization, monitoring
✅ Continuous iteration - Start small, measure, optimize, scale
The CTI platform now processes 100K+ threats daily, provides sub-second intelligence retrieval, and has reduced incident response times by 95%—protecting enterprise customers and critical infrastructure.
Want to discuss threat intelligence architecture? Connect with me on LinkedIn or check out my work on GitHub.
Building something similar? I'm happy to share more technical details—reach out!