Data Science Unlocked: A Practical Guide for Modern Analysts - Chapter 3
Published 2026-02-23 15:56
# Chapter 3
## Building Robust Ingestion Pipelines
In the previous chapter we laid the groundwork for data collection and storage. This chapter dives into the practical mechanics of moving data from heterogeneous sources into a clean, query‑ready lake. The focus is threefold:
1. **Error handling** – catching the inevitable runtime failures.
2. **Deduplication** – eliminating noisy repeats that corrupt downstream analytics.
3. **Data‑drift simulation & detection** – learning how to spot anomalous behaviour before it breaks models.
The style of this chapter is intentionally open and inquisitive, yet rigorous: we present theory, but we prioritize hands-on code, reproducible experiments, and clear design principles.
---
### 3.1 Data Acquisition 101
Whether you pull from REST APIs, message queues, or flat files, the first step is always the same: **serialize** and **persist**.
```python
import requests
import pandas as pd

def fetch_api(url: str, params: dict) -> pd.DataFrame:
    """Fetch a JSON payload, convert it to a DataFrame, and return it."""
    resp = requests.get(url, params=params)
    resp.raise_for_status()  # raises on 4xx/5xx responses
    return pd.json_normalize(resp.json())
```
For batch ingestion, consider using `awscli` or `google-cloud-sdk` wrappers to stream CSVs directly to object storage, then trigger a downstream Spark job.
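The staging step of that batch pattern can be sketched locally. The function below is a stand-in for the real thing: it writes CSV part files to a local directory instead of streaming Parquet to object storage (the function name and part-file naming scheme are illustrative, not from any SDK), producing chunks a downstream Spark job could pick up:

```python
import tempfile
from pathlib import Path

import pandas as pd

def ingest_csv_in_chunks(csv_path: str, out_dir: str, chunksize: int = 500) -> list[str]:
    """Read a large CSV in fixed-size chunks and persist each chunk as a part file."""
    parts = []
    for i, chunk in enumerate(pd.read_csv(csv_path, chunksize=chunksize)):
        part_path = Path(out_dir) / f"part-{i:05d}.csv"
        chunk.to_csv(part_path, index=False)
        parts.append(str(part_path))
    return parts

# Demo with synthetic data in a temporary directory
tmp = Path(tempfile.mkdtemp())
src = tmp / "events.csv"
pd.DataFrame({'event_id': range(1200)}).to_csv(src, index=False)
parts = ingest_csv_in_chunks(str(src), str(tmp), chunksize=500)
print(len(parts))  # → 3 (500 + 500 + 200 rows)
```

Reading with `chunksize` keeps memory bounded regardless of input size, which is the property that matters when the source file is larger than RAM.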
---
### 3.2 Adding Robust Error Handling
A production ingestion routine cannot afford to die silently. The following skeleton demonstrates a **retry‑backoff** strategy and an explicit **logging** path.
```python
import time
import logging
from functools import wraps

import pandas as pd

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

def retry(max_attempts: int = 5, backoff_factor: float = 2.0):
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            attempt = 0
            delay = 1.0
            while attempt < max_attempts:
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:
                    attempt += 1
                    logger.warning(
                        f"Attempt {attempt}/{max_attempts} failed: {exc}. "
                        f"Retrying in {delay}s"
                    )
                    time.sleep(delay)
                    delay *= backoff_factor
            logger.error("All retry attempts exhausted.")
            raise RuntimeError("Ingestion failed after retries")
        return wrapper
    return decorator

@retry(max_attempts=3)
def ingest_to_s3(df: pd.DataFrame, bucket: str, key: str):
    """Persist a DataFrame to S3 as Parquet."""
    df.to_parquet(f"s3://{bucket}/{key}", index=False)
```
*Key takeaways*: always surface exceptions to the orchestrator (Airflow, Prefect, etc.) and never swallow them.
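To see the retry-backoff pattern in action, here is a compact, self-contained version of the same loop applied to a deliberately flaky function (the delays are shortened and `flaky_fetch` is invented purely for illustration):

```python
import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def retry(max_attempts: int = 3, base_delay: float = 0.01):
    """Minimal retry decorator with exponential backoff."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:
                    logger.warning("Attempt %d/%d failed: %s", attempt, max_attempts, exc)
                    time.sleep(base_delay * 2 ** (attempt - 1))
            raise RuntimeError("Ingestion failed after retries")
        return wrapper
    return decorator

calls = {'n': 0}

@retry(max_attempts=3)
def flaky_fetch() -> str:
    """Fails twice, then succeeds -- simulating a transient network error."""
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError("temporary outage")
    return "payload"

print(flaky_fetch())  # succeeds on the third attempt
```

Two warnings are logged before the third attempt returns, which is exactly the trace an orchestrator's task log would show for a transient outage.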
---
### 3.3 Basic Deduplication Step
Duplicate rows can silently inflate sales figures, skew model targets, or trigger downstream failures. A minimal deduplication routine looks like this:
```python
def deduplicate(df: pd.DataFrame, key_cols: list[str]) -> pd.DataFrame:
    """Remove duplicate rows based on key columns.

    Keeps the most recent record if a timestamp column is present.
    """
    if 'timestamp' in df.columns:
        df = df.sort_values('timestamp', ascending=False)
    return df.drop_duplicates(subset=key_cols, keep='first')
```
> **Rule of thumb** – always materialize the deduped dataset in a separate staging table before loading into the canonical store.
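A quick worked example shows the keep-most-recent behaviour. The orders table below is hypothetical (column names invented for illustration); the routine is repeated so the snippet runs on its own:

```python
import pandas as pd

def deduplicate(df: pd.DataFrame, key_cols: list[str]) -> pd.DataFrame:
    """Keep the newest row per key when a timestamp column exists."""
    if 'timestamp' in df.columns:
        df = df.sort_values('timestamp', ascending=False)
    return df.drop_duplicates(subset=key_cols, keep='first')

orders = pd.DataFrame({
    'order_id': [1, 1, 2],
    'amount': [100, 120, 50],  # order 1 was re-sent with a corrected amount
    'timestamp': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-01']),
})

deduped = deduplicate(orders, key_cols=['order_id'])
print(deduped[['order_id', 'amount']].sort_values('order_id'))
# order 1 keeps the corrected 2024-01-02 amount (120); order 2 is untouched
```

Sorting newest-first before `drop_duplicates(keep='first')` is what guarantees the surviving row is the latest one per key.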
---
### 3.4 Data‑Drift Simulation & Detection
To prepare for real‑world drift, we create a synthetic anomaly: a single record with a wildly high price.
```python
import numpy as np
import pandas as pd

# Original data: 1,000 daily prices drawn from N(100, 10)
prices = np.random.normal(loc=100, scale=10, size=1000)
records = pd.DataFrame({
    'price': prices,
    'timestamp': pd.date_range('2024-01-01', periods=1000),
})

# Inject drift: a single record with an implausibly high price
drift_record = pd.DataFrame({
    'price': [9999],
    'timestamp': [pd.Timestamp('2024-01-01 00:00:00')],
})
records = pd.concat([records, drift_record], ignore_index=True)
```
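A fixed price threshold catches this particular record, but a z-score check generalizes better because it adapts to the data's own scale. A minimal sketch (the 3-sigma cutoff is a common convention, not a tuned value, and `flag_outliers` is our own helper):

```python
import numpy as np
import pandas as pd

def flag_outliers(df: pd.DataFrame, col: str = 'price', z_thresh: float = 3.0) -> pd.DataFrame:
    """Return rows whose value lies more than z_thresh standard deviations from the mean."""
    z = (df[col] - df[col].mean()) / df[col].std()
    return df[z.abs() > z_thresh]

# Rebuild the synthetic data with the drift record appended
rng = np.random.default_rng(0)
records = pd.DataFrame({'price': rng.normal(100, 10, size=1000)})
records = pd.concat([records, pd.DataFrame({'price': [9999]})], ignore_index=True)

outliers = flag_outliers(records)
print(len(outliers))  # → 1 (only the injected record)
```

Note the limitation: a single extreme value also inflates the mean and standard deviation, so for heavily contaminated data a robust variant (median and MAD instead of mean and std) is the safer choice.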
#### Airflow Drift‑Alert DAG
```python
from datetime import datetime, timedelta

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'ds_team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='drift_detection',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    def load_and_check(**context):
        df = pd.read_parquet('s3://data-lake/prices/2024-01-01.parquet')
        high_price = df['price'].max()
        if high_price > 500:
            raise ValueError(f"Data drift detected: max price {high_price}")
        return high_price

    detect_task = PythonOperator(
        task_id='detect_drift',
        python_callable=load_and_check,
    )
```
The DAG raises an exception when the maximum price exceeds the threshold; Airflow marks the task as failed, and a failure callback or alerting integration can then notify Slack or PagerDuty.
---
### 3.5 Orchestration Best Practices
| Practice | Why it matters | Implementation hint |
|---|---|---|
| **Idempotent tasks** | Avoid duplicate ingestion on retries | Use `checksum` or `record_hash` checks |
| **Schema validation** | Prevent silent schema drift | Employ Great Expectations or custom validators |
| **Resource isolation** | Prevent noisy neighbors | Use Kubernetes pods or Docker containers per DAG |
| **Monitoring & alerting** | Early detection of bottlenecks | CloudWatch metrics, Prometheus alerts |
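
The idempotent-tasks row above can be made concrete with a `record_hash` check: hash each row's content and skip hashes already loaded, so a retried task cannot double-write. A minimal in-memory sketch (a production version would persist the seen-hashes set in the target store rather than in memory):

```python
import hashlib

import pandas as pd

def record_hash(row: pd.Series) -> str:
    """Stable content hash of a record, used as an idempotency key."""
    payload = '|'.join(f"{k}={row[k]}" for k in sorted(row.index))
    return hashlib.sha256(payload.encode()).hexdigest()

def load_idempotently(df: pd.DataFrame, seen_hashes: set[str]) -> pd.DataFrame:
    """Return only rows not yet loaded, recording their hashes as loaded."""
    hashes = df.apply(record_hash, axis=1)
    mask = ~hashes.isin(seen_hashes)
    seen_hashes.update(hashes[mask])
    return df[mask]

seen: set[str] = set()
batch = pd.DataFrame({'id': [1, 2], 'amount': [10, 20]})
first = load_idempotently(batch, seen)   # both rows are new
replay = load_idempotently(batch, seen)  # retried batch loads nothing
print(len(first), len(replay))  # → 2 0
```

Sorting the column names before hashing keeps the key stable even if upstream column order changes between runs.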
---
### 3.6 Checklist Before You Deploy
- [ ] All ETL functions wrapped in `@retry` or equivalent.
- [ ] Deduplication logic validated against a reference dataset.
- [ ] Drift detection thresholds calibrated on historical data.
- [ ] Airflow DAGs scheduled with `catchup=False` to avoid backlog.
- [ ] Logs streamed to a centralized log store (ELK, CloudWatch).
- [ ] Alerts integrated with incident response tools.
With these building blocks in place, the ingestion layer becomes a dependable backbone for the rest of the data science stack.