返回目錄
A
數據科學實務:從數據蒐集到模型部署的完整流程 - 第 3 章
第 3 章:資料工程基礎
發布於 2026-02-22 17:58
# 第 3 章:資料工程基礎
> **目標**:說明資料管道設計理念、ETL/ELT 的差異、資料湖與資料倉儲的定位,並以 Airflow、dbt 與 Snowflake 為例,示範實際部署腳本與最佳實踐。
## 3.1 資料管道(Data Pipeline)概念
| 項目 | 說明 |
|------|------|
| **資料來源** | API、爬蟲、資料庫、感測器、Kafka、資料湖入口等 |
| **轉換 (Transform)** | 資料清洗、格式化、聚合、編碼、特徵工程 |
| **載入 (Load)** | 寫入資料倉儲、資料湖、NoSQL 或搜尋引擎 |
| **運行方式** | 批次 (Batch)、流式 (Streaming)、混合 (Hybrid) |
| **關鍵挑戰** | 延遲、可擴展性、錯誤回溯、監控與治理 |
> **設計原則**
> - **可擴展性**:水平擴充處理節點或使用分佈式計算引擎。
> - **容錯性**:任務失敗時能夠重試、回溯或手動介入。
> - **監控性**:可視化監控任務執行、延遲與錯誤。
> - **治理**:資料品質檢查、合規審核、審計日誌。
## 3.2 ETL vs ELT
| | **ETL** | **ELT** |
|---|---|---|
| **定義** | *Extract → Transform → Load*(提取 → 轉換 → 載入) | *Extract → Load → Transform*(提取 → 載入 → 轉換) |
| **優點** | 資料在載入前即已經乾淨,減少倉儲壓力 | 利用倉儲強大算力處理大規模資料,延遲低 |
| **缺點** | 轉換流程耗時,需額外資料庫 | 需要更高算力的倉儲,且轉換需在倉儲內進行 |
| **適用場景** | 小型專案、傳統資料倉儲 | 大數據、雲端資料湖與資料倉儲混合架構 |
> **實務建議**:在 Snowflake、BigQuery 等雲端倉儲環境,往往採用 ELT,結合 dbt 做資料建模與轉換。
## 3.3 資料湖 vs 資料倉儲
| | **資料湖** | **資料倉儲** |
|---|---|---|
| **資料類型** | 原始結構、半結構、非結構 | 結構化(表格) |
| **儲存方式** | 零成本物件存儲 (S3、Azure Blob) | 儲存引擎 (Snowflake、Redshift) |
| **查詢性能** | 需額外編譯或 ETL 轉換 | 優化為查詢性能 |
| **使用者** | 數據科學家、資料工程師 | 商業分析師、BI 工具 |
| **治理** | 需要專門的資料治理工具 | 原生資料治理功能 |
> **混合架構**:將資料湖作為原始資料的存儲層,利用 dbt 將需要分析的資料轉換成資料倉儲表。
## 3.4 工具實作:Airflow、dbt、Snowflake
### 3.4.1 Airflow 基礎 DAG 範例
python
# dags/etl_sales_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
default_args = {
'owner': 'data_eng',
'depends_on_past': False,
'email_on_failure': True,
'email': ['data-team@example.com'],
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'etl_sales_pipeline',
default_args=default_args,
description='每日銷售資料 ETL',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
# 1. Extract: 從 S3 讀取 JSON 檔案
extract = SparkSubmitOperator(
task_id='extract_sales',
application='s3://my-bucket/scripts/extract_sales.py',
name='extract_sales',
conn_id='spark_default',
)
# 2. Transform: 轉成 Parquet 並寫入 Snowflake staging
transform = SparkSubmitOperator(
task_id='transform_sales',
application='s3://my-bucket/scripts/transform_sales.py',
name='transform_sales',
conn_id='spark_default',
)
# 3. Load: 透過 Snowflake 進行資料表更新
load = SnowflakeOperator(
task_id='load_sales',
sql='sql/load_sales_into_dw.sql',
snowflake_conn_id='snowflake_default',
)
extract >> transform >> load
> **重點**:
> - Airflow 提供排程、重試、失敗通知與任務依賴。
> - 透過 Spark 提供分佈式處理,適合大規模轉換。
> - SnowflakeOperator 直接執行 SQL,減少資料遷移成本。
### 3.4.2 dbt 轉換模型
yaml
# dbt_project.yml
name: "sales_dw"
version: "1.0"
config-version: 2
profile: "snowflake"
source-paths: ["models/sources"]
analysis-paths: ["analysis"]
macro-paths: ["macros"]
sql
-- models/sales_sales_summary.sql
{{ config(
materialized='table',
tags=['sales'],
) }}
WITH source AS (
SELECT * FROM {{ ref('sales_raw') }}
)
SELECT
date_key,
SUM(amount) AS total_sales,
SUM(quantity) AS total_qty
FROM source
GROUP BY date_key;
> dbt 讓資料工程師能以 SQL 撰寫轉換模型,並自動產生 DAG、依賴關係與測試。
### 3.4.3 Snowflake 資料庫連線設定
| 參數 | 說明 |
|------|------|
| **warehouse** | 雙倍容量倉儲(Snowflake) |
| **database** | `DATA_WAREHOUSE` |
| **schema** | `PUBLIC` 或 `DW_SALES` |
| **role** | `SYSADMIN`、`DATA_ENGINEER` |
| **user** | `data_eng_user` |
sql
-- sql/load_sales_into_dw.sql
BEGIN
-- 刪除舊資料
DELETE FROM dw_sales
WHERE date_key = (SELECT MAX(date_key) FROM staging.sales_raw);
-- Insert 新資料
INSERT INTO dw_sales
SELECT * FROM staging.sales_raw;
END;
> **備註**:
> - Snowflake 的零成本物件存儲可直接連結 S3,減少資料傳輸成本。
> - dbt 內建測試,可在 `tests/` 資料夾定義 `schema.yml` 進行資料品質檢查。
## 3.5 部署與監控最佳實踐
| 層面 | 具體做法 |
|------|----------|
| **CI/CD** | 使用 GitHub Actions 或 Bitbucket Pipelines 推送 DAG、dbt 版本,並自動觸發 Airflow 測試。 |
| **測試** | dbt `schema.yml` + Great Expectations 交叉驗證,Airflow 任務失敗即回報 Slack 或 Teams。 |
| **版本控制** | 所有腳本、SQL、dbt 模型皆存於 Git;使用 GitFlow 分支管理。 |
| **資料治理** | Snowflake 的 `INFORMATION_SCHEMA` 產生審計表,dbt 內建 `docs.generate` 生成資料字典。 |
| **監控** | Airflow UI、Snowflake 的 Query History、Datadog 或 Grafana 整合。 |
## 3.6 案例:混合資料湖 + 資料倉儲
資料湖 (S3) ──► Spark (Extract & Transform) ──► Snowflake Staging (Raw) ──► dbt ──► Snowflake DW (Structured)
> **說明**:
> - S3 提供零成本物件存儲,容納原始 JSON、CSV、圖片等。
> - Spark 在資料湖階層做初步清洗與 Parquet 編碼。
> - Snowflake staging 匯入 Parquet,保留原始資料。
> - dbt 以 SQL 方式在 Snowflake 內進行 ETL,產生分析表。
> - 完整的資料治理流程可在 dbt 測試與 Snowflake 的權限設定中完成。
## 3.7 小結
- **資料管道** 是資料從產生到使用的完整流程,設計時需兼顧延遲、可擴展性與治理。
- **ETL vs ELT**:選擇取決於資料規模與倉儲算力,雲端環境常採 ELT。
- **資料湖 vs 資料倉儲**:資料湖是「原始」層,資料倉儲是「結構化」層,兩者可結合形成混合架構。
- **Airflow + Spark + Snowflake + dbt**:提供排程、分佈式轉換、強大算力與資料建模,組成高效、可維護、可治理的資料管道。
- **最佳實踐**:CI/CD、版本控制、測試、監控、審計日誌,能確保資料品質與合規性。
> **實作練習**:請將上述 Airflow DAG 與 dbt 模型佈署至您所在的雲端環境,並確認每天 6:00 AM 成功完成。若任務失敗,觀察 Airflow UI、Datadog 指標與 Snowflake Query History,並撰寫報告說明失敗原因與復原步驟。