跳转至

工业级量化模拟盘 — 架构设计文档

编写时间: 2026-03-03 基于: 现有 V0 代码库 + overview/strategy_1.md (Top 10 多因子策略) + overview/strategy_4.md (rev_1d 1d 反转) + overview/strategy_3.md (ofi_14d 单因子动量) 前置文档: docs/overview/01_system_design_v1_v2.md


目录


一、系统设计 — 解耦与高可用

1.1 设计原则

原则 说明 在本系统中的体现
事件驱动 服务间通过异步事件通信,不直接调用 Redis Pub/Sub 事件链,BaseEventService 抽象
策略可插拔 策略逻辑与执行引擎解耦 BaseStrategy 接口,YAML 配置切换
数据管线 / 交易管线分离 数据采集→特征计算 与 信号→风控→下单 独立运行 两条独立事件链,互不阻塞
故障隔离 单服务崩溃不拖垮整个系统 独立进程/容器 + Redis 状态缓存 + 自动重启
幂等恢复 服务重启后能从 Redis 缓存恢复状态 每个服务启动时从 Redis 加载最新状态
配置驱动 所有参数通过 YAML + 环境变量注入 Pydantic Settings + YAML config
单输入单输出 tick 聚合为唯一数据源,下游只消费统一 Bar 契约 Source Adapter 归一化 + bar_normalized 事件

1.2 整体架构图

flowchart TD
    subgraph dataPipeline ["数据管线 (Data Pipeline)"]
        direction TB
        TickerSvc["TickerService"] -->|"写入"| RedisStream["Redis Stream\n(market:trades)"]
        AssetPoolSvc["AssetPoolService"] -->|"写入"| RedisSet["Redis SET\n(quant:asset_pool:*)"]
        RedisStream -->|"消费者组读取"| BarAdapter["BarSourceAdapterService\n(tick→bar聚合 + tick特征 + 统一Bar契约)\n支持 dollar_bar / time_bar"]
        RedisSet -->|"读取 symbol 列表"| BarAdapter
        BarAdapter -->|bar_normalized| FeatureService["Feature Service\n(ret/zscore/日级聚合/融合特征)"]
        FeatureService -->|feature_calculated| outputData((" "))
    end

    subgraph tradingPipeline ["交易管线 (Trading Pipeline)"]
        direction LR
        StrategyService["Strategy Service"] -->|signal_generated| RiskService["Risk Service"]
        RiskService -->|order_approved| OrderService["Order Service"]
        RiskService -.->|risk_rejected| AlertPath((" "))
        OrderService -->|order_executed| AccountService["Account Service"]
    end

    subgraph infra ["基础设施层"]
        Redis["Redis\n(Streams + Pub/Sub + 状态键)"]
        TimescaleDB["TimescaleDB"]
        Monitor["Monitor"]
        APIGateway["API Gateway"]
    end

    outputData --> StrategyService

1.2.1 关键约束

  1. TickerServiceAssetPoolService 只负责往 Redis 写数据,不直接与下游服务通信。
  2. BarSourceAdapterService 从 Redis Stream 消费 tick 数据,支持 dollar_bar(按累计 dollar volume 切分)和 time_bar(按固定时间窗口切分)两种聚合模式,同时计算 tick 微观特征,输出统一 Bar 契约。
  3. 下游 Feature/Strategy/Risk/Order 不感知数据来源和 bar 类型,只消费 bar_normalized

1.3 策略与执行解耦: BaseStrategy 抽象

现有问题: FuturesOrderService 将策略逻辑 (_rank_by_momentum) 和下单逻辑 (_open_positions, _close_all_positions) 耦合在同一个类中。无法独立测试策略、无法复用到回测、无法同时运行多策略。

解耦方案: 引入 BaseStrategy 抽象接口,所有策略(包括 strategy_1 的 Top 10、strategy_4 的 rev_1d、strategy_3 的 ofi_14d)实现相同接口:

class BaseStrategy(ABC):
    """所有策略必须实现此接口。回测和实盘共用同一套代码。"""

    rebalance_interval: int = 1       # 换仓周期(交易日数),R1=1, R14=14
    pool_name: str = "default"        # 使用的资产池名称
    min_history_days: int = 0         # 最少历史天数要求(不足则过滤)

    @abstractmethod
    def generate_signal(
        self,
        features: Dict[str, FeatureVector],
        current_positions: Dict[str, float],
        timestamp: datetime,
    ) -> TargetPortfolio:
        """输入特征 → 输出目标仓位权重"""

    def should_skip_rebalance(
        self, n_candidates: int, cross_section_std: float
    ) -> bool:
        """子类可覆写,定义跳过换仓的条件。"""
        return False

@dataclass
class TargetPortfolio:
    positions: Dict[str, TargetPosition]  # symbol -> weight
    strategy_name: str
    signal_timestamp: datetime
    metadata: Dict[str, Any]  # 可选调试信息

@dataclass
class TargetPosition:
    symbol: str
    side: str         # "long" | "short"
    weight: float     # 占总资金的比例, e.g. +0.0167 (= 1/60)
    signal_value: float
    reason: str       # 可读说明

strategy_4 (rev_1d) 实现示例:

class Rev1dStrategy(BaseStrategy):
    """1d 价格反转策略, 对应 overview/strategy_4.md"""

    def __init__(self, long_n: int = 30, short_n: int = 30):
        self.long_n = long_n
        self.short_n = short_n

    def generate_signal(self, features, current_positions, timestamp):
        signals = {}
        for symbol, feat in features.items():
            z1d = feat.get("zscore_neg_ret_1d")
            if z1d is not None:
                signals[symbol] = z1d

        ranked = sorted(signals.items(), key=lambda x: x[1])
        total = self.long_n + self.short_n
        weight_per_symbol = 1.0 / total

        positions = {}
        for symbol, val in ranked[-self.long_n:]:
            positions[symbol] = TargetPosition(
                symbol=symbol, side="long",
                weight=+weight_per_symbol, signal_value=val,
                reason="rev_1d_long"
            )
        for symbol, val in ranked[:self.short_n]:
            positions[symbol] = TargetPosition(
                symbol=symbol, side="short",
                weight=-weight_per_symbol, signal_value=val,
                reason="rev_1d_short"
            )
        return TargetPortfolio(
            positions=positions,
            strategy_name="rev_1d",
            signal_timestamp=timestamp,
            metadata={"n_candidates": len(signals)},
        )

策略汇总表 (strategy_1 / strategy_4 / strategy_3):

所有策略共享相同的 BaseStrategy 接口,区别仅在 generate_signal 内部的信号构造逻辑、换仓周期和所用资产池:

