🚀TiDB
TiCDC

TiCDC New Arch:数据面流水线与流控闭环(小白入门版)

从源码角度把一条数据从 TiKV 送到 Sink 的路径讲清:EventStore/EventService/EventCollector;并补齐 EventStore 复用,以及 CongestionControl/ReleasePath/Reset 的流控闭环。

0. 这篇文章怎么读

你说“总结有点乱,小白看不懂”,所以我把结构改成 先讲直觉、再落到代码

  • 只想看你点名的内容(数据面 + EventStore 复用 + 流控闭环):看完 1~7
  • 想知道“谁在调度 span/dispatcher、checkpoint 怎么算/怎么落盘”:看 附录 A
  • 想对照 message/topic/proto 快速定位:看 附录 C
  • 想快速 grep 源码:看 附录 D

前置约定:

  • 讨论对象是 TiCDC new architecture(启动开关 cdc server --newarch/-x)。
  • 文中源码路径以 TiCDC 仓库根目录为基准(不是本文档仓库)。

1. 先记住 3 句话 + 1 张图

先记住 3 句话(不求严谨,先求能把代码读通)

  1. 数据面是一条流水线:LogPuller → EventStore → EventService → EventCollector → Dispatcher → Sink
  2. 流控是一个闭环:EventCollector 看“我快撑爆内存了”→ 回传 quota 给 EventService 少扫一点;真顶不住就 ReleasePath 清队列,并用 Reset 把数据“补回来”。
  3. 正确性兜底seq + epoch:哪怕 drop/清队列导致事件缺口,也不会“悄悄错数据”,因为下游发现不连续就会触发 Reset 重放。

数据面主干图(后文所有细节都沿着这条线展开):

flowchart LR
  LP["LogPuller<br/>logservice/logpuller"] --> ES["EventStore(Pebble)<br/>logservice/eventstore"]
  ES --> EVS["EventService/Broker<br/>pkg/eventservice"]
  EVS -->|EventCollectorTopic| EC["EventCollector(dynstream)<br/>downstreamadapter/eventcollector"]
  EC --> D["Dispatcher<br/>downstreamadapter/dispatcher"]
  D --> S["Sink<br/>downstreamadapter/sink"]

2. 新手词典(先把词对上,再看代码就不痛苦)

  • changefeed:一条同步任务(“把哪些表同步到哪里”)。
  • span:一个 key range(很多地方会把“表的 replication 范围”抽象成 span)。
  • dispatcher:一个 span 的下游写入执行者;你可以把它理解为“一个表/一个分片的写线程”。
  • path(dynstream):在 EventCollector 里,通常 一个 dispatcher 对应一个 path(有自己的队列和内存统计)。
  • EventStore:本地持久化事件仓库(Pebble),同时管理 LogPuller 的订阅;关键能力是“订阅复用”。
  • EventService(Broker/Scanner):从 EventStore 扫 raw KV + 从 schema store 拉 DDL,然后组装成行变更/DDL 事件,发给 EventCollector。
  • EventCollector:收事件、按 dispatcher 分发到 dynstream、并做流控(软:CongestionControl;硬:ReleasePath/DropEvent + Reset)。
  • quota / CongestionControl:EventCollector 回传给 EventService 的“还能吃多少数据”的预算;EventService 用它决定“扫不扫/扫多少”。
  • blocking path:dynstream 认为该 path “短期不可能自己把队列消掉”,会停止调度它;这是 ReleasePath 重点处理的对象。
  • ReleasePath:硬断路器:在内存顶满/疑似死锁时,清掉某些 path 的 pending 队列止血(会导致事件缺口)。
  • seq / epoch / Reset:每条事件带 seq;不连续就 Reset;Reset 会提升 epoch 并触发重放,保证不 silent loss。
  • resolvedTs / checkpointTs:resolvedTs ≈ “上游已收齐到这个时间”;checkpointTs ≈ “下游已成功落地到这个时间”(更严格)。

