聊天視窗

資料科學實戰:從數據到決策的完整流程 - 第 2 章

第2章 數據收集與儲存

發布於 2026-03-04 21:11

## 第2章 數據收集與儲存 在資料科學的整個流程中,**數據收集**與**儲存**是第一道關鍵門檻。若來源不可靠、格式不統一、合規性不符,後續的清洗、探索、建模都會被「數據質量」所拖累。這一章將從多元資料來源、API、爬蟲到雲端儲存,逐步說明如何在確保品質與合規的前提下,將資料轉化為可供分析的結構化資產。 --- ### 2.1 數據來源類型 | 類型 | 特徵 | 典型應用 | 代表工具 | 風險點 | |------|------|----------|----------|--------| | **結構化** | 以行列為單位、模式明確 | 交易記錄、財務報表 | RDBMS、Spark SQL | 模式變更、重複鍵 | | **半結構化** | 有結構但不嚴格,如 JSON、XML | API 回傳、日誌檔 | MongoDB、Elasticsearch | 字段缺失、版本衝突 | | **非結構化** | 文字、影像、音訊 | 電子郵件、影片、IoT 感測 | Hadoop HDFS、Blob Storage | 解析成本高、品質難控 | > **實例**:一家電商平台會同時擁有「內部」的結構化交易表、來自「第三方」的商品評價 JSON,以及「公開」的天氣 API 數據。這些數據必須在不同頻度、不同格式下被同步到資料倉儲。 --- ### 2.2 API 數據取得 #### 2.2.1 主要協議 - **RESTful**:以 HTTP/HTTPS 方式交互,典型的 GET/POST/PUT/DELETE。 - **GraphQL**:單一端點,客戶端可靈活指定需求。 - **Streaming**:Kafka、Kinesis、WebSocket,適用於實時資料流。 #### 2.2.2 認證方式 | 認證類型 | 應用場景 | 範例 | |-----------|-----------|------| | API Key | 單向存取 | `?api_key=1234` | | OAuth2 | 用戶授權 | `access_token` | | JWT | 自簽發 | `Bearer <token>` | #### 2.2.3 Python 取得 Twitter 近一週推文範例 ```python import requests, json from datetime import datetime, timedelta # 1. 取得 bearer token(假設已完成 OAuth2 流程) bearer_token = "YOUR_BEARER_TOKEN" headers = { "Authorization": f"Bearer {bearer_token}", "Content-Type": "application/json" } # 2. 設定查詢參數 end_time = datetime.utcnow().isoformat("T") + "Z" start_time = (datetime.utcnow() - timedelta(days=7)).isoformat("T") + "Z" query_params = { "query": "#python", "start_time": start_time, "end_time": end_time, "max_results": 100, "tweet.fields": "created_at,lang" } # 3. 發送請求 url = "https://api.twitter.com/2/tweets/search/recent" response = requests.get(url, headers=headers, params=query_params) print(json.dumps(response.json(), indent=2)) ``` > **注意**:Twitter API 存在 rate‑limit(一般為 900 次 / 15 分鐘)。可透過 `response.headers['x-rate-limit-remaining']` 追蹤剩餘次數,並使用 exponential backoff 來控制呼叫頻率。 --- ### 2.3 網頁爬蟲 #### 2.3.1 合法性與道德 | 項目 | 建議行為 | |------|----------| | Robot‑txt | 尊重網站的 `robots.txt` 說明 | | User‑Agent | 使用真實、能辨識的 User‑Agent | | Cookie | 若網站依賴登入,先合法取得 Session | | 速率 | 避免對目標網站造成過度負載 | #### 2.3.2 技術棧 - **Requests**:簡易 HTTP 請求。 - **BeautifulSoup**:HTML 解析。 - **Selenium**:需要 JavaScript 渲染。 - **Scrapy**:完整框架,支援分布式、延伸、管道處理。 #### 2.3.3 Scrapy 範例:抓取豆瓣電影排行榜 ```python import scrapy class DoubanSpider(scrapy.Spider): name = "douban_movie" start_urls = [ "https://movie.douban.com/chart/" ] def parse(self, response): for sel in response.css("div.chart-item"): yield { "title": sel.css("h3 a::text").get(), "rating": sel.css("span.rating-num::text").get(), "url": sel.css("h3 a::attr(href)").get() } ``` > **小技巧**:若遇到 IP 限制,可使用 `scrapy-proxies` 或自行搭配 VPN/Proxy Pool。 --- ### 2.4 雲端儲存方案 | 類別 | 優勢 | 適用場景 | 代表產品 | |------|------|----------|----------| | **Object Storage** | 成本低、可擴展 | 原始資料、影像、備份 | AWS S3, GCP Cloud Storage, Azure Blob | | **Data Warehouse** | 高效查詢、列式存儲 | 報表、BI | Snowflake, BigQuery, Redshift | | **Data Lake** | 支援多種格式、彈性 | 大規模原始資料、資料科學 | AWS Lake Formation, Azure Data Lake, GCP BigLake | | **Serverless** | 無需管理基礎設施 | 小批量、事件驅動 | AWS Athena, GCP BigQuery Serverless | #### 2.4.1 典型工作流程 1. **原始資料寫入**:S3/Blob 上傳。 2. **資料整理**:使用 Glue、Dataflow 或 Snowpipe 進行 ETL/ELT。 3. **元資料管理**:AWS Glue Catalog、Data Catalog。 4. **安全控制**:IAM、Bucket Policy、Encryption At Rest & In Transit。 #### 2.4.2 圖表:ETL 於雲端 ``` +---------------------+ +------------------+ +----------------+ | 原始資料(S3) | | ETL 工具(Glue) | | 數據倉儲(Snowflake) | +---------------------+ +------------------+ +----------------+ | | | | 存儲 & 轉換 | 讀取 & 查詢 | ``` --- ### 2.5 數據質量保證 | 重要指標 | 定義 | 常見檢查方法 | |----------|------|--------------| | 完整性 | 所有必要欄位均有資料 | NULL 檢查、必填欄位 | | 一致性 | 內部邏輯不衝突 | 參照完整性、範圍檢查 | | 準確性 | 資料反映實際狀況 | 交叉驗證、外部比對 | | 及時性 | 資料更新頻率 | 時間戳戳、過期檢查 | #### 2.5.1 Great Expectations 範例 ```python import great_expectations as ge # 1. 讀取資料 df = ge.from_pandas(pd.read_csv("/data/transactions.csv")) # 2. 定義期望(Expectation) df.expect_column_values_to_not_be_null("transaction_id") df.expect_column_values_to_be_in_set("status", ["paid", "failed", "pending"]) # 3. 驗證並產生報告 results = df.validate() print(results) ``` > **說明**:Great Expectations 可以連接多種資料來源(CSV、PostgreSQL、Snowflake 等),同時支援自動化報告與通知。 --- ### 2.6 合規與隱私 | 法規 | 主要要求 | 具體措施 | |------|----------|----------| | GDPR | 個人資料保護 | 資料最小化、匿名化、隱私影響評估 | | CCPA | 消費者隱私 | 同意機制、資料刪除請求 | | HIPAA | 醫療資料 | 專門加密、存取控制 | #### 2.6.1 匿名化範例:哈希化姓名與地址 ```python import hashlib def anonymize(name, address): # 使用 SHA-256 哈希並截取前 16 字元 hashed_name = hashlib.sha256(name.encode()).hexdigest()[:16] hashed_address = hashlib.sha256(address.encode()).hexdigest()[:16] return hashed_name, hashed_address ``` > **提示**:若需進一步保護,請考慮差分隱私或同態加密,以降低逆推風險。 --- ### 2.7 數據管道設計 #### 2.7.1 ETL vs ELT | 方向 | 優勢 | 典型工具 | |------|------|----------| | ETL | 數據清洗於源頭完成,資料庫負載較輕 | Talend, Informatica | | ELT | 利用資料倉儲的運算能力,流程更靈活 | dbt, Snowflake Snowpipe | #### 2.7.2 Batch vs Streaming - **Batch**:批次處理,適合歷史資料、週期性報表。 - **Streaming**:實時資料流,適合交易、監控、即時推播。 #### 2.7.3 Orchestration 框架 | 框架 | 優勢 | 特色 | |------|------|------| | Airflow | 社群活躍、DAG 視覺化 | 依賴式排程、容錯性高 | | Prefect | 動態任務執行、易於開發 | 內建 UI、可監控失敗 | | Dagster | 強型別、測試友善 | 與 dbt 集成、型別安全 | #### 2.7.4 範例:Airflow DAG 連接 API 與 S3 ```python from airflow import DAG from airflow.providers.http.operators.http import SimpleHttpOperator from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator from airflow.utils.dates import days_ago with DAG( "twitter_to_s3", schedule_interval="@daily", start_date=days_ago(1), ) as dag: fetch_tweets = SimpleHttpOperator( task_id="fetch_tweets", http_conn_id="twitter_api", endpoint="/2/tweets/search/recent", method="GET", data={"query": "#python"}, headers={"Authorization": "Bearer YOUR_TOKEN"}, response_check=lambda response: response.status_code == 200, log_response=True, ) upload_s3 = S3CreateObjectOperator( task_id="upload_s3", bucket_name="my-twitter-data", key="{{ ds }}/tweets.json", data=fetch_tweets.output, replace=True, ) fetch_tweets >> upload_s3 ``` --- ### 2.8 實戰案例 | 產業 | 目標 | 資料來源 | 主要技術 | |------|------|----------|----------| | 金融 | 欺詐偵測 | 內部交易紀錄 + 信用評分 API | Kafka、Spark Structured Streaming、S3、Snowflake | | 零售 | 營銷活動效果 | CRM、社交媒體情緒分析 | Python、NLTK、ElasticSearch | | 健康 | 病例追蹤 | EMR、基因資料 | Hadoop、HBase、Spark MLlib | > **案例說明(金融)**:公司利用每日批次從內部交易系統匯出 CSV,並同步呼叫外部信用評分 API(RESTful)。將兩組資料結合後寫入 Snowflake,透過 dbt 進行資料建模。最後利用 Snowflake 的 Snowpipe 進行實時上傳,並在 Airflow 中定時觸發欺詐檢測模型,結果以 Slack 通知風險團隊。 --- ### 2.9 小結 - **多元來源**:結構化、半結構化、非結構化資料皆需納入考量。 - **API 與爬蟲**:設計時須重視認證、速率與合法性。 - **雲端儲存**:選擇 Object Storage、Data Warehouse 或 Data Lake 取決於資料類型與使用頻率。 - **品質保證**:使用工具(Great Expectations、DataCleaner)及自動化流程,確保數據可靠。 - **合規合法**:在收集與處理階段就實施匿名化、同意管理與訪問控制。 - **管道設計**:ETL/ELT、Batch/Streaming 的選擇應配合業務需求,並用 Airflow 或 Prefect 等工具編排。 本章奠定了**數據收集**與**儲存**的基礎,為後續的數據清洗、探索、建模與部署提供堅實的素材與基礎設施。接下來,我們將進一步探討如何將這些原始資料轉化為高品質的分析對象。