策略 来源 信号公式 所需特征 换仓 资产池
rev_x_inv_vpin strategy_1 Top 1 mean(z2h,z4h) × clip(1 - 0.3·vpin_z, 0.3, 1.7) ret_2h, ret_4h, tick_vpin_24h R1 default
rev_vpin_filter_t1.0 strategy_1 Top 2/3 zscore_neg_ret_2h 仅保留 zscore_vpin > 1.0 ret_2h, tick_vpin R1 default
rev_jump_filter_t1.0 strategy_1 Top 4/5 zscore_neg_ret_2h 仅保留 zscore_jump > 1.0 ret_2h, tick_jump_ratio R1 default
regime_switch strategy_1 Top 7 高波做趋势, 低波做反转 ret_2h, volatility R1 default
rev_1d strategy_4 -zscore(ret_1d, window=30) ret_1d R1 default
ofi_14d strategy_3 zscore(RM_14(ofi_d)) 截面多空 ofi_d, ofi_14d, dollar_volume_d R14 t50_monthly

1.4 事件流设计 (完整)

flowchart TD
    TickerSvc["TickerService"] -->|"写入 Redis Stream"| RedisStream["market:trades"]
    AssetPoolSvc["AssetPoolService"] -->|"写入 Redis SET"| RedisSet["quant:asset_pool:*"]
    AssetPoolSvc -.->|asset_pool_updated| MonitorAlert

    RedisStream -->|"消费者组"| BarAdapter["BarSourceAdapterService\n(tick→bar + tick特征 + 统一契约)"]
    RedisSet -->|"读取 symbol 列表"| BarAdapter

    BarAdapter -->|bar_normalized| FeatureSvc["FeatureService\n(融合特征 + zscore + 日级聚合)"]
    FeatureSvc -->|feature_calculated| StrategySvc["StrategyService\n(多策略并行)"]
    StrategySvc -->|signal_generated| RiskSvc["RiskService"]
    RiskSvc -->|order_approved| OrderSvc["OrderService\n(差量下单)"]
    RiskSvc -.->|risk_rejected| MonitorAlert
    OrderSvc -->|order_executed| AccountSvc["AccountService"]
    OrderSvc -.->|order_failed| MonitorAlert

1.5 故障隔离与恢复策略

故障场景 隔离机制 恢复策略
单服务崩溃 独立容器, ECS 自动重启 启动时从 Redis 加载 asset_pool / features / positions
Redis 短暂不可用 指数退避重连 (BaseEventService 内置) 重连后重新订阅, 从 Redis 恢复状态
交易所 API 限频 AccountService 统一调用, 其他服务读 Redis 429 → 退避重试, 最多 3 次
特征计算超时 单 symbol 超时不阻塞其他 symbol 跳过该 symbol, 下轮重新计算
下单失败 OrderService 重试 3 次 + dry-run 模式 记录失败订单, emit order_failed 供 Monitor 告警
策略信号异常 RiskService 拦截 (最大仓位/最大敞口) risk_rejected 事件 → 不下单 + 告警

1.6 数据一致性保障

问题: 事件驱动架构中, 服务重启可能丢失 Redis Pub/Sub 消息 (Pub/Sub 无持久化)。

解决方案 — 双保险机制:

  1. 事件触发 (主路径): 正常运行时通过 Pub/Sub 实时触发
  2. 定时兜底 (备用路径): 每个服务维护 cron 定时器,即使错过事件也能按时执行
  3. AssetPoolService: 每 24h 更新
  4. StrategyService: 每日 23:59 UTC 触发 (对应 strategy_4 的 same_day 模式)
  5. OrderService: 兜底检查目标仓位 vs 实际仓位偏差

  6. 关键状态持久化到 Redis:

  7. quant:state:{service_name}:last_run — 上次执行时间戳
  8. quant:state:{service_name}:status — running / idle / error
  9. 启动时检查 last_run, 若距今超过阈值则立即补执行

二、目录结构与服务职责

2.1 目标目录树

基于现有 src/quant_trading/ 结构扩展。已有模块保留,新增模块用 标注。

