
Data Science for the Modern Analyst: From Concepts to Implementation, Chapter 6

# Chapter 6: Building and Deploying an End‑to‑End Analytics Solution

Published 2026-02-26 07:04

## 1. Recap of the Project Scope

We have a real‑world scenario: a mid‑size retailer wants to **predict next‑quarter sales per store** and feed those predictions into a BI dashboard that guides inventory procurement. The stakeholders have asked for:

1. **Data acquisition** from internal OLAP cubes and an external weather API.
2. **Data cleaning** that handles missing values, outliers, and seasonal drift.
3. **Feature engineering** that captures temporal patterns, store‑level promotions, and weather effects.
4. **Model selection** that balances predictive power with interpretability.
5. **Model deployment** to an automated pipeline that refreshes nightly.
6. **Monitoring** for data drift and model performance.
7. **Governance** that logs every decision, model version, and prediction.

Our goal is to demonstrate how to stitch all of these steps together in a reproducible, auditable workflow.

## 2. Architecture Overview

```
+------------------+       +------------------+
| External Weather |------>|                  |
+------------------+       |  Data Ingestion  |
+------------------+       |                  |
|  Internal OLAP   |------>|                  |
+------------------+       +------------------+
                                    |
                                    v
                           +------------------+
                           |  Data Cleaning   |
                           +------------------+
                                    |
                                    v
                           +------------------+
                           |  Feature Engine  |
                           +------------------+
                                    |
                                    v
                           +------------------+
                           |  Model Training  |
                           +------------------+
                                    |
                                    v
                           +------------------+
                           | Model Deployment |
                           +------------------+
                                    |
                                    v
+------------------+       +------------------+
|   Governance &   |------>|   Monitoring &   |
|   Audit Trail    |       |      Alerts      |
+------------------+       +------------------+
```

Each block will be implemented with a combination of **Python**, **SQL**, **Airflow**, and **MLflow**, while the BI layer will be a Power BI dashboard that pulls predictions from a Postgres database.

## 3. Step‑by‑Step Implementation

### 3.1 Data Ingestion

**3.1.1 Weather API Pull**

```python
import requests
import pandas as pd

API_KEY = 'YOUR_WEATHER_KEY'
BASE_URL = 'https://api.weather.com/v3/wx/forecast/daily/5day'

params = {
    'geocode': '40.7128,-74.0060',  # New York
    'format': 'json',
    'apiKey': API_KEY,
}

response = requests.get(BASE_URL, params=params, timeout=30)
response.raise_for_status()

weather_df = pd.json_normalize(response.json()['narrative'])
weather_df['date'] = pd.to_datetime(weather_df['validTime']).dt.date
```

**3.1.2 OLAP Cube Extraction**

```sql
-- Using Snowflake for illustration
SELECT store_id,
       sales,
       transaction_date
FROM retail_sales_cube
WHERE transaction_date BETWEEN DATEADD(month, -24, CURRENT_DATE())
                           AND CURRENT_DATE();
```

The two data sources are stored in a staging schema for further processing.

### 3.2 Data Cleaning and Quality Checks

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, when, avg, stddev

spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

sales_df = spark.read.format("delta").load("s3://data/warehouse/staging/sales/")
weather_df = spark.read.format("delta").load("s3://data/warehouse/staging/weather/")

# Basic sanity checks
assert sales_df.count() > 0, "Sales data is empty"
assert weather_df.count() > 0, "Weather data is empty"

store_window = Window.partitionBy("store_id")

# Impute missing sales with the per-store mean
sales_df = sales_df.withColumn(
    "sales_filled",
    when(col("sales").isNull(), avg(col("sales")).over(store_window))
    .otherwise(col("sales"))
)

# Remove obvious outliers (> 3 standard deviations from the per-store mean)
sales_df = (
    sales_df
    .withColumn("store_mean", avg("sales_filled").over(store_window))
    .withColumn("store_std", stddev("sales_filled").over(store_window))
)
clean_df = sales_df.filter(
    (col("sales_filled") <= col("store_mean") + 3 * col("store_std")) &
    (col("sales_filled") >= col("store_mean") - 3 * col("store_std"))
)
```

All cleaning scripts are packaged into **Data Quality Checks** that run nightly via Airflow.
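To make the nightly checks concrete, they can be expressed as a small library of assertions that the Airflow task calls. The following is a minimal pandas sketch; the function name `run_quality_checks`, the thresholds, and the sample data are illustrative assumptions rather than the production code:

```python
import pandas as pd

def run_quality_checks(df: pd.DataFrame) -> list:
    """Run basic data-quality assertions; return a list of human-readable failures."""
    failures = []
    if df.empty:
        failures.append("dataset is empty")
    if df["sales"].lt(0).any():
        failures.append("negative sales values found")
    null_rate = df["sales"].isna().mean()
    if null_rate > 0.05:
        failures.append(f"sales null rate {null_rate:.1%} exceeds 5% threshold")
    if df.duplicated(subset=["store_id", "transaction_date"]).any():
        failures.append("duplicate (store_id, transaction_date) rows")
    return failures

sample = pd.DataFrame({
    "store_id": [1, 1, 2],
    "transaction_date": ["2024-01-01", "2024-01-02", "2024-01-01"],
    "sales": [100.0, None, 250.0],
})
print(run_quality_checks(sample))  # one failure: the null rate is too high
```

Returning a list of failures, rather than raising on the first problem, lets a single nightly run report every issue at once and lets the Airflow task decide whether a failure should block downstream steps.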
### 3.3 Feature Engineering

| Feature | Source | Rationale |
|---------|--------|-----------|
| `is_promotion_day` | Internal promo calendar | Captures promotional impact |
| `days_to_Christmas` | Calendar | Seasonal effect |
| `avg_weekday_sales` | Aggregated past 4 weeks | Trend baseline |
| `temp_high` | Weather API | Temperature drives foot traffic |
| `precipitation` | Weather API | Heavy rain reduces visits |
| `store_size` | Metadata | Larger stores have higher sales |

Python implementation:

```python
from sklearn.preprocessing import StandardScaler

features = ['is_promotion_day', 'days_to_Christmas', 'avg_weekday_sales',
            'temp_high', 'precipitation', 'store_size']

X = clean_df.select(features)

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X.toPandas())
```

Feature importance will later be extracted via SHAP to satisfy the transparency requirement.

### 3.4 Model Selection and Training

We compare three models:

1. **Gradient Boosting Regressor** (XGBoost) – strong performance.
2. **ElasticNet** – linear, easier to explain.
3. **Prophet** – handles seasonality naturally.
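Before training, it may help to see how two of the engineered features from the table in section 3.3 could actually be computed. This is a simplified pandas sketch on toy data: `days_to_Christmas` here ignores the year rollover after December 25, and the "past 4 weeks" baseline is approximated with a 28‑day rolling mean; column and variable names are illustrative:

```python
import pandas as pd

df = pd.DataFrame({
    "store_id": [1] * 6,
    "date": pd.to_datetime(["2024-12-18", "2024-12-19", "2024-12-20",
                            "2024-12-21", "2024-12-22", "2024-12-23"]),
    "sales": [100, 120, 130, 90, 110, 150],
})

# Days until Christmas of the same year (simplification: no rollover to next year)
christmas = pd.to_datetime(df["date"].dt.year.astype(str) + "-12-25")
df["days_to_Christmas"] = (christmas - df["date"]).dt.days

# Trend baseline: rolling 28-day mean of sales, computed per store
df = df.sort_values(["store_id", "date"])
df["avg_weekday_sales"] = (
    df.groupby("store_id")["sales"]
      .transform(lambda s: s.rolling(window=28, min_periods=1).mean())
)
print(df[["date", "days_to_Christmas", "avg_weekday_sales"]])
```

In the real pipeline these transforms would run in Spark over `clean_df`, but the logic is the same; the pandas version is handy for unit-testing the feature definitions in isolation.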
```python
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from xgboost import XGBRegressor
from sklearn.linear_model import ElasticNet
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
import pandas as pd

# Prepare data
train_X = X_scaled
train_y = clean_df.select("sales_filled").toPandas().values.ravel()

# Time-series split: training folds always precede their test fold
tscv = TimeSeriesSplit(n_splits=5)

# XGBoost
xgb = XGBRegressor(n_estimators=500, learning_rate=0.05, max_depth=5)
scores = cross_val_score(xgb, train_X, train_y, cv=tscv,
                         scoring='neg_root_mean_squared_error')
print('XGB RMSE:', -scores.mean())

# ElasticNet
enet = ElasticNet(alpha=0.1, l1_ratio=0.5)
print('ElasticNet RMSE:',
      -cross_val_score(enet, train_X, train_y, cv=tscv,
                       scoring='neg_root_mean_squared_error').mean())

# Prophet is not a scikit-learn estimator, so it cannot be passed to
# cross_val_score; use Prophet's own diagnostics utilities instead.
# Prophet requires a DataFrame with columns ds, y.
prophet_df = pd.DataFrame({
    'ds': clean_df.select('transaction_date').toPandas()['transaction_date'],
    'y': train_y,
})
prophet = Prophet(yearly_seasonality=True, weekly_seasonality=True)
prophet.fit(prophet_df)
cv_results = cross_validation(prophet, horizon='90 days')
print('Prophet RMSE:', performance_metrics(cv_results)['rmse'].mean())
```

**Result:** XGBoost outperforms the others by ~12 % but is less interpretable. We settle on **ElasticNet** because stakeholders prioritize transparency, and we’ll augment it with SHAP for deeper insight.

### 3.5 Model Interpretation

```python
import shap

model = enet.fit(train_X, train_y)

explainer = shap.Explainer(model, train_X)
shap_values = explainer(train_X)

# Plot global importance
shap.summary_plot(shap_values, feature_names=features)
```

The plot reveals that `temp_high` and `is_promotion_day` drive most of the variance. These findings are documented in the governance log.
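A quick aside on the cross‑validation used in section 3.4: `TimeSeriesSplit` never lets a model train on data that comes after its test window, which is what makes the RMSE estimates honest for forecasting. The tiny standalone check below (synthetic indices only) illustrates the guarantee:

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

X = np.arange(10).reshape(-1, 1)

tscv = TimeSeriesSplit(n_splits=3)
splits = list(tscv.split(X))
for fold, (train_idx, test_idx) in enumerate(splits):
    # Every training index precedes every test index in each fold
    print(fold, train_idx.tolist(), '->', test_idx.tolist())
```

Contrast this with ordinary `KFold` shuffling, which would leak future observations into the training set and produce optimistically biased RMSE for all three candidate models.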
### 3.6 Deployment Pipeline

**Airflow DAG** (Python script):

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'sales_prediction',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
)

def ingest(**kwargs):
    # call ingestion functions
    pass

def clean(**kwargs):
    # call cleaning scripts
    pass

def feature_engineer(**kwargs):
    # call feature scripts
    pass

def predict(**kwargs):
    # load model, make predictions, write to Postgres
    pass

def monitor(**kwargs):
    # check drift, send alerts
    pass

ingest_task = PythonOperator(task_id='ingest', python_callable=ingest, dag=dag)
clean_task = PythonOperator(task_id='clean', python_callable=clean, dag=dag)
fe_task = PythonOperator(task_id='feature_engineer', python_callable=feature_engineer, dag=dag)
predict_task = PythonOperator(task_id='predict', python_callable=predict, dag=dag)
monitor_task = PythonOperator(task_id='monitor', python_callable=monitor, dag=dag)

ingest_task >> clean_task >> fe_task >> predict_task >> monitor_task
```

**Model versioning** is handled by MLflow. Each run records parameters, metrics, and artifacts. The **Airflow** DAG pulls the latest MLflow artifact before making predictions.

### 3.7 BI Integration

Predictions are stored in a dedicated Postgres table:

```sql
CREATE TABLE IF NOT EXISTS store_sales_predictions (
    store_id        INT,
    forecast_date   DATE,
    predicted_sales NUMERIC,
    confidence_low  NUMERIC,
    confidence_high NUMERIC,
    run_timestamp   TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```

Power BI connects to this table via ODBC. The dashboard shows:

- **Top 10 stores** by predicted sales.
- **Trend line** with actual vs. forecast.
- **Promotion impact** overlay.
- **Alert panel** that triggers if predictions deviate >10 % from the last run.
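The alert panel's threshold rule can be prototyped independently of the dashboard. The sketch below is a minimal, hypothetical implementation of the ">10 % deviation from last run" check; the function name `deviation_alerts`, the store IDs, and the forecast values are illustrative only:

```python
def deviation_alerts(previous, current, threshold=0.10):
    """Return store_ids whose new forecast deviates more than `threshold`
    (relative) from the previous run's forecast."""
    alerts = []
    for store_id, new_pred in current.items():
        old_pred = previous.get(store_id)
        if old_pred is not None and abs(new_pred - old_pred) / abs(old_pred) > threshold:
            alerts.append(store_id)
    return sorted(alerts)

previous_run = {101: 100.0, 102: 200.0, 103: 50.0}
current_run = {101: 105.0, 102: 230.0, 103: 44.0}
print(deviation_alerts(previous_run, current_run))  # [102, 103]
```

In production this comparison would run as a SQL query over the two latest `run_timestamp` values in `store_sales_predictions`, but keeping the rule as a pure function makes the 10 % threshold easy to unit-test and tune.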
### 3.8 Monitoring & Governance

- **Data drift detection** uses **PCA‑based** Mahalanobis distance on feature distributions.
- **Model performance** is tracked weekly via RMSE and MAE. If performance drops >15 % compared to baseline, an alert is sent to the data science Ops channel.
- **Audit trail** logs every ingestion, cleaning, and prediction step in a separate `audit_log` table, capturing user, timestamp, and run ID.
- **Compliance**: all logs are immutable and stored in an encrypted S3 bucket with lifecycle policies.

## 4. Deliverables

1. **Airflow DAG** – orchestrates ingestion, cleaning, feature engineering, prediction, and monitoring.
2. **MLflow Registry** – holds model versions and lineage.
3. **Postgres prediction table** – feeds the BI dashboard.
4. **Power BI dashboard** – interactive visualization of forecasts and alerts.
5. **Governance report** – automated weekly PDF summarizing data quality, model performance, and audit events.

## 5. Next Steps for Your Team

- **Run the DAG** on a staging environment; verify that predictions appear in Postgres.
- **Validate the dashboard** against ground truth for the last month.
- **Set up alerts** in Airflow for data drift and performance degradation.
- **Schedule a quarterly review** to retrain the model with new data and update feature engineering as business changes.

By following this workflow, you ensure that every step is reproducible, auditable, and aligned with stakeholder expectations. The solution is not a one‑off; it’s a living system that evolves with your organization’s data and goals.
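As a closing illustration of the drift-detection bullet in section 3.8, here is a minimal sketch of a PCA‑based Mahalanobis drift score. It uses synthetic data in place of the real feature tables, and `drift_score`, the window sizes, and the component count are illustrative assumptions, not the production monitor:

```python
import numpy as np
from sklearn.decomposition import PCA

rng = np.random.default_rng(0)
baseline = rng.normal(loc=0.0, scale=1.0, size=(500, 6))  # reference feature window
drifted = rng.normal(loc=1.5, scale=1.0, size=(200, 6))   # batch with shifted mean

# Fit PCA on the reference window, then score batches in the reduced space
pca = PCA(n_components=3).fit(baseline)
ref = pca.transform(baseline)
ref_mean = ref.mean(axis=0)
cov_inv = np.linalg.inv(np.cov(ref, rowvar=False))

def drift_score(batch):
    """Mean Mahalanobis distance of a batch from the baseline, in PCA space."""
    z = pca.transform(batch) - ref_mean
    return float(np.sqrt(np.einsum("ij,jk,ik->i", z, cov_inv, z)).mean())

# A drifted batch should score noticeably higher than the baseline itself
print(drift_score(baseline), drift_score(drifted))
```

Reducing to a few principal components before computing the distance keeps the covariance matrix well conditioned when features are correlated; the nightly monitor would compare each new batch's score against a threshold calibrated on the baseline window.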