ConnectedDrivingPipelineV4

⭐ 1 stars | 🔱 2 forks

📊 Project Details

  • Primary Language: Python
  • Languages Used: Python, Shell, Makefile, Dockerfile
  • License: None
  • Created: April 23, 2023
  • Last Updated: April 10, 2026

📝 About

A high-performance, Dask-powered pipeline for connected driving dataset processing and machine learning. This framework provides a unified, config-driven approach to BSM (Basic Safety Message) data analysis, attack simulation, and ML classifier training.

Overview

ConnectedDrivingPipelineV4 is a complete rewrite of the original pandas-based pipeline. It uses Dask for parallel computing on a single-node 64GB workstation. The migration enables processing of 15M+ row datasets, with 2-4x faster data cleaning and attack simulation and lower peak memory (38-40GB, versus 55-60GB for the pandas pipeline).

Key Features:

  • Unified Pipeline Runner: Single DaskPipelineRunner replaces 55+ individual pipeline scripts
  • Config-Driven: JSON-based configuration for reproducible experiments
  • Scalable: Handles 15M+ rows on 64GB RAM (pandas struggled at 5M rows)
  • Fast: 2-4x speedup on data cleaning and attack simulation operations
  • Efficient Caching: Parquet-based caching with ≥85% hit rates
  • Production Ready: Comprehensive testing, validation, and monitoring tools

Quick Start

Prerequisites

Hardware Requirements:

  • RAM: 64GB (minimum) - critical requirement
  • CPU: 6+ cores (12 cores recommended)
  • Storage: 500GB+ SSD (1TB+ recommended for caching)

Software Requirements:

  • Python 3.10+ (3.10, 3.11, or 3.12 supported - NOT 3.8 or 3.9)
  • Linux/macOS (Windows may work but is not tested)

Installation

  1. Clone the repository:

    git clone https://github.com/aaron777collins/ConnectedDrivingPipelineV4.git
    cd ConnectedDrivingPipelineV4
    

  2. Create virtual environment:

    python3 -m venv .venv
    source .venv/bin/activate  # On Windows: .venv\Scripts\activate
    

  3. Install dependencies:

    pip install -r requirements.txt
    

  4. Verify Dask setup:

    python validate_dask_setup.py
    

Expected output:

✅ All Dask dependencies installed correctly
✅ 64GB RAM detected (sufficient for 15M rows)
✅ Dask LocalCluster initialized successfully
✅ System ready for production workloads
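
The internals of validate_dask_setup.py are not shown here. As a rough illustration of the prerequisites listed above (64GB RAM, 6+ cores, Python 3.10+), a standalone pre-flight check might look like the sketch below. The names check_prerequisites and detect_ram_gb are hypothetical and are not part of the repository.

```python
import os
import sys


def check_prerequisites(ram_gb, cpu_cores, python_version,
                        min_ram_gb=64, min_cores=6, min_python=(3, 10)):
    """Return a list of human-readable problems (empty if every check passes)."""
    problems = []
    if ram_gb < min_ram_gb:
        problems.append(f"RAM: {ram_gb:.0f}GB detected, {min_ram_gb}GB required")
    if cpu_cores < min_cores:
        problems.append(f"CPU: {cpu_cores} cores detected, {min_cores}+ required")
    if tuple(python_version[:2]) < tuple(min_python):
        found = ".".join(str(part) for part in python_version[:2])
        wanted = ".".join(str(part) for part in min_python)
        problems.append(f"Python: {found} detected, {wanted}+ required")
    return problems


def detect_ram_gb():
    """Best-effort physical RAM in GB via os.sysconf; None where unsupported."""
    try:
        return os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024**3
    except (AttributeError, ValueError, OSError):
        return None


# Fixed inputs, so the outcome does not depend on the machine running this.
print(check_prerequisites(64, 12, (3, 11)))
print(check_prerequisites(32, 4, (3, 9)))

# Live values for the current machine (RAM may be None on some platforms).
print(detect_ram_gb(), os.cpu_count(), tuple(sys.version_info[:2]))
```

A machine sold as 64GB usually reports slightly less usable memory, so a real check would likely need a small tolerance below the nominal figure.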

Docker Deployment (Alternative)

For containerized deployment with Docker:

# Build the Docker image
docker compose build

# Validate Dask setup inside the container
docker compose run --rm pipeline python validate_dask_setup.py

# Expected output:
# ✅ All Dask dependencies installed correctly
# ✅ 64GB RAM detected (sufficient for 15M rows)
# ✅ Dask LocalCluster initialized successfully
# ✅ System ready for production workloads

# Start the pipeline service
docker compose up -d pipeline

# Access Dask dashboard at http://localhost:8787

Note: The validation script (validate_dask_setup.py) runs 8 comprehensive tests to verify Dask configuration, dependencies, and system resources. This script is included in the Docker image and should always pass before running production workloads.

See DOCKER.md for complete Docker deployment guide, including production configurations, scaling, and monitoring.

Basic Usage

1. Run a Pipeline from Config

The easiest way to run a pipeline is using a pre-configured JSON file:

python -c "
from MachineLearning.DaskPipelineRunner import DaskPipelineRunner

runner = DaskPipelineRunner.from_config('MClassifierPipelines/configs/example_pipeline.json')
results = runner.run()
print(f'Pipeline complete. Results saved to {runner.csvWriter.filename}')
"

2. Create a Custom Configuration

Create a JSON config file (e.g., my_pipeline.json):

{
  "pipeline_name": "MyFirstPipeline",
  "data": {
    "source_file": "data/bsm_data.csv",
    "filtering": {
      "type": "xy_offset_position",
      "distance_meters": 2000,
      "center_x": -106.0831353,
      "center_y": 41.5430216
    },
    "date_range": {
      "start_day": 1,
      "end_day": 30,
      "start_month": 4,
      "end_month": 4,
      "start_year": 2021,
      "end_year": 2021
    },
    "num_subsection_rows": 100000
  },
  "features": {
    "columns": "minimal_xy_elev"
  },
  "attacks": {
    "enabled": true,
    "attack_ratio": 0.3,
    "type": "rand_offset",
    "min_distance": 10,
    "max_distance": 20,
    "random_seed": 42
  },
  "ml": {
    "train_test_split": {
      "type": "random",
      "train_ratio": 0.8,
      "test_ratio": 0.2,
      "random_seed": 42
    }
  },
  "cache": {
    "enabled": true
  }
}

Then run it:

python -c "
from MachineLearning.DaskPipelineRunner import DaskPipelineRunner
runner = DaskPipelineRunner.from_config('my_pipeline.json')
results = runner.run()
"

3. Programmatic Usage

For Python scripts or notebooks:

from MachineLearning.DaskPipelineRunner import DaskPipelineRunner

# Define config inline
config = {
    "pipeline_name": "ProgrammaticPipeline",
    "data": {
        "source_file": "data/bsm_data.csv",
        "filtering": {
            "type": "xy_offset_position",
            "distance_meters": 1000,
            "center_x": -106.0831353,
            "center_y": 41.5430216
        }
    },
    "features": {"columns": "minimal_xy_elev"},
    "attacks": {"enabled": False},
    "ml": {
        "train_test_split": {
            "type": "random",
            "train_ratio": 0.8,
            "test_ratio": 0.2
        }
    },
    "cache": {"enabled": True}
}

# Run pipeline
runner = DaskPipelineRunner(config)
results = runner.run()

# Access results
print(f"Models trained: {len(results)}")
for model_name, metrics in results.items():
    print(f"{model_name}: Test Accuracy = {metrics['test_accuracy']:.4f}")

Configuration Reference

Pipeline Configuration Schema

A complete pipeline configuration includes these sections:

pipeline_name (string, required)

Unique identifier for this pipeline run. Used for logging, caching, and output filenames.

data (object, required)

Data loading and filtering configuration:

{
  "source_file": "data/bsm_data.csv",  // Path to source CSV
  "num_subsection_rows": 100000,       // Rows per partition (default: 100000)
  "filtering": {
    "type": "xy_offset_position",      // Filter type: "xy_offset_position", "bounding_box", or "none"
    "distance_meters": 2000,            // For xy_offset: radius in meters
    "center_x": -106.0831353,           // Center longitude
    "center_y": 41.5430216              // Center latitude
  },
  "date_range": {
    "start_day": 1, "end_day": 30,
    "start_month": 4, "end_month": 4,
    "start_year": 2021, "end_year": 2021
  }
}
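
How the pipeline computes distances for the xy_offset_position filter is not described here. As a minimal sketch of the idea (keep only records within distance_meters of the center point), the example below uses the haversine great-circle formula, which is a stand-in technique chosen for illustration. The helper names haversine_m and within_radius are hypothetical.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in meters


def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in meters between two (longitude, latitude) points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def within_radius(points, center_x, center_y, distance_meters):
    """Keep (longitude, latitude) points no farther than distance_meters from the center."""
    return [p for p in points
            if haversine_m(p[0], p[1], center_x, center_y) <= distance_meters]


# Center taken from the example config: center_x is longitude, center_y is latitude.
center_x, center_y = -106.0831353, 41.5430216
points = [
    (-106.0831353, 41.5430216),  # the center itself
    (-106.0831353, 41.5520216),  # roughly 1 km north of the center
    (-106.0831353, 41.6430216),  # roughly 11 km north of the center
]
kept = within_radius(points, center_x, center_y, 2000)
print(len(kept))
```

With a 2000 m radius, the first two points are kept and the third is dropped.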

features (object, required)

Feature column selection:

{
  "columns": "minimal_xy_elev"  // Options: "minimal_xy_elev", "extended_timestamps", "all"
}

Predefined column sets:

  • minimal_xy_elev: ["latitude", "longitude", "elevation", "speed", "heading"] (recommended for quick experiments)
  • extended_timestamps: Minimal + timestamp-related features (13 columns)
  • all: All available BSM columns (~50 columns)

attacks (object, required)

Attack simulation configuration:

{
  "enabled": true,               // Enable/disable attack simulation
  "attack_ratio": 0.3,           // Fraction of vehicles to compromise (0.0-1.0)
  "type": "rand_offset",         // Attack type (see below)
  "min_distance": 10,            // Minimum position offset (meters)
  "max_distance": 20,            // Maximum position offset (meters)
  "random_seed": 42              // For reproducibility
}

Attack Types:

  • rand_offset: Random position offset per message (10-20m typical)
  • const_offset_per_id: Fixed offset per vehicle (100-200m typical)
  • rand_position: Completely random positions within area (0-2000m)
  • position_swap: Swap positions between pairs of vehicles
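
To make the rand_offset idea concrete, the sketch below shifts one position by a random distance between min_distance and max_distance meters in a random direction. It uses a flat-earth approximation and a hypothetical helper named rand_offset; it is not the code in DaskConnectedDrivingAttacker.

```python
import math
import random

METERS_PER_DEG_LAT = 111_320  # approximate; adequate for offsets of tens of meters


def rand_offset(lon, lat, min_distance, max_distance, rng):
    """Shift a (lon, lat) point by a random distance (meters) in a random direction."""
    distance = rng.uniform(min_distance, max_distance)
    bearing = rng.uniform(0, 2 * math.pi)
    dlat = distance * math.cos(bearing) / METERS_PER_DEG_LAT
    dlon = distance * math.sin(bearing) / (
        METERS_PER_DEG_LAT * math.cos(math.radians(lat)))
    return lon + dlon, lat + dlat


rng = random.Random(42)  # seeded, mirroring "random_seed": 42 in the config
lon, lat = -106.0831353, 41.5430216
new_lon, new_lat = rand_offset(lon, lat, 10, 20, rng)

# Recover the applied offset in meters to confirm it lies between 10 and 20.
north_m = (new_lat - lat) * METERS_PER_DEG_LAT
east_m = (new_lon - lon) * METERS_PER_DEG_LAT * math.cos(math.radians(lat))
offset_m = math.hypot(north_m, east_m)
print(round(offset_m, 2))
```

Because the generator is seeded, repeated runs produce the same offset, which is also what keeps cached attack results reusable.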

ml (object, required)

Machine learning configuration:

{
  "train_test_split": {
    "type": "random",            // Split type: "random" or "temporal"
    "train_ratio": 0.8,          // Training set fraction
    "test_ratio": 0.2,           // Test set fraction
    "random_seed": 42            // For reproducibility
  }
}
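
The two split types can be sketched on plain Python lists as follows. This is an illustration under assumptions: "temporal" is taken to mean training on the earliest records and testing on the latest, and the "timestamp" field name is hypothetical. The pipeline's own split logic lives in DaskMConnectedDrivingDataCleaner and may differ.

```python
import random


def random_split(rows, train_ratio=0.8, test_ratio=0.2, random_seed=42):
    """Shuffle with a fixed seed, then cut into train and test lists."""
    if abs(train_ratio + test_ratio - 1.0) > 1e-9:
        raise ValueError("train_ratio and test_ratio must sum to 1.0")
    shuffled = list(rows)
    random.Random(random_seed).shuffle(shuffled)
    cut = round(len(shuffled) * train_ratio)
    return shuffled[:cut], shuffled[cut:]


def temporal_split(rows, train_ratio=0.8, key=lambda row: row["timestamp"]):
    """Train on the earliest rows and test on the latest ones."""
    ordered = sorted(rows, key=key)
    cut = round(len(ordered) * train_ratio)
    return ordered[:cut], ordered[cut:]


rows = [{"id": i, "timestamp": 1000 - i} for i in range(10)]
train, test = random_split(rows)
early, late = temporal_split(rows)
print(len(train), len(test))
print(max(r["timestamp"] for r in early) <= min(r["timestamp"] for r in late))
```

Fixing random_seed makes the random split repeatable, so two runs of the same config train and test on the same rows.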

cache (object, optional)

Caching configuration:

{
  "enabled": true  // Enable Parquet caching (recommended)
}

When enabled, intermediate results are cached in the cache/ directory in Parquet format. Typical hit rates are ≥85% after the first run.
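
A cache only hits when the same inputs map to the same key. The sketch below shows one common way to derive a deterministic key from a config section by hashing its canonical JSON form. The function name config_cache_key and the key layout are assumptions for illustration, not the pipeline's actual cache scheme.

```python
import hashlib
import json


def config_cache_key(stage, config):
    """Build a deterministic key from a stage name and its JSON-serializable config."""
    # sort_keys makes the key independent of dictionary ordering.
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{stage}_{digest[:16]}"


a = {"type": "rand_offset", "min_distance": 10, "max_distance": 20, "random_seed": 42}
b = {"random_seed": 42, "max_distance": 20, "min_distance": 10, "type": "rand_offset"}
c = dict(a, random_seed=7)

print(config_cache_key("attack", a) == config_cache_key("attack", b))  # True
print(config_cache_key("attack", a) == config_cache_key("attack", c))  # False
```

Under a scheme like this, any parameter that changes between runs (a timestamp, an unseeded random value) produces a new key and therefore a cache miss.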

Configuration Examples

See MClassifierPipelines/configs/ for 40+ example configurations covering various scenarios:

  • Different attack types and parameters
  • Various spatial filtering approaches
  • Different feature column sets
  • Custom train/test splits

Architecture

Pipeline Execution Flow

1. Data Gathering (DaskDataGatherer)
   ├─ Load CSV → Dask DataFrame (lazy)
   ├─ Spatial filtering (xy_offset_position)
   └─ Temporal filtering (date_range)

2. Large Data Cleaning (DaskConnectedDrivingLargeDataCleaner)
   ├─ Remove invalid coordinates
   ├─ Filter by date range
   └─ Deduplicate records

3. Train/Test Split (DaskMConnectedDrivingDataCleaner)
   ├─ Random split (80/20 default)
   └─ OR temporal split

4. Attack Simulation (DaskConnectedDrivingAttacker)
   ├─ Add "is_attacker" column (0/1 labels)
   ├─ Apply position attacks to attacker vehicles
   └─ Maintain legitimate vehicle positions

5. ML Preparation (DaskMConnectedDrivingDataCleaner)
   ├─ Hex → decimal conversion
   ├─ Feature column selection
   └─ Label extraction

6. Classifier Training (DaskMClassifierPipeline)
   ├─ RandomForestClassifier
   ├─ DecisionTreeClassifier
   └─ KNeighborsClassifier

7. Results Collection
   ├─ Accuracy, Precision, Recall, F1, Specificity
   ├─ Train/test metrics
   └─ CSV export
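
The five metrics collected in step 7 follow from the confusion-matrix counts, with attackers (is_attacker = 1) treated as the positive class. The sketch below uses the standard definitions; the counts are arbitrary illustrative numbers, not pipeline results, and the function name is hypothetical.

```python
def classification_metrics(tp, fp, tn, fn):
    """Compute accuracy, precision, recall, F1, and specificity from counts."""
    def ratio(numerator, denominator):
        # Guard against empty classes instead of raising ZeroDivisionError.
        return numerator / denominator if denominator else 0.0

    precision = ratio(tp, tp + fp)
    recall = ratio(tp, tp + fn)
    return {
        "accuracy": ratio(tp + tn, tp + fp + tn + fn),
        "precision": precision,
        "recall": recall,
        "f1": ratio(2 * precision * recall, precision + recall),
        "specificity": ratio(tn, tn + fp),
    }


# Arbitrary example: 30 attacker messages and 70 legitimate ones.
metrics = classification_metrics(tp=25, fp=5, tn=65, fn=5)
for name, value in metrics.items():
    print(f"{name}: {value:.4f}")
```

Specificity is worth reporting alongside recall here because legitimate messages usually outnumber attacker messages, so accuracy alone can look high while false alarms remain frequent.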

Key Components

DaskPipelineRunner

  • Location: MachineLearning/DaskPipelineRunner.py
  • Purpose: Main entry point for running ML pipelines
  • Key Methods:
  • from_config(config_path): Load pipeline from JSON file
  • run(): Execute complete pipeline and return results

DaskDataGatherer

  • Location: Gatherer/DaskDataGatherer.py
  • Purpose: Load CSV data into Dask DataFrame with lazy evaluation
  • Features:
  • Automatic partitioning (64MB blocks = ~225K rows per partition)
  • Spatial filtering (xy_offset_position)
  • Temporal filtering (date ranges)
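
The partition figures above imply a rough row width and partition count for the 15M-row dataset. The arithmetic below is a back-of-the-envelope check that reads 64MB as 64 MiB and takes 225K rows per partition at face value.

```python
import math

BLOCKSIZE_BYTES = 64 * 1024**2  # 64MB block size, read here as 64 MiB
ROWS_PER_PARTITION = 225_000    # approximate figure quoted above
TOTAL_ROWS = 15_000_000         # dataset size used in the benchmarks

bytes_per_row = BLOCKSIZE_BYTES / ROWS_PER_PARTITION
partitions = math.ceil(TOTAL_ROWS / ROWS_PER_PARTITION)

print(f"~{bytes_per_row:.0f} bytes per CSV row, ~{partitions} partitions")
```

This works out to about 298 bytes per CSV row and 67 partitions. Halving the block size would roughly double the partition count.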

DaskConnectedDrivingLargeDataCleaner

  • Location: Generator/Cleaners/DaskConnectedDrivingLargeDataCleaner.py
  • Purpose: Large-scale data cleaning and validation
  • Operations: Coordinate validation, deduplication, date filtering

DaskConnectedDrivingAttacker

  • Location: Generator/Attackers/DaskConnectedDrivingAttacker.py
  • Purpose: Simulate GPS position attacks on BSM data
  • Attack Methods:
  • add_attackers(): Label vehicles as attackers (0/1)
  • positional_offset_rand(): Random position offsets
  • positional_offset_const(): Fixed offsets per vehicle
  • positional_swap_rand(): Position swapping

DaskMClassifierPipeline

  • Location: MachineLearning/DaskMClassifierPipeline.py
  • Purpose: Train scikit-learn classifiers on Dask-processed data
  • Classifiers: RandomForest, DecisionTree, KNeighbors (extensible)

Performance

Benchmarks (15M Row Dataset)

Operation           | Pandas (Old) | Dask (New) | Speedup
Data Loading        | 180s         | 45s        | 4.0x
Large Cleaning      | 240s         | 60s        | 4.0x
Attack Simulation   | 300s         | 120s       | 2.5x
Train/Test Split    | 90s          | 30s        | 3.0x
End-to-End Pipeline | ~15 min      | ~6 min     | 2.5x

Memory Usage

Configuration           | Peak Memory | Status
64GB Production         | 38-40GB     | ✅ Optimal
Development (8 workers) | 42-45GB     | ✅ Acceptable
Old Pandas              | 55-60GB     | ❌ Unstable

Note: With 64GB RAM, you have ~24GB headroom for safety. The pipeline will automatically spill to disk if memory exceeds 50% per worker (configurable in configs/dask/64gb-production.yml).

Cache Performance

With Parquet caching enabled:

  • First run (cold cache): 0% hit rate (~6 min for 15M rows)
  • Second run (warm cache): ~95% hit rate (~1 min for 15M rows)
  • Average over 10 runs: ≥85% hit rate
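
The 10-run average follows from the cold and warm figures, assuming one cold run followed by nine warm runs that each hit about 95% of the time and take about 1 minute:

```python
cold_runs, warm_runs = 1, 9
cold_hit_rate, warm_hit_rate = 0.0, 0.95
cold_minutes, warm_minutes = 6, 1

total_runs = cold_runs + warm_runs
average_hit_rate = (cold_runs * cold_hit_rate + warm_runs * warm_hit_rate) / total_runs
average_minutes = (cold_runs * cold_minutes + warm_runs * warm_minutes) / total_runs

print(f"{average_hit_rate:.1%} average hit rate, {average_minutes:.1f} min per run")
```

Under those assumptions the average is 85.5%, consistent with the ≥85% target, and the mean runtime is about 1.5 minutes per run.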

Monitor cache health:

python scripts/monitor_cache_health.py

Monitoring & Debugging

Dask Dashboard

The Dask dashboard provides real-time monitoring of task execution, memory usage, and worker status.

Access the dashboard:

  1. Start your pipeline (dashboard auto-launches)
  2. Open browser to http://localhost:8787
  3. View:
    • Task stream (execution timeline)
    • Memory usage per worker
    • Task graph
    • Worker status

Logging

All pipeline operations are logged to logs/<pipeline_name>.log:

tail -f logs/MyFirstPipeline.log

Log Levels:

  • INFO: Normal operations (data loading, cleaning, training)
  • WARNING: Performance issues (slow operations, high memory)
  • ERROR: Failures (missing files, invalid configs)
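
For custom scripts that should follow the same logs/<pipeline_name>.log convention, a per-pipeline file logger can be built with Python's standard logging module. This is a minimal sketch: make_pipeline_logger is a hypothetical helper, and the pipeline's own logging setup may be configured differently.

```python
import logging
import tempfile
from pathlib import Path


def make_pipeline_logger(pipeline_name, log_dir="logs"):
    """Return a logger that appends to <log_dir>/<pipeline_name>.log."""
    Path(log_dir).mkdir(parents=True, exist_ok=True)
    logger = logging.getLogger(f"pipeline.{pipeline_name}")
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.FileHandler(Path(log_dir) / f"{pipeline_name}.log")
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger


# A temporary directory keeps this demo from writing into the project tree.
log_dir = tempfile.mkdtemp()
logger = make_pipeline_logger("MyFirstPipeline", log_dir)
logger.info("Loaded source data")
logger.warning("Worker memory usage is high")
logger.error("Config file not found")

log_text = (Path(log_dir) / "MyFirstPipeline.log").read_text()
print(log_text)
```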

Cache Monitoring

Check cache statistics:

python scripts/monitor_cache_health.py

# Output:
# ✅ Cache Hit Rate: 87.3% (EXCELLENT - meets ≥85% target)
# 📊 Total Cached Entries: 142
# 💾 Total Cache Size: 8.4 GB
# 🔝 Top 5 Entries by Access Count:
#   1. gather_data_april2021 (hits: 234, size: 2.1GB)
#   2. clean_large_data_2000m (hits: 198, size: 1.8GB)
#   ...

Cleanup old cache entries:

python scripts/monitor_cache_health.py --cleanup --max-size-gb 50

Profiling

For performance analysis:

# Enable profiling in Dask config
export DASK_CONFIG=configs/dask/64gb-production.yml

# Run pipeline with profiling
python your_pipeline_script.py

# View profile results in Dask dashboard
# Navigate to http://localhost:8787/profile

Testing

CI/CD Pipeline

The project includes automated testing via GitHub Actions. Every push and pull request triggers:

  • Unit tests across Python 3.10, 3.11, 3.12
  • Code quality checks (flake8, black, isort)
  • Integration and slow tests
  • Docker build validation
  • Coverage reporting (70% minimum threshold)

CI Status:

  • Tests run automatically on push to main, master, or develop branches
  • Pull requests are tested before merge
  • Coverage reports are archived as artifacts for 30 days
  • See CI_CD.md for complete CI/CD documentation

Run All Tests

pytest Test/ -v

Run Specific Test Suites

# Core Dask components
pytest Test/test_dask_data_gatherer.py -v
pytest Test/test_dask_cleaners.py -v
pytest Test/test_dask_attackers.py -v

# Pipeline integration
pytest Test/test_dask_pipeline_runner.py -v

# Performance validation
pytest Test/test_performance_15m_rows.py -v

# Cache system
pytest Test/test_cache_hit_rate.py -v

Expected Test Results

All tests should pass, with coverage of at least 80% (above the 70% minimum that CI enforces):

==================== test session starts ====================
collected 127 items

Test/test_dask_data_gatherer.py ................... [ 15%]
Test/test_dask_cleaners.py ........................ [ 34%]
Test/test_dask_attackers.py ....................... [ 50%]
Test/test_dask_pipeline_runner.py ................. [ 66%]
Test/test_performance_15m_rows.py ................. [ 82%]
Test/test_cache_hit_rate.py ....................... [100%]

==================== 127 passed in 45.23s ====================

Troubleshooting

Common Issues

1. Out of Memory Errors

Symptom: MemoryError or worker crashes

Solution:

# Check available RAM
free -h

# Reduce worker count (in configs/dask/64gb-production.yml)
# Change n_workers from 6 to 4

# Reduce partition size (in DaskDataGatherer.py)
# Change blocksize from 64MB to 32MB

2. Slow Performance

Symptom: Pipeline takes >10 minutes for 15M rows

Solution:

# Check Dask dashboard for bottlenecks
# Common causes:
# - Too many small partitions → increase blocksize
# - Disk I/O bottleneck → use faster SSD
# - Insufficient workers → increase n_workers (if RAM allows)

# Enable profiling
python scripts/profile_pipeline.py <config.json>

3. Cache Misses

Symptom: Cache hit rate <70%

Solution:

# Check cache metadata
python scripts/monitor_cache_health.py

# Common causes:
# - Non-deterministic parameters (timestamps, random values)
# - Cache directory deleted
# - Config hash collisions (very rare)

# Fix: Use deterministic random seeds in configs
"random_seed": 42  # Always use same seed for reproducibility

4. Import Errors

Symptom: ModuleNotFoundError: No module named 'dask'

Solution:

# Verify virtual environment is activated
source .venv/bin/activate

# Reinstall dependencies
pip install -r requirements.txt --force-reinstall

# Verify installation
python validate_dask_setup.py

5. CI/CD Pipeline Failures

Common CI/CD Issues and Solutions:

Issue: Python version errors in CI

  • Symptom: Tests fail with syntax errors or SyntaxError: invalid syntax on Python 3.8 or 3.9
  • Solution: This project requires Python 3.10+. Check your .github/workflows/test.yml to ensure it only uses Python 3.10, 3.11, and 3.12

Issue: Missing fixture errors

  • Symptom: fixture 'someDict' not found in test output
  • Solution: Ensure conftest.py contains all required fixtures. This was fixed in commit c8b1ac3

Issue: Docker build fails with missing validation script

  • Symptom: COPY failed: file not found in build context or validate_dask_setup.py: not found
  • Solution: Check .dockerignore file - ensure !validate_dask_setup.py exception is present before the validate_*.py exclusion pattern (fixed in commit b88dd7b)

Issue: Indentation/syntax errors in validation scripts

  • Symptom: IndentationError or SyntaxError in validate_dask_clean_with_timestamps.py
  • Solution: This was a formatting issue fixed in commit fddfa5e. Ensure all Python files have consistent 4-space indentation

Issue: Import errors for Helpers or Decorators modules

  • Symptom: ModuleNotFoundError: No module named 'Helpers' or 'Decorators'
  • Solution: Ensure Helpers/__init__.py and Decorators/__init__.py exist (even if empty). Fixed in commits 919d8ad and 465e016

For more detailed troubleshooting, see the Troubleshooting Guide.

Advanced Usage

Custom Attack Methods

Extend DaskConnectedDrivingAttacker to implement custom attack patterns:

from Generator.Attackers.DaskConnectedDrivingAttacker import DaskConnectedDrivingAttacker

class MyCustomAttacker(DaskConnectedDrivingAttacker):
    def my_custom_attack(self, df, min_dist, max_dist):
        """Implement your custom attack logic."""
        # Placeholder: replace this pass-through with your attack logic
        modified_df = df
        return modified_df

# Use in pipeline
attacker = MyCustomAttacker(...)
result_df = attacker.my_custom_attack(df, 10, 20)

Custom Classifiers

Add custom scikit-learn classifiers to the pipeline:

from sklearn.svm import SVC
from MachineLearning.DaskPipelineRunner import DaskPipelineRunner, DEFAULT_CLASSIFIER_INSTANCES

# Add SVM to default classifiers
custom_classifiers = DEFAULT_CLASSIFIER_INSTANCES + [SVC(kernel='rbf')]

# Modify DaskPipelineRunner to use custom classifiers
# (requires editing DaskPipelineRunner.py or subclassing)

Batch Processing

Process multiple configs one after another (the loop below runs them sequentially, not in parallel):

import glob
from MachineLearning.DaskPipelineRunner import DaskPipelineRunner

config_files = glob.glob("MClassifierPipelines/configs/*.json")

for config_file in config_files:
    print(f"Running {config_file}...")
    runner = DaskPipelineRunner.from_config(config_file)
    results = runner.run()
    print(f"Completed {config_file}")

Migration from Pandas

If you have existing pandas-based pipelines:

1. Config Migration

Convert old pipeline scripts to JSON configs:

  • Extract parameters (distance, attack type, columns)
  • Create JSON config file
  • Test with DaskPipelineRunner.from_config()

2. Code Migration

Replace pandas operations with Dask equivalents:

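import pandas as pd
import dask.dataframe as dd

# Old (pandas): eager, the whole file is loaded into memory
df = pd.read_csv("data.csv")
df = df[df["latitude"] > 40]
result = df  # Already computed; pandas DataFrames have no .compute()

# New (Dask): lazy, nothing runs until .compute() is called
df = dd.read_csv("data.csv")
df = df[df["latitude"] > 40]
result = df.compute()  # Executes the lazy operations and returns a pandas DataFrame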

3. Memory Configuration

Adjust worker memory limits for your system:

  • 64GB → 6 workers × 8GB = 48GB (recommended)
  • 128GB → 12 workers × 8GB = 96GB
  • Edit configs/dask/64gb-production.yml
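
Both sizing examples above allocate 75% of system RAM to workers at 8GB each. Assuming that ratio is the intended rule of thumb (it is inferred from the two examples, not stated in the project), a worker count for other machines could be estimated like this. The function name plan_workers is hypothetical.

```python
def plan_workers(total_ram_gb, per_worker_gb=8, usable_fraction=0.75):
    """Return (n_workers, total_worker_gb) for a given amount of system RAM."""
    n_workers = int(total_ram_gb * usable_fraction // per_worker_gb)
    return n_workers, n_workers * per_worker_gb


for ram_gb in (64, 128):
    workers, used_gb = plan_workers(ram_gb)
    print(f"{ram_gb}GB -> {workers} workers x 8GB = {used_gb}GB")
```

The remaining 25% is left for the operating system, the scheduler, and the client process.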

Contributing

Contributions welcome! Please:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/my-feature)
  3. Commit changes (git commit -am 'Add my feature')
  4. Push to branch (git push origin feature/my-feature)
  5. Create Pull Request

Development Setup:

# Install dev dependencies
pip install -r requirements.txt
pip install pytest pytest-cov black flake8

# Run tests
pytest Test/ -v --cov

# Format code
black .

# Lint
flake8 .

Documentation

Full documentation is available at:

  • Online Docs: http://aaron777collins.github.io/ConnectedDrivingPipelineV4
  • Config Examples: MClassifierPipelines/configs/README.md
  • API Reference: docs/API_Reference.md
  • Performance Reports:
    • DASK_BOTTLENECK_PROFILING_REPORT.md
    • MEMORY_OPTIMIZATION_REPORT_TASK49.md
    • TASK50_CACHE_HIT_RATE_OPTIMIZATION_REPORT.md

License

No license has been specified for this repository.

Contact

  • Repository: https://github.com/aaron777collins/ConnectedDrivingPipelineV4
  • Issues: https://github.com/aaron777collins/ConnectedDrivingPipelineV4/issues
  • Documentation: http://aaron777collins.github.io/ConnectedDrivingPipelineV4

Acknowledgments

This Dask migration was completed as part of a comprehensive performance optimization effort. See COMPREHENSIVE_DASK_MIGRATION_PLAN_PROGRESS.md for full migration details and progress tracking.

Key Contributors:

  • Original pandas framework: Aaron Collins
  • Dask migration and optimization: Automated agent (Claude/Anthropic)
  • Testing and validation: Comprehensive test suite (127+ tests)


  • Status: Production Ready ✅
  • Last Updated: 2026-01-18
  • Version: 4.0 (Dask Migration Complete)