Cybersecurity

Building Distributed Threat Intelligence at Scale

How we architected a CTI platform to process 100K+ daily events using Python, PostgreSQL, vector search, and AI/ML. A deep dive into distributed systems, performance optimization, and real-time threat analysis.

Ammly Kinyua
Software Engineer @ Safaricom
September 20, 2025
10 min read
Threat Intelligence
Distributed Systems
Python
Vector Search
AI/ML
PostgreSQL
Microservices
As part of a major telecommunications provider's security team, I took on the challenge of architecting a Cyber Threat Intelligence (CTI) platform: a distributed system designed to ingest, analyze, and correlate threats at massive scale.

This post shares the architecture decisions, technical challenges, and performance optimizations that went into building a production-grade threat intelligence platform processing 100K+ events per day with sub-second query performance.

The Challenge

Traditional threat intelligence systems struggle with three core problems:

  1. Volume - Processing hundreds of thousands of threat indicators daily from OSINT, dark web, and commercial feeds
  2. Speed - Analysts need real-time insights, not hours-old data
  3. Context - Finding related threats across millions of historical records

Our goal: Build a system that handles enterprise-scale data while delivering intelligence in under 1 second.

Architecture Overview

High-Level System Design

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  Data Sources   │────▶│  Ingestion Layer │────▶│  Storage Layer  │
│  (OSINT, etc)   │     │   (Python APIs)  │     │  (PostgreSQL)   │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                                                           │
                        ┌──────────────────┐              │
                        │  Analysis Engine │◀─────────────┘
                        │ (Multi-threaded) │
                        └──────────────────┘
                                │
                        ┌───────▼──────────┐
                        │  AI/ML Services  │
                        │  (Vertex AI)     │
                        └──────────────────┘
                                │
                        ┌───────▼──────────┐
                        │  Vector Search   │
                        │  (pgvector)      │
                        └──────────────────┘

Core Components

1. Distributed Data Ingestion Pipeline

We built a highly parallel ingestion system using Python microservices:

from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import asyncio

@dataclass
class ThreatIndicator:
    ioc_type: str
    value: str
    source: str
    confidence: float
    metadata: dict

class ThreatIngestor:
    """High-throughput threat data ingestion engine"""

    def __init__(self, max_workers: int = 10):
        # Thread pool for blocking work (feed parsing, legacy SDK calls)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.batch_size = 1000  # indicators per bulk insert

    async def ingest_batch(self, indicators: list[ThreatIndicator]):
        """Process indicators in parallel batches"""
        # Validate indicators
        validated = await self.validate_batch(indicators)

        # Enrich with OSINT data
        enriched = await self.enrich_batch(validated)

        # Generate embeddings for semantic search
        embedded = await self.generate_embeddings(enriched)

        # Bulk insert to database
        await self.bulk_insert(embedded)

    async def process_feed(self, feed_url: str):
        """Multi-threaded feed processing"""
        raw_data = await self.fetch_feed(feed_url)

        # Process in batches for optimal throughput
        for batch in self.chunk(raw_data, self.batch_size):
            await self.ingest_batch(batch)
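
The `chunk` helper referenced above isn't shown; a minimal generator-based sketch (in `ThreatIngestor` it would be a method, but the logic is the same):

```python
from typing import Iterator, List, TypeVar

T = TypeVar("T")

def chunk(items: List[T], size: int) -> Iterator[List[T]]:
    """Yield successive fixed-size slices of a list."""
    for start in range(0, len(items), size):
        yield items[start:start + size]
```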

Key Design Decisions:

  • Multi-threaded processing - Handles I/O-bound operations (API calls, database writes) in parallel
  • Batch processing - Groups 1000 indicators per batch for optimal database performance
  • Asynchronous I/O - Uses asyncio for non-blocking operations
  • Circuit breakers - Fails gracefully when external APIs are down

Result: Processing speed increased from 500 events/hour to 100K+ events/day.
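
The circuit-breaker idea can be sketched minimally. This is a simplified illustration; the failure threshold and cooldown values are placeholders, not our production settings:

```python
import time

class CircuitBreaker:
    """Open the circuit after repeated failures; retry after a cooldown."""

    def __init__(self, max_failures: int = 5, reset_after: float = 60.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = None  # monotonic timestamp when circuit opened

    def allow(self) -> bool:
        """Return False while the circuit is open (API considered down)."""
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.reset_after:
            # Cooldown elapsed: half-open, permit a trial call
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = time.monotonic()

    def record_success(self):
        self.failures = 0
```

Each external feed gets its own breaker instance, so one flaky API can't stall the whole pipeline.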

Vector Search Implementation

The breakthrough feature of KingaSphere, our CTI platform, is semantic threat correlation using vector embeddings. Traditional keyword search can't find related threats described with different wording; vector search solves this.

Why Vector Search?

Consider these related threats:

1. "Malicious PowerShell script detected on endpoint"
2. "Suspicious PS1 execution observed"
3. "Command-line obfuscation via encoded PowerShell"

Traditional keyword search would miss these connections. Vector search understands that they're all PowerShell-related threats.
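
To make the intuition concrete, here is cosine similarity on toy vectors. Real embeddings have 1536 dimensions; these 3-dimensional vectors are illustrative only:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity: 1.0 = identical direction, 0.0 = orthogonal."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Pretend embeddings: the two PowerShell phrasings point in nearly
# the same direction; an unrelated phishing indicator does not.
ps_script = [0.9, 0.1, 0.0]
ps1_exec  = [0.8, 0.2, 0.1]
phishing  = [0.0, 0.1, 0.9]

print(cosine_similarity(ps_script, ps1_exec))  # high (~0.98)
print(cosine_similarity(ps_script, phishing))  # low  (~0.01)
```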

Architecture with pgvector

-- PostgreSQL schema with vector support
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE threat_intelligence (
    id SERIAL PRIMARY KEY,
    ioc_value TEXT NOT NULL,
    ioc_type VARCHAR(50),
    description TEXT,
    mitre_tactics TEXT[],
    confidence_score FLOAT,

    -- Vector embedding for semantic search
    embedding VECTOR(1536),

    -- Metadata
    first_seen TIMESTAMP DEFAULT NOW(),
    last_seen TIMESTAMP DEFAULT NOW(),
    source VARCHAR(100),

    -- Indexes for performance
    CONSTRAINT unique_ioc UNIQUE (ioc_value, ioc_type)
);

-- IVFFlat index for fast similarity search
CREATE INDEX ON threat_intelligence
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

Generating Embeddings

We use Vertex AI's text embedding model for high-quality vectors:

from google.cloud import aiplatform
from vertexai.language_models import TextEmbeddingModel
from typing import List
import asyncio

class EmbeddingGenerator:
    """Generate vector embeddings using Vertex AI"""

    def __init__(self):
        aiplatform.init(project="enterprise-cti")
        self.model = TextEmbeddingModel.from_pretrained("textembedding-gecko@003")

    async def generate(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a batch of texts"""
        # get_embeddings is synchronous, so run it off the event loop
        response = await asyncio.to_thread(self.model.get_embeddings, texts)
        return [emb.values for emb in response]

    async def embed_threat(self, threat: dict) -> List[float]:
        """Create rich context for threat embedding"""
        # Combine multiple fields for better context
        context = f"""
        IOC Type: {threat['ioc_type']}
        Value: {threat['ioc_value']}
        Description: {threat['description']}
        MITRE Tactics: {', '.join(threat['mitre_tactics'])}
        """

        embeddings = await self.generate([context])
        return embeddings[0]

Semantic Search Query

async def find_similar_threats(
    query: str,
    limit: int = 10,
    similarity_threshold: float = 0.7
):
    """Find threats similar to query using vector search"""

    # Generate query embedding
    query_embedding = await embed_text(query)

    # Similarity search using cosine distance
    results = await db.execute("""
        SELECT
            id,
            ioc_value,
            description,
            mitre_tactics,
            1 - (embedding <=> $1::vector) AS similarity
        FROM threat_intelligence
        WHERE 1 - (embedding <=> $1::vector) > $2
        ORDER BY embedding <=> $1::vector
        LIMIT $3
    """, query_embedding, similarity_threshold, limit)

    return results

Performance Metrics:

  • Query time: < 50ms for searches across 1M+ records
  • Accuracy: 95% relevant results in top 10
  • Index size: ~4GB for 1M vectors (1536 dimensions)

MITRE ATT&CK Mapping with AI

One of the most powerful features is automatic MITRE ATT&CK framework mapping using Gemini 2.5 Pro.

Automated TTP Classification

from vertexai.preview.generative_models import GenerativeModel

class MITREMapper:
    """AI-powered MITRE ATT&CK technique extraction"""

    def __init__(self):
        self.model = GenerativeModel("gemini-2.5-pro-002")

    async def extract_ttps(self, threat_description: str) -> dict:
        """Extract tactics, techniques, and procedures using AI"""

        prompt = f"""
        Analyze this threat intelligence and extract MITRE ATT&CK information:

        Threat: {threat_description}

        Return JSON with:
        - tactics: List of MITRE tactics
        - techniques: List of technique IDs (e.g., T1059.001)
        - procedures: Specific procedures observed
        - severity: HIGH/MEDIUM/LOW
        - explanation: Brief reasoning
        """

        response = await self.model.generate_content_async(prompt)
        return self.parse_response(response.text)

    async def classify_threat(self, threat: dict) -> dict:
        """Full threat classification pipeline"""

        # Extract TTPs
        ttps = await self.extract_ttps(threat['description'])

        # Enrich with context
        threat['mitre_tactics'] = ttps['tactics']
        threat['mitre_techniques'] = ttps['techniques']
        threat['severity'] = ttps['severity']
        threat['ai_analysis'] = ttps['explanation']

        return threat
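
The `parse_response` method isn't shown above. LLMs often wrap JSON in markdown fences, so a defensive parser helps; here is a minimal sketch (the fence-stripping heuristic is an assumption about typical model output, not a documented contract):

```python
import json

def parse_response(text: str) -> dict:
    """Parse a JSON object out of an LLM response, tolerating ```json fences."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence (with optional language tag)...
        cleaned = cleaned.split("\n", 1)[1]
        # ...and the closing fence
        cleaned = cleaned.rsplit("```", 1)[0]
    return json.loads(cleaned)
```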

Natural Language Threat Summarization

Analysts don't want raw data—they want actionable insights:

async def generate_threat_summary(threats: List[dict]) -> str:
    """Generate executive summary of threat landscape"""

    prompt = f"""
    Analyze these {len(threats)} threat indicators:

    {format_threats(threats)}

    Create an executive summary covering:
    1. Most critical threats
    2. Common attack patterns
    3. Recommended actions
    4. Affected systems/services

    Format for security analysts, be concise.
    """

    model = GenerativeModel("gemini-2.5-pro-002")
    response = await model.generate_content_async(prompt)

    return response.text
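
The `format_threats` helper above is assumed; a minimal version that renders indicators into prompt-friendly lines might look like this (the dict keys mirror the schema used elsewhere in this post):

```python
from typing import List

def format_threats(threats: List[dict]) -> str:
    """Render threat dicts as compact one-line bullets for an LLM prompt."""
    lines = []
    for t in threats:
        lines.append(
            f"- [{t.get('severity', 'UNKNOWN')}] "
            f"{t.get('ioc_type', '?')}: {t.get('ioc_value', '?')} "
            f"({t.get('description', 'no description')})"
        )
    return "\n".join(lines)
```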

Result: Threat analysis time reduced from 2 hours to 5 minutes per incident.

Performance Optimization Techniques

1. Database Query Optimization

# Before: N+1 query problem
for threat in threats:
    related = db.query(
        "SELECT * FROM relationships WHERE threat_id = ?",
        threat.id
    )  # 1000 threats = 1000 queries!

# After: Batch loading
threat_ids = [t.id for t in threats]
all_relationships = db.query("""
    SELECT * FROM relationships
    WHERE threat_id = ANY($1)
""", threat_ids)  # 1 query for all threats!

2. Redis Caching Strategy

import json
import redis
from functools import wraps

redis_client = redis.Redis()

def cache_result(ttl: int = 3600):
    """Cache expensive operations"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Deterministic cache key from function name and arguments
            # (hash() is salted per process, so it breaks cross-process caching)
            cache_key = f"{func.__name__}:{repr(args)}:{repr(sorted(kwargs.items()))}"

            # Check cache
            cached = redis_client.get(cache_key)
            if cached:
                return json.loads(cached)

            # Execute function
            result = await func(*args, **kwargs)

            # Store in cache
            redis_client.setex(
                cache_key,
                ttl,
                json.dumps(result)
            )

            return result
        return wrapper
    return decorator

@cache_result(ttl=3600)
async def get_threat_stats(time_range: str):
    """Expensive aggregation query - cache for 1 hour"""
    return await db.execute("""
        SELECT
            ioc_type,
            COUNT(*) as count,
            AVG(confidence_score) as avg_confidence
        FROM threat_intelligence
        WHERE first_seen > NOW() - $1::interval
        GROUP BY ioc_type
    """, time_range)

3. Connection Pooling

from psycopg_pool import AsyncConnectionPool

# Connection pool for PostgreSQL
pool = AsyncConnectionPool(
    conninfo="postgresql://user:pass@host/db",
    min_size=10,
    max_size=50,
    timeout=30
)

async def execute_query(query: str, params: tuple):
    """Execute query with connection pooling"""
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(query, params)
            return await cur.fetchall()

Performance Improvements:

  • API response time: 60% reduction (from 500ms to 200ms)
  • Database connections: 50% reduction (from 100 to 50 concurrent)
  • Memory usage: 40% reduction through efficient connection reuse

Integration with SIEM/EDR Platforms

KingaSphere integrates with enterprise security tools for automated response:

# FalconAPI stands in for a client wrapper around the CrowdStrike Falcon APIs
from crowdstrike import FalconAPI

class IncidentOrchestrator:
    """Automated incident response orchestration"""

    def __init__(self):
        self.falcon = FalconAPI()

    async def respond_to_threat(self, threat_id: str):
        """Automated incident response workflow"""

        # Get threat details
        threat = await db.get_threat(threat_id)

        # Check if IOC exists in environment
        detections = await self.falcon.search_ioc(
            threat['ioc_value']
        )

        if detections:
            # Create incident
            incident = await self.create_incident(threat, detections)

            # Execute response playbook
            if threat['severity'] == 'HIGH':
                # Quarantine affected hosts
                await self.falcon.contain_hosts(
                    [d['host_id'] for d in detections]
                )

                # Alert SOC team
                await self.send_alert(incident)

            return incident

Result: Incident response time improved by 95% (from hours to minutes).

Monitoring and Observability

We use Prometheus and Grafana for real-time monitoring:

import logging

from prometheus_client import Counter, Histogram

logger = logging.getLogger(__name__)

# Metrics
threats_processed = Counter(
    'threats_processed_total',
    'Total threats processed',
    ['source', 'ioc_type']
)

processing_time = Histogram(
    'threat_processing_seconds',
    'Time to process threat',
    buckets=[.1, .5, 1.0, 2.5, 5.0, 10.0]
)

@processing_time.time()
async def process_threat(threat: dict):
    """Process threat with metric collection"""
    try:
        result = await analyze_threat(threat)

        # Record success
        threats_processed.labels(
            source=threat['source'],
            ioc_type=threat['ioc_type']
        ).inc()

        return result
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        raise

Lessons Learned

1. Start with PostgreSQL, Not a "Specialized" Database

We initially considered dedicated vector databases (Pinecone, Weaviate), but PostgreSQL with pgvector proved superior:

✅ Single database for both relational and vector data
✅ ACID compliance for critical threat intelligence
✅ Mature ecosystem with extensive tooling
✅ Lower operational complexity

2. AI/ML Isn't Magic—It Needs Good Data

Our first AI models had 60% accuracy. After improving data quality:

  • Cleaned duplicate indicators
  • Enriched context with multiple sources
  • Added confidence scores
  • Implemented human-in-the-loop validation

Result: Accuracy improved to 95%.

3. Optimize for P99, Not Average

Average response time looked great (200ms), but P99 was 5 seconds. We found:

  • Slow queries on large threat campaigns
  • Occasional database connection timeouts
  • Unoptimized vector searches

After optimization, P99 dropped to < 1 second.
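
The mean-versus-tail gap is easy to see with recorded latency samples; a nearest-rank percentile sketch using only the standard library (production systems would use histogram metrics rather than raw samples):

```python
import math

def percentile(samples: list, pct: float) -> float:
    """Nearest-rank percentile of a list of latency samples (seconds)."""
    ordered = sorted(samples)
    # Nearest-rank: ceil(pct/100 * N), clamped to a valid index
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]

# 98 fast requests plus two 5-second outliers:
# the mean looks healthy while P99 exposes the tail
latencies = [0.2] * 98 + [5.0] * 2
print(sum(latencies) / len(latencies))  # mean ≈ 0.3s
print(percentile(latencies, 99))        # P99 = 5.0s
```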

Key Metrics

After 6 months of operation:

| Metric                     | Value      |
| -------------------------- | ---------- |
| Daily Events Processed     | 100K+      |
| Query Response Time (P50)  | 50ms       |
| Query Response Time (P99)  | < 1s       |
| AI Classification Accuracy | 95%        |
| Incident Response Time     | 95% faster |
| System Uptime              | 99.9%      |
| Database Size              | 10TB+      |
| Active Threat Indicators   | 5M+        |

Future Roadmap

We're continuing to evolve the CTI platform:

  1. Graph database integration - Model complex threat relationships
  2. Predictive threat modeling - ML models to predict attack patterns
  3. Automated threat hunting - AI-driven proactive threat discovery
  4. Multi-tenancy - Support for multiple business units
  5. Real-time streaming - Sub-second threat propagation

Conclusion

Building distributed threat intelligence at scale requires:

✅ Strong architectural foundations - Microservices, async processing, connection pooling
✅ Modern AI/ML - Vector search, LLM integration, automated classification
✅ Performance obsession - Caching, query optimization, monitoring
✅ Continuous iteration - Start small, measure, optimize, scale

The CTI platform now processes 100K+ threats daily, provides sub-second intelligence retrieval, and has reduced incident response times by 95%—protecting enterprise customers and critical infrastructure.


Want to discuss threat intelligence architecture? Connect with me on LinkedIn or check out my work on GitHub.

Building something similar? I'm happy to share more technical details—reach out!