返回目錄
A
數據之鏡:從資料洞察到決策智慧 - 第 2 章
第二章 資料收集與管道設計
發布於 2026-02-25 18:13
# 第二章 資料收集與管道設計
> **核心要點**:
> 1. 了解資料來源的多樣性與特性。 2. 掌握 API 與網頁爬蟲的實務技巧。 3. 設計高效、可維護的 ETL 流程。 4. 保障資料品質與合規性。
---
## 2.1 資料來源類型
| 資料來源 | 特性 | 典型應用 | 取得方式 |
|----------|------|----------|----------|
| 內部資料 | 企業內部生成、結構化 | 交易紀錄、使用者行為 | 直接存取資料庫、文件、日誌 |
| 外部 API | 即時更新、受限權限 | 天氣預報、社群媒體 | HTTP REST / GraphQL |
| 網頁爬蟲 | 非結構化、版面多變 | 商品價格、評論 | requests + BeautifulSoup / Scrapy |
| 大數據平台 | 高吞吐量、批次 | 日誌分析、物聯網感測 | Kafka / Spark Streaming |
| 公共數據集 | 由政府或研究機構提供 | 社會經濟指標 | 下載 CSV / JSON |
> **實務提醒**:在選擇資料來源時,請先評估
> - **資料完整度**:是否缺失關鍵欄位?
> - **更新頻率**:與業務目標是否匹配?
> - **授權與合規**:是否符合個資法、版權規範?
## 2.2 API 與網頁爬蟲實務
### 2.2.1 呼叫 RESTful API
python
import requests
from time import sleep
API_URL = "https://api.example.com/v1/items"
API_KEY = "YOUR_API_KEY"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Accept": "application/json"
}
params = {"page": 1, "limit": 100}
while True:
response = requests.get(API_URL, headers=headers, params=params)
if response.status_code != 200:
raise Exception(f"API error {response.status_code}")
data = response.json()
# 存檔或送入管道
process_batch(data["items"])
if not data.get("has_next"):
break
params["page"] += 1
sleep(0.5) # 避免速率限制
> **關鍵要點**:
> - **速率限制**:觀察 `Retry-After` 或 `X-RateLimit-Remaining`。
> - **認證機制**:OAuth、API Key、JWT。
> - **錯誤處理**:重試策略、斷路器。
### 2.2.2 基礎網頁爬蟲
python
import requests
from bs4 import BeautifulSoup
import pandas as pd
BASE_URL = "https://www.example.com/products"
headers = {"User-Agent": "Mozilla/5.0 (compatible; DataBot/1.0)"}
def parse_product(html):
soup = BeautifulSoup(html, "html.parser")
title = soup.select_one("h1.product-title").text.strip()
price = soup.select_one("span.price").text.strip()
return {"title": title, "price": price}
products = []
for page in range(1, 6):
url = f"{BASE_URL}?page={page}"
r = requests.get(url, headers=headers)
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")
links = [a['href'] for a in soup.select("a.product-link")]
for link in links:
prod_page = requests.get(link, headers=headers)
prod_page.raise_for_status()
products.append(parse_product(prod_page.text))
df = pd.DataFrame(products)
df.to_csv("products.csv", index=False)
> **最佳實踐**:
> - **尊重 robots.txt**。
> - **並行化**:使用 `concurrent.futures` 或 Scrapy。
> - **反爬措施**:IP 代理、User‑Agent 變更、隨機延遲。
## 2.3 ETL 流程設計
ETL(Extract‑Transform‑Load)是資料工程的核心。良好的 ETL 能確保
資料從原始到最終使用的完整性、準確性與時效性。
### 2.3.1 取樣與測試
- **小批量測試**:先使用 `LIMIT 1000` 或 `sample()` 進行驗證。
- **模擬環境**:利用 Docker Compose 或 Minikube 部署暫存資料庫。
### 2.3.2 主要工作流工具
| 工具 | 優勢 | 典型用法 |
|------|------|----------|
| Apache Airflow | DAG 設計、排程、監控 | `@dag`、`@task`、`BranchPythonOperator` |
| Prefect | 任務流式、動態分支 | `Flow`、`Task`、`Retry` |
| dbt | SQL 轉換、版本控制 | `model`, `seed`, `snapshot` |
| Luigi | 任務鏈、依賴管理 | `Task`, `requires`, `output` |
### 2.3.3 典型 ETL 例子:Airflow DAG
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='daily_product_price_update',
default_args=default_args,
schedule_interval='0 2 * * *',
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
def extract(**context):
# 呼叫 API 或爬蟲
return fetch_product_prices()
def transform(**context):
raw = context['ti'].xcom_pull(task_ids='extract')
# 清洗、格式化
return clean_prices(raw)
def load(**context):
cleaned = context['ti'].xcom_pull(task_ids='transform')
# 寫入資料庫
db_insert(cleaned)
extract_task = PythonOperator(task_id='extract', python_callable=extract)
transform_task = PythonOperator(task_id='transform', python_callable=transform)
load_task = PythonOperator(task_id='load', python_callable=load)
extract_task >> transform_task >> load_task
> **關鍵技巧**:
> - **XCom**:安全地傳遞資料。
> - **Logging**:將 `context['ti'].log` 與 Airflow UI 連結。
> - **故障恢復**:使用 `trigger_rule='all_done'` 保證後續清理任務執行。
## 2.4 資料治理與合規
- **資料分類**:使用 `PII`, `PHI`, `SENSITIVE` 標記。
- **資料線索**:保留原始資料庫的 `audit_log` 或 `transaction_id`。
- **存取控制**:基於角色的存取(RBAC)。
- **數據保留政策**:根據資料用途設定保留天數,並自動清理過期資料。
### 2.4.1 個人資訊保護
- **資料最小化**:只收集業務必要的欄位。
- **匿名化 / 偽匿名化**:使用 hashing、k‑anonymity。
- **合約與使用條款**:檢查第三方資料的 `Terms of Service`。
## 2.5 實務案例分享
| 企業 | 資料來源 | 主要 ETL 工具 | 成果 |
|------|----------|--------------|------|
| 食品零售商 | 內部 POS、外部價格 API | Airflow + dbt | 24 小時內商品價格同步,提升價格競爭力 20% |
| 社群平台 | 內部日誌、Twitter API | Prefect + Snowflake | 30 秒內即時反饋熱門話題,提升廣告投放 ROI 15% |
| 旅遊公司 | 天氣 API、Google Maps | Airflow + dbt | 確保行程規劃資料正確,客戶投訴率下降 25% |
## 2.5 小結
- **資料來源多樣化**:先評估品質與合規,再決定採集方式。
- **API 呼叫**:速率限制、認證、錯誤處理是關鍵。
- **網頁爬蟲**:並行化、代理與反爬措施是提升效率的關鍵。
- **ETL 工具**:Airflow、Prefect 等能協助設計可監控、可重試的工作流。
- **資料治理**:合規性與資料品質並重,否則即使技術再好也會成為風險。
> **下一步**:在第三章中,我們將進一步探討如何確保
> ETL 後資料的**品質檢查**與**元資料管理**,以及如何在
> 大數據平台上擴充資料管道。
---