
The Voidly Parquet export pipeline: nightly snapshots from TimescaleDB to HuggingFace

12 min read · AI Analytics
Censorship · Voidly · Data engineering · Open data · Infrastructure

Every night at 00:30 UTC, a cron job runs on the TimescaleDB host that exports yesterday's probe measurements to Parquet and pushes them to HuggingFace Hub. By 01:30 UTC, the emperor-mew/global-censorship-index dataset reflects all measurements up through the previous midnight. This article covers the export pipeline in detail: schema mapping, compression settings, partitioning strategy, HuggingFace push mechanics, post-push verification, and what happens when the job fails.

For background on what the published dataset looks like and how to consume it, see the companion article: The Voidly open datasets on HuggingFace: structure, daily snapshots, and filter recipes. This article is about the plumbing behind those snapshots.

Schedule and trigger

The export runs via a system cron job at 30 0 * * * UTC — 00:30, thirty minutes after midnight. The offset is intentional: TimescaleDB's compression policy runs at midnight, compressing the previous day's chunks. The export waits for that compression pass to complete before reading, for two reasons.

First, compressed chunks are dramatically faster to scan. A full day's worth of measurements — typically 1.8–2.4 million rows — occupies roughly 800 MB uncompressed in the hypertable. After TimescaleDB's columnar compression runs, that same chunk compresses to approximately 60–90 MB. Reading compressed chunks is roughly 8x faster than reading the same data from uncompressed hot storage because the query touches far fewer disk pages. Second, reading from compressed chunks rather than the active chunk means the export query does not compete with inbound probe writes for buffer pool space or I/O bandwidth.

The 30-minute buffer has never been too short in practice. TimescaleDB's compression worker finishes the previous day's chunks within 4–8 minutes of midnight on the current hardware. A monitoring alert fires if the export job starts and finds any of the target chunks still in the uncompressed state.
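The pre-flight check that raises this alert could look roughly like the sketch below. It assumes TimescaleDB 2.x, whose timescaledb_information.chunks view exposes range_start, range_end and is_compressed for each chunk, and assumes the hypertable is named measurements (as in the extraction query later in this article). The helper names are hypothetical, and the sample chunk rows are made up so the logic can run without a database.

```python
from datetime import date, datetime, timedelta, timezone

# Chunks of the (assumed) measurements hypertable overlapping [start, end).
# Run with psycopg2 as: cur.execute(CHUNK_STATUS_SQL, {"start": start, "end": end})
CHUNK_STATUS_SQL = """
    SELECT chunk_name, range_start, range_end, is_compressed
    FROM timescaledb_information.chunks
    WHERE hypertable_name = 'measurements'
      AND range_start < %(end)s
      AND range_end   > %(start)s
"""


def utc_day_bounds(export_date: date) -> tuple[datetime, datetime]:
    """Half-open [start, end) window covering one UTC calendar day."""
    start = datetime(export_date.year, export_date.month, export_date.day, tzinfo=timezone.utc)
    return start, start + timedelta(days=1)


def uncompressed_chunks(rows, export_date: date) -> list[str]:
    """
    rows: (chunk_name, range_start, range_end, is_compressed) tuples, e.g. cur.fetchall().
    Returns the chunks that overlap the export day but are not compressed yet;
    a non-empty result is the condition that should fire the monitoring alert.
    """
    start, end = utc_day_bounds(export_date)
    return [
        name
        for name, range_start, range_end, is_compressed in rows
        if range_start < end and range_end > start and not is_compressed
    ]


# Offline example with hand-written rows (chunk names are made up).
utc = timezone.utc
sample_rows = [
    ("_hyper_1_41_chunk", datetime(2025, 2, 6, tzinfo=utc), datetime(2025, 2, 7, tzinfo=utc), False),
    ("_hyper_1_42_chunk", datetime(2025, 2, 7, tzinfo=utc), datetime(2025, 2, 8, tzinfo=utc), False),
    ("_hyper_1_43_chunk", datetime(2025, 2, 7, tzinfo=utc), datetime(2025, 2, 8, tzinfo=utc), True),
]
print(uncompressed_chunks(sample_rows, date(2025, 2, 7)))  # ['_hyper_1_42_chunk']
```

The overlap test is repeated in Python so the same function works on rows from any source; in the job itself the SQL filter already limits the result to the export window.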

PyArrow schema definition

The Parquet schema is defined once in Python as a pa.schema object and reused across all export paths. This ensures that incremental daily files and monthly full snapshots always have identical schemas, which is required for DuckDB to treat them as a single logical dataset.

Three fields (interference_type, confidence_tier, and probe_asn_type) are declared as pa.dictionary(pa.int8(), pa.utf8()) rather than plain pa.utf8(). These fields have low cardinality (fewer than 20 distinct values each), so dictionary encoding saves significant space and enables Parquet dictionary-page filtering, which makes predicate pushdown on these columns much more efficient.

import pyarrow as pa

MEASUREMENT_SCHEMA = pa.schema([
    pa.field("measurement_id",     pa.large_utf8(),  nullable=False),
    pa.field("probe_cc",           pa.utf8(),         nullable=False),
    pa.field("probe_asn",          pa.int32(),        nullable=False),
    pa.field("domain",             pa.utf8(),         nullable=False),
    pa.field("test_start_time",    pa.timestamp("us", tz="UTC"), nullable=False),
    pa.field("interference_type",  pa.dictionary(pa.int8(), pa.utf8()), nullable=True),
    pa.field("p_blocked",          pa.float32(),      nullable=True),
    pa.field("confidence_tier",    pa.dictionary(pa.int8(), pa.utf8()), nullable=True),
    pa.field("dns_tamper",         pa.bool_(),        nullable=True),
    pa.field("tls_interference",   pa.bool_(),        nullable=True),
    pa.field("http_blocking",      pa.bool_(),        nullable=True),
    pa.field("bgp_withdrawal",     pa.bool_(),        nullable=True),
    pa.field("control_failure",    pa.bool_(),        nullable=True),
    pa.field("probe_asn_type",     pa.dictionary(pa.int8(), pa.utf8()), nullable=True),
    pa.field("ooni_corroborated",  pa.bool_(),        nullable=True),
    pa.field("ioda_corroborated",  pa.bool_(),        nullable=True),
    pa.field("schema_version",     pa.int16(),        nullable=False),
])

The measurement_id field uses pa.large_utf8() (64-bit offsets) rather than pa.utf8() (32-bit offsets). With 32-bit offsets, the string data held in a single array is capped at 2 GiB; 64-bit offsets remove that ceiling for the ID column when many batches are concatenated into one contiguous array. The remaining plain string fields (probe_cc and domain) use standard pa.utf8(), since they hold short, bounded values such as country codes and domain names.

The schema_version field is an int16 carrying the Voidly dataset schema version number. It is used by downstream consumers to detect when a field was added or a definition changed — see the dataset schema reference for the version history.

TimescaleDB extraction

The extraction uses a server-side psycopg2 named cursor rather than loading all rows into memory at once. A single day can contain 2M+ rows; materializing the full result set as a Python list before building a PyArrow table would require several gigabytes of RAM. Instead, the query streams rows in batches of 50,000, building one RecordBatch per fetch and collecting them into a list.

import psycopg2  # conn is a psycopg2 connection opened by the caller
import pyarrow as pa
from datetime import date, datetime, timedelta, timezone

BATCH_SIZE = 50_000  # rows per server-side FETCH

def export_day(conn, export_date: date) -> pa.Table:
    """
    Export all measurements for a single UTC calendar day.
    Uses a server-side (named) cursor to avoid loading 2M+ rows into memory.
    MEASUREMENT_SCHEMA is the schema defined above.
    """
    query = """
        SELECT
            measurement_id, probe_cc, probe_asn, domain,
            test_start_time, interference_type, p_blocked,
            confidence_tier, dns_tamper, tls_interference,
            http_blocking, bgp_withdrawal, control_failure,
            probe_asn_type, ooni_corroborated, ioda_corroborated,
            schema_version
        FROM measurements
        WHERE test_start_time >= %s
          AND test_start_time <  %s
        ORDER BY test_start_time
    """
    # Timezone-aware UTC bounds: a bare date would be cast to midnight in the
    # session's TimeZone setting, which is not necessarily UTC.
    start = datetime(export_date.year, export_date.month, export_date.day, tzinfo=timezone.utc)
    end   = start + timedelta(days=1)

    batches = []
    with conn.cursor(name="export_cursor") as cur:
        cur.itersize = BATCH_SIZE  # only used when iterating the cursor directly
        cur.execute(query, (start, end))
        while True:
            rows = cur.fetchmany(BATCH_SIZE)  # one FETCH round-trip
            if not rows:
                break
            # Transpose row tuples into columns; the SELECT list follows the schema order.
            batch = pa.RecordBatch.from_pydict(
                {col: [r[i] for r in rows] for i, col in enumerate(MEASUREMENT_SCHEMA.names)},
                schema=MEASUREMENT_SCHEMA,
            )
            batches.append(batch)

    return pa.Table.from_batches(batches, schema=MEASUREMENT_SCHEMA)

The named cursor (name="export_cursor") is the key detail. Without a name, psycopg2 fetches the entire result set to the client before returning the first row. A named cursor is a PostgreSQL server-side cursor: each fetchmany(50_000) call issues one FETCH for the next 50,000 rows, and itersize sets the same batch size when the cursor is iterated directly instead. On a day with 2.1M rows at 50K per round-trip, this is 42 database round-trips, typically completing in under 4 minutes on the local network between the export script and the TimescaleDB host.

The ORDER BY test_start_time clause is deliberate. Ordering the extraction by time means that each 128 MB Parquet row group contains a contiguous time range, which improves time-based predicate pushdown when consumers filter by date range within a single day's file.

Parquet write settings

The table is written to a local temporary path before being pushed to HuggingFace. The write settings balance file size, read speed, and compatibility:

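import pyarrow.parquet as pq

TARGET_ROW_GROUP_BYTES = 128 * 1024 * 1024  # ~128 MB of uncompressed Arrow data

def write_parquet(table, output_path):
    # row_group_size is a row count, so convert the byte target into rows.
    avg_row_bytes = max(1, table.nbytes // max(1, table.num_rows))
    pq.write_table(
        table,
        output_path,
        compression="zstd",
        compression_level=3,
        row_group_size=max(1, TARGET_ROW_GROUP_BYTES // avg_row_bytes),
        use_dictionary=True,
        write_statistics=True,
    )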

Compression: Zstandard at level 3 is the default choice for this pipeline. At level 3, zstd shrinks these measurement rows by roughly 70–75% (similar to gzip level 6, but 3–4x faster to decompress). Higher levels (9, 19) would shrink files by a further 5–8%, but they significantly slow the nightly compression step without a meaningful benefit to consumers of this dataset: consumers pay the decompression cost, not the compression cost, and zstd decompresses at roughly the same speed regardless of level.

Row group size: 128 MB row groups balance two constraints. Smaller row groups (e.g., 16 MB) would allow more selective predicate pushdown, since a query filtering by confidence_tier could skip more row groups, but they would increase the Parquet metadata footer size and the number of seeks during a full scan. 128 MB is a practical middle ground that DuckDB, pandas, and Polars all handle efficiently. PyArrow's row_group_size parameter counts rows, not bytes, so the 128 MB target has to be translated into a row count, for example from the table's average in-memory row width.

Dictionary encoding: use_dictionary=True applies to all columns. For the three columns declared as pa.dictionary in the schema (interference_type, confidence_tier, probe_asn_type), this produces Parquet dictionary pages: the small set of distinct values is stored once per row group, and each row is encoded as an integer index into it. For high-cardinality columns like measurement_id and domain, the Parquet writer falls back to plain encoding once a column chunk's dictionary grows past the dictionary page size limit (1 MB by default in PyArrow, configurable with dictionary_pagesize_limit).

Column statistics: write_statistics=True writes min/max statistics into the Parquet footer for every column. This is what enables row-group skipping in DuckDB and Arrow DataFusion: without statistics, a filter predicate must open every row group to evaluate it. With statistics, a query like WHERE test_start_time > '2025-02-07 18:00:00' can skip every row group whose maximum test_start_time is at or below the threshold.

Partitioning strategy

Files are partitioned by probe_cc (country code) and year_month, using HuggingFace's directory layout convention for Parquet datasets. The layout follows the Hive partitioning style (key=value/ directories) so that any Parquet-aware tool recognizes the partition columns automatically:

global-censorship-index/
  data/
    probe_cc=CN/
      year_month=2025-02/
        measurements-2025-02-07.parquet
    probe_cc=IR/
      year_month=2025-02/
        measurements-2025-02-07.parquet
    ...

Each nightly run produces one file per country that had measurements on the export date. Countries with no measurements on a given day produce no file — this is intentional. Downstream tools should treat a missing file as zero measurements for that country-day, not as an error.

The Hive-style layout allows tools like PyArrow's ParquetDataset to apply partition pruning without reading any file content — the partition column values are in the directory names:

ds = pq.ParquetDataset("data/", filters=[("probe_cc", "=", "IR")])

For DuckDB, the same effect is achieved by globbing only the relevant path:

-- Only reads files under probe_cc=IR/, skips all other countries
SELECT * FROM read_parquet('data/probe_cc=IR/**/*.parquet', hive_partitioning = true)
WHERE year_month = '2025-02';

Country code was chosen as the primary partition key because the most common query pattern — both for journalism use cases and for ML training — is filtering to one or a handful of countries. Month is the secondary key because it bounds file sizes for high-density countries like China and Iran, where a single month at full coverage produces 2–4 million measurements. Without the month partition, CN would eventually become a single enormous Parquet file that few tools could scan efficiently.

HuggingFace Hub push

All files produced for a given export date are pushed to HuggingFace in a single atomic commit using the huggingface_hub library's create_commit API. A single commit is important: if the push is interrupted mid-way, the Hub either accepts the entire commit or none of it. Partial pushes — where some country files land but others don't — would leave the dataset in an inconsistent state that downstream consumers could silently misinterpret as zero measurements for the missing countries.

from datetime import date
from huggingface_hub import HfApi, CommitOperationAdd

def push_to_hub(parquet_files: list[tuple[str, str]], repo_id: str, token: str,
                export_date: date) -> str:
    """
    Push Parquet files to HuggingFace Hub in a single atomic commit.
    parquet_files: list of (local_path, repo_path) tuples.
    Returns the commit SHA (recorded as hf_commit_sha in export_log).
    """
    api = HfApi(token=token)
    operations = [
        CommitOperationAdd(
            path_in_repo=repo_path,
            path_or_fileobj=local_path,
        )
        for local_path, repo_path in parquet_files
    ]
    commit = api.create_commit(
        repo_id=repo_id,
        repo_type="dataset",
        operations=operations,
        commit_message=f"Daily export: {export_date.isoformat()}",  # YYYY-MM-DD
    )
    return commit.oid

CommitOperationAdd uploads each file via HuggingFace's LFS upload API before the commit is finalized. The library handles chunked multipart upload internally. On a typical night with 80–120 country files, the total upload is 3–6 GB of Parquet. On a 1 Gbps uplink from the export host, this completes in 30–60 seconds under normal conditions.

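The commit_message always includes the export date in YYYY-MM-DD format. HuggingFace surfaces commit messages in the dataset revision history, so this makes it straightforward to identify which commit corresponds to which calendar day and to pin an analysis to that exact snapshot for reproducibility, either with git checkout in a local clone or by passing the commit as the revision argument to huggingface_hub download functions such as hf_hub_download.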

Post-push verification

After each push, the export job runs three verification checks before marking the export as SUCCEEDED in the export_log table. All three must pass.

  1. Path existence check: The job calls the HuggingFace API to list the contents of each probe_cc=XX/year_month=YYYY-MM/ directory and confirms that every file it uploaded appears at the expected path. This catches cases where the commit appeared to succeed but a file was silently not included.
  2. SHA-256 integrity check: The job re-downloads each Parquet file from HuggingFace and computes its SHA-256 hash, comparing against the hash of the locally-written temporary file. A mismatch — which has occurred once due to a mid-upload network error that was not caught by the LFS protocol — triggers an immediate alert rather than leaving a corrupt file silently in the dataset.
  3. Row count reconciliation: The job reads the Parquet metadata footer from the downloaded file (without reading row data — Parquet stores total row count in the footer) and compares it against a COUNT(*) query run against TimescaleDB for the same country and date range. A mismatch of more than 0.1% (accounting for any measurements that arrived after the extraction query but before the count) triggers a FAILED status.

If any of the three checks fails, the export job sends a PagerDuty alert and writes FAILED to the export_log table for that date. The failed files are not deleted from HuggingFace — they remain visible in the dataset revision history, but the failed commit is noted in the export log so operators can identify and re-push the affected date during the next successful run.

Incremental vs. full snapshot

The nightly job produces incremental exports: only the previous day's measurements are written. On the first day of each month, the job additionally produces a full snapshot: all measurements from the beginning of the dataset through the previous midnight, written to a versioned directory alongside the daily files:

global-censorship-index/
  data/
    probe_cc=CN/year_month=2025-02/measurements-2025-02-07.parquet
    ...
  snapshots/
    2025-02-01/
      probe_cc=CN.parquet
      probe_cc=IR.parquet
      probe_cc=RU.parquet
      ...

The full snapshot collapses the per-month partition into a single Parquet file per country, spanning the entire measurement history for that country. This produces larger files (CN spans approximately 22 GB compressed as of early 2025) but eliminates the overhead of reading and merging dozens of per-month files when a query needs all historical data for a country.

DuckDB queries that need to filter across all time without streaming the entire incremental history should use the snapshot directory. Queries that only need recent data (e.g., the last 30 days) should use the incremental files directly, since reading 30 per-day files is faster than reading and then filtering a multi-year full snapshot.
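Selecting the last N days of incremental files is mostly date arithmetic over the directory layout shown earlier. The helper below is a hypothetical sketch; its output can be passed to DuckDB's read_parquet, which accepts a list of files, or to pq.ParquetDataset. Because a country-day with no measurements has no file, the optional root argument (a local clone of the dataset) drops paths that do not exist.

```python
import os
from datetime import date, timedelta
from typing import Optional


def daily_file_path(probe_cc: str, day: date) -> str:
    """Per-day file path relative to the dataset root."""
    return f"data/probe_cc={probe_cc}/year_month={day:%Y-%m}/measurements-{day:%Y-%m-%d}.parquet"


def recent_paths(probe_cc: str, last_day: date, days: int, root: Optional[str] = None) -> list[str]:
    """Per-day file paths for the `days` days ending on last_day (inclusive), oldest first."""
    first = last_day - timedelta(days=days - 1)
    paths = [daily_file_path(probe_cc, first + timedelta(days=n)) for n in range(days)]
    if root is not None:
        # Skip country-days that produced no file.
        paths = [p for p in paths if os.path.exists(os.path.join(root, p))]
    return paths


# Crossing a month boundary switches the year_month directory automatically.
for p in recent_paths("IR", date(2025, 3, 2), 3):
    print(p)
```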

The full snapshot run is significantly longer than the nightly incremental — typically 45–90 minutes for the full extraction and conversion, plus another 20–30 minutes for upload. It is scheduled to start at 01:00 UTC on the first of each month, after the nightly incremental completes.

Error handling and retry

The export job has separate retry policies for the two external dependencies: TimescaleDB availability and HuggingFace Hub reachability.

TimescaleDB unavailable: If the initial connection or the export query fails, the job retries 3 times with 5-minute backoff between attempts (i.e., at T+5m, T+10m, T+15m from the first failure). TimescaleDB outages on this host are rare and short — the retry window covers the typical restart cycle. If all three retries fail, the job records FAILED in export_log and exits. No HuggingFace push is attempted.

HuggingFace push failure: If the create_commit call fails (typically a network timeout during the LFS upload phase), the job retries once after a 10-minute wait. HuggingFace LFS uploads are idempotent — re-uploading a file with the same content hash is a no-op on the server side — so retrying is safe. If the single retry also fails, the job records FAILED and a Slack alert fires via the nightly monitoring webhook.
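Both retry policies fit one small helper that takes a retry count and a fixed delay. This is a hypothetical sketch, not the pipeline's code: catching every Exception is a simplification (a real job would catch its connection and network errors specifically), and the sleep function is injectable so the demo runs instantly.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Policies from the text: TimescaleDB gets 3 retries 5 minutes apart,
# the HuggingFace push gets 1 retry after 10 minutes.
DB_POLICY = {"retries": 3, "delay_s": 5 * 60}
HF_POLICY = {"retries": 1, "delay_s": 10 * 60}


def call_with_retries(fn: Callable[[], T], retries: int, delay_s: float,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn(); after each failure wait delay_s and retry, up to `retries` extra attempts.
    The final exception propagates so the caller can record FAILED in export_log."""
    if retries < 0:
        raise ValueError("retries must be >= 0")
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise
            sleep(delay_s)


# Demo: a fake sleep records the waits instead of blocking.
waits, attempts = [], []

def flaky_connect():
    attempts.append(1)
    if len(attempts) < 3:  # fail on the first two calls
        raise ConnectionError("database not ready")
    return "connected"

result = call_with_retries(flaky_connect, sleep=waits.append, **DB_POLICY)
print(result, len(attempts), waits)  # connected 3 [300, 300]

hf_waits, gave_up = [], False

def always_timeout():
    raise TimeoutError("LFS upload timed out")

try:
    call_with_retries(always_timeout, sleep=hf_waits.append, **HF_POLICY)
except TimeoutError:
    gave_up = True
print(gave_up, hf_waits)  # True [600]
```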

Backfill on recovery: Missed exports are backfilled automatically. When the job starts each night, it queries the export_log table for any dates in the past 7 days whose status is FAILED or that have no export_log row at all (MISSING, meaning the job never ran for that date). It processes these backfill dates first, oldest to newest, before running the current night's incremental export. This means that after a multi-day outage, the dataset catches up automatically once the job can run successfully, without any manual intervention. The HuggingFace commit for each backfill date includes the original export date in the commit message, preserving the audit trail.
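The date selection for backfill can be sketched as a pure function over export_log statuses. Two assumptions here are not stated in the text: the 7-day window is taken as the seven days before tonight's export date (which is yesterday in UTC), and dates with a RUNNING row are left alone. The function name is hypothetical.

```python
from datetime import date, timedelta


def backfill_dates(today: date, statuses: dict[date, str], window_days: int = 7) -> list[date]:
    """
    today: current UTC date; tonight's regular export covers today - 1.
    statuses: {export_date: status} loaded from export_log; dates with no row are absent.
    Returns FAILED or MISSING dates in the window, oldest first.
    """
    current = today - timedelta(days=1)
    candidates = [current - timedelta(days=n) for n in range(window_days, 0, -1)]
    return [d for d in candidates if statuses.get(d, "MISSING") in ("FAILED", "MISSING")]


statuses = {date(2025, 2, d): "SUCCEEDED" for d in range(2, 9)}
statuses[date(2025, 2, 4)] = "FAILED"
del statuses[date(2025, 2, 6)]  # the job never ran for Feb 6
print(backfill_dates(date(2025, 2, 10), statuses))
# [datetime.date(2025, 2, 4), datetime.date(2025, 2, 6)]
```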

Export log table

The export_log table in PostgreSQL (on the same host as TimescaleDB) is the operational record of every export attempt:

CREATE TABLE export_log (
    export_date      DATE        NOT NULL,
    run_started_at   TIMESTAMPTZ NOT NULL,
    run_finished_at  TIMESTAMPTZ,
    status           TEXT        NOT NULL CHECK (status IN ('RUNNING', 'SUCCEEDED', 'FAILED')),
    rows_exported    BIGINT,
    files_pushed     INTEGER,
    bytes_uploaded   BIGINT,
    hf_commit_sha    TEXT,
    error_message    TEXT,
    PRIMARY KEY (export_date)
);

The hf_commit_sha column records the HuggingFace commit SHA for successful pushes, providing a direct link between the export log record and the dataset revision history. This is the recommended way to trace which version of the dataset corresponds to a particular export date when reproducing an analysis.

The rows_exported count is compared against the row count in the Parquet metadata during post-push verification. A monitoring dashboard queries this table to produce the daily export health chart and to alert when rows_exported deviates more than 20% from the 7-day trailing average, which can signal a probe network issue rather than an export failure.
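The 20% deviation rule is simple enough to sketch directly. The SQL below is one hypothetical way to compute the trailing average in PostgreSQL with a window function over export_log, and the Python function applies the threshold; neither is the dashboard's actual query or code.

```python
from typing import Sequence

# Hypothetical dashboard query: trailing average over the previous 7 successful exports.
TRAILING_AVG_SQL = """
    SELECT export_date,
           rows_exported,
           AVG(rows_exported) OVER (
               ORDER BY export_date
               ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
           ) AS trailing_avg_7d
    FROM export_log
    WHERE status = 'SUCCEEDED'
    ORDER BY export_date
"""


def rows_deviate(today_rows: int, previous_rows: Sequence[int], threshold: float = 0.20) -> bool:
    """True if today's rows_exported differs from the trailing 7-day average by more than 20%."""
    window = list(previous_rows)[-7:]  # most recent seven days, oldest first
    if not window:
        return False  # no history yet, nothing to compare against
    avg = sum(window) / len(window)
    if avg == 0:
        return today_rows > 0
    return abs(today_rows - avg) / avg > threshold


history = [2_000_000] * 7
print(rows_deviate(1_500_000, history))  # True  (25% below the average)
print(rows_deviate(2_300_000, history))  # False (15% above the average)
```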

What this pipeline does not do

A few things are explicitly out of scope for the export job, handled elsewhere:

  • Schema evolution: Adding a new column to the Parquet schema requires a separate migration process that backfills existing Parquet files. The export job does not modify historical files — it only writes the current night's incremental. Schema migrations are tracked in the dataset schema reference.
  • Reclassification: If the anomaly classifier is retrained and historical measurements are reclassified, a separate backfill job re-exports affected dates. This is not the nightly export job; it runs on demand and produces a distinct commit with a message noting the reclassification reason.
  • OONI historical corpus: The emperor-mew/ooni-censorship-historical dataset is not updated by this pipeline. It was built separately from the OONI S3 archive and receives only occasional patch commits for fingerprint corrections.

Related articles:

For what the published dataset looks like and how to query it with Python, R, and DuckDB: The Voidly open datasets on HuggingFace: structure, daily snapshots, and filter recipes

For how the TimescaleDB hypertable that feeds this export is designed and maintained: Voidly's measurement database: 2.2B probe results in TimescaleDB

For the full upstream pipeline that gets probe bytes into TimescaleDB in the first place: Voidly's probe-to-dataset ingest pipeline: normalization, quality filtering, and TimescaleDB indexing