data-pipeline-patterns
Follow these patterns when implementing data pipelines, ETL, data ingestion, or data validation in OptAIC. Use for point-in-time (PIT) correctness, Arrow schemas, quality checks, and Prefect orchestration.
When & Why to Use This Skill
This Claude skill provides a comprehensive framework and architectural patterns for implementing robust data pipelines, ETL processes, and data validation workflows. It focuses on maintaining high data integrity through Point-in-Time (PIT) correctness, standardized Apache Arrow schemas, and automated orchestration using Prefect to ensure reliable data governance and lineage tracking.
Use Cases
- Case 1: Implementing ETL and data ingestion flows that require strict Point-in-Time (PIT) correctness to prevent lookahead bias in temporal datasets.
- Case 2: Defining and enforcing standardized data structures using Apache Arrow schemas to ensure consistency across ETL, training, and inference stages.
- Case 3: Automating complex data orchestration and dependency management using Prefect and Lineage DAGs to track data provenance and freshness.
- Case 4: Setting up automated data quality validation checks, such as schema conformance and duplicate detection, to maintain production-grade dataset integrity.
Data Pipeline Patterns
Guide for implementing data pipelines that integrate with OptAIC's orchestration and governance.
When to Use
Apply when:
- Building PipelineDef implementations (ETL, Expression, Training)
- Implementing data ingestion flows
- Creating data quality validation
- Setting up Arrow schemas for datasets
- Integrating with Prefect orchestration
Pipeline Definition Types
| Type | Purpose | Input | Output |
|---|---|---|---|
| ETL | External data ingestion | API/files | Dataset version |
| Expression | DSL transformation | Datasets | Derived dataset |
| Training | Model training | Datasets | Model artifact |
| Inference | Model prediction | Features + model | Prediction dataset |
| Monitoring | Quality/drift checks | Datasets | Metrics + alerts |
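The actual PipelineDef contract lives in OptAIC's codebase and is not reproduced here. Purely as orientation, the sketch below shows one hypothetical shape an Expression-type definition could take; the Protocol, the `run` method, and the config keys are all assumptions for illustration, not the real interface.

```python
from typing import Any, Protocol


class PipelineDef(Protocol):
    """Hypothetical shape: a pipeline turns validated config into a new dataset-version id."""

    async def run(self, config: dict[str, Any]) -> str: ...


class ReturnsExpressionPipeline:
    """Illustrative Expression-type pipeline: derive a column from an upstream dataset."""

    async def run(self, config: dict[str, Any]) -> str:
        import pandas as pd  # heavy dependency, lazily imported (see the Lazy Import Rule below)

        source = pd.DataFrame(config["input_rows"])          # rows from the upstream dataset
        source["return_1d"] = source["close"].pct_change()   # the derived column
        return f"{config['dataset_id']}:derived"             # id of the new derived dataset version
```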
Point-in-Time (PIT) Correctness
Critical rule: Always track knowledge_date (when data was known) separately from as_of_date (data's effective date).
# WRONG - lookahead bias: filters by effective date only, ignoring when the data became known
df = pd.read_sql("SELECT * FROM prices WHERE date = ?", conn, params=[target_date])

# CORRECT - PIT query: restrict to rows already known at the knowledge cutoff
df = pd.read_sql(
    """
    SELECT * FROM prices
    WHERE as_of_date <= ?
      AND knowledge_date <= ?
    ORDER BY knowledge_date DESC
    """,
    conn,
    params=[target_date, knowledge_cutoff],
)
See references/pit-patterns.md.
Arrow Schema Pattern
import pyarrow as pa

def price_schema() -> pa.Schema:
    return pa.schema([
        pa.field("date", pa.date32(), nullable=False),
        pa.field("symbol", pa.string(), nullable=False),
        pa.field("close", pa.float64(), nullable=False),
        pa.field("knowledge_date", pa.timestamp("us"), nullable=False),
    ])
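A schema is only useful if incoming data is checked against it. One way to do that with plain pyarrow is sketched below; `conforms` is an illustrative helper (not OptAIC's validator) and the demo table is made-up data, reusing `price_schema()` from the block above.

```python
from datetime import date, datetime

import pyarrow as pa

def conforms(table: pa.Table, expected: pa.Schema) -> bool:
    """True if the table has exactly the expected column names and types (nullability not checked here)."""
    if table.schema.names != expected.names:
        return False
    return all(
        table.schema.field(name).type.equals(expected.field(name).type)
        for name in expected.names
    )

demo = pa.table({
    "date": pa.array([date(2024, 1, 5)], type=pa.date32()),
    "symbol": ["AAPL"],
    "close": [189.99],
    "knowledge_date": pa.array([datetime(2024, 1, 5, 21, 0)], type=pa.timestamp("us")),
})
assert conforms(demo, price_schema())
```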
Prefect Integration
from uuid import UUID

from prefect import flow, task

@task
async def fetch_data(source: str, date: str) -> dict:
    pass

@task
async def validate_schema(data: dict, schema_ref: str) -> bool:
    pass

@flow
async def daily_refresh(dataset_id: UUID, date: str):
    raw = await fetch_data(...)
    if not await validate_schema(raw, schema_ref):
        raise ValidationError()
    await store_data(raw, dataset_id)
    await emit_activity("dataset.refreshed", ...)
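Tasks that call external sources usually also need a retry and timeout policy, which Prefect 2-style task options support directly on the decorator. A sketch with arbitrary values, applied to the `fetch_data` task above:

```python
from prefect import task

@task(retries=3, retry_delay_seconds=60, timeout_seconds=300)
async def fetch_data(source: str, date: str) -> dict:
    """Retried up to 3 times, one minute apart; a single attempt is cancelled after 5 minutes."""
    ...
```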
See references/prefect-patterns.md.
Lineage DAG at Creation Time
CRITICAL: Lineage DAG is built when Instances are CREATED, NOT at execution time.
from libs.orchestration import LineageResolver

# At DatasetInstance creation:
async def create_dataset_instance(session, actor, payload):
    resolver = LineageResolver()

    # 1. Build lineage DAG from pipeline config
    dag = await resolver.build_dag_for_instance(session, instance.id, actor.tenant_id)

    # 2. Cache upstream IDs for fast execution checks
    if dag.has_dependencies:
        instance.upstream_resource_ids = dag.upstream_ids
        instance.upstream_status = {str(uid): "unknown" for uid in dag.upstream_ids}

    # 3. Create DatasetLineage + Subscription records
    await resolver.create_lineage_and_subscriptions(session, dag)
Pub/Sub Observer Pattern
Downstream datasets are notified when upstreams complete:
from libs.orchestration import LineageObserver, CentrifugoNotifier

async def on_run_completed(session, run):
    observer = LineageObserver()
    upstream_id = run.dataset_instance_id

    # Notify downstreams, get those now fully ready
    ready_ids = await observer.on_upstream_completed(
        session,
        upstream_id=upstream_id,
        run_id=run.resource_id,
    )

    # Publish real-time notifications
    notifier = CentrifugoNotifier()
    for downstream_id in ready_ids:
        await notifier.notify_upstream_ready(downstream_id, upstream_id, True)
Fast Execution Check
Use cached status for execution checks (no lineage query):
from libs.orchestration import LineageResolver

resolver = LineageResolver()
all_ready = await resolver.check_all_upstreams_ready(session, instance_id)
if not all_ready and not force:
    raise UpstreamNotReadyError(...)
UpdateFrequency Configuration
Configure expected update frequency for freshness calculations:
from libs.orchestration import UpdateFrequency

# Daily data, 1 day grace period
frequency = UpdateFrequency(
    frequency="daily",
    grace_period_days=1,
)

# Business days only (skip weekends)
frequency = UpdateFrequency(
    frequency="daily",
    business_days_only=True,
    grace_period_days=1,
)

# Weekly on Monday
frequency = UpdateFrequency(
    frequency="weekly",
    day_of_week=0,  # 0=Monday
)

# Store in Instance config_json
config = {
    "update_frequency": {
        "frequency": "daily",
        "business_days_only": True,
        "grace_period_days": 1,
    }
}
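In practice the daily settings mean a dataset is considered stale once the next expected business day plus the grace period has passed without a refresh. A plain-Python sketch of that rule; `is_stale` is an illustrative helper, not the library's API, and the real calculation lives in UpdateFrequency:

```python
from datetime import date, timedelta

def is_stale(last_update: date, today: date,
             grace_period_days: int = 1, business_days_only: bool = True) -> bool:
    """Illustrative daily-frequency staleness rule."""
    expected = last_update + timedelta(days=1)
    if business_days_only:
        while expected.weekday() >= 5:          # roll Saturday/Sunday forward to Monday
            expected += timedelta(days=1)
    return today > expected + timedelta(days=grace_period_days)

# Friday data with a 1-day grace period is not stale on Monday, but is by Wednesday
assert not is_stale(date(2024, 1, 5), date(2024, 1, 8))
assert is_stale(date(2024, 1, 5), date(2024, 1, 10))
```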
See references/lineage-patterns.md.
Data Quality Checks
Standard checks to implement:
- `no_future_dates` - Prevent lookahead
- `no_duplicates` - Key uniqueness
- `coverage_check` - Required dates/symbols
- `schema_conformance` - Arrow schema match
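As an illustration of what such checks can look like in pandas, here is a sketch of the first two; the function names, the list-of-failure-messages return format, and the demo frame are assumptions, not the project's actual check interface:

```python
import pandas as pd

def no_future_dates(df: pd.DataFrame, knowledge_cutoff: pd.Timestamp) -> list[str]:
    """Fail if any row claims to have been known after the cutoff (lookahead)."""
    bad = df[df["knowledge_date"] > knowledge_cutoff]
    return [] if bad.empty else [f"{len(bad)} rows known after {knowledge_cutoff.date()}"]

def no_duplicates(df: pd.DataFrame, keys: list[str]) -> list[str]:
    """Fail if the key columns are not unique."""
    dupes = int(df.duplicated(subset=keys).sum())
    return [] if dupes == 0 else [f"{dupes} duplicate row(s) on key {keys}"]

df = pd.DataFrame({
    "as_of_date": pd.to_datetime(["2024-01-05", "2024-01-05"]),
    "symbol": ["AAPL", "AAPL"],
    "close": [189.99, 189.99],
    "knowledge_date": pd.to_datetime(["2024-01-05", "2024-01-05"]),
})
failures = no_future_dates(df, pd.Timestamp("2024-01-10")) + no_duplicates(df, ["as_of_date", "symbol"])
# one failure: the second AAPL row duplicates the first on (as_of_date, symbol)
```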
See references/quality-checks.md.
Lazy Import Rule
Heavy deps must be lazy-loaded:
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import pandas as pd
    import pyarrow as pa
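TYPE_CHECKING only covers annotations; at runtime the heavy module should be imported inside the function that needs it, so importing the pipeline module itself stays cheap. A minimal sketch:

```python
def to_arrow(records: list[dict]) -> "pa.Table":
    import pyarrow as pa  # imported only when the function actually runs

    return pa.Table.from_pylist(records)
```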
Reference Files
- PIT Patterns - Point-in-time correctness
- Prefect Patterns - Orchestration integration
- Quality Checks - Data validation
- Lineage Patterns - Dependency and freshness tracking