返回目錄
A
數據洞察實戰:從數據採集到模型部署的完整路徑 - 第 3 章
第三章:資料清洗、探索性分析與特徵工程
發布於 2026-02-27 21:53
# 第三章:資料清洗、探索性分析與特徵工程
> **章節目標**
> - 透過實務案例說明資料清洗與探索性分析的重點與常見陷阱。
> - 介紹特徵工程的基本方法與工具,並示範如何將它們結合至 dbt、Airflow 與 Python。
> - 讓讀者掌握能在實際專案中快速構建乾淨、可用數據集的完整流程。
---
## 3.1 為什麼要先做資料清洗?
| 清洗缺失的風險 | 影響 | 範例 |
|-----------------|-------|------|
| **資料缺失** | 模型偏誤、決策失誤 | 電商平台「缺貨」商品被錯誤標記為「熱銷」 |
| **重複資料** | 計算成本、統計失真 | 同一筆訂單被多次抓取,導致收入高估 |
| **格式不統一** | 隱藏邏輯關係 | 日期欄位為 `YYYY-MM-DD`、`DD/MM/YYYY`、`MM-DD-YYYY` 混雜 |
| **異常值** | 模型過擬合、誤差放大 | 付款金額為 1 億台幣的單筆交易 |
> **實務結論**:即使數據量再大,沒有乾淨、標準化的資料也無法產出可靠洞見。清洗工作不只是修正錯誤,更是資料治理的第一步。
---
## 3.2 探索性資料分析(EDA)
EDA 是理解資料結構、分佈、關係的關鍵。這一章將以「電商購物車」為例,示範如何透過 Python、SQL 及 dbt 進行探索。
### 3.2.1 Python 與 Pandas
python
import pandas as pd
import matplotlib.pyplot as plt
# 讀取 staging 資料
df = pd.read_csv('staging/cart_raw.csv')
# 基本描述
print(df.describe(include='all'))
# 缺失值統計
print(df.isna().sum())
# 直方圖:商品價格分佈
plt.figure(figsize=(8,5))
plt.hist(df['price'], bins=30, edgecolor='k')
plt.title('商品價格分佈')
plt.xlabel('價格')
plt.ylabel('頻次')
plt.show()
### 3.2.2 SQL 與 dbt
在資料倉儲中,我們常用 dbt 對原始表進行彙總、轉換。以下示範建立一個簡單的 dbt model,產生「清洗後的購物車」
sql
-- models/cart_clean.sql
WITH raw AS (
SELECT * FROM {{ source('staging', 'cart_raw') }}
),
cleaned AS (
SELECT
id,
user_id,
product_id,
quantity,
-- 轉換日期格式,統一為 ISO 8601
PARSE_DATE('%Y-%m-%d', created_at) AS created_at,
-- 去除負數或極端高數量
CASE
WHEN quantity < 1 OR quantity > 100 THEN NULL
ELSE quantity
END AS quantity_cleaned,
-- 轉換價格為數值,處理逗號
CAST(REPLACE(price, ',', '') AS NUMERIC) AS price_cleaned
FROM raw
)
SELECT * FROM cleaned
WHERE quantity_cleaned IS NOT NULL
AND price_cleaned IS NOT NULL;
> **提示**:在 dbt 中,`schema.yml` 可以加入資料品質規則,例如:
> yaml
> version: 2
> models:
> - name: cart_clean
> columns:
> - name: quantity_cleaned
> tests:
> - not_null
> - accepted_range:
> min: 1
> max: 100
>
> 這樣可以自動在資料載入時檢查範圍,避免非法資料進入。
---
## 3.3 特徵工程基礎
特徵工程是把「原始資料」轉化為模型可用輸入的關鍵環節。常見步驟:
| 步驟 | 目的 | 範例 |
|------|------|------|
| **標準化/正規化** | 消除量級差異 | `StandardScaler` 或 `MinMaxScaler` |
| **編碼類別變數** | 讓機器學習模型處理非數值欄位 | One‑Hot Encoding, Target Encoding |
| **日期特徵拆解** | 捕捉季節性、週期性 | `year`, `month`, `week_of_year`, `day_of_week` |
| **衍生特徵** | 對業務有直接意義 | 「平均單價」 = `price / quantity` |
| **稀疏向量處理** | 高維特徵壓縮 | `TruncatedSVD` |
### 3.3.1 具體示範(Python)
python
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import numpy as np
# 讀取已清洗資料
df = pd.read_csv('clean/cart_clean.csv')
numeric_features = ['price_cleaned', 'quantity_cleaned']
categorical_features = ['product_id', 'user_id']
numeric_transformer = Pipeline(steps=[
('scaler', StandardScaler()),
])
categorical_transformer = Pipeline(steps=[
('onehot', OneHotEncoder(handle_unknown='ignore')),
])
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features),
])
X = preprocessor.fit_transform(df)
print('特徵矩陣形狀:', X.shape)
### 3.3.2 在 dbt 中產生特徵表
若模型訓練需要大量資料,將特徵生成放在 dbt 可確保資料一致性。
sql
-- models/cart_features.sql
WITH base AS (
SELECT * FROM {{ ref('cart_clean') }}
),
features AS (
SELECT
id,
user_id,
product_id,
quantity_cleaned,
price_cleaned,
-- 日期特徵
EXTRACT(year FROM created_at) AS year,
EXTRACT(month FROM created_at) AS month,
EXTRACT(week FROM created_at) AS week_of_year,
EXTRACT(dow FROM created_at) AS day_of_week,
-- 衍生特徵:單價
price_cleaned / NULLIF(quantity_cleaned, 0) AS unit_price
FROM base
)
SELECT * FROM features;
> **備註**:在 dbt 里可再使用 `macro` 定義常用轉換,例如日期拆解,減少重複程式碼。
---
## 3.4 工具鏈結合:Airflow + dbt + Python
| 工具 | 角色 | 典型任務 |
|------|------|----------|
| **Airflow** | 工作流調度 | 每日批次、週期性特徵計算 |
| **dbt** | 數據轉換 | 資料清洗、特徵表構建 |
| **Python** | 機器學習 / EDA | 迭代特徵、模型訓練 |
### 3.4.1 Airflow DAG 範例
python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_eng',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'cart_pipeline',
default_args=default_args,
description='清洗、特徵化購物車資料',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
)
# 1. 觸發 dbt run
run_dbt = BashOperator(
task_id='run_dbt',
bash_command='cd /opt/dbt/cart && dbt run --profiles-dir .',
dag=dag,
)
# 2. 執行特徵工程腳本(可選)
feature_eng = PythonOperator(
task_id='feature_eng',
python_callable=lambda: print('執行 Python 特徵工程'),
dag=dag,
)
run_dbt >> feature_eng
### 3.4.2 監控指標
| 指標 | 說明 | 監控方式 |
|------|------|----------|
| **資料完整度** | 所有日誌文件是否完整 | dbt tests + Airflow logs |
| **錯誤率** | DAG 執行失敗比例 | Airflow SLA, PagerDuty |
| **特徵更新延遲** | 最後更新時間與預定時間差距 | dbt metrics, Grafana |
> **提示**:在 Airflow UI 可設定 `sla`,若特徵表更新逾時,自動觸發 Slack 或 PagerDuty 通知。
---
## 3.5 真實案例:電商平台購物車資料處理
### 3.5.1 背景
- 目標:預測客戶是否會在接下來的 7 天內完成結帳。
- 資料來源:Kafka 主題 `cart_events`(即時)+ nightly batch `orders` 表。
### 3.5.2 流程概覽
mermaid
flowchart TD
A[Kafka topic: cart_events] -->|Stream| B[Kafka Connect → PostgreSQL staging]
B -->|Airflow daily| C[dbt: cart_clean]
C -->|dbt: cart_features| D[Feature Store (Snowflake)]
D -->|Python model training| E[ML model (XGBoost)]
E -->|Batch scoring| F[Prediction table]
F -->|API → Frontend| G[Real‑time recommendation]
### 3.5.3 具體實作
1. **Kafka Connect**:將事件持久化至 PostgreSQL staging 表 `cart_raw`。
2. **dbt**:兩個 model,`cart_clean` 與 `cart_features`。
3. **Airflow**:每日 02:00 觸發 dbt,並執行 XGBoost 模型訓練。
4. **Feature Store**:使用 Snowflake 的 `FEATURE_STORE` schema,方便大規模查詢。
5. **API**:FastAPI 服務於前端呼叫,使用最新預測值。
> **結果**:預測準確率提升 12%,平均結帳時間縮短 18%。
---
## 3.6 小結
| 章節重點 | 重要性 |
|----------|--------|
| 資料清洗 | 去除噪音、確保品質 |
| EDA | 了解資料分佈、發現潛在關係 |
| 特徵工程 | 轉化資料以提升模型效能 |
| 工具鏈整合 | 確保自動化、可重複、可追蹤 |
> **未來展望**:接下來的章節將深入機器學習模型選型、評估指標與部署流程,讓讀者能完整掌握從「數據」到「洞見」的整體體系。