聊天視窗

數據洞察實戰:從數據採集到模型部署的完整路徑 - 第 2 章

第二章:數據採集與管道建構

發布於 2026-02-27 21:35

# 第二章:數據採集與管道建構 本章將帶領讀者從「為什麼需要收集數據」切入,說明如何設計與實作一條高效、可擴充且可靠的數據管道。內容涵蓋資料抓取、API、批次與實時流、ETL 工具與管道設計,並配以實務案例與可執行的程式碼範例。 --- ## 1. 為何需要嚴謹的數據採集 | 需求 | 採集方式 | 工具 | 成果 | |---|---|---|---| | 需求分析 | 瀏覽紀錄 | Logstash | 事件流 | | 交易資料 | API 取回 | `requests` | 交易表 | | 用戶行為 | 事件觸發 | Kafka | 事件訊息 | | 大規模資料 | 批次同步 | Airflow | 資料湖 | > **要點**: > * 資料來源多樣:結構化資料(資料庫、CRM)、非結構化資料(聊天記錄、圖片)以及半結構化資料(JSON、XML)。 > * 資料流向不僅是「存儲」,還包括「清洗、轉換、聚合」等加工流程。 ## 2. 資料抓取(Data Harvesting) ### 2.1 API 抓取 多數 SaaS 供應商提供 RESTful 或 GraphQL API,通常帶有 **OAuth2** 認證。以下為使用 Python `requests` 取得 Zendesk 支援票務的範例: ```python import requests, json, os API_ENDPOINT = "https://{subdomain}.zendesk.com/api/v2/tickets.json" API_TOKEN = os.getenv("ZENDESK_TOKEN") headers = { "Authorization": f"Bearer {API_TOKEN}", "Content-Type": "application/json" } response = requests.get(API_ENDPOINT, headers=headers) response.raise_for_status() tickets = response.json()["tickets"] print(f"取得 {len(tickets)} 筆票務資料") ``` > **備註**: > * 使用 `requests.Session()` 以重用連線,降低網路開銷。 > * 若資料量較大,請搭配分頁機制(`page`、`per_page`)或使用 `Zendesk SDK`。 ### 2.2 網頁抓取(Web Scraping) 對於缺乏 API 的公共資料,可利用 **BeautifulSoup** 與 **Scrapy** 進行抓取: ```python from bs4 import BeautifulSoup import requests url = "https://example.com/products" resp = requests.get(url) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") products = [] for card in soup.select(".product-card"): name = card.select_one("h2.title").text.strip() price = card.select_one("span.price").text.strip() products.append({"name": name, "price": price}) print(products[:3]) ``` > **注意**:抓取前務必確認網站的 robots.txt 與使用條款,避免違法。 ## 3. 批次與實時流(Batch vs Streaming) | 特色 | 批次 | 實時流 | |---|---|---| | 延遲 | 分鐘-小時 | 毫秒-秒 | | 資料量 | 大 | 小-中 | | 資料一致性 | 最終一致 | 事件序列 | | 工具 | Airflow, dbt, Luigi | Kafka, Flink, Spark Structured Streaming | ### 3.1 批次管道(Airflow 範例) Airflow 允許以 DAG 定義任務,以下為簡易的數據抽取‑轉換‑加載(ETL)流程: ```python from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime, timedelta default_args = { "owner": "data_eng", "depends_on_past": False, "start_date": datetime(2023, 1, 1), "retries": 1, "retry_delay": timedelta(minutes=5), } with DAG("daily_etl", default_args=default_args, schedule_interval="@daily") as dag: extract = BashOperator( task_id="extract_data", bash_command="python extract.py" ) transform = BashOperator( task_id="transform_data", bash_command="python transform.py" ) load = BashOperator( task_id="load_to_snowflake", bash_command="python load.py" ) extract >> transform >> load ``` > **建議**: > * 使用 **S3** 或 **GCS** 作為暫存層,減少對源系統的頻繁請求。 > * 在 Airflow DAG 中加入 **XCom** 以傳遞中間結果。 ### 3.2 實時流管道(Kafka + Spark Structured Streaming 範例) Kafka 作為分散式訊息代理,Spark Structured Streaming 能即時處理流資料: ```python from pyspark.sql import SparkSession from pyspark.sql.functions import from_json, col spark = SparkSession.builder.appName("KafkaStreamingDemo").getOrCreate() schema = "user_id INT, event_type STRING, ts TIMESTAMP" streaming_df = ( spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "kafka-broker:9092") .option("subscribe", "user-events") .load() ) parsed_df = ( streaming_df.selectExpr("CAST(value AS STRING)") .select(from_json("value", schema).alias("data")) .select("data.*") ) query = ( parsed_df.writeStream .outputMode("append") .format("console") .option("truncate", False) .start() ) query.awaitTermination() ``` > **關鍵**: > * **Schema** 需要與 Kafka 訊息一致,避免資料類型錯誤。 > * 監控 `offset lag` 與 `processing delay`,確保流資料不會滯後。 ## 4. ETL 工具與自動化 | 工具 | 特色 | 典型工作流程 | |---|---|---| | **Airflow** | DAG、可視化 | 週期性批次工作 | | **dbt** | SQL 轉換、測試 | 資料湖/資料倉儲轉換 | | **Luigi** | Python 模組化 | 依賴管理 | | **Fivetran** | 連接器即插即用 | 低維護資料整合 | ### 4.1 dbt 與資料湖 dbt(data build tool)允許使用 SQL 來編寫轉換腳本,並自動管理依賴。以 **Amazon Redshift** 為例: ```sql -- models/stg_customers.sql SELECT id, name, email, created_at::timestamp AS signup_ts FROM raw.customers WHERE email IS NOT NULL; ``` **dbt 的執行流程**: 1. `dbt run` 觸發所有模型執行。 2. `dbt test` 驗證資料品質。 3. `dbt docs generate` 產生資料線圖。 > **優點**: > * 結合版本控制,所有轉換腳本皆為 Git 可追蹤。 > * 透過 `dbt docs` 可視化資料血緣,方便新人上手。 ## 5. 管道設計原則 | 原則 | 內涵 | 實踐方式 | |---|---|---| | **可維護性** | 易讀、易修改 | 單一責任、模組化、注釋豐富 | | **可擴充性** | 隨資料量擴大 | 分區存儲、分散式處理 | | **可靠性** | 容錯、重試 | 重試機制、監控告警 | | **安全性** | 資料保密 | 加密存儲、IAM 控制 | | **可觀察性** | 追蹤、日誌 | 日誌、指標、追蹤元件 | ### 5.1 實際案例:電商平台 > **背景**:每日 10 萬筆訂單,需即時更新庫存並生成報表。 > > **解決方案**: > 1. **資料來源**: > * 交易系統通過 Kafka 推送 `order_created` 事件。 > * 庫存系統通過 REST API 提供查詢。 > 2. **資料流**: > * **Kafka**:接收訂單事件。 > * **Spark Structured Streaming**:計算每日銷售總額、熱門商品。 > * **Airflow**:每 6 小時更新資料湖(S3)中的 `orders` 與 `inventory` 表。 > 3. **ETL**: > * 使用 **dbt** 將 `orders_raw` 轉為 `orders_clean`,進行資料品質檢查。 > * `inventory` 表使用 **Fivetran** 直接同步至 Snowflake。 > 4. **觀察**: > * **Prometheus** 收集 Spark 指標、Airflow 任務失敗率。 > * **Grafana** 可視化實時訂單量與庫存缺貨率。 > > **結果**: > * 數據延遲 < 2 秒,確保報表即時。 > * 每個管道均有單元測試,確保資料完整。 > > **教訓**:在設計時須考慮「最小可行管道」與「逐步擴充」的平衡。 ## 6. 監控與維運 | 監控指標 | 工具 | 作用 | |---|---|---| | 任務成功率 | Airflow UI | 立即檢視失敗原因 | | 延遲時間 | Prometheus (Kafka lag) | 監測流資料滯後 | | 資料質量 | Great Expectations | 內嵌測試 | | 資源使用 | Grafana | 成本控制 | > **實務**:在 Airflow DAG 中加入 `on_failure_callback`,自動發送 Slack 通知;在 Kafka topic 上設置 `consumer_lag` 監控,若滯後超過閾值即觸發故障修復腳本。 ## 7. 總結 * **數據採集**:從 API、Web Scraping、Kafka、資料庫三個層面說明實作。 * **批次 vs 實時**:明確兩者優缺點,並提供 Airflow 與 Spark Streaming 範例。 * **ETL 工具**:Airflow 與 dbt 的協作模式。 * **管道設計原則**:維護性、擴充性、可靠性、安全性與可觀察性。 * **實際案例**:電商平台的整體流程圖與實踐。 在完成本章後,讀者應能: 1. 評估業務需求,選擇合適的資料來源與抓取方式。 2. 設計批次與實時雙重管道,確保資料即時與批次一致。 3. 使用 Airflow、dbt 及 Kafka 進行資料工程。 4. 以監控指標確保管道穩定運行,並快速排障。 --- > **延伸閱讀**: > * *Designing Data-Intensive Applications* by Martin Kleppmann > * *Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing* by Tyler Akidau et al.