
Voidly's Probe-to-Dataset Ingest Pipeline: Normalization, Quality Filtering, and TimescaleDB Indexing

15 min read · AI Analytics
Censorship · Voidly · Infrastructure · Data pipeline · Kafka

Every censorship measurement that appears in Voidly's public dataset or in the real-time anomaly detector starts as a batch of raw bytes uploaded by a probe application running on an operator's device. The path from those bytes to a queryable TimescaleDB record involves six stages: upload over QUIC, validation in a Cloudflare Worker, Kafka fan-out, Rust-based normalization, quality filtering, and finally bulk insert plus async classification. This article walks through each stage with implementation detail.

Pipeline overview

Probe device (Tauri app, boringtun WireGuard)
       │
       │  protobuf batch over QUIC/443 (domain-fronted)
       │  ~200KB per upload, every 60 seconds
       ▼
Cloudflare Worker (ingest.voidly.ai)
       │  – Authenticate probe (Ed25519 device certificate)
       │  – Schema validation (required fields present)
       │  – Rate limiting (per probe: max 2 uploads/min)
       │  – Enqueue to Cloudflare Queue (drained by the Kafka bridge)
       ▼
Kafka (3 brokers, topic: raw_measurements, 12 partitions)
       │  7-day retention, lz4 compression
       ├──▶ Rust normalization consumer (primary path)
       └──▶ Raw archival consumer (stores original bytes to S3)
       ▼
Rust normalization consumer
       │  – Deserialize protobuf
       │  – Normalize schema across probe versions
       │  – Enrich (GeoIP, ASN lookup, domain canonicalization)
       │  – Quality filter (drop invalid measurements)
       │  – Bulk COPY to TimescaleDB (batches of 1,000)
       ▼
TimescaleDB (measurements table)
       │
       ├──▶ Async classifier (sets interference_type / confidence_tier)
       ├──▶ Continuous aggregate refresh (hourly)
       └──▶ Nightly Parquet export → HuggingFace (CC BY 4.0)

Wire protocol: protobuf over QUIC

Probes send measurement batches using Protocol Buffers version 3 over QUIC/443. QUIC was chosen over TCP for two reasons: (1) QUIC multiplexes independent streams over one connection without transport-level head-of-line blocking, so a slow batch upload does not delay the next batch; (2) QUIC mandates TLS 1.3 and encrypts most of its transport metadata, so an upload is harder to fingerprint than a TLS-over-TCP connection and is difficult to distinguish from other QUIC applications' UDP/443 traffic.

// probe_upload.proto
syntax = "proto3";

message MeasurementBatch {
    string probe_id       = 1;   // SHA-256 of device certificate public key
    bytes  device_sig     = 2;   // Ed25519 signature over batch_hash
    bytes  batch_hash     = 3;   // SHA-256 of measurements bytes
    int64  batch_seq      = 4;   // monotonic, per-probe; allows dedup on server
    string probe_version  = 5;   // semver "0.9.2"

    repeated Measurement measurements = 6;
}

message Measurement {
    // Required in all versions
    int64  measured_at_unix_ms = 1;
    string target_url          = 2;
    string test_protocol       = 3;   // "dns"|"tcp"|"tls"|"http"|"https"
    int32  vantage_asn         = 4;
    string vantage_country     = 5;   // ISO 3166-1 alpha-2

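    // Optional (added across versions). Singular fields use proto3 `optional`
    // so presence is tracked: check has_field / Option before reading, since an
    // unset field would otherwise be indistinguishable from false / 0 / "".
    repeated string dns_addrs       = 10;
    optional string dns_error_code  = 11;
    optional bool   tcp_connected   = 20;
    optional int32  tcp_connect_ms  = 21;
    optional bool   tls_ok          = 30;
    optional bool   tls_cert_valid  = 31;
    optional int32  tls_alert_code  = 32;
    optional int32  http_status     = 40;
    optional bytes  http_body_sha   = 41;
    optional bool   control_ok      = 50;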
}

The batch signature allows the Cloudflare Worker to verify probe authenticity without storing any secret on the edge. The device certificate (issued by the Voidly Fleet CA during probe registration) contains the probe's Ed25519 public key. The Worker holds the Fleet CA public key and verifies the chain: device_cert → Fleet CA → trusted.
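The signature only proves that the probe signed some batch_hash; the Worker should also confirm that batch_hash actually covers the measurements it received, or a captured (batch_hash, device_sig) pair could be replayed alongside different measurements. The sketch below shows both sides using Node's built-in node:crypto module in place of the Worker's verifyEd25519 helper. It assumes batch_hash is SHA-256 over the serialized measurements bytes exactly as transmitted; signBatch and verifyBatch are hypothetical names, and certificate-chain checking is omitted.

```typescript
import { createHash, generateKeyPairSync, sign, verify, KeyObject } from 'node:crypto';

// Probe side: hash the serialized measurements, then sign the hash.
function signBatch(measurementsBytes: Uint8Array, privateKey: KeyObject) {
  const batchHash = createHash('sha256').update(measurementsBytes).digest();
  // For Ed25519, Node takes a null algorithm: the scheme defines its own hashing.
  const deviceSig = sign(null, batchHash, privateKey);
  return { batchHash, deviceSig };
}

// Server side: recompute batch_hash from the received bytes before trusting the signature.
function verifyBatch(
  measurementsBytes: Uint8Array,
  batchHash: Uint8Array,
  deviceSig: Uint8Array,
  publicKey: KeyObject,
): boolean {
  const recomputed = createHash('sha256').update(measurementsBytes).digest();
  if (!recomputed.equals(batchHash)) return false; // hash does not describe these bytes
  return verify(null, batchHash, publicKey, deviceSig);
}

// Demo: a freshly generated key pair stands in for the device certificate key.
const { publicKey, privateKey } = generateKeyPairSync('ed25519');
const measurements = new TextEncoder().encode('serialized Measurement messages');
const { batchHash, deviceSig } = signBatch(measurements, privateKey);

const ok = verifyBatch(measurements, batchHash, deviceSig, publicKey);
const tampered = verifyBatch(new TextEncoder().encode('other measurements'), batchHash, deviceSig, publicKey);
console.log(ok, tampered); // true false
```

The tampered call fails even though the signature is still valid for the original hash, because verifyBatch never trusts a batch_hash it did not recompute.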

Cloudflare Worker: validation layer

// ingest worker (TypeScript)
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    if (request.method !== 'POST' || new URL(request.url).pathname !== '/ingest') {
      return new Response('Not found', { status: 404 });
    }

    const body = new Uint8Array(await request.arrayBuffer());
    let batch: MeasurementBatch;
    try {
      batch = MeasurementBatch.decode(body);
    } catch {
      return new Response('Malformed batch', { status: 400 });
    }

    // 1. Authenticate probe
    const cert = await loadDeviceCert(batch.probeId, env.KV);
    if (!cert) return new Response('Unknown probe', { status: 401 });

    const sigValid = await verifyEd25519(
      batch.deviceSig,
      batch.batchHash,
      cert.publicKeyBytes
    );
    if (!sigValid) return new Response('Invalid signature', { status: 401 });

    // 2. Deduplicate retries (read only; the sequence number is recorded in step 6)
    const seqKey = 'seq:' + batch.probeId;
    const lastSeq = (await env.KV.get<number>(seqKey, 'json')) ?? -1;
    if (batch.batchSeq <= lastSeq) {
      return new Response('Duplicate batch', { status: 200 });  // 200, not an error: stops retries
    }

    // 3. Validate schema (required fields)
    for (const m of batch.measurements) {
      if (!m.measuredAtUnixMs || !m.targetUrl || !m.testProtocol) {
        return new Response('Invalid measurement', { status: 400 });
      }
    }

    // 4. Rate limiting: fixed one-minute window, max 2 uploads per probe
    const rateKey = 'rate:' + batch.probeId + ':' + Math.floor(Date.now() / 60000);
    const calls = (await env.KV.get<number>(rateKey, 'json')) ?? 0;
    if (calls >= 2) return new Response('Rate limited', { status: 429 });
    await env.KV.put(rateKey, String(calls + 1), { expirationTtl: 120 });

    // 5. Enqueue to Cloudflare Queue → Kafka bridge
    await env.MEASUREMENT_QUEUE.send({ batch: Array.from(body) });

    // 6. Record the sequence number only after the batch is accepted, so a
    //    batch rejected with 400 or 429 is not treated as a duplicate on retry
    await env.KV.put(seqKey, String(batch.batchSeq));

    return new Response('OK', { status: 202 });
  }
}

The batch sequence number stored in Cloudflare KV deduplicates retries: if the connection drops after the Worker has accepted a batch but before the probe sees the response, the probe retries that batch on reconnect, and without sequence tracking the same batch would be inserted twice. We return 200 (not an error) for duplicate sequences so the probe stops retrying.
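The order of the checks matters as much as the checks themselves. A sketch of the decision logic, with an in-memory Map standing in for Cloudflare KV and a hypothetical admitBatch function: duplicates are answered before they consume rate-limit budget, and the sequence number is written only once a batch is accepted, so a batch rejected as invalid or rate-limited stays retryable.

```typescript
// In-memory stand-in for Cloudflare KV (illustration only).
type Store = Map<string, number>;
type Outcome = 'accepted' | 'duplicate' | 'invalid' | 'rate_limited';

const MAX_UPLOADS_PER_MINUTE = 2;

function admitBatch(
  store: Store,
  probeId: string,
  batchSeq: number,
  schemaOk: boolean,
  nowMs: number,
): Outcome {
  // 1. Deduplicate: read only, nothing is written yet
  const seqKey = `seq:${probeId}`;
  if (batchSeq <= (store.get(seqKey) ?? -1)) return 'duplicate';

  // 2. Reject malformed batches before they use rate-limit budget
  if (!schemaOk) return 'invalid';

  // 3. Fixed one-minute window, keyed like the Worker's rate key
  const rateKey = `rate:${probeId}:${Math.floor(nowMs / 60_000)}`;
  const calls = store.get(rateKey) ?? 0;
  if (calls >= MAX_UPLOADS_PER_MINUTE) return 'rate_limited';
  store.set(rateKey, calls + 1);

  // 4. (the batch would be enqueued here)
  // 5. Record the sequence number last, once the batch is accepted
  store.set(seqKey, batchSeq);
  return 'accepted';
}

const kv: Store = new Map();
const t0 = 1_700_000_000_000; // 20 s into a one-minute window
const results = [
  admitBatch(kv, 'probe-a', 1, true, t0),          // first upload
  admitBatch(kv, 'probe-a', 1, true, t0 + 1_000),  // retry after a lost response
  admitBatch(kv, 'probe-a', 2, true, t0 + 2_000),  // second upload this minute
  admitBatch(kv, 'probe-a', 3, true, t0 + 3_000),  // third upload this minute
  admitBatch(kv, 'probe-a', 3, true, t0 + 60_000), // same batch, next minute
];
console.log(results.join(', '));
// accepted, duplicate, accepted, rate_limited, accepted
```

Because batch 3 was rejected with rate_limited before its sequence number was stored, the retry in the next minute is accepted instead of being mistaken for a duplicate.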

Kafka: fan-out and archival

The Cloudflare Queue delivers batches to a Kafka bridge (a small Node.js process running on EC2 that receives from Cloudflare Queues via HTTP polling and produces to Kafka). Two consumer groups read from the same Kafka topic:

  • normalization-consumer: the primary processing path; it parses, normalizes, and inserts into TimescaleDB
  • raw-archival-consumer: stores the original protobuf bytes verbatim to S3 (s3://voidly-raw/{year}/{month}/{day}/{probe_id}/{batch_seq}.pb.gz) before any normalization. This allows reprocessing from original bytes if normalization logic changes.
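As an illustration of the archive layout, a hypothetical key builder is sketched below. Which timestamp supplies the date (here, the time the batch was received, in UTC) and whether month and day are zero-padded are assumptions; only the path template comes from the layout above.

```typescript
// Hypothetical helper: builds the raw-archive key for one batch.
// Assumptions: date = receive time in UTC; month and day are zero-padded.
function archiveKey(receivedAtMs: number, probeId: string, batchSeq: number): string {
  const d = new Date(receivedAtMs);
  const year = d.getUTCFullYear();
  const month = String(d.getUTCMonth() + 1).padStart(2, '0'); // getUTCMonth is 0-based
  const day = String(d.getUTCDate()).padStart(2, '0');
  return `s3://voidly-raw/${year}/${month}/${day}/${probeId}/${batchSeq}.pb.gz`;
}

const key = archiveKey(Date.UTC(2024, 0, 5, 12, 30), 'probe-a', 42);
console.log(key); // s3://voidly-raw/2024/01/05/probe-a/42.pb.gz
```

With this layout a reprocessing job can list one probe's batches for a day under a single prefix; S3 lists keys lexicographically, so batch_seq values need a numeric sort before replay.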

Rust normalization consumer

The normalization consumer is written in Rust and handles three tasks: deserialization, enrichment, and quality filtering.

Schema drift across probe versions

Probe versions are not guaranteed to be uniform across the fleet. Operators update at their own pace. The protobuf format is backward-compatible (new fields are optional), but the normalization layer must handle missing optional fields gracefully:

use chrono::{DateTime, Utc};
use semver::Version;
use std::net::IpAddr;

struct NormalizedMeasurement {
    // Fields present in all versions (v0.1+)
    measured_at:       DateTime<Utc>,
    target_url:        String,
    target_domain:     String,    // extracted from URL
    test_protocol:     Protocol,
    vantage_asn:       i32,
    vantage_country:   CountryCode,
    probe_id:          String,
    probe_version:     Version,

    // Resolved addresses from the probe's DNS test (empty if none)
    dns_addrs:         Vec<IpAddr>,

    // Added in v0.3: None if probe is older
    tcp_connect_ms:    Option<i16>,

    // Added in v0.5: None if probe is older
    tls_alert_code:    Option<i16>,
    tls_cert_sni:      Option<String>,

    // Added in v0.7: None if probe is older
    http_body_sha256:  Option<[u8; 32]>,

    // Enriched fields (derived server-side; None when the lookup fails)
    target_ip_geo:      Option<CountryCode>,  // GeoIP of first dns_addrs entry
    target_asn:         Option<i32>,
    target_registrable: Option<String>,       // registrable domain via the PSL
    control_reachable:  bool,
}

fn normalize(raw: &proto::Measurement, probe_ver: &Version) -> NormalizedMeasurement {
    let now = Utc::now();
    NormalizedMeasurement {
        // Clock skew protection: cap future-dated timestamps at now(); an
        // out-of-range timestamp (from_timestamp_millis returns None) also becomes now()
        measured_at: DateTime::from_timestamp_millis(raw.measured_at_unix_ms)
            .map(|t| t.min(now))
            .unwrap_or(now),
        target_domain: extract_domain(&raw.target_url),
        // Only trust the field on probe versions that define it; clamp to the i16 range
        tcp_connect_ms: if *probe_ver >= Version::new(0, 3, 0) {
            raw.tcp_connect_ms.map(|ms| ms.clamp(0, i16::MAX as i32) as i16)
        } else {
            None
        },
        // ...
    }
}
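The version gate has to compare versions numerically, not as strings ("0.10.0" sorts before "0.3.0" lexicographically). A minimal TypeScript sketch of the same gating and clamping, assuming only the MAJOR.MINOR.PATCH core matters (pre-release and build suffixes are ignored, unlike a full semver parser); parseVersion, versionAtLeast and normalizeTcpConnectMs are hypothetical names.

```typescript
type Semver = [number, number, number];

// "0.9.2" -> [0, 9, 2]; suffixes such as "-beta.1" are ignored in this sketch
function parseVersion(v: string): Semver | null {
  const m = /^(\d+)\.(\d+)\.(\d+)/.exec(v);
  return m ? [Number(m[1]), Number(m[2]), Number(m[3])] : null;
}

// Component-wise numeric comparison: the first differing component decides
function versionAtLeast(v: Semver, min: Semver): boolean {
  for (let i = 0; i < 3; i++) {
    if (v[i] !== min[i]) return v[i] > min[i];
  }
  return true;
}

const TCP_CONNECT_MS_SINCE: Semver = [0, 3, 0];
const I16_MAX = 32767;

function normalizeTcpConnectMs(raw: number | undefined, probeVersion: string): number | null {
  const v = parseVersion(probeVersion);
  if (!v || !versionAtLeast(v, TCP_CONNECT_MS_SINCE)) return null; // probe predates the field
  if (raw === undefined) return null; // field defined but not set
  return Math.min(Math.max(raw, 0), I16_MAX); // fit the i16 column
}

console.log(normalizeTcpConnectMs(120, '0.2.9'));       // null
console.log(normalizeTcpConnectMs(120, '0.10.0'));      // 120
console.log(normalizeTcpConnectMs(50_000, '0.9.2'));    // 32767
console.log(normalizeTcpConnectMs(undefined, '0.9.2')); // null
```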

Clock skew is a real problem for probes operating in censored environments: a probe behind Tor or a VPN may have a desynchronized clock. We cap measured_at at the server's current time so that future-dated measurements cannot corrupt continuous aggregate windows. Measurements more than 48 hours in the past are still accepted (late-uploading probes) but flagged with the late_arrival_gt48h quality reason and excluded from real-time anomaly detection.
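A sketch of that timestamp policy, with epoch milliseconds in place of chrono types: future-dated values are capped at the server clock, and anything older than 48 hours is flagged rather than dropped. normalizeMeasuredAt and its result shape are hypothetical; falling back to the server clock for an unusable timestamp mirrors the fallback in normalize().

```typescript
const HOUR_MS = 3_600_000;
const LATE_ARRIVAL_MS = 48 * HOUR_MS;

interface MeasuredAt {
  measuredAtMs: number;
  lateArrival: boolean; // corresponds to the late_arrival_gt48h quality reason
}

function normalizeMeasuredAt(rawMs: number, nowMs: number): MeasuredAt {
  // Unusable timestamp: fall back to the server clock
  const t = Number.isFinite(rawMs) ? rawMs : nowMs;
  // Cap future-dated values so they cannot land in aggregate windows that have not happened yet
  const measuredAtMs = Math.min(t, nowMs);
  return { measuredAtMs, lateArrival: nowMs - measuredAtMs > LATE_ARRIVAL_MS };
}

const now = Date.UTC(2024, 4, 1, 12);
console.log(normalizeMeasuredAt(now + 5 * 60_000, now));               // capped to now, not late
console.log(normalizeMeasuredAt(now - 72 * HOUR_MS, now).lateArrival); // true
console.log(normalizeMeasuredAt(now - 47 * HOUR_MS, now).lateArrival); // false
```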

Enrichment

struct EnrichmentPipeline {
    geoip:      MaxmindReader,    // GeoLite2-City, updated weekly
    asn_db:     MaxmindReader,    // GeoLite2-ASN, updated weekly
    domain_psl: PublicSuffixList, // Mozilla PSL for registrable domain extraction
}

impl EnrichmentPipeline {
    fn enrich(&self, m: &mut NormalizedMeasurement) {
        // GeoIP + ASN lookup for first resolved IP
        if let Some(ip) = m.dns_addrs.first() {
            m.target_ip_geo = self.geoip.lookup(ip).map(|r| r.country_code);
            m.target_asn = self.asn_db.lookup(ip).map(|r| r.autonomous_system_number);
        }

        // Extract registrable domain (e.g., "www.bbc.co.uk" → "bbc.co.uk")
        m.target_registrable = self.domain_psl
            .registrable_domain(&m.target_domain)
            .map(|d| d.to_string());

        // Canonicalize country code (some probes send "UK" instead of "GB")
        m.vantage_country = normalize_country_code(m.vantage_country);
    }
}
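normalize_country_code and extract_domain are not shown. One plausible shape for both, sketched in TypeScript: the alias table is hypothetical apart from the documented UK → GB case, and domain extraction relies on the WHATWG URL parser, which lowercases the host and separates out the port. Registrable-domain extraction still needs the Public Suffix List and is left out.

```typescript
// Hypothetical alias table; only "UK" -> "GB" is documented above.
const COUNTRY_ALIASES: Record<string, string> = { UK: 'GB' };

function normalizeCountryCode(raw: string): string | null {
  const cc = raw.trim().toUpperCase();
  if (!/^[A-Z]{2}$/.test(cc)) return null; // not shaped like ISO 3166-1 alpha-2
  return COUNTRY_ALIASES[cc] ?? cc;
}

function extractDomain(targetUrl: string): string | null {
  try {
    // hostname excludes scheme, credentials and port; drop a trailing root dot
    const host = new URL(targetUrl).hostname.replace(/\.$/, '');
    return host === '' ? null : host;
  } catch {
    return null; // not a parseable absolute URL
  }
}

console.log(normalizeCountryCode(' uk')); // GB
console.log(normalizeCountryCode('ir'));  // IR
console.log(extractDomain('https://www.BBC.co.uk:443/news')); // www.bbc.co.uk
```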

Quality filtering

Not all measurements are usable. We filter at ingest time, but instead of discarding rows we record the quality_check result in the inference_dropped column, so that filtering decisions are auditable:

fn quality_check(m: &NormalizedMeasurement) -> Option<&'static str> {
    // Control server unreachable: can't distinguish censorship from probe failure
    if !m.control_reachable {
        return Some("control_unreachable");
    }

    // Bogon target IP: DNS returned only non-public addresses
    // (e.g. RFC 1918 private or loopback) and no public address
    if m.dns_addrs.iter().any(is_bogon) && !m.dns_addrs.iter().any(is_public) {
        return Some("bogon_resolution_only");
    }

    // Late arrival: measured_at more than 48h in the past (the row is kept
    // but not used for real-time anomaly detection)
    if Utc::now() - m.measured_at > Duration::hours(48) {
        return Some("late_arrival_gt48h");
    }

    // Empty URL: happens with some older probe versions on parse error
    if m.target_url.is_empty() {
        return Some("empty_target_url");
    }

    // Probe blacklisted (operator revoked or probe flagged as compromised)
    if REVOKED_PROBES.contains(&m.probe_id.as_str()) {
        return Some("probe_revoked");
    }

    None  // passes quality check
}

In production, about 3.2% of ingested measurements are quality-filtered. The most common reason is control_unreachable (2.1%) — probes in highly censored environments sometimes cannot reach our control server even when they can reach the measurement target, making interference determination unreliable.
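The is_bogon and is_public helpers are not shown. A TypeScript sketch using Node's net.BlockList: the range list is an assumption (it covers the RFC 1918 private and loopback ranges named in the code plus a few other commonly filtered ranges) and is not necessarily the production list.

```typescript
import { BlockList, isIP } from 'node:net';

// Assumed bogon ranges (illustrative, not exhaustive).
const bogons = new BlockList();
const v4Ranges = [
  ['0.0.0.0', 8], ['10.0.0.0', 8], ['100.64.0.0', 10], ['127.0.0.0', 8],
  ['169.254.0.0', 16], ['172.16.0.0', 12], ['192.168.0.0', 16], ['224.0.0.0', 3],
] as const;
const v6Ranges = [['::', 128], ['::1', 128], ['fc00::', 7], ['fe80::', 10]] as const;
for (const [base, prefix] of v4Ranges) bogons.addSubnet(base, prefix, 'ipv4');
for (const [base, prefix] of v6Ranges) bogons.addSubnet(base, prefix, 'ipv6');

function isBogon(addr: string): boolean {
  const version = isIP(addr); // 4, 6, or 0 if not an IP literal
  if (version === 0) return false;
  return bogons.check(addr, version === 4 ? 'ipv4' : 'ipv6');
}

function isPublic(addr: string): boolean {
  return isIP(addr) !== 0 && !isBogon(addr);
}

// Same rule as bogon_resolution_only: some bogon answer and no public one
function bogonResolutionOnly(dnsAddrs: string[]): boolean {
  return dnsAddrs.some((a) => isBogon(a)) && !dnsAddrs.some((a) => isPublic(a));
}

console.log(bogonResolutionOnly(['10.10.34.35']));               // true
console.log(bogonResolutionOnly(['10.0.0.1', '93.184.216.34'])); // false: one public answer
console.log(bogonResolutionOnly([]));                            // false: nothing resolved
```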

Handling election-night spikes

Major censorship events (elections, protests, national crises) produce measurement spikes because (a) more probe operators are active, (b) existing probes automatically increase their measurement frequency (via the urgent-injection scheduler), and (c) previously scheduled measurements coincide with the event. During the 2024 Myanmar election the pipeline handled 8× normal throughput for 3 hours.

The pipeline handles spikes at three points:

  • Cloudflare Queue: absorbs the upstream burst, buffering batches until the bridge drains them (within Cloudflare's documented per-queue throughput and backlog limits)
  • Kafka: provides a backpressure buffer; consumer lag increases during spikes, but data is safe within the 7-day retention window
  • normalization consumer: auto-scales (EC2 Auto Scaling Group with a scale-out policy triggered by a CloudWatch alarm on the Kafka consumer lag metric); once lag stays above 100,000 messages and the alarm fires, a new instance launches in about 90 seconds

# CloudWatch metric alarm: scale out when Kafka lag > 100K
resource "aws_cloudwatch_metric_alarm" "consumer_lag" {
  alarm_name          = "voidly-consumer-lag-high"
  namespace           = "Voidly/Kafka"   # assumed custom namespace for the lag metric
  metric_name         = "kafka_consumer_lag_sum"
  statistic           = "Maximum"        # assumed; a single-metric alarm needs a statistic
  comparison_operator = "GreaterThanThreshold"
  threshold           = 100000
  evaluation_periods  = 2
  period              = 60
  alarm_actions       = [aws_autoscaling_policy.scale_out.arn]
}

# Typical spike handling:
# t+0m:  spike begins, lag starts rising
# t+2m:  alarm fires (2× 60s evaluation periods)
# t+4m:  new instance starts (AMI baked, ~90s launch)
# t+6m:  new instance consuming, lag begins clearing
# t+60m: spike ends, scale-in after 60min cooldown
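The t+2m alarm in the timeline follows from evaluation_periods = 2 with one-minute periods: CloudWatch alarms only when the required number of the most recent datapoints breach, and datapoints_to_alarm defaults to evaluation_periods. A simplified TypeScript model, assuming one lag datapoint per minute and ignoring missing-data handling; alarmFires and minutesUntilAlarm are hypothetical names.

```typescript
interface AlarmConfig {
  threshold: number;         // GreaterThanThreshold comparison
  evaluationPeriods: number; // datapoints_to_alarm assumed equal (the default)
}

// True if every one of the most recent N datapoints breaches the threshold
function alarmFires(lagPerMinute: number[], cfg: AlarmConfig): boolean {
  if (lagPerMinute.length < cfg.evaluationPeriods) return false;
  return lagPerMinute.slice(-cfg.evaluationPeriods).every((lag) => lag > cfg.threshold);
}

// Minutes after the first datapoint at which the alarm would fire, or null
function minutesUntilAlarm(lagPerMinute: number[], cfg: AlarmConfig): number | null {
  for (let i = 1; i <= lagPerMinute.length; i++) {
    if (alarmFires(lagPerMinute.slice(0, i), cfg)) return i;
  }
  return null;
}

const cfg: AlarmConfig = { threshold: 100_000, evaluationPeriods: 2 };
console.log(minutesUntilAlarm([150_000, 180_000, 220_000], cfg)); // 2
console.log(minutesUntilAlarm([120_000, 90_000, 130_000], cfg));  // null: no two breaches in a row
```

Requiring consecutive breaching datapoints means a single-minute lag blip never scales the group out, at the cost of the two-minute reaction delay shown in the timeline.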

Nightly HuggingFace export

Every night at 02:00 UTC, a Parquet export job runs:

-- Export yesterday's measurements to Parquet (via pg_parquet extension)
COPY (
    SELECT
        measurement_id, probe_id, vantage_country, vantage_asn,
        measured_at, target_url, target_domain, test_protocol,
        interference_type, interference_prob, confidence_tier,
        dns_resolved, tcp_connected, tls_handshake_ok, http_status_code,
        control_reachable, inference_dropped, probe_version
    FROM measurements
    WHERE measured_at >= CURRENT_DATE - INTERVAL '1 day'
      AND measured_at < CURRENT_DATE
    -- Quality-filtered rows are included with inference_dropped set, so users
    -- can filter them out; original bytes also remain in the raw S3 archive
)
TO '/tmp/measurements_{date}/part-{n}.parquet'
WITH (FORMAT parquet, COMPRESSION 'zstd', ROW_GROUP_SIZE 100000);

The Parquet files are then uploaded to HuggingFace with the huggingface_hub Python API, partitioned as country_code={CC}/year_month={YYYY-MM}/part-{n}.parquet. Each country×month Parquet file is about 5–50MB, depending on probe density.

The export is partitioned by vantage_country and year_month so that users can download only the data they need (e.g., only Iran and Russia for the last 6 months) without fetching the full 2.2B-row dataset.
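A sketch of the two pieces of date handling the export depends on: the [start, end) window that the WHERE clause selects, assuming the database session time zone is UTC, and the country_code/year_month partition path. Both helpers are hypothetical.

```typescript
const DAY_MS = 86_400_000;

// Window selected by `measured_at >= CURRENT_DATE - INTERVAL '1 day'
// AND measured_at < CURRENT_DATE`, assuming a UTC session time zone.
function exportWindow(nowMs: number): { startMs: number; endMs: number } {
  const endMs = nowMs - (nowMs % DAY_MS); // today's midnight UTC (Unix time has no leap seconds)
  return { startMs: endMs - DAY_MS, endMs };
}

// Hypothetical helper for the country_code={CC}/year_month={YYYY-MM}/part-{n}.parquet layout
function partitionPath(countryCode: string, measuredAtMs: number, part: number): string {
  const d = new Date(measuredAtMs);
  const yearMonth = `${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, '0')}`;
  return `country_code=${countryCode}/year_month=${yearMonth}/part-${part}.parquet`;
}

const run = exportWindow(Date.UTC(2024, 10, 6, 2, 0)); // the 02:00 UTC job on 6 Nov 2024
console.log(new Date(run.startMs).toISOString(), new Date(run.endMs).toISOString());
// 2024-11-05T00:00:00.000Z 2024-11-06T00:00:00.000Z
console.log(partitionPath('IR', Date.UTC(2024, 10, 5, 13), 0));
// country_code=IR/year_month=2024-11/part-0.parquet
```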

End-to-end latency

Stage                                       p50     p99
──────────────────────────────────────────────────────────────────
Probe upload (QUIC, ~200KB batch)           1.2s    4.1s
Cloudflare Worker processing                8ms     45ms
Cloudflare Queue → Kafka bridge             12s     40s   (polling)
Kafka producer → TimescaleDB insert         18s     62s   (batch accumulation)
──────────────────────────────────────────────────────────────────
Probe measurement → database row            ~32s    ~110s (normal load)
Probe measurement → anomaly detection       ~38s    ~130s (+classifier run)
Probe measurement → public dataset          2–26hr (nightly export; depends on UTC time of day)

The 12-second Cloudflare Queue polling delay is the largest avoidable latency in the ingest path. (The 18-second batch-accumulation stage is longer, but it is what enables 1,000-row bulk COPY inserts.) The bridge consumes the queue as an HTTP pull consumer, polling every 10 seconds; switching to push-based delivery through a consumer Worker bound to the queue would reduce this to near zero, and is on the roadmap for the next pipeline revision.
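Because the nightly job exports whole UTC days, the delay until a measurement reaches the public dataset depends on when in the day it was taken. A small worked calculation, assuming the 02:00 UTC start time and ignoring export and upload runtime (hoursUntilExport is a hypothetical name):

```typescript
const DAY_MS = 86_400_000;
const HOUR_MS = 3_600_000;
const EXPORT_HOUR_UTC = 2; // nightly job start

// Hours from measurement to the start of the export that includes it:
// the rest of the measurement's UTC day, plus the wait until 02:00 the next day.
function hoursUntilExport(measuredAtMs: number): number {
  const msIntoDay = measuredAtMs % DAY_MS;
  return (DAY_MS - msIntoDay) / HOUR_MS + EXPORT_HOUR_UTC;
}

for (const hour of [0, 12, 23]) {
  console.log(`measured ${hour}:00 UTC -> ${hoursUntilExport(Date.UTC(2024, 0, 1, hour))}h`);
}
// measured 0:00 UTC -> 26h
// measured 12:00 UTC -> 14h
// measured 23:00 UTC -> 3h
```

Measurements taken just before midnight wait about 2 hours and those taken just after midnight about 26 hours, before export and upload time are added.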


For the TimescaleDB schema that receives these measurements and the continuous aggregates built on top of them: Voidly's measurement database: 2.2B probe results in TimescaleDB →

For the Voidly measurement dataset schema — the field definitions that correspond to the protobuf fields documented here: The Voidly measurement dataset: field-by-field schema reference →

For the probe architecture that generates the measurement batches this pipeline ingests, including the QUIC transport and Ed25519 device certificates: The Voidly Probe: Tauri + boringtun network measurement at the operator's edge →

For how BGP routing data from RIPE NCC RIS, RouteViews, and bgp.tools is ingested and joined to probe measurements — MRT parsing, withdrawal detection, and bgp_outage_score computation: Voidly BGP data ingestion: parsing MRT dumps, detecting prefix withdrawals, and computing country outage scores →

For the continuous aggregate layer built on top of the TimescaleDB rows this pipeline writes — 4.1s queries reduced to 4ms: Voidly's TimescaleDB continuous aggregates: pre-aggregating 2.2B probe measurements for fast queries →