聊天視窗

數據驅動的投資策略:從數據清洗到模型部署 - 第 6 章

第六章 模型部署與實時交易

發布於 2026-03-05 06:05

# 第六章 模型部署與實時交易 在上一章我們已經完成了模型的設計、訓練與回測,接下來的挑戰是將這些理論與程式碼搬上真實交易環境。\n本章將帶領讀者從 **API 介面**、**容器化**、**訊號生成**、**風險管理** 到 **實時監控**,完整構建一套可運行的量化交易系統。 --- ## 6.1 先備知識:雲端、容器與網路 | 概念 | 重要性 | 建議工具 | |------|--------|----------| | **雲端平台** | 可動態擴充算力,降低本地硬體成本 | AWS、GCP、Azure、DigitalOcean | | **Docker** | 確保環境一致性,易於部署 | Docker, Docker Compose | | **Kubernetes** | 自動化部署、水平擴展與彈性調度 | kubeadm, Helm | | **訊息佇列** | 解耦訊號產生與下單執行 | RabbitMQ, Kafka | | **監控工具** | 監測容器健康、資源使用 | Prometheus + Grafana | > **開發者提醒**:在實務部署前,先在 *sandbox* 環境完成所有流程,確保不會因為連線錯誤或權限不足而導致交易失誤。 --- ## 6.2 建立模型 API 為了讓交易系統能在不同時間點呼叫模型,我們使用 **FastAPI** 建立一個 RESTful API。以下示範如何將前一章訓練好的 Transformer 模型封裝成 API。 python # model_api.py from fastapi import FastAPI from pydantic import BaseModel import joblib import numpy as np app = FastAPI(title="Transformer 交易模型 API") # 假設已經將模型儲存為 joblib model = joblib.load("/opt/models/transformer.pkl") class FeatureInput(BaseModel): features: list[float] # 例如 50 维特征 @app.post("/predict") async def predict(input: FeatureInput): X = np.array(input.features).reshape(1, -1) pred = model.predict(X)[0] return {"prediction": float(pred)} **部署**: Dockerfile # Dockerfile FROM python:3.10-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY model_api.py . COPY transformer.pkl . EXPOSE 8000 CMD ["uvicorn", "model_api.py:app", "--host", "0.0.0.0", "--port", "8000"] > **小技巧**:將模型載入的路徑改成環境變數,可在多個容器之間共用同一個模型快照。 --- ## 6.3 訊號產生服務 訊號產生服務負責周期性(如每 1 分鐘)抓取最新行情,預測價格走勢,並將「多空」信號推送至訊息佇列。 python # signal_generator.py import asyncio import aiohttp import json from datetime import datetime import pika API_ENDPOINT = "http://model-api:8000/predict" QUEUE_NAME = "trade_signals" async def fetch_market_data(symbol: str): async with aiohttp.ClientSession() as session: async with session.get(f"https://api.example.com/price/{symbol}") as resp: data = await resp.json() return data async def generate_signal(symbol: str): market_data = await fetch_market_data(symbol) # 这里假设我们已经准备好特征列表 features = preprocess(market_data) async with aiohttp.ClientSession() as session: async with session.post(API_ENDPOINT, json={"features": features}) as resp: result = await resp.json() signal = "BUY" if result["prediction"] > 0 else "SELL" payload = { "symbol": symbol, "signal": signal, "price": market_data["price"], "timestamp": datetime.utcnow().isoformat() } publish_to_queue(payload) def publish_to_queue(message: dict): connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq")) channel = connection.channel() channel.queue_declare(queue=QUEUE_NAME, durable=True) channel.basic_publish( exchange="", routing_key=QUEUE_NAME, body=json.dumps(message), properties=pika.BasicProperties(delivery_mode=2), # 使訊息持久化 ) connection.close() async def main(): symbols = ["AAPL", "MSFT", "GOOGL"] while True: tasks = [generate_signal(sym) for sym in symbols] await asyncio.gather(*tasks) await asyncio.sleep(60) # 每分鐘一次 if __name__ == "__main__": asyncio.run(main()) > **實務注意**:若行情資料來源支援 WebSocket,建議改用 WebSocket 以減少延遲;同時,加入 **熔斷器**(circuit breaker)機制,避免連線失敗時快速重試導致阻塞。 --- ## 6.4 下單執行服務 下單執行服務負責監聽訊息佇列,根據收到的信號執行下單,並在交易完成後將執行情況寫回資料庫。 python # order_executor.py import pika import json import requests API_ORDER = "https://api.exchange.com/v1/order" API_KEY = "YOUR_API_KEY" API_SECRET = "YOUR_API_SECRET" def sign_payload(payload: dict) -> str: # 以 HMAC SHA256 簽名 import hmac, hashlib message = json.dumps(payload, separators=(',', ':'), sort_keys=True).encode() signature = hmac.new(API_SECRET.encode(), message, hashlib.sha256).hexdigest() return signature def execute_order(signal: dict): if signal["signal"] == "BUY": side = "buy" else: side = "sell" payload = { "symbol": signal["symbol"], "side": side, "type": "market", "quantity": 100, # 這裡可用風險控制策略決定 "timestamp": int(signal["timestamp"].timestamp() * 1000) } payload["signature"] = sign_payload(payload) headers = {"Authorization": f"Bearer {API_KEY}"} resp = requests.post(API_ORDER, json=payload, headers=headers) return resp.json() def callback(ch, method, properties, body): signal = json.loads(body) order_resp = execute_order(signal) # 這裡可寫入資料庫,或推送至監控服務 print(f"Executed {signal['symbol']} {signal['signal']} -> {order_resp}") ch.basic_ack(delivery_tag=method.delivery_tag) def main(): connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq")) channel = connection.channel() channel.queue_declare(queue="trade_signals", durable=True) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue="trade_signals", on_message_callback=callback) print("Waiting for signals... Press CTRL+C to exit.") channel.start_consuming() if __name__ == "__main__": main() > **風險控制**:此處的 `quantity` 固定為 100,實務上建議透過 **Kelly‑Criterion** 或 **Portfolio‑Scale** 來動態決定下單手數;同時加入 **停損**(stop‑loss)與 **停利**(take‑profit)條件,避免單筆交易失控。 --- ## 6.5 風險監控與日誌 部署完成後,**監控** 與 **日誌** 變得尤為重要。建議結合以下工具: | 監控項目 | 工具 | 作用 | |----------|------|------| | 容器健康 | Prometheus + cAdvisor | 監測 CPU、記憶體、磁碟 I/O | | API 響應時間 | Grafana + Loki | 可視化 API 延遲、錯誤率 | | 交易成功率 | Prometheus + Alertmanager | 設定閾值,收到低成功率即時通知 | | 資料庫寫入延遲 | Elastic Stack | 日誌集中化,快速定位異常 | yaml # prometheus.yml global: scrape_interval: 15s scrape_configs: - job_name: "docker" static_configs: - targets: ["cadvisor:8080"] - job_name: "api" static_configs: - targets: ["model-api:8000"] > **警示**:若交易成功率下降至 90% 以下,或平均下單延遲超過 300 ms,即可觸發 Alertmanager 發送 Slack / Email 通知,及時排查。 --- ## 6.6 連續訓練與模型更新 市場永遠在變,模型也須隨之迭代。以下提供一個簡易的 **CI/CD** 流程: 1. **資料倉儲**:每日結算後,將歷史行情存入 PostgreSQL 或 Data Lake。 2. **資料管道**:使用 Airflow 或 Prefect 定時觸發 `train_model.py`,重新訓練模型。 3. **模型註冊**:將新模型上傳至 MLflow 或 S3,並標記版本。 4. **自動化部署**:透過 Helm Chart 更新 Docker 容器,並使用 Rolling Update 減少停機時間。 yaml # airflow/dag/train_model.yaml from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def train_and_register(): # 這裡調用前一章的 train_script.py import train_script train_script.main() with DAG( "train_model", start_date=datetime(2024,1,1), schedule_interval="0 2 * * *", # 每日凌晨 2 點 catchup=False, ) as dag: t = PythonOperator(task_id="train", python_callable=train_and_register) > **小建議**:使用 **feature store**(如 Feast)可將特徵抽象化,降低重訓時的資料重處理成本;同時,利用 **A/B 測試**(split traffic)逐步驗證新模型效能,確保不影響交易穩定。 --- ## 6.7 小結 - **API 介面**:FastAPI + Docker,確保模型可隨時被呼叫。 - **訊號與下單**:訊息佇列(RabbitMQ)解耦訊號產生與下單執行。 - **容器化與監控**:Docker + Kubernetes + Prometheus/Grafana,提供彈性擴展與即時監控。 - **風險管理**:動態手數、停損停利、持續風險監控,降低交易風險。 - **持續迭代**:Airflow + MLflow 形成自動化 CI/CD 流程,確保模型隨市場變化迭代。 **下一章** 將深入探討「多策略融合」與「機器學習運營(MLOps)」,協助讀者在實際交易環境中實現多元化風險分散與系統可持續運行。