返回目錄
A
數據驅動的投資策略:從數據清洗到模型部署 - 第 6 章
第六章 模型部署與實時交易
發布於 2026-03-05 06:05
# 第六章 模型部署與實時交易
在上一章我們已經完成了模型的設計、訓練與回測,接下來的挑戰是將這些理論與程式碼搬上真實交易環境。\n本章將帶領讀者從 **API 介面**、**容器化**、**訊號生成**、**風險管理** 到 **實時監控**,完整構建一套可運行的量化交易系統。
---
## 6.1 先備知識:雲端、容器與網路
| 概念 | 重要性 | 建議工具 |
|------|--------|----------|
| **雲端平台** | 可動態擴充算力,降低本地硬體成本 | AWS、GCP、Azure、DigitalOcean |
| **Docker** | 確保環境一致性,易於部署 | Docker, Docker Compose |
| **Kubernetes** | 自動化部署、水平擴展與彈性調度 | kubeadm, Helm |
| **訊息佇列** | 解耦訊號產生與下單執行 | RabbitMQ, Kafka |
| **監控工具** | 監測容器健康、資源使用 | Prometheus + Grafana |
> **開發者提醒**:在實務部署前,先在 *sandbox* 環境完成所有流程,確保不會因為連線錯誤或權限不足而導致交易失誤。
---
## 6.2 建立模型 API
為了讓交易系統能在不同時間點呼叫模型,我們使用 **FastAPI** 建立一個 RESTful API。以下示範如何將前一章訓練好的 Transformer 模型封裝成 API。
python
# model_api.py
from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import numpy as np
app = FastAPI(title="Transformer 交易模型 API")
# 假設已經將模型儲存為 joblib
model = joblib.load("/opt/models/transformer.pkl")
class FeatureInput(BaseModel):
features: list[float] # 例如 50 维特征
@app.post("/predict")
async def predict(input: FeatureInput):
X = np.array(input.features).reshape(1, -1)
pred = model.predict(X)[0]
return {"prediction": float(pred)}
**部署**:
Dockerfile
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model_api.py .
COPY transformer.pkl .
EXPOSE 8000
CMD ["uvicorn", "model_api.py:app", "--host", "0.0.0.0", "--port", "8000"]
> **小技巧**:將模型載入的路徑改成環境變數,可在多個容器之間共用同一個模型快照。
---
## 6.3 訊號產生服務
訊號產生服務負責周期性(如每 1 分鐘)抓取最新行情,預測價格走勢,並將「多空」信號推送至訊息佇列。
python
# signal_generator.py
import asyncio
import aiohttp
import json
from datetime import datetime
import pika
API_ENDPOINT = "http://model-api:8000/predict"
QUEUE_NAME = "trade_signals"
async def fetch_market_data(symbol: str):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/price/{symbol}") as resp:
data = await resp.json()
return data
async def generate_signal(symbol: str):
market_data = await fetch_market_data(symbol)
# 这里假设我们已经准备好特征列表
features = preprocess(market_data)
async with aiohttp.ClientSession() as session:
async with session.post(API_ENDPOINT, json={"features": features}) as resp:
result = await resp.json()
signal = "BUY" if result["prediction"] > 0 else "SELL"
payload = {
"symbol": symbol,
"signal": signal,
"price": market_data["price"],
"timestamp": datetime.utcnow().isoformat()
}
publish_to_queue(payload)
def publish_to_queue(message: dict):
connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq"))
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_publish(
exchange="",
routing_key=QUEUE_NAME,
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2), # 使訊息持久化
)
connection.close()
async def main():
symbols = ["AAPL", "MSFT", "GOOGL"]
while True:
tasks = [generate_signal(sym) for sym in symbols]
await asyncio.gather(*tasks)
await asyncio.sleep(60) # 每分鐘一次
if __name__ == "__main__":
asyncio.run(main())
> **實務注意**:若行情資料來源支援 WebSocket,建議改用 WebSocket 以減少延遲;同時,加入 **熔斷器**(circuit breaker)機制,避免連線失敗時快速重試導致阻塞。
---
## 6.4 下單執行服務
下單執行服務負責監聽訊息佇列,根據收到的信號執行下單,並在交易完成後將執行情況寫回資料庫。
python
# order_executor.py
import pika
import json
import requests
API_ORDER = "https://api.exchange.com/v1/order"
API_KEY = "YOUR_API_KEY"
API_SECRET = "YOUR_API_SECRET"
def sign_payload(payload: dict) -> str:
# 以 HMAC SHA256 簽名
import hmac, hashlib
message = json.dumps(payload, separators=(',', ':'), sort_keys=True).encode()
signature = hmac.new(API_SECRET.encode(), message, hashlib.sha256).hexdigest()
return signature
def execute_order(signal: dict):
if signal["signal"] == "BUY":
side = "buy"
else:
side = "sell"
payload = {
"symbol": signal["symbol"],
"side": side,
"type": "market",
"quantity": 100, # 這裡可用風險控制策略決定
"timestamp": int(signal["timestamp"].timestamp() * 1000)
}
payload["signature"] = sign_payload(payload)
headers = {"Authorization": f"Bearer {API_KEY}"}
resp = requests.post(API_ORDER, json=payload, headers=headers)
return resp.json()
def callback(ch, method, properties, body):
signal = json.loads(body)
order_resp = execute_order(signal)
# 這裡可寫入資料庫,或推送至監控服務
print(f"Executed {signal['symbol']} {signal['signal']} -> {order_resp}")
ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq"))
channel = connection.channel()
channel.queue_declare(queue="trade_signals", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue="trade_signals", on_message_callback=callback)
print("Waiting for signals... Press CTRL+C to exit.")
channel.start_consuming()
if __name__ == "__main__":
main()
> **風險控制**:此處的 `quantity` 固定為 100,實務上建議透過 **Kelly‑Criterion** 或 **Portfolio‑Scale** 來動態決定下單手數;同時加入 **停損**(stop‑loss)與 **停利**(take‑profit)條件,避免單筆交易失控。
---
## 6.5 風險監控與日誌
部署完成後,**監控** 與 **日誌** 變得尤為重要。建議結合以下工具:
| 監控項目 | 工具 | 作用 |
|----------|------|------|
| 容器健康 | Prometheus + cAdvisor | 監測 CPU、記憶體、磁碟 I/O |
| API 響應時間 | Grafana + Loki | 可視化 API 延遲、錯誤率 |
| 交易成功率 | Prometheus + Alertmanager | 設定閾值,收到低成功率即時通知 |
| 資料庫寫入延遲 | Elastic Stack | 日誌集中化,快速定位異常 |
yaml
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: "docker"
static_configs:
- targets: ["cadvisor:8080"]
- job_name: "api"
static_configs:
- targets: ["model-api:8000"]
> **警示**:若交易成功率下降至 90% 以下,或平均下單延遲超過 300 ms,即可觸發 Alertmanager 發送 Slack / Email 通知,及時排查。
---
## 6.6 連續訓練與模型更新
市場永遠在變,模型也須隨之迭代。以下提供一個簡易的 **CI/CD** 流程:
1. **資料倉儲**:每日結算後,將歷史行情存入 PostgreSQL 或 Data Lake。
2. **資料管道**:使用 Airflow 或 Prefect 定時觸發 `train_model.py`,重新訓練模型。
3. **模型註冊**:將新模型上傳至 MLflow 或 S3,並標記版本。
4. **自動化部署**:透過 Helm Chart 更新 Docker 容器,並使用 Rolling Update 減少停機時間。
yaml
# airflow/dag/train_model.yaml
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def train_and_register():
# 這裡調用前一章的 train_script.py
import train_script
train_script.main()
with DAG(
"train_model",
start_date=datetime(2024,1,1),
schedule_interval="0 2 * * *", # 每日凌晨 2 點
catchup=False,
) as dag:
t = PythonOperator(task_id="train", python_callable=train_and_register)
> **小建議**:使用 **feature store**(如 Feast)可將特徵抽象化,降低重訓時的資料重處理成本;同時,利用 **A/B 測試**(split traffic)逐步驗證新模型效能,確保不影響交易穩定。
---
## 6.7 小結
- **API 介面**:FastAPI + Docker,確保模型可隨時被呼叫。
- **訊號與下單**:訊息佇列(RabbitMQ)解耦訊號產生與下單執行。
- **容器化與監控**:Docker + Kubernetes + Prometheus/Grafana,提供彈性擴展與即時監控。
- **風險管理**:動態手數、停損停利、持續風險監控,降低交易風險。
- **持續迭代**:Airflow + MLflow 形成自動化 CI/CD 流程,確保模型隨市場變化迭代。
**下一章** 將深入探討「多策略融合」與「機器學習運營(MLOps)」,協助讀者在實際交易環境中實現多元化風險分散與系統可持續運行。