The election intelligence pipeline: aggregating ballot data, social signals, and media coverage for real-time anomaly detection

8 min read · AI Analytics
Tags: Elections, Infrastructure, Kafka, NLP

The companion article on election anomaly detection covers the statistical models: Benford's Law applied at county level, an XGBoost turnout model with SHAP attribution, ARIMA reporting curves, and DBSCAN clustering on FEC contribution data. What that article does not cover is what feeds those models. The detection logic is only as good as the data underneath it, and US election data is genuinely messy: 3,143 counties, 50 separate state systems, no standard reporting format, and the expectation that everything resolves on a single night.

This article covers the pipeline that sits upstream of the anomaly detector. It ingests five distinct streams — AP Election API results, state election authority feeds (structured and unstructured), US Census FIPS reference data, social media signals, and media coverage — normalizes them into a common Kafka topic structure, and delivers a consistent view of ballot reporting, public narrative, and social sentiment to the downstream Flink consumers that run the statistical checks.

Data sources and their characteristics

The five sources have very different characteristics in terms of format, update cadence, and reliability. Understanding these differences drives the architecture decisions downstream.

| Data source | Format | Update cadence | Latency | Coverage |
| --- | --- | --- | --- | --- |
| AP Election API | JSON | ~30–60s per county as results come in | 30–90s | 100% US federal/state races |
| State election authority feeds | JSON / CSV / HTML / PDF (varies by state) | 5-minute scraper cadence | 5–15 min | 50 states; format varies dramatically |
| US Census FIPS codes | Static reference CSV | Static (updated post-census) | N/A | 3,143 counties, 50 states |
| Social media signals | Pre-aggregated Kafka messages | NLP batch window (~8 min) | 8–12 min | 58M posts/day; geotagged subset by county |
| Media coverage | RSS feeds → NLP pipeline | 60s during election window | ~2 min | 340 news outlets |

AP requires a subscription and provides the most reliable structured data. State feeds are where the engineering complexity lives. Some states — Pennsylvania, Michigan — have structured JSON APIs with stable schemas. Others, like Georgia and Arizona, post CSV files to public state websites that require polling. Texas, in many counties, publishes only PDF summary sheets: those go through a separate OCR layer and are treated as a low-confidence fallback rather than a primary source. When AP and state data disagree, AP is trusted unless the discrepancy persists for more than 20 minutes, at which point both values are preserved and flagged for review.
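The AP-versus-state precedence rule can be sketched as a small tracker. This is a minimal illustration rather than the pipeline's actual code: the `DiscrepancyTracker` class, its `resolve` method, and the shape of the returned dict are hypothetical names introduced here. Only the 20-minute persistence threshold comes from the text.

```python
from dataclasses import dataclass, field

PERSIST_THRESHOLD_S = 20 * 60  # 20 minutes, per the rule described in the text


@dataclass
class DiscrepancyTracker:
    """Hypothetical helper: tracks how long AP and state totals have disagreed."""
    # (fips_code, race_id) -> timestamp in seconds when the disagreement was first seen
    first_seen: dict = field(default_factory=dict)

    def resolve(self, key: tuple, ap_votes: int, state_votes: int, now_s: float) -> dict:
        if ap_votes == state_votes:
            # Sources agree: clear any open discrepancy for this key
            self.first_seen.pop(key, None)
            return {"votes": ap_votes, "flagged": False}

        started = self.first_seen.setdefault(key, now_s)
        if now_s - started > PERSIST_THRESHOLD_S:
            # Persistent disagreement: preserve both values and flag for review
            return {"votes": ap_votes, "state_votes": state_votes, "flagged": True}

        # Short-lived disagreement: trust AP
        return {"votes": ap_votes, "flagged": False}
```

Keeping the timer per (fips_code, race_id) means a discrepancy in one county does not affect how another county's data is treated.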

The ballot reporting Kafka topic design

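All ballot result data, regardless of source, flows into a single Kafka topic: election.precinct_results. Messages are keyed by state_fips, and the topic has 50 partitions, one per state. This keeps all precinct messages for a given state on the same partition, which simplifies stateful Flink consumers that need to correlate within-state precincts for Benford's Law analysis. Note that Kafka's default partitioner hashes the key, so two states can land on the same partition; a strict one-state-per-partition layout requires an explicit key-to-partition mapping.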
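One way to get a strict one-state-per-partition layout is an explicit lookup from state FIPS to partition index. The sketch below is an assumption about how that could be done, not a description of the pipeline: `STATE_FIPS`, `PARTITION_BY_STATE`, and `partition_for` are hypothetical names, and the text does not say how DC or territories are routed, so they are left out.

```python
# The 50 state FIPS codes in ascending order. DC ("11") and territories are
# excluded here; how the pipeline routes them is not stated in the text.
STATE_FIPS = [
    "01", "02", "04", "05", "06", "08", "09", "10", "12", "13",
    "15", "16", "17", "18", "19", "20", "21", "22", "23", "24",
    "25", "26", "27", "28", "29", "30", "31", "32", "33", "34",
    "35", "36", "37", "38", "39", "40", "41", "42", "44", "45",
    "46", "47", "48", "49", "50", "51", "53", "54", "55", "56",
]
PARTITION_BY_STATE = {fips: i for i, fips in enumerate(STATE_FIPS)}


def partition_for(fips_code: str) -> int:
    """Map a 2-digit state FIPS or 5-digit county FIPS to a partition index."""
    return PARTITION_BY_STATE[fips_code[:2]]
```

With confluent_kafka, the resulting index can be passed as the `partition` argument of `Producer.produce`, which bypasses key hashing for that message.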

Each message is a PrecinctResult protobuf. The schema:

// election.proto
syntax = "proto3";

enum ResultSource {
  AP_FEED   = 0;
  STATE_API = 1;
  SCRAPE    = 2;
}

message CandidateResult {
  string candidate_id = 1;   // AP candidate ID (e.g., "96870")
  int64  vote_count   = 2;
  float  vote_pct     = 3;
}

message PrecinctResult {
  string fips_code          = 1;   // 5-character county FIPS (e.g., "42101" = Philadelphia)
  string race_id            = 2;   // AP race ID (e.g., "2024-G-P-54")
  string reporting_unit_id  = 3;   // Precinct or county reporting unit ID

  repeated CandidateResult candidate_results = 4;

  int32  precincts_reporting = 5;
  int32  precincts_total     = 6;

  ResultSource source        = 7;
  int64  ingested_at         = 8;  // Unix milliseconds
}

The Python producer function that publishes a single AP race result to the topic:

from confluent_kafka import Producer
from election_pb2 import PrecinctResult, CandidateResult, ResultSource
import time

def publish_ap_result(race_result: dict, producer: Producer) -> None:
    """
    Serialize an AP race result dict to PrecinctResult proto
    and publish to election.precinct_results.
    """
    fips = ap_state_abbr_to_fips(race_result["statePostal"])
    race_id = race_result["raceID"]

    for reporting_unit in race_result.get("reportingUnits", []):
        msg = PrecinctResult()
        msg.fips_code         = reporting_unit.get("fipsCode", fips)
        msg.race_id           = race_id
        msg.reporting_unit_id = reporting_unit["reportingUnitID"]
        msg.precincts_reporting = reporting_unit["precinctsReporting"]
        msg.precincts_total     = reporting_unit["precinctsTotal"]
        msg.source              = ResultSource.AP_FEED
        msg.ingested_at         = int(time.time() * 1000)

        for candidate in reporting_unit.get("candidates", []):
            cr = msg.candidate_results.add()
            cr.candidate_id = candidate["candidateID"]
            cr.vote_count   = candidate["voteCount"]
            cr.vote_pct     = candidate.get("votePct", 0.0)

        producer.produce(
            topic="election.precinct_results",
            key=fips,          # partition key = state FIPS
            value=msg.SerializeToString(),
        )

    producer.poll(0)  # serve delivery callbacks without blocking (this does not flush the queue)

AP API integration

The AP Election API is a REST API that the pipeline polls at a fixed interval. On election night, the pipeline issues GET /elections/{date}?officesId=P,G,S,H&level=FIPSCode every 30 seconds. The level=FIPSCode parameter requests county-level breakdowns rather than just state totals, which is necessary for the precinct-level Benford analysis.

AP uses state postal abbreviations (PA, MI) rather than FIPS codes. The pipeline maintains a static mapping table. The critical case is that AP's statewide reporting unit uses the state's two-letter abbreviation as its fipsCode field rather than a numeric FIPS — these need to be caught and replaced before publishing.

# State abbreviation → FIPS mapping (partial)
AP_STATE_TO_FIPS = {
    "AL": "01", "AK": "02", "AZ": "04", "AR": "05",
    "CA": "06", "CO": "08", "CT": "09", "DE": "10",
    "FL": "12", "GA": "13", "HI": "15", "ID": "16",
    "IL": "17", "IN": "18", "IA": "19", "KS": "20",
    "KY": "21", "LA": "22", "ME": "23", "MD": "24",
    "MA": "25", "MI": "26", "MN": "27", "MS": "28",
    "MO": "29", "MT": "30", "NE": "31", "NV": "32",
    "NH": "33", "NJ": "34", "NM": "35", "NY": "36",
    "NC": "37", "ND": "38", "OH": "39", "OK": "40",
    "OR": "41", "PA": "42", "RI": "44", "SC": "45",
    "SD": "46", "TN": "47", "TX": "48", "UT": "49",
    "VT": "50", "VA": "51", "WA": "53", "WV": "54",
    "WI": "55", "WY": "56", "DC": "11", "PR": "72",
}

def ap_state_abbr_to_fips(state_abbr: str) -> str:
    return AP_STATE_TO_FIPS.get(state_abbr, state_abbr)


def parse_ap_race(race_json: dict) -> list[tuple[PrecinctResult, float]]:
    """
    Parse a single race object from the AP /elections response.
    Returns a list of (PrecinctResult, pct_reporting) tuples, one per reporting unit.
    """
    results = []
    state_fips = ap_state_abbr_to_fips(race_json["statePostal"])

    for unit in race_json.get("reportingUnits", []):
        unit_fips = unit.get("fipsCode", "")
        # AP statewide unit uses abbreviation as fipsCode — normalize it
        if not unit_fips.isdigit():
            unit_fips = state_fips.zfill(2) + "000"  # state-level pseudo-FIPS

        pct_reporting = (
            unit["precinctsReporting"] / unit["precinctsTotal"]
            if unit["precinctsTotal"] > 0 else 0.0
        )

        msg = PrecinctResult()
        msg.fips_code           = unit_fips.zfill(5)
        msg.race_id             = race_json["raceID"]
        msg.reporting_unit_id   = unit["reportingUnitID"]
        msg.precincts_reporting = unit["precinctsReporting"]
        msg.precincts_total     = unit["precinctsTotal"]
        msg.source              = ResultSource.AP_FEED
        msg.ingested_at         = int(time.time() * 1000)

        for c in unit.get("candidates", []):
            cr = msg.candidate_results.add()
            cr.candidate_id = c["candidateID"]
            cr.vote_count   = c["voteCount"]
            cr.vote_pct     = c.get("votePct", 0.0)

        results.append((msg, pct_reporting))

    return results

AP occasionally issues corrections: a county that reported 85% of precincts will briefly revert to 60% as a data entry error is fixed, then re-advance. The pipeline detects this by comparing incoming precincts_reporting to the last-seen value for the same (fips_code, race_id) pair. A decrease triggers a correction flag in the message metadata, and the downstream anomaly detector re-runs its Benford check on that county from scratch rather than continuing an incremental accumulation.
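The correction check described above reduces to remembering one integer per (fips_code, race_id) pair. A minimal sketch follows; the `CorrectionDetector` class and its method name are hypothetical, and in a real deployment the last-seen values would presumably live in durable state rather than an in-memory dict.

```python
class CorrectionDetector:
    """Flags an update as a correction when precincts_reporting decreases."""

    def __init__(self) -> None:
        # (fips_code, race_id) -> last-seen precincts_reporting
        self._last_seen: dict = {}

    def is_correction(self, fips_code: str, race_id: str, precincts_reporting: int) -> bool:
        key = (fips_code, race_id)
        previous = self._last_seen.get(key)
        # Always record the newest value so the next comparison is against it
        self._last_seen[key] = precincts_reporting
        return previous is not None and precincts_reporting < previous
```

Because the newest value is always stored, the re-advance after a correction (for example 60 back up to 85) is treated as a normal update and does not trigger a second flag.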

State scraper layer

For states without a structured API — or for cross-validation of AP data — a Playwright-based scraper layer polls state election websites on 5-minute intervals during election windows. Each state has a separate configuration object:

from dataclasses import dataclass
from typing import Literal

@dataclass
class StateScraperConfig:
    state_fips:        str
    url_template:      str                              # may include {race_id}, {county} tokens
    format:            Literal["json", "csv", "html"]
    fips_column:       str                             # column name containing county FIPS
    candidate_columns: list[str]                       # ordered list of candidate vote columns
    update_selector:   str                             # CSS selector for "last updated" timestamp

# Example: Georgia posts CSV files to a known URL pattern
GEORGIA_CONFIG = StateScraperConfig(
    state_fips       = "13",
    url_template     = "https://results.enr.clarityelections.com/GA/{race_id}/current_ver/reports/summary.csv",
    format           = "csv",
    fips_column      = "FIPS",
    candidate_columns= ["Candidate_1_Votes", "Candidate_2_Votes"],
    update_selector  = ".last-updated",
)

CSV normalization is non-trivial. Georgia uses 3-digit county FIPS codes internally: 001 for Appling County rather than the full 5-digit 13001. The normalizer handles left-padding and state prefix prepending. A second common issue is percentage vs. raw count ambiguity: some state export CSVs emit vote percentages in a column named Votes, with raw counts in a separate column that is not always present.

import pandas as pd
import re

def normalize_state_csv(
    raw_df: pd.DataFrame,
    config: StateScraperConfig,
) -> pd.DataFrame:
    """
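    Normalize a raw state CSV into a standard schema with columns:
    [fips_code, candidate_idx, vote_count]
    candidate_idx is the position of the column in config.candidate_columns;
    vote_count is None when the source column holds percentages.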
    """
    df = raw_df.copy()

    # 1. Normalize FIPS: left-pad to 3 digits, then prepend state prefix to get 5 digits
    df["fips_code"] = (
        df[config.fips_column]
        .astype(str)
        .str.zfill(3)                              # pad county portion to 3 digits
        .apply(lambda x: config.state_fips.zfill(2) + x)  # prepend 2-digit state FIPS
    )

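    # 2. Detect whether columns contain percentages or raw counts.
    #    Coerce to numeric first: raw CSV values may be strings such as "1,234".
    sample_vals = pd.to_numeric(
        df[config.candidate_columns[0]].astype(str).str.replace(",", "", regex=False),
        errors="coerce",
    ).dropna().head(50)
    looks_like_pct = (sample_vals <= 100.0).all() and (sample_vals.mean() < 60.0)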

    rows = []
    for _, row in df.iterrows():
        for i, col in enumerate(config.candidate_columns):
            raw_val = row[col]
            if pd.isna(raw_val):
                continue
            if looks_like_pct:
                # Percentage column: skip — can't recover absolute counts without totals
                # Instead, record as None and let AP data fill the gap
                vote_count = None
            else:
                # Strip thousands separators, then cast via float so that a value
                # pandas parsed as 1234.0 becomes 1234 rather than 12340
                vote_count = int(float(str(raw_val).replace(",", "")))

            rows.append({
                "fips_code":    row["fips_code"],
                "candidate_idx": i,
                "vote_count":   vote_count,
            })

    return pd.DataFrame(rows)

The HTML scraper path is a last-resort fallback for states that have no structured data download. Playwright navigates to the state results page, waits for the dynamic table to render, extracts table rows, and applies a regex-based candidate name matcher to map scraped names to AP candidate IDs. The matching uses fuzzy comparison because state results pages often list candidates as "SMITH, JOHN R." while AP records them as "John Smith". This path introduces the most latency and the most normalization uncertainty; results from HTML scraping are tagged with source=SCRAPE and given lower weight in the anomaly detector's confidence scoring.
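The name-matching step can be illustrated with the standard library's `difflib`. This is a sketch under stated assumptions, not the pipeline's matcher: `canonical_name`, `match_candidate`, and the 0.85 cutoff are hypothetical, and the candidate ID "12345" in the usage example is made up for illustration.

```python
from difflib import SequenceMatcher


def canonical_name(raw: str) -> str:
    """Turn 'SMITH, JOHN R.' into 'john smith' (single-letter initials are dropped)."""
    raw = raw.replace(".", "").strip()
    if "," in raw:
        last, first = [part.strip() for part in raw.split(",", 1)]
        raw = f"{first} {last}"
    tokens = [t for t in raw.lower().split() if len(t) > 1]
    return " ".join(tokens)


def match_candidate(scraped: str, ap_names: dict, min_ratio: float = 0.85):
    """Return the AP candidate ID whose name matches best, or None below min_ratio."""
    target = canonical_name(scraped)
    best_id, best_ratio = None, 0.0
    for candidate_id, ap_name in ap_names.items():
        ratio = SequenceMatcher(None, target, canonical_name(ap_name)).ratio()
        if ratio > best_ratio:
            best_id, best_ratio = candidate_id, ratio
    return best_id if best_ratio >= min_ratio else None


# Usage: "12345" is a made-up ID for illustration
ap_names = {"96870": "John Smith", "12345": "Jane Doe"}
matched = match_candidate("SMITH, JOHN R.", ap_names)
```

Returning None below the cutoff, rather than the best available guess, keeps an unmatched name from being silently attributed to the wrong candidate.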

Social signal aggregation

The existing NLP pipeline already processes 58M posts per day across 47 platform schemas. During election periods (election day ± 3 days), two additional Kafka consumers read from the social.classified topic — the topic that carries posts after sentiment classification and entity extraction by the DistilBERT-based NLP pipeline.

ElectionSentimentConsumer

Filters posts mentioning candidate names, campaign slogans, or election-related hashtags. For each matching post, it reads the DistilBERT sentiment score (already computed upstream) and the geotagged county FIPS code (where available). Posts without geotags are dropped from the county-level aggregation but retained for state-level rollups.

The consumer aggregates into 1-hour tumbling windows keyed by (state_fips, hour_bucket) and publishes to election.social_sentiment:

# Output schema for election.social_sentiment
{
  "state_fips":      "42",        # Pennsylvania
  "hour_bucket":     "2024-11-05T21:00:00Z",
  "candidate_id":    "96870",
  "post_count":      14823,
  "mean_sentiment":  0.61,        # [-1.0, 1.0], positive = favorable
  "p25_sentiment":   0.42,
  "p75_sentiment":   0.78,
  "geotagged_count": 4201,        # subset with county-level geotag
}
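To make the windowing concrete, here is a batch-style sketch of the same aggregation in plain Python. It is illustrative only: the input field names (`ts_ms`, `sentiment`, `county_fips`) are assumptions, and grouping by candidate_id in addition to (state_fips, hour_bucket) is inferred from the output schema rather than stated in the text.

```python
from collections import defaultdict
from datetime import datetime, timezone
from statistics import mean, quantiles


def hour_bucket(ts_ms: int) -> str:
    """Floor a Unix-millisecond timestamp to the start of its UTC hour."""
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return dt.replace(minute=0, second=0, microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ")


def aggregate(posts: list) -> list:
    """Group posts by (state_fips, hour_bucket, candidate_id) and summarize sentiment."""
    groups = defaultdict(list)
    for p in posts:
        groups[(p["state_fips"], hour_bucket(p["ts_ms"]), p["candidate_id"])].append(p)

    out = []
    for (state, bucket, cand), items in sorted(groups.items()):
        scores = [p["sentiment"] for p in items]
        # quantiles() needs at least two points; fall back to the single value
        q = quantiles(scores, n=4) if len(scores) >= 2 else [scores[0]] * 3
        out.append({
            "state_fips": state, "hour_bucket": bucket, "candidate_id": cand,
            "post_count": len(scores), "mean_sentiment": mean(scores),
            "p25_sentiment": q[0], "p75_sentiment": q[2],
            # posts without a county geotag still count toward the state rollup
            "geotagged_count": sum(1 for p in items if p.get("county_fips")),
        })
    return out
```

A streaming implementation would keep running state per window instead of materializing every post, but the grouping key and the output fields would be the same.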

ElectionClaimConsumer

Uses the entity extraction output from the NLP pipeline to identify posts making specific numeric claims about vote counts or margins — phrases like "X is winning by 12%" or "candidate Y has 340,000 votes." Extracted claims are compared against the latest AP actuals for the referenced race. A divergence exceeding 15 percentage points triggers a narrative anomaly signal.

# Output schema for election.narrative_claims
{
  "race_id":           "2024-G-P-42",
  "claimed_pct":       67.4,         # what the post claims
  "ap_actual_pct":     49.1,         # AP data at time of claim
  "divergence_pp":     18.3,         # absolute difference in percentage points
  "post_count":        3847,         # how many posts made this approximate claim
  "flagged":           true,         # divergence_pp > 15.0
  "window_start":      "2024-11-05T22:00:00Z",
  "window_end":        "2024-11-05T22:15:00Z",
}
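The divergence check itself is simple arithmetic. The sketch below uses a regular expression as a stand-in for the NLP pipeline's entity extraction, which the article does not show; `PCT_CLAIM`, `extract_claimed_pct`, and `score_claim` are hypothetical names, and only the 15-point threshold comes from the text.

```python
import re
from typing import Optional

DIVERGENCE_THRESHOLD_PP = 15.0  # from the text

# Hypothetical pattern: matches claims such as "has 67.4%" or "at 52 percent"
PCT_CLAIM = re.compile(r"(\d{1,3}(?:\.\d+)?)\s*(?:%|percent)", re.IGNORECASE)


def extract_claimed_pct(text: str) -> Optional[float]:
    """Return the first percentage mentioned in the text, if it is in [0, 100]."""
    m = PCT_CLAIM.search(text)
    if not m:
        return None
    value = float(m.group(1))
    return value if 0.0 <= value <= 100.0 else None


def score_claim(claimed_pct: float, ap_actual_pct: float) -> dict:
    """Compare a claimed vote share against the AP actual for the same race."""
    divergence = round(abs(claimed_pct - ap_actual_pct), 1)
    return {
        "claimed_pct": claimed_pct,
        "ap_actual_pct": ap_actual_pct,
        "divergence_pp": divergence,
        "flagged": divergence > DIVERGENCE_THRESHOLD_PP,
    }
```

With the values from the schema above, `score_claim(67.4, 49.1)` yields a divergence of 18.3 points and a flagged result.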

Media coverage stream

RSS feeds from 340 news outlets are pre-ingested at a 5-minute cadence under normal operation. During election windows (election day ± 3 days), the refresh cadence drops to 60 seconds. Each article passes through an ElectionArticleClassifier: a DistilBERT model fine-tuned on 48,000 labeled election articles categorized as correct reporting, misinformation, analysis, or opinion. Classification runs at ingest time before the article enters the media coverage stream.

The key signal is narrative divergence: when AP data shows Candidate A leading in State X, but a high fraction of recently published articles mentioning State X claim Candidate B is winning, the narrative_divergence_score for that race spikes.

The divergence score is computed using cosine similarity between two embeddings:

  • Narrative embedding: the mean of embeddings for articles published in the last 30 minutes that mention the race, weighted by each outlet's monthly unique visitor count (a proxy for reach).
  • Data embedding: an embedding of a canonical sentence constructed from the AP result — e.g., "Candidate A is leading Candidate B by 4.2 points with 63% of precincts reporting in Pennsylvania."

A cosine similarity below 0.6 between these two embeddings is treated as a meaningful narrative divergence. The threshold was calibrated against 2022 midterm data where ground truth was available. At 0.6, the method produces roughly one false positive per two races monitored — acceptable given that the signal feeds a human review queue rather than an automated action.
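The reach-weighted comparison can be sketched in a few lines of plain Python. The embedding model is not specified in the text, so the vectors here are assumed to be pre-computed lists of floats; the function names are hypothetical, and 0.6 is the threshold quoted above.

```python
import math


def weighted_mean(vectors: list, weights: list) -> list:
    """Reach-weighted mean of article embeddings (weights = monthly unique visitors)."""
    total = sum(weights)
    dims = len(vectors[0])
    return [sum(w * v[d] for v, w in zip(vectors, weights)) / total for d in range(dims)]


def cosine_similarity(a: list, b: list) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def narrative_diverges(article_embeddings: list, monthly_visitors: list,
                       data_embedding: list, threshold: float = 0.6) -> bool:
    """True when the weighted narrative embedding is dissimilar to the data embedding."""
    narrative = weighted_mean(article_embeddings, monthly_visitors)
    return cosine_similarity(narrative, data_embedding) < threshold
```

Weighting by reach means a single high-traffic outlet can pull the narrative embedding further than many small ones, which matches the intent of using visitor counts as a proxy.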

FIPS normalization challenges

FIPS codes are the canonical join key across all five data sources. Three jurisdictions require special handling.

Connecticut planning regions

Connecticut switched from county-based FIPS codes to planning region FIPS codes in 2022. The old scheme had 8 counties; the new scheme has 9 planning regions with different boundaries. Census data predating 2022 uses old codes; post-2022 Census data and some state feeds use new codes. The pipeline maintains a fips_crosswalk table that maps old county FIPS to new planning region FIPS for join purposes, though the mapping is not always one-to-one where planning region boundaries cross old county lines.

Alaska election districts

Alaska reports election results by state house district rather than by borough. The pipeline maps Alaska house districts to borough FIPS codes using Census TIGER geographic crosswalk files. This is approximate: a house district can span multiple boroughs, so the mapping assigns each district's results to the borough containing the plurality of the district's population.

New York City boroughs

AP data represents New York City under FIPS 36061 (New York County, i.e., Manhattan). Census data breaks NYC into five borough FIPS codes. For consistency, the pipeline aggregates all NYC borough data to 36061 when joining against AP results, rather than attempting to split AP data by borough.

# Connecticut old-to-new planning region crosswalk (partial; largest population overlap)
CT_FIPS_CROSSWALK = {
    "09001": "09190",  # Fairfield County → Western Connecticut (partial; largest pop overlap)
    "09003": "09110",  # Hartford County → Capitol
    "09005": "09160",  # Litchfield County → Northwest Hills
    "09007": "09130",  # Middlesex County → Lower Connecticut River Valley
    "09009": "09170",  # New Haven County → South Central Connecticut
    "09011": "09180",  # New London County → Southeastern Connecticut
    "09013": "09110",  # Tolland County → Capitol
    "09015": "09150",  # Windham County → Northeastern Connecticut
}

# Alaska house district → borough FIPS (plurality-population mapping)
AK_DISTRICT_TO_BOROUGH = {
    "1":  "02020",   # Anchorage Municipality
    "2":  "02020",
    "3":  "02020",
    # ... 40 districts total
    "36": "02090",   # Fairbanks North Star Borough
    "37": "02090",
    "38": "02185",   # North Slope Borough
    "39": "02185",
    "40": "02188",   # Northwest Arctic Borough
}

def normalize_fips(
    raw_fips: str,
    state_code: str,
    election_year: int,
) -> str:
    """
    Return the canonical 5-character FIPS code for join purposes.
    Handles CT planning region migration, AK district mapping, and NYC consolidation.
    """
    fips = raw_fips.strip().zfill(5)

    # Connecticut: if pre-2022 code and year >= 2022, remap to planning region
    if state_code == "CT" and election_year >= 2022 and fips in CT_FIPS_CROSSWALK:
        return CT_FIPS_CROSSWALK[fips]

    # Alaska: a house district number (1-40) zero-pads to "000NN", which never
    # starts with the state prefix "02", so treat it as a district rather than a borough
    if state_code == "AK" and not fips.startswith("02"):
        district = str(int(fips))   # strip leading zeros
        return AK_DISTRICT_TO_BOROUGH.get(district, fips)

    # NYC: remap all five borough FIPS to Manhattan (36061) for AP join consistency
    NYC_BOROUGH_FIPS = {"36005", "36047", "36061", "36081", "36085"}
    if fips in NYC_BOROUGH_FIPS:
        return "36061"

    return fips

How the streams feed the anomaly detector

The anomaly detection models — Benford's Law, XGBoost turnout, ARIMA reporting curves — are implemented as a Flink consumer with 5-minute tumbling windows. The consumer reads from three Kafka topics:

election.precinct_results

The primary input. Within each 5-minute window, the Flink job collects all PrecinctResult messages for a given (state_fips, race_id) pair. For each county that has received new results in the window, it re-runs the Benford chi-square test on the accumulated precinct vote totals and updates the XGBoost turnout residuals.

Race conditions from multi-source data — where AP and a state scraper report different totals for the same county in the same window — are resolved by accepting the message with the latest ingested_at timestamp for each (fips_code, race_id) pair. This is a last-writer-wins strategy. In practice, AP corrections are the most common cause of conflict, and their correction signals (decreasing precincts_reporting) are handled by a separate re-trigger path before the last-writer-wins merge.
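The last-writer-wins merge within a window can be sketched as follows. Plain dicts stand in for PrecinctResult messages here, and `last_writer_wins` is a hypothetical name; ties on ingested_at are resolved in favor of the message that arrives later in the batch, which is an assumption.

```python
def last_writer_wins(messages: list) -> dict:
    """Keep, per (fips_code, race_id), the message with the greatest ingested_at."""
    latest = {}
    for msg in messages:
        key = (msg["fips_code"], msg["race_id"])
        current = latest.get(key)
        # >= means a later arrival wins when timestamps are equal
        if current is None or msg["ingested_at"] >= current["ingested_at"]:
            latest[key] = msg
    return latest
```

Because the comparison uses ingested_at rather than arrival order, an out-of-order delivery of an older message does not overwrite a newer one.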

election.social_sentiment

The sentiment stream validates correlation with AP reporting. When the anomaly detector flags a county for unusual turnout, it checks whether social sentiment in that state favors the over-performing candidate by an unusual margin. A turnout anomaly that co-occurs with a large sentiment-vs-AP divergence scores higher in the triage queue than a standalone turnout anomaly.

election.narrative_claims

Narrative claims flagged by the ElectionClaimConsumer — posts claiming vote percentages that diverge from AP actuals by more than 15 percentage points — are attached as annotations to any concurrent anomaly for the same race. A race where social media is circulating factually incorrect results while the AP data simultaneously shows a statistical anomaly is a stronger signal than either indicator alone.

Latency and reliability targets

End-to-end latency from source event to anomaly detector input varies significantly by stream:

| Path | p50 latency | p99 latency |
| --- | --- | --- |
| AP ballot result → Kafka → anomaly detector | 45s | 120s |
| State scraper → Kafka → anomaly detector | 6 min | 15 min |
| Social post → NLP pipeline → anomaly detector | 8 min | 14 min |
| News article published → anomaly detector annotation | 2 min | 5 min |

The state scraper p99 is dominated by scraper cadence (5 minutes) plus occasional Playwright navigation timeouts on overloaded state websites on election night. State election sites receive extremely high traffic on election night; scrapers implement exponential backoff with a 30-second initial retry delay.
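A backoff schedule of this kind can be generated as below. Only the 30-second initial delay comes from the text; the doubling factor, the 300-second cap, the attempt count, and the optional jitter are assumptions chosen for illustration, and `backoff_delays` is a hypothetical name.

```python
import random


def backoff_delays(initial_s: float = 30.0, factor: float = 2.0,
                   max_s: float = 300.0, attempts: int = 5, jitter: float = 0.0):
    """Yield retry delays in seconds: initial_s, initial_s * factor, ... capped at max_s.

    jitter adds up to that fraction of extra random delay (0.0 disables it).
    """
    delay = initial_s
    for _ in range(attempts):
        yield min(delay, max_s) * (1.0 + random.uniform(0.0, jitter))
        delay *= factor


schedule = list(backoff_delays())  # [30.0, 60.0, 120.0, 240.0, 300.0]
```

A non-zero jitter spreads retries out so that many scrapers hitting the same overloaded site do not all retry at the same instant.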

Reliability targets: the pipeline runs on three independent Kafka brokers with replication factor 3 and min.insync.replicas=2. A single broker loss is transparent to producers and consumers. State scraper failures are logged and trigger PagerDuty alerts, but they are non-critical: AP data continues to flow and provides full federal/state race coverage. The pipeline is designed to tolerate a complete outage of the scraper layer without degrading AP-sourced anomaly detection.

Full pipeline outage tolerance — meaning a scenario where all Kafka brokers become unavailable — is 15 minutes of message buffering in producer-side queues. Beyond 15 minutes, producers begin dropping messages and the anomaly detector window falls behind. This scenario has not occurred in any election monitored to date; the design priority is correct data over guaranteed delivery when the infrastructure is fully degraded.
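The durability and buffering behavior described in this section roughly corresponds to a producer configuration like the one below. This is a guess at a plausible setup, not the pipeline's actual settings: the broker hostnames are placeholders and the queue size is an arbitrary illustrative value. The property names are standard librdkafka settings accepted by confluent_kafka's `Producer`.

```python
FIFTEEN_MINUTES_MS = 15 * 60 * 1000

PRODUCER_CONFIG = {
    # Placeholder hostnames for the three brokers
    "bootstrap.servers": "broker-1:9092,broker-2:9092,broker-3:9092",
    # Wait for all in-sync replicas; with min.insync.replicas=2 on the topic,
    # a write succeeds as long as at least two replicas acknowledge it
    "acks": "all",
    # Avoid duplicates when the client retries after a transient failure
    "enable.idempotence": True,
    # A message that cannot be delivered within this time is reported as failed,
    # which bounds how long results are buffered during a full broker outage
    "message.timeout.ms": FIFTEEN_MINUTES_MS,
    # Illustrative cap on messages held in the local producer queue
    "queue.buffering.max.messages": 500000,
}
```

The dict would be passed directly to `Producer(PRODUCER_CONFIG)`. Note that min.insync.replicas is a broker or topic setting and is not configured on the producer.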


For the statistical anomaly detection models that consume from this pipeline — Benford's Law, XGBoost turnout, ARIMA reporting curves: Detecting election anomalies using statistical methods →

For the 58M posts/day social media pipeline whose classified output feeds the ElectionSentimentConsumer: How we process 2.4M social-media posts per hour →

For the social media ingestion system — the three-tier collection strategy, token-bucket rate limiting, and Kafka topic partitioning by platform: Social media ingestion at scale: collecting 58M posts per day from 47 platform schemas →

For the NLP pipeline that classifies election posts and extracts named entities for the narrative divergence score: NLP pipeline for real-time sentiment analysis at scale →

For the statistical anomaly tests that process the precinct result stream from this pipeline — Benford's Law, last-digit uniformity, turnout z-scores, and cross-validation: Statistical anomaly detection for election integrity: Benford's Law, digit uniformity, and turnout modeling →