返回目錄
A
資料科學實戰:從數據到決策的完整流程 - 第 2 章
第2章 數據收集與儲存
發布於 2026-03-04 21:11
## 第2章 數據收集與儲存
在資料科學的整個流程中,**數據收集**與**儲存**是第一道關鍵門檻。若來源不可靠、格式不統一、合規性不符,後續的清洗、探索、建模都會被「數據質量」所拖累。這一章將從多元資料來源、API、爬蟲到雲端儲存,逐步說明如何在確保品質與合規的前提下,將資料轉化為可供分析的結構化資產。
---
### 2.1 數據來源類型
| 類型 | 特徵 | 典型應用 | 代表工具 | 風險點 |
|------|------|----------|----------|--------|
| **結構化** | 以行列為單位、模式明確 | 交易記錄、財務報表 | RDBMS、Spark SQL | 模式變更、重複鍵 |
| **半結構化** | 有結構但不嚴格,如 JSON、XML | API 回傳、日誌檔 | MongoDB、Elasticsearch | 字段缺失、版本衝突 |
| **非結構化** | 文字、影像、音訊 | 電子郵件、影片、IoT 感測 | Hadoop HDFS、Blob Storage | 解析成本高、品質難控 |
> **實例**:一家電商平台會同時擁有「內部」的結構化交易表、來自「第三方」的商品評價 JSON,以及「公開」的天氣 API 數據。這些數據必須在不同頻度、不同格式下被同步到資料倉儲。
---
### 2.2 API 數據取得
#### 2.2.1 主要協議
- **RESTful**:以 HTTP/HTTPS 方式交互,典型的 GET/POST/PUT/DELETE。
- **GraphQL**:單一端點,客戶端可靈活指定需求。
- **Streaming**:Kafka、Kinesis、WebSocket,適用於實時資料流。
#### 2.2.2 認證方式
| 認證類型 | 應用場景 | 範例 |
|-----------|-----------|------|
| API Key | 單向存取 | `?api_key=1234` |
| OAuth2 | 用戶授權 | `access_token` |
| JWT | 自簽發 | `Bearer <token>` |
#### 2.2.3 Python 取得 Twitter 近一週推文範例
```python
import requests, json
from datetime import datetime, timedelta
# 1. 取得 bearer token(假設已完成 OAuth2 流程)
bearer_token = "YOUR_BEARER_TOKEN"
headers = {
"Authorization": f"Bearer {bearer_token}",
"Content-Type": "application/json"
}
# 2. 設定查詢參數
end_time = datetime.utcnow().isoformat("T") + "Z"
start_time = (datetime.utcnow() - timedelta(days=7)).isoformat("T") + "Z"
query_params = {
"query": "#python",
"start_time": start_time,
"end_time": end_time,
"max_results": 100,
"tweet.fields": "created_at,lang"
}
# 3. 發送請求
url = "https://api.twitter.com/2/tweets/search/recent"
response = requests.get(url, headers=headers, params=query_params)
print(json.dumps(response.json(), indent=2))
```
> **注意**:Twitter API 存在 rate‑limit(一般為 900 次 / 15 分鐘)。可透過 `response.headers['x-rate-limit-remaining']` 追蹤剩餘次數,並使用 exponential backoff 來控制呼叫頻率。
---
### 2.3 網頁爬蟲
#### 2.3.1 合法性與道德
| 項目 | 建議行為 |
|------|----------|
| Robot‑txt | 尊重網站的 `robots.txt` 說明 |
| User‑Agent | 使用真實、能辨識的 User‑Agent |
| Cookie | 若網站依賴登入,先合法取得 Session |
| 速率 | 避免對目標網站造成過度負載 |
#### 2.3.2 技術棧
- **Requests**:簡易 HTTP 請求。
- **BeautifulSoup**:HTML 解析。
- **Selenium**:需要 JavaScript 渲染。
- **Scrapy**:完整框架,支援分布式、延伸、管道處理。
#### 2.3.3 Scrapy 範例:抓取豆瓣電影排行榜
```python
import scrapy
class DoubanSpider(scrapy.Spider):
name = "douban_movie"
start_urls = [
"https://movie.douban.com/chart/"
]
def parse(self, response):
for sel in response.css("div.chart-item"):
yield {
"title": sel.css("h3 a::text").get(),
"rating": sel.css("span.rating-num::text").get(),
"url": sel.css("h3 a::attr(href)").get()
}
```
> **小技巧**:若遇到 IP 限制,可使用 `scrapy-proxies` 或自行搭配 VPN/Proxy Pool。
---
### 2.4 雲端儲存方案
| 類別 | 優勢 | 適用場景 | 代表產品 |
|------|------|----------|----------|
| **Object Storage** | 成本低、可擴展 | 原始資料、影像、備份 | AWS S3, GCP Cloud Storage, Azure Blob |
| **Data Warehouse** | 高效查詢、列式存儲 | 報表、BI | Snowflake, BigQuery, Redshift |
| **Data Lake** | 支援多種格式、彈性 | 大規模原始資料、資料科學 | AWS Lake Formation, Azure Data Lake, GCP BigLake |
| **Serverless** | 無需管理基礎設施 | 小批量、事件驅動 | AWS Athena, GCP BigQuery Serverless |
#### 2.4.1 典型工作流程
1. **原始資料寫入**:S3/Blob 上傳。
2. **資料整理**:使用 Glue、Dataflow 或 Snowpipe 進行 ETL/ELT。
3. **元資料管理**:AWS Glue Catalog、Data Catalog。
4. **安全控制**:IAM、Bucket Policy、Encryption At Rest & In Transit。
#### 2.4.2 圖表:ETL 於雲端
```
+---------------------+ +------------------+ +----------------+
| 原始資料(S3) | | ETL 工具(Glue) | | 數據倉儲(Snowflake) |
+---------------------+ +------------------+ +----------------+
| | |
| 存儲 & 轉換 | 讀取 & 查詢 |
```
---
### 2.5 數據質量保證
| 重要指標 | 定義 | 常見檢查方法 |
|----------|------|--------------|
| 完整性 | 所有必要欄位均有資料 | NULL 檢查、必填欄位 |
| 一致性 | 內部邏輯不衝突 | 參照完整性、範圍檢查 |
| 準確性 | 資料反映實際狀況 | 交叉驗證、外部比對 |
| 及時性 | 資料更新頻率 | 時間戳戳、過期檢查 |
#### 2.5.1 Great Expectations 範例
```python
import great_expectations as ge
# 1. 讀取資料
df = ge.from_pandas(pd.read_csv("/data/transactions.csv"))
# 2. 定義期望(Expectation)
df.expect_column_values_to_not_be_null("transaction_id")
df.expect_column_values_to_be_in_set("status", ["paid", "failed", "pending"])
# 3. 驗證並產生報告
results = df.validate()
print(results)
```
> **說明**:Great Expectations 可以連接多種資料來源(CSV、PostgreSQL、Snowflake 等),同時支援自動化報告與通知。
---
### 2.6 合規與隱私
| 法規 | 主要要求 | 具體措施 |
|------|----------|----------|
| GDPR | 個人資料保護 | 資料最小化、匿名化、隱私影響評估 |
| CCPA | 消費者隱私 | 同意機制、資料刪除請求 |
| HIPAA | 醫療資料 | 專門加密、存取控制 |
#### 2.6.1 匿名化範例:哈希化姓名與地址
```python
import hashlib
def anonymize(name, address):
# 使用 SHA-256 哈希並截取前 16 字元
hashed_name = hashlib.sha256(name.encode()).hexdigest()[:16]
hashed_address = hashlib.sha256(address.encode()).hexdigest()[:16]
return hashed_name, hashed_address
```
> **提示**:若需進一步保護,請考慮差分隱私或同態加密,以降低逆推風險。
---
### 2.7 數據管道設計
#### 2.7.1 ETL vs ELT
| 方向 | 優勢 | 典型工具 |
|------|------|----------|
| ETL | 數據清洗於源頭完成,資料庫負載較輕 | Talend, Informatica |
| ELT | 利用資料倉儲的運算能力,流程更靈活 | dbt, Snowflake Snowpipe |
#### 2.7.2 Batch vs Streaming
- **Batch**:批次處理,適合歷史資料、週期性報表。
- **Streaming**:實時資料流,適合交易、監控、即時推播。
#### 2.7.3 Orchestration 框架
| 框架 | 優勢 | 特色 |
|------|------|------|
| Airflow | 社群活躍、DAG 視覺化 | 依賴式排程、容錯性高 |
| Prefect | 動態任務執行、易於開發 | 內建 UI、可監控失敗 |
| Dagster | 強型別、測試友善 | 與 dbt 集成、型別安全 |
#### 2.7.4 範例:Airflow DAG 連接 API 與 S3
```python
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.utils.dates import days_ago
with DAG(
"twitter_to_s3",
schedule_interval="@daily",
start_date=days_ago(1),
) as dag:
fetch_tweets = SimpleHttpOperator(
task_id="fetch_tweets",
http_conn_id="twitter_api",
endpoint="/2/tweets/search/recent",
method="GET",
data={"query": "#python"},
headers={"Authorization": "Bearer YOUR_TOKEN"},
response_check=lambda response: response.status_code == 200,
log_response=True,
)
upload_s3 = S3CreateObjectOperator(
task_id="upload_s3",
bucket_name="my-twitter-data",
key="{{ ds }}/tweets.json",
data=fetch_tweets.output,
replace=True,
)
fetch_tweets >> upload_s3
```
---
### 2.8 實戰案例
| 產業 | 目標 | 資料來源 | 主要技術 |
|------|------|----------|----------|
| 金融 | 欺詐偵測 | 內部交易紀錄 + 信用評分 API | Kafka、Spark Structured Streaming、S3、Snowflake |
| 零售 | 營銷活動效果 | CRM、社交媒體情緒分析 | Python、NLTK、ElasticSearch |
| 健康 | 病例追蹤 | EMR、基因資料 | Hadoop、HBase、Spark MLlib |
> **案例說明(金融)**:公司利用每日批次從內部交易系統匯出 CSV,並同步呼叫外部信用評分 API(RESTful)。將兩組資料結合後寫入 Snowflake,透過 dbt 進行資料建模。最後利用 Snowflake 的 Snowpipe 進行實時上傳,並在 Airflow 中定時觸發欺詐檢測模型,結果以 Slack 通知風險團隊。
---
### 2.9 小結
- **多元來源**:結構化、半結構化、非結構化資料皆需納入考量。
- **API 與爬蟲**:設計時須重視認證、速率與合法性。
- **雲端儲存**:選擇 Object Storage、Data Warehouse 或 Data Lake 取決於資料類型與使用頻率。
- **品質保證**:使用工具(Great Expectations、DataCleaner)及自動化流程,確保數據可靠。
- **合規合法**:在收集與處理階段就實施匿名化、同意管理與訪問控制。
- **管道設計**:ETL/ELT、Batch/Streaming 的選擇應配合業務需求,並用 Airflow 或 Prefect 等工具編排。
本章奠定了**數據收集**與**儲存**的基礎,為後續的數據清洗、探索、建模與部署提供堅實的素材與基礎設施。接下來,我們將進一步探討如何將這些原始資料轉化為高品質的分析對象。