NDIS Database

Collecting and Parsing FBI National DNA Index System Statistics from Wayback Machine

Author

Tina Lasisi | João P. Donadio

Published

November 9, 2025

Introduction

The National DNA Index System (NDIS) is the central database that allows accredited forensic laboratories across the United States to electronically exchange and compare DNA profiles. Maintained by the FBI as part of CODIS (Combined DNA Index System), NDIS tracks the accumulation of DNA records contributed by federal, state, and local laboratories.

This project focuses on systematically compiling the growth and evolution of NDIS by parsing historical statistics published on the FBI’s website and preserved in the Internet Archive’s Wayback Machine. These snapshots contain tables reporting the number of DNA profiles stored in NDIS (offender, arrestee, forensic), as well as information on laboratory participation across jurisdictions.

Objectives

  1. Develop a reproducible pipeline to extract NDIS statistics from archived FBI webpages in the Wayback Machine.

  2. Identify and correct inconsistencies and data quality issues across historical snapshots.

  3. Document the expansion of DNA profiles (offender, arrestee, forensic) over time.

Methodology

Project step-by-step

Setup and Configuration

System Requirements

Required Packages:

  • Core: requests, beautifulsoup4, and lxml (scraping/parsing).

  • Data handling: pandas (tabular data) and tqdm (progress tracking).

Show Configuration code
import sys
import subprocess
import importlib

# Map pip package names to their import names (beautifulsoup4 installs as the bs4 module)
required_packages = {
    'requests': 'requests',           # API/HTTP
    'beautifulsoup4': 'bs4',          # HTML parsing
    'lxml': 'lxml',                   # Faster parsing
    'pandas': 'pandas',               # Data handling
    'tqdm': 'tqdm',                   # Progress bars
}
# hashlib, collections, pathlib, datetime, and os are part of the
# standard library and do not need to be installed.

for package, module_name in required_packages.items():
    try:
        importlib.import_module(module_name)
        print(f"✓ {package} already installed")
    except ImportError:
        print(f"Installing {package}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

print("📚 All required packages are installed.")

Project Structure

Main Configurations:

  • Directory paths for raw HTML (data_snapshots), metadata (data_metadata), and outputs (data_outputs).

  • Standardization mappings for jurisdiction names and known data typos (an illustrative excerpt appears after the configuration output below).

Show Configuration code
from pathlib import Path
import re, json, requests, time, hashlib
from datetime import datetime
import pandas as pd
from bs4 import BeautifulSoup
from tqdm.auto import tqdm
from collections import defaultdict
import os

# Configuration
BASE_DIR = Path("..")  # Project root directory
HTML_DIR = BASE_DIR / "data" / "ndis" / "raw" / "ndis_snapshots"    # Storage for downloaded HTML
META_DIR = BASE_DIR / "data" / "ndis" / "raw" / "ndis_metadata"    # Metadata storage
OUTPUT_DIR = BASE_DIR / "data" / "ndis" / "raw"       # Processed data output

NDIS_SNAPSHOTS_DIR = HTML_DIR
NDIS_SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True)

# Create directory structure
for directory in [HTML_DIR, META_DIR, OUTPUT_DIR]:
    directory.mkdir(parents=True, exist_ok=True)
Project directories initialized:
  - Working directory: /Users/tlasisi/GitHub/PODFRIDGE-Databases
  - HTML storage: ../data/ndis/raw/ndis_snapshots
  - Metadata directory: ../data/ndis/raw/ndis_metadata
  - Output directory: ../data/ndis/raw
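
The jurisdiction-name standardization referenced in the configuration notes above is applied later by standardize_jurisdiction_name; the excerpt below illustrates the kind of variant-to-standard mapping involved (taken from that function), not a complete list.

Show jurisdiction mapping example code
# Illustrative excerpt of the jurisdiction-name standardization mapping
# (the full logic lives in standardize_jurisdiction_name, defined in the extraction section)
JURISDICTION_REPLACEMENTS = {
    'D.C./FBI Lab': 'DC/FBI Lab',
    'D.C./Metro PD': 'DC/Metro PD',
    'US Army': 'U.S. Army',
    'D.C.': 'DC',
}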

Wayback Machine Snapshot Search

A function was developed to systematically search the Internet Archive’s Wayback Machine for all preserved snapshots of FBI NDIS statistics pages using a comprehensive multi-phase approach.

Scraping Method

  1. Multi-Phase Search Strategy:
  • First searches for snapshots from the pre-2007 era using state-specific URLs

  • Then targets consolidated pages from post-2007 periods

  • Handles both HTTP and HTTPS protocol variants

  2. Robust Error Handling:
  • Automatic retries with exponential backoff (1s → 2s → 4s delays)

  • Deduplicates results by timestamp

  • Failed requests are logged and retried with a larger attempt budget

  • Preserves complete error context for troubleshooting

Technical Implementation

  1. Requests the list of preserved captures from the Wayback Machine API (see the sketch below)

  2. Converts JSON responses to a clean DataFrame

  3. Sorts chronologically (oldest → newest)
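
The snapshot-search function itself is not reproduced in this document. The sketch below shows one way the capture query could look, assuming the Wayback Machine CDX API endpoint and illustrative target URLs; the project's actual search function may differ in its URL list, protocol handling, and retry details. It relies on the imports from the configuration block above and produces the snap_df DataFrame consumed by the downloader further down.

Show snapshot search sketch code
def search_wayback_snapshots(target_urls, max_retries=3):
    """
    Query the Wayback Machine CDX API for all captures of the given URLs.
    Returns a DataFrame with columns: timestamp, original, statuscode, digest.
    (Illustrative sketch; the project's actual search function may differ.)
    """
    cdx_endpoint = "https://web.archive.org/cdx/search/cdx"
    rows = []

    for url in target_urls:
        params = {
            "url": url,
            "output": "json",                              # JSON response: header row + data rows
            "fl": "timestamp,original,statuscode,digest",  # fields to return
            "filter": "statuscode:200",                    # keep successful captures only
        }
        delay = 1
        for attempt in range(max_retries):
            try:
                resp = requests.get(cdx_endpoint, params=params, timeout=60)
                resp.raise_for_status()
                data = resp.json()
                rows.extend(data[1:] if data else [])      # skip the header row
                break
            except (requests.exceptions.RequestException, ValueError):
                if attempt == max_retries - 1:
                    print(f"✗ CDX query failed for {url}")
                else:
                    time.sleep(delay)
                    delay *= 2                             # exponential backoff (1s → 2s → 4s)
        time.sleep(1)                                      # be gentle with the archive

    snap_df = pd.DataFrame(rows, columns=["timestamp", "original", "statuscode", "digest"])
    # Deduplicate by capture timestamp and sort oldest → newest
    return (snap_df.drop_duplicates(subset="timestamp")
                   .sort_values("timestamp")
                   .reset_index(drop=True))


# Illustrative target URLs: pre-2007 state pages (prefix match) and the post-2007 consolidated page
target_urls = [
    "http://www.fbi.gov/hq/lab/codis/*",
    "https://www.fbi.gov/services/laboratory/biometric-analysis/codis/ndis-statistics",
]
snap_df = search_wayback_snapshots(target_urls)
print(f"Found {len(snap_df)} unique captures")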

Snapshot Downloader

This system provides a robust method for downloading historical webpage snapshots from the Internet Archive’s Wayback Machine, specifically designed for the FBI NDIS statistics pages.

Download Methods

The download system implements a sequential approach optimized for reliability and respectful API usage:

  • Resilient Downloading: Automatic retries with exponential backoff (2s → 4s → 8s delays) and extended 120-second timeouts for reliable network handling

  • Smart File Management: Context-aware naming scheme using timestamp + state/scope identifier (e.g., 20040312_ne.html for pre-2007, 20150621_ndis.html for post-2007)

  • Duplicate Prevention: Automatically skips already downloaded files to prevent redundant operations

  • Progress Tracking: Real-time download status with completion counters and detailed success/failure reporting

  • Rate Limiting: 1.5-second delays between requests to avoid overloading the Wayback Machine servers

Show downloader helpers code
def download_with_retry(url, save_path, max_retries=4, initial_delay=2):
    """
    Download an archived snapshot with retries and exponential backoff.
    """
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            resp = requests.get(url, timeout=120)
            resp.raise_for_status()
            
            # Validate content is reasonable
            if len(resp.content) < 500:
                if b"error" in resp.content.lower() or b"not found" in resp.content.lower():
                    raise requests.exceptions.RequestException(f"Possible error page: {len(resp.content)} bytes")
            
            # Save to disk
            with open(save_path, "wb") as f:
                f.write(resp.content)
            print(f"✓ Downloaded: {save_path}")
            return True
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                print(f"✗ Final attempt failed for {url}: {str(e)}")
                return False
            print(f"! Attempt {attempt+1} failed, retrying in {delay}s...")
            time.sleep(delay)
            delay *= 2
    return False


def snapshot_to_filepath(row):
    """
    Map a snapshot record to a local filename.
    Format: {timestamp}_{state_or_scope}.html
    """
    ts = row["timestamp"]
    original = row["original"]
    
    # derive name from FBI URL pattern
    if "/codis/" in original and original.endswith(".htm"):
        # Pre-2007: use last part (state code or army/dc)
        suffix = Path(original).stem
    else:
        # Post-2007: consolidated page, use 'ndis'
        suffix = "ndis"
    
    save_dir = HTML_DIR
    save_dir.mkdir(parents=True, exist_ok=True)  # Ensure directory exists
    
    return save_dir / f"{ts}_{suffix}.html"

Download Execution

The download execution phase performs bulk retrieval of historical NDIS snapshots with comprehensive error handling:

  • Sequential Processing: Iterates through snapshot DataFrame chronologically, processing each file individually for maximum reliability

  • URL Construction: Uses identity flag (id_) in archive URLs to retrieve unmodified original content: https://web.archive.org/web/{timestamp}id_/{original}

  • Binary Preservation: Saves files as binary content to maintain original encoding and prevent character corruption

  • Comprehensive Logging: Provides real-time progress updates with attempt counters and final success/failure statistics

  • Flexible Limiting: Optional download limits for testing or partial processing

Show download execution code
def download_snapshots(snap_df, limit=None):
    """
    Iterate over snapshot DataFrame and download archived HTML pages.
    """
    total = len(snap_df) if limit is None else min(limit, len(snap_df))
    print(f"Starting download of {total} snapshots at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    successes, failures = 0, 0
    failed_downloads = []
    
    for i, row in enumerate(snap_df.head(total).itertuples(index=False), 1):
        ts, original = row.timestamp, row.original  # named access; avoids relying on column order
        save_path = snapshot_to_filepath(row._asdict())
        
        if save_path.exists() and save_path.stat().st_size > 1000:
            print(f"- [{i}/{total}] Already exists: {save_path}")
            successes += 1
            continue
        
        archive_url = f"https://web.archive.org/web/{ts}id_/{original}"
        print(f"- [{i}/{total}] Downloading {archive_url}")
        
        if download_with_retry(archive_url, save_path):
            successes += 1
        else:
            failures += 1
            failed_downloads.append({
                'timestamp': ts,
                'url': original,
                'archive_url': archive_url
            })
        
        if i % 50 == 0:
            print(f"Progress: {i}/{total} ({i/total*100:.1f}%) - {successes} success, {failures} failures")
        
        time.sleep(1.5)
    
    # Save failure report
    if failed_downloads:
        failure_report = {
            'download_completed': datetime.now().isoformat(),
            'total_attempted': total,
            'successful': successes,
            'failed': failures,
            'failed_downloads': failed_downloads
        }
        with open(META_DIR / "download_failures.json", 'w') as f:
            json.dump(failure_report, f, indent=2)
    
    print(f"\nDownload completed. Success: {successes}, Failures: {failures}")
    return successes, failures


successes, failures = download_snapshots(snap_df, limit=None)

Download Validation

Post-download validation ensures data integrity and identifies potential issues:

  • File Existence Verification: Checks that all expected files were successfully downloaded to the target directory

  • Content Quality Assessment: Validates HTML content by examining file headers for proper HTML tags

  • Error Categorization: Separates missing files from corrupted/non-HTML files for targeted remediation

  • Metadata Generation: Creates JSON validation reports with detailed statistics and file counts

  • Actionable Reporting: Provides clear feedback on download success rates and files requiring attention

Show download validation code
def validate_downloads(snap_df):
    """
    Validate downloaded files exist and are HTML-like.
    """
    missing, bad_html = [], []
    
    for row in snap_df.itertuples(index=False):
        save_path = snapshot_to_filepath(row._asdict())
        if not save_path.exists():
            missing.append(save_path)
            continue
        try:
            file_size = save_path.stat().st_size
            if file_size < 1000:
                bad_html.append(save_path)
                continue
                
            with open(save_path, "rb") as f:
                start = f.read(500).lower()
            if b"<html" not in start and b"<!doctype" not in start:
                bad_html.append(save_path)
        except Exception:
            bad_html.append(save_path)
    
    print(f"Validation results → Missing: {len(missing)}, Bad HTML: {len(bad_html)}")
    return missing, bad_html

# Run validation
missing, bad_html = validate_downloads(snap_df)

# Save validation summary
validation_meta = {
    "validation_performed": datetime.now().isoformat(),
    "missing_files": len(missing),
    "bad_html_files": len(bad_html),
    "total_snapshots": len(snap_df),
    "successful_downloads": successes,
    "success_rate": f"{(successes/len(snap_df))*100:.1f}%"
}
with open(META_DIR / "validation_metadata.json", "w") as f:
    json.dump(validation_meta, f, indent=2)
Show validation report code
# Print report (validation_metadata.json was already saved above)
report_path = META_DIR / "validation_metadata.json"

print(f"\n{'='*60}")
print("DOWNLOAD VALIDATION REPORT")
print(f"{'='*60}")
print(f"  Missing files: {validation_meta['missing_files']}/{validation_meta['total_snapshots']}")
print(f"  Bad HTML files: {validation_meta['bad_html_files']}/{validation_meta['total_snapshots']}")
print(f"  Successful downloads: {validation_meta['successful_downloads']}/{validation_meta['total_snapshots']} ({validation_meta['success_rate']})")

print(f"\nFull report: {report_path}")

Data Extraction

The data extraction pipeline converts downloaded HTML snapshots into structured tabular data, handling the evolution of FBI NDIS reporting formats across different time periods.

Extraction Overview

The extraction system processes four distinct eras of NDIS reporting:

  1. Pre-2007 Era: State-specific pages with basic statistics, no report dates or arrestee data

  2. 2008-2011 Era: Consolidated pages with “as of” dates but no arrestee profiles

  3. 2012-2016 Era: Arrestee profiles reported for the first time

  4. Post-2017 Era: Modern format with all profile types and consistent structure

Key Features:

  • Era-Aware Processing: Automatically routes files to appropriate parsers based on timestamp

  • Metadata Recovery: Extracts report dates from “as of” statements when available

  • Complete Traceability: Links each record to its source HTML file and original URL

  • Robust Error Handling: Processes files individually to prevent single failures from stopping the entire batch

Core Parser Functions

Essential text processing utilities for NDIS data extraction:

  • HTML Cleaning: Removes navigation, scripts, and styling elements to focus on data content

  • Date Extraction: Identifies and parses “as of” dates using multiple pattern variations

  • Text Normalization: Standardizes whitespace and jurisdiction name formatting

  • Encoding Handling: Manages various character encodings found in historical snapshots

Show setup and normalization functions code
def extract_ndis_metadata(html_content):
    """
    Extract key metadata from NDIS HTML content including report dates
    
    Returns:
    --------
    dict:
        - report_month: Month from "as of" statement (None if not found)
        - report_year: Year from "as of" statement (None if not found)  
        - clean_text: Normalized text content
    """
    
    # Multiple patterns to catch different "as of" formats
    date_patterns = [
        r'[Aa]s of ([A-Za-z]+) (\d{4})',           # "as of November 2008"
        r'[Aa]s of ([A-Za-z]+) (\d{1,2}), (\d{4})', # "as of November 15, 2008"
        r'Statistics as of ([A-Za-z]+) (\d{4})',    # "Statistics as of November 2008"
        r'Statistics as of ([A-Za-z]+) (\d{1,2}), (\d{4})' # "Statistics as of November 15, 2008"
    ]
    
    report_month = None
    report_year = None
    
    # Find first occurrence of any date pattern
    for pattern in date_patterns:
        date_match = re.search(pattern, html_content)
        if date_match:
            month_str = date_match.group(1)
            if len(date_match.groups()) == 2:  # Month + Year only
                year_str = date_match.group(2)
            else:  # Month + Day + Year
                year_str = date_match.group(3)
            
            # Convert month name to number
            try:
                month_num = pd.to_datetime(f"{month_str} 1, 2000").month
                report_month = month_num
                report_year = int(year_str)
                break
            except (ValueError, TypeError):
                continue
    
    # Clean HTML and normalize text
    soup = BeautifulSoup(html_content, 'lxml')
    
    # Remove scripts, styles, and navigation elements
    for element in soup(['script', 'style', 'nav', 'header', 'footer']):
        element.decompose()
    
    # Get clean text with normalized whitespace
    clean_text = re.sub(r'\s+', ' ', soup.get_text(' ', strip=True))
    
    return {
        'report_month': report_month,
        'report_year': report_year, 
        'clean_text': clean_text
    }

def standardize_jurisdiction_name(name):
    """
    Clean and standardize jurisdiction names for consistency
    """
    if not name:
        return name
        
    # Remove common prefixes and suffixes
    name = re.sub(r'^.*?(Back to top|Tables by NDIS Participant|ation\.)\s*', 
                 '', name, flags=re.I).strip()
    
    # Standardize known variants
    replacements = {
        'D.C./FBI Lab': 'DC/FBI Lab',
        'D.C./Metro PD': 'DC/Metro PD', 
        'US Army': 'U.S. Army',
        'D.C.': 'DC'
    }
    
    for old, new in replacements.items():
        name = name.replace(old, new)
    
    return name.strip()

def extract_original_url_from_filename(html_file):
    """
    Reconstruct original URL from filename and timestamp
    """
    filename = html_file.name
    timestamp = filename.split('_')[0]  # Get timestamp part
    
    # Determine URL pattern based on filename suffix
    if filename.endswith('_ndis.html'):
        # Post-2007 consolidated format - use most common URL pattern
        return "https://www.fbi.gov/services/laboratory/biometric-analysis/codis/ndis-statistics"
    else:
        # Pre-2007 state-specific format
        state_code = filename.split('_')[1].replace('.html', '')
        return f"http://www.fbi.gov/hq/lab/codis/{state_code}.htm"

Era-Specific Parsers

Time-period-adapted parsing logic that accounts for format evolution:

Pre-2007 Parser:

  • Extracts basic statistics from state-specific pages

  • Uses timestamp-derived year (no report dates available)

  • Sets arrestee counts to 0 (not reported in this era)

  • Handles missing NDIS labs and investigations data

2008-2011 Parser:

  • Processes consolidated pages with “Back to top” section dividers

  • Extracts month and year from report dates

  • Handles missing arrestee data (sets to 0)

  • Multiple pattern matching for jurisdiction identification

2012-2016 Parser:

  • First era with arrestee data extraction

  • Processes consolidated pages with “Back to top” section dividers

  • Multiple pattern matching for jurisdiction identification

  • Complete jurisdiction coverage with standardized names

Post-2017 Parser:

  • Modern format with consistent structure and all fields

  • Robust regex pattern for reliable extraction

  • Full feature extraction including arrestee profiles

  • Complete jurisdiction coverage with standardized names

Show parser functions code
def parse_pre2007_ndis(text, timestamp, html_file, report_month=None, report_year=None):
    """
    Parse NDIS snapshots from 2001-2007 era (state-specific pages)
    HTML has table structure with state name followed by data rows
    """
    records = []
    
    # Clean up the text for better matching
    text = re.sub(r'\s+', ' ', text)
    
    # Try multiple patterns to extract state name
    jurisdiction = None
    
    # Pattern 1: State name in large font (from graphic alt text or heading)
    # Looking for patterns like "Graphic of Pennsylvania Pennsylvania" or just the state name
    state_pattern1 = r'(?:Graphic of|alt=")([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*?)(?:"|>|\s+Offender)'
    state_match = re.search(state_pattern1, text, re.IGNORECASE)
    
    if state_match:
        jurisdiction = state_match.group(1).strip()
    
    # Pattern 2: Try to extract from filename as fallback
    if not jurisdiction:
        # Extract state code from filename (e.g., "20010715040342_pa.html")
        filename = html_file.name
        state_code_match = re.search(r'_([a-z]{2,5})\.html$', filename)
        if state_code_match:
            state_code = state_code_match.group(1)
            # Map common state codes to names
            state_map = {
                'pa': 'Pennsylvania', 'nc': 'North Carolina', 'ct': 'Connecticut',
                'wv': 'West Virginia', 'ks': 'Kansas', 'nd': 'North Dakota',
                'wy': 'Wyoming', 'ky': 'Kentucky', 'la': 'Louisiana',
                'dc': 'DC/FBI Lab', 'de': 'Delaware', 'ne': 'Nebraska',
                'sc': 'South Carolina', 'tn': 'Tennessee', 'ma': 'Massachusetts',
                'fl': 'Florida', 'nh': 'New Hampshire', 'sd': 'South Dakota',
                'me': 'Maine', 'hi': 'Hawaii', 'nm': 'New Mexico',
                'al': 'Alabama', 'tx': 'Texas', 'mi': 'Michigan',
                'ut': 'Utah', 'ar': 'Arkansas', 'az': 'Arizona',
                'mo': 'Missouri', 'ny': 'New York', 'mn': 'Minnesota',
                'vt': 'Vermont', 'id': 'Idaho', 'oh': 'Ohio',
                'ok': 'Oklahoma', 'or': 'Oregon', 'ca': 'California',
                'il': 'Illinois', 'wi': 'Wisconsin', 'ms': 'Mississippi',
                'wa': 'Washington', 'mt': 'Montana', 'in': 'Indiana',
                'co': 'Colorado', 'va': 'Virginia', 'ga': 'Georgia',
                'ak': 'Alaska', 'md': 'Maryland', 'nj': 'New Jersey',
                'nv': 'Nevada', 'ri': 'Rhode Island', 'ia': 'Iowa',
                'army': 'U.S. Army'
            }
            jurisdiction = state_map.get(state_code, state_code.upper())
    
    if jurisdiction:
        jurisdiction = standardize_jurisdiction_name(jurisdiction)
        
        # Extract individual values with more flexible patterns
        # These patterns match the table layout used on the pre-2007 state pages
        offender_match = re.search(r'Offender\s+Profiles?\s+([\d,]+)', text, re.IGNORECASE)
        forensic_match = re.search(r'Forensic\s+(?:Samples?|Profiles?)\s+([\d,]+)', text, re.IGNORECASE)
        
        # NDIS labs can appear as "NDIS Participating Labs" or just "Number of CODIS Labs"
        ndis_labs_match = re.search(r'(?:NDIS\s+Participating\s+Labs?|Number\s+of\s+CODIS\s+Labs?)\s+(\d+)', text, re.IGNORECASE)
        
        investigations_match = re.search(r'Investigations?\s+Aided\s+([\d,]+)', text, re.IGNORECASE)
        
        if offender_match and forensic_match:
            records.append({
                'timestamp': timestamp,
                'report_month': None,  # Not available pre-2007
                'report_year': None,   # Not available pre-2007
                'jurisdiction': jurisdiction,
                'offender_profiles': int(offender_match.group(1).replace(',', '')),
                'arrestee': 0,  # Not reported pre-2007
                'forensic_profiles': int(forensic_match.group(1).replace(',', '')),
                'ndis_labs': int(ndis_labs_match.group(1)) if ndis_labs_match else 0,
                'investigations_aided': int(investigations_match.group(1).replace(',', '')) if investigations_match else 0
            })
    
    return records

def parse_2008_2011_ndis(text, timestamp, html_file, report_month=None, report_year=None):
    """
    Parse NDIS snapshots from 2008-2011 era
    Consolidated page with state anchors and "Back to top" links
    No arrestee data in this period
    """
    records = []
    
    # Clean up the text
    text = re.sub(r'\s+', ' ', text)
    
    # Split by "Back to top" to isolate each state section
    sections = re.split(r'Back\s+to\s+top', text, flags=re.IGNORECASE)
    
    for section in sections:
        # Look for state name pattern (appears as anchor or bold text)
        # Try multiple patterns to catch different HTML formats
        jurisdiction = None
        
        # Pattern 1: <a name="State"></a><strong>State</strong>
        state_match = re.search(r'<a\s+name="([^"]+)"[^>]*>.*?(?:<strong>|<b>)\s*([A-Z][^<]+?)(?:</strong>|</b>)', section, re.IGNORECASE)
        if state_match:
            jurisdiction = state_match.group(2).strip()
        
        # Pattern 2: Just the state name in bold/strong tags before "Statistical Information"
        if not jurisdiction:
            state_match = re.search(r'(?:<strong>|<b>)\s*([A-Z][^<]+?)(?:</strong>|</b>).*?Statistical\s+Information', section, re.IGNORECASE)
            if state_match:
                jurisdiction = state_match.group(1).strip()
        
        # Pattern 3: State name without tags before "Statistical Information"
        if not jurisdiction:
            state_match = re.search(r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\s+Statistical\s+Information', section)
            if state_match:
                jurisdiction = state_match.group(1).strip()
        
        if jurisdiction:
            jurisdiction = standardize_jurisdiction_name(jurisdiction)
            
            # Extract values
            offender_match = re.search(r'Offender\s+Profiles?\s+([\d,]+)', section, re.IGNORECASE)
            forensic_match = re.search(r'Forensic\s+(?:Samples?|Profiles?)\s+([\d,]+)', section, re.IGNORECASE)
            ndis_labs_match = re.search(r'NDIS\s+Participating\s+Labs?\s+(\d+)', section, re.IGNORECASE)
            investigations_match = re.search(r'Investigations?\s+Aided\s+([\d,]+)', section, re.IGNORECASE)
            
            if offender_match and forensic_match:
                records.append({
                    'timestamp': timestamp,
                    'report_month': report_month,
                    'report_year': report_year,
                    'jurisdiction': jurisdiction,
                    'offender_profiles': int(offender_match.group(1).replace(',', '')),
                    'arrestee': 0,  # Not reported 2008-2011
                    'forensic_profiles': int(forensic_match.group(1).replace(',', '')),
                    'ndis_labs': int(ndis_labs_match.group(1)) if ndis_labs_match else 0,
                    'investigations_aided': int(investigations_match.group(1).replace(',', '')) if investigations_match else 0
                })
    
    return records


def parse_2012_2016_ndis(text, timestamp, html_file, report_month=None, report_year=None):
    """
    Parse NDIS snapshots from 2012-2016 era
    Includes arrestee data for the first time
    """
    records = []
    
    # Clean up the text
    text = re.sub(r'\s+', ' ', text)
    
    # Split by "Back to top" to isolate each state section
    sections = re.split(r'Back\s+to\s+top', text, flags=re.IGNORECASE)
    
    for section in sections:
        jurisdiction = None
        
        # Look for state name patterns
        # Pattern 1: <a name="State"></a><b>State</b>
        state_match = re.search(r'<a\s+name="([^"]+)"[^>]*>.*?<b>([^<]+?)</b>', section, re.IGNORECASE)
        if state_match:
            jurisdiction = state_match.group(2).strip()
        
        # Pattern 2: Just bold state name
        if not jurisdiction:
            state_match = re.search(r'<b>([A-Z][^<]+?)</b>.*?Statistical\s+Information', section, re.IGNORECASE)
            if state_match:
                jurisdiction = state_match.group(1).strip()
        
        # Pattern 3: State name without tags
        if not jurisdiction:
            state_match = re.search(r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\s+Statistical\s+Information', section)
            if state_match:
                jurisdiction = state_match.group(1).strip()
        
        if jurisdiction:
            jurisdiction = standardize_jurisdiction_name(jurisdiction)
            
            # Extract values INCLUDING arrestee which appears starting 2012
            offender_match = re.search(r'Offender\s+Profiles?\s+([\d,]+)', section, re.IGNORECASE)
            arrestee_match = re.search(r'Arrestee\s+([\d,]+)', section, re.IGNORECASE)
            forensic_match = re.search(r'Forensic\s+Profiles?\s+([\d,]+)', section, re.IGNORECASE)
            ndis_labs_match = re.search(r'NDIS\s+Participating\s+Labs?\s+(\d+)', section, re.IGNORECASE)
            investigations_match = re.search(r'Investigations?\s+Aided\s+([\d,]+)', section, re.IGNORECASE)
            
            if offender_match and forensic_match:
                records.append({
                    'timestamp': timestamp,
                    'report_month': report_month,
                    'report_year': report_year,
                    'jurisdiction': jurisdiction,
                    'offender_profiles': int(offender_match.group(1).replace(',', '')),
                    'arrestee': int(arrestee_match.group(1).replace(',', '')) if arrestee_match else 0,
                    'forensic_profiles': int(forensic_match.group(1).replace(',', '')),
                    'ndis_labs': int(ndis_labs_match.group(1)) if ndis_labs_match else 0,
                    'investigations_aided': int(investigations_match.group(1).replace(',', '')) if investigations_match else 0
                })
    
    return records


def parse_post2017_ndis(text, timestamp, html_file, report_month=None, report_year=None):
    """
    Parse NDIS snapshots from 2017+ era
    Modern format with consistent structure and all fields
    Uses a single comprehensive regex over the consolidated page text
    """
    records = []
    
    # Single pattern capturing all six fields for each jurisdiction section
    pattern = re.compile(
        r'([A-Z][\w\s\.\-\'\/&\(\)]+?)Statistical Information'
        r'.*?Offender Profiles\s+([\d,]+)'
        r'.*?Arrestee\s+([\d,]+)'
        r'.*?Forensic Profiles\s+([\d,]+)'
        r'.*?NDIS Participating Labs\s+(\d+)'
        r'.*?Investigations Aided\s+([\d,]+)',
        re.IGNORECASE | re.DOTALL
    )
    
    for match in pattern.finditer(text):
        records.append({
            'timestamp': timestamp,
            'report_month': report_month,
            'report_year': report_year,
            'jurisdiction': standardize_jurisdiction_name(match.group(1)),
            'offender_profiles': int(match.group(2).replace(',', '')),
            'arrestee': int(match.group(3).replace(',', '')),
            'forensic_profiles': int(match.group(4).replace(',', '')),
            'ndis_labs': int(match.group(5)),
            'investigations_aided': int(match.group(6).replace(',', ''))
        })
    
    return records

Output Schema

Each extracted record contains the following standardized fields:

Field                  Description                                    Availability
timestamp              Wayback capture timestamp (YYYYMMDDHHMMSS)     All eras
report_month           Month from “as of” statement                   2008+ only
report_year            Year from “as of” statement                    2008+ only
jurisdiction           Standardized state/agency name                 All eras
offender_profiles      DNA profiles from convicted offenders          All eras
arrestee               DNA profiles from arrestees                    2012+ only
forensic_profiles      Crime scene DNA profiles                       All eras
ndis_labs              Number of participating laboratories           All eras
investigations_aided   Cases assisted by DNA matches                  All eras
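
For concreteness, a single extracted record might look like the dictionary below. The values shown are purely illustrative placeholders, not real NDIS figures.

Show example record code
# Hypothetical example of one extracted record (illustrative values only)
example_record = {
    'timestamp': '20150621083015',    # Wayback capture timestamp (YYYYMMDDHHMMSS)
    'report_month': 5,                # parsed from an "as of May 2015" statement
    'report_year': 2015,
    'jurisdiction': 'Nebraska',
    'offender_profiles': 100000,      # placeholder count
    'arrestee': 5000,                 # placeholder count (2012+ captures only)
    'forensic_profiles': 4000,        # placeholder count
    'ndis_labs': 1,
    'investigations_aided': 2500,     # placeholder count
}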

Batch Processing

The complete extraction workflow:

File Discovery

  • Scans download directory for HTML files

  • Sorts chronologically for consistent processing

  • Tracks progress with detailed logging

Individual File Processing

  • Reads HTML content with encoding fallback

  • Extracts metadata and cleans content

  • Routes to era-appropriate parser based on timestamp

  • Captures source file information for traceability

Data Consolidation

  • Combines all records into single DataFrame

  • Adds derived timestamp columns (capture_date, year)

  • Validates data integrity and completeness

  • Sorts by capture date and jurisdiction for consistency

Show batch processing function code
def process_ndis_snapshot(html_file):
    """
    Convert single NDIS HTML file to structured data
    Routes to appropriate parser based on timestamp year
    """
    try:
        # Read HTML content with multiple encoding attempts
        content = None
        for encoding in ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']:
            try:
                content = html_file.read_text(encoding=encoding)
                break
            except UnicodeDecodeError:
                continue
        
        if content is None:
            # Final fallback with error ignoring
            content = html_file.read_text(encoding='latin-1', errors='ignore')
        
        # Check if file has reasonable content
        if len(content.strip()) < 100:
            print(f"⚠  Small file detected: {html_file.name} ({len(content)} chars)")
            return []
        
        # Extract metadata
        metadata = extract_ndis_metadata(content)
        timestamp = html_file.stem.split('_')[0]
        
        try:
            year = int(timestamp[:4])
        except ValueError:
            print(f"⚠  Invalid timestamp in filename: {html_file.name}")
            return []
        
        # Route to appropriate parser with fallback
        records = []
        parser_used = None
        
        if year <= 2007:
            records = parse_pre2007_ndis(metadata['clean_text'], timestamp, html_file,
                                       metadata['report_month'], metadata['report_year'])
            parser_used = "pre2007"
        elif year <= 2011:
            records = parse_2008_2011_ndis(metadata['clean_text'], timestamp, html_file,
                                         metadata['report_month'], metadata['report_year'])
            parser_used = "2008-2011"
        elif year <= 2016:
            records = parse_2012_2016_ndis(metadata['clean_text'], timestamp, html_file,
                                         metadata['report_month'], metadata['report_year'])
            parser_used = "2012-2016"
        else:
            records = parse_post2017_ndis(metadata['clean_text'], timestamp, html_file,
                                        metadata['report_month'], metadata['report_year'])
            parser_used = "post2017"
        
        if records:
            # Add parser info for debugging
            for record in records:
                record['parser_used'] = parser_used
                record['source_file'] = html_file.name
            return records
        else:
            print(f"⚠  No records extracted from {html_file.name} (year: {year}, parser: {parser_used})")
            return []
            
    except Exception as e:
        print(f"❌ Error processing {html_file.name}: {str(e)}")
        return []


def process_all_snapshots():
    """
    Process all downloaded snapshots into a single DataFrame
    
    Returns:
    --------
    pd.DataFrame
        Combined dataset with all snapshots
    """
    all_records = []
    html_files = sorted(NDIS_SNAPSHOTS_DIR.glob("*.html"))
    
    print(f"Processing {len(html_files)} HTML snapshots...")
    
    successful_files = 0
    failed_files = 0
    failed_list = []
    
    for html_file in tqdm(html_files, desc="Extracting NDIS data"):
        records = process_ndis_snapshot(html_file)
        if records:
            all_records.extend(records)
            successful_files += 1
        else:
            failed_files += 1
            failed_list.append(html_file.name)
    
    print(f"Processing complete: {successful_files} successful, {failed_files} failed")
    
    # Save failure report
    if failed_list:
        failure_report_path = META_DIR / "processing_failures.json"
        with open(failure_report_path, 'w') as f:
            json.dump({
                'timestamp': datetime.now().isoformat(),
                'total_files': len(html_files),
                'successful': successful_files,
                'failed': failed_files,
                'failed_files': failed_list[:100]  # Limit to first 100
            }, f, indent=2)
        print(f"Failure report saved: {failure_report_path}")
    
    if not all_records:
        print("Warning: No records extracted!")
        return pd.DataFrame()
    
    df = pd.DataFrame(all_records)
    
    # Add derived date columns
    df['capture_date'] = pd.to_datetime(df['timestamp'], format='%Y%m%d%H%M%S', errors='coerce')
    df['capture_year'] = df['capture_date'].dt.year
    
    # Remove any records with invalid dates
    invalid_dates = df['capture_date'].isna().sum()
    if invalid_dates > 0:
        print(f"⚠  Removed {invalid_dates} records with invalid dates")
        df = df[df['capture_date'].notna()]
    
    return df.sort_values(['capture_date', 'jurisdiction']).reset_index(drop=True)

Export & Validation

Structured output generation with comprehensive quality control:

Validation Checks:

  • Verifies all required columns are present

  • Checks for null values in critical fields

  • Validates numeric ranges (non-negative counts)

  • Confirms timestamp format consistency

  • Ensures jurisdiction names contain valid characters

Export Features

  • Saves as UTF-8 encoded CSV for maximum compatibility

  • Generates timestamped metadata filenames for version control

  • Creates a metadata summary with file statistics

  • Performs round-trip validation to confirm data integrity (sketched after the export code below)

Quality Metrics

  • Records total file count and processing success rate

  • Tracks temporal coverage (earliest to latest snapshots)

  • Documents jurisdiction coverage across time periods

  • Reports data completeness by era and field

Show execution function code
def export_ndis_data(df, output_dir=OUTPUT_DIR):
    """
    Export processed NDIS data with comprehensive metadata
    """
    if df.empty:
        print("Warning: DataFrame is empty, skipping export")
        return None
    
    # Generate output filename
    export_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_path = output_dir / "ndis_data_raw.csv"
    
    # Export main dataset
    df.to_csv(csv_path, index=False, encoding='utf-8')
    
    # Calculate export metadata
    file_size_mb = csv_path.stat().st_size / (1024 * 1024)
    
    export_metadata = {
        'export_timestamp': export_timestamp,
        'export_path': str(csv_path.resolve()),
        'record_count': len(df),
        'file_size_mb': round(file_size_mb, 2),
        'unique_snapshots': df['timestamp'].nunique(),
        'unique_jurisdictions': df['jurisdiction'].nunique(),
        'date_coverage': {
            'earliest_capture': df['capture_date'].min().isoformat(),
            'latest_capture': df['capture_date'].max().isoformat(),
            'span_years': df['capture_year'].max() - df['capture_year'].min() + 1
        },
        'data_completeness': {
            'with_report_dates': len(df[df['report_year'].notna()]),
            'with_arrestee_data': len(df[df['arrestee'] > 0]),
            'total_investigations_aided': int(df['investigations_aided'].sum())
        }
    }
    
    # Save metadata
    metadata_path = META_DIR / f"ndis_export_metadata_{export_timestamp}.json"
    with open(metadata_path, 'w') as f:
        json.dump(export_metadata, f, indent=2, default=str)
    
    print(f"✓ Data exported: {csv_path}")
    print(f"✓ Metadata saved: {metadata_path}")
    print(f"✓ Export summary: {len(df):,} records, {file_size_mb:.1f} MB")
    
    return export_metadata
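
The round-trip validation mentioned above is not part of the export function itself; a minimal sketch of such a check, assuming the raw CSV path and the DataFrame produced by export_ndis_data, could look like this:

Show round-trip validation sketch code
def roundtrip_check(df, csv_path):
    """
    Re-read the exported CSV and confirm it matches the in-memory DataFrame.
    (Illustrative sketch of the round-trip validation described above.)
    """
    reloaded = pd.read_csv(csv_path, dtype={'timestamp': str}, encoding='utf-8')

    checks = {
        'row_count_matches': len(reloaded) == len(df),
        'columns_match': list(reloaded.columns) == list(df.columns),
        'jurisdictions_match': set(reloaded['jurisdiction']) == set(df['jurisdiction']),
        'offender_total_matches': int(reloaded['offender_profiles'].sum()) == int(df['offender_profiles'].sum()),
    }

    for name, passed in checks.items():
        print(f"{'✓' if passed else '✗'} {name}")
    return all(checks.values())


# Example usage after export_ndis_data() has written the raw CSV:
# roundtrip_check(ndis_data, OUTPUT_DIR / "ndis_data_raw.csv")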

Data Integrity Validation

  • Schema Checking: Ensures proper field types and formats.

  • Null Validation: Confirms mandatory fields are populated.

  • Value Sanity Checks: Verifies non-negative numbers.

Show validation function code
def validate_extracted_data(df):
    """
    Comprehensive validation of extracted NDIS data
    """
    print("Validating extracted data...")
    
    # Required columns check
    required_cols = [
        'timestamp', 'jurisdiction',
        'offender_profiles', 'arrestee', 'forensic_profiles', 
        'ndis_labs', 'investigations_aided'
    ]
    
    validation_results = {}
    
    # Check column presence
    missing_cols = set(required_cols) - set(df.columns)
    validation_results['missing_columns'] = list(missing_cols)
    
    if missing_cols:
        print(f"✗ Missing required columns: {missing_cols}")
        return validation_results
    
    # Data quality checks
    validation_results['total_records'] = len(df)
    validation_results['unique_timestamps'] = df['timestamp'].nunique()
    validation_results['unique_jurisdictions'] = df['jurisdiction'].nunique()
    validation_results['date_range'] = {
        'earliest': df['capture_date'].min().strftime('%Y-%m-%d'),
        'latest': df['capture_date'].max().strftime('%Y-%m-%d')
    }
       
    # Value range checks
    numeric_cols = ['offender_profiles', 'arrestee', 'forensic_profiles', 'ndis_labs', 'investigations_aided']
    negative_values = {}
    for col in numeric_cols:
        negative_count = (df[col] < 0).sum()
        if negative_count > 0:
            negative_values[col] = negative_count
    validation_results['negative_values'] = negative_values
    
    # Era-specific validation
    pre2007_count = len(df[df['capture_year'] < 2007])
    arrestee_pre2012 = len(df[(df['capture_year'] < 2012) & (df['arrestee'] > 0)])
    
    validation_results['era_checks'] = {
        'pre2007_records': pre2007_count,
        'arrestee_before_2012': arrestee_pre2012  # Should be 0
    }
    
    # Print summary
    print(f"✓ Total records: {validation_results['total_records']:,}")
    print(f"✓ Unique snapshots: {validation_results['unique_timestamps']}")
    print(f"✓ Unique jurisdictions: {validation_results['unique_jurisdictions']}")
    print(f"✓ Date range: {validation_results['date_range']['earliest']} to {validation_results['date_range']['latest']}")
    
    if validation_results['negative_values']:
        print(f"! Negative values found: {validation_results['negative_values']}")
    
    if validation_results['era_checks']['arrestee_before_2012'] > 0:
        print(f"! Data integrity issue: {validation_results['era_checks']['arrestee_before_2012']} arrestee records found before 2012")
    
    return validation_results

Extraction Execution

Show main execution code
if __name__ == "__main__":
    # Process all snapshots
    ndis_data = process_all_snapshots()
    
    if not ndis_data.empty:
        # Validate data quality
        validation_results = validate_extracted_data(ndis_data)
        
        # Export if validation passes
        export_metadata = export_ndis_data(ndis_data)
        print("\n" + "="*50)
        print("EXTRACTION COMPLETE")
        print("="*50)
        print(f"Records extracted: {len(ndis_data):,}")
        print(f"Time span: {ndis_data['capture_year'].min()}-{ndis_data['capture_year'].max()}")
        print(f"Jurisdictions: {ndis_data['jurisdiction'].nunique()}")
    else:
        print("No data extracted - check HTML files and parsing logic")

View Analyses →