聊天視窗

數據科學實務:從數據蒐集到模型部署的完整流程 - 第 3 章

第 3 章:資料工程基礎

發布於 2026-02-22 17:58

# 第 3 章:資料工程基礎 > **目標**:說明資料管道設計理念、ETL/ELT 的差異、資料湖與資料倉儲的定位,並以 Airflow、dbt 與 Snowflake 為例,示範實際部署腳本與最佳實踐。 ## 3.1 資料管道(Data Pipeline)概念 | 項目 | 說明 | |------|------| | **資料來源** | API、爬蟲、資料庫、感測器、Kafka、資料湖入口等 | | **轉換 (Transform)** | 資料清洗、格式化、聚合、編碼、特徵工程 | | **載入 (Load)** | 寫入資料倉儲、資料湖、NoSQL 或搜尋引擎 | | **運行方式** | 批次 (Batch)、流式 (Streaming)、混合 (Hybrid) | | **關鍵挑戰** | 延遲、可擴展性、錯誤回溯、監控與治理 | > **設計原則** > - **可擴展性**:水平擴充處理節點或使用分佈式計算引擎。 > - **容錯性**:任務失敗時能夠重試、回溯或手動介入。 > - **監控性**:可視化監控任務執行、延遲與錯誤。 > - **治理**:資料品質檢查、合規審核、審計日誌。 ## 3.2 ETL vs ELT | | **ETL** | **ELT** | |---|---|---| | **定義** | *Extract → Transform → Load*(提取 → 轉換 → 載入) | *Extract → Load → Transform*(提取 → 載入 → 轉換) | | **優點** | 資料在載入前即已經乾淨,減少倉儲壓力 | 利用倉儲強大算力處理大規模資料,延遲低 | | **缺點** | 轉換流程耗時,需額外資料庫 | 需要更高算力的倉儲,且轉換需在倉儲內進行 | | **適用場景** | 小型專案、傳統資料倉儲 | 大數據、雲端資料湖與資料倉儲混合架構 | > **實務建議**:在 Snowflake、BigQuery 等雲端倉儲環境,往往採用 ELT,結合 dbt 做資料建模與轉換。 ## 3.3 資料湖 vs 資料倉儲 | | **資料湖** | **資料倉儲** | |---|---|---| | **資料類型** | 原始結構、半結構、非結構 | 結構化(表格) | | **儲存方式** | 零成本物件存儲 (S3、Azure Blob) | 儲存引擎 (Snowflake、Redshift) | | **查詢性能** | 需額外編譯或 ETL 轉換 | 優化為查詢性能 | | **使用者** | 數據科學家、資料工程師 | 商業分析師、BI 工具 | | **治理** | 需要專門的資料治理工具 | 原生資料治理功能 | > **混合架構**:將資料湖作為原始資料的存儲層,利用 dbt 將需要分析的資料轉換成資料倉儲表。 ## 3.4 工具實作:Airflow、dbt、Snowflake ### 3.4.1 Airflow 基礎 DAG 範例 python # dags/etl_sales_pipeline.py from datetime import datetime, timedelta from airflow import DAG from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator default_args = { 'owner': 'data_eng', 'depends_on_past': False, 'email_on_failure': True, 'email': ['data-team@example.com'], 'retries': 2, 'retry_delay': timedelta(minutes=5), } with DAG( 'etl_sales_pipeline', default_args=default_args, description='每日銷售資料 ETL', schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False, ) as dag: # 1. Extract: 從 S3 讀取 JSON 檔案 extract = SparkSubmitOperator( task_id='extract_sales', application='s3://my-bucket/scripts/extract_sales.py', name='extract_sales', conn_id='spark_default', ) # 2. Transform: 轉成 Parquet 並寫入 Snowflake staging transform = SparkSubmitOperator( task_id='transform_sales', application='s3://my-bucket/scripts/transform_sales.py', name='transform_sales', conn_id='spark_default', ) # 3. Load: 透過 Snowflake 進行資料表更新 load = SnowflakeOperator( task_id='load_sales', sql='sql/load_sales_into_dw.sql', snowflake_conn_id='snowflake_default', ) extract >> transform >> load > **重點**: > - Airflow 提供排程、重試、失敗通知與任務依賴。 > - 透過 Spark 提供分佈式處理,適合大規模轉換。 > - SnowflakeOperator 直接執行 SQL,減少資料遷移成本。 ### 3.4.2 dbt 轉換模型 yaml # dbt_project.yml name: "sales_dw" version: "1.0" config-version: 2 profile: "snowflake" source-paths: ["models/sources"] analysis-paths: ["analysis"] macro-paths: ["macros"] sql -- models/sales_sales_summary.sql {{ config( materialized='table', tags=['sales'], ) }} WITH source AS ( SELECT * FROM {{ ref('sales_raw') }} ) SELECT date_key, SUM(amount) AS total_sales, SUM(quantity) AS total_qty FROM source GROUP BY date_key; > dbt 讓資料工程師能以 SQL 撰寫轉換模型,並自動產生 DAG、依賴關係與測試。 ### 3.4.3 Snowflake 資料庫連線設定 | 參數 | 說明 | |------|------| | **warehouse** | 雙倍容量倉儲(Snowflake) | | **database** | `DATA_WAREHOUSE` | | **schema** | `PUBLIC` 或 `DW_SALES` | | **role** | `SYSADMIN`、`DATA_ENGINEER` | | **user** | `data_eng_user` | sql -- sql/load_sales_into_dw.sql BEGIN -- 刪除舊資料 DELETE FROM dw_sales WHERE date_key = (SELECT MAX(date_key) FROM staging.sales_raw); -- Insert 新資料 INSERT INTO dw_sales SELECT * FROM staging.sales_raw; END; > **備註**: > - Snowflake 的零成本物件存儲可直接連結 S3,減少資料傳輸成本。 > - dbt 內建測試,可在 `tests/` 資料夾定義 `schema.yml` 進行資料品質檢查。 ## 3.5 部署與監控最佳實踐 | 層面 | 具體做法 | |------|----------| | **CI/CD** | 使用 GitHub Actions 或 Bitbucket Pipelines 推送 DAG、dbt 版本,並自動觸發 Airflow 測試。 | | **測試** | dbt `schema.yml` + Great Expectations 交叉驗證,Airflow 任務失敗即回報 Slack 或 Teams。 | | **版本控制** | 所有腳本、SQL、dbt 模型皆存於 Git;使用 GitFlow 分支管理。 | | **資料治理** | Snowflake 的 `INFORMATION_SCHEMA` 產生審計表,dbt 內建 `docs.generate` 生成資料字典。 | | **監控** | Airflow UI、Snowflake 的 Query History、Datadog 或 Grafana 整合。 | ## 3.6 案例:混合資料湖 + 資料倉儲 資料湖 (S3) ──► Spark (Extract & Transform) ──► Snowflake Staging (Raw) ──► dbt ──► Snowflake DW (Structured) > **說明**: > - S3 提供零成本物件存儲,容納原始 JSON、CSV、圖片等。 > - Spark 在資料湖階層做初步清洗與 Parquet 編碼。 > - Snowflake staging 匯入 Parquet,保留原始資料。 > - dbt 以 SQL 方式在 Snowflake 內進行 ETL,產生分析表。 > - 完整的資料治理流程可在 dbt 測試與 Snowflake 的權限設定中完成。 ## 3.7 小結 - **資料管道** 是資料從產生到使用的完整流程,設計時需兼顧延遲、可擴展性與治理。 - **ETL vs ELT**:選擇取決於資料規模與倉儲算力,雲端環境常採 ELT。 - **資料湖 vs 資料倉儲**:資料湖是「原始」層,資料倉儲是「結構化」層,兩者可結合形成混合架構。 - **Airflow + Spark + Snowflake + dbt**:提供排程、分佈式轉換、強大算力與資料建模,組成高效、可維護、可治理的資料管道。 - **最佳實踐**:CI/CD、版本控制、測試、監控、審計日誌,能確保資料品質與合規性。 > **實作練習**:請將上述 Airflow DAG 與 dbt 模型佈署至您所在的雲端環境,並確認每天 6:00 AM 成功完成。若任務失敗,觀察 Airflow UI、Datadog 指標與 Snowflake Query History,並撰寫報告說明失敗原因與復原步驟。