聊天視窗

數據之鏡:從資料洞察到決策智慧 - 第 2 章

第二章 資料收集與管道設計

發布於 2026-02-25 18:13

# 第二章 資料收集與管道設計 > **核心要點**: > 1. 了解資料來源的多樣性與特性。 2. 掌握 API 與網頁爬蟲的實務技巧。 3. 設計高效、可維護的 ETL 流程。 4. 保障資料品質與合規性。 --- ## 2.1 資料來源類型 | 資料來源 | 特性 | 典型應用 | 取得方式 | |----------|------|----------|----------| | 內部資料 | 企業內部生成、結構化 | 交易紀錄、使用者行為 | 直接存取資料庫、文件、日誌 | | 外部 API | 即時更新、受限權限 | 天氣預報、社群媒體 | HTTP REST / GraphQL | | 網頁爬蟲 | 非結構化、版面多變 | 商品價格、評論 | requests + BeautifulSoup / Scrapy | | 大數據平台 | 高吞吐量、批次 | 日誌分析、物聯網感測 | Kafka / Spark Streaming | | 公共數據集 | 由政府或研究機構提供 | 社會經濟指標 | 下載 CSV / JSON | > **實務提醒**:在選擇資料來源時,請先評估 > - **資料完整度**:是否缺失關鍵欄位? > - **更新頻率**:與業務目標是否匹配? > - **授權與合規**:是否符合個資法、版權規範? ## 2.2 API 與網頁爬蟲實務 ### 2.2.1 呼叫 RESTful API python import requests from time import sleep API_URL = "https://api.example.com/v1/items" API_KEY = "YOUR_API_KEY" headers = { "Authorization": f"Bearer {API_KEY}", "Accept": "application/json" } params = {"page": 1, "limit": 100} while True: response = requests.get(API_URL, headers=headers, params=params) if response.status_code != 200: raise Exception(f"API error {response.status_code}") data = response.json() # 存檔或送入管道 process_batch(data["items"]) if not data.get("has_next"): break params["page"] += 1 sleep(0.5) # 避免速率限制 > **關鍵要點**: > - **速率限制**:觀察 `Retry-After` 或 `X-RateLimit-Remaining`。 > - **認證機制**:OAuth、API Key、JWT。 > - **錯誤處理**:重試策略、斷路器。 ### 2.2.2 基礎網頁爬蟲 python import requests from bs4 import BeautifulSoup import pandas as pd BASE_URL = "https://www.example.com/products" headers = {"User-Agent": "Mozilla/5.0 (compatible; DataBot/1.0)"} def parse_product(html): soup = BeautifulSoup(html, "html.parser") title = soup.select_one("h1.product-title").text.strip() price = soup.select_one("span.price").text.strip() return {"title": title, "price": price} products = [] for page in range(1, 6): url = f"{BASE_URL}?page={page}" r = requests.get(url, headers=headers) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") links = [a['href'] for a in soup.select("a.product-link")] for link in links: prod_page = requests.get(link, headers=headers) prod_page.raise_for_status() products.append(parse_product(prod_page.text)) df = pd.DataFrame(products) df.to_csv("products.csv", index=False) > **最佳實踐**: > - **尊重 robots.txt**。 > - **並行化**:使用 `concurrent.futures` 或 Scrapy。 > - **反爬措施**:IP 代理、User‑Agent 變更、隨機延遲。 ## 2.3 ETL 流程設計 ETL(Extract‑Transform‑Load)是資料工程的核心。良好的 ETL 能確保 資料從原始到最終使用的完整性、準確性與時效性。 ### 2.3.1 取樣與測試 - **小批量測試**:先使用 `LIMIT 1000` 或 `sample()` 進行驗證。 - **模擬環境**:利用 Docker Compose 或 Minikube 部署暫存資料庫。 ### 2.3.2 主要工作流工具 | 工具 | 優勢 | 典型用法 | |------|------|----------| | Apache Airflow | DAG 設計、排程、監控 | `@dag`、`@task`、`BranchPythonOperator` | | Prefect | 任務流式、動態分支 | `Flow`、`Task`、`Retry` | | dbt | SQL 轉換、版本控制 | `model`, `seed`, `snapshot` | | Luigi | 任務鏈、依賴管理 | `Task`, `requires`, `output` | ### 2.3.3 典型 ETL 例子:Airflow DAG python from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta default_args = { 'owner': 'data_team', 'depends_on_past': False, 'retries': 3, 'retry_delay': timedelta(minutes=5), } with DAG( dag_id='daily_product_price_update', default_args=default_args, schedule_interval='0 2 * * *', start_date=datetime(2023, 1, 1), catchup=False, ) as dag: def extract(**context): # 呼叫 API 或爬蟲 return fetch_product_prices() def transform(**context): raw = context['ti'].xcom_pull(task_ids='extract') # 清洗、格式化 return clean_prices(raw) def load(**context): cleaned = context['ti'].xcom_pull(task_ids='transform') # 寫入資料庫 db_insert(cleaned) extract_task = PythonOperator(task_id='extract', python_callable=extract) transform_task = PythonOperator(task_id='transform', python_callable=transform) load_task = PythonOperator(task_id='load', python_callable=load) extract_task >> transform_task >> load_task > **關鍵技巧**: > - **XCom**:安全地傳遞資料。 > - **Logging**:將 `context['ti'].log` 與 Airflow UI 連結。 > - **故障恢復**:使用 `trigger_rule='all_done'` 保證後續清理任務執行。 ## 2.4 資料治理與合規 - **資料分類**:使用 `PII`, `PHI`, `SENSITIVE` 標記。 - **資料線索**:保留原始資料庫的 `audit_log` 或 `transaction_id`。 - **存取控制**:基於角色的存取(RBAC)。 - **數據保留政策**:根據資料用途設定保留天數,並自動清理過期資料。 ### 2.4.1 個人資訊保護 - **資料最小化**:只收集業務必要的欄位。 - **匿名化 / 偽匿名化**:使用 hashing、k‑anonymity。 - **合約與使用條款**:檢查第三方資料的 `Terms of Service`。 ## 2.5 實務案例分享 | 企業 | 資料來源 | 主要 ETL 工具 | 成果 | |------|----------|--------------|------| | 食品零售商 | 內部 POS、外部價格 API | Airflow + dbt | 24 小時內商品價格同步,提升價格競爭力 20% | | 社群平台 | 內部日誌、Twitter API | Prefect + Snowflake | 30 秒內即時反饋熱門話題,提升廣告投放 ROI 15% | | 旅遊公司 | 天氣 API、Google Maps | Airflow + dbt | 確保行程規劃資料正確,客戶投訴率下降 25% | ## 2.5 小結 - **資料來源多樣化**:先評估品質與合規,再決定採集方式。 - **API 呼叫**:速率限制、認證、錯誤處理是關鍵。 - **網頁爬蟲**:並行化、代理與反爬措施是提升效率的關鍵。 - **ETL 工具**:Airflow、Prefect 等能協助設計可監控、可重試的工作流。 - **資料治理**:合規性與資料品質並重,否則即使技術再好也會成為風險。 > **下一步**:在第三章中,我們將進一步探討如何確保 > ETL 後資料的**品質檢查**與**元資料管理**,以及如何在 > 大數據平台上擴充資料管道。 ---