返回目錄
A
數據洞察實戰:從數據採集到模型部署的完整路徑 - 第 2 章
第二章:數據採集與管道建構
發布於 2026-02-27 21:35
# 第二章:數據採集與管道建構
本章將帶領讀者從「為什麼需要收集數據」切入,說明如何設計與實作一條高效、可擴充且可靠的數據管道。內容涵蓋資料抓取、API、批次與實時流、ETL 工具與管道設計,並配以實務案例與可執行的程式碼範例。
---
## 1. 為何需要嚴謹的數據採集
| 需求 | 採集方式 | 工具 | 成果 |
|---|---|---|---|
| 需求分析 | 瀏覽紀錄 | Logstash | 事件流 |
| 交易資料 | API 取回 | `requests` | 交易表 |
| 用戶行為 | 事件觸發 | Kafka | 事件訊息 |
| 大規模資料 | 批次同步 | Airflow | 資料湖 |
> **要點**:
> * 資料來源多樣:結構化資料(資料庫、CRM)、非結構化資料(聊天記錄、圖片)以及半結構化資料(JSON、XML)。
> * 資料流向不僅是「存儲」,還包括「清洗、轉換、聚合」等加工流程。
## 2. 資料抓取(Data Harvesting)
### 2.1 API 抓取
多數 SaaS 供應商提供 RESTful 或 GraphQL API,通常帶有 **OAuth2** 認證。以下為使用 Python `requests` 取得 Zendesk 支援票務的範例:
```python
import requests, json, os
API_ENDPOINT = "https://{subdomain}.zendesk.com/api/v2/tickets.json"
API_TOKEN = os.getenv("ZENDESK_TOKEN")
headers = {
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json"
}
response = requests.get(API_ENDPOINT, headers=headers)
response.raise_for_status()
tickets = response.json()["tickets"]
print(f"取得 {len(tickets)} 筆票務資料")
```
> **備註**:
> * 使用 `requests.Session()` 以重用連線,降低網路開銷。
> * 若資料量較大,請搭配分頁機制(`page`、`per_page`)或使用 `Zendesk SDK`。
### 2.2 網頁抓取(Web Scraping)
對於缺乏 API 的公共資料,可利用 **BeautifulSoup** 與 **Scrapy** 進行抓取:
```python
from bs4 import BeautifulSoup
import requests
url = "https://example.com/products"
resp = requests.get(url)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
products = []
for card in soup.select(".product-card"):
name = card.select_one("h2.title").text.strip()
price = card.select_one("span.price").text.strip()
products.append({"name": name, "price": price})
print(products[:3])
```
> **注意**:抓取前務必確認網站的 robots.txt 與使用條款,避免違法。
## 3. 批次與實時流(Batch vs Streaming)
| 特色 | 批次 | 實時流 |
|---|---|---|
| 延遲 | 分鐘-小時 | 毫秒-秒 |
| 資料量 | 大 | 小-中 |
| 資料一致性 | 最終一致 | 事件序列 |
| 工具 | Airflow, dbt, Luigi | Kafka, Flink, Spark Structured Streaming |
### 3.1 批次管道(Airflow 範例)
Airflow 允許以 DAG 定義任務,以下為簡易的數據抽取‑轉換‑加載(ETL)流程:
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
"owner": "data_eng",
"depends_on_past": False,
"start_date": datetime(2023, 1, 1),
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
with DAG("daily_etl", default_args=default_args, schedule_interval="@daily") as dag:
extract = BashOperator(
task_id="extract_data",
bash_command="python extract.py"
)
transform = BashOperator(
task_id="transform_data",
bash_command="python transform.py"
)
load = BashOperator(
task_id="load_to_snowflake",
bash_command="python load.py"
)
extract >> transform >> load
```
> **建議**:
> * 使用 **S3** 或 **GCS** 作為暫存層,減少對源系統的頻繁請求。
> * 在 Airflow DAG 中加入 **XCom** 以傳遞中間結果。
### 3.2 實時流管道(Kafka + Spark Structured Streaming 範例)
Kafka 作為分散式訊息代理,Spark Structured Streaming 能即時處理流資料:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
spark = SparkSession.builder.appName("KafkaStreamingDemo").getOrCreate()
schema = "user_id INT, event_type STRING, ts TIMESTAMP"
streaming_df = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka-broker:9092")
.option("subscribe", "user-events")
.load()
)
parsed_df = (
streaming_df.selectExpr("CAST(value AS STRING)")
.select(from_json("value", schema).alias("data"))
.select("data.*")
)
query = (
parsed_df.writeStream
.outputMode("append")
.format("console")
.option("truncate", False)
.start()
)
query.awaitTermination()
```
> **關鍵**:
> * **Schema** 需要與 Kafka 訊息一致,避免資料類型錯誤。
> * 監控 `offset lag` 與 `processing delay`,確保流資料不會滯後。
## 4. ETL 工具與自動化
| 工具 | 特色 | 典型工作流程 |
|---|---|---|
| **Airflow** | DAG、可視化 | 週期性批次工作 |
| **dbt** | SQL 轉換、測試 | 資料湖/資料倉儲轉換 |
| **Luigi** | Python 模組化 | 依賴管理 |
| **Fivetran** | 連接器即插即用 | 低維護資料整合 |
### 4.1 dbt 與資料湖
dbt(data build tool)允許使用 SQL 來編寫轉換腳本,並自動管理依賴。以 **Amazon Redshift** 為例:
```sql
-- models/stg_customers.sql
SELECT
id,
name,
email,
created_at::timestamp AS signup_ts
FROM raw.customers
WHERE email IS NOT NULL;
```
**dbt 的執行流程**:
1. `dbt run` 觸發所有模型執行。
2. `dbt test` 驗證資料品質。
3. `dbt docs generate` 產生資料線圖。
> **優點**:
> * 結合版本控制,所有轉換腳本皆為 Git 可追蹤。
> * 透過 `dbt docs` 可視化資料血緣,方便新人上手。
## 5. 管道設計原則
| 原則 | 內涵 | 實踐方式 |
|---|---|---|
| **可維護性** | 易讀、易修改 | 單一責任、模組化、注釋豐富 |
| **可擴充性** | 隨資料量擴大 | 分區存儲、分散式處理 |
| **可靠性** | 容錯、重試 | 重試機制、監控告警 |
| **安全性** | 資料保密 | 加密存儲、IAM 控制 |
| **可觀察性** | 追蹤、日誌 | 日誌、指標、追蹤元件 |
### 5.1 實際案例:電商平台
> **背景**:每日 10 萬筆訂單,需即時更新庫存並生成報表。
>
> **解決方案**:
> 1. **資料來源**:
> * 交易系統通過 Kafka 推送 `order_created` 事件。
> * 庫存系統通過 REST API 提供查詢。
> 2. **資料流**:
> * **Kafka**:接收訂單事件。
> * **Spark Structured Streaming**:計算每日銷售總額、熱門商品。
> * **Airflow**:每 6 小時更新資料湖(S3)中的 `orders` 與 `inventory` 表。
> 3. **ETL**:
> * 使用 **dbt** 將 `orders_raw` 轉為 `orders_clean`,進行資料品質檢查。
> * `inventory` 表使用 **Fivetran** 直接同步至 Snowflake。
> 4. **觀察**:
> * **Prometheus** 收集 Spark 指標、Airflow 任務失敗率。
> * **Grafana** 可視化實時訂單量與庫存缺貨率。
>
> **結果**:
> * 數據延遲 < 2 秒,確保報表即時。
> * 每個管道均有單元測試,確保資料完整。
>
> **教訓**:在設計時須考慮「最小可行管道」與「逐步擴充」的平衡。
## 6. 監控與維運
| 監控指標 | 工具 | 作用 |
|---|---|---|
| 任務成功率 | Airflow UI | 立即檢視失敗原因 |
| 延遲時間 | Prometheus (Kafka lag) | 監測流資料滯後 |
| 資料質量 | Great Expectations | 內嵌測試 |
| 資源使用 | Grafana | 成本控制 |
> **實務**:在 Airflow DAG 中加入 `on_failure_callback`,自動發送 Slack 通知;在 Kafka topic 上設置 `consumer_lag` 監控,若滯後超過閾值即觸發故障修復腳本。
## 7. 總結
* **數據採集**:從 API、Web Scraping、Kafka、資料庫三個層面說明實作。
* **批次 vs 實時**:明確兩者優缺點,並提供 Airflow 與 Spark Streaming 範例。
* **ETL 工具**:Airflow 與 dbt 的協作模式。
* **管道設計原則**:維護性、擴充性、可靠性、安全性與可觀察性。
* **實際案例**:電商平台的整體流程圖與實踐。
在完成本章後,讀者應能:
1. 評估業務需求,選擇合適的資料來源與抓取方式。
2. 設計批次與實時雙重管道,確保資料即時與批次一致。
3. 使用 Airflow、dbt 及 Kafka 進行資料工程。
4. 以監控指標確保管道穩定運行,並快速排障。
---
> **延伸閱讀**:
> * *Designing Data-Intensive Applications* by Martin Kleppmann
> * *Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing* by Tyler Akidau et al.