quant_trading_backend/
├── yamls/                                # YAML 业务配置 (现有)
│   ├── ticker.yaml                       # TickerService 配置
│   ├── account.yaml                      # AccountService 配置
│   ├── exchange.yaml                     # 交易所通用配置
│   ├── strategy_config.yaml              ★ 策略选择 + 参数
│   ├── risk_config.yaml                  ★ 风控规则
│   ├── monitor_config.yaml               ★ 告警阈值 + 渠道
│   └── bar_source_config.yaml            ★ BarSourceAdapterService 配置 (bar 类型 + 阈值)
├── docker/
│   ├── Dockerfile
│   ├── docker-compose.yml
│   └── docker-compose.local.yml          # 本地开发
├── infra/                                ★ Terraform IaC
│   ├── main.tf
│   ├── variables.tf
│   ├── outputs.tf
│   ├── terraform.tfvars.example
│   └── modules/
│       ├── vpc/                          ★ 网络
│       ├── ecs/                          ★ ECS 集群 + 服务
│       ├── redis/                        ★ ElastiCache
│       ├── rds/                          ★ TimescaleDB on RDS
│       ├── ecr/                          ★ 容器镜像仓库
│       ├── secrets/                      ★ Secrets Manager
│       ├── monitoring/                   ★ CloudWatch + SNS
│       └── iam/                          ★ IAM 角色
├── src/quant_trading/
│   ├── app/
│   │   ├── cli.py                        # Typer CLI 入口
│   │   └── commands/
│   │       ├── _utils.py
│   │       ├── account_service/          # CLI: account-service run
│   │       ├── ticker_service/           # CLI: ticker-service run
│   │       ├── configs/                  # CLI: configs list/generate/load
│   │       ├── strategy_service/         ★ CLI: strategy-service run
│   │       ├── risk_service/             ★ CLI: risk-service run
│   │       └── monitor_service/          ★ CLI: monitor-service run
│   │
│   ├── common/
│   │   ├── configs/
│   │   │   ├── paths.py                  # 配置路径管理
│   │   │   ├── envs/                     # Pydantic 环境配置
│   │   │   │   ├── base.py
│   │   │   │   ├── loader.py
│   │   │   │   └── services/             # account_env, database_env, exchange_env, global_env, redis_env
│   │   │   └── yamls/                    # YAML 配置加载
│   │   │       ├── base.py
│   │   │       ├── loader.py
│   │   │       └── services/             # account_yaml, exchange_yaml, ticker_yaml
│   │   ├── redis/
│   │   │   ├── base_service.py           # BaseEventService
│   │   │   ├── channels.py               # Channels, RedisKeys
│   │   │   ├── client.py                 # RedisClient
│   │   │   └── pubsub.py                 # RedisPubSub, EventMessage
│   │   └── utils/
│   │       ├── config_loader.py
│   │       └── loggers.py
│   │
│   ├── data/
│   │   └── db/                           # SQLAlchemy + TimescaleDB
│   │       ├── database.py               # DatabaseManager
│   │       ├── models/                   # account_model, symbol_ohlcv_model, tick_model
│   │       └── repositories/             # base, account, symbol_ohlcv
│   │
│   ├── services/
│   │   ├── account_service/              # [现有] 账户状态同步
│   │   │   └── service.py                # AccountService
│   │   │
│   │   ├── ticker_service/               # [现有] 行情采集,只负责写入 Redis Stream
│   │   │   └── service.py                # TickerService
│   │   │
│   │   ├── feature_service/              # [现有] 特征计算
│   │   │   ├── feature_calculator.py     # 基础技术指标
│   │   │   ├── zscore_calculator.py      # Z-Score 标准化
│   │   │   ├── intraday_features.py      # 日内特征
│   │   │   └── service.py                # FeatureService
│   │   │
│   │   │
│   │   ├── bar_source_adapter_service/   ★ [新增] Bar 聚合 + tick 特征 + 统一契约
│   │   │   ├── __init__.py
│   │   │   ├── dollar_bar_aggregator.py  # Dollar Bar 自适应阈值 + 聚合
│   │   │   ├── time_bar_aggregator.py    # Time Bar 固定窗口聚合
│   │   │   ├── tick_features.py          # VPIN / Kyle's Lambda 等 9 个微观特征
│   │   │   ├── threshold_tracker.py      # auto_K50_ema 阈值管理
│   │   │   └── service.py               # BarSourceAdapterService 主服务 (消费 Redis Stream)
│   │   │
│   │   ├── strategy_service/             ★ [新增] 策略引擎
│   │   │   ├── __init__.py
│   │   │   ├── base_strategy.py          # BaseStrategy 抽象
│   │   │   ├── registry.py               # 策略注册表 (name → class)
│   │   │   ├── strategies/
│   │   │   │   ├── __init__.py
│   │   │   │   ├── rev_1d.py             # strategy_4: 1d 反转
│   │   │   │   ├── rev_x_inv_vpin.py     # strategy_1 Top 1
│   │   │   │   ├── rev_vpin_filter.py    # strategy_1 Top 2/3
│   │   │   │   ├── rev_jump_filter.py    # strategy_1 Top 4/5
│   │   │   │   ├── rev_noise_boost.py    # strategy_1 Top 6
│   │   │   │   ├── regime_switch.py      # strategy_1 Top 7
│   │   │   │   ├── vol_weighted_blend.py # strategy_1 Top 8
│   │   │   │   ├── quad_blend.py         # strategy_1 Top 9
│   │   │   │   └── ofi_14d.py            ★ strategy_3: OFI 14d 动量
│   │   │   ├── ensemble.py               ★ 多策略集成
│   │   │   └── service.py
│   │   │
│   │   ├── risk_service/                 ★ [新增] 风控
│   │   │   ├── __init__.py
│   │   │   ├── risk_rules.py             # 风控规则引擎
│   │   │   └── service.py
│   │   │
│   │   ├── order_service/                ★ [新增] 纯执行层
│   │   │   ├── __init__.py
│   │   │   ├── position_differ.py        # 目标仓位 vs 当前仓位 差量计算
│   │   │   └── order_executor.py         # 下单 + 重试 + 精度处理
│   │   │
│   │   ├── monitor_service/              ★ [新增] 监控告警
│   │   │   ├── __init__.py
│   │   │   ├── health_checker.py         # 心跳检查
│   │   │   ├── alerter.py                # Webhook 告警 (Telegram)
│   │   │   └── service.py
│   │   │
│   │   └── backtest_engine/              ★ [新增] 回测引擎
│   │       ├── __init__.py
│   │       ├── data_loader.py            # 加载历史数据
│   │       ├── simulator.py              # 回测核心循环
│   │       ├── fee_model.py              # 手续费模型
│   │       ├── metrics.py                # Sharpe / MDD / Calmar
│   │       └── report.py                 # 报告生成
│   │
│   ├── controller/
│   │   ├── gateway/                      # FastAPI 网关
│   │   │   ├── base.py                   # ServiceSpec, BaseServiceApp, ServiceApp
│   │   │   └── specs.py                  # 各服务 App 定义
│   │   ├── routes/
│   │   │   └── account.py                # /api/v1/account/* (现有)
│   │   └── schemas/
│   │       ├── common.py                 # ApiResponse, PaginationParams
│   │       └── account.py                # BalanceResponse, PositionItem 等
│   │
│   └── domain/                           # 领域模型
│       ├── portfolio.py                  # TargetPortfolio / TargetPosition
│       └── features.py                   # FeatureVector 标准接口
└── tests/
    ├── conftest.py                       # mock_redis_client 等 fixture
    ├── services/
    │   └── test_ticker_service.py
    ├── common/configs/
    │   ├── test_yaml_loader.py
    │   └── test_env_loader.py
    ├── unit/                             ★ 待补充
    │   ├── test_strategies/
    │   │   ├── test_rev_1d.py            ★
    │   │   └── test_ofi_14d.py           ★
    │   └── test_risk_rules.py            ★
    └── integration/                      ★ 待补充
        └── test_event_flow.py            ★

2.2 各服务详细职责

2.2.1 Asset Pool Service (扩展 — 多池支持)

属性
现有文件 services/asset_pool_service/service.py
订阅事件
发布事件 asset_pool_updated(仅供 Monitor 监听,下游 BarSourceAdapterService 不订阅此事件,直接读 Redis SET)
职责 只负责筛选资产池并写入 Redis SET,不与下游服务直接通信
核心逻辑 支持多种 pool profile,按不同指标/周期筛选合约池,结果写入 Redis SET
调度 default 池每 24h 更新; t50_monthly 池每月 1 日更新
Redis 键 quant:asset_pool:{exchange} (Set, default), quant:asset_pool:{exchange}:t50_monthly (Set)

多池配置 (asset_pool_config.yaml 扩展):

pools:
  default:
    top_k: 100
    metric: usdt_volume_30d
    update_interval: 24h
  t50_monthly:
    top_k: 50
    metric: dollar_volume_monthly   # 按月统计 sum(dollar_volume_d)
    lag: 1                          # 使用上月数据 (1 个月滞后)
    update_interval: monthly        # 每月 1 日更新
    min_history_days: 60            # 历史不足 60 天的标的排除

月度流动性池 T50 计算逻辑 (对应 strategy_3.md Section 4.1):

def compute_monthly_pool(
    daily_dollar_volumes: Dict[str, Dict[date, float]],
    target_month: date,
    top_k: int = 50,
    min_history_days: int = 60,
) -> List[str]:
    """
    按上月 dollar_volume 总和降序取 Top-K,
    同时过滤历史不足 min_history_days 的标的。
    """
    monthly_totals = {}
    for symbol, date_to_dv in daily_dollar_volumes.items():
        month_days = [d for d in date_to_dv if d.month == target_month.month and d.year == target_month.year]
        if len(date_to_dv) < min_history_days:
            continue
        monthly_totals[symbol] = sum(date_to_dv[d] for d in month_days)
    ranked = sorted(monthly_totals.items(), key=lambda x: x[1], reverse=True)
    return [sym for sym, _ in ranked[:top_k]]

2.2.2 BarSourceAdapterService (新增 — 合并原 DataIngestion + DollarBar + TickFeature)

BarSourceAdapterService 是数据管线的核心聚合服务,从 Redis 拉取原始 tick 数据和资产池信息,完成 bar 聚合、tick 微观特征计算和统一 Bar 契约输出。

