
How We Process 2.4M Social Media Posts Per Hour

14 min read · AI Analytics
Kafka · TimescaleDB · NLP · Infrastructure

Our OSINT platform runs 24/7, processing roughly 58 million social media posts per day. This is a deep dive into the infrastructure that makes it possible — Kafka partition design, the canonical normalization layer that unifies 47 different platform schemas, TimescaleDB batch writes using the COPY protocol, a distributed MinHash dedup index, and autoscaling on consumer lag.

The scale

2.4 million posts per hour = 667 posts per second sustained. Peak traffic hits 3× that (~2,000 posts/sec) during major events — election nights, breaking news, protest outbreaks. The pipeline has to absorb those spikes without dropping data.

Each post requires: language detection (3ms), entity recognition (12ms), sentiment classification (45ms), duplicate check (1ms), and a write to TimescaleDB + Redis (7ms). Total: ~68ms per post. At 667 posts/sec, that is 45 CPU-seconds of work per wall-clock second. Parallelization across 80 workers is mandatory, and the Kafka buffer is what makes burst absorption possible.
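
The arithmetic above can be checked in a few lines. This is a back-of-envelope sketch that uses only the per-stage latencies quoted in this section; the names STAGE_MS and work_seconds_per_second are illustrative, and it treats the stages as strictly sequential, which ignores the effect of batching.

```python
# Per-post stage latencies in milliseconds, as quoted in the text.
STAGE_MS = {
    "language_detection": 3,
    "entity_recognition": 12,
    "sentiment": 45,
    "duplicate_check": 1,
    "db_and_redis_write": 7,
}

POSTS_PER_HOUR = 2_400_000
WORKERS = 80


def posts_per_second(posts_per_hour: int) -> float:
    """Convert an hourly volume to a sustained per-second rate."""
    return posts_per_hour / 3600


def work_seconds_per_second(rate: float, per_post_ms: float) -> float:
    """Seconds of sequential processing that arrive per wall-clock second."""
    return rate * per_post_ms / 1000


per_post_ms = sum(STAGE_MS.values())
rate = posts_per_second(POSTS_PER_HOUR)
load = work_seconds_per_second(rate, per_post_ms)

print(f"{per_post_ms} ms/post, {rate:.0f} posts/s, {load:.1f} s of work per second")
print(f"per-worker share at {WORKERS} workers: {load / WORKERS:.2f}")
```

The per-worker share is the number the Kafka buffer and the autoscaler have to keep below 1.0 when traffic spikes.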

Architecture

Scrapers (47 platform workers)
    │  push canonical PostRecord (Protobuf)
    ▼
Kafka cluster (3 brokers, 24 partitions, 7-day retention)
    │  consumer group: nlp_workers (sticky assignment)
    ▼
NLP Workers (80 × g4dn.xlarge, T4 GPU)
    │  processed records (batch_size=32)
    ├──▶ TimescaleDB (bulk COPY, durable)
    ├──▶ Redis (24h hot data, sorted sets + hashes)
    └──▶ Dead Letter Queue (Kafka: posts_dlq, 30-day retention)

Everything runs on AWS in us-east-1. Total infrastructure cost: $31K/month. Kafka is managed MSK; TimescaleDB runs on a single r6g.2xlarge with a 50TB gp3 EBS volume.

Platform normalization: one struct for 47 JSON schemas

The hardest part of ingesting 47 platforms is not the rate limits or the anti-bot measures — it is the schema heterogeneity. A tweet, a Reddit comment, a Telegram channel post, and a TikTok caption are structurally different objects. Before anything enters Kafka, every scraper normalizes its output to a canonical PostRecord:

// canonical.rs — canonical post schema shared by all scrapers
#[derive(Serialize, Deserialize, Clone)]
pub struct PostRecord {
    // Identity
    pub platform:     Platform,        // enum: Twitter, Reddit, Facebook, …
    pub platform_id:  String,          // native post ID (opaque string)
    pub author_id:    String,          // native user ID (opaque string)
    pub author_name:  Option<String>,  // display name, nullable

    // Content
    pub text:         String,          // normalized text (HTML stripped, URLs normalized)
    pub lang_hint:    Option<String>,  // BCP-47 tag from platform metadata, if present
    pub media_urls:   Vec<String>,     // image/video attachments

    // Engagement
    pub likes:        Option<i64>,
    pub shares:       Option<i64>,     // retweets / reposts / shares depending on platform
    pub replies:      Option<i64>,

    // Provenance
    pub created_at:   DateTime<Utc>,   // canonical UTC timestamp
    pub scraped_at:   DateTime<Utc>,   // when we fetched it
    pub scraper_ver:  u16,             // schema version for downstream schema evolution

    // Thread context
    pub parent_id:    Option<String>,  // reply-to post ID, if any
    pub thread_id:    Option<String>,  // root of the conversation thread
}

#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Platform {
    Twitter = 1, Reddit = 2, Facebook = 3, TikTok = 4,
    Telegram = 5, Instagram = 6, YouTube = 7, Discord = 8,
    // … 39 more
}

// Example: normalizing a raw Twitter API v2 response
impl From<TwitterV2Tweet> for PostRecord {
    fn from(t: TwitterV2Tweet) -> Self {
        PostRecord {
            platform:    Platform::Twitter,
            platform_id: t.id,
            author_id:   t.author_id,
            author_name: t.includes.and_then(|i| i.users.first().map(|u| u.name.clone())),
            text:        strip_urls(&t.text),          // normalize t.co short links
            lang_hint:   t.lang,
            media_urls:  extract_media_urls(&t),
            likes:       t.public_metrics.as_ref().map(|m| m.like_count),
            shares:      t.public_metrics.as_ref().map(|m| m.retweet_count),
            replies:     t.public_metrics.as_ref().map(|m| m.reply_count),
            created_at:  t.created_at.parse().unwrap_or_else(|_| Utc::now()),
            scraped_at:  Utc::now(),
            scraper_ver: SCRAPER_VERSION,
            parent_id:   t.referenced_tweets.as_ref()
                          .and_then(|r| r.iter().find(|t| t.type_ == "replied_to"))
                          .map(|r| r.id.clone()),
            thread_id:   t.conversation_id,
        }
    }
}

The normalization contract is strict: text always has HTML stripped and URLs replaced with a canonical placeholder ([URL]), created_at is always UTC, and platform-native IDs are passed through opaquely as strings so we never coerce e.g. Twitter snowflake IDs to integers and lose precision. Scrapers serialize PostRecord to Protobuf before pushing to Kafka.
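
To make the contract concrete, here is a minimal Python sketch of the text and timestamp rules. It is an illustration only, not the Rust scrapers' implementation: normalize_text and to_utc are hypothetical names, and the regular expressions are deliberately simple and would need hardening for real-world markup.

```python
import html
import re
from datetime import datetime, timedelta, timezone

_TAG_RE = re.compile(r"<[^>]+>")
_URL_RE = re.compile(r"https?://\S+")
_WS_RE = re.compile(r"\s+")


def normalize_text(raw: str) -> str:
    """Strip HTML tags, decode entities, and replace URLs with [URL]."""
    text = _TAG_RE.sub(" ", raw)           # drop tags, keep their text content
    text = html.unescape(text)             # decode entities such as &amp;
    text = _URL_RE.sub("[URL]", text)      # canonical URL placeholder
    return _WS_RE.sub(" ", text).strip()   # collapse runs of whitespace


def to_utc(ts: datetime) -> datetime:
    """Convert an aware timestamp to UTC; refuse to guess for naive ones."""
    if ts.tzinfo is None:
        raise ValueError("naive timestamp: source time zone unknown")
    return ts.astimezone(timezone.utc)


print(normalize_text("<p>Vote now at https://t.co/abc123 &amp; share</p>"))
print(to_utc(datetime(2024, 11, 5, 20, 0, tzinfo=timezone(timedelta(hours=-5)))))
```

The string-ID rule matters for the same reason: a 64-bit snowflake ID pushed through a float64 (as many JSON parsers do) is no longer guaranteed to round-trip.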

Data ingestion: four platforms in detail

Twitter/X: Rate-limited to 5,000 requests per 15 minutes per app credential. We run 40 app credentials in rotation managed by a credential pool service: each scraper pops a credential, uses it, and returns it with its rate-limit reset timestamp. Filtered stream API for real-time keyword matching; search API with since_id pagination for per-account polling. ~300K posts/hour after keyword filtering.

Facebook: The Graph API is restricted to public Pages and severely rate-limited. We use Playwright driving headless Chromium in sandboxed containers. Each scraper authenticates as a real user and scrolls feeds. Proxies rotate every 20 requests via a residential proxy pool (Bright Data). The Playwright fingerprint randomizes viewport, timezone, WebGL renderer string, and canvas noise per session. Success rate ~73%, giving ~150K posts/hour.

Reddit: Excellent public API with no authentication requirement for read-only access (as of this writing). Real-time via the SSE push stream at reddit.com/r/all/new.json?before=…; historical via Pushshift S3 dumps. ~200K posts/hour from election-relevant subreddits.

TikTok: No official developer API for content ingestion. Anti-bot detection is device-fingerprint-based — they verify WebGL signatures, audio context fingerprint, and timing patterns consistent with real hardware. We use residential proxies paired with emulated mobile device profiles (fabricated IMEI, screen dimensions, CPU concurrency matching the emulated model). Measurement noise added to scroll timing. Success rate ~60%; ~100K posts/hour.

The remaining 43 platforms (Instagram, YouTube, Telegram, Discord, local forums, etc.) contribute ~1.65M posts/hour combined. Each platform has a dedicated scraper worker in its own container, all producing PostRecord Protobuf to the same Kafka topic.
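
The credential rotation described for Twitter/X (pop a credential, use it, return it with its reset timestamp) could look roughly like the following. This is an in-process sketch under stated assumptions: the real pool is a separate service whose interface is not shown here, and CredentialPool, acquire, and release are hypothetical names.

```python
import heapq
import time
from typing import Callable, Optional


class CredentialPool:
    """Hands out the credential whose rate-limit window reopens soonest."""

    def __init__(self, credentials: list[str],
                 clock: Callable[[], float] = time.time):
        self._clock = clock
        # Min-heap of (available_at, credential); all usable immediately.
        self._heap: list[tuple[float, str]] = [(0.0, c) for c in credentials]
        heapq.heapify(self._heap)

    def acquire(self) -> Optional[str]:
        """Pop a usable credential, or None if all are still rate-limited."""
        if self._heap and self._heap[0][0] <= self._clock():
            return heapq.heappop(self._heap)[1]
        return None

    def release(self, credential: str, reset_at: float = 0.0) -> None:
        """Return a credential; reset_at is the epoch second its window reopens."""
        heapq.heappush(self._heap, (reset_at, credential))


pool = CredentialPool(["cred-a", "cred-b"])
cred = pool.acquire()
# ... make requests until the API reports the limit is exhausted ...
pool.release(cred, reset_at=time.time() + 15 * 60)
```

Keeping the pool ordered by reset time means a scraper never has to scan all credentials to find one that is usable.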

Kafka: partition strategy and consumer assignment

All scrapers push to the social_media_posts topic. The partition key is platform_id || author_id, hashed to 24 partitions. This co-locates all posts from the same author on the same partition, which matters for duplicate detection: the MinHash LSH worker for a given partition sees the author's recent history without needing cross-partition coordination.
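
The co-location property follows from key-based partitioning being deterministic: equal key bytes always map to the same partition. The sketch below illustrates that with a stand-in hash. It assumes the first key component identifies the platform rather than an individual post, since a per-post component would spread one author's posts across partitions. Kafka's default partitioner uses murmur2, not CRC32, so the partition numbers here will not match a real cluster; partition_key and partition_for are hypothetical names.

```python
import zlib

NUM_PARTITIONS = 24


def partition_key(platform: int, author_id: str) -> bytes:
    """Build the key bytes (assumed layout: platform identifier, then author ID)."""
    return f"{platform}|{author_id}".encode("utf-8")


def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map key bytes to a partition. CRC32 is a stand-in for Kafka's murmur2."""
    return zlib.crc32(key) % num_partitions


first = partition_for(partition_key(1, "user-12345"))
second = partition_for(partition_key(1, "user-12345"))
print(first == second)  # same author on the same platform, same partition
```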

# Kafka topic configuration
topic:              social_media_posts
partitions:         24
replication_factor: 3
retention.ms:       604800000   # 7 days
compression.type:   lz4
min.insync.replicas: 2

# Producer (scrapers) — optimize for throughput, accept some latency
acks:                1           # leader-only ack; replicas async
compression.type:    lz4
batch.size:          131072      # 128 KB
linger.ms:           50          # batch up to 50ms
max.in.flight.requests.per.connection: 5

# Consumer (NLP workers) — sticky partition assignment
group.id:            nlp_workers
partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
auto.offset.reset:   earliest
max.poll.records:    500
fetch.min.bytes:     65536       # 64 KB — wait for a reasonable batch
fetch.max.wait.ms:   500

The sticky assignor keeps each worker on the same partitions across rebalances (triggered by worker scale-out/in). This keeps the MinHash rolling window warm: a worker that owns partition 7 keeps its LSH index for that partition's authors in memory, so duplicate detection stays effective after a rebalance rather than resetting.

Seven-day retention is intentional. When we retrain NLP models, we replay the last 7 days through the new pipeline to backfill corrected labels. The __consumer_offsets topic tracks per-consumer-group position so replay is trivially a consumer-group reset.

NLP worker design

80 workers, each on a g4dn.xlarge (4 vCPU, 16 GB RAM, 1× NVIDIA T4 GPU). Models are loaded once at startup and stay resident in GPU memory; the worker loop batches 32 posts before any inference runs.

import torch, spacy, fasttext
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from optimum.onnxruntime import ORTModelForSequenceClassification

class NLPWorker:
    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # Language detection: FastText LID model — 3ms per post, CPU-only
        self.lang_detector = fasttext.load_model("lid.176.bin")

        # NER: SpaCy with RoBERTa-base backbone, 7-label set
        # Labels: PERSON, ORG, GPE, NORP, EVENT, FAC, PRODUCT
        self.ner = spacy.load("en_election_ner_roberta")

        # Sentiment: DistilBERT fine-tuned on 5M election posts, INT8 ONNX via Optimum
        # 252MB → 63MB after quantization; 45ms → 28ms at batch_size=32
        self.sentiment_model = ORTModelForSequenceClassification.from_pretrained(
            "models/election-sentiment-int8",
            provider="CUDAExecutionProvider",
        )
        self.tokenizer = AutoTokenizer.from_pretrained("models/election-sentiment-int8")

        # MinHash LSH index (see duplicate detection section below)
        self.lsh = PartitionedMinHashLSH(threshold=0.85, num_perm=128)

    def process_batch(self, records: list[PostRecord]) -> list[ProcessedPost]:
        results = []

        # Step 1: language detection (CPU, ~3ms/post)
        texts = [r.text for r in records]
        # fastText's predict() rejects embedded newlines, so flatten them first
        langs = [self.lang_detector.predict(t.replace('\n', ' '))[0][0].replace('__label__', '')
                 for t in texts]

        # Step 2: filter to supported languages (en, es, zh, ar, fr, de, pt, ru)
        filtered = [(r, lang) for r, lang in zip(records, langs)
                    if lang in SUPPORTED_LANGUAGES]
        if not filtered:
            return []

        # Step 3: NER (SpaCy, GPU pipeline, ~12ms/post)
        docs = list(self.ner.pipe([r.text for r, _ in filtered], batch_size=32))

        # Step 4: sentiment (DistilBERT INT8 ONNX, GPU, ~28ms for batch of 32)
        batch_texts = [r.text for r, _ in filtered]
        inputs = self.tokenizer(
            batch_texts, return_tensors="pt",
            truncation=True, max_length=128,   # 128-token cap, not 512
            padding=True,
        )
        with torch.no_grad():
            logits = self.sentiment_model(**inputs).logits
        probs = torch.softmax(logits, dim=1).cpu().numpy()

        # Step 5: duplicate check (MinHash, ~1ms/post, see below)
        for i, (record, lang) in enumerate(filtered):
            is_dup, original_id = self.lsh.check_and_insert(record)

            results.append(ProcessedPost(
                record=record,
                language=lang,
                entities={
                    'people':    [e.text for e in docs[i].ents if e.label_ == 'PERSON'],
                    'orgs':      [e.text for e in docs[i].ents if e.label_ == 'ORG'],
                    'locations': [e.text for e in docs[i].ents if e.label_ in ('GPE', 'FAC')],
                    'events':    [e.text for e in docs[i].ents if e.label_ == 'EVENT'],
                },
                sentiment_pos=float(probs[i][0]),
                sentiment_neu=float(probs[i][1]),
                sentiment_neg=float(probs[i][2]),
                is_duplicate=is_dup,
                original_post_id=original_id,
            ))

        return results
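
The "batch 32 posts before inference" step can be sketched as a small generator that groups an incoming stream into fixed-size lists. This is an assumption about how the worker loop might feed process_batch, not the actual loop; micro_batches is a hypothetical name, and a production version would also flush on a timer so a quiet partition does not hold a partial batch indefinitely.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")
BATCH_SIZE = 32


def micro_batches(items: Iterable[T], size: int = BATCH_SIZE) -> Iterator[list[T]]:
    """Yield consecutive lists of up to `size` items; the last may be shorter."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


for batch in micro_batches(range(70)):
    print(len(batch))
```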

Duplicate detection: character 4-gram MinHash LSH

Posts are frequently reposted with minor modifications: “RT: …” prefixes, an appended URL, a changed first word. Word-level MinHash misses these because the set of word tokens changes too much. We use character 4-grams, which are stable under word substitution at the edges of a post. A 4-gram window over “election fraud happening” produces {elec, lect, ecti, ctio, tion, ion , on f, …}. Swapping “happening” for “occurring” only replaces the grams that overlap the changed word, so in a post of typical length the bulk of the set is untouched.
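
A small sketch of the shingling step, using exact set Jaccard rather than MinHash so the effect of an "RT: " prefix is easy to inspect. The example sentence is made up for illustration, and char_ngrams and jaccard are hypothetical helper names; MinHash estimates the same quantity from a fixed-size signature.

```python
def char_ngrams(text: str, n: int = 4) -> set[str]:
    """Set of overlapping character n-grams of the lowercased, stripped text."""
    text = text.lower().strip()
    return {text[i : i + n] for i in range(len(text) - n + 1)}


def jaccard(a: set[str], b: set[str]) -> float:
    """Exact Jaccard similarity of two sets (1.0 for two empty sets)."""
    if not a and not b:
        return 1.0
    return len(a & b) / len(a | b)


original = "Polling stations in the capital will stay open two hours longer tonight"
repost = "RT: " + original

print(sorted(char_ngrams("election fraud happening"))[:5])
print(f"similarity with RT prefix: {jaccard(char_ngrams(original), char_ngrams(repost)):.2f}")
```

A prefix only adds the few grams that start inside it, so every gram of the original survives in the repost and the similarity stays high.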

import hashlib
import redis
from datasketch import MinHash

CHAR_NGRAM_N = 4
NUM_PERMS = 128
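# b=16 bands × r=8 rows → banding threshold ≈ (1/b)^(1/r) = (1/16)^(1/8) ≈ 0.71
# This is the steep part of the S-curve: pairs above it very likely become candidates.
# The 0.85 cutoff is applied afterwards by the explicit Jaccard check in check_and_insert.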
LSH_BANDS = 16
LSH_ROWS  = 8   # NUM_PERMS / LSH_BANDS

def text_to_minhash(text: str) -> MinHash:
    """Character 4-gram MinHash signature for a post text."""
    m = MinHash(num_perm=NUM_PERMS)
    text = text.lower().strip()
    for i in range(len(text) - CHAR_NGRAM_N + 1):
        gram = text[i : i + CHAR_NGRAM_N]
        m.update(gram.encode('utf-8'))
    return m


class PartitionedMinHashLSH:
    """
    Distributed MinHash LSH backed by Redis.

    Each Kafka partition's NLP worker maintains a local in-memory LRU cache
    (60-minute TTL) AND writes band hashes to a shared Redis cluster.
    The Redis layer catches cross-partition duplicates (e.g. same post
    scraped from two different platform scrapers with different author IDs).
    """
    def __init__(self, threshold: float, num_perm: int):
        self.threshold = threshold
        self.num_perm = num_perm
        self.local_cache: dict[str, MinHash] = {}  # post_id → MinHash
        self.redis = redis.Redis(host=REDIS_HOST, decode_responses=False)
        self.ttl = 3600  # 60 minutes

    def check_and_insert(self, record: PostRecord) -> tuple[bool, str | None]:
        sig = text_to_minhash(record.text)

        # Check Redis for similar posts via band hashes
        band_keys = self._band_keys(sig, record.platform)
        for key in band_keys:
            candidates = self.redis.smembers(key)
            for cand_id in candidates:
                cand_id_str = cand_id.decode()
                if cand_id_str in self.local_cache:
                    j = sig.jaccard(self.local_cache[cand_id_str])
                    if j >= self.threshold:
                        return True, cand_id_str

        # Not a duplicate — add to Redis and local cache
        pipe = self.redis.pipeline()
        for key in band_keys:
            pipe.sadd(key, record.platform_id)
            pipe.expire(key, self.ttl)
        pipe.execute()
        self.local_cache[record.platform_id] = sig
        return False, None

    def _band_keys(self, sig: MinHash, platform: Platform) -> list[str]:
        """Compute b=16 Redis keys, one per band. Keys are platform-scoped."""
        hv = sig.hashvalues  # numpy array, shape (128,)
        keys = []
        for b in range(LSH_BANDS):
            band = hv[b * LSH_ROWS : (b + 1) * LSH_ROWS]
            band_hash = hashlib.sha1(band.tobytes()).hexdigest()[:16]
            keys.append(f"minhash:{platform.value}:b{b}:{band_hash}")
        return keys

The b=16 bands × r=8 rows configuration puts the steep part of the LSH S-curve, P ≈ 1 − (1 − s^r)^b, at roughly (1/b)^(1/r) = (1/16)^(1/8) ≈ 0.71 Jaccard similarity. The band lookup is therefore only a candidate filter: a pair with s = 0.85 shares at least one band with probability ≈ 0.99, and the 0.85 cutoff itself is enforced by the explicit signature comparison in check_and_insert. Posts need to share ~85% of their 4-gram vocabulary to be flagged as duplicates: tight enough to catch reposts and paraphrases, loose enough not to flag thematically similar posts about the same event. Empirically, ~7.4% of incoming posts are duplicates; stripping them reduces storage and avoids skewing platform-level sentiment counts.
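
The S-curve can be evaluated directly for b=16, r=8. The sketch below computes the probability that two posts with Jaccard similarity s share at least one band, plus the usual (1/b)^(1/r) approximation of where the curve is steepest. It suggests the band lookup starts admitting candidates at around 0.71 similarity, which leaves the 0.85 decision to the explicit Jaccard comparison in check_and_insert. The function names are illustrative.

```python
LSH_BANDS = 16
LSH_ROWS = 8


def candidate_probability(s: float, b: int = LSH_BANDS, r: int = LSH_ROWS) -> float:
    """P(at least one of b bands matches) for a pair with Jaccard similarity s."""
    return 1.0 - (1.0 - s**r) ** b


def banding_threshold(b: int = LSH_BANDS, r: int = LSH_ROWS) -> float:
    """Approximate similarity at which the S-curve rises most steeply."""
    return (1.0 / b) ** (1.0 / r)


for s in (0.5, 0.6, 0.7, 0.8, 0.85, 0.9):
    print(f"s={s:.2f}  P(candidate)={candidate_probability(s):.3f}")
print(f"banding threshold = {banding_threshold():.3f}")
```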

TimescaleDB: batch COPY writes

Naïve INSERT throughput against TimescaleDB peaks at ~8K rows/second per connection with the hypertable overhead. We use PostgreSQL's binary COPY protocol instead (the same approach used in the Voidly measurement database), which streams rows in bulk and avoids parsing and planning a separate SQL statement for each batch of values. Constraints are still enforced on the copied rows.

-- posts table DDL
CREATE TABLE posts (
    time            TIMESTAMPTZ NOT NULL,
    platform        SMALLINT    NOT NULL,   -- Platform enum value
    platform_id     TEXT        NOT NULL,
    author_id       TEXT,
    text            TEXT,
    language        CHAR(3),
    sentiment_pos   REAL,
    sentiment_neg   REAL,
    sentiment_neu   REAL,
    entities        JSONB,
    is_duplicate    BOOL        NOT NULL DEFAULT false,
    original_id     TEXT,                   -- set if is_duplicate = true

    PRIMARY KEY (time, platform, platform_id)
);

-- Hypertable: 1-day chunks, 4 space partitions by platform hash
SELECT create_hypertable(
    'posts', 'time',
    partitioning_column => 'platform',
    number_partitions    => 4,
    chunk_time_interval  => INTERVAL '1 day'
);

-- Indexes
CREATE INDEX idx_posts_platform   ON posts (platform, time DESC);
CREATE INDEX idx_posts_author     ON posts (author_id, time DESC);
CREATE INDEX idx_posts_entities   ON posts USING GIN (entities);

-- Compression: segment by platform, order by time — 5.8× ratio on 30-day-old chunks
ALTER TABLE posts SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'platform',
    timescaledb.compress_orderby   = 'time DESC'
);
SELECT add_compression_policy('posts', INTERVAL '7 days');

# Python batch writer using psycopg2 binary COPY
import io, struct, time
import psycopg2

class PostsBulkWriter:
    """
    Accumulates ProcessedPost objects and flushes to TimescaleDB
    via binary COPY when the batch reaches FLUSH_ROWS or FLUSH_INTERVAL seconds pass.
    Sustained write rate: ~43K rows/sec; peak (burst): ~110K rows/sec.
    """
    FLUSH_ROWS      = 5_000
    FLUSH_INTERVAL  = 2.0  # seconds

    def __init__(self, dsn: str):
        self.conn   = psycopg2.connect(dsn)
        self.buffer: list[ProcessedPost] = []
        self.last_flush = time.monotonic()

    def add(self, posts: list[ProcessedPost]):
        self.buffer.extend(posts)
        if (len(self.buffer) >= self.FLUSH_ROWS or
                time.monotonic() - self.last_flush > self.FLUSH_INTERVAL):
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        rows = [self._to_row(p) for p in self.buffer]
        with self.conn.cursor() as cur:
            # COPY FROM STDIN avoids per-row SQL parsing overhead;
            # FORMAT binary must match the stream built by _encode_binary
            cur.copy_expert(
                "COPY posts (time, platform, platform_id, author_id, text, "
                "language, sentiment_pos, sentiment_neg, sentiment_neu, "
                "entities, is_duplicate, original_id) FROM STDIN WITH (FORMAT binary)",
                self._encode_binary(rows),
            )
        self.conn.commit()
        self.buffer.clear()
        self.last_flush = time.monotonic()

    def _encode_binary(self, rows) -> io.BytesIO:
        buf = io.BytesIO()
        buf.write(b'PGCOPY\n\xff\r\n\x00')  # 11-byte binary COPY signature
        buf.write(struct.pack('!II', 0, 0))  # flags + header extension
        for row in rows:
            buf.write(struct.pack('!H', len(row)))  # field count
            for field in row:
                if field is None:
                    buf.write(struct.pack('!i', -1))
                else:
                    data = _pg_encode(field)
                    buf.write(struct.pack('!i', len(data)))
                    buf.write(data)
        buf.write(struct.pack('!H', 0xFFFF))  # file trailer
        buf.seek(0)
        return buf
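
The writer above relies on a _pg_encode helper that is not shown. As a rough idea of what it has to produce, here is a sketch of PostgreSQL's binary send formats for the column types in the posts table. These per-type functions are hypothetical and are not the article's _pg_encode: dispatching on the Python type alone is ambiguous (a str may be TEXT or JSONB), so the sketch keeps one encoder per column type.

```python
import json
import struct
from datetime import datetime, timedelta, timezone

# PostgreSQL timestamps count microseconds from 2000-01-01 00:00:00 UTC.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def encode_timestamptz(ts: datetime) -> bytes:
    """TIMESTAMPTZ: signed 64-bit microseconds since PG_EPOCH (ts must be aware)."""
    micros = (ts - PG_EPOCH) // timedelta(microseconds=1)
    return struct.pack("!q", micros)


def encode_int2(value: int) -> bytes:
    """SMALLINT: signed 16-bit big-endian integer."""
    return struct.pack("!h", value)


def encode_float4(value: float) -> bytes:
    """REAL: IEEE 754 single precision, big-endian."""
    return struct.pack("!f", value)


def encode_bool(value: bool) -> bytes:
    """BOOL: a single byte, 1 or 0."""
    return b"\x01" if value else b"\x00"


def encode_text(value: str) -> bytes:
    """TEXT and CHAR(n): the raw bytes in the database encoding (UTF-8 assumed)."""
    return value.encode("utf-8")


def encode_jsonb(value) -> bytes:
    """JSONB: a version byte (1) followed by the JSON text."""
    return b"\x01" + json.dumps(value).encode("utf-8")


print(encode_timestamptz(datetime(2024, 11, 5, tzinfo=timezone.utc)).hex())
print(encode_jsonb({"people": ["A. Candidate"]}))
```

Each encoded value is then prefixed with its 32-bit length, exactly as _encode_binary does, with -1 standing in for NULL.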

Redis hot layer: real-time queries

TimescaleDB is excellent for analytical queries across days or weeks but too slow for the sub-millisecond latency dashboards need. Redis stores the last 24 hours of data in three structures:

# NLP worker writes to both PostgreSQL and Redis after each flush
def update_redis(results: list[ProcessedPost]):
    # redis_client: a shared redis.Redis instance created at worker startup
    pipe = redis_client.pipeline(transaction=False)

    for p in results:
        ts = int(p.record.created_at.timestamp())
        key_prefix = f"posts:{p.record.platform.value}"

        # Sorted set: post IDs ordered by timestamp — supports range queries
        pipe.zadd(f"{key_prefix}:ts", {p.record.platform_id: ts})

        # Hash: post fields — fast point lookup by ID
        pipe.hset(f"post:{p.record.platform_id}", mapping={
            'text': p.record.text[:500],      # truncate for memory
            'lang': p.language,
            'spos': round(p.sentiment_pos, 4),
            'sneg': round(p.sentiment_neg, 4),
        })
        pipe.expire(f"post:{p.record.platform_id}", 86400)

        # Real-time counters (no TTL — reset hourly by cron)
        pipe.hincrby("stats:posts_by_platform", str(p.record.platform.value), 1)
        label = 'pos' if p.sentiment_pos > 0.6 else ('neg' if p.sentiment_neg > 0.6 else 'neu')
        pipe.hincrby(f"stats:sentiment_by_platform:{p.record.platform.value}", label, 1)

    # Trim sorted sets to last 24h
    cutoff = int(time.time()) - 86400
    for platform in Platform:
        pipe.zremrangebyscore(f"posts:{platform.value}:ts", 0, cutoff)

    pipe.execute()

Dashboard queries hit Redis first (24h), then fall through to TimescaleDB. Redis cluster: 3× r7g.large instances with cluster mode, ~180GB total working set.
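
The "Redis first, then TimescaleDB" read path for a single post can be sketched as a two-level lookup. The lookups are passed in as callables so the sketch stays independent of any client library; fetch_post and the Lookup alias are hypothetical names. It relies on the fact that a Redis HGETALL on a missing key returns an empty mapping, which is falsy in Python.

```python
from typing import Callable, Optional

Lookup = Callable[[str], Optional[dict]]


def fetch_post(post_id: str, hot: Lookup, cold: Lookup) -> Optional[dict]:
    """Return the post from the hot store if present, else from the cold store."""
    hit = hot(post_id)
    if hit:  # an empty dict or None both mean "not in the hot layer"
        return hit
    return cold(post_id)


# Usage with in-memory stand-ins for Redis and TimescaleDB:
hot_store = {"p1": {"text": "recent post"}}
cold_store = {"p2": {"text": "older post"}}
print(fetch_post("p1", lambda k: hot_store.get(k, {}), cold_store.get))
print(fetch_post("p2", lambda k: hot_store.get(k, {}), cold_store.get))
```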

Worker autoscaling on Kafka consumer lag

Steady-state throughput at 80 workers is 667 posts/sec. During breaking events, ingestion rate spikes to ~2,000 posts/sec. Rather than over-provision for peaks, we scale the NLP worker ASG on Kafka consumer lag.

# CloudWatch custom metric: kafka_consumer_lag
# Published every 60s by a lightweight lag-reporter Lambda
# (reads consumer group offsets via Kafka AdminClient)

# Scale-out policy — add 10 workers when lag > 500K
resource "aws_autoscaling_policy" "nlp_scale_out" {
  name                   = "nlp-scale-out"
  autoscaling_group_name = aws_autoscaling_group.nlp_workers.name
  policy_type            = "StepScaling"
  adjustment_type        = "ChangeInCapacity"

  # Step bounds are offsets from the alarm threshold (500K), not absolute lag values
  step_adjustment {
    scaling_adjustment          = 10       # add 10 instances
    metric_interval_lower_bound = 0        # lag > 500K
    metric_interval_upper_bound = 1500000  # lag < 2M
  }
  step_adjustment {
    scaling_adjustment          = 20       # add 20 instances if lag > 2M
    metric_interval_lower_bound = 1500000
  }
}

# Scale-in policy — remove 5 workers when lag < 50K for 10 minutes
resource "aws_autoscaling_policy" "nlp_scale_in" {
  name                   = "nlp-scale-in"
  autoscaling_group_name = aws_autoscaling_group.nlp_workers.name
  policy_type            = "StepScaling"
  adjustment_type        = "ChangeInCapacity"
  estimated_instance_warmup = 120  # T4 GPU init takes ~90s

  step_adjustment {
    scaling_adjustment          = -5
    metric_interval_upper_bound = 0
  }
}

Scale-out completes in ~6 minutes (EC2 launch + Docker pull + model load). The 7-day Kafka retention means the backlogged posts are safely buffered while additional workers come online — nothing is dropped.
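
How much backlog builds up while new workers come online is simple rate arithmetic. The sketch below assumes the existing fleet keeps draining at the steady-state 667 posts/sec during a 2,000 posts/sec spike, which is an assumption made here for illustration; if the workers have headroom, the backlog grows more slowly. The function names are hypothetical.

```python
def backlog_after(ingest_rate: float, drain_rate: float, seconds: float) -> int:
    """Messages accumulated after `seconds` when ingest exceeds drain."""
    return max(0, round((ingest_rate - drain_rate) * seconds))


def seconds_to_drain(backlog: float, ingest_rate: float, drain_rate: float) -> float:
    """Time to clear `backlog` once drain exceeds ingest; inf if it never does."""
    surplus = drain_rate - ingest_rate
    if surplus <= 0:
        return float("inf")
    return backlog / surplus


SCALE_OUT_SECONDS = 6 * 60
lag = backlog_after(ingest_rate=2000, drain_rate=667, seconds=SCALE_OUT_SECONDS)
print(f"backlog after one scale-out cycle: {lag:,} messages")
```

Under that assumption a single scale-out cycle accumulates a backlog of the same order as the 500K alarm threshold, which is the range the 7-day retention is there to cover.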

Error handling and dead letter queue

About 0.8% of posts fail processing: malformed Unicode that breaks the tokenizer, text that is all emoji with no 4-gram content to hash, NER crashes on extremely long inputs. These go to posts_dlq rather than being silently dropped.

def process_with_dlq(consumer: KafkaConsumer, dlq: KafkaProducer):
    for msg in consumer:
        try:
            record = PostRecord.from_protobuf(msg.value)
            results = worker.process_batch([record])
            writer.add(results)
        except Exception as e:
            # Increment retry counter in message headers
            # (kafka-python exposes headers as (str, bytes) tuples)
            headers = dict(msg.headers or [])
            retry_count = int(headers.get('retry', b'0'))

            if retry_count < 3:
                # Re-enqueue with the incremented retry count and error metadata
                dlq.send('posts_dlq', value=msg.value,
                          headers=[('retry', str(retry_count + 1).encode()),
                                   ('error', str(e)[:200].encode()),
                                   ('original_topic', b'social_media_posts')])
            else:
                # Exhausted retries — send to dead archive (S3)
                s3.put_object(
                    Bucket='posts-dead-archive',
                    Key=f"{msg.topic}/{msg.partition}/{msg.offset}",
                    Body=msg.value,
                )
            metrics.increment('posts.dlq.count', tags={'error': type(e).__name__})

The DLQ is monitored: a CloudWatch alarm fires if posts.dlq.count exceeds 500 in a 5-minute window. A separate DLQ-replay service processes it after manual investigation resolves the root cause.

Monitoring

Every NLP worker exposes a Prometheus endpoint on port 9090. Metrics are scraped every 15 seconds and visualized in Grafana. A sample of the key series:

# Prometheus metrics exposed by NLP workers
posts_processed_total{platform="twitter"}          1847293
posts_processed_total{platform="reddit"}            892847

processing_duration_seconds{quantile="0.5"}          0.058
processing_duration_seconds{quantile="0.95"}         0.143
processing_duration_seconds{quantile="0.99"}         0.287

model_inference_ms{model="sentiment", quantile="0.5"}  28
model_inference_ms{model="ner",       quantile="0.5"}  12

kafka_consumer_lag_total                            142830
kafka_consumer_lag{partition="0"}                      142
kafka_consumer_lag{partition="1"}                       89

duplicate_rate_15m                                   0.074   # 7.4% duplicates

timescaledb_copy_rows_per_second                   43200
timescaledb_copy_errors_total                           3

PagerDuty receives alerts on: consumer lag above 500K (scale-out trigger); p99 processing latency above 500ms; DLQ depth above 500; TimescaleDB COPY error rate above 0.1%. We have had zero unplanned downtime in 8 months.

Cost breakdown

Monthly AWS bill: $31,247.

  • 80× g4dn.xlarge NLP workers (on-demand): $20,960 (67% of total)
  • MSK cluster (3× kafka.m5.2xlarge): $3,240
  • TimescaleDB r6g.2xlarge + 50 TB gp3: $2,419 + $3,750 = $6,169
  • Redis cluster (3× r7g.large): $1,890
  • Data transfer, NAT gateways, load balancers: $988

At ~1.73 billion posts per month, that is ~$0.000018 per post. The biggest lever is the NLP workers: switching from on-demand to Spot instances would cut that line item by ~70% ($6.3K/mo instead of $21K), but Spot interruptions would cause worker rebalances and 6-minute re-warm delays. For now we value latency consistency over savings.
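
The unit-cost figures can be reproduced from the numbers quoted in this section. The sketch assumes a 30-day month and takes the ~70% Spot discount at face value; the constant names are illustrative.

```python
MONTHLY_BILL_USD = 31_247
GPU_WORKERS_USD = 20_960
POSTS_PER_HOUR = 2_400_000
SPOT_DISCOUNT = 0.70  # assumed discount, as quoted in the text

posts_per_month = POSTS_PER_HOUR * 24 * 30
cost_per_post = MONTHLY_BILL_USD / posts_per_month
spot_gpu_cost = GPU_WORKERS_USD * (1 - SPOT_DISCOUNT)

print(f"{posts_per_month:,} posts/month")
print(f"${cost_per_post:.6f} per post")
print(f"GPU line item on Spot: ${spot_gpu_cost:,.0f}/month")
```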

What would I do differently?

Partition count was too low at launch. We started with 12 partitions and hit head-of-line blocking when TikTok scrapers produced faster than the 12 assigned workers could consume. Increasing to 24 partitions required a topic recreation and consumer group reset — a 4-hour maintenance window. We should have started higher.

Word-level MinHash was a mistake. We ran word-level MinHash for 6 months before switching to character 4-grams. The measured duplicate rate rose from 4.1% to 7.4% after the switch, not because duplicates increased, but because we had been missing ~3.3% of actual reposts that had word-level changes but near-identical 4-gram content.

COPY protocol should have been day one. We initially used execute_values (batch INSERT). Throughput was 8K rows/sec and we had regular TimescaleDB CPU spikes during flush. Switching to binary COPY improved throughput to 43K rows/sec and eliminated the spikes.


For the NLP model internals — DistilBERT INT8 quantization, SpaCy NER training, and the MinHash coordinated-campaign detector: NLP pipeline for real-time sentiment analysis at scale →

For the TimescaleDB architecture that both this pipeline and Voidly share — continuous aggregates, columnar compression, and the COPY write path: Voidly's measurement database: 2.2B probe results in TimescaleDB →

For how ingestion, normalization, and quality filtering work on the Voidly probe side: Voidly's probe-to-dataset ingest pipeline: normalization, quality filtering, and TimescaleDB indexing →

For the election-specific data engineering pipeline built on this infrastructure — AP Election API, Kafka precinct results, state scrapers, and FIPS normalization: Election data pipeline: AP feeds, Kafka precinct results, and state scraper normalization →