3. 最短读代码路线(6 个文件,足够把闭环读通)

如果你只想“最快在代码里把机制跑通”,按这个顺序读(每个文件只看列出来的入口就行):

  1. EventStore 复用(订阅怎么复用/怎么切换)
    • logservice/eventstore/event_store.goRegisterDispatcher(...)GetIterator(...)
  2. EventService scan gate(什么时候“选择不扫”)
    • pkg/eventservice/event_broker.gohandleCongestionControl(...)doScan(...)
  3. EventCollector 主动流控(quota 怎么生成、ReleasePath 怎么处理)
    • downstreamadapter/eventcollector/event_collector.gocontrolCongestion(...)processDSFeedback(...)
  4. ReleasePath 阈值与触发(硬断路器算法)
    • utils/dynstream/memory_control.gocheckDeadlock(...)releaseMemory(...)
  5. blocking path / releasePath 效果(为什么会“停止调度/清队列”)
    • utils/dynstream/event_queue.goreleasePath(...)、以及 “pop 忽略 blocking path”
  6. 正确性自愈(seq gap / DropEvent → Reset)
    • downstreamadapter/eventcollector/dispatcher_stat.goverifyEventSequence(...)handleDropEvent(...)reset(...)

4. 数据面流水线:一条数据从 TiKV 走到 Sink 的全过程

这一节你只要抓住“输入/输出/关键点”,不要急着抠每个 message 字段。

4.1 LogPuller → EventStore:把 TiKV 的变更先落到本地

直觉版理解:

  • LogPuller 是“从 TiKV 拿到事件”的输入端(region feed)。
  • EventStore 是“本地缓冲池 + 订阅管理器”:把事件落到 Pebble,并整合 subscription(复用在这里发生)。

入口提示:

  • LogPuller:logservice/logpuller/subscription_client.go
  • EventStore:logservice/eventstore/event_store.go

4.2 “dispatcher 什么时候开始出数”:REGISTER 只是建档,RESET 才是开闸

这里是新手最容易误解的点:REGISTER 不是开始同步,RESET 才会触发真正的数据流

一个最小闭环大概是:

  1. DispatcherManager 创建 dispatcher(下游写入者),交给 EventCollector 管
    downstreamadapter/dispatchermanager/dispatcher_manager.go:458
  2. EventCollector 向 EventService 发 REGISTER(告诉它:我有一个新的 dispatcher 需要数据)
    downstreamadapter/eventcollector/dispatcher_stat.go:151
  3. EventCollector 收到 Ready/Handshake 后,再发 RESET(带起始 ts / 新 epoch),从这刻开始 EventService 才会扫数据并发送事件
    downstreamadapter/eventcollector/dispatcher_stat.go:183

对照代码最短路径:

  • EventService 的 Ready/退避重发:pkg/eventservice/event_broker.go:454
  • EventCollector 的 reset request:downstreamadapter/eventcollector/dispatcher_stat.go:183

4.3 EventService:扫 EventStore + 拼 DDL/schema,产出可下游消费的事件流

EventService 的工作是把“EventStore 里按 ts 排序的 rawKV/DDL”变成“下游能直接写 sink 的事件”,你可以先按这个简化版扫描过程理解:

  1. EventStore.GetIterator():按 commitTs range 读 raw KV
  2. SchemaStore.FetchTableDDLEvents():拿 DDL events
  3. mounter:把 rawKV 解成行变更
  4. 同 commitTs 下:DML 在前、DDL 在后
  5. send worker:把事件发到 EventCollectorTopic(或 redo 的 topic)

对应文件:

  • broker:pkg/eventservice/event_broker.go
  • scanner:pkg/eventservice/event_scanner.go

4.3.1 被动流控 gate:EventService 什么时候“选择不扫”

EventService 在真正扫描前,会检查一些 gate,避免“越扫越堆、把内存撑爆”:

  • remote target 未 ready:不扫(避免无意义扫描)
  • scanRateLimiter:限速
  • changefeed 级 quota + dispatcher 级 quota:不足就 skip scan(quota 来自 EventCollector)

