Voidly's incident publication pipeline: from classifier output to verified PostgreSQL incident record and Kafka alert fan-out
The Voidly real-time pipeline receives ~50,000 probe measurements per hour. Most are clean — no anomaly. A fraction trigger the inline anomaly scorer, which runs the XGBoost classifier at sub-millisecond latency and emits a structured anomaly event. Getting from that event to a published, verified censorship incident — complete with a stable incident ID, a state machine tracking its lifecycle, cross-source corroboration, and an alert to subscribers — is the job of the incident publication pipeline.
This article covers the pipeline's structure: the Rust IncidentStateMachine, the PostgreSQL incident store schema, the idempotency design that makes incident IDs stable across retroactive CensoredPlanet reprocessing, and the Kafka fan-out that triggers alert delivery and REST API cache invalidation.
The IncidentStateMachine
Every censorship incident passes through five states. State transitions are triggered by incoming measurements and external corroboration signals, not by timers (except for the RESOLVED → re-open check, which uses the 12-hour window described in the incident resolution article).
use std::collections::HashSet;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IncidentState {
/// Single probe anomaly; not yet corroborated.
Anomaly,
/// 2+ probes from 2+ ASNs in the same country confirm the anomaly.
MultiSourceAnomaly,
/// External corroboration from OONI or CensoredPlanet raises confidence.
Corroborated,
/// Independent corroboration score >= 0.80; highest confidence tier.
Verified,
/// Blocking rate dropped below threshold; incident considered resolved.
Resolved {
resolution_method: ResolutionMethod,
resolved_at: chrono::DateTime<chrono::Utc>,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResolutionMethod {
/// 80%+ of probes now see accessible; consecutive passing measurements met.
ProbeRecovery,
/// Retroactive CensoredPlanet data shows domain never blocked.
RetroactiveCorrection,
/// Manual analyst review determined false positive.
AnalystOverride,
}
#[derive(Debug)]
pub struct StateTransition {
pub from_state: IncidentState,
pub to_state: IncidentState,
pub trigger: TransitionTrigger,
pub transitioned_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug)]
pub enum TransitionTrigger {
NewAnomalyMeasurement { probe_id: String, asn: u32 },
CorroborationUpdate { source: CorroborationSource, score: f32 },
RecoveryMeasurement { probe_id: String },
RetroactiveCpData { cp_measurement_id: String },
}
#[derive(Debug)]
pub enum CorroborationSource { Ooni, CensoredPlanet, Ioda }
State transition thresholds
| Transition | Threshold | Typical latency |
|---|---|---|
| Anomaly → MultiSourceAnomaly | ≥ 3 probe measurements, ≥ 2 distinct ASNs, same country | 5–15 minutes |
| MultiSourceAnomaly → Corroborated | corroboration_score ≥ 0.50 from any external source | 15–60 minutes (OONI poll cadence) |
| Corroborated → Verified | corroboration_score ≥ 0.80; ≥ 2 independent sources | 30 min – 6 hours |
| Verified → Resolved | Per-type threshold: DNS 4, HTTP 4, TLS 3 consecutive passing | Hours – days |
| Resolved → MultiSourceAnomaly | New anomaly within 12-hour re-open window | Immediate on trigger |
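The thresholds in the table can be read as a single step function. The following is a minimal sketch under stated assumptions, not the production IncidentStateMachine: `State` is a simplified stand-in for `IncidentState` without the `Resolved` payload, and `Evidence`, its field names, and `next_state` are hypothetical names introduced for illustration. The "same country" condition is assumed to be enforced upstream, when measurements are clustered into an incident.

```rust
/// Simplified stand-in for the article's IncidentState (no Resolved payload).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Anomaly,
    MultiSourceAnomaly,
    Corroborated,
    Verified,
    Resolved,
}

/// Hypothetical snapshot of the evidence gathered for one incident.
#[derive(Debug, Clone, Copy)]
pub struct Evidence {
    pub probe_measurements: u32,
    pub distinct_asns: u32,
    pub corroboration_score: f32,
    pub independent_sources: u32,
    pub consecutive_passing: u32,
    /// Per-type resolution threshold from the table: DNS 4, HTTP 4, TLS 3.
    pub passing_required: u32,
    /// True when a new anomaly arrived inside the 12-hour re-open window.
    pub new_anomaly_in_reopen_window: bool,
}

/// Apply at most one transition from the threshold table.
/// Returns the current state unchanged when no threshold is met.
pub fn next_state(current: State, ev: &Evidence) -> State {
    match current {
        State::Anomaly if ev.probe_measurements >= 3 && ev.distinct_asns >= 2 => {
            State::MultiSourceAnomaly
        }
        State::MultiSourceAnomaly if ev.corroboration_score >= 0.50 => State::Corroborated,
        State::Corroborated
            if ev.corroboration_score >= 0.80 && ev.independent_sources >= 2 =>
        {
            State::Verified
        }
        State::Verified if ev.consecutive_passing >= ev.passing_required => State::Resolved,
        State::Resolved if ev.new_anomaly_in_reopen_window => State::MultiSourceAnomaly,
        other => other,
    }
}
```

Because each call advances by at most one state, an incident with strong evidence still passes through every intermediate state, which keeps the event log's transition history complete.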
PostgreSQL incident store schema
The incident store is a PostgreSQL 16 database with a TimescaleDB extension for the time-series event log. The schema separates the mutable incident summary (updated in place) from the append-only event log (immutable audit trail).
-- Mutable incident summary: one row per incident
CREATE TABLE incidents (
incident_id TEXT PRIMARY KEY,
-- Format: '{country_code}:{domain_hash8}:{interference_type}:{epoch_day}'
-- Example: 'RU:a3f1c82d:DNS_TAMPER:20081' (Unix epoch day 20081 = 2024-12-24)
country_code TEXT NOT NULL,
domain TEXT NOT NULL,
interference_type TEXT NOT NULL, -- from InterferenceType enum
state TEXT NOT NULL DEFAULT 'ANOMALY',
corroboration_score REAL NOT NULL DEFAULT 0.0,
first_detected_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
resolved_at TIMESTAMPTZ,
resolution_method TEXT,
probe_count INTEGER NOT NULL DEFAULT 1,
asn_count INTEGER NOT NULL DEFAULT 1,
-- Denormalized for fast REST API list queries (avoids join with event table)
ooni_corroborated BOOLEAN NOT NULL DEFAULT FALSE,
cp_corroborated BOOLEAN NOT NULL DEFAULT FALSE,
ioda_corroborated BOOLEAN NOT NULL DEFAULT FALSE,
subscriber_notified BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE INDEX idx_incidents_country_state ON incidents (country_code, state, first_detected_at DESC);
CREATE INDEX idx_incidents_domain ON incidents (domain, first_detected_at DESC);
CREATE INDEX idx_incidents_active ON incidents (state, country_code) WHERE state != 'RESOLVED';
-- Append-only event log: every state change and corroboration update
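CREATE TABLE incident_events (
id BIGSERIAL,
incident_id TEXT NOT NULL REFERENCES incidents(incident_id),
event_type TEXT NOT NULL,
-- 'STATE_CHANGE' | 'PROBE_CONFIRMED' | 'CORROBORATION_UPDATE' | 'RESOLUTION'
from_state TEXT,
to_state TEXT,
payload JSONB, -- event-specific data
event_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Idempotency key: SHA-256(incident_id || event_type || payload_canonical_json)
idempotency_key TEXT NOT NULL,
-- TimescaleDB requires every unique constraint on a hypertable to include the
-- partitioning column, so both keys carry event_at. Deduplication therefore only
-- holds when writers pass the source event timestamp as event_at instead of
-- relying on the NOW() default.
PRIMARY KEY (id, event_at),
UNIQUE (idempotency_key, event_at)
);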
-- Hypertable for time-series querying (TimescaleDB)
SELECT create_hypertable('incident_events', 'event_at', chunk_time_interval => INTERVAL '7 days');
CREATE INDEX idx_incident_events_incident_time ON incident_events (incident_id, event_at DESC);
Idempotent incident_id assignment
Incident IDs must be stable across retroactive reprocessing. When CensoredPlanet publishes its daily dump and Voidly retroactively adjusts an incident's first_detected_at timestamp, the incident ID must not change — otherwise, all external references (subscriber alerts, REST API responses, HuggingFace dataset records) become stale.
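The incident ID is derived from four stable fields that identify the incident uniquely without relying on measurement timing: country code, domain, interference type, and the clustering epoch day. A truncated SHA-256 digest over those fields supplies the ID's hash component: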
use sha2::{Sha256, Digest};
pub fn compute_incident_id(
country_code: &str,
domain: &str,
interference_type: &str,
clustering_epoch_day: u32, // Unix epoch day; stable once assigned
) -> String {
// The epoch_day is the day the FIRST anomaly measurement arrived.
// It does not change when CensoredPlanet retroactively adjusts timing.
let input = format!(
"{}:{}:{}:{}",
country_code.to_uppercase(),
domain.to_lowercase(),
interference_type.to_uppercase(),
clustering_epoch_day,
);
let hash = Sha256::digest(input.as_bytes());
// Take first 8 hex chars (32 bits) for human-readable ID component
let hash_hex = format!("{:x}", hash);
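// Matches the documented format:
// '{country_code}:{domain_hash8}:{interference_type}:{epoch_day}'
format!(
"{}:{}:{}:{}",
country_code.to_uppercase(),
&hash_hex[..8],
interference_type.to_uppercase(),
clustering_epoch_day,
)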
}
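// Upsert: inserts the incident row on first sight, otherwise bumps probe_count.
// Repeated calls never create duplicate rows, but every call increments probe_count
// and asn_count is left untouched; probe_id and asn are accepted but unused here.
// Returns true when the row was newly inserted.
pub async fn upsert_incident(
pg: &sqlx::PgPool,
incident_id: &str,
country_code: &str,
domain: &str,
interference_type: &str,
probe_id: &str,
asn: u32,
) -> Result<bool, sqlx::Error> {
// INSERT ... ON CONFLICT DO UPDATE; xmax = 0 holds only for a freshly inserted row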
let result = sqlx::query!(
r#"
INSERT INTO incidents (incident_id, country_code, domain, interference_type,
state, probe_count, asn_count)
VALUES ($1, $2, $3, $4, 'ANOMALY', 1, 1)
ON CONFLICT (incident_id) DO UPDATE
SET probe_count = incidents.probe_count + 1,
last_updated_at = NOW()
RETURNING (xmax = 0) AS was_inserted
"#,
incident_id, country_code, domain, interference_type,
).fetch_one(pg).await?;
Ok(result.was_inserted.unwrap_or(false))
}
Kafka fan-out topology
When an incident transitions to a new state, the publication service publishes to three Kafka topics, allowing downstream consumers to evolve independently:
| Kafka topic | Consumer(s) | Partition key |
|---|---|---|
| voidly.incidents.state-changes | Alert delivery service; SSE stream broadcaster | country_code (24 partitions) |
| voidly.incidents.verified | PGP email delivery; journalist webhook queue | incident_id |
| voidly.cache.invalidations | Cloudflare KV cache invalidation worker | country_code |
The voidly.incidents.verified topic uses incident_id as the partition key so that all events for the same incident arrive at the alert delivery consumer in order. This prevents the race condition where a RESOLVED event overtakes a VERIFIED event, which would cause subscribers to receive an “incident resolved” alert before the “incident verified” alert.
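The ordering guarantee follows from how keyed partitioning works: a record's partition is a deterministic function of its key, and Kafka preserves order only within a partition. The sketch below illustrates that property. It is not Voidly's producer code: `partition_for` is a hypothetical helper, and FNV-1a is a stand-in hash chosen because it is short enough to write out (Kafka clients use their own hash; the Java client's default partitioner uses murmur2). The partition count passed in is also an assumption, since the article states one only for the state-changes topic.

```rust
/// FNV-1a 64-bit hash. A deterministic stand-in for illustration only;
/// real Kafka clients use their own partitioner hash.
fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for b in bytes {
        hash ^= *b as u64;
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Map a record key to a partition index in 0..num_partitions.
/// Equal keys always map to the same partition, so all events for one
/// incident_id are appended to, and read from, a single ordered log.
pub fn partition_for(key: &str, num_partitions: u32) -> u32 {
    (fnv1a64(key.as_bytes()) % num_partitions as u64) as u32
}
```

Keying by country_code instead would still keep one incident's events together, but it would funnel every incident in a busy country through a single partition; keying by incident_id spreads that load while keeping per-incident order.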
The alert delivery service consumes from voidly.incidents.verified with a 500 ms delay buffer to batch rapid state changes (e.g., an incident that goes CORROBORATED → VERIFIED within seconds). In production measurements, the buffer reduces duplicate alert delivery by 18% compared to immediate delivery.
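One way to picture the delay buffer is as a per-incident coalescing map: a state change waits for the delay, and any newer state for the same incident that arrives in the meantime replaces it. The following is a minimal sketch under that assumption, not the alert delivery service's actual implementation. `AlertBuffer`, `PendingAlert`, and their methods are hypothetical names, and timestamps are passed in as plain milliseconds so the logic can be exercised without a clock.

```rust
use std::collections::HashMap;

/// A state change waiting to be delivered as an alert.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingAlert {
    pub incident_id: String,
    pub state: String,
    pub first_seen_ms: u64,
}

/// Coalesces rapid state changes per incident before alerting.
pub struct AlertBuffer {
    delay_ms: u64,
    pending: HashMap<String, PendingAlert>,
}

impl AlertBuffer {
    pub fn new(delay_ms: u64) -> Self {
        AlertBuffer { delay_ms, pending: HashMap::new() }
    }

    /// Record a state change. A newer state for an incident that is already
    /// waiting overwrites the older one but keeps the original arrival time,
    /// so the added latency never exceeds delay_ms.
    pub fn push(&mut self, incident_id: &str, state: &str, now_ms: u64) {
        self.pending
            .entry(incident_id.to_string())
            .and_modify(|p| p.state = state.to_string())
            .or_insert_with(|| PendingAlert {
                incident_id: incident_id.to_string(),
                state: state.to_string(),
                first_seen_ms: now_ms,
            });
    }

    /// Remove and return every alert that has waited at least delay_ms,
    /// oldest first.
    pub fn flush_due(&mut self, now_ms: u64) -> Vec<PendingAlert> {
        let delay = self.delay_ms;
        let due_ids: Vec<String> = self
            .pending
            .iter()
            .filter(|(_, p)| now_ms.saturating_sub(p.first_seen_ms) >= delay)
            .map(|(id, _)| id.clone())
            .collect();
        let mut due: Vec<PendingAlert> = due_ids
            .iter()
            .filter_map(|id| self.pending.remove(id))
            .collect();
        due.sort_by(|a, b| a.first_seen_ms.cmp(&b.first_seen_ms));
        due
    }
}
```

With a 500 ms delay, pushing CORROBORATED at t = 0 and VERIFIED at t = 200 for the same incident yields a single VERIFIED alert when the buffer is flushed at t = 500.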
For the measurement scheduling system that feeds the anomaly events this pipeline consumes: The Voidly measurement scheduler: how we decide which domains to probe and when →
For the alert delivery system that consumes from the Kafka topics this pipeline writes: Voidly's alert delivery system: PGP-encrypted email, webhooks, and RSS →