Voidly's Probe-to-Dataset Ingest Pipeline: Normalization, Quality Filtering, and TimescaleDB Indexing
Every censorship measurement that appears in Voidly's public dataset or in the real-time anomaly detector starts as a batch of raw bytes uploaded by a probe application running on an operator's device. The path from those bytes to a queryable TimescaleDB record involves six stages: upload over QUIC, validation in a Cloudflare Worker, Kafka fan-out, Rust-based normalization, quality filtering, and finally bulk insert plus async classification. This article walks through each stage with implementation detail.
Pipeline overview
Probe device (Tauri app, boringtun WireGuard)
│
│ protobuf batch over QUIC/443 (domain-fronted)
│ ~200KB per upload, every 60 seconds
▼
Cloudflare Worker (ingest.voidly.ai)
│ – Authenticate probe (Ed25519 device certificate)
│ – Schema validation (required fields present)
│ – Rate limiting (per probe: max 2 uploads/min)
│ – Queue to Cloudflare Queue → forwards to Kafka bridge
▼
Kafka (3 brokers, topic: raw_measurements, 12 partitions)
│ 7-day retention, lz4 compression
├──▶ Rust normalization consumer (primary path)
└──▶ Raw archival consumer (stores original bytes to S3)
▼
Rust normalization consumer
│ – Deserialize protobuf
│ – Normalize schema across probe versions
│ – Enrich (GeoIP, ASN lookup, domain canonicalization)
│ – Quality filter (drop invalid measurements)
│ – Bulk COPY to TimescaleDB (batches of 1,000)
▼
TimescaleDB (measurements table)
│
├──▶ Async classifier (sets interference_type / confidence_tier)
├──▶ Continuous aggregate refresh (hourly)
└──▶ Nightly Parquet export → HuggingFace (CC BY 4.0)
Wire protocol: protobuf over QUIC
Probes send measurement batches using Protocol Buffers version 3 over QUIC/443. QUIC was chosen over TCP for two reasons: (1) QUIC streams are multiplexed without head-of-line blocking, so a slow batch upload does not delay the next batch; (2) QUIC requires TLS 1.3 and is harder to fingerprint than a TLS-over-TCP connection (on the wire, the encrypted QUIC datagrams look like UDP/443 traffic from any other QUIC application).
// probe_upload.proto
syntax = "proto3";

message MeasurementBatch {
  string probe_id = 1;       // SHA-256 of device certificate public key
  bytes device_sig = 2;      // Ed25519 signature over batch_hash
  bytes batch_hash = 3;      // SHA-256 of measurements bytes
  int64 batch_seq = 4;       // monotonic, per-probe; allows dedup on server
  string probe_version = 5;  // semver "0.9.2"
  repeated Measurement measurements = 6;
}

message Measurement {
  // Required in all versions
  int64 measured_at_unix_ms = 1;
  string target_url = 2;
  string test_protocol = 3;    // "dns"|"tcp"|"tls"|"http"|"https"
  int32 vantage_asn = 4;
  string vantage_country = 5;  // ISO 3166-1 alpha-2

  // Optional (added across versions). Scalars are declared `optional` so
  // proto3 tracks presence; always check has_field before reading them.
  repeated string dns_addrs = 10;
  optional string dns_error_code = 11;
  optional bool tcp_connected = 20;
  optional int32 tcp_connect_ms = 21;
  optional bool tls_ok = 30;
  optional bool tls_cert_valid = 31;
  optional int32 tls_alert_code = 32;
  optional int32 http_status = 40;
  optional bytes http_body_sha = 41;
  optional bool control_ok = 50;
}
The batch signature allows the Cloudflare Worker to verify probe authenticity without storing any secret on the edge. The device certificate (issued by the Voidly Fleet CA during probe registration) contains the probe's Ed25519 public key. The Worker holds the Fleet CA public key and verifies the chain: device_cert → Fleet CA → trusted.
Cloudflare Worker: validation layer
// ingest worker (TypeScript)
// MeasurementBatch is the generated protobuf class; loadDeviceCert and
// verifyEd25519 are helpers defined elsewhere in the Worker.
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const { pathname } = new URL(request.url); // ignore any query string
    if (request.method !== 'POST' || !pathname.endsWith('/ingest')) {
      return new Response('Not found', { status: 404 });
    }
    const body = new Uint8Array(await request.arrayBuffer());
    let batch: MeasurementBatch;
    try {
      batch = MeasurementBatch.decode(body);
    } catch {
      // Malformed protobuf must not surface as an unhandled exception
      return new Response('Malformed batch', { status: 400 });
    }
    // 1. Authenticate probe
    // NOTE: this proves the probe signed batchHash. Recomputing the hash over
    // the measurements and comparing it with batchHash is not shown here.
    const cert = await loadDeviceCert(batch.probeId, env.KV);
    if (!cert) return new Response('Unknown probe', { status: 401 });
    const sigValid = await verifyEd25519(
      batch.deviceSig,
      batch.batchHash,
      cert.publicKeyBytes
    );
    if (!sigValid) return new Response('Invalid signature', { status: 401 });
    // 2. Check batch sequence (deduplication)
    const seqKey = 'seq:' + batch.probeId;
    const lastSeq = (await env.KV.get<number>(seqKey, 'json')) ?? -1;
    if (batch.batchSeq <= lastSeq) {
      return new Response('Duplicate batch', { status: 200 }); // 200, not error
    }
    // 3. Validate schema (required fields)
    for (const m of batch.measurements) {
      if (!m.measuredAtUnixMs || !m.targetUrl || !m.testProtocol) {
        return new Response('Invalid measurement', { status: 400 });
      }
    }
    // 4. Rate limiting (KV is eventually consistent, so the limit is best-effort)
    const rateKey = 'rate:' + batch.probeId + ':' + Math.floor(Date.now() / 60000);
    const calls = (await env.KV.get<number>(rateKey, 'json')) ?? 0;
    if (calls >= 2) return new Response('Rate limited', { status: 429 });
    await env.KV.put(rateKey, String(calls + 1), { expirationTtl: 120 });
    // 5. Enqueue to Cloudflare Queue → Kafka bridge
    await env.MEASUREMENT_QUEUE.send({ batch: Array.from(body) });
    // 6. Record the sequence number only after a successful enqueue, so a batch
    //    rejected above (400/429) can be retried with the same sequence number
    await env.KV.put(seqKey, String(batch.batchSeq));
    return new Response('OK', { status: 202 });
  }
};
The batch sequence number stored in Cloudflare KV provides deduplication for retries: if a probe's QUIC connection is interrupted mid-upload, it retries the batch on reconnect. Without sequence tracking the same batch would be inserted twice. We return 200 (not an error) for duplicate sequences so the probe stops retrying.
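The deduplication rule can be sketched in isolation. The following is a minimal in-memory illustration, not the Worker's actual code: `SeqTracker`, `SeqCheck`, `check`, and `commit` are hypothetical names, and a `HashMap` stands in for the per-probe `seq:` entries in KV. It keeps the read-only check separate from the commit, so a batch that is rejected later in the request does not advance the stored sequence.

```rust
use std::collections::HashMap;

/// Outcome of comparing a batch sequence number with the last committed one.
#[derive(Debug, PartialEq)]
pub enum SeqCheck {
    Fresh,
    Duplicate,
}

/// Hypothetical in-memory stand-in for the per-probe `seq:<probe_id>` KV entries.
#[derive(Default)]
pub struct SeqTracker {
    last_committed: HashMap<String, i64>,
}

impl SeqTracker {
    /// Read-only check: a batch is a duplicate when its sequence number is not
    /// strictly greater than the last committed one for this probe.
    /// A probe with no committed sequence accepts any batch.
    pub fn check(&self, probe_id: &str, batch_seq: i64) -> SeqCheck {
        match self.last_committed.get(probe_id) {
            Some(&last) if batch_seq <= last => SeqCheck::Duplicate,
            _ => SeqCheck::Fresh,
        }
    }

    /// Record the sequence number once the batch has been handed off downstream.
    /// The stored value never moves backwards.
    pub fn commit(&mut self, probe_id: &str, batch_seq: i64) {
        let entry = self
            .last_committed
            .entry(probe_id.to_string())
            .or_insert(batch_seq);
        if batch_seq > *entry {
            *entry = batch_seq;
        }
    }
}
```

A caller would run `check` first, return the duplicate response when it reports `Duplicate`, and call `commit` only after the batch has been enqueued.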
Kafka: fan-out and archival
The Cloudflare Queue delivers batches to a Kafka bridge (a small Node.js process running on EC2 that receives from Cloudflare Queues via HTTP polling and produces to Kafka). Two consumer groups read from the same Kafka topic:
- normalization-consumer: the primary processing path; parses, normalizes, and inserts to TimescaleDB
- raw-archival-consumer: stores the original protobuf bytes verbatim to S3 (s3://voidly-raw/{year}/{month}/{day}/{probe_id}/{batch_seq}.pb.gz) before any normalization. This allows reprocessing from original bytes if normalization logic changes.
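As an illustration of the archive key layout, here is a minimal std-only sketch that builds the key from a timestamp. Several details are assumptions rather than facts from the pipeline: the function name `raw_archive_key`, the use of the receive time (in UTC) to pick the date, and zero-padding of the month and day.

```rust
const MS_PER_DAY: i64 = 86_400_000;

/// Convert days since 1970-01-01 into a (year, month, day) UTC civil date,
/// using the standard days-to-civil algorithm for the Gregorian calendar.
fn civil_from_days(days: i64) -> (i64, i64, i64) {
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year from March 1
    let mp = (5 * doy + 2) / 153; // month index starting at March, [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };
    (year, month, day)
}

/// Build the S3 key for one raw batch.
/// Assumption: the date components come from the receive time in UTC and
/// month/day are zero-padded; the article only gives the template.
pub fn raw_archive_key(received_at_unix_ms: i64, probe_id: &str, batch_seq: i64) -> String {
    let days = received_at_unix_ms.div_euclid(MS_PER_DAY);
    let (year, month, day) = civil_from_days(days);
    format!(
        "s3://voidly-raw/{:04}/{:02}/{:02}/{}/{}.pb.gz",
        year, month, day, probe_id, batch_seq
    )
}
```

Keying by date first and probe second keeps a day's batches under one prefix, which suits reprocessing a date range after a normalization change.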
Rust normalization consumer
The normalization consumer is written in Rust and handles three tasks: deserialization, enrichment, and quality filtering.
Schema drift across probe versions
Probe versions are not guaranteed to be uniform across the fleet. Operators update at their own pace. The protobuf format is backward-compatible (new fields are optional), but the normalization layer must handle missing optional fields gracefully:
struct NormalizedMeasurement {
    // Fields present in all versions (v0.1+)
    measured_at: DateTime<Utc>,
    target_url: String,
    target_domain: String, // extracted from URL
    test_protocol: Protocol,
    vantage_asn: i32,
    vantage_country: CountryCode,
    probe_id: String,
    probe_version: Version,
    // Resolved addresses, read by enrichment and quality filtering
    // (element type assumed here; the proto carries them as strings)
    dns_addrs: Vec<IpAddr>,
    // Added in v0.3; None if probe is older
    tcp_connect_ms: Option<i16>,
    // Added in v0.5; None if probe is older
    tls_alert_code: Option<i16>,
    tls_cert_sni: Option<String>,
    // Added in v0.7; None if probe is older
    http_body_sha256: Option<[u8; 32]>,
    // Enriched fields (derived server-side; None when a lookup has no result)
    target_ip_geo: Option<CountryCode>, // GeoIP of first dns_addrs entry
    target_asn: Option<i32>,
    target_registrable: Option<String>, // registrable domain from the PSL
    control_reachable: bool,
}
fn normalize(raw: &proto::Measurement, probe_ver: &Version) -> NormalizedMeasurement {
    let now = Utc::now();
    NormalizedMeasurement {
        // Clock skew protection: cap at now(). Timestamps that cannot be
        // represented at all also fall back to now().
        measured_at: DateTime::from_timestamp_millis(raw.measured_at_unix_ms)
            .map_or(now, |t| t.min(now)),
        target_domain: extract_domain(&raw.target_url),
        tcp_connect_ms: if probe_ver >= &Version::parse("0.3.0").unwrap() {
            // Clamp into the i16 range; negative durations become 0
            raw.tcp_connect_ms.map(|ms| ms.clamp(0, i16::MAX as i32) as i16)
        } else {
            None
        },
        // ...
    }
}
Clock skew is a real problem with probes operating in censored environments. A probe behind Tor or a VPN may have its clock desynchronized. We cap measured_at at the server's current time to prevent future-dated measurements from corrupting continuous aggregate windows. Measurements more than 48 hours in the past are still accepted (late-uploading probes) but flagged with the late_arrival_gt48h quality annotation.
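The capping and late-arrival rules can be shown on plain millisecond timestamps. This is a minimal sketch under stated assumptions: `clamp_measured_at`, `Timestamped`, and `LATE_ARRIVAL_MS` are hypothetical names, and the server's current time is passed in explicitly so the behaviour is easy to check.

```rust
/// 48 hours in milliseconds, the late-arrival threshold described above.
const LATE_ARRIVAL_MS: i64 = 48 * 60 * 60 * 1000;

/// A measurement timestamp after skew handling (hypothetical type).
#[derive(Debug, PartialEq)]
pub struct Timestamped {
    pub measured_at_ms: i64,
    pub late_arrival: bool,
}

/// Cap a probe-reported timestamp at the server's clock and flag it when it
/// is more than 48 hours old. Future-dated values are pulled back to "now",
/// so they can never be flagged as late.
pub fn clamp_measured_at(reported_ms: i64, server_now_ms: i64) -> Timestamped {
    let measured_at_ms = reported_ms.min(server_now_ms);
    // saturating_sub avoids overflow on absurdly old (very negative) inputs
    let age_ms = server_now_ms.saturating_sub(measured_at_ms);
    Timestamped {
        measured_at_ms,
        late_arrival: age_ms > LATE_ARRIVAL_MS,
    }
}
```

The comparison is strict, so a measurement exactly 48 hours old is still treated as on time.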
Enrichment
struct EnrichmentPipeline {
    geoip: MaxmindReader,         // GeoLite2-City, updated weekly
    asn_db: MaxmindReader,        // GeoLite2-ASN, updated weekly
    domain_psl: PublicSuffixList, // Mozilla PSL for registrable domain extraction
}
impl EnrichmentPipeline {
    fn enrich(&self, m: &mut NormalizedMeasurement) {
        // GeoIP + ASN lookup for first resolved IP
        if let Some(ip) = m.dns_addrs.first() {
            m.target_ip_geo = self.geoip.lookup(ip).map(|r| r.country_code);
            m.target_asn = self.asn_db.lookup(ip).map(|r| r.autonomous_system_number);
        }
        // Extract registrable domain (e.g., "www.bbc.co.uk" → "bbc.co.uk")
        m.target_registrable = self.domain_psl
            .registrable_domain(&m.target_domain)
            .map(|d| d.to_string());
        // Canonicalize country code (some probes send "UK" instead of "GB").
        // Pass by reference: the field cannot be moved out of a &mut borrow.
        m.vantage_country = normalize_country_code(&m.vantage_country);
    }
}
Quality filtering
Not all measurements are usable. We filter at ingest time and set the inference_dropped column (rather than discarding) so that filtering decisions are auditable:
fn quality_check(m: &NormalizedMeasurement) -> Option<&'static str> {
    // Control server unreachable: can't distinguish censorship from probe failure
    if !m.control_reachable {
        return Some("control_unreachable");
    }
    // Bogon target IP: every resolved address is RFC 1918, loopback, or similar
    if m.dns_addrs.iter().any(is_bogon) && !m.dns_addrs.iter().any(is_public) {
        return Some("bogon_resolution_only");
    }
    // Late arrival: measured_at more than 48h in the past (late upload accepted
    // but not used for real-time anomaly detection)
    if Utc::now() - m.measured_at > Duration::hours(48) {
        return Some("late_arrival_gt48h");
    }
    // Empty URL: happens with some older probe versions on parse error
    if m.target_url.is_empty() {
        return Some("empty_target_url");
    }
    // Probe blacklisted (operator revoked or probe flagged as compromised)
    if REVOKED_PROBES.contains(&m.probe_id.as_str()) {
        return Some("probe_revoked");
    }
    None // passes quality check
}
In production, about 3.2% of ingested measurements are quality-filtered. The most common reason is control_unreachable (2.1%): probes in highly censored environments sometimes cannot reach our control server even when they can reach the measurement target, making interference determination unreliable.
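The quality check above calls `is_bogon` and `is_public` without showing them. The following is one possible std-only sketch, not the pipeline's actual helpers: the set of ranges treated as bogons is an assumption, and `bogon_resolution_only` is a hypothetical wrapper that takes addresses as strings, as they arrive in the protobuf.

```rust
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

/// True for addresses that should never be the public answer for a website.
/// Assumption: the exact bogon set used in production is not documented.
pub fn is_bogon(ip: &IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => is_bogon_v4(v4),
        IpAddr::V6(v6) => is_bogon_v6(v6),
    }
}

fn is_bogon_v4(ip: &Ipv4Addr) -> bool {
    ip.is_private()          // 10/8, 172.16/12, 192.168/16 (RFC 1918)
        || ip.is_loopback()  // 127/8
        || ip.is_link_local() // 169.254/16
        || ip.is_unspecified() // 0.0.0.0
        || ip.is_broadcast()
        || ip.is_documentation()
}

fn is_bogon_v6(ip: &Ipv6Addr) -> bool {
    let first = ip.segments()[0];
    ip.is_loopback()
        || ip.is_unspecified()
        || (first & 0xfe00) == 0xfc00 // fc00::/7 unique local
        || (first & 0xffc0) == 0xfe80 // fe80::/10 link-local
}

/// In this sketch, "public" simply means "not a bogon".
pub fn is_public(ip: &IpAddr) -> bool {
    !is_bogon(ip)
}

/// Mirrors the bogon_resolution_only condition: at least one bogon answer and
/// no public answer. Strings that do not parse as IP addresses are ignored.
pub fn bogon_resolution_only<S: AsRef<str>>(dns_addrs: &[S]) -> bool {
    let parsed: Vec<IpAddr> = dns_addrs
        .iter()
        .filter_map(|s| s.as_ref().parse::<IpAddr>().ok())
        .collect();
    parsed.iter().any(is_bogon) && !parsed.iter().any(is_public)
}
```

A mixed answer such as one private and one public address is not filtered, which matches the `any(is_bogon) && !any(is_public)` condition in `quality_check`.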
Handling election-night spikes
Major censorship events (elections, protests, national crises) produce measurement spikes for three reasons: (a) more probe operators are active, (b) existing probes increase their measurement frequency automatically (via the urgent-injection scheduler), and (c) previously scheduled measurements coincide with the event. During the 2024 Myanmar election the pipeline handled 8× normal throughput for 3 hours.
The pipeline handles spikes at three points:
- Cloudflare Queue: absorbs bursts upstream; within Cloudflare's documented per-queue throughput and backlog limits, the queue acts as a large buffer from the probe's perspective
- Kafka: provides backpressure buffer; consumer lag increases during spikes but data is safe in the 7-day retention window
- normalization consumer: auto-scales (EC2 Auto Scaling Group, scaling on the Kafka consumer lag metric); once lag has stayed above 100,000 messages long enough for the alarm to fire, an additional instance launches in about 90 seconds
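The scale-out trigger in the last bullet fires only after the lag has stayed above the threshold for consecutive evaluation periods. A minimal sketch of that rule, assuming one lag sample per period; `first_alarm_period` is a hypothetical helper, not part of the pipeline:

```rust
/// Return the index of the first period at which an alarm would fire, given
/// one lag sample per period. The alarm requires `evaluation_periods`
/// consecutive samples strictly above `threshold`.
pub fn first_alarm_period(
    lag_samples: &[u64],
    threshold: u64,
    evaluation_periods: usize,
) -> Option<usize> {
    if evaluation_periods == 0 {
        return None; // treated as "never fires" in this sketch
    }
    let mut consecutive = 0usize;
    for (i, &lag) in lag_samples.iter().enumerate() {
        if lag > threshold {
            consecutive += 1;
        } else {
            consecutive = 0; // a single healthy period resets the count
        }
        if consecutive >= evaluation_periods {
            return Some(i);
        }
    }
    None
}
```

With 60-second periods and two evaluation periods, a brief one-minute lag excursion does not trigger a scale-out, which avoids launching instances for transient bumps.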
# CloudWatch metric alarm: scale out when Kafka lag > 100K
resource "aws_cloudwatch_metric_alarm" "consumer_lag" {
  alarm_name          = "voidly-consumer-lag-high"
  namespace           = "Voidly/Kafka" # assumption: custom namespace the lag metric is published under
  metric_name         = "kafka_consumer_lag_sum"
  statistic           = "Maximum"      # assumption: worst lag seen within each period
  comparison_operator = "GreaterThanThreshold"
  threshold           = 100000
  evaluation_periods  = 2
  period              = 60
  alarm_actions       = [aws_autoscaling_policy.scale_out.arn]
}
# Typical spike handling:
# t+0m:  spike begins, lag starts rising
# t+2m:  alarm fires (2× 60s evaluation periods)
# t+4m:  new instance starts (AMI baked, ~90s launch)
# t+6m:  new instance consuming, lag begins clearing
# t+60m: spike ends, scale-in after 60min cooldown
Nightly HuggingFace export
Every night at 02:00 UTC, a Parquet export job runs:
-- Export yesterday's measurements to Parquet (via pg_parquet extension)
COPY (
  SELECT
    measurement_id, probe_id, vantage_country, vantage_asn,
    measured_at, target_url, target_domain, test_protocol,
    interference_type, interference_prob, confidence_tier,
    dns_resolved, tcp_connected, tls_handshake_ok, http_status_code,
    control_reachable, inference_dropped, probe_version
  FROM measurements
  WHERE measured_at >= CURRENT_DATE - INTERVAL '1 day'
    AND measured_at < CURRENT_DATE
  -- Quality-filtered rows are included with inference_dropped set, so users
  -- can filter them out (the original bytes remain in the raw S3 archive)
)
TO '/tmp/measurements_{date}/part-{n}.parquet'
WITH (FORMAT parquet, COMPRESSION 'zstd', ROW_GROUP_SIZE 100000);
-- Upload to HuggingFace via huggingface_hub Python API
-- Partitioned as country_code={CC}/year_month={YYYY-MM}/part-{n}.parquet
-- Each country×month parquet is ~5–50MB depending on probe density
The export is partitioned by vantage_country and year_month so that users can download only the data they need (e.g., only Iran and Russia for the last 6 months) without fetching the full 2.2B-row dataset.
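To make the partition layout concrete, here is a minimal sketch that builds the relative path for one export file. The function name `export_partition_path`, the validation rules, and the unpadded part number are assumptions; only the `country_code={CC}/year_month={YYYY-MM}/part-{n}.parquet` template comes from the export job.

```rust
/// Build the relative path of one exported Parquet file.
/// Returns None for a country code that is not two ASCII letters or a month
/// outside 1..=12, so malformed partitions are never written.
pub fn export_partition_path(
    country_code: &str,
    year: u16,
    month: u8,
    part: u32,
) -> Option<String> {
    let cc = country_code.trim();
    let valid_cc = cc.len() == 2 && cc.chars().all(|c| c.is_ascii_alphabetic());
    if !valid_cc || !(1..=12).contains(&month) {
        return None;
    }
    Some(format!(
        "country_code={}/year_month={:04}-{:02}/part-{}.parquet",
        cc.to_ascii_uppercase(),
        year,
        month,
        part
    ))
}
```

Because the partition keys are encoded as `key=value` directory names, a reader that wants only Iran for March 2024 can request the `country_code=IR/year_month=2024-03/` prefix and skip everything else.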
End-to-end latency
Stage                                     p50     p99
──────────────────────────────────────────────────────────────────
Probe upload (QUIC, ~200KB batch)         1.2s    4.1s
Cloudflare Worker processing              8ms     45ms
Cloudflare Queue → Kafka bridge           12s     40s    (polling)
Kafka producer → TimescaleDB insert       18s     62s    (batch accumulation)
──────────────────────────────────────────────────────────────────
Probe measurement → database row          ~32s    ~110s  (normal load)
Probe measurement → anomaly detection     ~38s    ~130s  (+classifier run)
Probe measurement → public dataset        ~26hr   ~27hr  (nightly export)
The Cloudflare Queue polling stage (12 seconds at p50) is the second-largest contributor to ingest latency, after the 18 seconds of batch accumulation before the TimescaleDB insert. The bridge consumes the queue in pull mode, polling every 10 seconds. Switching to a push-based delivery mechanism (Cloudflare Queues Workers Trigger) would reduce this to near-zero, and is on the roadmap for the next pipeline revision.
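As a rough sanity check on the table, the per-stage medians can be summed and compared with the reported end-to-end figure. This is only an approximation, since percentiles of independent stages do not add exactly; the stage names below are labels chosen for this sketch.

```rust
/// One pipeline stage with its reported latency percentiles, in seconds.
pub struct Stage {
    pub name: &'static str,
    pub p50_s: f64,
    pub p99_s: f64,
}

/// The four per-stage rows from the latency table (names are illustrative).
pub const STAGES: [Stage; 4] = [
    Stage { name: "probe_upload", p50_s: 1.2, p99_s: 4.1 },
    Stage { name: "worker", p50_s: 0.008, p99_s: 0.045 },
    Stage { name: "queue_to_kafka", p50_s: 12.0, p99_s: 40.0 },
    Stage { name: "kafka_to_timescaledb", p50_s: 18.0, p99_s: 62.0 },
];

/// Sum of per-stage medians: a rough estimate of end-to-end p50.
pub fn total_p50(stages: &[Stage]) -> f64 {
    stages.iter().map(|s| s.p50_s).sum()
}

/// Sum of per-stage p99 values: a pessimistic estimate of end-to-end p99.
pub fn total_p99(stages: &[Stage]) -> f64 {
    stages.iter().map(|s| s.p99_s).sum()
}

/// Fraction of the summed p50 latency attributable to one named stage.
pub fn p50_share(stages: &[Stage], name: &str) -> Option<f64> {
    let total = total_p50(stages);
    stages
        .iter()
        .find(|s| s.name == name)
        .map(|s| s.p50_s / total)
}
```

The medians sum to about 31.2 seconds, in line with the reported ~32 seconds to a database row. Queue polling accounts for roughly 38% of that and batch accumulation for roughly 58%, so removing the polling delay would cut median ingest latency by a little over a third.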
For the TimescaleDB schema that receives these measurements and the continuous aggregates built on top of them: Voidly's measurement database: 2.2B probe results in TimescaleDB →
For the Voidly measurement dataset schema — the field definitions that correspond to the protobuf fields documented here: The Voidly measurement dataset: field-by-field schema reference →
For the probe architecture that generates the measurement batches this pipeline ingests, including the QUIC transport and Ed25519 device certificates: The Voidly Probe: Tauri + boringtun network measurement at the operator's edge →
For how BGP routing data from RIPE NCC RIS, RouteViews, and bgp.tools is ingested and joined to probe measurements — MRT parsing, withdrawal detection, and bgp_outage_score computation: Voidly BGP data ingestion: parsing MRT dumps, detecting prefix withdrawals, and computing country outage scores →
For the continuous aggregate layer built on top of the TimescaleDB rows this pipeline writes — 4.1s queries reduced to 4ms: Voidly's TimescaleDB continuous aggregates: pre-aggregating 2.2B probe measurements for fast queries →