聊天視窗

Unveiling Insight: Data Science for Strategic Decision‑Making - 第 2 章

Chapter 2: Foundations of Data Acquisition

發布於 2026-03-07 18:15

# Chapter 2: Foundations of Data Acquisition Data acquisition is the first concrete step in any data science pipeline. The quality, breadth, and timeliness of the data you ingest directly influence the validity of downstream analytics, models, and business decisions. This chapter walks through the main sources of data, practical techniques for extracting it, and key considerations for ensuring that the data you bring into your environment is reliable, compliant, and ready for analysis. --- ## 1. Understanding the Landscape of Data Sources | Source Type | Typical Format | Example Industries | Common Challenges | |--------------|----------------|--------------------|-------------------| | **Structured** | Relational tables, CSV, Parquet | Banking, e‑commerce, logistics | Schema drift, duplicate keys | | **Semi‑structured** | JSON, XML, Avro | APIs, IoT, mobile logs | Inconsistent nesting, missing fields | | **Unstructured** | Text, images, audio, video | Media, healthcare imaging, customer support | Feature extraction, large storage | | **Time‑series** | TSDB (InfluxDB, TimescaleDB), Kafka streams | Finance, IoT, monitoring | Clock skew, window alignment | | **Key Insight**: A single project often pulls data from *multiple* source types. The more heterogeneous the data, the greater the need for robust schema management and flexible ingestion tools. --- ## 2. Core Techniques for Data Collection ### 2.1 API Integration APIs are the most common interface for accessing data from SaaS platforms, public services, and custom micro‑services. | Layer | Common Pattern | Tooling | |-------|----------------|---------| | **Authentication** | OAuth2, API Keys, JWT | `requests-oauthlib`, `authlib` | | **Rate Limiting** | Exponential back‑off, token buckets | `tenacity`, `ratelimit` | | **Serialization** | JSON, XML, Protobuf | `json`, `lxml`, `protobuf` | | **Python Example** – Fetching customer data from a REST endpoint: python import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry BASE_URL = "https://api.example.com/v1/customers" API_KEY = "YOUR_API_KEY" session = requests.Session() retry = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) adapter = HTTPAdapter(max_retries=retry) session.mount("https://", adapter) headers = {"Authorization": f"Bearer {API_KEY}"} params = {"page": 1, "limit": 100} response = session.get(BASE_URL, headers=headers, params=params) response.raise_for_status() customers = response.json() print(f"Fetched {len(customers['data'])} customers") > **Tip**: Persist pagination logic or use the service’s *cursor* mechanism to avoid hitting hard limits. ### 2.2 Web Scraping When data is only available in HTML format, scraping can be a powerful yet delicate tool. | Technique | When to Use | Tool | |-----------|-------------|------| | **Static Scraping** | Simple, low‑traffic pages | BeautifulSoup, lxml | | **Dynamic Scraping** | JavaScript‑rendered content | Selenium, Playwright | | **Headless Browsers** | Avoid detection | Puppeteer, headless Chrome | | **Legal & Ethical Checklist** 1. Review the site’s `robots.txt`. 2. Read the Terms of Service. 3. Respect rate limits; implement delays. 4. Store only the data you actually need. 5. Avoid repeated requests that could strain the server. **Python Example** – Extracting product titles from an e‑commerce site: python import requests from bs4 import BeautifulSoup import time BASE_URL = "https://www.example.com/products" headers = {"User-Agent": "Mozilla/5.0 (compatible; DataCollector/1.0)"} for page in range(1, 6): r = requests.get(f"{BASE_URL}?page={page}", headers=headers) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") for card in soup.select("div.product-card"): title = card.select_one("h2.title").get_text(strip=True) price = card.select_one("span.price").get_text(strip=True) print(title, price) time.sleep(2) # polite pause > **Caution**: Some websites implement CAPTCHAs or bot detection; use rotating proxies or official APIs if available. ### 2.3 Database Querying Databases are the backbone of structured data. SQL remains the lingua franca for querying relational stores. | Feature | Typical Use | Example | Tool | |---------|-------------|---------|------| | **Joins** | Combine tables | `SELECT * FROM sales s JOIN customers c ON s.cust_id = c.id` | PostgreSQL, MySQL | | **Aggregations** | Summary metrics | `SELECT country, COUNT(*) FROM customers GROUP BY country` | Snowflake, Redshift | | **Window Functions** | Rolling metrics | `SUM(amount) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)` | BigQuery | | **Python Example** – Using SQLAlchemy to pull recent orders: python from sqlalchemy import create_engine, text engine = create_engine("postgresql+psycopg2://user:pass@host:5432/warehouse") with engine.connect() as conn: result = conn.execute( text("SELECT order_id, order_date, total FROM orders WHERE order_date >= :today"), {"today": "2023-01-01"} ) orders = [dict(row) for row in result] print(f"Fetched {len(orders)} orders") > **Pro Tip**: Use connection pools and prepared statements to mitigate SQL injection and improve performance. ### 2.4 IoT and Streaming Data Real‑time data is essential for applications like predictive maintenance, fraud detection, or dynamic pricing. | Protocol | Use Case | Typical Broker | Example | |----------|----------|----------------|---------| | **MQTT** | Low‑latency sensor data | Mosquitto, EMQX | Home automation | | **Kafka** | High‑throughput logs | Confluent Kafka | Click‑stream analytics | | **AWS Kinesis** | Managed streaming | AWS Kinesis | Serverless event processing | | **Data Ingestion Flow** 1. Sensors publish to MQTT topics. 2. A *Bridge* (e.g., `mqtt‑to‑kafka`) forwards messages. 3. Kafka Connect persists to a data lake or stream processing engine (Flink, Spark Structured Streaming). > **Tip**: Keep a *watermark* or *offset* table to avoid data loss in case of failures. --- ## 3. Ensuring Data Quality at Ingestion | Dimension | Check | Tool | Sample Code | |-----------|-------|------|-------------| | **Schema Validation** | Does the incoming record match expected types? | `pandera`, `pydantic` | `pd.read_json(...).validate(schema)` | | **Missingness** | Are required fields present? | `pyjanitor`, `missingno` | `df.isna().sum()` | | **Duplicate Detection** | Are there repeated rows? | `dedupe`, `duplicated()` | `df.duplicated(subset=["id"])` | | **Temporal Integrity** | Timestamps in order, no gaps | `time‑series‑check` | `df['ts'].diff().abs() < threshold` | | **Example – Using `pandera` for JSON validation**: python import pandera as pa import pandas as pd schema = pa.DataFrameSchema( { "id": pa.Column(int, checks=pa.Check.greater_than(0)), "name": pa.Column(str, checks=pa.Check.str_length(min_value=1)), "score": pa.Column(float, checks=pa.Check.in_range(0, 100)), "ts": pa.Column("datetime64[ns]"), } ) raw_df = pd.read_json("raw_data.json", orient="records") validated_df = schema.validate(raw_df) --- ## 4. Building Robust Ingestion Pipelines | Pipeline Type | Characteristics | Tooling | Typical Workflow | |---------------|-----------------|---------|------------------| | **Batch** | Daily/weekly loads, high volume | Airflow, Prefect, Dagster | Extract → Transform → Load (ETL) | | **Streaming** | Low latency, real‑time | Kafka Streams, Spark Structured Streaming, Flink | Ingest → Process → Sink | | **Hybrid** | Mixed workloads | dbt + dbt-spark, Spark + Airflow | Schedule micro‑tasks, trigger real‑time jobs | | ### 4.1 Orchestration Example – Airflow DAG for API extraction python from airflow import DAG from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago import requests import json with DAG( "api_to_s3", default_args={"owner": "data_team"}, schedule_interval="@daily", start_date=days_ago(1), catchup=False, ) as dag: def fetch_and_store(**kwargs): url = "https://api.example.com/v1/transactions" headers = {"Authorization": "Bearer $API_KEY"} response = requests.get(url, headers=headers) data = response.json() with open("/tmp/transactions.json", "w") as f: json.dump(data, f) # Assuming S3 bucket is mounted or using boto3 # upload_to_s3("/tmp/transactions.json", "my-bucket/raw/transactions/", "transactions.json") extract_task = PythonOperator( task_id="extract_api", python_callable=fetch_and_store, ) extract_task > **Key Point**: Use secrets back‑ends (Vault, AWS Secrets Manager) to manage API keys; avoid hard‑coding. --- ## 5. Practical Workflow Example – Retail Sales Data | Step | Action | Tool | Output | |------|--------|------|--------| | 1 | Pull daily sales from ERP API | `requests` | JSON dump | | 2 | Store raw JSON in S3 | AWS S3 | Raw bucket | | 3 | Transform JSON to Parquet via Spark | PySpark | Structured table | | 4 | Load into data warehouse (Redshift) | `sqlalchemy`, `psycopg2` | Fact/Dim tables | | 5 | Run weekly ETL job to aggregate sales | Airflow | Summary tables | | **Code Snippet – Spark Transformation**: python from pyspark.sql import SparkSession from pyspark.sql.functions import col, to_timestamp spark = SparkSession.builder.appName("RetailETL").getOrCreate() raw_df = spark.read.json("s3://my-bucket/raw/transactions/2023-03-07.json") transformed = raw_df.select( col("transaction_id").alias("txn_id"), col("customer_id"), col("product_id"), col("quantity"), col("price"), to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss").alias("txn_ts") ) transformed.write.mode("overwrite").parquet("s3://my-bucket/processed/transactions/2023-03-07/") --- ## 6. Summary Table – Tool Stack by Data Source | Data Source | Recommended Retrieval | Best‑Fit Storage | Typical Pipeline Tool | |--------------|-----------------------|-----------------|----------------------| | REST API | `requests`, `httpx` | S3, Redshift | Airflow, Prefect | | GraphQL | `gql`, `requests` | S3 | Airflow | | HTML Scraping | BeautifulSoup, Selenium | S3 | Airflow | | SQL DB | SQLAlchemy, JDBC | Redshift, Snowflake | Airflow, dbt | | MQTT IoT | `paho-mqtt` | Kafka | Kafka Connect | | Kafka Streams | `kafka-python`, `confluent-kafka` | HDFS, S3 | Spark Structured Streaming | | --- ## 7. Best Practices & Common Pitfalls 1. **Version your schemas** – keep track of changes to data models. 2. **Implement retry/back‑off logic** – avoid hitting external limits. 3. **Log all failures** – store payloads and error messages for debugging. 4. **Validate data immediately** – catch bad records early. 5. **Use data catalogs** – e.g., Amundsen, DataHub to maintain lineage. 6. **Secure all endpoints** – enforce TLS, rotate secrets. 7. **Avoid data duplication** – de‑dup early, use primary keys. 8. **Plan for latency** – differentiate batch vs streaming pipelines. --- ## 8. Key Takeaways - **Data acquisition is a multi‑faceted process**: from simple API pulls to complex real‑time streams. - **Robust pipelines demand automation, validation, and monitoring** to sustain quality over time. - **Choosing the right tool for the source** (e.g., Airflow for batch, Kafka for streaming) aligns operational cost with business needs. - **Quality starts at ingestion**—schema checks, missing‑value handling, and duplicate removal should be built into the pipeline, not added later. - **Compliance matters**—ensure you honor data‑use agreements, privacy regulations, and ethical scraping guidelines. By mastering the foundations of data acquisition, analysts and engineers position themselves to deliver high‑impact insights across the entire data science workflow.