Data Science for the Modern Analyst: From Data to Insight - Chapter 2

Published 2026-03-04 13:44

# Chapter 2: Data Acquisition & Cleaning

Data acquisition and cleaning form the bedrock of any data‑science endeavor. Whether you’re pulling sales figures from a relational database, ingesting IoT telemetry from a NoSQL store, or streaming click logs in real time, the goal is the same: obtain high‑quality, well‑structured data that can feed downstream analytics and modeling.

## 2.1 Techniques for Extracting Data

| Source | Typical Tools / Libraries | Typical Use‑Case |
|--------|---------------------------|------------------|
| **Relational Databases** | `psycopg2`, `SQLAlchemy`, `pandas.read_sql` | Structured transactional data (e.g., sales, finance) |
| **NoSQL Stores** | `pymongo`, `cassandra-driver`, `boto3` (DynamoDB) | Semi‑structured or schema‑less data (e.g., JSON logs, user profiles) |
| **Streaming Platforms** | `kafka-python`, `confluent_kafka`, `pyspark.sql.streaming` | Real‑time event processing (e.g., fraud alerts, recommendation engines) |

### 2.1.1 Pulling from a Relational DB

```python
import pandas as pd
from sqlalchemy import create_engine

# 1. Create a connection string (PostgreSQL example).
#    Replace the placeholder user, password, host, and database with real values.
engine = create_engine(
    "postgresql+psycopg2://user:password@host:5432/database"
)

# 2. Read a table into a DataFrame
sales = pd.read_sql("SELECT * FROM sales", engine)
print(sales.head())
```

### 2.1.2 Querying a NoSQL Collection

```python
import pandas as pd
from pymongo import MongoClient

client = MongoClient("mongodb://user:password@host:27017/")
db = client["analytics"]

# Retrieve documents where purchase_amount > 100
cursor = db.purchases.find({"purchase_amount": {"$gt": 100}})

# Convert to DataFrame for analysis
purchases = pd.DataFrame(list(cursor))
print(purchases.head())
```

### 2.1.3 Consuming a Kafka Stream

```python
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user_activity",
    bootstrap_servers=["broker1:9092", "broker2:9092"],
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    group_id="analysis-group",
    # Each message value is expected to be UTF-8 encoded JSON
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    event = message.value
    # Process or store the event
    print(event)
    # Break after first message for demo purposes
    break

# Release the network connections once done
consumer.close()
```

## 2.2 Data Cleaning Strategies

After extraction, raw data rarely arrives in a ready‑to‑analyze form. Cleaning involves several steps:

1. **Missing‑Value Handling** – Deletion, imputation, or model‑based approaches.
2. **Outlier Detection & Treatment** – Statistical methods or density‑based clustering.
3. **Data Normalization & Scaling** – Standardization, min‑max, or log transforms.
4. **Data Type Correction & Deduplication** – Ensuring schema consistency.
5. **Encoding Categorical Variables** – One‑hot, ordinal, or target encoding.
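
Before committing to any of the five steps above, it can help to profile the raw table first. The following is a minimal sketch of one way to do that, assuming the data is already loaded into a pandas DataFrame. The function name `profile_for_cleaning` and the toy columns are illustrative assumptions, not part of any library or of the datasets used elsewhere in this chapter.

```python
import pandas as pd


def profile_for_cleaning(df: pd.DataFrame) -> pd.DataFrame:
    """Summarize each column so the cleaning steps can be prioritized.

    Hypothetical helper: returns one row per column with its dtype,
    the fraction of missing values, and the number of distinct values.
    """
    return pd.DataFrame({
        "dtype": df.dtypes.astype(str),
        "missing_frac": df.isna().mean(),
        "n_unique": df.nunique(dropna=True),
    })


# Toy data (made up for illustration): one missing income and one repeated row.
toy = pd.DataFrame({
    "customer_id": [1, 2, 2, 3],
    "income": [52000.0, 48000.0, 48000.0, None],
    "region": ["north", "south", "south", "north"],
})

report = profile_for_cleaning(toy)
n_duplicate_rows = int(toy.duplicated().sum())

print(report)
print("duplicate rows:", n_duplicate_rows)
```

A high `missing_frac` points toward the imputation options, a non‑zero duplicate count toward deduplication, and a text column with few distinct values toward categorical encoding.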
### 2.2.1 Missing‑Value Imputation

| Technique | When to Use | Example |
|-----------|-------------|---------|
| **Deletion** | Low % missing, random pattern | `df.dropna()` |
| **Mean / Median** | Numerical data; mean for roughly symmetric distributions, median for skewed ones | `df['col'] = df['col'].fillna(df['col'].mean())` |
| **K‑NN Imputer** | Non‑linear relationships | `from sklearn.impute import KNNImputer` |
| **MICE / Iterative Imputer** | Complex dependencies | `from sklearn.experimental import enable_iterative_imputer; from sklearn.impute import IterativeImputer` |
| **Predictive Models** | When missingness correlates with other features | `from sklearn.ensemble import RandomForestRegressor` |

```python
# Example: K-NN imputation
from sklearn.impute import KNNImputer

cols = ['age', 'income', 'score']

# Each missing value is filled from the 5 most similar rows
imputer = KNNImputer(n_neighbors=5)
df[cols] = imputer.fit_transform(df[cols])
```

### 2.2.2 Outlier Detection

```python
import numpy as np

# Z-score method: flag values more than 3 standard deviations from the mean
z_scores = np.abs((df['score'] - df['score'].mean()) / df['score'].std())
z_outliers = df[z_scores > 3]

# IQR method: flag values beyond 1.5 * IQR outside the quartiles
Q1 = df['score'].quantile(0.25)
Q3 = df['score'].quantile(0.75)
IQR = Q3 - Q1
iqr_outliers = df[(df['score'] < Q1 - 1.5 * IQR) | (df['score'] > Q3 + 1.5 * IQR)]
```

### 2.2.3 Normalization & Scaling

| Method | Use‑Case | Code |
|--------|----------|------|
| **StandardScaler** | Gaussian‑like data | `from sklearn.preprocessing import StandardScaler` |
| **MinMaxScaler** | Rescale to a fixed range such as [0, 1] | `from sklearn.preprocessing import MinMaxScaler` |
| **Log Transform** | Right‑skewed, non‑negative data | `df['log_income'] = np.log1p(df['income'])` |

```python
from sklearn.preprocessing import StandardScaler

# Rescale each column to zero mean and unit variance
scaler = StandardScaler()
df[['age', 'income']] = scaler.fit_transform(df[['age', 'income']])
```

### 2.2.4 Deduplication & Data Type Correction

```python
import pandas as pd

# Remove duplicate rows; copy() so the assignments below modify an independent DataFrame
clean_df = df.drop_duplicates(subset=['customer_id', 'transaction_id']).copy()

# Ensure correct data types
clean_df['date'] = pd.to_datetime(clean_df['date'])
clean_df['customer_id'] = clean_df['customer_id'].astype(str)
```

### 2.2.5 Encoding Categorical Variables

| Encoding | When | Library / Code |
|----------|------|----------------|
| **One‑Hot** | Nominal categories | `pd.get_dummies(df, columns=['region'])` |
| **Ordinal** | Ordered categories | `df['grade'] = df['grade'].map({'A': 4, 'B': 3, 'C': 2, 'D': 1})` |
| **Target Encoding** | High cardinality; compute the means on training data only to avoid target leakage | `df['cat_enc'] = df.groupby('category')['target'].transform('mean')` |

```python
# One-hot example; drop_first=True drops one level to avoid a redundant column
encoded = pd.get_dummies(df, columns=['gender'], drop_first=True)
```

## 2.3 Building a Reproducible Pipeline

1. **Version Control** – Store all code in a Git repository; tag releases.
2. **Environment Management** – Use `conda` or `pipenv` with `environment.yml`/`Pipfile`.
3. **Data Versioning** – Tools like `DVC` or `Delta Lake` track raw and processed datasets.
4. **Automated Testing** – Unit tests for data validation rules.
5. **CI/CD** – Automate pipeline runs with GitHub Actions or GitLab CI.

```yaml
# Example GitHub Actions workflow for data pipeline
name: Data Pipeline

on:
  push:
    branches: [ main ]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run pipeline
        run: python src/etl/main.py
      - name: Upload artifact
        uses: actions/upload-artifact@v4
        with:
          name: cleaned-data
          path: data/cleaned/*.csv
```

## 2.4 Practical Checklist

| Step | Checkpoint | Tool / Technique |
|------|------------|------------------|
| 1 | Data is reachable | Connection strings, API keys |
| 2 | Schema consistency | SQLAlchemy metadata, MongoDB schema docs |
| 3 | No missing values in critical columns | `df.isnull().sum()` |
| 4 | Outliers flagged | Statistical plots, z‑score thresholds |
| 5 | Normalization applied | StandardScaler, MinMaxScaler |
| 6 | Encoding complete | `pd.get_dummies`, mapping dictionaries |
| 7 | Pipeline versioned | Git tags, DVC tracking |
| 8 | Documentation updated | README, Jupyter notebooks |

---

### Key Takeaways

- **Extract, transform, load (ETL) is a disciplined cycle** that ensures you start downstream processes with clean, consistent data.
- **Missing data is rarely “missing for good”; treat it strategically** using the right imputation method.
- **Outliers can be signals, not noise**; investigate before discarding.
- **Scaling isn’t one‑size‑fits‑all**; match the method to the distribution of each feature.
- **Reproducibility begins at data acquisition**: document connections, how credentials are supplied (never the secrets themselves), and extraction logic.
- **Versioning data is as critical as versioning code**; tools like DVC bridge that gap.

By mastering data acquisition and cleaning, you lay a robust foundation for the exploratory, modeling, and deployment stages that follow. In the next chapter, we’ll turn this pristine dataset into meaningful insights through exploratory data analysis.