Data Science for the Modern Analyst: From Data to Insight - Chapter 2
Published 2026-03-04 13:44
# Chapter 2: Data Acquisition & Cleaning
Data acquisition and cleaning form the bedrock of any data‑science endeavor. Whether you’re pulling sales figures from a relational database, ingesting IoT telemetry from a NoSQL store, or streaming click logs in real time, the goal is the same: obtain high‑quality, well‑structured data that can feed downstream analytics and modeling.
## 2.1 Techniques for Extracting Data
| Source | Typical Tools / Libraries | Typical Use‑Case |
|--------|---------------------------|-----------------|
| **Relational Databases** | `psycopg2`, `SQLAlchemy`, `pandas.read_sql` | Structured transactional data (e.g., sales, finance) |
| **NoSQL Stores** | `pymongo`, `cassandra-driver`, `boto3` (S3) | Semi‑structured or schema‑less data (e.g., JSON logs, user profiles) |
| **Streaming Platforms** | `kafka-python`, `confluent_kafka`, `pyspark.sql.streaming` | Real‑time event processing (e.g., fraud alerts, recommendation engines) |
### 2.1.1 Pulling from a Relational DB
```python
import pandas as pd
from sqlalchemy import create_engine

# 1. Create a connection string (PostgreSQL example)
engine = create_engine(
    "postgresql+psycopg2://user:password@host:5432/database"
)

# 2. Read a table into a DataFrame
sales = pd.read_sql("SELECT * FROM sales", engine)
print(sales.head())
```
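For tables too large to fit in memory, the same call can read in batches via the `chunksize` parameter of `pandas.read_sql`. A minimal runnable sketch, using an in-memory SQLite database as a stand-in for the PostgreSQL connection above (the `sales` table and its columns are illustrative):

```python
import pandas as pd
from sqlalchemy import create_engine

# In-memory SQLite stands in for a real database so the sketch runs
# self-contained; swap the URL for your production connection string.
engine = create_engine("sqlite:///:memory:")
pd.DataFrame({"order_id": range(100), "amount": [9.99] * 100}).to_sql(
    "sales", engine, index=False
)

# With chunksize set, read_sql yields DataFrames of at most 30 rows each,
# so only one batch is held in memory at a time.
total_rows = 0
for chunk in pd.read_sql("SELECT * FROM sales", engine, chunksize=30):
    total_rows += len(chunk)
print(total_rows)  # 100
```

This pattern lets you aggregate or write each batch downstream without ever materializing the full table.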
### 2.1.2 Querying a NoSQL Collection
```python
import pandas as pd
from pymongo import MongoClient

client = MongoClient("mongodb://user:password@host:27017/")
db = client["analytics"]

# Retrieve documents where purchase_amount > 100
cursor = db.purchases.find({"purchase_amount": {"$gt": 100}})

# Convert to a DataFrame for analysis
purchases = pd.DataFrame(list(cursor))
print(purchases.head())
```
### 2.1.3 Consuming a Kafka Stream
```python
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user_activity",
    bootstrap_servers=["broker1:9092", "broker2:9092"],
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    group_id="analysis-group",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    event = message.value
    # Process or store the event
    print(event)
    # Break after the first message for demo purposes
    break
```
## 2.2 Data Cleaning Strategies
After extraction, raw data rarely arrives in a ready‑to‑analyze form. Cleaning involves several steps:
1. **Missing‑Value Handling** – Deletion, imputation, or model‑based approaches.
2. **Outlier Detection & Treatment** – Statistical methods or density‑based clustering.
3. **Data Normalization & Scaling** – Standardization, min‑max, or log transforms.
4. **Data Type Correction & Deduplication** – Ensuring schema consistency.
5. **Encoding Categorical Variables** – One‑hot, ordinal, or target encoding.
### 2.2.1 Missing‑Value Imputation
| Technique | When to Use | Example |
|-----------|-------------|---------|
| **Deletion** | Low % missing, random pattern | `df.dropna()` |
| **Mean / Median** | Numerical, symmetric distribution | `df['col'] = df['col'].fillna(df['col'].mean())` |
| **K‑NN Imputer** | Non‑linear relationships | `from sklearn.impute import KNNImputer` |
| **MICE / Iterative Imputer** | Complex dependencies | `from sklearn.experimental import enable_iterative_imputer; from sklearn.impute import IterativeImputer` |
| **Predictive Models** | When missingness correlates with other features | `from sklearn.ensemble import RandomForestRegressor` |
```python
# Example: K‑NN imputation
from sklearn.impute import KNNImputer

imputer = KNNImputer(n_neighbors=5)
X_imputed = imputer.fit_transform(df[['age', 'income', 'score']])
df[['age', 'income', 'score']] = X_imputed
```
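The MICE / Iterative Imputer row from the table works the same way: each feature with missing values is modeled as a function of the others, iterating until the estimates stabilize. A minimal sketch on a toy DataFrame (the `age`, `income`, and `score` columns and their values are illustrative):

```python
import numpy as np
import pandas as pd
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import IterativeImputer

df = pd.DataFrame({
    "age":    [25, 32, np.nan, 41, 29],
    "income": [48000, np.nan, 61000, 75000, 52000],
    "score":  [0.7, 0.4, 0.9, np.nan, 0.6],
})

# Round-robin regression over the features until convergence (or max_iter)
imputer = IterativeImputer(max_iter=10, random_state=0)
df[["age", "income", "score"]] = imputer.fit_transform(df[["age", "income", "score"]])
print(df.isnull().sum().sum())  # 0 (no missing values remain)
```

Note the experimental-enable import: scikit-learn still gates `IterativeImputer` behind it.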
### 2.2.2 Outlier Detection
```python
import numpy as np

# Z‑score method
z_scores = np.abs((df['score'] - df['score'].mean()) / df['score'].std())
outliers = df[z_scores > 3]

# IQR method
Q1 = df['score'].quantile(0.25)
Q3 = df['score'].quantile(0.75)
IQR = Q3 - Q1
outliers = df[(df['score'] < Q1 - 1.5 * IQR) | (df['score'] > Q3 + 1.5 * IQR)]
```
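Detection is only half the story; treatment matters too. One common option is capping (winsorizing) values to the IQR fences rather than dropping rows, which keeps the sample size intact. A hedged sketch on a toy column (the `score` values are illustrative):

```python
import pandas as pd

df = pd.DataFrame({"score": [10, 12, 11, 13, 12, 95]})  # 95 is an outlier

Q1, Q3 = df["score"].quantile([0.25, 0.75])
IQR = Q3 - Q1
lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR

# clip() caps values outside the fences instead of discarding the rows
df["score_capped"] = df["score"].clip(lower=lower, upper=upper)
```

Whether to cap, drop, or keep depends on whether the extreme values are errors or genuine signal, as the Key Takeaways below caution.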
### 2.2.3 Normalization & Scaling
| Method | Use‑Case | Code |
|--------|----------|------|
| **StandardScaler** | Gaussian‑like data | `from sklearn.preprocessing import StandardScaler` |
| **MinMaxScaler** | Preserve bounds [0,1] | `from sklearn.preprocessing import MinMaxScaler` |
| **Log Transform** | Right‑skewed | `df['log_income'] = np.log1p(df['income'])` |
```python
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
df[['age', 'income']] = scaler.fit_transform(df[['age', 'income']])
```
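The other two rows of the table can be combined: a log transform first tames a right-skewed feature, then min-max scaling maps it onto [0, 1]. A minimal sketch with illustrative income values:

```python
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

df = pd.DataFrame({"income": [30000, 45000, 52000, 61000, 250000]})

# log1p compresses the long right tail (and handles zeros safely)
df["log_income"] = np.log1p(df["income"])

# MinMaxScaler then rescales the transformed values to the [0, 1] range
scaler = MinMaxScaler()
df["income_scaled"] = scaler.fit_transform(df[["log_income"]])
```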
### 2.2.4 Deduplication & Data Type Correction
```python
# Remove duplicate rows
clean_df = df.drop_duplicates(subset=['customer_id', 'transaction_id'])

# Ensure correct data types
clean_df['date'] = pd.to_datetime(clean_df['date'])
clean_df['customer_id'] = clean_df['customer_id'].astype(str)
```
### 2.2.5 Encoding Categorical Variables
| Encoding | When | Library / Code |
|----------|------|----------------|
| **One‑Hot** | Nominal categories | `pd.get_dummies(df, columns=['region'])` |
| **Ordinal** | Ordered categories | `df['grade'] = df['grade'].map({'A': 4, 'B': 3, 'C': 2, 'D': 1})` |
| **Target Encoding** | High cardinality | `df['cat_enc'] = df.groupby('category')['target'].transform('mean')` |
```python
# One‑hot example
encoded = pd.get_dummies(df, columns=['gender'], drop_first=True)
```
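Target encoding, the third row of the table, deserves a caveat: the naive groupby-transform leaks each row's own target into its encoding. A common mitigation is out-of-fold encoding, where each row is encoded with means computed on the other folds. A sketch with illustrative `category` and `target` columns:

```python
import pandas as pd
from sklearn.model_selection import KFold

df = pd.DataFrame({
    "category": ["a", "a", "b", "b", "a", "b"],
    "target":   [1, 0, 1, 1, 0, 1],
})

# Naive version (as in the table): leaks each row's own target
df["cat_enc_naive"] = df.groupby("category")["target"].transform("mean")

# Out-of-fold version: encode each fold with means from the other folds,
# falling back to the global mean for categories unseen in a fold
df["cat_enc"] = 0.0
for train_idx, val_idx in KFold(n_splits=3, shuffle=True, random_state=0).split(df):
    means = df.iloc[train_idx].groupby("category")["target"].mean()
    df.loc[df.index[val_idx], "cat_enc"] = (
        df.iloc[val_idx]["category"].map(means).fillna(df["target"].mean())
    )
```

For high-cardinality features the out-of-fold variant usually generalizes noticeably better than the naive one.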
## 2.3 Building a Reproducible Pipeline
1. **Version Control** – Store all code in a Git repository; tag releases.
2. **Environment Management** – Use `conda` or `pipenv` with `environment.yml`/`Pipfile`.
3. **Data Versioning** – Tools like `DVC` or `Delta Lake` track raw and processed datasets.
4. **Automated Testing** – Unit tests for data validation rules.
5. **CI/CD** – Automate pipeline runs with GitHub Actions or GitLab CI.
```yaml
# Example GitHub Actions workflow for the data pipeline
name: Data Pipeline

on:
  push:
    branches: [ main ]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run pipeline
        run: python src/etl/main.py
      - name: Upload artifact
        uses: actions/upload-artifact@v3
        with:
          name: cleaned-data
          path: data/cleaned/*.csv
```
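The "Automated Testing" step in the list above can be sketched as a small validation module with pytest-style tests. The column names and rules here are illustrative, not taken from a specific dataset in this book:

```python
import pandas as pd

def validate(df: pd.DataFrame) -> list[str]:
    """Return a list of human-readable rule violations (empty = valid)."""
    errors = []
    if df["customer_id"].isnull().any():
        errors.append("customer_id contains nulls")
    if df.duplicated(subset=["customer_id", "transaction_id"]).any():
        errors.append("duplicate transactions found")
    if (df["purchase_amount"] < 0).any():
        errors.append("negative purchase_amount")
    return errors

def test_clean_data_passes_rules():
    df = pd.DataFrame({
        "customer_id": ["c1", "c2"],
        "transaction_id": [1, 2],
        "purchase_amount": [19.99, 5.00],
    })
    assert validate(df) == []
```

Wiring such tests into the CI workflow (e.g., a `pytest` step before "Run pipeline") turns the validation rules into a gate that bad data cannot silently pass.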
## 2.4 Practical Checklist
| Step | Checkpoint | Tool / Technique |
|------|------------|------------------|
| 1 | Data is reachable | Connection strings, API keys |
| 2 | Schema consistency | SQLAlchemy metadata, MongoDB schema docs |
| 3 | No missing critical columns | `df.isnull().sum()` |
| 4 | Outliers flagged | Statistical plots, z‑score thresholds |
| 5 | Normalization applied | StandardScaler, MinMaxScaler |
| 6 | Encoding complete | `pd.get_dummies`, mapping dictionaries |
| 7 | Pipeline versioned | Git tags, DVC tracking |
| 8 | Documentation updated | README, Jupyter notebooks |
---
### Key Takeaways
- **Extract, transform, load (ETL) is a disciplined cycle** that ensures you start downstream processes with clean, consistent data.
- **Missing data is rarely missing completely at random; treat it strategically** by choosing the imputation method that matches the missingness pattern.
- **Outliers can be signals, not noise**; investigate before discarding.
- **Scaling isn’t one‑size‑fits‑all**; match the method to the distribution of each feature.
- **Reproducibility begins at data acquisition**—document connections, credentials, and extraction logic.
- **Versioning data is as critical as versioning code**; tools like DVC bridge that gap.
By mastering data acquisition and cleaning, you lay a robust foundation for the exploratory, modeling, and deployment stages that follow. In the next chapter, we’ll turn this pristine dataset into meaningful insights through exploratory data analysis.