聊天視窗

Data Science Unlocked: A Practical Guide for Modern Analysts - 第 2 章

Chapter 2: Data Collection & Engineering

發布於 2026-02-23 15:44

# Chapter 2: Data Collection & Engineering In the data science lifecycle, the **quality and accessibility of data** dictate how quickly you can turn insights into action. This chapter walks through the modern toolkit for collecting, ingesting, and engineering data so that analysts and engineers can focus on analysis and modeling rather than plumbing. --- ## 2.1 The Data Collection Landscape | Source | Typical Use‑Case | Frequency | Example Tech Stack | |--------|-----------------|-----------|-------------------| | **CSV/Excel** | Small business reports, legacy data | One‑off or periodic | pandas, openpyxl | | **Relational DBs** | OLTP/OLAP data | Continuous | JDBC, SQLAlchemy | | **NoSQL** | Log files, event streams | Near‑real‑time | MongoDB, Redis | | **APIs** | External services (social media, weather) | Hourly/Daily | requests, httpx | | **Web Scraping** | Competitive pricing, real‑time listings | Batch or continuous | BeautifulSoup, Scrapy | | **Streaming** | IoT sensors, financial tickers | Millisecond‑level | Kafka, Flink | > **Key Takeaway:** Align your ingestion strategy with *source velocity* and *data fidelity* needs. ## 2.2 Ingestion Techniques ### 2.2.1 Batch Ingestion Batch pipelines pull snapshots of data at scheduled intervals. Use case: nightly loads of sales data. **Python‑based ETL example**: python import pandas as pd import sqlalchemy # 1. Extract url = "https://example.com/sales_2023.csv" raw = pd.read_csv(url) # 2. Transform raw['order_date'] = pd.to_datetime(raw['order_date']) raw['total'] = raw['quantity'] * raw['unit_price'] # 3. Load engine = sqlalchemy.create_engine("postgresql://user:pw@host/db") raw.to_sql("sales", engine, if_exists="append", index=False) ### 2.2.2 Incremental / Change Data Capture (CDC) Avoid re‑processing the entire dataset. Most databases expose a *watermark* column or change logs. sql -- PostgreSQL example using a timestamp watermark INSERT INTO sales (order_id, order_date, total) SELECT order_id, order_date, total FROM source_sales WHERE order_date > (SELECT MAX(order_date) FROM sales); ### 2.2.3 Streaming / Near‑Real‑time When latency matters, pull data from a broker. python from kafka import KafkaConsumer import json consumer = KafkaConsumer( "sensor_events", bootstrap_servers=["kafka-broker:9092"], value_deserializer=lambda m: json.loads(m.decode("utf-8")) ) for msg in consumer: event = msg.value # Process event here ## 2.3 Web Scraping & APIs ### 2.3.1 API Consumption APIs are the most stable source. Use OAuth, rate‑limit handling, and retry logic. python import httpx from tenacity import retry, wait_exponential, stop_after_attempt @retry(wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(5)) def fetch_twitter_user(user_id: str): url = f"https://api.twitter.com/2/users/{user_id}" headers = {"Authorization": f"Bearer {TOKEN}"} resp = httpx.get(url, headers=headers) resp.raise_for_status() return resp.json() ### 2.3.2 Web Scraping Scraping should be the last resort. Always respect robots.txt and terms of service. python import requests from bs4 import BeautifulSoup import pandas as pd page = requests.get("https://example.com/products") soup = BeautifulSoup(page.text, "html.parser") rows = [] for product in soup.select(".product-card"): name = product.select_one(".name").text.strip() price = float(product.select_one(".price").text.strip().replace("$", "")) rows.append({"name": name, "price": price}) df = pd.DataFrame(rows) ## 2.4 Building Robust Pipelines | Tool | Strength | Use‑Case | |------|----------|----------| | **Apache Airflow** | DAG orchestration, scheduling | Complex batch workflows | | **dbt** | SQL‑based transformations, version control | Data warehouse modeling | | **Prefect** | Declarative tasks, cloud‑native | Hybrid pipelines | | **Dagster** | Type safety, composable pipelines | Enterprise data ops | ### 2.4.1 Airflow DAG Example python from airflow import DAG from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago import pandas as pd import sqlalchemy def extract(**kwargs): url = kwargs['params']['csv_url'] return pd.read_csv(url).to_dict(orient='records') def transform(**kwargs): df = pd.DataFrame(kwargs['ti'].xcom_pull(task_ids='extract')) df['order_date'] = pd.to_datetime(df['order_date']) df['total'] = df['quantity'] * df['unit_price'] return df.to_dict(orient='records') def load(**kwargs): df = pd.DataFrame(kwargs['ti'].xcom_pull(task_ids='transform')) engine = sqlalchemy.create_engine("postgresql://user:pw@host/db") df.to_sql("sales", engine, if_exists="append", index=False) def create_dag(): default_args = { 'owner': 'data-eng', 'retries': 1, 'retry_delay': timedelta(minutes=5) } dag = DAG( 'sales_etl', default_args=default_args, schedule_interval='@daily', start_date=days_ago(1), catchup=False ) extract_task = PythonOperator( task_id='extract', python_callable=extract, params={'csv_url': 'https://example.com/sales.csv'}, dag=dag ) transform_task = PythonOperator( task_id='transform', python_callable=transform, dag=dag ) load_task = PythonOperator( task_id='load', python_callable=load, dag=dag ) extract_task >> transform_task >> load_task return dag dag = create_dag() > **Best Practice:** Keep your DAG logic *purely* Python‑based and delegate heavy lifting to external services (Spark, Snowflake, etc.). ### 2.4.2 dbt Model Example **`models/staging/sales.sql`** sql -- staging/sales.sql SELECT id AS order_id, CAST(order_date AS DATE) AS order_date, quantity, unit_price FROM {{ source('raw', 'sales_csv') }} WHERE order_date IS NOT NULL; **`models/marts/sales_fct.sql`** sql WITH base AS ( SELECT * FROM {{ ref('sales') }} ) SELECT order_id, order_date, quantity, unit_price, quantity * unit_price AS total FROM base; dbt automatically creates DAGs, handles dependencies, and stores lineage metadata. ## 2.5 Data Storage & Governance 1. **Choose the right warehouse** – Columnar storage (Snowflake, BigQuery, Redshift) is optimal for analytical workloads. 2. **Partitioning & Clustering** – Improves query performance; e.g., partition by `order_date`. 3. **Versioning & Backups** – Use incremental models and snapshots. 4. **Metadata & Lineage** – Capture source, transformation, and destination details for auditability. 5. **Security** – Encrypt data at rest, enforce least‑privilege access. ## 2.6 Common Pitfalls & Mitigation | Pitfall | Symptom | Fix | |----------|---------|-----| | **Unreliable scrapers** | Data stops appearing when site changes | Use APIs where possible; wrap scraping in retry logic; monitor failures | | **Data drift** | Model accuracy degrades | Set up drift detection; retrain on fresh data | | **No logging** | Hard to debug pipeline failures | Integrate with Airflow logs or external log aggregator (ELK) | | **Duplicate rows** | Inflated metrics | Use idempotent loads or deduplication logic | ## 2.7 Case Study: Building a Real‑Time Retail Dashboard 1. **Sources** – POS system (MySQL), e‑commerce API (REST), weather data (OpenWeather API). 2. **Ingestion** – Airflow pulls nightly sales; Prefect streams POS updates; API wrapper fetches weather. 3. **Transformation** – dbt standardizes schemas; feature engineering adds `seasonal_adjustment`. 4. **Storage** – Data lands in Snowflake, partitioned by `sale_date`. 5. **Serving** – Looker (or Power BI) connects to Snowflake for real‑time dashboards. 6. **Monitoring** – CloudWatch alerts on Airflow task failures; Grafana dashboards for throughput. > **Outcome:** A 10‑minute‑lag dashboard that updates sales, inventory, and external factors, enabling store managers to adjust staffing in near real‑time. ## 2.8 Practical Exercises 1. **ETL Mini‑Project** – Create an Airflow DAG that pulls the latest 100 rows from a public REST API (e.g., GitHub events) and loads them into a PostgreSQL table. 2. **dbt Sprint** – Model a simple sales fact from two raw CSV files using dbt. Deploy the DAG to a local Airflow instance. 3. **Scraping Challenge** – Scrape product titles and prices from an e‑commerce category page and store them in a local SQLite database. Add error handling and a basic deduplication step. 4. **Data Drift Simulation** – Simulate drift by inserting a row with an unusually high price. Build a drift detection alert in Airflow. --- **Summary** – Effective data ingestion and pipeline engineering are the backbone of any data‑driven organization. Mastering extraction techniques, orchestration tools, and storage best practices ensures high‑quality, reliable datasets that feed downstream analytics and machine learning models.