聊天視窗

數據洞察實戰:從數據採集到模型部署的完整路徑 - 第 3 章

第三章:資料清洗、探索性分析與特徵工程

發布於 2026-02-27 21:53

# 第三章:資料清洗、探索性分析與特徵工程 > **章節目標** > - 透過實務案例說明資料清洗與探索性分析的重點與常見陷阱。 > - 介紹特徵工程的基本方法與工具,並示範如何將它們結合至 dbt、Airflow 與 Python。 > - 讓讀者掌握能在實際專案中快速構建乾淨、可用數據集的完整流程。 --- ## 3.1 為什麼要先做資料清洗? | 清洗缺失的風險 | 影響 | 範例 | |-----------------|-------|------| | **資料缺失** | 模型偏誤、決策失誤 | 電商平台「缺貨」商品被錯誤標記為「熱銷」 | | **重複資料** | 計算成本、統計失真 | 同一筆訂單被多次抓取,導致收入高估 | | **格式不統一** | 隱藏邏輯關係 | 日期欄位為 `YYYY-MM-DD`、`DD/MM/YYYY`、`MM-DD-YYYY` 混雜 | | **異常值** | 模型過擬合、誤差放大 | 付款金額為 1 億台幣的單筆交易 | > **實務結論**:即使數據量再大,沒有乾淨、標準化的資料也無法產出可靠洞見。清洗工作不只是修正錯誤,更是資料治理的第一步。 --- ## 3.2 探索性資料分析(EDA) EDA 是理解資料結構、分佈、關係的關鍵。這一章將以「電商購物車」為例,示範如何透過 Python、SQL 及 dbt 進行探索。 ### 3.2.1 Python 與 Pandas python import pandas as pd import matplotlib.pyplot as plt # 讀取 staging 資料 df = pd.read_csv('staging/cart_raw.csv') # 基本描述 print(df.describe(include='all')) # 缺失值統計 print(df.isna().sum()) # 直方圖:商品價格分佈 plt.figure(figsize=(8,5)) plt.hist(df['price'], bins=30, edgecolor='k') plt.title('商品價格分佈') plt.xlabel('價格') plt.ylabel('頻次') plt.show() ### 3.2.2 SQL 與 dbt 在資料倉儲中,我們常用 dbt 對原始表進行彙總、轉換。以下示範建立一個簡單的 dbt model,產生「清洗後的購物車」 sql -- models/cart_clean.sql WITH raw AS ( SELECT * FROM {{ source('staging', 'cart_raw') }} ), cleaned AS ( SELECT id, user_id, product_id, quantity, -- 轉換日期格式,統一為 ISO 8601 PARSE_DATE('%Y-%m-%d', created_at) AS created_at, -- 去除負數或極端高數量 CASE WHEN quantity < 1 OR quantity > 100 THEN NULL ELSE quantity END AS quantity_cleaned, -- 轉換價格為數值,處理逗號 CAST(REPLACE(price, ',', '') AS NUMERIC) AS price_cleaned FROM raw ) SELECT * FROM cleaned WHERE quantity_cleaned IS NOT NULL AND price_cleaned IS NOT NULL; > **提示**:在 dbt 中,`schema.yml` 可以加入資料品質規則,例如: > yaml > version: 2 > models: > - name: cart_clean > columns: > - name: quantity_cleaned > tests: > - not_null > - accepted_range: > min: 1 > max: 100 > > 這樣可以自動在資料載入時檢查範圍,避免非法資料進入。 --- ## 3.3 特徵工程基礎 特徵工程是把「原始資料」轉化為模型可用輸入的關鍵環節。常見步驟: | 步驟 | 目的 | 範例 | |------|------|------| | **標準化/正規化** | 消除量級差異 | `StandardScaler` 或 `MinMaxScaler` | | **編碼類別變數** | 讓機器學習模型處理非數值欄位 | One‑Hot Encoding, Target Encoding | | **日期特徵拆解** | 捕捉季節性、週期性 | `year`, `month`, `week_of_year`, `day_of_week` | | **衍生特徵** | 對業務有直接意義 | 「平均單價」 = `price / quantity` | | **稀疏向量處理** | 高維特徵壓縮 | `TruncatedSVD` | ### 3.3.1 具體示範(Python) python from sklearn.preprocessing import StandardScaler, OneHotEncoder from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline import numpy as np # 讀取已清洗資料 df = pd.read_csv('clean/cart_clean.csv') numeric_features = ['price_cleaned', 'quantity_cleaned'] categorical_features = ['product_id', 'user_id'] numeric_transformer = Pipeline(steps=[ ('scaler', StandardScaler()), ]) categorical_transformer = Pipeline(steps=[ ('onehot', OneHotEncoder(handle_unknown='ignore')), ]) preprocessor = ColumnTransformer( transformers=[ ('num', numeric_transformer, numeric_features), ('cat', categorical_transformer, categorical_features), ]) X = preprocessor.fit_transform(df) print('特徵矩陣形狀:', X.shape) ### 3.3.2 在 dbt 中產生特徵表 若模型訓練需要大量資料,將特徵生成放在 dbt 可確保資料一致性。 sql -- models/cart_features.sql WITH base AS ( SELECT * FROM {{ ref('cart_clean') }} ), features AS ( SELECT id, user_id, product_id, quantity_cleaned, price_cleaned, -- 日期特徵 EXTRACT(year FROM created_at) AS year, EXTRACT(month FROM created_at) AS month, EXTRACT(week FROM created_at) AS week_of_year, EXTRACT(dow FROM created_at) AS day_of_week, -- 衍生特徵:單價 price_cleaned / NULLIF(quantity_cleaned, 0) AS unit_price FROM base ) SELECT * FROM features; > **備註**:在 dbt 里可再使用 `macro` 定義常用轉換,例如日期拆解,減少重複程式碼。 --- ## 3.4 工具鏈結合:Airflow + dbt + Python | 工具 | 角色 | 典型任務 | |------|------|----------| | **Airflow** | 工作流調度 | 每日批次、週期性特徵計算 | | **dbt** | 數據轉換 | 資料清洗、特徵表構建 | | **Python** | 機器學習 / EDA | 迭代特徵、模型訓練 | ### 3.4.1 Airflow DAG 範例 python from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime, timedelta default_args = { 'owner': 'data_eng', 'depends_on_past': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'cart_pipeline', default_args=default_args, description='清洗、特徵化購物車資料', schedule_interval='@daily', start_date=datetime(2024, 1, 1), catchup=False, ) # 1. 觸發 dbt run run_dbt = BashOperator( task_id='run_dbt', bash_command='cd /opt/dbt/cart && dbt run --profiles-dir .', dag=dag, ) # 2. 執行特徵工程腳本(可選) feature_eng = PythonOperator( task_id='feature_eng', python_callable=lambda: print('執行 Python 特徵工程'), dag=dag, ) run_dbt >> feature_eng ### 3.4.2 監控指標 | 指標 | 說明 | 監控方式 | |------|------|----------| | **資料完整度** | 所有日誌文件是否完整 | dbt tests + Airflow logs | | **錯誤率** | DAG 執行失敗比例 | Airflow SLA, PagerDuty | | **特徵更新延遲** | 最後更新時間與預定時間差距 | dbt metrics, Grafana | > **提示**:在 Airflow UI 可設定 `sla`,若特徵表更新逾時,自動觸發 Slack 或 PagerDuty 通知。 --- ## 3.5 真實案例:電商平台購物車資料處理 ### 3.5.1 背景 - 目標:預測客戶是否會在接下來的 7 天內完成結帳。 - 資料來源:Kafka 主題 `cart_events`(即時)+ nightly batch `orders` 表。 ### 3.5.2 流程概覽 mermaid flowchart TD A[Kafka topic: cart_events] -->|Stream| B[Kafka Connect → PostgreSQL staging] B -->|Airflow daily| C[dbt: cart_clean] C -->|dbt: cart_features| D[Feature Store (Snowflake)] D -->|Python model training| E[ML model (XGBoost)] E -->|Batch scoring| F[Prediction table] F -->|API → Frontend| G[Real‑time recommendation] ### 3.5.3 具體實作 1. **Kafka Connect**:將事件持久化至 PostgreSQL staging 表 `cart_raw`。 2. **dbt**:兩個 model,`cart_clean` 與 `cart_features`。 3. **Airflow**:每日 02:00 觸發 dbt,並執行 XGBoost 模型訓練。 4. **Feature Store**:使用 Snowflake 的 `FEATURE_STORE` schema,方便大規模查詢。 5. **API**:FastAPI 服務於前端呼叫,使用最新預測值。 > **結果**:預測準確率提升 12%,平均結帳時間縮短 18%。 --- ## 3.6 小結 | 章節重點 | 重要性 | |----------|--------| | 資料清洗 | 去除噪音、確保品質 | | EDA | 了解資料分佈、發現潛在關係 | | 特徵工程 | 轉化資料以提升模型效能 | | 工具鏈整合 | 確保自動化、可重複、可追蹤 | > **未來展望**:接下來的章節將深入機器學習模型選型、評估指標與部署流程,讓讀者能完整掌握從「數據」到「洞見」的整體體系。