
Data Science Demystified: A Pragmatic Guide for Business Decision-Makers - Chapter 2


Published on 2026-02-23 09:00

# Chapter 2: Data Acquisition & Cleaning

Data acquisition and cleaning form the bedrock of any credible data‑science effort. If the data are noisy, incomplete, or improperly formatted, every downstream analysis is compromised. This chapter walks through practical techniques for sourcing data from diverse platforms, ingesting it into reproducible pipelines, and ensuring its integrity using Python’s most popular libraries: **pandas** and **Dask**.

## 2.1 The Data‑Acquisition Landscape

| Source | Typical Use‑Case | Key Tools | Notes |
|--------|------------------|-----------|-------|
| APIs | Real‑time metrics, third‑party services | `requests`, `httpx`, `aiohttp` | Rate limits & authentication must be handled. |
| Web Scraping | Public data, competitor analysis | `BeautifulSoup`, `lxml`, `scrapy` | Respect robots.txt and use polite crawling. |
| Databases | Structured enterprise data | `SQLAlchemy`, `pyodbc`, `psycopg2` | Connection pooling is critical for large loads. |
| Cloud Storage | Raw event logs, media | `boto3`, `azure-storage-blob`, `google-cloud-storage` | Use bucket lifecycle policies to manage cost. |
| Data Lakes / Warehouses | Big‑data analytics | `aws-glue`, `snowflake`, `bigquery` | Schema‑on‑read vs schema‑on‑write trade‑offs. |

**Reproducibility** starts here: document every endpoint, query, and authentication method in a version‑controlled script or Jupyter notebook. Prefer deterministic IDs over timestamps where possible.

## 2.2 Building a Robust Ingestion Pipeline

A typical pipeline has three stages:

1. **Extract** – Pull data from source.
2. **Transform** – Clean, validate, and enrich.
3. **Load** – Persist to a data lake, warehouse, or analytics store.

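
The three stages above can be sketched as three small functions composed into one run. This is a minimal illustration rather than a production pipeline: the inline CSV text, the column names (`order_id`, `amount`, `order_date`), and the function names are all hypothetical, and the load step writes to an in-memory buffer instead of a real warehouse.

```python
import io

import pandas as pd

# Hypothetical raw input standing in for a real source (API, file, database).
RAW_CSV = """order_id,amount,order_date
1,10.50,2024-02-23
2,,2024-02-23
2,,2024-02-23
3,7.25,2024-02-24
"""


def extract(raw_text: str) -> pd.DataFrame:
    """Extract: pull data from the source into a DataFrame."""
    return pd.read_csv(io.StringIO(raw_text))


def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Transform: drop exact duplicates, parse dates, drop rows with no amount."""
    out = df.drop_duplicates().copy()
    out["order_date"] = pd.to_datetime(out["order_date"])
    return out.dropna(subset=["amount"]).reset_index(drop=True)


def load(df: pd.DataFrame, sink: io.StringIO) -> int:
    """Load: persist the cleaned rows and return how many were written."""
    df.to_csv(sink, index=False)
    return len(df)


def run_pipeline(raw_text: str, sink: io.StringIO) -> int:
    """Run the three stages in order."""
    return load(transform(extract(raw_text)), sink)


sink = io.StringIO()
rows_written = run_pipeline(RAW_CSV, sink)
```

Keeping each stage as a separate function makes it possible to test the transform logic without touching the source or the sink. Whether rows with a missing amount should be dropped at all is a judgment call that depends on the dataset.
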
### 2.2.1 Example: Pulling and Persisting CSVs from an S3 Bucket

```python
from io import BytesIO

import boto3
import pandas as pd

s3 = boto3.client("s3", region_name="us-east-1")
bucket = "company-raw-logs"
key = "2024/02/23/sales.csv"

# Download the object and parse its bytes as CSV
obj = s3.get_object(Bucket=bucket, Key=key)
data = pd.read_csv(BytesIO(obj["Body"].read()))

# Persist to local parquet for faster downstream reads
data.to_parquet("/data/processed/sales.parquet", index=False)
```

*Tip:* Wrap the above in a function and use a config file (`yaml` or `json`) to store bucket names and paths.

### 2.2.2 Parallelizing with Dask for Big Data

```python
import dask.dataframe as dd

# Lazily read all CSVs under the prefix (reading s3:// paths requires the s3fs package)
ddf = dd.read_csv(
    "s3://company-raw-logs/2024/02/23/*.csv",
    storage_options={"anon": False},
)

# Persist as a Parquet dataset: one file per partition (10 here), not a single file
ddf.repartition(npartitions=10).to_parquet(
    "s3://company-processed/sales/", engine="pyarrow"
)
```

Dask distributes the work across cores or a cluster and processes the data partition by partition, which keeps the memory footprint manageable.

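
The tip in 2.2.1 about wrapping ingestion in a function and moving bucket names and paths into a config file can be sketched as follows. The config keys (`bucket`, `key_template`, `output_dir`), the `IngestConfig` class, and the helper names are hypothetical. The sketch only builds the source key and the destination path, leaving the actual S3 read to code like the example in 2.2.1.

```python
import json
from dataclasses import dataclass
from datetime import date
from pathlib import PurePosixPath

# Hypothetical config; in practice this JSON would live in a version-controlled file.
CONFIG_JSON = """
{
  "bucket": "company-raw-logs",
  "key_template": "{year:04d}/{month:02d}/{day:02d}/sales.csv",
  "output_dir": "/data/processed"
}
"""


@dataclass(frozen=True)
class IngestConfig:
    bucket: str
    key_template: str
    output_dir: str


def load_config(text: str) -> IngestConfig:
    """Parse the JSON config; unknown or missing keys raise a TypeError."""
    return IngestConfig(**json.loads(text))


def source_key(cfg: IngestConfig, day: date) -> str:
    """Build the S3 object key for one day's file."""
    return cfg.key_template.format(year=day.year, month=day.month, day=day.day)


def output_path(cfg: IngestConfig, name: str) -> str:
    """Build the local Parquet path for a processed dataset."""
    return str(PurePosixPath(cfg.output_dir) / f"{name}.parquet")


cfg = load_config(CONFIG_JSON)
key = source_key(cfg, date(2024, 2, 23))
out = output_path(cfg, "sales")
```

With the paths derived from config and a date, the same function can be rerun for any day without editing code, which supports the reproducibility goal from 2.1.
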
## 2.3 Data Quality & Integrity

| Issue | Typical Symptom | Fix | Tools |
|-------|-----------------|-----|-------|
| Missing values | `NaN` spread across numeric columns | Imputation (mean, median, KNN) or deletion | `pandas`, `sklearn.impute` |
| Duplicate rows | Repeated IDs or timestamps | `drop_duplicates()` | `pandas` |
| Inconsistent types | Mixed integer/float, wrong date formats | Explicit casting (`pd.to_datetime`) | `pandas` |
| Outliers | Extremely high/low values | Winsorization or robust scaling | `numpy`, `scipy.stats` |
| Schema drift | New columns or missing fields | Automated schema validation against an expectation suite | `great_expectations` |

### 2.3.1 Handling Missing Data

```python
# Simple mean imputation for numeric columns
# (assumes `df` is an existing pandas DataFrame)
from sklearn.impute import SimpleImputer

numeric_columns = df.select_dtypes(include="number").columns
imputer = SimpleImputer(strategy="mean")
X = imputer.fit_transform(df[numeric_columns])  # returns a NumPy array
```

### 2.3.2 Detecting Schema Drift with Great Expectations

```python
import great_expectations as ge

# Convert DataFrame to a Great Expectations Dataset.
# Note: `ge.from_pandas` belongs to the legacy dataset API of older releases;
# newer releases use a different, context-based workflow.
ge_df = ge.from_pandas(df)

# Validate against a previously defined expectation suite.
# The keyword accepted by `validate` may differ between releases; check your installed version.
results = ge_df.validate(expectation_suite_name="sales_data_suite")
print(results)
```

> **Ethical note:** When imputing missing values, consider whether the absence is informative (e.g., a customer never interacted). Blindly imputing can introduce bias.

## 2.4 Data Validation & Auditing

1. **Unit tests** – Use `pytest` to assert shapes, types, and value ranges.
2. **Data versioning** – Store each processed file with a unique hash or timestamp.
3. **Audit logs** – Record ingestion time, source, and any transformation steps.
4. **Data quality dashboards** – Leverage tools like `dbt` (tests) or `Metabase` (dashboards) to surface anomalies.

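
Items 2 and 3 in the list above (data versioning and audit logs) can be sketched with the standard library alone. The sketch hashes a file's bytes to get a content-based version ID and builds one audit record. The function names, the record fields, and the temporary file are hypothetical stand-ins for real processed files and a real log store.

```python
import hashlib
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def content_hash(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a file, read in chunks to bound memory."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def audit_record(path: Path, source: str, steps: list[str]) -> dict:
    """Build one audit-log entry: what was ingested, from where, when, and how."""
    return {
        "file": path.name,
        "sha256": content_hash(path),
        "source": source,
        "transformations": steps,
        "ingested_at": datetime.now(timezone.utc).isoformat(),
    }


# Demonstrate on a throwaway file standing in for a processed dataset.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sales.csv"
    sample.write_bytes(b"order_id,amount\n1,10.5\n")
    record = audit_record(sample, "s3://company-raw-logs", ["drop_duplicates"])
    log_line = json.dumps(record)  # one JSON object per line appends cleanly to a log
```

A content hash changes only when the bytes change, so two runs that produce identical output get the same version ID. A timestamp alone cannot provide that guarantee.
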
### 2.4.1 Sample `pytest` Assertion

```python
import pandas as pd
import pytest


@pytest.fixture
def sales_df():
    # Load the processed file written by the ingestion step
    return pd.read_parquet("/data/processed/sales.parquet")


def test_sales_columns(sales_df):
    # The processed table must expose exactly these columns
    expected = {"order_id", "customer_id", "amount", "order_date"}
    assert set(sales_df.columns) == expected
```

## 2.5 Real‑World Case Study: Retail Chain Data Pipeline

**Background:** A national retailer needed to consolidate daily sales, inventory, and customer sentiment data from over 200 stores and a third‑party review site.

**Challenges:**

- Heterogeneous formats (CSV, JSON, XML).
- Near‑real‑time feeds from POS systems.
- GDPR‑compliant handling of customer emails.

**Solution:**

1. **Extract** – Scheduled Airflow DAGs pulled data via REST APIs and SFTP.
2. **Transform** – Pandas handled CSV/JSON, and lxml parsed XML. Dask processed 1 TB of sales logs per day.
3. **Load** – Data was written to a Snowflake warehouse with a central schema registry.
4. **Validation** – Great Expectations suites checked schema consistency daily.
5. **Monitoring** – Grafana dashboards displayed ingestion latency and data quality metrics.

**Outcome:** 25% reduction in data latency and a 15% increase in forecast accuracy for inventory management.

## 2.6 Ethical & Governance Checklist

| Question | Action | Responsible Party |
|----------|--------|-------------------|
| Are we collecting only the data we need? | Implement data minimization in collection scripts | Data Engineer |
| Do we have consent for personal data? | Verify legal flags in metadata | Data Steward |
| Are we handling sensitive data securely? | Encrypt at rest and in transit | Security Engineer |
| Is data lineage documented? | Use lineage tools (e.g., Databricks Unity Catalog) | MLOps Lead |
| Are we transparent about data sources? | Publish a data catalog with source metadata | Product Manager |

## 2.7 Take‑Away Action Items

1. **Create a source‑to‑sink documentation matrix** for every dataset you plan to use.
2. **Automate data quality checks** with a CI pipeline that fails when key metrics deviate.
3. **Version‑control all ingestion scripts** and keep them in a shared, access‑controlled repository for auditability.
4. **Implement a data catalog** early; it saves time when onboarding new analysts.
5. **Review ethical implications** whenever you add a new data source or transformation step.

By rigorously applying these practices, you turn raw data into reliable, reproducible assets that underpin trustworthy business insights.
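
Action item 2 can be sketched as a small quality gate that a CI job would run. The thresholds, column names, and function names below are hypothetical. The idea is only that the script computes a few metrics and reports which ones cross their limits, so that the CI job can fail on a non-empty result.

```python
import pandas as pd

# Hypothetical limits; real values depend on the dataset and the business context.
MAX_NULL_FRACTION = 0.05
MAX_DUPLICATE_FRACTION = 0.01


def quality_metrics(df: pd.DataFrame, key: str) -> dict:
    """Compute the worst per-column share of missing values and the share of repeated keys."""
    return {
        "null_fraction": float(df.isna().mean().max()),
        "duplicate_fraction": float(df[key].duplicated().mean()),
    }


def failed_checks(metrics: dict) -> list[str]:
    """Return the names of metrics that exceed their limits."""
    failures = []
    if metrics["null_fraction"] > MAX_NULL_FRACTION:
        failures.append("null_fraction")
    if metrics["duplicate_fraction"] > MAX_DUPLICATE_FRACTION:
        failures.append("duplicate_fraction")
    return failures


clean = pd.DataFrame({"order_id": [1, 2, 3, 4], "amount": [10.0, 12.5, 9.0, 11.0]})
dirty = pd.DataFrame({"order_id": [1, 2, 2, 4], "amount": [10.0, None, 9.0, 11.0]})

clean_failures = failed_checks(quality_metrics(clean, "order_id"))
dirty_failures = failed_checks(quality_metrics(dirty, "order_id"))
# In CI, a non-empty list would be turned into a non-zero exit code,
# for example with: raise SystemExit(1 if dirty_failures else 0)
```

Returning the list of failed metric names, rather than a bare pass/fail flag, lets the CI log say which check broke.
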