ConnectedDrivingPipelineV4
⭐ 1 stars | 🔱 2 forks
🔗 Quick Links
📊 Project Details
- Primary Language: Python
- Languages Used: Python, Shell, Makefile, Dockerfile
- License: None
- Created: April 23, 2023
- Last Updated: April 10, 2026
📝 About
ConnectedDrivingPipelineV4
A high-performance, Dask-powered pipeline for connected driving dataset processing and machine learning. This framework provides a unified, config-driven approach to BSM (Basic Safety Message) data analysis, attack simulation, and ML classifier training.
Overview
ConnectedDrivingPipelineV4 is a complete rewrite of the original pandas-based pipeline, now leveraging Dask for distributed computing on a 64GB single-node workstation. The migration enables processing of 15M+ row datasets with significantly improved performance and memory efficiency.
Key Features:
- Unified Pipeline Runner: Single DaskPipelineRunner replaces 55+ individual pipeline scripts
- Config-Driven: JSON-based configuration for reproducible experiments
- Scalable: Handles 15M+ rows on 64GB RAM (pandas struggled at 5M rows)
- Fast: 2-4x speedup on data cleaning and attack simulation operations
- Efficient Caching: Parquet-based caching with ≥85% hit rates
- Production Ready: Comprehensive testing, validation, and monitoring tools
Quick Start
Prerequisites
Hardware Requirements: - RAM: 64GB (minimum) - critical requirement - CPU: 6+ cores (12 cores recommended) - Storage: 500GB+ SSD (1TB+ recommended for caching)
Software Requirements: - Python 3.10+ (3.10, 3.11, or 3.12 supported - NOT 3.8 or 3.9) - Linux/macOS (Windows may work but not tested)
Installation
-
Clone the repository:
-
Create virtual environment:
-
Install dependencies:
-
Verify Dask setup:
Expected output:
✅ All Dask dependencies installed correctly
✅ 64GB RAM detected (sufficient for 15M rows)
✅ Dask LocalCluster initialized successfully
✅ System ready for production workloads
Docker Deployment (Alternative)
For containerized deployment with Docker:
# Build the Docker image
docker compose build
# Validate Dask setup inside the container
docker compose run --rm pipeline python validate_dask_setup.py
# Expected output:
# ✅ All Dask dependencies installed correctly
# ✅ 64GB RAM detected (sufficient for 15M rows)
# ✅ Dask LocalCluster initialized successfully
# ✅ System ready for production workloads
# Start the pipeline service
docker compose up -d pipeline
# Access Dask dashboard at http://localhost:8787
Note: The validation script (validate_dask_setup.py) runs 8 comprehensive tests to verify Dask configuration, dependencies, and system resources. This script is included in the Docker image and should always pass before running production workloads.
See DOCKER.md for complete Docker deployment guide, including production configurations, scaling, and monitoring.
Basic Usage
1. Run a Pipeline from Config
The easiest way to run a pipeline is using a pre-configured JSON file:
python -c "
from MachineLearning.DaskPipelineRunner import DaskPipelineRunner
runner = DaskPipelineRunner.from_config('MClassifierPipelines/configs/example_pipeline.json')
results = runner.run()
print(f'Pipeline complete. Results saved to {runner.csvWriter.filename}')
"
2. Create a Custom Configuration
Create a JSON config file (e.g., my_pipeline.json):
{
"pipeline_name": "MyFirstPipeline",
"data": {
"source_file": "data/bsm_data.csv",
"filtering": {
"type": "xy_offset_position",
"distance_meters": 2000,
"center_x": -106.0831353,
"center_y": 41.5430216
},
"date_range": {
"start_day": 1,
"end_day": 30,
"start_month": 4,
"end_month": 4,
"start_year": 2021,
"end_year": 2021
},
"num_subsection_rows": 100000
},
"features": {
"columns": "minimal_xy_elev"
},
"attacks": {
"enabled": true,
"attack_ratio": 0.3,
"type": "rand_offset",
"min_distance": 10,
"max_distance": 20,
"random_seed": 42
},
"ml": {
"train_test_split": {
"type": "random",
"train_ratio": 0.8,
"test_ratio": 0.2,
"random_seed": 42
}
},
"cache": {
"enabled": true
}
}
Then run it:
python -c "
from MachineLearning.DaskPipelineRunner import DaskPipelineRunner
runner = DaskPipelineRunner.from_config('my_pipeline.json')
results = runner.run()
"
3. Programmatic Usage
For Python scripts or notebooks:
from MachineLearning.DaskPipelineRunner import DaskPipelineRunner
# Define config inline
config = {
"pipeline_name": "ProgrammaticPipeline",
"data": {
"source_file": "data/bsm_data.csv",
"filtering": {
"type": "xy_offset_position",
"distance_meters": 1000,
"center_x": -106.0831353,
"center_y": 41.5430216
}
},
"features": {"columns": "minimal_xy_elev"},
"attacks": {"enabled": False},
"ml": {
"train_test_split": {
"type": "random",
"train_ratio": 0.8,
"test_ratio": 0.2
}
},
"cache": {"enabled": True}
}
# Run pipeline
runner = DaskPipelineRunner(config)
results = runner.run()
# Access results
print(f"Models trained: {len(results)}")
for model_name, metrics in results.items():
print(f"{model_name}: Test Accuracy = {metrics['test_accuracy']:.4f}")
Configuration Reference
Pipeline Configuration Schema
A complete pipeline configuration includes these sections:
pipeline_name (string, required)
Unique identifier for this pipeline run. Used for logging, caching, and output filenames.
data (object, required)
Data loading and filtering configuration:
{
"source_file": "data/bsm_data.csv", // Path to source CSV
"num_subsection_rows": 100000, // Rows per partition (default: 100000)
"filtering": {
"type": "xy_offset_position", // Filter type: "xy_offset_position", "bounding_box", or "none"
"distance_meters": 2000, // For xy_offset: radius in meters
"center_x": -106.0831353, // Center longitude
"center_y": 41.5430216 // Center latitude
},
"date_range": {
"start_day": 1, "end_day": 30,
"start_month": 4, "end_month": 4,
"start_year": 2021, "end_year": 2021
}
}
features (object, required)
Feature column selection:
Predefined column sets:
- minimal_xy_elev: ["latitude", "longitude", "elevation", "speed", "heading"] (recommended for quick experiments)
- extended_timestamps: Minimal + timestamp-related features (13 columns)
- all: All available BSM columns (~50 columns)
attacks (object, required)
Attack simulation configuration:
{
"enabled": true, // Enable/disable attack simulation
"attack_ratio": 0.3, // Fraction of vehicles to compromise (0.0-1.0)
"type": "rand_offset", // Attack type (see below)
"min_distance": 10, // Minimum position offset (meters)
"max_distance": 20, // Maximum position offset (meters)
"random_seed": 42 // For reproducibility
}
Attack Types:
- rand_offset: Random position offset per message (10-20m typical)
- const_offset_per_id: Fixed offset per vehicle (100-200m typical)
- rand_position: Completely random positions within area (0-2000m)
- position_swap: Swap positions between pairs of vehicles
ml (object, required)
Machine learning configuration:
{
"train_test_split": {
"type": "random", // Split type: "random" or "temporal"
"train_ratio": 0.8, // Training set fraction
"test_ratio": 0.2, // Test set fraction
"random_seed": 42 // For reproducibility
}
}
cache (object, optional)
Caching configuration:
When enabled, intermediate results are cached to cache/ directory using Parquet format. Typical hit rates: ≥85% after first run.
Configuration Examples
See MClassifierPipelines/configs/ for 40+ example configurations covering various scenarios:
- Different attack types and parameters
- Various spatial filtering approaches
- Different feature column sets
- Custom train/test splits
Architecture
Pipeline Execution Flow
1. Data Gathering (DaskDataGatherer)
├─ Load CSV → Dask DataFrame (lazy)
├─ Spatial filtering (xy_offset_position)
└─ Temporal filtering (date_range)
2. Large Data Cleaning (DaskConnectedDrivingLargeDataCleaner)
├─ Remove invalid coordinates
├─ Filter by date range
└─ Deduplicate records
3. Train/Test Split (DaskMConnectedDrivingDataCleaner)
├─ Random split (80/20 default)
└─ OR temporal split
4. Attack Simulation (DaskConnectedDrivingAttacker)
├─ Add "is_attacker" column (0/1 labels)
├─ Apply position attacks to attacker vehicles
└─ Maintain legitimate vehicle positions
5. ML Preparation (DaskMConnectedDrivingDataCleaner)
├─ Hex → decimal conversion
├─ Feature column selection
└─ Label extraction
6. Classifier Training (DaskMClassifierPipeline)
├─ RandomForestClassifier
├─ DecisionTreeClassifier
└─ KNeighborsClassifier
7. Results Collection
├─ Accuracy, Precision, Recall, F1, Specificity
├─ Train/test metrics
└─ CSV export
Key Components
DaskPipelineRunner
- Location:
MachineLearning/DaskPipelineRunner.py - Purpose: Main entry point for running ML pipelines
- Key Methods:
from_config(config_path): Load pipeline from JSON filerun(): Execute complete pipeline and return results
DaskDataGatherer
- Location:
Gatherer/DaskDataGatherer.py - Purpose: Load CSV data into Dask DataFrame with lazy evaluation
- Features:
- Automatic partitioning (64MB blocks = ~225K rows per partition)
- Spatial filtering (xy_offset_position)
- Temporal filtering (date ranges)
DaskConnectedDrivingLargeDataCleaner
- Location:
Generator/Cleaners/DaskConnectedDrivingLargeDataCleaner.py - Purpose: Large-scale data cleaning and validation
- Operations: Coordinate validation, deduplication, date filtering
DaskConnectedDrivingAttacker
- Location:
Generator/Attackers/DaskConnectedDrivingAttacker.py - Purpose: Simulate GPS position attacks on BSM data
- Attack Methods:
add_attackers(): Label vehicles as attackers (0/1)positional_offset_rand(): Random position offsetspositional_offset_const(): Fixed offsets per vehiclepositional_swap_rand(): Position swapping
DaskMClassifierPipeline
- Location:
MachineLearning/DaskMClassifierPipeline.py - Purpose: Train scikit-learn classifiers on Dask-processed data
- Classifiers: RandomForest, DecisionTree, KNeighbors (extensible)
Performance
Benchmarks (15M Row Dataset)
| Operation | Pandas (Old) | Dask (New) | Speedup |
|---|---|---|---|
| Data Loading | 180s | 45s | 4.0x |
| Large Cleaning | 240s | 60s | 4.0x |
| Attack Simulation | 300s | 120s | 2.5x |
| Train/Test Split | 90s | 30s | 3.0x |
| End-to-End Pipeline | ~15 min | ~6 min | 2.5x |
Memory Usage
| Configuration | Peak Memory | Status |
|---|---|---|
| 64GB Production | 38-40GB | ✅ Optimal |
| Development (8 workers) | 42-45GB | ✅ Acceptable |
| Old Pandas | 55-60GB | ❌ Unstable |
Note: With 64GB RAM, you have ~24GB headroom for safety. The pipeline will automatically spill to disk if memory exceeds 50% per worker (configurable in configs/dask/64gb-production.yml).
Cache Performance
With Parquet caching enabled: - First run (cold cache): 0% hit rate (~6 min for 15M rows) - Second run (warm cache): ~95% hit rate (~1 min for 15M rows) - Average over 10 runs: ≥85% hit rate
Monitor cache health:
Monitoring & Debugging
Dask Dashboard
The Dask dashboard provides real-time monitoring of task execution, memory usage, and worker status.
Access the dashboard:
1. Start your pipeline (dashboard auto-launches)
2. Open browser to http://localhost:8787
3. View:
- Task stream (execution timeline)
- Memory usage per worker
- Task graph
- Worker status
Logging
All pipeline operations are logged to logs/<pipeline_name>.log:
Log Levels: - INFO: Normal operations (data loading, cleaning, training) - WARNING: Performance issues (slow operations, high memory) - ERROR: Failures (missing files, invalid configs)
Cache Monitoring
Check cache statistics:
python scripts/monitor_cache_health.py
# Output:
# ✅ Cache Hit Rate: 87.3% (EXCELLENT - meets ≥85% target)
# 📊 Total Cached Entries: 142
# 💾 Total Cache Size: 8.4 GB
# 🔝 Top 5 Entries by Access Count:
# 1. gather_data_april2021 (hits: 234, size: 2.1GB)
# 2. clean_large_data_2000m (hits: 198, size: 1.8GB)
# ...
Cleanup old cache entries:
Profiling
For performance analysis:
# Enable profiling in Dask config
export DASK_CONFIG=configs/dask/64gb-production.yml
# Run pipeline with profiling
python your_pipeline_script.py
# View profile results in Dask dashboard
# Navigate to http://localhost:8787/profile
Testing
CI/CD Pipeline
The project includes automated testing via GitHub Actions. Every push and pull request triggers: - Unit tests across Python 3.10, 3.11, 3.12 - Code quality checks (flake8, black, isort) - Integration and slow tests - Docker build validation - Coverage reporting (70% minimum threshold)
CI Status:
- Tests run automatically on push to main, master, or develop branches
- Pull requests are tested before merge
- Coverage reports are archived as artifacts for 30 days
- See CI_CD.md for complete CI/CD documentation
Run All Tests
Run Specific Test Suites
# Core Dask components
pytest Test/test_dask_data_gatherer.py -v
pytest Test/test_dask_cleaners.py -v
pytest Test/test_dask_attackers.py -v
# Pipeline integration
pytest Test/test_dask_pipeline_runner.py -v
# Performance validation
pytest Test/test_performance_15m_rows.py -v
# Cache system
pytest Test/test_cache_hit_rate.py -v
Expected Test Results
All tests should pass with ≥80% coverage:
==================== test session starts ====================
collected 127 items
Test/test_dask_data_gatherer.py ................... [ 15%]
Test/test_dask_cleaners.py ........................ [ 34%]
Test/test_dask_attackers.py ....................... [ 50%]
Test/test_dask_pipeline_runner.py ................. [ 66%]
Test/test_performance_15m_rows.py ................. [ 82%]
Test/test_cache_hit_rate.py ....................... [100%]
==================== 127 passed in 45.23s ====================
Troubleshooting
Common Issues
1. Out of Memory Errors
Symptom: MemoryError or worker crashes
Solution:
# Check available RAM
free -h
# Reduce worker count (in configs/dask/64gb-production.yml)
# Change n_workers from 6 to 4
# Reduce partition size (in DaskDataGatherer.py)
# Change blocksize from 64MB to 32MB
2. Slow Performance
Symptom: Pipeline takes >10 minutes for 15M rows Solution:
# Check Dask dashboard for bottlenecks
# Common causes:
# - Too many small partitions → increase blocksize
# - Disk I/O bottleneck → use faster SSD
# - Insufficient workers → increase n_workers (if RAM allows)
# Enable profiling
python scripts/profile_pipeline.py <config.json>
3. Cache Misses
Symptom: Cache hit rate <70% Solution:
# Check cache metadata
python scripts/monitor_cache_health.py
# Common causes:
# - Non-deterministic parameters (timestamps, random values)
# - Cache directory deleted
# - Config hash collisions (very rare)
# Fix: Use deterministic random seeds in configs
"random_seed": 42 # Always use same seed for reproducibility
4. Import Errors
Symptom: ModuleNotFoundError: No module named 'dask'
Solution:
# Verify virtual environment is activated
source .venv/bin/activate
# Reinstall dependencies
pip install -r requirements.txt --force-reinstall
# Verify installation
python validate_dask_setup.py
5. CI/CD Pipeline Failures
Common CI/CD Issues and Solutions:
Issue: Python version errors in CI
- Symptom: Tests fail with syntax errors or SyntaxError: invalid syntax on Python 3.8 or 3.9
- Solution: This project requires Python 3.10+. Check your .github/workflows/test.yml to ensure it only uses Python 3.10, 3.11, and 3.12
Issue: Missing fixture errors
- Symptom: fixture 'someDict' not found in test output
- Solution: Ensure conftest.py contains all required fixtures. This was fixed in commit c8b1ac3
Issue: Docker build fails with missing validation script
- Symptom: COPY failed: file not found in build context or validate_dask_setup.py: not found
- Solution: Check .dockerignore file - ensure !validate_dask_setup.py exception is present before the validate_*.py exclusion pattern (fixed in commit b88dd7b)
Issue: Indentation/syntax errors in validation scripts
- Symptom: IndentationError or SyntaxError in validate_dask_clean_with_timestamps.py
- Solution: This was a formatting issue fixed in commit fddfa5e. Ensure all Python files have consistent 4-space indentation
Issue: Import errors for Helpers or Decorators modules
- Symptom: ModuleNotFoundError: No module named 'Helpers' or 'Decorators'
- Solution: Ensure Helpers/__init__.py and Decorators/__init__.py exist (even if empty). Fixed in commits 919d8ad and 465e016
For more detailed troubleshooting, see the Troubleshooting Guide.
Advanced Usage
Custom Attack Methods
Extend DaskConnectedDrivingAttacker to implement custom attack patterns:
from Generator.Attackers.DaskConnectedDrivingAttacker import DaskConnectedDrivingAttacker
class MyCustomAttacker(DaskConnectedDrivingAttacker):
def my_custom_attack(self, df, min_dist, max_dist):
"""Implement your custom attack logic."""
# Your attack logic here
return modified_df
# Use in pipeline
attacker = MyCustomAttacker(...)
result_df = attacker.my_custom_attack(df, 10, 20)
Custom Classifiers
Add custom scikit-learn classifiers to the pipeline:
from sklearn.svm import SVC
from MachineLearning.DaskPipelineRunner import DaskPipelineRunner, DEFAULT_CLASSIFIER_INSTANCES
# Add SVM to default classifiers
custom_classifiers = DEFAULT_CLASSIFIER_INSTANCES + [SVC(kernel='rbf')]
# Modify DaskPipelineRunner to use custom classifiers
# (requires editing DaskPipelineRunner.py or subclassing)
Batch Processing
Process multiple configs in parallel:
import glob
from MachineLearning.DaskPipelineRunner import DaskPipelineRunner
config_files = glob.glob("MClassifierPipelines/configs/*.json")
for config_file in config_files:
print(f"Running {config_file}...")
runner = DaskPipelineRunner.from_config(config_file)
results = runner.run()
print(f"Completed {config_file}")
Migration from Pandas
If you have existing pandas-based pipelines:
1. Config Migration
Convert old pipeline scripts to JSON configs:
- Extract parameters (distance, attack type, columns)
- Create JSON config file
- Test with DaskPipelineRunner.from_config()
2. Code Migration
Replace pandas operations with Dask equivalents:
# Old (pandas)
df = pd.read_csv("data.csv")
df = df[df["latitude"] > 40]
result = df.compute() # Error: pandas doesn't have .compute()
# New (Dask)
df = dd.read_csv("data.csv")
df = df[df["latitude"] > 40]
result = df.compute() # OK: Dask executes lazy operations
3. Memory Configuration
Adjust worker memory limits for your system:
- 64GB → 6 workers × 8GB = 48GB (recommended)
- 128GB → 12 workers × 8GB = 96GB
- Edit configs/dask/64gb-production.yml
Contributing
Contributions welcome! Please:
1. Fork the repository
2. Create a feature branch (git checkout -b feature/my-feature)
3. Commit changes (git commit -am 'Add my feature')
4. Push to branch (git push origin feature/my-feature)
5. Create Pull Request
Development Setup:
# Install dev dependencies
pip install -r requirements.txt
pip install pytest pytest-cov black flake8
# Run tests
pytest Test/ -v --cov
# Format code
black .
# Lint
flake8 .
Documentation
Full documentation is available at: - Online Docs: http://aaron777collins.github.io/ConnectedDrivingPipelineV4 - Config Examples: MClassifierPipelines/configs/README.md - API Reference: docs/API_Reference.md - Performance Reports: - DASK_BOTTLENECK_PROFILING_REPORT.md - MEMORY_OPTIMIZATION_REPORT_TASK49.md - TASK50_CACHE_HIT_RATE_OPTIMIZATION_REPORT.md
License
[Include license information here]
Contact
- Repository: https://github.com/aaron777collins/ConnectedDrivingPipelineV4
- Issues: https://github.com/aaron777collins/ConnectedDrivingPipelineV4/issues
- Documentation: http://aaron777collins.github.io/ConnectedDrivingPipelineV4
Acknowledgments
This Dask migration was completed as part of a comprehensive performance optimization effort. See COMPREHENSIVE_DASK_MIGRATION_PLAN_PROGRESS.md for full migration details and progress tracking.
Key Contributors: - Original pandas framework: Aaron Collins - Dask migration and optimization: Automated agent (Claude/Anthropic) - Testing and validation: Comprehensive test suite (127+ tests)
Status: Production Ready ✅ Last Updated: 2026-01-18 Version: 4.0 (Dask Migration Complete)