Data Science for the Modern Analyst: From Concepts to Implementation - Chapter 6
Published 2026-02-26 07:04
# Chapter 6: Building and Deploying an End‑to‑End Analytics Solution
## 1. Recap of the Project Scope
We have a real‑world scenario: a mid‑size retailer wants to **predict next‑quarter sales per store** and feed those predictions into a BI dashboard that guides inventory procurement. The stakeholders have asked for:
1. **Data acquisition** from internal OLAP cubes and an external weather API.
2. **Data cleaning** that handles missing values, outliers, and seasonal drift.
3. **Feature engineering** that captures temporal patterns, store‑level promotions, and weather effects.
4. **Model selection** that balances predictive power with interpretability.
5. **Model deployment** to an automated pipeline that refreshes nightly.
6. **Monitoring** for data drift and model performance.
7. **Governance** that logs every decision, model version, and prediction.
Our goal is to demonstrate how to stitch all of these steps together in a reproducible, auditable workflow.
## 2. Architecture Overview
```
+------------------+       +----------------+
| External Weather |------>| Data Ingestion |
+------------------+       +----------------+
                                   |
+------------------+       +----------------+
| Internal OLAP    |------>| Data Cleaning  |
+------------------+       +----------------+
                                   |
                                   v
                           +----------------+
                           | Feature Engine |
                           +----------------+
                                   |
                                   v
                           +----------------+
                           | Model Training |
                           +----------------+
                                   |
                                   v
                           +------------------+
                           | Model Deployment |
                           +------------------+
                                   |
                                   v
+--------------------+     +------------------+
| Monitoring &       |<----| Governance &     |
| Alerts             |     | Audit Trail      |
+--------------------+     +------------------+
```
Each block will be implemented with a combination of **Python**, **SQL**, **Airflow**, and **MLflow**, while the BI layer will be a Power BI dashboard that pulls predictions from a Postgres database.
## 3. Step‑by‑Step Implementation
### 3.1 Data Ingestion
**3.1.1 Weather API Pull**
```python
import requests
import pandas as pd

API_KEY = 'YOUR_WEATHER_KEY'
BASE_URL = 'https://api.weather.com/v3/wx/forecast/daily/5day'

params = {
    'geocode': '40.7128,-74.0060',  # New York
    'format': 'json',
    'apiKey': API_KEY
}

response = requests.get(BASE_URL, params=params, timeout=30)
response.raise_for_status()  # fail fast on HTTP errors

# Flatten the nested JSON payload into a tabular frame
weather_df = pd.json_normalize(response.json()['narrative'])
weather_df['date'] = pd.to_datetime(weather_df['validTime']).dt.date
```
**3.1.2 OLAP Cube Extraction**
```sql
-- Using Snowflake for illustration
SELECT store_id, sales, transaction_date
FROM retail_sales_cube
WHERE transaction_date BETWEEN DATEADD(month, -24, CURRENT_DATE()) AND CURRENT_DATE();
```
The two data sources are stored in a staging schema for further processing.
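Before either source lands in staging, it helps to force both extracts into a common shape. A minimal pandas sketch of that normalization step (the column names and the `source` lineage tag are illustrative, not the production schema):

```python
import pandas as pd

def normalize_for_staging(df: pd.DataFrame, date_col: str, source: str) -> pd.DataFrame:
    """Standardize column names and date types, and tag rows with their source."""
    out = df.copy()
    out.columns = [c.strip().lower() for c in out.columns]
    out[date_col] = pd.to_datetime(out[date_col]).dt.date  # uniform date type
    out['source'] = source  # lineage tag for the audit trail
    return out

# Example with a toy OLAP extract
raw = pd.DataFrame({'Store_ID': [1, 2], 'Sales': [100.0, 250.0],
                    'Transaction_Date': ['2024-01-05', '2024-01-06']})
staged = normalize_for_staging(raw, 'transaction_date', 'olap')
```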
### 3.2 Data Cleaning and Quality Checks
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg, stddev
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

sales_df = spark.read.format("delta").load("s3://data/warehouse/staging/sales/")
weather_df = spark.read.format("delta").load("s3://data/warehouse/staging/weather/")

# Basic sanity checks
assert sales_df.count() > 0, "Sales data is empty"
assert weather_df.count() > 0, "Weather data is empty"

store_window = Window.partitionBy("store_id")

# Impute missing sales with the per-store mean
sales_df = sales_df.withColumn(
    "sales_filled",
    when(col("sales").isNull(), avg(col("sales")).over(store_window))
    .otherwise(col("sales"))
)

# Remove obvious outliers (> 3 std dev from the per-store mean)
sales_df = (
    sales_df
    .withColumn("store_mean", avg("sales_filled").over(store_window))
    .withColumn("store_std", stddev("sales_filled").over(store_window))
)
clean_df = sales_df.filter(
    (col("sales_filled") <= col("store_mean") + 3 * col("store_std")) &
    (col("sales_filled") >= col("store_mean") - 3 * col("store_std"))
)
```
All cleaning scripts are packaged into **Data Quality Checks** that run nightly via Airflow.
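As a sketch of what one of those nightly checks might look like, here is a small check function over a pandas batch (the thresholds and column names are illustrative, not the production suite):

```python
import pandas as pd

def run_quality_checks(df: pd.DataFrame) -> list:
    """Return a list of human-readable failures; an empty list means the batch passes."""
    failures = []
    if df.empty:
        failures.append("batch is empty")
        return failures
    if df['sales_filled'].isna().any():
        failures.append("null sales remain after imputation")
    if (df['sales_filled'] < 0).any():
        failures.append("negative sales values found")
    if df['store_id'].nunique() < 1:
        failures.append("no stores present in batch")
    return failures

batch = pd.DataFrame({'store_id': [1, 1, 2],
                      'sales_filled': [120.0, -5.0, 90.0]})
issues = run_quality_checks(batch)  # flags the negative value
```

In Airflow, a non-empty result would fail the task and block the downstream prediction step.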
### 3.3 Feature Engineering
| Feature | Source | Rationale |
|---------|--------|-----------|
| `is_promotion_day` | Internal promo calendar | Captures promotional impact |
| `days_to_Christmas` | Calendar | Seasonal effect |
| `avg_weekday_sales` | Aggregated past 4 weeks | Trend baseline |
| `temp_high` | Weather API | Temperature drives foot traffic |
| `precipitation` | Weather API | Heavy rain reduces visits |
| `store_size` | Metadata | Larger stores have higher sales |
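The calendar-driven features in the table can be derived directly from the transaction date. A pandas sketch (the promo calendar here is a hypothetical stand-in for the internal one):

```python
import pandas as pd

# Hypothetical promo calendar; the real one comes from internal metadata
promo_days = {pd.Timestamp('2024-11-29'), pd.Timestamp('2024-12-20')}

df = pd.DataFrame({'transaction_date': pd.to_datetime(
    ['2024-11-29', '2024-12-20', '2024-12-26'])})

# Binary flag for promotion days
df['is_promotion_day'] = df['transaction_date'].isin(promo_days).astype(int)

# Days until the next Christmas (rolls over to next year after Dec 25)
def days_to_christmas(ts: pd.Timestamp) -> int:
    xmas = pd.Timestamp(year=ts.year, month=12, day=25)
    if ts > xmas:
        xmas = pd.Timestamp(year=ts.year + 1, month=12, day=25)
    return (xmas - ts).days

df['days_to_Christmas'] = df['transaction_date'].apply(days_to_christmas)
```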
Python implementation:
```python
from sklearn.preprocessing import StandardScaler

features = ['is_promotion_day', 'days_to_Christmas', 'avg_weekday_sales',
            'temp_high', 'precipitation', 'store_size']

# Pull the feature columns out of the cleaned Spark frame
X = clean_df.select(features)

# Standardize so that regularized models see features on a common scale
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X.toPandas())
```
Feature importance will later be extracted via SHAP to satisfy the transparency requirement.
### 3.4 Model Selection and Training
We compare three models:
1. **Gradient Boosting Regressor** (XGBoost) – strong performance.
2. **ElasticNet** – linear, easier to explain.
3. **Prophet** – handles seasonality naturally.
```python
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.linear_model import ElasticNet
from xgboost import XGBRegressor
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
import pandas as pd

# Prepare data
train_X = X_scaled
train_y = clean_df.select("sales_filled").toPandas().values.ravel()

# Time-series split keeps each validation fold strictly after its training fold
tscv = TimeSeriesSplit(n_splits=5)

# XGBoost
xgb = XGBRegressor(n_estimators=500, learning_rate=0.05, max_depth=5)
xgb_scores = cross_val_score(xgb, train_X, train_y, cv=tscv,
                             scoring='neg_root_mean_squared_error')
print('XGB RMSE:', -xgb_scores.mean())

# ElasticNet
enet = ElasticNet(alpha=0.1, l1_ratio=0.5)
enet_scores = cross_val_score(enet, train_X, train_y, cv=tscv,
                              scoring='neg_root_mean_squared_error')
print('ElasticNet RMSE:', -enet_scores.mean())

# Prophet is not scikit-learn compatible, so it uses its own
# cross-validation utilities on a DataFrame with ds, y columns
prophet_df = pd.DataFrame({
    'ds': clean_df.select('transaction_date').toPandas()['transaction_date'],
    'y': train_y,
})
prophet = Prophet(yearly_seasonality=True, weekly_seasonality=True)
prophet.fit(prophet_df)
cv_results = cross_validation(prophet, initial='365 days', period='90 days',
                              horizon='90 days')
print('Prophet RMSE:', performance_metrics(cv_results)['rmse'].mean())
```
**Result:** XGBoost outperforms the others by ~12 % but is less interpretable. We settle on **ElasticNet** because stakeholders prioritize transparency, and we’ll augment it with SHAP for deeper insight.
### 3.5 Model Interpretation
```python
import shap

# Refit ElasticNet on the full training set, then explain it
model = enet.fit(train_X, train_y)
explainer = shap.Explainer(model, train_X)
shap_values = explainer(train_X)

# Plot global feature importance
shap.summary_plot(shap_values, feature_names=features)
```
The plot reveals that `temp_high` and `is_promotion_day` drive most of the variance. These findings are documented in the governance log.
### 3.6 Deployment Pipeline
**Airflow DAG** (Python script):
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'sales_prediction',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not backfill runs for past dates
)

def ingest(**kwargs):
    # call ingestion functions
    pass

def clean(**kwargs):
    # call cleaning scripts
    pass

def feature_engineer(**kwargs):
    # call feature scripts
    pass

def predict(**kwargs):
    # load model, make predictions, write to Postgres
    pass

def monitor(**kwargs):
    # check drift, send alerts
    pass

ingest_task = PythonOperator(task_id='ingest', python_callable=ingest, dag=dag)
clean_task = PythonOperator(task_id='clean', python_callable=clean, dag=dag)
fe_task = PythonOperator(task_id='feature_engineer', python_callable=feature_engineer, dag=dag)
predict_task = PythonOperator(task_id='predict', python_callable=predict, dag=dag)
monitor_task = PythonOperator(task_id='monitor', python_callable=monitor, dag=dag)

ingest_task >> clean_task >> fe_task >> predict_task >> monitor_task
```
**Model versioning** is handled by MLflow. Each run records parameters, metrics, and artifacts. The **Airflow** DAG pulls the latest MLflow artifact before making predictions.
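MLflow provides the registry out of the box; to make the lineage idea concrete, here is a stripped-down sketch of the kind of record a registry keeps per run and how the DAG would resolve "latest" (the field names and values are illustrative, not MLflow's actual schema):

```python
from datetime import datetime, timezone

registry: list = []

def register_model(name: str, params: dict, rmse: float, artifact_path: str) -> dict:
    """Append a new version entry; versions increase monotonically per model name."""
    version = 1 + sum(1 for r in registry if r['name'] == name)
    entry = {
        'name': name,
        'version': version,
        'params': params,
        'rmse': rmse,
        'artifact_path': artifact_path,
        'logged_at': datetime.now(timezone.utc).isoformat(),
    }
    registry.append(entry)
    return entry

def latest(name: str) -> dict:
    """Return the highest-version entry, as the DAG would before predicting."""
    return max((r for r in registry if r['name'] == name), key=lambda r: r['version'])

# Illustrative runs
register_model('sales_elasticnet', {'alpha': 0.1, 'l1_ratio': 0.5}, 412.7, 's3://models/v1')
register_model('sales_elasticnet', {'alpha': 0.05, 'l1_ratio': 0.5}, 398.2, 's3://models/v2')
current = latest('sales_elasticnet')  # version 2
```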
### 3.7 BI Integration
Predictions are stored in a dedicated Postgres table:
```sql
CREATE TABLE IF NOT EXISTS store_sales_predictions (
    store_id INT,
    forecast_date DATE,
    predicted_sales NUMERIC,
    confidence_low NUMERIC,
    confidence_high NUMERIC,
    run_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
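The nightly `predict` task writes one row per store per forecast date into this table. Production uses Postgres; the same insert pattern is shown here against SQLite so the sketch stays self-contained (column names match the DDL above; the row values are illustrative):

```python
import sqlite3

conn = sqlite3.connect(':memory:')  # stand-in for the Postgres connection
conn.execute("""
    CREATE TABLE IF NOT EXISTS store_sales_predictions (
        store_id INTEGER,
        forecast_date TEXT,
        predicted_sales REAL,
        confidence_low REAL,
        confidence_high REAL,
        run_timestamp TEXT DEFAULT CURRENT_TIMESTAMP
    )
""")

# One row per store per forecast date; the intervals come from the model
rows = [
    (1, '2024-04-01', 1250.0, 1100.0, 1400.0),
    (2, '2024-04-01', 980.0, 850.0, 1110.0),
]
conn.executemany(
    """INSERT INTO store_sales_predictions
       (store_id, forecast_date, predicted_sales, confidence_low, confidence_high)
       VALUES (?, ?, ?, ?, ?)""",
    rows,
)
conn.commit()

count = conn.execute("SELECT COUNT(*) FROM store_sales_predictions").fetchone()[0]
```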
Power BI connects to this table via ODBC. The dashboard shows:
- **Top 10 stores** by predicted sales.
- **Trend line** with actual vs. forecast.
- **Promotion impact** overlay.
- **Alert panel** that triggers if predictions deviate >10 % from last run.
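The alert panel's trigger reduces to a relative-deviation check between consecutive runs. A minimal sketch (the 10 % threshold mirrors the rule above; the store IDs and values are illustrative):

```python
def deviation_alerts(previous: dict, current: dict, threshold: float = 0.10) -> list:
    """Flag stores whose forecast moved more than `threshold` relative to last run."""
    alerts = []
    for store_id, prev in previous.items():
        curr = current.get(store_id)
        if curr is None or prev == 0:
            continue  # no comparable baseline for this store
        change = abs(curr - prev) / abs(prev)
        if change > threshold:
            alerts.append((store_id, round(change, 3)))
    return alerts

prev_run = {1: 1000.0, 2: 500.0, 3: 800.0}
curr_run = {1: 1050.0, 2: 620.0, 3: 790.0}
alerts = deviation_alerts(prev_run, curr_run)  # only store 2 crosses the 10% threshold
```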
### 3.8 Monitoring & Governance
- **Data drift detection** uses **PCA‑based** Mahalanobis distance on feature distributions.
- **Model performance** is tracked via RMSE and MAE weekly. If performance drops >15 % compared to baseline, an alert is sent to the data science Ops channel.
- **Audit trail** logs every ingestion, cleaning, and prediction step in a separate `audit_log` table, capturing user, timestamp, and run ID.
- **Compliance**: all logs are immutable and stored in an encrypted S3 bucket with lifecycle policies.
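For the drift check, the core computation is the Mahalanobis distance of an incoming batch's feature vector from the training-time distribution. A numpy sketch, with distances computed in the original feature space for brevity (the production check applies the PCA projection first; the data here is synthetic):

```python
import numpy as np

rng = np.random.default_rng(0)

# Reference (training-time) feature distribution: 1000 samples, 3 features
reference = rng.normal(loc=0.0, scale=1.0, size=(1000, 3))
mu = reference.mean(axis=0)
cov_inv = np.linalg.inv(np.cov(reference, rowvar=False))

def mahalanobis(x: np.ndarray) -> float:
    """Distance of a feature vector from the reference distribution."""
    diff = x - mu
    return float(np.sqrt(diff @ cov_inv @ diff))

# An in-distribution batch mean sits at the reference mean...
in_dist = mahalanobis(reference.mean(axis=0))
# ...while a shifted batch lands far away and should raise a drift alert
drifted = mahalanobis(np.array([5.0, 5.0, 5.0]))
```

A fixed distance threshold (or a quantile of the reference distances) then decides when to page the Ops channel.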
## 4. Deliverables
1. **Airflow DAG** – orchestrates ingestion, cleaning, feature engineering, prediction, and monitoring.
2. **MLflow Registry** – holds model versions and lineage.
3. **Postgres prediction table** – feeds the BI dashboard.
4. **Power BI dashboard** – interactive visualization of forecasts and alerts.
5. **Governance report** – automated weekly PDF summarizing data quality, model performance, and audit events.
## 5. Next Steps for Your Team
- **Run the DAG** on a staging environment; verify that predictions appear in Postgres.
- **Validate the dashboard** against ground truth for the last month.
- **Set up alerts** in Airflow for data drift and performance degradation.
- **Schedule a quarterly review** to retrain the model with new data and update feature engineering as business changes.
By following this workflow, you ensure that every step is reproducible, auditable, and aligned with stakeholder expectations. The solution is not a one‑off; it’s a living system that evolves with your organization’s data and goals.