属性
新增文件 services/bar_source_adapter_service/
数据输入 Redis Stream market:trades(消费者组读取,由 TickerService 写入);Redis SET quant:asset_pool:*(确定需要聚合的 symbol 列表)
发布事件 bar_normalized
Bar 类型 支持 dollar_bar(按累计 dollar volume 切分,阈值 auto_K50_ema 自适应)和 time_bar(按固定时间窗口切分,如 1m/5m/15m),通过配置切换
tick 微观特征 在每根 bar 内计算 9 个特征:tick_vpin, tick_toxicity_run_mean/max/ratio, tick_kyle_lambda, tick_burstiness, tick_jump_ratio, tick_whale_imbalance/impact
输出 统一 Bar 契约(OHLCV + tick 微观特征 + buy_sell_imbalance 等),下游 FeatureService 无需感知 bar 类型
性能要求 使用 NumPy/Polars 向量化,避免 Python 循环
容量 488 symbols × ~250 ticks/s (峰值) ≈ 120K ticks/s
冷启动 dollar_bar 模式:预加载最近 7 天 tick 生成种子 bar,确定初始阈值

消费者组读取示例:

async def consume_trades(self):
    stream_key = "market:trades"
    group = "bar_source_consumers"
    consumer = f"bar_source_{self.instance_id}"

    try:
        await self.redis.xgroup_create(stream_key, group, id="0", mkstream=True)
    except Exception:
        pass

    while self._running:
        messages = await self.redis.xreadgroup(
            group, consumer,
            streams={stream_key: ">"},
            count=100,
            block=1000,
        )
        for stream, entries in messages:
            for msg_id, trade_data in entries:
                symbol = trade_data["symbol"]
                if symbol in self._active_symbols:
                    self._process_trade(symbol, trade_data)
                await self.redis.xack(stream_key, group, msg_id)

配置 (bar_source_config.yaml):

bar_type: dollar_bar           # dollar_bar | time_bar

dollar_bar:
  initial_threshold: 50000     # 初始 dollar volume 阈值 (USD)
  ema_window: 50               # auto_K50_ema 自适应窗口
  min_ticks_per_bar: 10        # 最少 tick 数

time_bar:
  interval: 5m                 # 时间窗口 (1m / 5m / 15m / 1h)

tick_features:
  rolling_window: 50           # rolling bar 窗口大小
  enabled: true                # 是否计算 tick 微观特征

output_columns: 23             # 统一输出列数

2.2.3 Feature Service (现有 + 扩展)

属性
现有文件 services/feature_service/service.py
订阅事件 bar_normalized(由 BarSourceAdapterService 发布)
发布事件 feature_calculated
现有功能 基础技术指标 (SMA/EMA 等)、日内收益率 (intraday_features.py)、截面 Z-Score (zscore_calculator.py)
扩展功能 日级聚合 (daily_aggregator.py ★)、多日滚动特征 (rolling_features.py ★)
Redis 键 quant:features:latest:{symbol}(保持), quant:features:{symbol}:{timestamp}(保持), quant:features:daily:{symbol}

日级聚合层 (daily_aggregator.py) — 对应 strategy_3.md Section 2.3/3.2:

Bar 级特征在每日 EOD (23:59 UTC) 聚合为日级特征,供中低频策略(如 ofi_14d)消费:

日级字段 聚合公式 来源
close_d 当日最后一根 bar 的 close bar OHLCV
ret_d close_d / close_{d-1} - 1 close_d
ofi_d 当日所有 bar buy_sell_imbalance 的均值 Dollar Bar 23 列
dollar_volume_d 当日所有 bar dollar_volume 之和 Dollar Bar 23 列
def aggregate_daily_features(
    bars_today: List[BarNormalized],
    prev_close: float,
) -> DailyFeatures:
    """将当日所有 bar 聚合为日级特征。"""
    close_d = bars_today[-1].close
    ret_d = close_d / prev_close - 1 if prev_close > 0 else float("nan")
    ofi_d = np.nanmean([b.buy_sell_imbalance for b in bars_today])
    dollar_volume_d = sum(b.dollar_volume for b in bars_today)
    return DailyFeatures(close_d=close_d, ret_d=ret_d, ofi_d=ofi_d, dollar_volume_d=dollar_volume_d)

多日滚动特征 (rolling_features.py) — 对应 strategy_3.md Section 3.3:

