TiCDC New Arch:数据面流水线与流控闭环(小白入门版)
从源码角度把一条数据从 TiKV 送到 Sink 的路径讲清:EventStore/EventService/EventCollector;并补齐 EventStore 复用,以及 CongestionControl/ReleasePath/Reset 的流控闭环。
0. 这篇文章怎么读
你说“总结有点乱,小白看不懂”,所以我把结构改成 先讲直觉、再落到代码:
- 只想看你点名的内容(数据面 + EventStore 复用 + 流控闭环):看完 1~7
- 想知道“谁在调度 span/dispatcher、checkpoint 怎么算/怎么落盘”:看 附录 A
- 想对照 message/topic/proto 快速定位:看 附录 C
- 想快速 grep 源码:看 附录 D
前置约定:
- 讨论对象是 TiCDC new architecture(启动开关
cdc server --newarch/-x)。 - 文中源码路径以 TiCDC 仓库根目录为基准(不是本文档仓库)。
1. 先记住 3 句话 + 1 张图
先记住 3 句话(不求严谨,先求能把代码读通)
- 数据面是一条流水线:
LogPuller → EventStore → EventService → EventCollector → Dispatcher → Sink。 - 流控是一个闭环:EventCollector 看“我快撑爆内存了”→ 回传 quota 给 EventService 少扫一点;真顶不住就 ReleasePath 清队列,并用 Reset 把数据“补回来”。
- 正确性兜底靠
seq + epoch:哪怕 drop/清队列导致事件缺口,也不会“悄悄错数据”,因为下游发现不连续就会触发 Reset 重放。
数据面主干图(后文所有细节都沿着这条线展开):
flowchart LR
LP["LogPuller<br/>logservice/logpuller"] --> ES["EventStore(Pebble)<br/>logservice/eventstore"]
ES --> EVS["EventService/Broker<br/>pkg/eventservice"]
EVS -->|EventCollectorTopic| EC["EventCollector(dynstream)<br/>downstreamadapter/eventcollector"]
EC --> D["Dispatcher<br/>downstreamadapter/dispatcher"]
D --> S["Sink<br/>downstreamadapter/sink"]2. 新手词典(先把词对上,再看代码就不痛苦)
- changefeed:一条同步任务(“把哪些表同步到哪里”)。
- span:一个 key range(很多地方会把“表的 replication 范围”抽象成 span)。
- dispatcher:一个 span 的下游写入执行者;你可以把它理解为“一个表/一个分片的写线程”。
- path(dynstream):在 EventCollector 里,通常 一个 dispatcher 对应一个 path(有自己的队列和内存统计)。
- EventStore:本地持久化事件仓库(Pebble),同时管理 LogPuller 的订阅;关键能力是“订阅复用”。
- EventService(Broker/Scanner):从 EventStore 扫 raw KV + 从 schema store 拉 DDL,然后组装成行变更/DDL 事件,发给 EventCollector。
- EventCollector:收事件、按 dispatcher 分发到 dynstream、并做流控(软:CongestionControl;硬:ReleasePath/DropEvent + Reset)。
- quota / CongestionControl:EventCollector 回传给 EventService 的“还能吃多少数据”的预算;EventService 用它决定“扫不扫/扫多少”。
- blocking path:dynstream 认为该 path “短期不可能自己把队列消掉”,会停止调度它;这是 ReleasePath 重点处理的对象。
- ReleasePath:硬断路器:在内存顶满/疑似死锁时,清掉某些 path 的 pending 队列止血(会导致事件缺口)。
- seq / epoch / Reset:每条事件带 seq;不连续就 Reset;Reset 会提升 epoch 并触发重放,保证不 silent loss。
- resolvedTs / checkpointTs:resolvedTs ≈ “上游已收齐到这个时间”;checkpointTs ≈ “下游已成功落地到这个时间”(更严格)。
3. 最短读代码路线(6 个文件,足够把闭环读通)
如果你只想“最快在代码里把机制跑通”,按这个顺序读(每个文件只看列出来的入口就行):
- EventStore 复用(订阅怎么复用/怎么切换)
logservice/eventstore/event_store.go:RegisterDispatcher(...)、GetIterator(...)
- EventService scan gate(什么时候“选择不扫”)
pkg/eventservice/event_broker.go:handleCongestionControl(...)、doScan(...)
- EventCollector 主动流控(quota 怎么生成、ReleasePath 怎么处理)
downstreamadapter/eventcollector/event_collector.go:controlCongestion(...)、processDSFeedback(...)
- ReleasePath 阈值与触发(硬断路器算法)
utils/dynstream/memory_control.go:checkDeadlock(...)、releaseMemory(...)
- blocking path / releasePath 效果(为什么会“停止调度/清队列”)
utils/dynstream/event_queue.go:releasePath(...)、以及 “pop 忽略 blocking path”
- 正确性自愈(seq gap / DropEvent → Reset)
downstreamadapter/eventcollector/dispatcher_stat.go:verifyEventSequence(...)、handleDropEvent(...)、reset(...)
4. 数据面流水线:一条数据从 TiKV 走到 Sink 的全过程
这一节你只要抓住“输入/输出/关键点”,不要急着抠每个 message 字段。
4.1 LogPuller → EventStore:把 TiKV 的变更先落到本地
直觉版理解:
- LogPuller 是“从 TiKV 拿到事件”的输入端(region feed)。
- EventStore 是“本地缓冲池 + 订阅管理器”:把事件落到 Pebble,并整合 subscription(复用在这里发生)。
入口提示:
- LogPuller:
logservice/logpuller/subscription_client.go - EventStore:
logservice/eventstore/event_store.go
4.2 “dispatcher 什么时候开始出数”:REGISTER 只是建档,RESET 才是开闸
这里是新手最容易误解的点:REGISTER 不是开始同步,RESET 才会触发真正的数据流。
一个最小闭环大概是:
- DispatcherManager 创建 dispatcher(下游写入者),交给 EventCollector 管
downstreamadapter/dispatchermanager/dispatcher_manager.go:458 - EventCollector 向 EventService 发
REGISTER(告诉它:我有一个新的 dispatcher 需要数据)
downstreamadapter/eventcollector/dispatcher_stat.go:151 - EventCollector 收到 Ready/Handshake 后,再发
RESET(带起始 ts / 新 epoch),从这刻开始 EventService 才会扫数据并发送事件
downstreamadapter/eventcollector/dispatcher_stat.go:183
对照代码最短路径:
- EventService 的 Ready/退避重发:
pkg/eventservice/event_broker.go:454 - EventCollector 的 reset request:
downstreamadapter/eventcollector/dispatcher_stat.go:183
4.3 EventService:扫 EventStore + 拼 DDL/schema,产出可下游消费的事件流
EventService 的工作是把“EventStore 里按 ts 排序的 rawKV/DDL”变成“下游能直接写 sink 的事件”,你可以先按这个简化版扫描过程理解:
EventStore.GetIterator():按 commitTs range 读 raw KVSchemaStore.FetchTableDDLEvents():拿 DDL events- mounter:把 rawKV 解成行变更
- 同 commitTs 下:DML 在前、DDL 在后
- send worker:把事件发到
EventCollectorTopic(或 redo 的 topic)
对应文件:
- broker:
pkg/eventservice/event_broker.go - scanner:
pkg/eventservice/event_scanner.go
4.3.1 被动流控 gate:EventService 什么时候“选择不扫”
EventService 在真正扫描前,会检查一些 gate,避免“越扫越堆、把内存撑爆”:
- remote target 未 ready:不扫(避免无意义扫描)
- scanRateLimiter:限速
- changefeed 级 quota + dispatcher 级 quota:不足就 skip scan(quota 来自 EventCollector)
对应代码入口:
- scan gate:
pkg/eventservice/event_broker.go:552 - quota 更新:
pkg/eventservice/event_broker.go:1161(handleCongestionControl)
4.4 EventCollector:按 dispatcher 分发 + 主动流控闭环(重点)
EventCollector 先按两件事理解就够用:
- 把 EventService 发来的事件 按 dispatcherID 分发到 dynstream(一个 dispatcher 一个 path/队列)
downstreamadapter/eventcollector/event_collector.go:527 - 从 dynstream 拿到内存指标,形成 流控闭环(下一节会把闭环画出来)
- 软:
CongestionControl(回传 quota) - 硬:
ReleasePath/DropEvent(止血) - 正确性:
seq gap → Reset(epoch++)(补洞)
- 软:
4.5 Dispatcher → Sink:真正写下游
Dispatcher 负责把事件按协议/顺序写到 sink;sink 可以是 MySQL、Kafka、Storage 等。
你先记两个入口就行:
- dispatcher:
downstreamadapter/dispatcher/* - sink:
downstreamadapter/sink/*(接口在downstreamadapter/sink/sink.go)
(Sink/redo 的更多细节放到附录 B,避免打断“数据面 + 流控”主线。)
5. EventStore 复用(为什么能“少订阅、多分发”)
如果每个 dispatcher 都单独建一个 LogPuller subscription,那 changefeed 一多、表一多,上游就会被订阅数打爆。
EventStore 的核心目标是:用尽量少的 subscription 覆盖尽量多的 dispatcher span。
你读代码时抓住两个入口就够了:
RegisterDispatcher(span, startTs, onlyReuse):注册 dispatcher 想要的 span,决定复用/新建 subscription
logservice/eventstore/event_store.go:415GetIterator(...):真正扫描时,决定“这次 iterator 用哪个 subscription”(这里发生 pending → active 的切换)
logservice/eventstore/event_store.go:738
5.1 复用策略(先跑起来,再逐步变精确)
EventStore 在注册时会按 span 做匹配(简化理解即可):
- 找 exact span match:直接复用(最理想)
- 找 smallest containing span:先复用“最小包含”的 subscription(能先跑起来)
- 根据
onlyReuse决策:onlyReuse=true:只允许复用,禁止新建(remote reuse 会用到)onlyReuse=false:如果只找到 containing,会额外创建 exact subscription,挂到pendingSubStat(后台追进度,准备切换)
5.2 为什么需要 pendingSubStat/removingSubStat(安全切换,避免 gap)
这两个状态的意义一句话就够:我想换成更精确的 subscription,但不能在换的瞬间丢一段数据。
发生切换的关键点在 GetIterator(...):
- iterator 会优先用
pendingSubStat(span 更精确、dbIndex 更匹配) - 当 pending 的
checkpointTs/resolvedTs覆盖当前 scan range 后:pendingSubStat → subStat
同时旧的subStat → removingSubStat(留一段重叠期,避免短暂 gap) - removing 后续再 detach 清理
5.3 onlyReuse 与 remote reuse(了解即可)
你看到 onlyReuse=true 通常意味着“我只允许复用已有 subscription,不允许你帮我新建”。这经常出现在“跨节点复用 EventService/EventStore”的场景里(由 LogCoordinator 聚合状态并回答可复用位置)。
入口提示(想追再看):
- Elector/LogCoordinator:
server/module_election.go、logservice/coordinator/coordinator.go - 复用请求:
ReusableEventServiceRequest/Response
6. 流控闭环(CongestionControl / ReleasePath / Reset)
这一节回答你点名的问题:CongestionControl/ReleasePath 是怎么形成闭环的,以及 “清队列后怎么保证不丢数据”。
6.1 闭环总图(先建立直觉)
flowchart TB
ES["EventService(scan)"] -->|events| Q["EventCollector(dynstream pending queues)"]
Q --> M["MemoryMetric"]
M --> CC["CongestionControl(quota)"]
CC -->|update quota| ES
Q -->|max pending / deadlock suspected| RP["ReleasePath(feedback)"]
RP --> RST["RESET(epoch++)"]
RST -->|DispatcherRequest| ES一句话解释:
- 软流控:EventCollector 调小 quota → EventService 少扫点 → pending 增长放缓
- 硬止血:ReleasePath 清掉“最堵、最不可能自己变好”的队列
- 正确性补洞:seq 不连续 → Reset → EventService 重扫/重发缺失事件
6.2 CongestionControl(软流控):EventCollector 把“还能吃多少”回传给 EventService
关键事实(够你读代码用):
- 生成频率:每秒生成并发送一次
downstreamadapter/eventcollector/event_collector.go:566(controlCongestion) - 数据来源:dynstream
MemoryMetric(area=changefeed,path=dispatcher) - EventService 收到后更新 quota:
pkg/eventservice/event_broker.go:1161(handleCongestionControl)
直觉解释:
- EventCollector 说:“我这边每个 changefeed 还能再攒多少 pending bytes;每个 dispatcher path 还能再攒多少。”
- EventService 用这个 quota 做 scan gate(见
pkg/eventservice/event_broker.go:552),把压力挡在源头。
6.3 ReleasePath(硬断路器):顶不住了怎么办
ReleasePath 由 dynstream 的内存控制逻辑触发(EventCollector 专用算法 MemoryControlForEventCollector):
- 触发/阈值在:
utils/dynstream/memory_control.go - 真正“清队列”的动作在:
utils/dynstream/event_queue.go:74(releasePath)
ReleasePath 会导致事件缺口,所以后面一定要配合 Reset(下一节)。
6.4 Reset(正确性自愈):为什么不会“悄悄错数据”
只要允许“清队列/丢事件”,就必须解决一个问题:如何避免 silent data loss。
TiCDC new arch 的答案是:seq 连续性校验 + RESET(epoch++) 重建数据流。
关键点对应代码:
- EventCollector 校验 seq:
downstreamadapter/eventcollector/dispatcher_stat.go:238(verifyEventSequence)
一旦 seq 不连续(表示丢了事件或被清队列清掉),就触发reset()。 - DropEvent 是“我明确告诉你丢了”的显式信号:
- dynstream 内存压力下可把 droppable 事件转成 DropEvent:
utils/dynstream/memory_control.go:120 - EventCollector 收到 DropEvent 直接 reset:
downstreamadapter/eventcollector/dispatcher_stat.go:588
- dynstream 内存压力下可把 droppable 事件转成 DropEvent:
- EventService 发送侧也会 drop(比如 send buffer 满):
pkg/eventservice/event_broker.go:791
发送侧注释明确依赖 “下游发现不连续 → reset → 重发缺失事件”。
Reset 的语义要点:
- EventCollector 发 RESET:
downstreamadapter/eventcollector/dispatcher_stat.go:183 - EventService 替换为新 epoch 的 dispatcherStat:
pkg/eventservice/event_broker.go:1039(resetDispatcher) - 陈旧 epoch 的事件会被过滤(不会污染新 epoch):
downstreamadapter/eventcollector/dispatcher_stat.go:356
7. ReleasePath 深入:触发条件/阈值/为什么只挑 blocking path
你点名要的三个问题,这里一次讲清楚。
7.1 触发条件:什么时候会走到 ReleasePath
在 utils/dynstream/memory_control.go 里,触发主要有两类:
- 硬顶满(hard limit):
totalPendingSize >= maxPendingSizeutils/dynstream/memory_control.go:117
- 疑似死锁/卡住(deadlock suspected):在 5s 窗口内仍在 append,但 pendingSize 5s 内没下降,并且处于高水位(>60%)
utils/dynstream/memory_control.go:151
你可以把第 2 个条件理解为:系统一直在往队列里塞东西,但队列完全不见变小,很像“下游卡死”,所以要启动硬止血。
7.2 释放策略:释放多少、多久触发一次、释放哪些 path
在 releaseMemory(...) 里有几个关键阈值:
- 频率限制:每个 area(changefeed)最多 1 秒触发一次
utils/dynstream/memory_control.go:167 - 目标释放量:
sizeToRelease = totalPendingSize * 0.4(释放 40%)
utils/dynstream/memory_control.go:183 - 最小队列:
path.pendingSize >= 256 bytes才值得处理(太小的队列 reset 反而是负收益)
utils/dynstream/memory_control.go:190
7.3 为什么只挑 blocking path(核心)
ReleasePath 只选择:
path.blocking == true(该 dispatcher 当前无法继续消费)
utils/dynstream/memory_control.go:190
原因用两层解释最容易懂:
- dynstream 语义层(代码事实)
blocking path 在 dynstream 里会被“停止调度”:pop 会忽略 blocking path(直到被 wake)utils/dynstream/event_queue.go:123
这意味着:blocking path 的 pendingQueue 很可能短期不会自己变小,只会越堆越大。
- 系统策略层(为什么这样挑)
内存顶满时我们希望“最小代价止血”:优先清掉那些 已经不可能推进 的队列,避免把还在正常 drain 的 path 一起拖下水。- 对 blocking path 做 ReleasePath:止血快、命中主要内存来源
- 对非 blocking path 乱清:吞吐会下降、reset 变多、系统抖动更大
7.4 ReleasePath 真正做了什么(你可以直接记成一句话)
- 清空某个 path 的 pendingQueue(从而让总 pending bytes 立刻下降)
utils/dynstream/event_queue.go:74
接下来发生什么?一定会走到上一节说的 seq/DropEvent → Reset,把缺口补回来。
8. 附录
附录 A:控制面(Elector/Coordinator/Maintainer)用 1 页建立概念
如果你还没读控制面,先用这 4 句话够用:
- Elector 选出 Coordinator leader(全局 owner)来管理 changefeed 元数据与调度。
- Coordinator 通过 Controller/scheduler/operator 把一个 changefeed 拆成很多 spans,并分配到不同节点的 Maintainer/Dispatcher。
- Maintainer 汇总下游 dispatcher 的 heartbeat,计算 watermark(resolved/checkpoint/redo),并做 barrier 协议来处理 DDL/SyncPoint 等阻塞事件。
- 数据面需要的很多“指令/水位”是靠 MessageCenter topic 在模块间传递的(见附录 C)。
你想继续深入时,入口建议:
- 选主:
server/module_election.go - Coordinator/Controller:
coordinator/coordinator.go/coordinator/controller.go - Maintainer:
maintainer/maintainer.go/maintainer/maintainer_controller.go
附录 B:Sink / redo(了解即可)
Sink 抽象与接口:
downstreamadapter/sink/sink.go
两个你可能会踩的点(先记住结论即可):
- MySQL sink crash recovery 会读 ddl_ts 修正 startTs(避免“写到旧 schema”):
downstreamadapter/sink/mysql/sink.go - Kafka/Storage 这类 sink 可能需要 “checkpoint event”,由
AddCheckpointTs(ts)发出:downstreamadapter/sink/*
redo 的闭环(极简版):
commitTs > redoGlobalTs时 dispatcher 会先缓存:downstreamadapter/dispatcher/event_dispatcher.go:158- redo watermark 更新后再广播唤醒(更细节见原文
ticdc-sink-and-protocol)
附录 C:topic / IOType / proto 速查(消息就是系统的“真实 API”)
new arch 模块间通信大量通过 pkg/messaging 的 MessageCenter 完成:
- topics(路由目标):
pkg/messaging/topic.go - IOType ↔ payload 映射(反查消息类型很有用):
pkg/messaging/message.go
protobuf 索引:
heartbeatpb/heartbeat.proto:Maintainer/DispatcherManager/Barrier/CheckpointTs/Redo 等eventpb/event.proto:EventService 的DispatcherRequest(register/remove/reset)logservice/logservicepb/*:log coordinator/event store state 等
附录 D:常用定位命令(面向源码阅读/排障)
下列命令假设你在 TiCDC 源码仓库根目录执行。
# EventStore 复用:exact/containing/pending/removing/onlyReuse
rg -n "RegisterDispatcher\\(|pendingSubStat|removingSubStat|GetIterator\\(" logservice/eventstore/event_store.go
# EventService quota gate / CongestionControl / sendMsg drop
rg -n "handleCongestionControl\\(|doScan\\(|sendMsg\\(" pkg/eventservice/event_broker.go
# EventCollector:CongestionControl + ReleasePath feedback + seq/reset
rg -n "controlCongestion\\(|processDSFeedback\\(|verifyEventSequence\\(|newDispatcherResetRequest\\(" downstreamadapter/eventcollector
# dynstream:ReleasePath 触发条件/阈值
rg -n "ReleasePath|releaseMemory\\(|memoryUsageRatio\\(|checkDeadlock\\(" utils/dynstream附录 E:本文来源(你要追更细节就回到原文)
本文合并整理自 TiCDC 源码仓库内的 5 篇 code-oriented 文档:
docs/design/2026-02-09-ticdc-code-architecture-guide.mddocs/design/2026-02-09-ticdc-control-plane-deep-dive.mddocs/design/2026-02-09-ticdc-maintainer-deep-dive.mddocs/design/2026-02-09-ticdc-data-plane-deep-dive.mddocs/design/2026-02-09-ticdc-sink-and-protocol.md