data-engineering
ML data engineering covering data pipelines, data quality, collection strategies, storage, and versioning for machine learning systems.
When & Why to Use This Skill
This Claude skill provides a comprehensive framework for ML data engineering, focusing on building robust data pipelines, ensuring data quality through automated validation, and implementing version control for machine learning datasets. It streamlines the transition from raw data collection to feature storage, enabling reproducible and reliable ML systems by integrating industry-standard tools like DVC and Great Expectations.
Use Cases
- Automating multi-source data collection: Seamlessly ingest data from APIs, databases, and parquet files into a unified processing pipeline.
- Implementing automated data validation: Use Great Expectations to perform schema checks and statistical validation, ensuring only high-quality data reaches your ML models.
- Managing data versioning and reproducibility: Integrate DVC to track dataset versions alongside code changes, allowing for consistent experiment tracking and easy rollbacks.
- Designing scalable ML storage architectures: Establish efficient storage patterns using Data Lakes for raw assets and Feature Stores for optimized model inputs.
Data Engineering for ML
Building robust data infrastructure for ML systems.
Data Pipeline Architecture
```
┌──────────────────────────────────────────────────────────────┐
│                       ML DATA PIPELINE                        │
├──────────────────────────────────────────────────────────────┤
│                                                               │
│  COLLECTION  →  VALIDATION  →  PROCESSING  →  STORAGE         │
│      ↓              ↓              ↓             ↓            │
│   Sources      Schema Check    Transform     Data Lake        │
│   APIs         Quality Check   Normalize     Feature Store    │
│   DBs          Statistics      Encode        Model Registry   │
│                                                               │
└──────────────────────────────────────────────────────────────┘
```
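Conceptually, each stage is a function that accepts a DataFrame and hands a (possibly transformed) DataFrame to the next stage. A minimal sketch of such a linear runner, with illustrative inline stages (the stage functions and sample data are placeholders, not part of this skill's API):

```python
from typing import Callable, Sequence

import pandas as pd

# A stage consumes a DataFrame and returns a (possibly transformed) one;
# validation stages can raise on failure and return the input unchanged.
Stage = Callable[[pd.DataFrame], pd.DataFrame]


def run_pipeline(df: pd.DataFrame, stages: Sequence[Stage]) -> pd.DataFrame:
    """Run the data through each stage in order, failing fast on errors."""
    for stage in stages:
        df = stage(df)
    return df


# Illustrative usage with trivial inline stages:
raw = pd.DataFrame({"user_id": [1, 2], "age": [34, 29]})
clean = run_pipeline(raw, [
    lambda df: df.dropna(),                       # processing: drop incomplete rows
    lambda df: df.assign(age=df["age"].clip(0)),  # processing: clamp negative ages
])
```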
Data Collection
```python
from dataclasses import dataclass
from typing import Dict, List

import pandas as pd
import requests


@dataclass
class DataSource:
    name: str
    type: str  # database, api, file, stream
    connection: Dict


class DataCollector:
    def __init__(self, sources: List[DataSource]):
        self.sources = sources

    def collect(self, source_name: str) -> pd.DataFrame:
        source = next(s for s in self.sources if s.name == source_name)
        if source.type == "database":
            return pd.read_sql(source.connection["query"],
                               source.connection["conn"])
        elif source.type == "api":
            response = requests.get(source.connection["url"])
            response.raise_for_status()
            return pd.DataFrame(response.json())
        elif source.type == "file":
            return pd.read_parquet(source.connection["path"])
        raise ValueError(f"Unsupported source type: {source.type}")
```
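A usage sketch with hypothetical connection details (the paths and URLs are illustrative):

```python
sources = [
    DataSource(
        name="events",
        type="file",
        connection={"path": "data/raw/events.parquet"},  # hypothetical path
    ),
    DataSource(
        name="users",
        type="api",
        connection={"url": "https://api.example.com/users"},  # hypothetical endpoint
    ),
]

collector = DataCollector(sources)
events_df = collector.collect("events")
```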
Data Quality
```python
import great_expectations as ge
import pandas as pd


def validate_data(df: pd.DataFrame) -> bool:
    # Wrap the frame in Great Expectations' pandas interface.
    ge_df = ge.from_pandas(df)

    # Schema validation
    assert ge_df.expect_column_to_exist("user_id").success
    assert ge_df.expect_column_values_to_not_be_null("user_id").success
    assert ge_df.expect_column_values_to_be_unique("user_id").success

    # Value validation
    assert ge_df.expect_column_values_to_be_between(
        "age", min_value=0, max_value=150
    ).success

    # Statistical validation
    assert ge_df.expect_column_mean_to_be_between(
        "purchase_amount", min_value=0, max_value=10000
    ).success

    return True
```
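Because the checks are plain assertions, a failed expectation raises AssertionError; callers can use that to quarantine a bad batch rather than letting it reach training. An illustrative call (the sample data is made up):

```python
batch = pd.DataFrame({
    "user_id": [1, 2, 3],
    "age": [34, 29, 41],
    "purchase_amount": [120.0, 65.5, 300.0],
})

try:
    validate_data(batch)
except AssertionError:
    # Reject or quarantine the batch instead of training on it.
    print("Validation failed; batch rejected")
```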
Data Versioning
```python
# DVC for data versioning; one-time setup from the shell:
#   dvc init
#   dvc add data/processed/

import subprocess

import dvc.api

# Resolve the storage URL of a specific version
data_url = dvc.api.get_url(
    path='data/processed/train.parquet',
    repo='https://github.com/org/repo',
    rev='v1.2.0'
)


def version_data(data_path: str, message: str):
    """Track a new data version with DVC and commit the pointer file."""
    subprocess.run(["dvc", "add", data_path], check=True)
    subprocess.run(["git", "add", f"{data_path}.dvc"], check=True)
    subprocess.run(["git", "commit", "-m", message], check=True)
    subprocess.run(["dvc", "push"], check=True)
```
Data Storage Patterns
| Pattern | Use Case | Technology |
|---|---|---|
| Data Lake | Raw storage | S3, GCS, ADLS |
| Data Warehouse | Analytics | Snowflake, BigQuery |
| Feature Store | ML features | Feast, Tecton |
| Vector Store | Embeddings | Pinecone, Weaviate |
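As a concrete feature-store example, a minimal online lookup with Feast might look like the following sketch (the feature view and feature names are hypothetical, and a Feast repo must already be configured at `repo_path`):

```python
from feast import FeatureStore

store = FeatureStore(repo_path=".")  # assumes a configured Feast repo here

# Fetch the latest feature values for one user at serving time.
features = store.get_online_features(
    features=[
        "user_stats:purchase_count_7d",    # hypothetical feature_view:feature
        "user_stats:avg_purchase_amount",
    ],
    entity_rows=[{"user_id": 1001}],
).to_dict()
```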
Commands
- `/omgdata:collect` - Data collection
- `/omgdata:validate` - Data validation
- `/omgdata:version` - Version data
Best Practices
- Validate data at every stage
- Version all data assets
- Document data schemas
- Monitor data quality metrics
- Implement data lineage tracking (see the sketch below)
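For the last point, lineage does not have to start with heavy tooling. A minimal sketch (the record fields are illustrative) that stamps each produced artifact with its inputs and the code version that built it:

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import List


@dataclass
class LineageRecord:
    output_path: str
    input_paths: List[str]
    code_version: str  # e.g. the git commit SHA of the pipeline code
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


def record_lineage(output_path: str, input_paths: List[str], code_version: str):
    """Write a lineage record next to the artifact so it travels with the data."""
    record = LineageRecord(output_path, input_paths, code_version)
    with open(f"{output_path}.lineage.json", "w") as f:
        json.dump(asdict(record), f, indent=2)
```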