def rolling_mean(values: List[float], window: int = 14) -> float:
    """
    滚动均值,有效值条件: count_valid >= max(floor(w/2), 3)。
    """
    recent = values[-window:]
    valid = [v for v in recent if not math.isnan(v)]
    if len(valid) < max(window // 2, 3):
        return float("nan")
    return sum(valid) / len(valid)

ofi_14d 因子由 rolling_mean(ofi_d_series, window=14) 计算得出。

Z-Score 计算 (关键):

def calculate_zscore(values: List[float], window: int = 30, negate: bool = False) -> float:
    """
    时序 Z-Score, shift(1) 避免 look-ahead bias。
    negate=True 用于反转信号: 价格跌 → zscore 正 → 做多。
    """
    if len(values) < window + 1:
        return float("nan")
    # shift(1): 只用截至前一根 bar 的数据
    historical = values[-(window + 1):-1]
    current = values[-1]
    mean = sum(historical) / len(historical)
    std = (sum((x - mean) ** 2 for x in historical) / len(historical)) ** 0.5
    if std < 1e-10:
        return 0.0
    zscore = (current - mean) / std
    return -zscore if negate else zscore

2.2.7 Strategy Service (新增)

属性
新增文件 services/strategy_service/
订阅事件 feature_calculated
发布事件 signal_generated
核心逻辑 加载配置中指定的策略, 调用 generate_signal(), 输出 TargetPortfolio
多策略支持 StrategyRegistry 注册表 + ensemble 加权聚合
调度 按策略独立换仓周期调度: R1 策略每日 23:59 UTC 触发, R14 策略每 14 个交易日触发
资产池路由 根据策略配置的 pool_name 从对应的 Redis 池键读取候选标的

策略注册表设计:

class StrategyRegistry:
    """通过 YAML 配置动态加载策略。"""
    _registry: Dict[str, Type[BaseStrategy]] = {}

    @classmethod
    def register(cls, name: str):
        def decorator(strategy_cls):
            cls._registry[name] = strategy_cls
            return strategy_cls
        return decorator

    @classmethod
    def create(cls, name: str, **kwargs) -> BaseStrategy:
        return cls._registry[name](**kwargs)

# 使用:
@StrategyRegistry.register("rev_1d")
class Rev1dStrategy(BaseStrategy): ...

@StrategyRegistry.register("rev_x_inv_vpin")
class RevXInvVpinStrategy(BaseStrategy): ...

@StrategyRegistry.register("ofi_14d")
class Ofi14dStrategy(BaseStrategy): ...

strategy_3 (ofi_14d) 实现示例:

@StrategyRegistry.register("ofi_14d")
class Ofi14dStrategy(BaseStrategy):
    """OFI 14 日动量策略, 对应 overview/strategy_3.md"""

    rebalance_interval = 14
    pool_name = "t50_monthly"
    min_history_days = 60

    def __init__(self, long_n: int = 15, short_n: int = 15, min_candidates: int = 30):
        self.long_n = long_n
        self.short_n = short_n
        self.min_candidates = min_candidates

    def generate_signal(self, features, current_positions, timestamp):
        signals = {}
        for symbol, feat in features.items():
            ofi_14d_value = feat.get("ofi_14d")
            if ofi_14d_value is not None and not math.isnan(ofi_14d_value):
                signals[symbol] = ofi_14d_value

        if self.should_skip_rebalance(len(signals), self._cross_section_std(signals)):
            return TargetPortfolio(
                positions={}, strategy_name="ofi_14d",
                signal_timestamp=timestamp, metadata={"skipped": True},
            )

        ranked = sorted(signals.items(), key=lambda x: x[1])
        weight_long = 0.5 / self.long_n
        weight_short = 0.5 / self.short_n

        positions = {}
        for symbol, val in ranked[-self.long_n:]:
            positions[symbol] = TargetPosition(
                symbol=symbol, side="long",
                weight=+weight_long, signal_value=val, reason="ofi_14d_long",
            )
        for symbol, val in ranked[:self.short_n]:
            positions[symbol] = TargetPosition(
                symbol=symbol, side="short",
                weight=-weight_short, signal_value=val, reason="ofi_14d_short",
            )
        return TargetPortfolio(
            positions=positions, strategy_name="ofi_14d",
            signal_timestamp=timestamp, metadata={"n_candidates": len(signals)},
        )

    def should_skip_rebalance(self, n_candidates: int, cross_section_std: float) -> bool:
        if n_candidates < self.min_candidates:
            return True
        if cross_section_std < 1e-10:
            return True
        return False

    @staticmethod
    def _cross_section_std(signals: Dict[str, float]) -> float:
        if not signals:
            return 0.0
        values = list(signals.values())
        mean_val = sum(values) / len(values)
        return (sum((v - mean_val) ** 2 for v in values) / len(values)) ** 0.5

StrategyService 灵活换仓调度:

class StrategyService:
    def on_feature_calculated(self, features, timestamp):
        trading_day_index = self._get_trading_day_index(timestamp)

        for strategy_config in self.active_strategies:
            strategy = StrategyRegistry.create(strategy_config.name, **strategy_config.parameters)
            rebalance_interval = strategy.rebalance_interval

            if trading_day_index > 0 and trading_day_index % rebalance_interval != 0:
                continue

            pool_symbols = self._load_pool(strategy.pool_name)
            filtered_features = {s: features[s] for s in pool_symbols if s in features}
            target = strategy.generate_signal(filtered_features, self.current_positions, timestamp)
            self.emit_signal(target)

多策略集成配置 (strategy_config.yaml):

# 单策略模式
mode: single
active_strategy: rev_1d
parameters:
  long_n: 30
  short_n: 30
  rebalance_days: 1             # R1 = 每日换仓

# 或: 多策略集成模式
# mode: ensemble
# ensemble_method: weighted_average
# strategies:
#   - name: rev_x_inv_vpin
#     weight: 0.3
#     pool: default
#     rebalance_days: 1
#     parameters: { long_n: 10, short_n: 10 }
#   - name: rev_1d
#     weight: 0.3
#     pool: default
#     rebalance_days: 1
#     parameters: { long_n: 30, short_n: 30 }
#   - name: ofi_14d
#     weight: 0.4
#     pool: t50_monthly
#     rebalance_days: 14
#     parameters:
#       long_n: 15
#       short_n: 15
#       min_candidates: 30
#       min_history_days: 60

2.2.8 Risk Service (新增)

属性
新增文件 services/risk_service/
订阅事件 signal_generated
发布事件 order_approvedrisk_rejected
核心逻辑 对 TargetPortfolio 逐条规则检查, 全部通过才 emit order_approved

风控规则配置 (risk_config.yaml):

max_position_pct: 0.15        # 单币种最大仓位占比
max_total_exposure: 1.0       # 最大总敞口 (|多头| + |空头|)
max_drawdown_halt: 0.30       # 触发暂停交易的最大回撤
min_balance_reserve: 100      # 最低保留余额 (USDT)
max_order_value: 5000         # 单笔订单最大金额 (USDT)
max_daily_turnover: 3.0       # 每日最大换手率
blacklist: []                 # 禁止交易的币种

2.2.9 Order Service (重构)

属性
现有文件 services/order_service/futures_service.py
变更 移除策略逻辑 (_rank_by_momentum), 只保留下单执行
订阅事件 order_approved
发布事件 order_executed / order_failed
新增 position_differ.py (差量计算), order_executor.py (重试 + 精度)
Dry-run 配置 dry_run: true 时只记录不实际下单

差量计算逻辑:

target_positions (来自 StrategyService)
    - current_positions (来自 AccountService via Redis)
    = delta_orders (需要执行的订单列表)

对每个 delta_order:
    if delta > 0: buy
    if delta < 0: sell
    if delta ≈ 0: skip (低于最小下单量)

2.2.10 Account Service (新增)

属性
新增文件 services/account_service/
核心逻辑 定时轮询 (30s) 交易所 API, 同步余额/仓位/挂单到 Redis
发布事件 account_updated
Redis 键 quant:account:balance, quant:account:positions, quant:account:orders
设计要点 统一调用入口, 避免多个服务同时调用交易所 API 导致限频

2.2.11 Monitor Service (新增)

属性
新增文件 services/monitor_service/
订阅事件 所有事件 (用于延迟检测)
核心功能 心跳检查、数据延迟告警、异常仓位检测、资金变动告警
告警渠道 Webhook → Telegram / DingTalk

心跳机制:

每个服务定期写入 Redis:
    key = quant:heartbeat:{service_name}
    value = { timestamp, status, metrics }
    TTL = 120s

Monitor Service 每 30s 扫描:
    若某服务 heartbeat 过期 → 告警

2.2.12 Backtest Engine (新增)

属性
新增文件 services/backtest_engine/
核心原则 与实盘共用 BaseStrategy 代码, 策略无需感知回测 vs 实盘
数据输入 从 Parquet / CSV / Redis 加载历史 kline 或 dollar bar
输出指标 Sharpe, MDD, Calmar, 年化收益率, 换手率, NAV 曲线
CLI main backtest run --strategy rev_1d --start 2024-01-01 --end 2025-01-01

回测核心循环 (对应 overview/strategy_1.md Stage 6):

for date in all_trading_dates:
    # 1. 计算当日 PnL (用旧权重)
    daily_pnl = sum(weight[sym] * daily_return[sym] for sym in portfolio)

    # 2. 在换仓日, 调用 strategy.generate_signal()
    if is_rebalance_day(date, rebalance_interval):
        features = feature_store.get_features(date)
        target = strategy.generate_signal(features, current_weights, date)
        turnover = sum(abs(target[sym] - current_weights[sym]) for sym in all_symbols)
        daily_pnl -= turnover * fee_rate
        current_weights = target

    # 3. 更新 NAV
    nav_curve.append(nav_curve[-1] * (1 + daily_pnl))

2.3 Redis 通道 / 键扩展清单

Pub/Sub 通道 (在 channels.py 中扩展):

通道 发布者 订阅者
quant:asset_pool_updated AssetPoolService MonitorService(BarSourceAdapterService 不订阅,直接读 Redis SET)
quant:bar_normalized BarSourceAdapterService FeatureService
quant:feature_calculated FeatureService StrategyService
quant:signal_generated StrategyService RiskService
quant:order_approved RiskService OrderService
quant:risk_rejected RiskService MonitorService
quant:order_executed OrderService AccountService, MonitorService
quant:order_failed OrderService MonitorService
quant:order_rebalanced OrderService MonitorService
quant:account_updated AccountService RiskService

Redis 键:

键模式 类型 用途
market:trades Stream TickerService 写入 tick 数据,BarSourceAdapterService 消费者组读取 (MAXLEN ~100000)
market:tickers Stream 聚合行情快照
market:ohlcv Stream K 线数据
quant:bar:normalized:{symbol}:{timeframe} String (JSON) 统一 Bar 快照 (BarSourceAdapterService 输出)
quant:asset_pool:{exchange}:t50_monthly Set ★ T50 月度流动性池
quant:features:daily:{symbol} String (JSON) ★ 日级聚合特征缓存 (ofi_d, dollar_volume_d 等)
quant:features:rolling:{symbol} String (JSON) ★ 多日滚动特征缓存 (ofi_14d 等)
quant:kline:{symbol}:{timeframe} String (JSON) Kline 数据缓存
quant:account:balance String (JSON) 账户余额快照
quant:account:positions String (JSON) 当前持仓快照
quant:account:orders String (JSON) 活跃挂单快照
quant:positions String (JSON) 策略当前仓位
quant:portfolio:snapshot String (JSON) NAV / PnL / 回撤等组合指标快照
quant:signal:latest String (JSON) 最新策略信号
quant:heartbeat:{service} String (JSON + TTL) 服务心跳
quant:state:{service}:last_run String 上次执行时间戳
quant:state:{service}:status String 服务状态 (running / idle / error)
quant:risk:status String 风控总体状态
quant:risk:disabled_symbols String 风控禁止交易的 symbols
quant:system:emergency_stop String 全局紧急停止开关

三、AWS 部署方案

3.1 是否使用 Terraform?

结论: 是, 推荐使用 Terraform 管理所有 AWS 基础设施。

维度 不用 Terraform (现状) 使用 Terraform
环境一致性 手动创建, 易出错 代码即基础设施, 可重复部署
多环境支持 难以维护 staging / prod terraform workspace 切换
变更追踪 无法审计 Git 版本控制 + terraform plan 预览
灾难恢复 需要手工重建 terraform apply 一键恢复
团队协作 口口相传 代码 Review + CI 自动化
学习成本 中等 (一次性投入)

当前项目已有 ECS Fargate + ECR + GitHub Actions CI/CD, Terraform 可以将这些资源编码化。

3.2 是否使用多节点?

结论: 多服务、单集群、按需扩缩。

不建议 "多节点" (多 EC2 集群), 而是继续使用 ECS Fargate — 每个服务是独立的 ECS Service (独立 Task Definition), 共享一个 ECS Cluster。Fargate 自动管理底层节点, 无需手动运维 EC2。

各服务资源配置建议:

服务 CPU Memory 实例数 说明
Ticker Service 1024 2 GB 1 高吞吐, WebSocket 连接池, 写入 Redis Stream
Asset Pool Service 256 512 MB 1 低频, 每 24h 执行一次, 写入 Redis SET
BarSourceAdapterService 1024 2 GB 1 从 Redis Stream 消费 tick, bar 聚合 + tick 特征计算
Feature Service 512 1 GB 1
Strategy Service 256 512 MB 1 轻量计算
Risk Service 256 512 MB 1 规则检查
Order Service 256 512 MB 1 API 调用
Account Service 256 512 MB 1 定时轮询
Monitor Service 256 512 MB 1 心跳检查
API Gateway 512 1 GB 1-2 按访问量扩
总计 10-11 Fargate 按使用计费

月费估算 (ap-southeast-1):

ECS Fargate:
  10 tasks × 平均 0.5 vCPU × 1 GB × 730h ≈ $30-45/月

ElastiCache Redis (cache.t3.small):
  ≈ $25/月

RDS TimescaleDB (db.t3.small):
  ≈ $30/月 (含 20GB GP3 存储)

NAT Gateway:
  ≈ $35/月 (固定) + 数据传输费

ECR:
  < $5/月

CloudWatch:
  ≈ $10/月

总计: ≈ $140-160/月

3.3 网络架构

flowchart TD
    subgraph VPC ["VPC (10.0.0.0/16)"]
        subgraph publicSubnet ["Public Subnet (10.0.1.0/24)"]
            NAT["NAT Gateway"]
            ALB["ALB (可选)"]
        end
        subgraph privateSubnet ["Private Subnet (10.0.10.0/24)"]
            ECS1["ECS Fargate\nService 1"]
            ECS2["ECS Fargate\nService 2"]
            ECSn["... 所有服务"]
        end
        subgraph dataSubnet ["Data Subnet (10.0.20.0/24)"]
            RedisNode["ElastiCache\nRedis"]
            RDS["RDS\nTimescaleDB"]
        end
    end

    Internet(("Internet")) --> ALB
    ALB -->|8000| ECS1
    ECS1 --> RedisNode
    ECS1 --> RDS
    ECS1 -->|via NAT| Internet
    ECS2 --> RedisNode
    ECS2 --> RDS

安全组规则:

入站 出站
ECS Services 仅 ALB (8000 端口) Redis (6379), TimescaleDB (5432), 互联网 (via NAT)
Redis 仅 ECS SG (6379)
TimescaleDB 仅 ECS SG (5432)

3.4 密钥管理

现有问题: API Key 和 Private Key 通过 .env 文件和环境变量传递, 存储在 configs/ 目录中。

生产方案: AWS Secrets Manager

secrets/
  quant/exchange/api_key         → EXCHANGE_API_KEY
  quant/exchange/private_key     → EXCHANGE_PRIVATE_KEY (PEM 内容)
  quant/db/password              → DB_PASSWORD
  quant/telegram/bot_token       → TELEGRAM_BOT_TOKEN (告警用)

ECS Task Definition 通过 secrets 字段引用:

{
  "containerDefinitions": [{
    "secrets": [
      { "name": "EXCHANGE_API_KEY", "valueFrom": "arn:aws:secretsmanager:...:quant/exchange/api_key" },
      { "name": "EXCHANGE_PRIVATE_KEY", "valueFrom": "arn:aws:secretsmanager:...:quant/exchange/private_key" }
    ]
  }]
}

3.5 CI/CD 流程 (扩展)

现有的 GitHub Actions workflow 只部署单容器。扩展为多服务部署(每个服务独立的 ECS Task Definition,但共用同一个 Docker 镜像,区别仅在 command 参数不同):

flowchart LR
    Push["main branch push"] --> GHA["GitHub Actions"]
    GHA --> Build["Build Docker Image\n+ Push to ECR"]
    Build --> D1["ticker-service\n(ticker run)"]
    Build --> D2["asset-pool-service\n(trading asset-pool)"]
    Build --> D3["bar-source-adapter\n(bar-source run)"]
    Build --> D4["feature-service\n(trading feature)"]
    Build --> D5["strategy-service\n(trading strategy)"]
    Build --> D6["risk-service\n(trading risk)"]
    Build --> D7["order-service\n(trading futures)"]
    Build --> D8["account-service\n(trading account --daemon)"]
    Build --> D9["monitor-service\n(trading monitor)"]
    Build --> D10["api-gateway\n(system server)"]

3.6 Terraform 模块结构

infra/
├── main.tf                 # Root module, 组合所有子模块
├── variables.tf            # 全局变量 (region, env, project_name)
├── outputs.tf              # 输出值 (ALB DNS, Redis endpoint 等)
├── terraform.tfvars.example
├── backend.tf              # S3 + DynamoDB 远端状态
└── modules/
    ├── vpc/
    │   ├── main.tf         # VPC, subnets, NAT, IGW, route tables
    │   ├── variables.tf
    │   └── outputs.tf
    ├── ecr/
    │   ├── main.tf         # ECR repository
    │   └── ...
    ├── ecs/
    │   ├── main.tf         # ECS Cluster
    │   ├── services.tf     # 每个服务的 Task Definition + Service
    │   ├── iam.tf          # Task execution role, task role
    │   └── ...
    ├── redis/
    │   ├── main.tf         # ElastiCache Redis cluster
    │   └── ...
    ├── rds/
    │   ├── main.tf         # RDS PostgreSQL (TimescaleDB AMI)
    │   └── ...
    ├── secrets/
    │   ├── main.tf         # Secrets Manager entries
    │   └── ...
    └── monitoring/
        ├── main.tf         # CloudWatch Log Groups, Alarms, SNS
        └── ...

Root module 示例 (main.tf):

module "vpc" {
  source = "./modules/vpc"
  project_name = var.project_name
  environment  = var.environment
}

module "redis" {
  source     = "./modules/redis"
  vpc_id     = module.vpc.vpc_id
  subnet_ids = module.vpc.data_subnet_ids
  security_group_ids = [module.vpc.redis_sg_id]
}

module "rds" {
  source     = "./modules/rds"
  vpc_id     = module.vpc.vpc_id
  subnet_ids = module.vpc.data_subnet_ids
  security_group_ids = [module.vpc.rds_sg_id]
}

module "ecs" {
  source     = "./modules/ecs"
  vpc_id     = module.vpc.vpc_id
  subnet_ids = module.vpc.private_subnet_ids
  redis_endpoint = module.redis.endpoint
  db_endpoint    = module.rds.endpoint
  ecr_repo_url   = module.ecr.repository_url
}

四、通信总线选型: Redis vs Kafka

4.1 系统内的通信类型分析

本系统存在两类截然不同的通信模式:

类型 吞吐量 延迟要求 消息大小 持久化需求 示例
高频数据流 高 (120K msg/s 峰值) 低 (<100ms) 小 (~200B) 短期 (几小时) aggTrade tick 数据
低频事件 低 (<10 msg/min) 中 (<1s) 中 (~1KB) 不需要 feature_calculated, signal_generated

4.2 Redis vs Kafka 对比

维度 Redis Pub/Sub Redis Streams Apache Kafka
消息模型 发布/订阅, fire-and-forget 日志型, 持久化, 消费者组 日志型, 持久化, 消费者组
消息持久化 无 (订阅者不在线则丢失) 有 (MAXLEN 限制) 有 (可配置保留期)
吞吐量 ~100K msg/s (单节点) ~100K msg/s (单节点) ~1M msg/s (集群)
延迟 <1ms <2ms 5-15ms
运维复杂度 极低 (已部署) 低 (复用 Redis) 高 (ZooKeeper/KRaft + Broker)
AWS 托管成本 ElastiCache 包含 ElastiCache 包含 MSK ~$200-400/月 (最小集群)
消费者组 不支持 支持 (XREADGROUP) 支持
消息回放 不支持 支持 (从指定 ID 开始读) 支持 (从指定 offset 读)
背压处理 无 (慢消费者被跳过) MAXLEN 自动裁剪 消费者自主控制
社区生态 成熟 成熟 非常成熟

4.3 推荐方案

flowchart TD
    subgraph highFreq ["高频数据层: Redis Streams"]
        aggTrade["aggTrade tick 数据"] -->|"120K msg/s"| Stream["Redis Stream\n(quant:stream:aggTrades)"]
        Stream -->|消费者组| DollarBarConsumer["DollarBarService"]
    end

    subgraph lowFreq ["低频事件层: Redis Pub/Sub"]
        Events["asset_pool_updated\nfeature_calculated\nsignal_generated\norder_approved ..."]
        Events -->|"<10 msg/min\n广播模式"| PubSub["Redis Pub/Sub\n(BaseEventService)"]
    end

    subgraph notUsed ["暂不引入"]
        Kafka["Apache Kafka\n(详见 4.4)"]
    end

4.4 为什么暂不引入 Kafka?

考虑因素 分析
数据量级 488 symbols × ~250 ticks/s (峰值) ≈ 120K msg/s。Redis Streams 在单节点即可轻松处理这个量级。Kafka 的优势在百万级 msg/s 才凸显。
运维成本 Kafka 需要 ZooKeeper/KRaft 集群 + 多 Broker。AWS MSK 最小 3 节点, 月费 $200-400。Redis 已经部署, Streams 功能零额外成本。
团队规模 目前是小团队 / 个人项目。Kafka 的运维、调优、监控需要投入大量精力。Redis 几乎零运维。
消息语义 本系统的低频事件不需要 exactly-once 语义。at-most-once (Pub/Sub) + 定时兜底 已经足够。高频数据层用 Streams 的 at-least-once 即可。
已有投入 现有 BaseEventService + RedisPubSub 已经稳定运行。替换为 Kafka 需要重写所有服务的通信层。
延迟 Redis (<2ms) 优于 Kafka (5-15ms)。对于量化交易, 更低的延迟是优势。

4.5 Redis Streams 的使用方式

高频数据链路: DataIngestionService → DollarBarService

# DataIngestionService: 写入 Redis Stream
async def write_to_stream(self, symbol: str, trade: dict):
    stream_key = "market:trades"
    await self.redis.xadd(
        stream_key,
        trade,
        maxlen=100000,      # 自动裁剪, 防止内存膨胀
        approximate=True,   # 近似裁剪, 性能更好
    )

# DollarBarService: 消费者组读取
async def consume_trades(self, symbol: str):
    stream_key = "market:trades"
    group = "dollar_bar_consumers"
    consumer = f"dollar_bar_{symbol}"

    # 创建消费者组 (幂等)
    try:
        await self.redis.xgroup_create(stream_key, group, id="0", mkstream=True)
    except Exception:
        pass  # 组已存在

    while self._running:
        messages = await self.redis.xreadgroup(
            group, consumer,
            streams={stream_key: ">"},  # 只读新消息
            count=100,                  # 批量读取
            block=1000,                 # 阻塞等待 1s
        )
        for stream, entries in messages:
            for msg_id, trade_data in entries:
                self._process_trade(symbol, trade_data)
                await self.redis.xack(stream_key, group, msg_id)

4.6 何时迁移到 Kafka?

如果未来遇到以下任一情况, 可考虑引入 Kafka:

触发条件 说明
Redis 内存超过 8GB aggTrade 数据量过大, Redis Streams 占用内存过多
需要多天消息回放 Redis Streams 的 MAXLEN 限制了回放窗口
多系统消费同一数据 如需要同时供给回测系统 / 风控系统 / 监控系统
吞吐突破 500K msg/s Redis 单节点的吞吐上限
需要 exactly-once 语义 对订单相关事件的严格要求

迁移路径: 1. 先在高频数据链路 (aggTrade → Dollar Bar) 替换 Redis Streams → Kafka 2. 低频事件链路保持 Redis Pub/Sub 不变 3. 为 BaseEventService 增加 Kafka backend 适配器, 上层服务代码无需修改


附录 A: 策略兼容性矩阵

本架构同时兼容 strategy_4 (rev_1d)、strategy_1 (Top 10 多因子) 和 strategy_3 (ofi_14d):

数据/特征需求 strategy_4 (rev_1d) strategy_1 (Top 10) strategy_3 (ofi_14d) 提供服务
资产池筛选 (default Top-100) AssetPoolService → Redis SET
月度流动性池 (T50) AssetPoolService → Redis SET
Tick 采集写入 Redis Stream TickerService → Redis Stream
Bar 聚合 (dollar_bar) ✅ (buy_sell_imbalance) BarSourceAdapterService
Bar 聚合 (time_bar) BarSourceAdapterService
Tick 微观特征 (9 个) ✅ (Top 1-7) BarSourceAdapterService
日内 ret (1d) FeatureService
日级聚合 (ofi_d, dollar_volume_d) FeatureService (扩展)
多日滚动特征 (ofi_14d) FeatureService (扩展)
Z-Score 标准化 ✅ (截面) FeatureService
历史长度过滤 (≥60 天) AssetPoolService / StrategyService
灵活换仓周期 R1 R1 R14 StrategyService
多策略并行 单策略 多策略集成 可独立或集成 StrategyService
风控检查 RiskService
市价单执行 OrderService

分阶段实施:

阶段 可运行的策略 需要的服务
V1 (MVP) strategy_4 (rev_1d) Ticker + AssetPool + BarSourceAdapterService + Feature + Strategy + Risk + Order
V1.5 (可选扩展) V1 + strategy_3 (ofi_14d) V1 + AssetPool 多池扩展 + FeatureService 日级聚合
V2 (全量多因子) strategy_1 + strategy_4 + strategy_3 全部 V1 + BarSourceAdapterService tick 特征启用 + 多策略集成

附录 B: Terraform 模块清单

模块 资源 关键参数
vpc VPC, 3 Subnet (Public/Private/Data), NAT Gateway, IGW, Route Tables CIDR: 10.0.0.0/16
ecr ECR Repository 镜像保留策略: 最近 10 个
ecs ECS Cluster, 10 个 Task Definition + Service, Auto Scaling 见 3.2 资源表
redis ElastiCache Redis (cache.t3.small, 单节点) Port 6379, maxmemory-policy: allkeys-lru
rds RDS PostgreSQL 16 (db.t3.small) + TimescaleDB 扩展 Port 5432, 20GB GP3
secrets Secrets Manager × 4 (API Key, Private Key, DB Password, Telegram Token) 自动轮转: 关闭
monitoring CloudWatch Log Groups (每服务一个), Alarms (CPU/Memory/Error), SNS Topic 告警 → Email/Webhook
iam ECS Task Execution Role (ECR + Secrets + CW), Task Role (S3 可选) 最小权限原则

附录 C: 配置文件模板

asset_pool_config.yaml

pools:
  default:
    top_k: 100
    metric: usdt_volume_30d
    update_interval: 24h

  t50_monthly:
    top_k: 50
    metric: dollar_volume_monthly
    lag: 1                        # 使用上月数据
    update_interval: monthly
    min_history_days: 60          # 历史不足 60 天排除

bar_source_config.yaml

bar_type: dollar_bar             # dollar_bar | time_bar

dollar_bar:
  initial_threshold: 50000       # 初始 dollar volume 阈值 (USD)
  ema_window: 50                 # auto_K50_ema 自适应窗口
  min_ticks_per_bar: 10          # 最少 tick 数

time_bar:
  interval: 5m                   # 时间窗口 (1m / 5m / 15m / 1h)

tick_features:
  rolling_window: 50             # rolling bar 窗口大小
  enabled: true                  # 是否计算 tick 微观特征

consumer_group: bar_source_consumers
stream_key: market:trades
batch_size: 100                  # 每次从 Stream 读取的最大消息数
block_ms: 1000                   # 阻塞等待时间 (毫秒)

strategy_config.yaml

mode: single                    # single | ensemble
active_strategy: rev_1d

# 单策略参数
parameters:
  long_n: 30
  short_n: 30
  rebalance_days: 1             # R1 = 每日换仓

# 多策略集成 (mode: ensemble 时生效)
# mode: ensemble
# ensemble_method: weighted_average
# strategies:
#   - name: rev_x_inv_vpin
#     weight: 0.3
#     pool: default
#     rebalance_days: 1
#     parameters:
#       long_n: 10
#       short_n: 10
#   - name: rev_1d
#     weight: 0.3
#     pool: default
#     rebalance_days: 1
#     parameters:
#       long_n: 30
#       short_n: 30
#   - name: ofi_14d
#     weight: 0.4
#     pool: t50_monthly
#     rebalance_days: 14
#     parameters:
#       long_n: 15
#       short_n: 15
#       min_candidates: 30
#       min_history_days: 60

# 调度
cron_enabled: true
cron_schedule: "59 23 * * *"    # 每日 23:59 UTC (same_day 模式)

risk_config.yaml

max_position_pct: 0.15
max_total_exposure: 1.0
max_drawdown_halt: 0.30
min_balance_reserve: 100.0
max_order_value: 5000.0
max_daily_turnover: 3.0
blacklist: []
dry_run: true                   # 模拟盘默认 true

monitor_config.yaml

heartbeat_check_interval: 30    # 秒
heartbeat_timeout: 120          # 秒, 超过则告警

alerts:
  data_delay_threshold: 300     # 秒, 最新 kline 延迟告警
  balance_drop_threshold: 0.05  # 5% 余额下降告警
  position_mismatch_threshold: 0.1  # 目标 vs 实际仓位偏差 10%

webhook:
  enabled: true
  url: ""                       # Telegram Bot API URL
  channel_id: ""                # Telegram Chat ID