Data Science Unlocked: A Practical Guide for Modern Analysts - Chapter 2
Published 2026-02-23 15:44
# Chapter 2: Data Collection & Engineering
In the data science lifecycle, the **quality and accessibility of data** dictate how quickly you can turn insights into action. This chapter walks through the modern toolkit for collecting, ingesting, and engineering data so that analysts and engineers can focus on analysis and modeling rather than plumbing.
---
## 2.1 The Data Collection Landscape
| Source | Typical Use‑Case | Frequency | Example Tech Stack |
|--------|-----------------|-----------|-------------------|
| **CSV/Excel** | Small business reports, legacy data | One‑off or periodic | pandas, openpyxl |
| **Relational DBs** | OLTP/OLAP data | Continuous | JDBC, SQLAlchemy |
| **NoSQL** | Log files, event streams | Near‑real‑time | MongoDB, Redis |
| **APIs** | External services (social media, weather) | Hourly/Daily | requests, httpx |
| **Web Scraping** | Competitive pricing, real‑time listings | Batch or continuous | BeautifulSoup, Scrapy |
| **Streaming** | IoT sensors, financial tickers | Millisecond‑level | Kafka, Flink |
> **Key Takeaway:** Align your ingestion strategy with *source velocity* and *data fidelity* needs.
## 2.2 Ingestion Techniques
### 2.2.1 Batch Ingestion
Batch pipelines pull snapshots of data at scheduled intervals. Use case: nightly loads of sales data.
**Python‑based ETL example**:
```python
import pandas as pd
import sqlalchemy

# 1. Extract
url = "https://example.com/sales_2023.csv"
raw = pd.read_csv(url)

# 2. Transform
raw['order_date'] = pd.to_datetime(raw['order_date'])
raw['total'] = raw['quantity'] * raw['unit_price']

# 3. Load
engine = sqlalchemy.create_engine("postgresql://user:pw@host/db")
raw.to_sql("sales", engine, if_exists="append", index=False)
```
### 2.2.2 Incremental / Change Data Capture (CDC)
Instead of re-processing the entire dataset, load only new or changed rows. Most databases expose a *watermark* column (such as an updated-at timestamp) or change logs for this purpose.
```sql
-- PostgreSQL example using a timestamp watermark
INSERT INTO sales (order_id, order_date, total)
SELECT order_id, order_date, total
FROM source_sales
WHERE order_date > (SELECT MAX(order_date) FROM sales);
```
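The same watermark pattern works on the Python side of a pipeline. A minimal sketch of the idea (the `incremental_rows` helper and the row shape are illustrative, not part of any library): filter source rows against the latest value already loaded.

```python
from datetime import date

def incremental_rows(source_rows, watermark):
    """Return only rows strictly newer than the current watermark (hypothetical helper)."""
    return [r for r in source_rows if r["order_date"] > watermark]

source = [
    {"order_id": 1, "order_date": date(2023, 1, 1), "total": 10.0},
    {"order_id": 2, "order_date": date(2023, 1, 5), "total": 25.0},
]

# Watermark = latest order_date already present in the target table
new_rows = incremental_rows(source, date(2023, 1, 2))
```

In a real pipeline, the watermark would be read from the target table (as in the SQL above) and persisted between runs.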
### 2.2.3 Streaming / Near‑Real‑time
When latency matters, consume events from a message broker instead of polling the source.
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    "sensor_events",
    bootstrap_servers=["kafka-broker:9092"],
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for msg in consumer:
    event = msg.value
    # Process each event here (validate, enrich, write downstream)
```
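What "process each event" means depends on the workload; one common pattern is micro-batching: buffer events in memory and flush an aggregate every N messages. A broker-free sketch of that aggregation step (the event shape here is hypothetical):

```python
from collections import defaultdict

def aggregate_batch(events):
    """Sum sensor readings per sensor_id for one micro-batch (hypothetical event shape)."""
    totals = defaultdict(float)
    for e in events:
        totals[e["sensor_id"]] += e["value"]
    return dict(totals)

batch = [
    {"sensor_id": "a", "value": 1.5},
    {"sensor_id": "b", "value": 2.0},
    {"sensor_id": "a", "value": 0.5},
]
summary = aggregate_batch(batch)  # -> {"a": 2.0, "b": 2.0}
```

In production, a framework like Flink or Spark Structured Streaming would handle this windowing with fault tolerance built in.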
## 2.3 Web Scraping & APIs
### 2.3.1 API Consumption
APIs are usually the most stable external source. Use proper authentication (OAuth or bearer tokens), rate-limit handling, and retry logic.
```python
import httpx
from tenacity import retry, wait_exponential, stop_after_attempt

TOKEN = "YOUR_BEARER_TOKEN"  # in production, load this from a secrets manager

@retry(wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(5))
def fetch_twitter_user(user_id: str):
    url = f"https://api.twitter.com/2/users/{user_id}"
    headers = {"Authorization": f"Bearer {TOKEN}"}
    resp = httpx.get(url, headers=headers)
    resp.raise_for_status()
    return resp.json()
```
### 2.3.2 Web Scraping
Scraping should be the last resort. Always respect robots.txt and terms of service.
```python
import requests
from bs4 import BeautifulSoup
import pandas as pd

page = requests.get("https://example.com/products")
soup = BeautifulSoup(page.text, "html.parser")

rows = []
for product in soup.select(".product-card"):
    name = product.select_one(".name").text.strip()
    price = float(product.select_one(".price").text.strip().replace("$", ""))
    rows.append({"name": name, "price": price})

df = pd.DataFrame(rows)
```
## 2.4 Building Robust Pipelines
| Tool | Strength | Use‑Case |
|------|----------|----------|
| **Apache Airflow** | DAG orchestration, scheduling | Complex batch workflows |
| **dbt** | SQL‑based transformations, version control | Data warehouse modeling |
| **Prefect** | Declarative tasks, cloud‑native | Hybrid pipelines |
| **Dagster** | Type safety, composable pipelines | Enterprise data ops |
### 2.4.1 Airflow DAG Example
```python
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import pandas as pd
import sqlalchemy

def extract(**kwargs):
    url = kwargs['params']['csv_url']
    return pd.read_csv(url).to_dict(orient='records')

def transform(**kwargs):
    df = pd.DataFrame(kwargs['ti'].xcom_pull(task_ids='extract'))
    df['order_date'] = pd.to_datetime(df['order_date'])
    df['total'] = df['quantity'] * df['unit_price']
    return df.to_dict(orient='records')

def load(**kwargs):
    df = pd.DataFrame(kwargs['ti'].xcom_pull(task_ids='transform'))
    engine = sqlalchemy.create_engine("postgresql://user:pw@host/db")
    df.to_sql("sales", engine, if_exists="append", index=False)

def create_dag():
    default_args = {
        'owner': 'data-eng',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    dag = DAG(
        'sales_etl',
        default_args=default_args,
        schedule_interval='@daily',
        start_date=days_ago(1),
        catchup=False,
    )
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract,
        params={'csv_url': 'https://example.com/sales.csv'},
        dag=dag,
    )
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
        dag=dag,
    )
    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
        dag=dag,
    )
    extract_task >> transform_task >> load_task
    return dag

dag = create_dag()
```
> **Best Practice:** Keep your DAG logic *purely* Python‑based and delegate heavy lifting to external services (Spark, Snowflake, etc.).
### 2.4.2 dbt Model Example
**`models/staging/sales.sql`**
```sql
-- staging/sales.sql
SELECT
    id AS order_id,
    CAST(order_date AS DATE) AS order_date,
    quantity,
    unit_price
FROM {{ source('raw', 'sales_csv') }}
WHERE order_date IS NOT NULL
```
**`models/marts/sales_fct.sql`**
```sql
WITH base AS (
    SELECT * FROM {{ ref('sales') }}
)
SELECT
    order_id,
    order_date,
    quantity,
    unit_price,
    quantity * unit_price AS total
FROM base
```

Note that dbt models omit trailing semicolons, since dbt wraps each model in its own DDL statement.
dbt infers the dependency DAG from `ref()` and `source()` calls, runs models in order, and stores lineage metadata.
## 2.5 Data Storage & Governance
1. **Choose the right warehouse** – Columnar storage (Snowflake, BigQuery, Redshift) is optimal for analytical workloads.
2. **Partitioning & Clustering** – Improves query performance; e.g., partition by `order_date`.
3. **Versioning & Backups** – Use incremental models and snapshots.
4. **Metadata & Lineage** – Capture source, transformation, and destination details for auditability.
5. **Security** – Encrypt data at rest, enforce least‑privilege access.
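The partitioning idea in point 2 can be sketched locally with pandas: write one file per partition key, mirroring how a warehouse lays out data so that a query for a single day touches only one partition. Paths and column names here are illustrative; a real warehouse would use its own partition DDL.

```python
import pathlib
import tempfile

import pandas as pd

df = pd.DataFrame({
    "order_id": [1, 2, 3],
    "order_date": ["2023-01-01", "2023-01-01", "2023-01-02"],
    "total": [10.0, 20.0, 30.0],
})

out = pathlib.Path(tempfile.mkdtemp())

# One file per distinct order_date, analogous to warehouse partitioning
for day, part in df.groupby("order_date"):
    part.to_csv(out / f"order_date={day}.csv", index=False)

# Reading a single day's data now skips every other partition (pruning)
files = sorted(p.name for p in out.iterdir())
```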
## 2.6 Common Pitfalls & Mitigation
| Pitfall | Symptom | Fix |
|----------|---------|-----|
| **Unreliable scrapers** | Data stops appearing when site changes | Use APIs where possible; wrap scraping in retry logic; monitor failures |
| **Data drift** | Model accuracy degrades | Set up drift detection; retrain on fresh data |
| **No logging** | Hard to debug pipeline failures | Integrate with Airflow logs or external log aggregator (ELK) |
| **Duplicate rows** | Inflated metrics | Use idempotent loads or deduplication logic |
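The deduplication fix in the last row can be sketched with pandas (column names are illustrative): keep only the most recent copy of each business key before loading. Inside a warehouse, an upsert or `MERGE` statement achieves the same idempotency.

```python
import pandas as pd

loads = pd.DataFrame({
    "order_id": [1, 2, 2, 3],
    "total": [10.0, 20.0, 20.0, 30.0],
    "loaded_at": pd.to_datetime(
        ["2023-01-01", "2023-01-01", "2023-01-02", "2023-01-02"]
    ),
})

# Keep the latest record per order_id so repeated loads don't inflate metrics
deduped = (
    loads.sort_values("loaded_at")
         .drop_duplicates(subset="order_id", keep="last")
)
```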
## 2.7 Case Study: Building a Real‑Time Retail Dashboard
1. **Sources** – POS system (MySQL), e‑commerce API (REST), weather data (OpenWeather API).
2. **Ingestion** – Airflow pulls nightly sales; Prefect streams POS updates; API wrapper fetches weather.
3. **Transformation** – dbt standardizes schemas; feature engineering adds `seasonal_adjustment`.
4. **Storage** – Data lands in Snowflake, partitioned by `sale_date`.
5. **Serving** – Looker (or Power BI) connects to Snowflake for real‑time dashboards.
6. **Monitoring** – CloudWatch alerts on Airflow task failures; Grafana dashboards for throughput.
> **Outcome:** A 10‑minute‑lag dashboard that updates sales, inventory, and external factors, enabling store managers to adjust staffing in near real‑time.
## 2.8 Practical Exercises
1. **ETL Mini‑Project** – Create an Airflow DAG that pulls the latest 100 rows from a public REST API (e.g., GitHub events) and loads them into a PostgreSQL table.
2. **dbt Sprint** – Model a simple sales fact from two raw CSV files using dbt. Deploy the DAG to a local Airflow instance.
3. **Scraping Challenge** – Scrape product titles and prices from an e‑commerce category page and store them in a local SQLite database. Add error handling and a basic deduplication step.
4. **Data Drift Simulation** – Simulate drift by inserting a row with an unusually high price. Build a drift detection alert in Airflow.
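As a starting point for exercise 4, a simple z-score check against historical prices flags outliers; the threshold and values below are illustrative, not a production drift detector.

```python
import statistics

def is_drifted(history, new_value, z_threshold=3.0):
    """Flag new_value if it lies more than z_threshold standard deviations from the historical mean."""
    mean = statistics.mean(history)
    stdev = statistics.stdev(history)
    return abs(new_value - mean) / stdev > z_threshold

prices = [9.5, 10.0, 10.5, 9.8, 10.2]
alert_normal = is_drifted(prices, 10.1)  # within the usual range
alert_spike = is_drifted(prices, 99.0)   # unusually high price
```

In Airflow, this check would run as its own task, raising an exception (and thus an alert) when the flag is set.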
---
**Summary** – Effective data ingestion and pipeline engineering are the backbone of any data‑driven organization. Mastering extraction techniques, orchestration tools, and storage best practices ensures high‑quality, reliable datasets that feed downstream analytics and machine learning models.