对应代码入口:

  • scan gate:pkg/eventservice/event_broker.go:552
  • quota 更新:pkg/eventservice/event_broker.go:1161handleCongestionControl

4.4 EventCollector:按 dispatcher 分发 + 主动流控闭环(重点)

EventCollector 先按两件事理解就够用:

  1. 把 EventService 发来的事件 按 dispatcherID 分发到 dynstream(一个 dispatcher 一个 path/队列)
    downstreamadapter/eventcollector/event_collector.go:527
  2. 从 dynstream 拿到内存指标,形成 流控闭环(下一节会把闭环画出来)
    • 软:CongestionControl(回传 quota)
    • 硬:ReleasePath/DropEvent(止血)
    • 正确性:seq gap → Reset(epoch++)(补洞)

4.5 Dispatcher → Sink:真正写下游

Dispatcher 负责把事件按协议/顺序写到 sink;sink 可以是 MySQL、Kafka、Storage 等。

你先记两个入口就行:

  • dispatcher:downstreamadapter/dispatcher/*
  • sink:downstreamadapter/sink/*(接口在 downstreamadapter/sink/sink.go

(Sink/redo 的更多细节放到附录 B,避免打断“数据面 + 流控”主线。)


5. EventStore 复用(为什么能“少订阅、多分发”)

如果每个 dispatcher 都单独建一个 LogPuller subscription,那 changefeed 一多、表一多,上游就会被订阅数打爆。

EventStore 的核心目标是:用尽量少的 subscription 覆盖尽量多的 dispatcher span

你读代码时抓住两个入口就够了:

  • RegisterDispatcher(span, startTs, onlyReuse):注册 dispatcher 想要的 span,决定复用/新建 subscription
    logservice/eventstore/event_store.go:415
  • GetIterator(...):真正扫描时,决定“这次 iterator 用哪个 subscription”(这里发生 pending → active 的切换)
    logservice/eventstore/event_store.go:738

5.1 复用策略(先跑起来,再逐步变精确)

EventStore 在注册时会按 span 做匹配(简化理解即可):

  1. exact span match:直接复用(最理想)
  2. smallest containing span:先复用“最小包含”的 subscription(能先跑起来)
  3. 根据 onlyReuse 决策:
    • onlyReuse=true只允许复用,禁止新建(remote reuse 会用到)
    • onlyReuse=false:如果只找到 containing,会额外创建 exact subscription,挂到 pendingSubStat(后台追进度,准备切换)

5.2 为什么需要 pendingSubStat/removingSubStat(安全切换,避免 gap)

这两个状态的意义一句话就够:我想换成更精确的 subscription,但不能在换的瞬间丢一段数据

发生切换的关键点在 GetIterator(...)

  • iterator 会优先用 pendingSubStat(span 更精确、dbIndex 更匹配)
  • 当 pending 的 checkpointTs/resolvedTs 覆盖当前 scan range 后:pendingSubStat → subStat
    同时旧的 subStat → removingSubStat(留一段重叠期,避免短暂 gap)
  • removing 后续再 detach 清理

5.3 onlyReuse 与 remote reuse(了解即可)

你看到 onlyReuse=true 通常意味着“我只允许复用已有 subscription,不允许你帮我新建”。这经常出现在“跨节点复用 EventService/EventStore”的场景里(由 LogCoordinator 聚合状态并回答可复用位置)。

入口提示(想追再看):

  • Elector/LogCoordinator:server/module_election.gologservice/coordinator/coordinator.go
  • 复用请求:ReusableEventServiceRequest/Response

6. 流控闭环(CongestionControl / ReleasePath / Reset)

这一节回答你点名的问题:CongestionControl/ReleasePath 是怎么形成闭环的,以及 “清队列后怎么保证不丢数据”。

6.1 闭环总图(先建立直觉)

flowchart TB
  ES["EventService(scan)"] -->|events| Q["EventCollector(dynstream pending queues)"]
  Q --> M["MemoryMetric"]
  M --> CC["CongestionControl(quota)"]
  CC -->|update quota| ES

  Q -->|max pending / deadlock suspected| RP["ReleasePath(feedback)"]
  RP --> RST["RESET(epoch++)"]
  RST -->|DispatcherRequest| ES

一句话解释:

  • 软流控:EventCollector 调小 quota → EventService 少扫点 → pending 增长放缓
  • 硬止血:ReleasePath 清掉“最堵、最不可能自己变好”的队列
  • 正确性补洞:seq 不连续 → Reset → EventService 重扫/重发缺失事件

6.2 CongestionControl(软流控):EventCollector 把“还能吃多少”回传给 EventService

关键事实(够你读代码用):

  • 生成频率:每秒生成并发送一次
    downstreamadapter/eventcollector/event_collector.go:566controlCongestion
  • 数据来源:dynstream MemoryMetric(area=changefeed,path=dispatcher)
  • EventService 收到后更新 quota:
    pkg/eventservice/event_broker.go:1161handleCongestionControl

直觉解释:

  • EventCollector 说:“我这边每个 changefeed 还能再攒多少 pending bytes;每个 dispatcher path 还能再攒多少。”
  • EventService 用这个 quota 做 scan gate(见 pkg/eventservice/event_broker.go:552),把压力挡在源头。

6.3 ReleasePath(硬断路器):顶不住了怎么办

ReleasePath 由 dynstream 的内存控制逻辑触发(EventCollector 专用算法 MemoryControlForEventCollector):

  • 触发/阈值在:utils/dynstream/memory_control.go
  • 真正“清队列”的动作在:utils/dynstream/event_queue.go:74releasePath

ReleasePath 会导致事件缺口,所以后面一定要配合 Reset(下一节)。

6.4 Reset(正确性自愈):为什么不会“悄悄错数据”

只要允许“清队列/丢事件”,就必须解决一个问题:如何避免 silent data loss

TiCDC new arch 的答案是:seq 连续性校验 + RESET(epoch++) 重建数据流

关键点对应代码:

  • EventCollector 校验 seq:downstreamadapter/eventcollector/dispatcher_stat.go:238verifyEventSequence
    一旦 seq 不连续(表示丢了事件或被清队列清掉),就触发 reset()
  • DropEvent 是“我明确告诉你丢了”的显式信号:
    • dynstream 内存压力下可把 droppable 事件转成 DropEvent:utils/dynstream/memory_control.go:120
    • EventCollector 收到 DropEvent 直接 reset:downstreamadapter/eventcollector/dispatcher_stat.go:588
  • EventService 发送侧也会 drop(比如 send buffer 满):pkg/eventservice/event_broker.go:791
    发送侧注释明确依赖 “下游发现不连续 → reset → 重发缺失事件”。

Reset 的语义要点:

  • EventCollector 发 RESET:downstreamadapter/eventcollector/dispatcher_stat.go:183
  • EventService 替换为新 epoch 的 dispatcherStat:pkg/eventservice/event_broker.go:1039resetDispatcher
  • 陈旧 epoch 的事件会被过滤(不会污染新 epoch):downstreamadapter/eventcollector/dispatcher_stat.go:356

7. ReleasePath 深入:触发条件/阈值/为什么只挑 blocking path

你点名要的三个问题,这里一次讲清楚。

7.1 触发条件:什么时候会走到 ReleasePath

utils/dynstream/memory_control.go 里,触发主要有两类:

  1. 硬顶满(hard limit)totalPendingSize >= maxPendingSize
    • utils/dynstream/memory_control.go:117
  2. 疑似死锁/卡住(deadlock suspected):在 5s 窗口内仍在 append,但 pendingSize 5s 内没下降,并且处于高水位(>60%)
    • utils/dynstream/memory_control.go:151

你可以把第 2 个条件理解为:系统一直在往队列里塞东西,但队列完全不见变小,很像“下游卡死”,所以要启动硬止血。

7.2 释放策略:释放多少、多久触发一次、释放哪些 path

releaseMemory(...) 里有几个关键阈值:

  • 频率限制:每个 area(changefeed)最多 1 秒触发一次
    utils/dynstream/memory_control.go:167
  • 目标释放量sizeToRelease = totalPendingSize * 0.4(释放 40%)
    utils/dynstream/memory_control.go:183
  • 最小队列path.pendingSize >= 256 bytes 才值得处理(太小的队列 reset 反而是负收益)
    utils/dynstream/memory_control.go:190

7.3 为什么只挑 blocking path(核心)

ReleasePath 只选择:

  • path.blocking == true(该 dispatcher 当前无法继续消费)
    utils/dynstream/memory_control.go:190

原因用两层解释最容易懂:

  1. dynstream 语义层(代码事实)
    blocking path 在 dynstream 里会被“停止调度”:pop 会忽略 blocking path(直到被 wake)
    • utils/dynstream/event_queue.go:123
      这意味着:blocking path 的 pendingQueue 很可能短期不会自己变小,只会越堆越大
  2. 系统策略层(为什么这样挑)
    内存顶满时我们希望“最小代价止血”:优先清掉那些 已经不可能推进 的队列,避免把还在正常 drain 的 path 一起拖下水。
    • 对 blocking path 做 ReleasePath:止血快、命中主要内存来源
    • 对非 blocking path 乱清:吞吐会下降、reset 变多、系统抖动更大

7.4 ReleasePath 真正做了什么(你可以直接记成一句话)

  • 清空某个 path 的 pendingQueue(从而让总 pending bytes 立刻下降)
    utils/dynstream/event_queue.go:74

接下来发生什么?一定会走到上一节说的 seq/DropEvent → Reset,把缺口补回来。


8. 附录

附录 A:控制面(Elector/Coordinator/Maintainer)用 1 页建立概念

如果你还没读控制面,先用这 4 句话够用:

  1. Elector 选出 Coordinator leader(全局 owner)来管理 changefeed 元数据与调度。
  2. Coordinator 通过 Controller/scheduler/operator 把一个 changefeed 拆成很多 spans,并分配到不同节点的 Maintainer/Dispatcher。
  3. Maintainer 汇总下游 dispatcher 的 heartbeat,计算 watermark(resolved/checkpoint/redo),并做 barrier 协议来处理 DDL/SyncPoint 等阻塞事件。
  4. 数据面需要的很多“指令/水位”是靠 MessageCenter topic 在模块间传递的(见附录 C)。

你想继续深入时,入口建议:

  • 选主:server/module_election.go
  • Coordinator/Controller:coordinator/coordinator.go / coordinator/controller.go
  • Maintainer:maintainer/maintainer.go / maintainer/maintainer_controller.go

附录 B:Sink / redo(了解即可)

Sink 抽象与接口:

  • downstreamadapter/sink/sink.go

两个你可能会踩的点(先记住结论即可):

  • MySQL sink crash recovery 会读 ddl_ts 修正 startTs(避免“写到旧 schema”):downstreamadapter/sink/mysql/sink.go
  • Kafka/Storage 这类 sink 可能需要 “checkpoint event”,由 AddCheckpointTs(ts) 发出:downstreamadapter/sink/*

redo 的闭环(极简版):

  • commitTs > redoGlobalTs 时 dispatcher 会先缓存:downstreamadapter/dispatcher/event_dispatcher.go:158
  • redo watermark 更新后再广播唤醒(更细节见原文 ticdc-sink-and-protocol

附录 C:topic / IOType / proto 速查(消息就是系统的“真实 API”)

new arch 模块间通信大量通过 pkg/messagingMessageCenter 完成:

  • topics(路由目标):pkg/messaging/topic.go
  • IOType ↔ payload 映射(反查消息类型很有用):pkg/messaging/message.go

protobuf 索引:

  • heartbeatpb/heartbeat.proto:Maintainer/DispatcherManager/Barrier/CheckpointTs/Redo 等
  • eventpb/event.proto:EventService 的 DispatcherRequest(register/remove/reset)
  • logservice/logservicepb/*:log coordinator/event store state 等

附录 D:常用定位命令(面向源码阅读/排障)

下列命令假设你在 TiCDC 源码仓库根目录执行。

# EventStore 复用:exact/containing/pending/removing/onlyReuse
rg -n "RegisterDispatcher\\(|pendingSubStat|removingSubStat|GetIterator\\(" logservice/eventstore/event_store.go

# EventService quota gate / CongestionControl / sendMsg drop
rg -n "handleCongestionControl\\(|doScan\\(|sendMsg\\(" pkg/eventservice/event_broker.go

# EventCollector:CongestionControl + ReleasePath feedback + seq/reset
rg -n "controlCongestion\\(|processDSFeedback\\(|verifyEventSequence\\(|newDispatcherResetRequest\\(" downstreamadapter/eventcollector

# dynstream:ReleasePath 触发条件/阈值
rg -n "ReleasePath|releaseMemory\\(|memoryUsageRatio\\(|checkDeadlock\\(" utils/dynstream

附录 E:本文来源(你要追更细节就回到原文)

本文合并整理自 TiCDC 源码仓库内的 5 篇 code-oriented 文档:

  • docs/design/2026-02-09-ticdc-code-architecture-guide.md
  • docs/design/2026-02-09-ticdc-control-plane-deep-dive.md
  • docs/design/2026-02-09-ticdc-maintainer-deep-dive.md
  • docs/design/2026-02-09-ticdc-data-plane-deep-dive.md
  • docs/design/2026-02-09-ticdc-sink-and-protocol.md

On this page

0. 这篇文章怎么读1. 先记住 3 句话 + 1 张图2. 新手词典(先把词对上,再看代码就不痛苦)3. 最短读代码路线(6 个文件,足够把闭环读通)4. 数据面流水线:一条数据从 TiKV 走到 Sink 的全过程4.1 LogPuller → EventStore:把 TiKV 的变更先落到本地4.2 “dispatcher 什么时候开始出数”:REGISTER 只是建档,RESET 才是开闸4.3 EventService:扫 EventStore + 拼 DDL/schema,产出可下游消费的事件流4.3.1 被动流控 gate:EventService 什么时候“选择不扫”4.4 EventCollector:按 dispatcher 分发 + 主动流控闭环(重点)4.5 Dispatcher → Sink:真正写下游5. EventStore 复用(为什么能“少订阅、多分发”)5.1 复用策略(先跑起来,再逐步变精确)5.2 为什么需要 pendingSubStat/removingSubStat(安全切换,避免 gap)5.3 onlyReuse 与 remote reuse(了解即可)6. 流控闭环(CongestionControl / ReleasePath / Reset)6.1 闭环总图(先建立直觉)6.2 CongestionControl(软流控):EventCollector 把“还能吃多少”回传给 EventService6.3 ReleasePath(硬断路器):顶不住了怎么办6.4 Reset(正确性自愈):为什么不会“悄悄错数据”7. ReleasePath 深入:触发条件/阈值/为什么只挑 blocking path7.1 触发条件:什么时候会走到 ReleasePath7.2 释放策略:释放多少、多久触发一次、释放哪些 path7.3 为什么只挑 blocking path(核心)7.4 ReleasePath 真正做了什么(你可以直接记成一句话)8. 附录附录 A:控制面(Elector/Coordinator/Maintainer)用 1 页建立概念附录 B:Sink / redo(了解即可)附录 C:topic / IOType / proto 速查(消息就是系统的“真实 API”)附录 D:常用定位命令(面向源码阅读/排障)附录 E:本文来源(你要追更细节就回到原文)