How We Process 2.4M Social Media Posts Per Hour
Our OSINT platform runs 24/7, processing roughly 58 million social media posts per day. This is a deep dive into the infrastructure that makes it possible — Kafka partition design, the canonical normalization layer that unifies 47 different platform schemas, TimescaleDB batch writes using the COPY protocol, a distributed MinHash dedup index, and autoscaling on consumer lag.
The scale
2.4 million posts per hour = 667 posts per second sustained. Peak traffic hits 3× that (~2,000 posts/sec) during major events — election nights, breaking news, protest outbreaks. The pipeline has to absorb those spikes without dropping data.
Each post requires: language detection (3ms), entity recognition (12ms), sentiment classification (45ms), duplicate check (1ms), and a write to TimescaleDB + Redis (7ms). Total: ~68ms per post. At 667 posts/sec, that is 45 CPU-seconds of work per wall-clock second. Parallelization across 80 workers is mandatory, and the Kafka buffer is what makes burst absorption possible.
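The arithmetic above is easy to reproduce. Extending it to the ~2,000 posts/sec peak shows why the Kafka buffer matters. This is a minimal sketch using the per-stage figures from the text. The helper names are ours, and it assumes stages run serially per post with no batching gains:

```python
import math

# Per-post stage costs quoted in the text, in milliseconds.
STAGE_MS = {"language": 3, "ner": 12, "sentiment": 45, "dedup": 1, "write": 7}


def per_post_ms(stages: dict) -> int:
    """Serial processing time for one post."""
    return sum(stages.values())


def work_seconds_per_second(posts_per_sec: float, ms_per_post: float) -> float:
    """Seconds of serial work generated per wall-clock second."""
    return posts_per_sec * ms_per_post / 1000.0


def min_workers(posts_per_sec: float, ms_per_post: float) -> int:
    """Lower bound on fully busy, single-lane workers needed to keep up."""
    return math.ceil(work_seconds_per_second(posts_per_sec, ms_per_post))


steady = 2_400_000 / 3600  # ~667 posts/sec
peak = 2_000               # posts/sec during major events
ms = per_post_ms(STAGE_MS)
print(ms, round(work_seconds_per_second(steady, ms), 1), min_workers(peak, ms))
```

Under this serial model, peak load needs more lanes than 80 workers provide. GPU batching lowers the effective per-post cost, and whatever shortfall remains accumulates as Kafka lag rather than dropped posts.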
Architecture
Scrapers (47 platform workers)
│ push canonical PostRecord (Protobuf)
▼
Kafka cluster (3 brokers, 24 partitions, 7-day retention)
│ consumer group: nlp_workers (sticky assignment)
▼
NLP Workers (80 × g4dn.xlarge, T4 GPU)
│ processed records (batch_size=32)
├──▶ TimescaleDB (bulk COPY, durable)
├──▶ Redis (24h hot data, sorted sets + hashes)
└──▶ Dead Letter Queue (Kafka: posts_dlq, 30-day retention)
Everything runs on AWS in us-east-1. Total infrastructure cost: $31K/month. Kafka is managed MSK; TimescaleDB runs on a single r6g.2xlarge with 50TB of gp3 EBS storage.
Platform normalization: one struct for 47 JSON schemas
The hardest part of ingesting 47 platforms is not the rate limits or the anti-bot measures — it is the schema heterogeneity. A tweet, a Reddit comment, a Telegram channel post, and a TikTok caption are structurally different objects. Before anything enters Kafka, every scraper normalizes its output to a canonical PostRecord:
// canonical.rs: canonical post schema shared by all scrapers
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone)]
pub struct PostRecord {
    // Identity
    pub platform: Platform,          // enum: Twitter, Reddit, Facebook, …
    pub platform_id: String,         // native post ID (opaque string)
    pub author_id: String,           // native user ID (opaque string)
    pub author_name: Option<String>, // display name, nullable
    // Content
    pub text: String,                // normalized text (HTML stripped, URLs normalized)
    pub lang_hint: Option<String>,   // BCP-47 tag from platform metadata, if present
    pub media_urls: Vec<String>,     // image/video attachments
    // Engagement
    pub likes: Option<i64>,
    pub shares: Option<i64>,         // retweets / reposts / shares depending on platform
    pub replies: Option<i64>,
    // Provenance
    pub created_at: DateTime<Utc>,   // canonical UTC timestamp
    pub scraped_at: DateTime<Utc>,   // when we fetched it
    pub scraper_ver: u16,            // schema version for downstream schema evolution
    // Thread context
    pub parent_id: Option<String>,   // reply-to post ID, if any
    pub thread_id: Option<String>,   // root of the conversation thread
}
#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Platform {
    Twitter = 1, Reddit = 2, Facebook = 3, TikTok = 4,
    Telegram = 5, Instagram = 6, YouTube = 7, Discord = 8,
    // … 39 more
}
// Example: normalizing a raw Twitter API v2 response
// (TwitterV2Tweet, strip_urls, extract_media_urls and SCRAPER_VERSION live elsewhere in the scraper crate)
impl From<TwitterV2Tweet> for PostRecord {
    fn from(t: TwitterV2Tweet) -> Self {
        // Run the helpers that borrow `t` before any field is moved out of it;
        // borrowing `&t` after a partial move would not compile.
        let media_urls = extract_media_urls(&t);
        let text = strip_urls(&t.text); // normalize t.co short links
        let parent_id = t
            .referenced_tweets
            .as_ref()
            .and_then(|refs| refs.iter().find(|r| r.type_ == "replied_to"))
            .map(|r| r.id.clone());
        PostRecord {
            platform: Platform::Twitter,
            platform_id: t.id,
            author_id: t.author_id,
            author_name: t.includes.and_then(|i| i.users.first().map(|u| u.name.clone())),
            text,
            lang_hint: t.lang,
            media_urls,
            likes: t.public_metrics.as_ref().map(|m| m.like_count),
            shares: t.public_metrics.as_ref().map(|m| m.retweet_count),
            replies: t.public_metrics.as_ref().map(|m| m.reply_count),
            // Falls back to the scrape time if the platform timestamp fails to parse
            created_at: t.created_at.parse().unwrap_or_else(|_| Utc::now()),
            scraped_at: Utc::now(),
            scraper_ver: SCRAPER_VERSION,
            parent_id,
            thread_id: t.conversation_id,
        }
    }
}
The normalization contract is strict: text always has HTML stripped and URLs replaced with a canonical placeholder ([URL]), created_at is always UTC, and platform-native IDs are passed through opaquely as strings. We never coerce, for example, Twitter snowflake IDs into numeric types: a 64-bit snowflake exceeds the 53-bit integer precision of a double, so any JSON round trip through a float silently corrupts it. Scrapers serialize PostRecord to Protobuf before pushing to Kafka.
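The text and ID rules of this contract can be made concrete with a small sketch. It is written in Python rather than the scrapers' Rust. normalize_text, loses_precision_as_double and the regexes are hypothetical helpers for illustration, and a production HTML stripper would use a real parser rather than a regex:

```python
import html
import re

# Hypothetical helpers illustrating the normalization contract.
TAG_RE = re.compile(r"<[^>]+>")       # naive tag matcher; fine for a sketch
URL_RE = re.compile(r"https?://\S+")  # http(s) URLs up to the next whitespace
WS_RE = re.compile(r"\s+")


def normalize_text(raw: str) -> str:
    """Strip HTML tags, decode entities, replace URLs with [URL], collapse spaces."""
    no_tags = TAG_RE.sub(" ", raw)
    decoded = html.unescape(no_tags)
    with_placeholders = URL_RE.sub("[URL]", decoded)
    return WS_RE.sub(" ", with_placeholders).strip()


def loses_precision_as_double(native_id: str) -> bool:
    """True if a numeric ID does not survive a round trip through a 64-bit float,
    which is what happens when a JSON parser maps numbers to doubles."""
    return int(float(native_id)) != int(native_id)


raw = '<p>Polls close at 8pm &amp; results at <a href="https://x.com">https://t.co/abc123</a></p>'
print(normalize_text(raw))
print(loses_precision_as_double("1346889436626259969"))  # made-up snowflake-sized ID
```

Storing IDs as opaque strings sidesteps the second problem entirely, which is why the struct types them as String.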
Data ingestion: four platforms in detail
Twitter/X: Rate-limited to 5,000 requests per 15 minutes per app credential. We run 40 app credentials in rotation, managed by a credential pool service: each scraper pops a credential, uses it, and returns it with its rate-limit reset timestamp. We use the filtered stream API for real-time keyword matching and the search API with since_id pagination for per-account polling. ~300K posts/hour after keyword filtering.
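The pop/use/return cycle maps naturally onto a min-heap ordered by each credential's rate-limit reset time. The sketch below is a hypothetical in-process version: CredentialPool, acquire and release are our names, and the real pool is a separate service. Time is passed in explicitly so the logic can be tested:

```python
import heapq
from dataclasses import dataclass, field
from typing import Iterable, List, Optional


@dataclass(order=True)
class _Entry:
    reset_at: float                              # heap is ordered by reset time only
    credential_id: str = field(compare=False)


class CredentialPool:
    """Hands out the credential whose rate-limit window resets earliest."""

    def __init__(self, credential_ids: Iterable[str]):
        # All credentials start out immediately usable.
        self._heap: List[_Entry] = [_Entry(0.0, cid) for cid in credential_ids]
        heapq.heapify(self._heap)

    def acquire(self, now: float) -> Optional[str]:
        """Pop a credential whose window has reset by `now`, else None."""
        if self._heap and self._heap[0].reset_at <= now:
            return heapq.heappop(self._heap).credential_id
        return None

    def release(self, credential_id: str, reset_at: float) -> None:
        """Return a credential together with its next rate-limit reset time."""
        heapq.heappush(self._heap, _Entry(reset_at, credential_id))
```

Because the heap top is always the soonest-available credential, a scraper that gets None can sleep until that timestamp instead of polling the pool.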
Facebook: The Graph API is restricted to public Pages and severely rate-limited. We use Playwright driving headless Chromium in sandboxed containers. Each scraper authenticates as a real user and scrolls feeds. Proxies rotate every 20 requests via a residential proxy pool (Bright Data). The Playwright fingerprint randomizes viewport, timezone, WebGL renderer string, and canvas noise per session. Success rate ~73%, giving ~150K posts/hour.
Reddit: Excellent public API with no authentication requirement for read-only access (as of this writing). Real-time ingestion polls the JSON listing at reddit.com/r/all/new.json?before=…; historical data comes from Pushshift S3 dumps. ~200K posts/hour from election-relevant subreddits.
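Polling with before works as a cursor. The poller remembers the fullname (the listing's name field, e.g. t3_abc123) of the newest post it has seen and asks only for posts newer than that. This is a minimal sketch of the cursor bookkeeping. It assumes new.json returns children newest-first. The function names are ours, and the HTTP client is omitted:

```python
from typing import Optional
from urllib.parse import urlencode

LISTING_URL = "https://www.reddit.com/r/all/new.json"


def next_poll_url(newest_seen: Optional[str], limit: int = 100) -> str:
    """Build the next listing URL; `before` restricts results to newer posts."""
    params = {"limit": limit}
    if newest_seen:
        params["before"] = newest_seen
    return f"{LISTING_URL}?{urlencode(params)}"


def advance_cursor(listing: dict, previous: Optional[str]) -> Optional[str]:
    """Move the cursor to the newest post in a listing, or keep it if empty."""
    children = listing.get("data", {}).get("children", [])
    if not children:
        return previous
    return children[0]["data"]["name"]  # first child is the newest
```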
TikTok: No official developer API for content ingestion. Anti-bot detection is device-fingerprint-based — they verify WebGL signatures, audio context fingerprint, and timing patterns consistent with real hardware. We use residential proxies paired with emulated mobile device profiles (fabricated IMEI, screen dimensions, CPU concurrency matching the emulated model). Measurement noise added to scroll timing. Success rate ~60%; ~100K posts/hour.
The remaining 43 platforms (Instagram, YouTube, Telegram, Discord, local forums, etc.) contribute ~1.65M posts/hour combined. Each platform has a dedicated scraper worker in its own container, all producing PostRecord Protobuf to the same Kafka topic.
Kafka: partition strategy and consumer assignment
All scrapers push to the social_media_posts topic. The partition key is platform || author_id, hashed to 24 partitions. This co-locates all posts from the same author on the same partition, which matters for duplicate detection: the MinHash LSH worker for a given partition sees the author's recent history without needing cross-partition coordination.
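Co-location holds only if the key contains author-level fields and nothing per-post. The sketch below illustrates that with a stand-in hash. The colon-separated key format is our assumption. Kafka's Java client actually hashes keys with murmur2, so real partition numbers will differ, but the grouping behaviour is the same:

```python
import hashlib

NUM_PARTITIONS = 24


def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Stand-in keyed partitioner: MD5-based, NOT Kafka's murmur2."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


def author_key(platform: int, author_id: str) -> bytes:
    """Author-level key: platform enum value plus native author ID."""
    return f"{platform}:{author_id}".encode("utf-8")


def post_key(post_id: str, author_id: str) -> bytes:
    """Counter-example: a key that includes the per-post ID."""
    return f"{post_id}:{author_id}".encode("utf-8")


# Every post by author 12345 on platform 1 lands on one partition...
by_author = {partition_for(author_key(1, "12345")) for _ in range(100)}
# ...while keying on the post ID spreads the same author across partitions.
by_post = {partition_for(post_key(str(pid), "12345")) for pid in range(100)}
print(len(by_author), len(by_post))
```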
# Kafka topic configuration
topic: social_media_posts
partitions: 24
replication_factor: 3
retention.ms: 604800000          # 7 days
compression.type: lz4
min.insync.replicas: 2           # only enforced for producers using acks=all
# Producer (scrapers): optimize for throughput, accept some latency
acks: 1                          # leader-only ack; a leader failure before replication can lose acked records
compression.type: lz4
batch.size: 131072               # 128 KB
linger.ms: 50                    # wait up to 50 ms to fill a batch
max.in.flight.requests.per.connection: 5
# Consumer (NLP workers): sticky partition assignment
group.id: nlp_workers
partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
auto.offset.reset: earliest
max.poll.records: 500
fetch.min.bytes: 65536           # 64 KB; wait for a reasonable batch
fetch.max.wait.ms: 500
The sticky assignor keeps as many partition-to-worker assignments unchanged as possible across rebalances, which are triggered by worker scale-out and scale-in. This keeps the MinHash rolling window warm. A worker that owns partition 7 keeps its LSH index for that partition's authors in memory, so duplicate detection stays effective after a rebalance rather than resetting.
Seven-day retention is intentional. When we retrain NLP models, we replay the last 7 days through the new pipeline to backfill corrected labels. The __consumer_offsets topic tracks per-consumer-group position, so replay amounts to resetting the consumer group's offsets.
NLP worker design
80 workers, each on a g4dn.xlarge (4 vCPU, 16 GB RAM, 1× NVIDIA T4 GPU). Models are loaded once at startup and stay resident in GPU memory; the worker loop batches 32 posts before any inference runs.
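The batch-of-32 loop needs one extra rule to behave well at low traffic: it must flush a partial batch after a short wait. This is a minimal sketch assuming a 0.5 s maximum wait, which is our choice and not a figure from the pipeline. MicroBatcher is a hypothetical helper, and the injectable clock exists only to make it testable:

```python
import time
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class MicroBatcher(Generic[T]):
    """Collects items into batches of `size`, or fewer once `max_wait_s` has
    elapsed since the first buffered item (so a quiet partition still flushes)."""

    def __init__(self, size: int = 32, max_wait_s: float = 0.5,
                 clock: Callable[[], float] = time.monotonic):
        self._size = size
        self._max_wait_s = max_wait_s
        self._clock = clock
        self._buf: List[T] = []
        self._first_at: Optional[float] = None

    def add(self, item: T) -> Optional[List[T]]:
        """Buffer one item; return a full batch when `size` is reached."""
        if not self._buf:
            self._first_at = self._clock()
        self._buf.append(item)
        if len(self._buf) >= self._size:
            return self._drain()
        return None

    def poll(self) -> Optional[List[T]]:
        """Return a partial batch if the oldest buffered item has waited too long."""
        if self._buf and self._clock() - self._first_at >= self._max_wait_s:
            return self._drain()
        return None

    def _drain(self) -> List[T]:
        batch, self._buf = self._buf, []
        self._first_at = None
        return batch
```

In a consumer loop, each fetched record goes through add(), and poll() runs between Kafka fetches. Whatever either call returns is handed to process_batch.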
import torch, spacy, fasttext
from transformers import AutoTokenizer
from optimum.onnxruntime import ORTModelForSequenceClassification
# PostRecord, ProcessedPost, SUPPORTED_LANGUAGES and PartitionedMinHashLSH are defined elsewhere in the codebase
class NLPWorker:
    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # Language detection: FastText LID model, 3ms per post, CPU-only
        self.lang_detector = fasttext.load_model("lid.176.bin")
        # NER: spaCy with RoBERTa-base backbone, 7-label set
        # Labels: PERSON, ORG, GPE, NORP, EVENT, FAC, PRODUCT
        spacy.prefer_gpu()  # must run before spacy.load() for the pipeline to use the GPU
        self.ner = spacy.load("en_election_ner_roberta")
        # Sentiment: DistilBERT fine-tuned on 5M election posts, INT8 ONNX via Optimum
        # 252MB → 63MB after quantization; 45ms → 28ms at batch_size=32
        self.sentiment_model = ORTModelForSequenceClassification.from_pretrained(
            "models/election-sentiment-int8",
            provider="CUDAExecutionProvider",
        )
        self.tokenizer = AutoTokenizer.from_pretrained("models/election-sentiment-int8")
        # MinHash LSH index (see duplicate detection section below)
        self.lsh = PartitionedMinHashLSH(threshold=0.85, num_perm=128)
    def process_batch(self, records: list[PostRecord]) -> list[ProcessedPost]:
        results = []
        # Step 1: language detection (CPU, ~3ms/post)
        # fastText's predict() rejects strings containing '\n', so flatten newlines first
        texts = [r.text.replace('\n', ' ') for r in records]
        langs = [self.lang_detector.predict(t)[0][0].replace('__label__', '')
                 for t in texts]
        # Step 2: filter to supported languages (en, es, zh, ar, fr, de, pt, ru)
        filtered = [(r, lang) for r, lang in zip(records, langs)
                    if lang in SUPPORTED_LANGUAGES]
        if not filtered:
            return []
        # Step 3: NER (spaCy, GPU pipeline, ~12ms/post)
        docs = list(self.ner.pipe([r.text for r, _ in filtered], batch_size=32))
        # Step 4: sentiment (DistilBERT INT8 ONNX, GPU, ~28ms for batch of 32)
        batch_texts = [r.text for r, _ in filtered]
        inputs = self.tokenizer(
            batch_texts, return_tensors="pt",
            truncation=True, max_length=128,  # 128-token cap, not 512
            padding=True,
        )
        with torch.no_grad():
            logits = self.sentiment_model(**inputs).logits
        # Column order follows the model's id2label; this code assumes (pos, neu, neg)
        probs = torch.softmax(logits, dim=1).cpu().numpy()
        # Step 5: duplicate check (MinHash, ~1ms/post, see below)
        for i, (record, lang) in enumerate(filtered):
            is_dup, original_id = self.lsh.check_and_insert(record)
            results.append(ProcessedPost(
                record=record,
                language=lang,
                entities={
                    'people': [e.text for e in docs[i].ents if e.label_ == 'PERSON'],
                    'orgs': [e.text for e in docs[i].ents if e.label_ == 'ORG'],
                    'locations': [e.text for e in docs[i].ents if e.label_ in ('GPE', 'FAC')],
                    'events': [e.text for e in docs[i].ents if e.label_ == 'EVENT'],
                },
                sentiment_pos=float(probs[i][0]),
                sentiment_neu=float(probs[i][1]),
                sentiment_neg=float(probs[i][2]),
                is_duplicate=is_dup,
                original_post_id=original_id,
            ))
        return results
Duplicate detection: character 4-gram MinHash LSH
Posts are frequently reposted with minor modifications: “RT: …” prefixes, URL appended, first word changed. Word-level MinHash misses these because the set of word tokens changes too much. We use character 4-grams, which are stable under word substitution at the edges of a post. A 4-gram window over “election fraud happening” produces {elec, lect, ecti, ctio, tion, ion , on f, …} — a representation that barely changes if “happening” is swapped to “occurring.”
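The difference is visible on exact set Jaccard similarity, which MinHash estimates. The sketch below compares word tokens with character 4-grams for a made-up repost that adds an "RT:" prefix and pluralizes the last word. The helper names are ours:

```python
def char_ngrams(text: str, n: int = 4) -> set:
    """Set of character n-grams over the lowercased, stripped text."""
    text = text.lower().strip()
    return {text[i : i + n] for i in range(len(text) - n + 1)}


def word_tokens(text: str) -> set:
    """Set of lowercased whitespace-separated tokens."""
    return set(text.lower().split())


def jaccard(a: set, b: set) -> float:
    """Exact Jaccard similarity |A ∩ B| / |A ∪ B|."""
    return len(a & b) / len(a | b) if (a or b) else 1.0


original = "officials confirm polls closed early in the northern district"
repost = "RT: officials confirm polls closed early in the northern districts"

word_j = jaccard(word_tokens(original), word_tokens(repost))
char_j = jaccard(char_ngrams(original), char_ngrams(repost))
print(f"word-level Jaccard: {word_j:.3f}, char 4-gram Jaccard: {char_j:.3f}")
```

Two small edits remove two of nine words from the overlap (8/11 ≈ 0.73). They add only five new 4-grams out of roughly sixty, which keeps the character-level similarity above 0.85.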
import hashlib
import numpy as np
import redis
from cachetools import TTLCache
from datasketch import MinHash
# Platform, PostRecord and REDIS_HOST are defined elsewhere in the codebase
CHAR_NGRAM_N = 4
NUM_PERMS = 128
# b=16 bands × r=8 rows. The S-curve P(candidate) = 1 - (1 - s^r)^b is steepest
# near (1/b)^(1/r) = (1/16)^(1/8) ≈ 0.71, with P = 0.5 at s ≈ 0.67. Banding is
# therefore looser than the 0.85 verification threshold: pairs at s = 0.85
# become candidates with P ≈ 0.99, and the Jaccard check makes the final call.
LSH_BANDS = 16
LSH_ROWS = 8  # NUM_PERMS / LSH_BANDS
def text_to_minhash(text: str) -> MinHash:
    """Character 4-gram MinHash signature for a post text."""
    m = MinHash(num_perm=NUM_PERMS)
    text = text.lower().strip()
    for i in range(len(text) - CHAR_NGRAM_N + 1):
        gram = text[i : i + CHAR_NGRAM_N]
        m.update(gram.encode('utf-8'))
    return m
class PartitionedMinHashLSH:
    """
    Distributed MinHash LSH backed by Redis.
    Each Kafka partition's NLP worker maintains a local in-memory LRU cache
    (60-minute TTL) AND writes band hashes plus signatures to a shared Redis
    cluster. The Redis layer catches cross-partition duplicates (e.g. the same
    text posted by different authors on the same platform, which the
    author-keyed partitioner sends to different partitions).
    """
    def __init__(self, threshold: float, num_perm: int, max_local: int = 500_000):
        self.threshold = threshold
        self.num_perm = num_perm
        self.ttl = 3600  # 60 minutes
        # "platform:post_id" -> hashvalues; LRU eviction plus per-entry TTL.
        # max_local is an illustrative bound, not a tuned value.
        self.local_cache = TTLCache(maxsize=max_local, ttl=self.ttl)
        self.redis = redis.Redis(host=REDIS_HOST, decode_responses=False)
    def check_and_insert(self, record: PostRecord) -> tuple[bool, str | None]:
        sig = text_to_minhash(record.text)
        post_key = f"{record.platform.value}:{record.platform_id}"
        band_keys = self._band_keys(sig, record.platform)
        # Fetch candidates for all 16 bands in one round trip
        pipe = self.redis.pipeline(transaction=False)
        for key in band_keys:
            pipe.smembers(key)
        candidates = set().union(*pipe.execute())
        for cand in candidates:
            cand_key = cand.decode()
            if cand_key == post_key:
                continue  # redelivered or replayed copy of this same post
            cand_hv = self.local_cache.get(cand_key)
            if cand_hv is None:
                # Candidate was inserted by another partition's worker
                raw = self.redis.get(f"minhash:sig:{cand_key}")
                if raw is None:
                    continue  # signature already expired
                cand_hv = np.frombuffer(raw, dtype=np.uint64)
            # Estimated Jaccard = fraction of matching hash values
            j = np.count_nonzero(sig.hashvalues == cand_hv) / len(cand_hv)
            if j >= self.threshold:
                return True, cand_key.split(':', 1)[1]
        # Not a duplicate: register band memberships and the signature itself
        pipe = self.redis.pipeline(transaction=False)
        for key in band_keys:
            pipe.sadd(key, post_key)
            pipe.expire(key, self.ttl)
        pipe.set(f"minhash:sig:{post_key}", sig.hashvalues.tobytes(), ex=self.ttl)
        pipe.execute()
        self.local_cache[post_key] = sig.hashvalues
        return False, None
    def _band_keys(self, sig: MinHash, platform: Platform) -> list[str]:
        """Compute b=16 Redis keys, one per band. Keys are platform-scoped."""
        hv = sig.hashvalues  # numpy uint64 array, shape (128,)
        keys = []
        for b in range(LSH_BANDS):
            band = hv[b * LSH_ROWS : (b + 1) * LSH_ROWS]
            band_hash = hashlib.sha1(band.tobytes()).hexdigest()[:16]
            keys.append(f"minhash:{platform.value}:b{b}:{band_hash}")
        return keys
The b=16 bands × r=8 rows configuration controls which pairs become candidates. The candidate probability is P ≈ 1 − (1 − s^r)^b, and its steep region sits near (1/16)^(1/8) ≈ 0.71. At s = 0.85 the probability is ≈ 0.99, so nearly every true near-duplicate reaches the exact check, and the final decision is an estimated Jaccard similarity ≥ 0.85 between signatures. Posts need to share ~85% of their 4-gram vocabulary to be flagged as duplicates. That is tight enough to catch reposts and paraphrases, but loose enough not to flag thematically similar posts about the same event. Empirically, ~7.4% of incoming posts are duplicates; stripping them reduces storage and avoids skewing platform-level sentiment counts.
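The banding arithmetic can be checked directly. For a banded LSH index, the probability that two signatures with Jaccard similarity s share at least one identical band is P = 1 − (1 − s^r)^b. This sketch evaluates that curve for b=16, r=8. The function names are ours:

```python
def candidate_probability(s: float, bands: int = 16, rows: int = 8) -> float:
    """P(two signatures with Jaccard s collide in at least one band)."""
    return 1.0 - (1.0 - s ** rows) ** bands


def approx_threshold(bands: int = 16, rows: int = 8) -> float:
    """Common rule of thumb for where the S-curve is steepest."""
    return (1.0 / bands) ** (1.0 / rows)


def half_probability_point(bands: int = 16, rows: int = 8) -> float:
    """Similarity at which P(candidate) = 0.5, solved in closed form."""
    return (1.0 - 0.5 ** (1.0 / bands)) ** (1.0 / rows)


for s in (0.5, 0.7, 0.85, 0.95):
    print(f"s={s:.2f}  P(candidate)={candidate_probability(s):.3f}")
print(f"rule-of-thumb threshold: {approx_threshold():.3f}")
print(f"P = 0.5 at s = {half_probability_point():.3f}")
```

For these parameters, the steep part of the curve lies around 0.67 to 0.71, below 0.85. Banding therefore acts as a high-recall prefilter, and the signature comparison against 0.85 decides what counts as a duplicate.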
TimescaleDB: batch COPY writes
Naïve INSERT throughput against TimescaleDB peaks at ~8K rows/second per connection with the hypertable overhead. We use PostgreSQL's binary COPY protocol instead (the same approach used in the Voidly measurement database). COPY streams every row of a batch through a single command, avoiding per-statement parsing and planning, and the binary format also skips text-to-type conversion of each value.
-- posts table DDL
CREATE TABLE posts (
    time          TIMESTAMPTZ NOT NULL,
    platform      SMALLINT    NOT NULL,   -- Platform enum value
    platform_id   TEXT        NOT NULL,
    author_id     TEXT,
    text          TEXT,
    language      CHAR(3),
    sentiment_pos REAL,
    sentiment_neg REAL,
    sentiment_neu REAL,
    entities      JSONB,
    is_duplicate  BOOL        NOT NULL DEFAULT false,
    original_id   TEXT,                   -- set if is_duplicate = true
    PRIMARY KEY (time, platform, platform_id)
);
-- Hypertable: 1-day chunks, 4 space partitions by platform hash
SELECT create_hypertable(
    'posts', 'time',
    partitioning_column => 'platform',
    number_partitions   => 4,
    chunk_time_interval => INTERVAL '1 day'
);
-- Indexes
CREATE INDEX idx_posts_platform ON posts (platform, time DESC);
CREATE INDEX idx_posts_author   ON posts (author_id, time DESC);
CREATE INDEX idx_posts_entities ON posts USING GIN (entities);
-- Compression: segment by platform, order by time (5.8× ratio on 30-day-old chunks)
ALTER TABLE posts SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'platform',
    timescaledb.compress_orderby   = 'time DESC'
);
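SELECT add_compression_policy('posts', INTERVAL '7 days');
# Python batch writer using psycopg2 binary COPY
import io
import json
import struct
import time
from datetime import datetime, timezone
import psycopg2
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)  # epoch of binary TIMESTAMPTZ
def _pg_encode(value) -> bytes:
    """Binary COPY encoding for the Python types this table uses. The dispatch
    is tied to the posts schema: int -> SMALLINT, float -> REAL,
    datetime -> TIMESTAMPTZ, dict/list -> JSONB, str -> TEXT/CHAR."""
    if isinstance(value, bool):  # check before int: bool is an int subclass
        return b'\x01' if value else b'\x00'
    if isinstance(value, int):
        return struct.pack('!h', value)
    if isinstance(value, float):
        return struct.pack('!f', value)
    if isinstance(value, datetime):
        delta = value - PG_EPOCH  # requires a timezone-aware datetime
        micros = (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds
        return struct.pack('!q', micros)
    if isinstance(value, (dict, list)):
        return b'\x01' + json.dumps(value).encode('utf-8')  # JSONB version byte
    return str(value).encode('utf-8')
class PostsBulkWriter:
    """
    Accumulates ProcessedPost objects and flushes to TimescaleDB
    via binary COPY when the batch reaches FLUSH_ROWS or FLUSH_INTERVAL seconds.
    Sustained write rate: ~43K rows/sec; peak (burst): ~110K rows/sec.
    """
    FLUSH_ROWS = 5_000
    FLUSH_INTERVAL = 2.0  # seconds
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.buffer: list[ProcessedPost] = []
        self.last_flush = time.monotonic()
    def add(self, posts: list[ProcessedPost]):
        self.buffer.extend(posts)
        if (len(self.buffer) >= self.FLUSH_ROWS or
                time.monotonic() - self.last_flush > self.FLUSH_INTERVAL):
            self.flush()
    def flush(self):
        if not self.buffer:
            return
        rows = [self._to_row(p) for p in self.buffer]
        try:
            with self.conn.cursor() as cur:
                # One COPY FROM STDIN per batch; FORMAT binary must be declared,
                # otherwise the server parses the stream as text format
                cur.copy_expert(
                    "COPY posts (time, platform, platform_id, author_id, text, "
                    "language, sentiment_pos, sentiment_neg, sentiment_neu, "
                    "entities, is_duplicate, original_id) "
                    "FROM STDIN WITH (FORMAT binary)",
                    self._encode_binary(rows),
                )
            self.conn.commit()
        except Exception:
            self.conn.rollback()  # keep the buffer so the batch can be retried
            raise
        self.buffer.clear()
        self.last_flush = time.monotonic()
    @staticmethod
    def _to_row(p: ProcessedPost) -> tuple:
        # Field order must match the COPY column list above
        return (p.record.created_at, p.record.platform.value, p.record.platform_id,
                p.record.author_id, p.record.text, p.language,
                p.sentiment_pos, p.sentiment_neg, p.sentiment_neu,
                p.entities, p.is_duplicate, p.original_post_id)
    def _encode_binary(self, rows) -> io.BytesIO:
        buf = io.BytesIO()
        buf.write(b'PGCOPY\n\xff\r\n\x00')   # 11-byte binary COPY signature
        buf.write(struct.pack('!ii', 0, 0))  # flags + header extension length
        for row in rows:
            buf.write(struct.pack('!h', len(row)))  # field count
            for field in row:
                if field is None:
                    buf.write(struct.pack('!i', -1))  # NULL marker
                else:
                    data = _pg_encode(field)
                    buf.write(struct.pack('!i', len(data)))
                    buf.write(data)
        buf.write(struct.pack('!h', -1))     # file trailer
        buf.seek(0)
        return buf
Redis hot layer: real-time queries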
TimescaleDB is excellent for analytical queries across days or weeks but too slow for the sub-millisecond latency dashboards need. Redis stores the last 24 hours of data in three structures:
# NLP worker writes to both PostgreSQL and Redis after each flush
import time
import redis
def update_redis(r: redis.Redis, results: list[ProcessedPost]):
    pipe = r.pipeline(transaction=False)
    for p in results:
        ts = int(p.record.created_at.timestamp())
        key_prefix = f"posts:{p.record.platform.value}"
        # Sorted set: post IDs ordered by timestamp, supports range queries
        pipe.zadd(f"{key_prefix}:ts", {p.record.platform_id: ts})
        # Hash: post fields for fast point lookup by ID
        # (platform-scoped, since native IDs are only unique within a platform)
        post_key = f"post:{p.record.platform.value}:{p.record.platform_id}"
        pipe.hset(post_key, mapping={
            'text': p.record.text[:500],  # truncate for memory
            'lang': p.language,
            'spos': round(p.sentiment_pos, 4),
            'sneg': round(p.sentiment_neg, 4),
        })
        pipe.expire(post_key, 86400)
        # Real-time counters (no TTL; reset hourly by cron)
        pipe.hincrby("stats:posts_by_platform", str(p.record.platform.value), 1)
        label = 'pos' if p.sentiment_pos > 0.6 else ('neg' if p.sentiment_neg > 0.6 else 'neu')
        pipe.hincrby(f"stats:sentiment_by_platform:{p.record.platform.value}", label, 1)
    # Trim sorted sets to the last 24h
    cutoff = int(time.time()) - 86400
    for platform in Platform:
        pipe.zremrangebyscore(f"posts:{platform.value}:ts", 0, cutoff)
    pipe.execute()
Dashboard queries hit Redis first (24h), then fall through to TimescaleDB. Redis cluster: 3× r7g.large instances with cluster mode, ~180GB total working set.
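The Redis-first read path has to split a dashboard's time range at the 24-hour boundary. This is one way to plan that split, sketched under the assumption that trimming leaves only sorted-set scores strictly newer than now minus 24h. QueryPlan and plan_query are hypothetical names:

```python
from dataclasses import dataclass
from typing import Optional, Tuple

HOT_WINDOW_S = 86_400  # Redis keeps the last 24 hours


@dataclass(frozen=True)
class QueryPlan:
    redis_range: Optional[Tuple[int, int]]      # inclusive epoch-second bounds
    timescale_range: Optional[Tuple[int, int]]  # inclusive epoch-second bounds


def plan_query(start: int, end: int, now: int) -> QueryPlan:
    """Split [start, end] into the part Redis can serve and the part it cannot.

    Redis holds scores strictly above now - 24h; anything at or below that
    boundary falls through to TimescaleDB."""
    boundary = now - HOT_WINDOW_S
    redis_range = (max(start, boundary + 1), end) if end > boundary else None
    ts_range = (start, min(end, boundary)) if start <= boundary else None
    return QueryPlan(redis_range, ts_range)


# The Redis side would then run r.zrangebyscore("posts:<platform>:ts", lo, hi);
# the TimescaleDB side would filter posts on time between the other bounds.
print(plan_query(800_000, 1_000_000, now=1_000_000))
```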
Worker autoscaling on Kafka consumer lag
Steady-state throughput at 80 workers is 667 posts/sec. During breaking events, ingestion rate spikes to ~2,000 posts/sec. Rather than over-provision for peaks, we scale the NLP worker ASG on Kafka consumer lag.
# CloudWatch custom metric: kafka_consumer_lag
# Published every 60s by a lightweight lag-reporter Lambda
# (reads consumer group offsets via Kafka AdminClient).
# The lag thresholds (500K for scale-out, 50K for 10 minutes for scale-in) live
# in CloudWatch alarms, not shown, whose alarm_actions point at these policies.
# Step bounds are offsets from the alarm threshold, not absolute lag values.
# Scale-out policy: add 10 workers when lag > 500K, 20 when lag >= 2M
resource "aws_autoscaling_policy" "nlp_scale_out" {
  name                      = "nlp-scale-out"
  autoscaling_group_name    = aws_autoscaling_group.nlp_workers.name
  policy_type               = "StepScaling"
  adjustment_type           = "ChangeInCapacity"
  estimated_instance_warmup = 120 # T4 GPU init takes ~90s
  step_adjustment {
    scaling_adjustment          = 10      # add 10 instances
    metric_interval_lower_bound = 0       # lag from 500K ...
    metric_interval_upper_bound = 1500000 # ... up to 2M (500K + 1.5M)
  }
  step_adjustment {
    scaling_adjustment          = 20      # add 20 instances
    metric_interval_lower_bound = 1500000 # lag >= 2M
  }
}
# Scale-in policy: remove 5 workers when lag < 50K for 10 minutes
resource "aws_autoscaling_policy" "nlp_scale_in" {
  name                   = "nlp-scale-in"
  autoscaling_group_name = aws_autoscaling_group.nlp_workers.name
  policy_type            = "StepScaling"
  adjustment_type        = "ChangeInCapacity"
  step_adjustment {
    scaling_adjustment          = -5
    metric_interval_upper_bound = 0 # lag below the 50K alarm threshold
  }
}
Scale-out completes in ~6 minutes (EC2 launch + Docker pull + model load). The 7-day Kafka retention means the backlogged posts are safely buffered while additional workers come online, so nothing is dropped.
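Together, the lag reporter and the two step policies compute one capacity change per lag sample, which can be sketched in plain Python. This is an illustration, not the Lambda's code. Offsets are passed in as dicts rather than fetched via AdminClient, the function names are ours, and the thresholds are the ones stated in this section:

```python
from typing import Mapping, Optional

SCALE_OUT_THRESHOLD = 500_000  # alarm threshold for scale-out
SECOND_STEP_LAG = 2_000_000    # lag at which the larger step applies
SCALE_IN_THRESHOLD = 50_000    # alarm threshold for scale-in


def total_lag(end_offsets: Mapping[int, int],
              committed: Mapping[int, Optional[int]]) -> int:
    """Sum of (log-end offset - committed offset) over partitions.

    A partition with no committed offset counts its whole log-end offset,
    which overestimates lag once old segments have been deleted."""
    lag = 0
    for partition, end in end_offsets.items():
        done = committed.get(partition)
        lag += end if done is None else max(end - done, 0)
    return lag


def capacity_change(lag: int) -> int:
    """Instances to add (positive) or remove (negative) for one lag sample.

    Ignores alarm evaluation periods such as the 10-minute scale-in window."""
    if lag > SCALE_OUT_THRESHOLD:
        return 20 if lag >= SECOND_STEP_LAG else 10
    if lag < SCALE_IN_THRESHOLD:
        return -5
    return 0
```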
Error handling and dead letter queue
About 0.8% of posts fail processing: malformed Unicode that breaks the tokenizer, text that is all emoji with no 4-gram content to hash, NER crashes on extremely long inputs. These go to posts_dlq rather than being silently dropped.
from kafka import KafkaConsumer, KafkaProducer
# worker, writer, s3 (a boto3 S3 client) and metrics are module-level objects
def process_with_dlq(consumer: KafkaConsumer, dlq: KafkaProducer):
    for msg in consumer:
        try:
            record = PostRecord.from_protobuf(msg.value)
            results = worker.process_batch([record])
            writer.add(results)
        except Exception as e:
            # kafka-python headers are (str key, bytes value) pairs
            headers = dict(msg.headers or [])
            retry_count = int(headers.get('retry', b'0'))
            if retry_count < 3:
                # Forward to the DLQ with the retry count and error metadata
                dlq.send('posts_dlq', value=msg.value,
                         headers=[('retry', str(retry_count + 1).encode()),
                                  ('error', str(e)[:200].encode()),
                                  ('original_topic', b'social_media_posts')])
            else:
                # Exhausted retries: send to the dead archive (S3)
                s3.put_object(
                    Bucket='posts-dead-archive',
                    Key=f"{msg.topic}/{msg.partition}/{msg.offset}",
                    Body=msg.value,
                )
            metrics.increment('posts.dlq.count', tags={'error': type(e).__name__})
The DLQ is monitored: a CloudWatch alarm fires if posts.dlq.count exceeds 500 in a 5-minute window. A separate DLQ-replay service processes the queue once manual investigation has resolved the root cause.
Monitoring
Every NLP worker exposes a Prometheus endpoint on port 9090. Prometheus scrapes it every 15 seconds, and Grafana visualizes the results. A sample of the exposed metrics:
# Prometheus metrics exposed by NLP workers
posts_processed_total{platform="twitter"} 1847293
posts_processed_total{platform="reddit"} 892847
processing_duration_seconds{quantile="0.5"} 0.058
processing_duration_seconds{quantile="0.95"} 0.143
processing_duration_seconds{quantile="0.99"} 0.287
model_inference_ms{model="sentiment", quantile="0.5"} 28
model_inference_ms{model="ner", quantile="0.5"} 12
kafka_consumer_lag_total 142830
kafka_consumer_lag{partition="0"} 142
kafka_consumer_lag{partition="1"} 89
duplicate_rate_15m 0.074 # 7.4% duplicates
timescaledb_copy_rows_per_second 43200
timescaledb_copy_errors_total 3
PagerDuty receives alerts on: consumer lag above 500K (the scale-out trigger); p99 processing latency above 500ms; DLQ depth above 500; TimescaleDB COPY error rate above 0.1%. We have had zero unplanned downtime in 8 months.
Cost breakdown
Monthly AWS bill: $31,247.
- 80× g4dn.xlarge NLP workers (on-demand): $20,960 (67% of total)
- MSK cluster (3× kafka.m5.2xlarge): $3,240
- TimescaleDB r6g.2xlarge + 50 TB gp3: $2,419 + $3,750 = $6,169
- Redis cluster (3× r7g.large): $1,890
- Data transfer, NAT gateways, load balancers: $988
At ~1.73 billion posts per month, that is ~$0.000018 per post. The biggest lever is the NLP workers: switching from on-demand to Spot instances would cut that line item by ~70% ($6.3K/mo instead of $21K), but Spot interruptions would cause worker rebalances and 6-minute re-warm delays. For now we value latency consistency over savings.
What would I do differently?
Partition count was too low at launch. We started with 12 partitions and hit head-of-line blocking when TikTok scrapers produced faster than the 12 assigned workers could consume. Increasing to 24 partitions required a topic recreation and consumer group reset — a 4-hour maintenance window. We should have started higher.
Word-level MinHash was a mistake. We ran word-level MinHash for 6 months before switching to character 4-grams. The measured duplicate rate rose from 4.1% to 7.4% after the switch. That was not because duplicates increased, but because we had been missing ~3.3% of actual reposts that had word-level changes but near-identical 4-gram content.
COPY protocol should have been day one. We initially used execute_values (batch INSERT). Throughput was 8K rows/sec, and we had regular TimescaleDB CPU spikes during flush. Switching to binary COPY improved throughput to 43K rows/sec and eliminated the spikes.
For the NLP model internals — DistilBERT INT8 quantization, SpaCy NER training, and the MinHash coordinated-campaign detector: NLP pipeline for real-time sentiment analysis at scale →
For the TimescaleDB architecture that both this pipeline and Voidly share — continuous aggregates, columnar compression, and the COPY write path: Voidly's measurement database: 2.2B probe results in TimescaleDB →
For how ingestion, normalization, and quality filtering work on the Voidly probe side: Voidly's probe-to-dataset ingest pipeline: normalization, quality filtering, and TimescaleDB indexing →
For the election-specific data engineering pipeline built on this infrastructure — AP Election API, Kafka precinct results, state scrapers, and FIPS normalization: Election data pipeline: AP feeds, Kafka precinct results, and state scraper normalization →