
Data Science Unlocked: A Practical Guide for Modern Analysts - Chapter 3

Chapter 3: Building Robust Ingestion Pipelines

Published 2026-02-23 15:56

# Chapter 3

## Building Robust Ingestion Pipelines

In the previous chapter we laid the groundwork for data collection and storage. This chapter dives into the practical mechanics of moving data from heterogeneous sources into a clean, query-ready lake. The focus is threefold:

1. **Error handling**: catching the inevitable runtime failures.
2. **Deduplication**: eliminating noisy repeats that corrupt downstream analytics.
3. **Data-drift simulation & detection**: learning how to spot anomalous behaviour before it breaks models.

The style of this chapter is intentionally open and inquisitive, yet rigorous. We present theory, but we prioritize hands-on code, reproducible experiments, and clear design principles.

---

### 3.1 Data Acquisition 101

Whether you pull from REST APIs, message queues, or flat files, the first step is always the same: **serialize** and **persist**.

```python
import requests
import pandas as pd

def fetch_api(url: str, params: dict) -> pd.DataFrame:
    """Fetch JSON payload, convert to DataFrame, and return."""
    # A timeout keeps a stalled endpoint from hanging the pipeline.
    resp = requests.get(url, params=params, timeout=30)
    resp.raise_for_status()  # raises requests.HTTPError on 4xx/5xx responses
    return pd.json_normalize(resp.json())
```

For batch ingestion, consider using `awscli` or `google-cloud-sdk` wrappers to stream CSVs directly to object storage, then trigger a downstream Spark job.

---

### 3.2 Adding Robust Error Handling

A production ingestion routine cannot afford to die silently. The following skeleton demonstrates a **retry-backoff** strategy and an explicit **logging** path.

```python
import time
import logging
from functools import wraps

import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def retry(max_attempts: int = 5, backoff_factor: float = 2.0):
    """Retry the wrapped function with exponential backoff."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            delay = 1
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:
                    if attempt == max_attempts:
                        logger.error("All retry attempts exhausted.")
                        # Chain the original exception so the root cause survives.
                        raise RuntimeError("Ingestion failed after retries") from exc
                    logger.warning(
                        f"Attempt {attempt}/{max_attempts} failed: {exc}. "
                        f"Retrying in {delay}s"
                    )
                    time.sleep(delay)
                    delay *= backoff_factor
        return wrapper
    return decorator

@retry(max_attempts=3)
def ingest_to_s3(df: pd.DataFrame, bucket: str, key: str):
    """Persist DataFrame to S3 as Parquet."""
    # Writing to an s3:// path requires the s3fs package and a Parquet engine.
    df.to_parquet(f"s3://{bucket}/{key}", index=False)
```

*Key takeaway*: always surface exceptions to the orchestrator (Airflow, Prefect, etc.) and never swallow them.

---

### 3.3 Basic Deduplication Step

Duplicate rows can silently inflate sales, skew model targets, or trigger downstream failures. A minimal deduplication routine looks like this:

```python
import pandas as pd

def deduplicate(df: pd.DataFrame, key_cols: list[str]) -> pd.DataFrame:
    """Remove duplicate rows based on key columns.

    Keeps the most recent record if a timestamp column is present.
    """
    if 'timestamp' in df.columns:
        # Newest rows first, so keep='first' retains the latest record per key.
        df = df.sort_values('timestamp', ascending=False)
    return df.drop_duplicates(subset=key_cols, keep='first')
```

> **Rule of thumb:** always materialize the deduped dataset in a separate staging table before loading into the canonical store.

---

### 3.4 Data-Drift Simulation & Detection

To prepare for real-world drift, we create a synthetic anomaly: a single record with a wildly high price.
```python
import numpy as np
import pandas as pd

# Original data (seeded so the experiment is reproducible)
rng = np.random.default_rng(42)
prices = rng.normal(loc=100, scale=10, size=1000)
records = pd.DataFrame({
    'price': prices,
    'timestamp': pd.date_range('2024-01-01', periods=1000),
})

# Inject drift
drift_record = pd.DataFrame({
    'price': [9999],
    'timestamp': [pd.Timestamp('2024-01-01 00:00:00')],
})
records = pd.concat([records, drift_record], ignore_index=True)
```

#### Airflow Drift-Alert DAG

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd

default_args = {
    'owner': 'ds_team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='drift_detection',
    default_args=default_args,
    schedule_interval='@daily',  # named `schedule` from Airflow 2.4 onward
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    def load_and_check(**context):
        df = pd.read_parquet('s3://data-lake/prices/2024-01-01.parquet')
        high_price = float(df['price'].max())
        if high_price > 500:
            raise ValueError(f"Data drift detected: max price {high_price}")
        return high_price

    # Airflow 2 passes the task context automatically, so the legacy
    # provide_context=True argument is not needed.
    detect_task = PythonOperator(
        task_id='detect_drift',
        python_callable=load_and_check,
    )
```

The task raises an exception if the max price exceeds the threshold (500 here). Airflow retries it once, per `default_args`, and then marks it as failed. The DAG as written does not configure a notifier, so routing the failure to Slack or PagerDuty requires wiring one up, for example through an `on_failure_callback`.

---

### 3.5 Orchestration Best Practices

| Practice | Why it matters | Implementation hint |
|---|---|---|
| **Idempotent tasks** | Avoid duplicate ingestion on retries | Use `checksum` or `record_hash` checks |
| **Schema validation** | Prevent silent schema drift | Employ Great Expectations or custom validators |
| **Resource isolation** | Prevent noisy neighbors | Use Kubernetes pods or Docker containers per DAG |
| **Monitoring & alerting** | Early detection of bottlenecks | CloudWatch metrics, Prometheus alerts |

---

### 3.6 Checklist Before You Deploy

- [ ] All ETL functions wrapped in `@retry` or equivalent.
- [ ] Deduplication logic validated against a reference dataset.
- [ ] Drift detection thresholds calibrated on historical data.
- [ ] Airflow DAGs scheduled with `catchup=False` to avoid backlog.
- [ ] Logs streamed to a centralized log store (ELK, CloudWatch).
- [ ] Alerts integrated with incident response tools.

With these building blocks in place, the ingestion layer becomes a dependable backbone for the rest of the data science stack.
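
The checklist item on calibrating drift thresholds can be approached in several ways. The sketch below is one possibility, not the chapter's prescribed method. It assumes historical prices are roughly bell-shaped and replaces the hard-coded limit of 500 from Section 3.4 with a bound derived from the median and the median absolute deviation (MAD). The helper name `calibrate_threshold` and the multiplier `k` are hypothetical choices made for illustration.

```python
import numpy as np


def calibrate_threshold(history, k: float = 6.0) -> float:
    """Return an upper alert bound: median + k * scaled MAD.

    Hypothetical helper; `k` is an illustrative value, not a recommendation.
    """
    history = np.asarray(history, dtype=float)
    median = np.median(history)
    # 1.4826 rescales the MAD so that it estimates the standard deviation
    # when the data are normally distributed.
    mad = 1.4826 * np.median(np.abs(history - median))
    return float(median + k * mad)


# Synthetic "historical" prices, mirroring the distribution used in Section 3.4.
rng = np.random.default_rng(0)
history = rng.normal(loc=100, scale=10, size=1000)
threshold = calibrate_threshold(history)

# A new batch that contains the injected anomaly from Section 3.4.
new_batch = np.append(rng.normal(loc=100, scale=10, size=50), 9999.0)
flagged = new_batch[new_batch > threshold]
print(f"threshold={threshold:.1f}, flagged={flagged.tolist()}")
```

The median and MAD are used here instead of the mean and standard deviation because a handful of extreme values already present in the history would inflate the latter and loosen the bound. Whether a multiplier of 6 suits a given dataset is something to check against real historical data.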