高频行情事件队列
2026/5/8 11:28:35 网站建设 项目流程

高频行情事件队列

一、原问题分析

1.1 原有模数分配算法问题

算法公式:

index=(next_index_+1)%handler_ptrs_.size()

问题分析:

  • 算法错误:每次分配都先+1再取模,导致实际分配的起始索引偏移了1
  • 轮转偏移:如果next_index_初始为0,且size=4,那么分配序列为:
    第一次:(0+1)%4=1→ 处理器1第二次:(1+1)%4=2→ 处理器2第三次:(2+1)%4=3→ 处理器3第四次:(3+1)%4=0→ 处理器0第五次:(0+1)%4=1→ 处理器1
  • 正确公式index = next_index_ % handler_ptrs_.size()

1.2 根本性问题

单线程绑定限制
原队列核心缺陷
只能单线程处理
处理器2绑定核心2
无法利用其他核心
队列堆积恶化
股票A→处理器2
股票B→处理器2
股票C→处理器2
静态哈希分配
大量消息同时到达
处理器2过载洪塞
其他处理器空闲
消息处理延迟暴增
可能丢失数据

问题:

  1. 静态哈希映射:股票ID到处理器的映射是固定不变的
  2. 无法动态均衡:即使某些处理器空闲,也无法分担其他处理器的负载
  3. 单线程瓶颈:每个处理器只能单线程运行,无法利用多核
  4. 热点股票问题:热门股票消息量大,其固定分配的处理器必然过载

二、未来方向

2.1 目标

  1. 多线程可驱动同一队列:允许多个CPU核心同时处理同一股票的消息
  2. 保持时序线性:同一股票的多个消息必须按接收顺序串行处理
  3. 动态负载均衡:队列能自动将负载分配到空闲处理器
  4. 最小化同步开销:避免传统锁带来的性能损耗

2.2 示意图

执行保证机制
并行驱动层
核心管理层
StrandQueue集合
输入层
所有线程可竞争
任意StrandQueue执行权
同一队列只能有一个
线程获得执行权
通过CAS原子操作
保证独占性
处理完毕释放执行权
允许其他线程竞争
驱动器线程1
绑定核心0
驱动器线程2
绑定核心1
驱动器线程3
绑定核心2
驱动器线程4
绑定核心3
主队列容器
unordered_map<股票ID, StrandQueue*>
调度链表
仅包含活跃队列
股票123456
CAS自旋锁保护
股票789012
CAS自旋锁保护
股票345678
CAS自旋锁保护
全局队列管理器
券商SDK线程1
券商SDK线程2
券商SDK线程N

三、StrandQueue(串行队列)

3.1 状态机与执行权

队列无执行线程
线程尝试获取执行权
CAS操作成功
CAS操作失败
获取写锁
处理单个消息
更新队列状态
队列非空且保持执行权
队列空或丢失执行权
空闲状态
竞争状态
执行状态
处理中状态
释放写锁状态
检查状态

3.2 原子操作与内存屏障

原子变量:

classStrandQueue{// 执行线程标记:0=空闲,非0=当前执行线程IDstd::atomic<uint64_t>executing_thread{0};// 待处理消息计数std::atomic<uint32_t>pending_messages{0};// 写锁(用于消息入队/出队)std::atomic_flag write_lock=ATOMIC_FLAG_INIT;// 内部消息队列(线程不安全)std::list<MessagePtr>message_list;}

执行权获取:

booltry_acquire_execution(uint64_tthread_id){uint64_texpected=0;// CAS操作:只有当executing_thread==0时,才将其设置为thread_idreturnexecuting_thread.compare_exchange_strong(expected,thread_id,std::memory_order_acquire,// 成功时的内存序std::memory_order_relaxed// 失败时的内存序);}

四、公平调度

4.1 调度结构

公平调度: ┌─────────────────────────────────────────────┐ │ 第一级:主队列管理器 │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ 股票A队列 │ │ 股票B队列 │...│ │ │ pending=3│ │ pending=0│ │ │ └─────────────┘ └─────────────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌────────────────────────────────────┐ │ │ │ 调度链表(仅pending>0) │ │ │ │[股票A][股票C][股票E]...│ │ │ └────────────────────────────────────┘ │ └─────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────┐ │ 第二级:多线程并行驱动 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ 线程1│ │ 线程2│ │ 线程3│ │ │ │ 绑定核0│ │ 绑定核1│ │ 绑定核2│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ │ │ │ └───────────┼───────────┘ │ │ ▼ │ │ 竞争执行StrandQueue │ └─────────────────────────────────────────────┘

4.2 调度链表

惰性调度:

  • 只有pending_messages > 0的队列才会进入调度链表
  • 当队列的pending_messages从0变为1时,自动加入调度链表
  • 当队列的pending_messages从1变为0时,自动从调度链表移除
  • 调度链表操作由主队列自旋锁保护,但操作频率很低

调度链表:

classScheduleList{// 双链表节点,便于快速插入/删除structNode{StrandQueue*queue;Node*prev;Node*next;};// 头尾指针,支持FIFO调度Node*head;Node*tail;// 自旋锁保护链表操作SpinLock lock;};

五、流程

5.1 消息到达(入队)

SDK回调线程主队列管理器调度链表StrandQueue步骤1:查找对应队列获取主队列自旋锁返回现有StrandQueue指针创建新的StrandQueue返回新队列指针alt[队列已存在][队列不存在]释放主队列自旋锁步骤2:消息入队CAS自旋获取写锁将消息加入内部列表pending_messages.fetch_add(1)释放写锁步骤3:触发调度获取主队列自旋锁将队列加入调度链表尾部释放主队列自旋锁alt[pending_messages从0→1]SDK回调线程主队列管理器调度链表StrandQueue

5.2 消息处理(出队)

单队列处理循环
失败
队列非空且持有执行权
队列空或丢失执行权
成功
弹出队首消息
获取队列写锁
释放队列写锁
调用消息处理函数
pending_messages.fetch_sub(1)
队列是否为空
且仍持有执行权
驱动线程开始
获取调度链表锁
弹出链表头部队列
释放调度链表锁
检查pending_messages>0
尝试获取队列执行权
CAS(0→thread_id)
释放执行权
executing_thread=0
处理期间pending_messages归零
获取调度链表锁移除队列
队列仍在调度链表
释放调度链表锁

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询