
Voidly's incident publication pipeline: from classifier output to verified PostgreSQL incident record and Kafka alert fan-out

14 min read · AI Analytics
Tags: Censorship, Voidly, Infrastructure, Data engineering

The Voidly real-time pipeline receives ~50,000 probe measurements per hour. Most are clean — no anomaly. A fraction trigger the inline anomaly scorer, which runs the XGBoost classifier at sub-millisecond latency and emits a structured anomaly event. Getting from that event to a published, verified censorship incident — complete with a stable incident ID, a state machine tracking its lifecycle, cross-source corroboration, and an alert to subscribers — is the job of the incident publication pipeline.

This article covers the pipeline's structure: the Rust IncidentStateMachine, the PostgreSQL incident store schema, the idempotency design that makes incident IDs stable across retroactive CensoredPlanet reprocessing, and the Kafka fan-out that triggers alert delivery and REST API cache invalidation.

The IncidentStateMachine

Every censorship incident passes through five states. State transitions are triggered by incoming measurements and external corroboration signals, not by timers (except for the RESOLVED → re-open check, which uses the 12-hour window described in the incident resolution article).

use std::collections::HashSet;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IncidentState {
    /// Single probe anomaly; not yet corroborated.
    Anomaly,
    /// 2+ probes from 2+ ASNs in the same country confirm the anomaly.
    MultiSourceAnomaly,
    /// External corroboration from OONI or CensoredPlanet raises confidence.
    Corroborated,
    /// Independent corroboration score >= 0.80; highest confidence tier.
    Verified,
    /// Blocking rate dropped below threshold; incident considered resolved.
    Resolved {
        resolution_method: ResolutionMethod,
        resolved_at: chrono::DateTime<chrono::Utc>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResolutionMethod {
    /// 80%+ of probes now see accessible; consecutive passing measurements met.
    ProbeRecovery,
    /// Retroactive CensoredPlanet data shows domain never blocked.
    RetroactiveCorrection,
    /// Manual analyst review determined false positive.
    AnalystOverride,
}

#[derive(Debug)]
pub struct StateTransition {
    pub from_state: IncidentState,
    pub to_state: IncidentState,
    pub trigger: TransitionTrigger,
    pub transitioned_at: chrono::DateTime<chrono::Utc>,
}

#[derive(Debug)]
pub enum TransitionTrigger {
    NewAnomalyMeasurement { probe_id: String, asn: u32 },
    CorroborationUpdate { source: CorroborationSource, score: f32 },
    RecoveryMeasurement { probe_id: String },
    RetroactiveCpData { cp_measurement_id: String },
}

#[derive(Debug)]
pub enum CorroborationSource { Ooni, CensoredPlanet, Ioda }

State transition thresholds

| Transition | Threshold | Typical latency |
| --- | --- | --- |
| Anomaly → MultiSourceAnomaly | ≥ 3 probe measurements, ≥ 2 distinct ASNs, same country | 5–15 minutes |
| MultiSourceAnomaly → Corroborated | corroboration_score ≥ 0.50 from any external source | 15–60 minutes (OONI poll cadence) |
| Corroborated → Verified | corroboration_score ≥ 0.80; ≥ 2 independent sources | 30 min – 6 hours |
| Verified → Resolved | Per-type threshold: DNS 4, HTTP 4, TLS 3 consecutive passing measurements | Hours – days |
| Resolved → MultiSourceAnomaly | New anomaly within 12-hour re-open window | Immediate on trigger |
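
The thresholds in the table can be read as guards on a single-step transition function. The following is a minimal sketch under stated assumptions, not Voidly's implementation: `State` is a simplified stand-in for `IncidentState` (no payload on `Resolved`), and the `Evidence` struct and `next_state` function are hypothetical names introduced only for illustration. Only the transitions listed in the table are encoded.

```rust
/// Simplified stand-in for the article's IncidentState (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Anomaly,
    MultiSourceAnomaly,
    Corroborated,
    Verified,
    Resolved,
}

/// Hypothetical snapshot of the evidence gathered for one incident.
#[derive(Debug, Clone, Copy)]
pub struct Evidence {
    pub measurements: u32,
    pub distinct_asns: u32,
    pub corroboration_score: f32,
    pub independent_sources: u32,
    pub consecutive_passing: u32,
    /// Per-type resolution threshold (DNS 4, HTTP 4, TLS 3 in the table).
    pub passing_required: u32,
    /// True if a new anomaly arrived inside the 12-hour re-open window.
    pub anomaly_in_reopen_window: bool,
}

/// Returns the next state, or the current state if no threshold is met.
/// At most one step is taken per call.
pub fn next_state(current: State, e: &Evidence) -> State {
    match current {
        State::Anomaly if e.measurements >= 3 && e.distinct_asns >= 2 => {
            State::MultiSourceAnomaly
        }
        State::MultiSourceAnomaly if e.corroboration_score >= 0.50 => State::Corroborated,
        State::Corroborated
            if e.corroboration_score >= 0.80 && e.independent_sources >= 2 =>
        {
            State::Verified
        }
        State::Verified if e.consecutive_passing >= e.passing_required => State::Resolved,
        State::Resolved if e.anomaly_in_reopen_window => State::MultiSourceAnomaly,
        other => other,
    }
}
```

Taking one step per call means an incident with a corroboration score of 0.85 still passes through Corroborated before reaching Verified, so each intermediate state can be recorded as its own transition.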

PostgreSQL incident store schema

The incident store is a PostgreSQL 16 database with a TimescaleDB extension for the time-series event log. The schema separates the mutable incident summary (updated in place) from the append-only event log (immutable audit trail).

-- Mutable incident summary: one row per incident
CREATE TABLE incidents (
    incident_id         TEXT        PRIMARY KEY,
    -- Format: '{country_code}:{hash8}:{interference_type}:{epoch_day}'
    -- hash8 = first 8 hex chars of SHA-256 over the four identifying fields
    -- Example: 'RU:a3f1c82d:DNS_TAMPER:20081'  (Unix epoch day 20081 = 2024-12-24)
    country_code        TEXT        NOT NULL,
    domain              TEXT        NOT NULL,
    interference_type   TEXT        NOT NULL,  -- from InterferenceType enum
    state               TEXT        NOT NULL DEFAULT 'ANOMALY',
    corroboration_score REAL        NOT NULL DEFAULT 0.0,
    first_detected_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_updated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    resolved_at         TIMESTAMPTZ,
    resolution_method   TEXT,
    probe_count         INTEGER     NOT NULL DEFAULT 1,
    asn_count           INTEGER     NOT NULL DEFAULT 1,
    -- Denormalized for fast REST API list queries (avoids join with event table)
    ooni_corroborated   BOOLEAN     NOT NULL DEFAULT FALSE,
    cp_corroborated     BOOLEAN     NOT NULL DEFAULT FALSE,
    ioda_corroborated   BOOLEAN     NOT NULL DEFAULT FALSE,
    subscriber_notified BOOLEAN     NOT NULL DEFAULT FALSE
);

CREATE INDEX idx_incidents_country_state ON incidents (country_code, state, first_detected_at DESC);
CREATE INDEX idx_incidents_domain ON incidents (domain, first_detected_at DESC);
CREATE INDEX idx_incidents_active ON incidents (state, country_code) WHERE state != 'RESOLVED';

-- Append-only event log: every state change and corroboration update
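CREATE TABLE incident_events (
    id                  BIGSERIAL   NOT NULL,
    incident_id         TEXT        NOT NULL REFERENCES incidents(incident_id),
    event_type          TEXT        NOT NULL,
    -- 'STATE_CHANGE' | 'PROBE_CONFIRMED' | 'CORROBORATION_UPDATE' | 'RESOLUTION'
    from_state          TEXT,
    to_state            TEXT,
    payload             JSONB,      -- event-specific data
    event_at            TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Idempotency key: SHA-256(incident_id || event_type || payload_canonical_json)
    idempotency_key     TEXT        NOT NULL,
    -- TimescaleDB requires every unique constraint on a hypertable to include
    -- the partitioning column, so both keys are composite with event_at.
    -- A retried write therefore only collides if it carries the same event_at:
    -- writers should pass the source event's timestamp rather than rely on NOW().
    PRIMARY KEY (id, event_at),
    UNIQUE (idempotency_key, event_at)
);

-- Hypertable for time-series querying (TimescaleDB)
SELECT create_hypertable('incident_events', 'event_at', chunk_time_interval => INTERVAL '7 days');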

CREATE INDEX idx_incident_events_incident_time ON incident_events (incident_id, event_at DESC);
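
The idempotency key only works if logically equal events always hash to the same bytes, which is why the schema comment refers to canonical JSON. The sketch below shows one way to build the hash input, assuming a flat string-to-string payload. The names `canonical_payload` and `idempotency_preimage` are hypothetical, the output is JSON-like rather than strict JSON (Rust's `Debug` escaping is used for brevity), and the length prefixes are an added assumption that goes beyond the plain concatenation in the schema comment. The resulting string would then be hashed with SHA-256, for example with the `sha2` crate.

```rust
use std::collections::BTreeMap;

/// Serialises a flat payload with keys in sorted order, so that two
/// logically equal payloads always produce the same text.
pub fn canonical_payload(payload: &BTreeMap<String, String>) -> String {
    let fields: Vec<String> = payload
        .iter() // BTreeMap iterates in ascending key order
        .map(|(k, v)| format!("{:?}:{:?}", k, v))
        .collect();
    format!("{{{}}}", fields.join(","))
}

/// Builds the string that would be fed to SHA-256.
/// Each part is length-prefixed so that ("ab", "c") and ("a", "bc")
/// cannot produce the same input.
pub fn idempotency_preimage(
    incident_id: &str,
    event_type: &str,
    payload: &BTreeMap<String, String>,
) -> String {
    let body = canonical_payload(payload);
    format!(
        "{}:{}|{}:{}|{}:{}",
        incident_id.len(),
        incident_id,
        event_type.len(),
        event_type,
        body.len(),
        body,
    )
}
```

Because the map is ordered by key, the insertion order of payload fields does not affect the result, so a consumer that replays the same event after a restart produces the same key.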

Idempotent incident_id assignment

Incident IDs must be stable across retroactive reprocessing. When CensoredPlanet publishes its daily dump and Voidly retroactively adjusts an incident's first_detected_at timestamp, the incident ID must not change — otherwise, all external references (subscriber alerts, REST API responses, HuggingFace dataset records) become stale.

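The incident ID embeds a truncated SHA-256 hash of four stable fields: country code, domain, interference type, and the clustering epoch day. None of them depends on first_detected_at, so retroactive timing adjustments leave the ID unchanged: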

use sha2::{Sha256, Digest};

pub fn compute_incident_id(
    country_code: &str,
    domain: &str,
    interference_type: &str,
    clustering_epoch_day: u32,  // Unix epoch day; stable once assigned
) -> String {
    // Normalize once so the hash input and the readable components agree.
    let country = country_code.to_uppercase();
    let interference = interference_type.to_uppercase();
    // The epoch_day is the day the FIRST anomaly measurement arrived.
    // It does not change when CensoredPlanet retroactively adjusts timing.
    let input = format!(
        "{}:{}:{}:{}",
        country,
        domain.to_lowercase(),
        interference,
        clustering_epoch_day,
    );
    let hash = Sha256::digest(input.as_bytes());
    // Take first 8 hex chars (32 bits) for human-readable ID component
    let hash_hex = format!("{:x}", hash);
    // Emit the four-part format documented in the incidents schema.
    format!(
        "{}:{}:{}:{}",
        country,
        &hash_hex[..8],
        interference,
        clustering_epoch_day,
    )
}

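// Upsert keyed on incident_id: creates the row on first sight, otherwise bumps the counter.
// Note: probe_count is incremented on every call, so a redelivered measurement is
// counted twice unless duplicates are filtered before this function runs.
pub async fn upsert_incident(
    pg: &sqlx::PgPool,
    incident_id: &str,
    country_code: &str,
    domain: &str,
    interference_type: &str,
    probe_id: &str,
    asn: u32,
) -> Result<bool, sqlx::Error> {
    // INSERT ... ON CONFLICT DO UPDATE; `xmax = 0` is true only for a freshly inserted row.
    // probe_id and asn are unused in this excerpt, and asn_count is not updated here.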
    let result = sqlx::query!(
        r#"
        INSERT INTO incidents (incident_id, country_code, domain, interference_type,
                               state, probe_count, asn_count)
        VALUES ($1, $2, $3, $4, 'ANOMALY', 1, 1)
        ON CONFLICT (incident_id) DO UPDATE
        SET probe_count    = incidents.probe_count + 1,
            last_updated_at = NOW()
        RETURNING (xmax = 0) AS was_inserted
        "#,
        incident_id, country_code, domain, interference_type,
    ).fetch_one(pg).await?;

    Ok(result.was_inserted.unwrap_or(false))
}
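
For the denormalized counts to be idempotent as well, they need to be derived from the set of distinct probes and ASNs rather than incremented per call. The following in-memory model sketches that idea. It is an illustration under assumptions, not the production design: `InMemoryIncidentStore` and `IncidentCounts` are hypothetical names, and in PostgreSQL the equivalent would more likely be a side table with a unique constraint on (incident_id, probe_id).

```rust
use std::collections::{HashMap, HashSet};

/// Distinct probes and ASNs seen for one incident (hypothetical model).
#[derive(Debug, Default)]
pub struct IncidentCounts {
    probes: HashSet<String>,
    asns: HashSet<u32>,
}

/// In-memory stand-in for the incidents table, keyed by incident_id.
#[derive(Debug, Default)]
pub struct InMemoryIncidentStore {
    incidents: HashMap<String, IncidentCounts>,
}

impl InMemoryIncidentStore {
    /// Records a measurement and returns true if the incident was newly created.
    /// Calling this repeatedly with the same arguments leaves the counts unchanged.
    pub fn upsert(&mut self, incident_id: &str, probe_id: &str, asn: u32) -> bool {
        let was_inserted = !self.incidents.contains_key(incident_id);
        let counts = self.incidents.entry(incident_id.to_string()).or_default();
        // HashSet::insert is a no-op for values already present.
        counts.probes.insert(probe_id.to_string());
        counts.asns.insert(asn);
        was_inserted
    }

    /// Returns (probe_count, asn_count) for an incident, if it exists.
    pub fn counts(&self, incident_id: &str) -> Option<(usize, usize)> {
        self.incidents
            .get(incident_id)
            .map(|c| (c.probes.len(), c.asns.len()))
    }
}
```

With this shape, a measurement that is delivered three times contributes one probe and one ASN, which keeps the Anomaly → MultiSourceAnomaly threshold from being crossed by retries alone.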

Kafka fan-out topology

When an incident transitions to a new state, the publication service publishes to three Kafka topics, allowing downstream consumers to evolve independently:

| Kafka topic | Consumer(s) | Partition key |
| --- | --- | --- |
| voidly.incidents.state-changes | Alert delivery service; SSE stream broadcaster | country_code (24 partitions) |
| voidly.incidents.verified | PGP email delivery; journalist webhook queue | incident_id |
| voidly.cache.invalidations | Cloudflare KV cache invalidation worker | country_code |

The voidly.incidents.verified topic uses incident_id as the partition key so that all events for the same incident arrive at the alert delivery consumer in order. This prevents the race condition where a RESOLVED event overtakes a VERIFIED event, which would cause subscribers to receive an “incident resolved” alert before the “incident verified” alert.
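
The ordering guarantee follows from how a keyed producer picks a partition: the key is hashed and reduced modulo the partition count, so equal keys always land in the same partition, and Kafka preserves order within a partition. The sketch below illustrates the mapping. It uses FNV-1a purely as a deterministic stand-in (the Java client's default partitioner uses murmur2, so real partition numbers will differ), and `partition_for` is a hypothetical helper, not part of any Kafka client API.

```rust
/// FNV-1a (32-bit), used here only as a deterministic stand-in hash.
pub fn fnv1a(bytes: &[u8]) -> u32 {
    let mut hash: u32 = 0x811c9dc5; // FNV offset basis
    for b in bytes {
        hash ^= *b as u32;
        hash = hash.wrapping_mul(0x01000193); // FNV prime
    }
    hash
}

/// Maps a partition key to a partition index in `0..partitions`.
/// Equal keys always map to the same partition.
pub fn partition_for(key: &str, partitions: u32) -> u32 {
    fnv1a(key.as_bytes()) % partitions
}
```

For example, calling `partition_for(incident_id, n)` for the VERIFIED and RESOLVED events of one incident yields the same index both times, so a single consumer reads them in publication order. The partition count `n` for voidly.incidents.verified is not stated in this article; 24 is the figure given for voidly.incidents.state-changes.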

The alert delivery service consumes from voidly.incidents.verified with a 500 ms delay buffer to batch rapid state changes (e.g., an incident that goes CORROBORATED → VERIFIED within seconds). In production measurements, the buffer reduces duplicate alert delivery by 18% compared to immediate delivery.
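
One possible reading of the delay buffer is a per-incident coalescing window: the first event for an incident starts a 500 ms timer, later events for the same incident replace the buffered one, and only the latest state is delivered when the timer expires. The sketch below models that behaviour with explicit millisecond timestamps so it can run without a clock. `DelayBuffer` and `StateChange` are hypothetical types, and the real service may batch differently.

```rust
use std::collections::HashMap;

/// Hypothetical state-change event as seen by the alert consumer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateChange {
    pub incident_id: String,
    pub to_state: String,
    pub at_ms: u64,
}

/// Coalesces rapid state changes per incident within a fixed window.
pub struct DelayBuffer {
    window_ms: u64,
    // incident_id -> (arrival time of the first buffered event, latest event)
    pending: HashMap<String, (u64, StateChange)>,
}

impl DelayBuffer {
    pub fn new(window_ms: u64) -> Self {
        Self { window_ms, pending: HashMap::new() }
    }

    /// Buffers an event. A newer event for the same incident replaces the
    /// older one but keeps the original deadline.
    pub fn push(&mut self, ev: StateChange) {
        let first_seen = self
            .pending
            .get(&ev.incident_id)
            .map(|(t, _)| *t)
            .unwrap_or(ev.at_ms);
        self.pending.insert(ev.incident_id.clone(), (first_seen, ev));
    }

    /// Removes and returns the events whose window has elapsed at `now_ms`,
    /// ordered by event time and then incident_id.
    pub fn flush(&mut self, now_ms: u64) -> Vec<StateChange> {
        let window = self.window_ms;
        let due: Vec<String> = self
            .pending
            .iter()
            .filter(|(_, (first_seen, _))| now_ms.saturating_sub(*first_seen) >= window)
            .map(|(id, _)| id.clone())
            .collect();
        let mut out: Vec<StateChange> = due
            .into_iter()
            .filter_map(|id| self.pending.remove(&id).map(|(_, ev)| ev))
            .collect();
        out.sort_by(|a, b| a.at_ms.cmp(&b.at_ms).then_with(|| a.incident_id.cmp(&b.incident_id)));
        out
    }
}
```

Keeping the original deadline bounds the added latency at one window per incident, even when state changes keep arriving. The trade-off is that intermediate states inside the window never produce their own alert.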


For the measurement scheduling system that feeds the anomaly events this pipeline consumes: The Voidly measurement scheduler: how we decide which domains to probe and when →

For the alert delivery system that consumes from the Kafka topics this pipeline writes: Voidly's alert delivery system: PGP-encrypted email, webhooks, and RSS →