Voidly measurement API export: NDJSON streaming, Parquet generation, and HuggingFace dataset sync
Voidly's measurement corpus is a public good. The censorship signals it captures are most valuable when researchers can combine them with external data (political event timelines, BGP routing tables, OSINT feeds) that Voidly does not hold. Publishing the corpus requires three delivery mechanisms suited to different consumer patterns: a streaming API for incremental pulls by programmatic consumers, nightly Parquet files for bulk analysis in data science environments, and a HuggingFace dataset push for machine learning researchers who want a standardized datasets.load_dataset() entry point.
NDJSON streaming API
The export endpoint streams measurements as NDJSON (newline-delimited JSON), one measurement record per line. Keyset pagination on (ts, measurement_id) allows consumers to resume interrupted transfers without re-fetching already-received records:
-- PostgreSQL + TimescaleDB query used by the export endpoint
-- Keyset pagination: client passes the (ts, measurement_id) of the last
-- received row as after_ts and after_id parameters.
SELECT
    m.measurement_id,
    m.ts,
    m.probe_id,            -- anonymized: sha256(real_id + weekly_salt)[:16]
    m.country_code,
    m.asn,
    m.domain,
    m.test_type,
    m.dns_nxdomain,
    m.dns_resolver_ip,
    m.tcp_connect_failed,
    m.tls_cert_valid,
    m.http_status_code,
    m.body_length,
    m.body_sha256,
    m.control_rtt_ms,
    m.probe_rtt_ms,
    m.censor_prob,         -- calibrated P(censored) from ONNX classifier
    m.classifier_version
FROM measurements m
WHERE (m.ts, m.measurement_id) > (:after_ts, :after_id)
  AND m.ts >= :start_ts
  AND m.ts < :end_ts
  AND (:country_code IS NULL OR m.country_code = :country_code)
ORDER BY m.ts ASC, m.measurement_id ASC
LIMIT :page_size;  -- default 1000, max 5000

The endpoint also supports a Server-Sent Events mode (Accept: text/event-stream) for consumers that need real-time push delivery rather than pull-based pagination. In SSE mode, the server holds the connection open and emits new measurement rows as they arrive from the ingestion pipeline, with a 30-second keepalive ping to prevent proxy timeouts:
// api/export/route.ts (Next.js API route, deployed to server runtime)
import { NextRequest } from 'next/server';
// getPool() is assumed to return a shared node-postgres Pool; its import is not shown in this excerpt.

export const runtime = 'nodejs';
export const dynamic = 'force-dynamic';

export async function GET(req: NextRequest) {
  const accept = req.headers.get('accept') ?? '';
  const isSse = accept.includes('text/event-stream');
  if (isSse) {
    // SSE: hold open, emit measurements as they arrive
    const encoder = new TextEncoder();
    let interval: ReturnType<typeof setInterval> | undefined;
    const stream = new ReadableStream({
      async start(controller) {
        const pg = await getPool();
        // Send initial backfill (last 5 minutes)
        const backfillStart = new Date(Date.now() - 5 * 60 * 1000).toISOString();
        // Keep the cursor as two separate fields: ISO timestamps contain ':',
        // so a "ts:id" string cannot be split reliably. Starting at backfillStart
        // prevents a full-table replay when the backfill returns no rows.
        // Caveat (assuming node-postgres): a JS Date keeps only milliseconds; if ts
        // stores microseconds, derive the cursor from ts::text instead.
        let afterTs = backfillStart;
        let afterId = '';
        let lastSentAt = Date.now();
        const send = (row: any) => {
          controller.enqueue(encoder.encode(`data: ${JSON.stringify(row)}\n\n`));
          afterTs = new Date(row.ts).toISOString();
          afterId = String(row.measurement_id);
          lastSentAt = Date.now();
        };
        const rows = await pg.query(
          'SELECT * FROM measurements WHERE ts >= $1 ORDER BY ts, measurement_id LIMIT 5000',
          [backfillStart]
        );
        rows.rows.forEach(send);
        // Poll for new rows every 2 seconds
        interval = setInterval(async () => {
          try {
            const fresh = await pg.query(
              `SELECT * FROM measurements
               WHERE (ts, measurement_id) > ($1::timestamptz, $2)
               ORDER BY ts, measurement_id LIMIT 500`,
              [afterTs, afterId]
            );
            fresh.rows.forEach(send);
            // Keepalive ping (an SSE comment line) after 30 seconds without traffic
            if (Date.now() - lastSentAt >= 30_000) {
              controller.enqueue(encoder.encode(': ping\n\n'));
              lastSentAt = Date.now();
            }
          } catch {
            clearInterval(interval);
            controller.close();
          }
        }, 2000);
      },
      cancel() {
        // Client disconnected: stop polling so the timer does not leak
        clearInterval(interval);
      },
    });
    return new Response(stream, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
      },
    });
  }
  // Standard keyset-paginated NDJSON response
  const params = req.nextUrl.searchParams;
  const afterTs = params.get('after_ts') ?? '1970-01-01T00:00:00Z';
  const afterId = params.get('after_id') ?? '';
  const startTs = params.get('start_ts') ?? afterTs;
  const endTs = params.get('end_ts') ?? new Date().toISOString();
  const country = params.get('country') ?? null;
  // Clamp page_size to [1, 5000]; fall back to 1000 when it is missing or not a number
  const requested = parseInt(params.get('page_size') ?? '1000', 10);
  const pageSize = Number.isNaN(requested) ? 1000 : Math.min(Math.max(requested, 1), 5000);
  const pg = await getPool();
  const rows = await pg.query(
    `SELECT * FROM measurements
     WHERE (ts, measurement_id) > ($1::timestamptz, $2)
       AND ts >= $3::timestamptz AND ts < $4::timestamptz
       AND ($5::text IS NULL OR country_code = $5)
     ORDER BY ts, measurement_id LIMIT $6`,
    [afterTs, afterId, startTs, endTs, country, pageSize]
  );
  // One JSON document per line; an empty result yields an empty body
  const ndjson = rows.rows.map(r => JSON.stringify(r) + '\n').join('');
  const lastRow = rows.rows[rows.rows.length - 1];
  // A timestamptz column may arrive as a Date, so serialize it explicitly as ISO 8601
  const nextCursor = lastRow
    ? `?after_ts=${encodeURIComponent(new Date(lastRow.ts).toISOString())}` +
      `&after_id=${encodeURIComponent(String(lastRow.measurement_id))}`
    : null;
  return new Response(ndjson, {
    headers: {
      'Content-Type': 'application/x-ndjson',
      'X-Next-Cursor': nextCursor ?? '',
      'X-Result-Count': String(rows.rows.length),
    },
  });
}

Nightly Parquet generation
The nightly batch job generates one Parquet file per country per day, written to an S3 bucket with path structure s3://voidly-public/measurements/date={YYYY-MM-DD}/country={CC}/measurements.parquet. PyArrow's columnar compression is configured to maximize read performance for analytics queries on the censor_prob and domain columns, which dominate researcher access patterns:
# export/parquet_writer.py
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
from datetime import date, timedelta

PARQUET_SCHEMA = pa.schema([
    pa.field('measurement_id', pa.string(), nullable=False),
    pa.field('ts', pa.timestamp('us', tz='UTC'), nullable=False),
    pa.field('probe_id', pa.string(), nullable=False),      # anonymized
    pa.field('country_code', pa.string(), nullable=False),  # ISO 3166-1 alpha-2
    pa.field('asn', pa.int32(), nullable=False),
    pa.field('domain', pa.dictionary(pa.int16(), pa.string()), nullable=False),
    pa.field('test_type', pa.dictionary(pa.int8(), pa.string()), nullable=False),
    pa.field('dns_nxdomain', pa.bool_(), nullable=True),
    pa.field('dns_resolver_ip', pa.string(), nullable=True),
    pa.field('tcp_connect_failed', pa.bool_(), nullable=True),
    pa.field('tls_cert_valid', pa.bool_(), nullable=True),
    pa.field('http_status_code', pa.int16(), nullable=True),
    pa.field('body_length', pa.int32(), nullable=True),
    pa.field('body_sha256', pa.binary(32), nullable=True),  # fixed-width 32-byte binary
    pa.field('control_rtt_ms', pa.float32(), nullable=True),
    pa.field('probe_rtt_ms', pa.float32(), nullable=True),
    pa.field('censor_prob', pa.float32(), nullable=False),
    pa.field('classifier_version', pa.string(), nullable=False),
])

# Keyword arguments passed to pq.ParquetWriter
WRITE_OPTIONS = dict(
    compression='zstd',
    compression_level=3,         # zstd level 3: good balance for read-heavy analytics
    use_dictionary=True,         # dictionary encoding for domain + test_type columns
    write_statistics=True,       # min/max stats for predicate pushdown
    data_page_size=1024 * 1024,  # 1 MB data pages (larger = better scan throughput)
)
# Row-group size is an argument of write_table(), not of the writer itself
ROW_GROUP_SIZE = 500_000  # ~500K rows per row group; ~64 MB per row group at this schema

def write_country_parquet(
    rows: list[dict],
    output_path: str,
) -> pq.FileMetaData:
    table = pa.Table.from_pylist(rows, schema=PARQUET_SCHEMA)
    # Sort by (domain, ts) to co-locate same-domain rows for efficient domain filters
    table = table.sort_by([('domain', 'ascending'), ('ts', 'ascending')])
    with pq.ParquetWriter(output_path, PARQUET_SCHEMA, **WRITE_OPTIONS) as writer:
        writer.write_table(table, row_group_size=ROW_GROUP_SIZE)
    return pq.read_metadata(output_path)

Sorting by (domain, ts) rather than ts alone co-locates same-domain measurements within row groups. When a researcher filters by domain, Parquet predicate pushdown reads only the row groups whose min/max domain statistics include the target, reducing I/O by approximately 60% compared to chronological sort for single-domain queries.
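The effect of sort order on row-group pruning can be illustrated with a toy model. The sketch below does not use PyArrow; it only mimics how per-row-group min/max statistics let a reader skip groups. The domains, the tiny row-group size, and the helper names (row_groups, groups_scanned) are all made up for illustration, and the ratio it produces is an artifact of the toy data, not a reproduction of the roughly 60% figure above.

```python
from itertools import islice

def row_groups(rows, size):
    """Split rows into consecutive chunks of `size`, mimicking Parquet row groups."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk

def groups_scanned(rows, target_domain, size):
    """Count row groups whose [min, max] domain range could contain the target."""
    scanned = 0
    for group in row_groups(rows, size):
        domains = [r["domain"] for r in group]
        # A reader can skip the group only if the target falls outside its min/max range
        if min(domains) <= target_domain <= max(domains):
            scanned += 1
    return scanned

# Toy corpus (hypothetical): 4 domains measured at 6 timestamps each
domains = ["a.example", "b.example", "c.example", "d.example"]
rows = [{"domain": d, "ts": t} for t in range(6) for d in domains]

by_ts = sorted(rows, key=lambda r: (r["ts"], r["domain"]))      # chronological sort
by_domain = sorted(rows, key=lambda r: (r["domain"], r["ts"]))  # (domain, ts) sort

chrono = groups_scanned(by_ts, "b.example", size=6)
colocated = groups_scanned(by_domain, "b.example", size=6)
print(chrono, colocated)  # 4 1
```

With chronological order every row group spans all four domains, so none can be skipped. With (domain, ts) order each group holds a single domain and only one of the four has to be read.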
HuggingFace dataset sync
The nightly job pushes the freshly generated Parquet files to a HuggingFace dataset repository (voidly-data/network-measurements) using the huggingface_hub library. The dataset card is regenerated on each push to include updated statistics:
# export/hf_push.py
from huggingface_hub import HfApi, DatasetCard, DatasetCardData

# extract_country_from_path, query_total_measurement_count, DATASET_CARD_TEMPLATE
# and SCHEMA_VERSION are not shown in this excerpt.

def push_to_huggingface(
    parquet_files: list[str],  # local paths to newly generated Parquet files
    date_str: str,             # YYYY-MM-DD
    hf_token: str,
) -> None:
    api = HfApi(token=hf_token)
    repo_id = "voidly-data/network-measurements"
    for local_path in parquet_files:
        # Path in the HF repo: data/date=2025-08-24/country=IR/measurements.parquet
        country = extract_country_from_path(local_path)
        repo_path = f"data/date={date_str}/country={country}/measurements.parquet"
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=repo_path,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Add measurements for {date_str} country={country}",
        )
    # Update dataset card with fresh stats
    total_measurements = query_total_measurement_count(date_str)
    countries_today = len(parquet_files)
    card_data = DatasetCardData(
        language=['en'],
        license='cc-by-4.0',
        task_categories=['text-classification'],
        tags=['censorship', 'internet-freedom', 'network-measurement', 'OONI'],
    )
    card = DatasetCard.from_template(
        card_data,
        template_str=DATASET_CARD_TEMPLATE.format(
            date=date_str,
            total_measurements=f"{total_measurements:,}",
            countries_today=countries_today,
            schema_version=SCHEMA_VERSION,
        ),
    )
    card.push_to_hub(repo_id, token=hf_token)

Schema versioning
The classifier_version field in every measurement record allows consumers to filter or join on the classifier model version that produced censor_prob. When a new classifier is deployed, measurements scored under the old model remain in the corpus with the old version tag; the new model's scores are tagged separately. Researchers building on censor_prob must filter to a single classifier version to avoid mixing probability distributions calibrated against different feature sets:
| classifier_version | Date range | Features | AUC-ROC |
|---|---|---|---|
| v1.0 | 2023-01 – 2024-06 | 8 | 0.883 |
| v2.0 | 2024-07 – 2025-03 | 10 | 0.911 |
| v2.1 | 2025-04 – | 12 | 0.924 |
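A consumer-side filter for the single-version rule might look like the following sketch. It assumes records arrive as NDJSON text, as from the export endpoint; the function name select_classifier_version and the sample records (including their censor_prob values) are invented for illustration.

```python
import json

def select_classifier_version(ndjson_text: str, version: str) -> list[dict]:
    """Parse NDJSON and keep only records scored by one classifier version."""
    records = (json.loads(line) for line in ndjson_text.splitlines() if line.strip())
    return [r for r in records if r["classifier_version"] == version]

# Made-up sample records; only the fields needed for the example are included
sample = "\n".join(json.dumps(r) for r in [
    {"measurement_id": "m1", "censor_prob": 0.91, "classifier_version": "v2.0"},
    {"measurement_id": "m2", "censor_prob": 0.12, "classifier_version": "v2.1"},
    {"measurement_id": "m3", "censor_prob": 0.77, "classifier_version": "v2.1"},
])

v21 = select_classifier_version(sample, "v2.1")
print([r["measurement_id"] for r in v21])  # ['m2', 'm3']
```

Filtering before any aggregation keeps thresholds and averages computed on censor_prob tied to one calibration.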
Related writing
Voidly dataset schema documents the full measurement schema in detail, including the field derivation rules and the anonymization approach for probe identifiers.
Voidly DNS measurement describes the DNS probe implementation that produces the dns_nxdomain and dns_resolver_ip fields exported through this API.