前言

在 Apache Flink 的生产实践中,资源配置性能调优是两个避不开的话题。很多开发者在提交作业时,往往对以下问题感到困惑:

  • “我的作业到底申请了多少个 Slot?”
  • “为什么这个算子和那个算子没有合并在一起?”
  • “显式设置 SlotSharingGroup 到底有什么用?”

理解 Flink 的 Operator Chaining(算子链)Slot Sharing(槽位共享) 机制,不仅能帮我们通过“面试造火箭”,更能帮我们在生产环境中节省真金白银的服务器资源。

本文通过 5 个阶段、10 道循序渐进的实战题目,带你从 JobGraph 的视角彻底拆解 Flink 的资源调度逻辑。


核心概念速查

在做题前,我们统一一下核心术语:

  1. Slot (Task Slot):TaskManager 上的资源切片(内存隔离)。
  • 计算公式:总 Slot = Σ (每个 SlotSharingGroup 的最大并行度)。
  1. Operator Chain (算子链):Flink 将多个符合条件的算子合并在同一个 Task(线程)中执行,以减少序列化和线程切换开销。
  • 合并条件:上下游并行度相同 + 同一 SlotSharingGroup + 无 Shuffle (Forward) + 无手动打断。
  1. SubTask:运行时真正的执行线程。
  • 计算公式:SubTask 总数 = Σ (所有 Task 的并行度)。

第一阶段:基础热身 (默认机制与并行度)

题目 1:全链路默认配置

场景:Source -> Map -> Filter -> Sink,全局并行度一致 (3),无 Shuffle。

1
2
3
4
5
6
env.setParallelism(3);
env.addSource(new MySource()) // p=3
.map(v -> v.toUpperCase())
.filter(v -> v.length() > 5)
.print();

分析结果:

指标 数量 解析
SlotSharingGroup 1 默认为 “default”。
Task (算子链) 1 Source + Map + Filter + Print。满足所有合并条件,整条链路合并为一个 Task。
SubTask (线程) 3 1 个 Task × 并行度 3 = 3 个线程。
Slot (资源) 3 单一 Group,最大并行度为 3。

题目 2:并行度突变 (Rescale/Rebalance)

场景:Source (p=2) -> Map (p=4) -> Sink (p=4)。

1
2
3
4
5
env.setParallelism(4);
env.addSource(new MySource()).setParallelism(2) // 显式设为 2
.map(v -> v + " processed") // 继承全局 4
.print(); // 继承全局 4

分析结果:

指标 数量 解析
SlotSharingGroup 1 默认为 “default”。
Task (算子链) 2 1. Source
2. Map + Print
原因:并行度变化导致 Chain 断开。
SubTask (线程) 6 Source(2) + MapPrint(4) = 6 个线程。
Slot (资源) 4 虽然 Chain 断了,但属于同一 Group,Source 可以和 Map 共享 Slot。需求为 Max(2, 4) = 4。

第二阶段:数据重分区 (Shuffle 对 Chain 的影响)

题目 3:KeyBy 带来的变化

场景:Source -> Map -> KeyBy -> Sum -> Print (全局 p=2)。

1
2
3
4
5
6
7
env.setParallelism(2);
env.addSource(new MyStringSource())
.map(v -> Tuple2.of(v, 1))
.keyBy(v -> v.f0) // Hash Shuffle
.sum(1)
.print();

分析结果:

指标 数量 解析
SlotSharingGroup 1 默认为 “default”。
Task (算子链) 2 1. Source + Map
2. Sum + Print
原因:KeyBy 引入了 Hash Shuffle,强制断开链条。
SubTask (线程) 4 Task1(2) + Task2(2) = 4 个线程。
Slot (资源) 2 同一 Group,最大并行度为 2。

题目 4:强制 Rebalance**场景

**:Source -> Map1 -> Rebalance -> Map2 -> Sink (全局 p=4)。

1
2
3
4
5
6
7
env.setParallelism(4);
env.addSource(new MySource())
.map(v -> v + "_1")
.rebalance() // 显式轮询 Shuffle
.map(v -> v + "_2")
.print();

分析结果:

指标 数量 解析
SlotSharingGroup 1 默认为 “default”。
Task (算子链) 2 1. Source + Map1
2. Map2 + Print
原因:Rebalance 强制进行网络全连接分发,断开链条。
SubTask (线程) 8 Task1(4) + Task2(4) = 8 个线程。
Slot (资源) 4 同一 Group,最大并行度为 4。

第三阶段:Slot Sharing Group (资源隔离核心)

题目 5:简单的资源隔离

场景:Source (Default) -> Map (Group “red”) -> Print (Group “red”)。全局 p=2。

1
2
3
4
5
env.setParallelism(2);
env.addSource(new MySource()) // Default Group
.map(v -> v.toUpperCase()).slotSharingGroup("red") // 切换 Group
.print();

分析结果:

指标 数量 解析
SlotSharingGroup 2 “default”, “red”。
Task (算子链) 2 1. Source
2. Map + Print
原因:Group 不同,必须断开。
SubTask (线程) 4 Task1(2) + Task2(2) = 4 个线程。
Slot (资源) 4 Default组(2) + Red组(2) = 4 个 Slot。
跨组资源必须相加,无法复用。

题目 6:复杂的跨组交互 (反复横跳)

场景:Source(Default) -> MapA(Green) -> MapB(Default) -> Sink(Green)。全局 p=2。

1
2
3
4
5
6
// 这是一个性能很差的反例
env.addSource(...).slotSharingGroup("default")
.map(...).slotSharingGroup("green")
.map(...).slotSharingGroup("default")
.print().slotSharingGroup("green");

分析结果:

指标 数量 解析
SlotSharingGroup 2 “default”, “green”。
Task (算子链) 4 链条被切得粉碎:Source, MapA, MapB, Sink。
SubTask (线程) 8 4 个 Task × 并行度 2 = 8 个线程。
Slot (资源) 4 陷阱题!
Default组需要 Max(Source, MapB) = 2。
Green组需要 Max(MapA, Sink) = 2。
总计 2+2=4。

第四阶段:手动控制 Chain (高级 API)

题目 7:开启新链 (StartNewChain)

场景:Source -> Map1 -> StartNewChain -> Map2 -> Print。全局 p=2。

1
2
3
4
5
env.addSource(...)
.map(...) // Map1
.map(...).startNewChain() // Map2: 强制断开与前者的连接
.print();

分析结果:

指标 数量 解析
SlotSharingGroup 1 默认为 “default”。
Task (算子链) 2 1. Source + Map1
2. Map2 + Print (Map2 虽断前,但仍可连后)。
SubTask (线程) 4 Task1(2) + Task2(2) = 4 个线程。
Slot (资源) 2 同一 Group,最大并行度 2。

题目 8:禁用 Chaining (DisableChaining)

场景:Source -> Map1 -> DisableChaining -> Map2 -> Print。全局 p=2。

1
2
3
4
5
env.addSource(...)
.map(...) // Map1
.map(...).disableChaining() // Map2: 前后都断开,完全孤立
.print();

分析结果:

指标 数量 解析
SlotSharingGroup 1 默认为 “default”。
Task (算子链) 3 1. Source + Map1
2. Map2 (孤立)
3. Print (无法连接 Map2,也没法连别人,此处假设无Map3)
SubTask (线程) 6 3 个 Task × 并行度 2 = 6 个线程。
Slot (资源) 2 同一 Group,最大并行度 2。

第五阶段:综合大题 (Boss 关卡)

题目 9:双流 Connect 与复杂配置

场景:StreamA(p=2, GroupA) 连接 StreamB(p=4, GroupB) -> CoMap(GroupA) -> Print(GroupB)。

1
2
3
4
5
// CoMap(p=4) 属于 GroupA
// Print(p=4) 属于 GroupB
streamA.connect(streamB).map(new CoMap()).slotSharingGroup("Group_A")
.print().slotSharingGroup("Group_B");

分析结果:

指标 数量 解析
SlotSharingGroup 2 “Group_A”, “Group_B”。
Task (算子链) 4 1. SourceA
2. SourceB
3. CoMap
4. Print
全断开:并行度不同或Group不同。
SubTask (线程) 14 SourceA(2) + SourceB(4) + CoMap(4) + Print(4) = 14。
Slot (资源) 8 GroupA(Max: SourceA[2], CoMap[4]) = 4。
GroupB(Max: SourceB[4], Print[4]) = 4。
总计 4+4=8。

题目 10:终极拓扑分析 (SideOutput + Window)

场景

  • 主流程:Source(p=1, Default) -> Map(p=4, Default) -> Window(p=4, Default) -> Print(p=4, IO Group)。
  • 侧输出:从 Window 拿出 SideOutput -> Map(p=2, Default) -> Print(p=2, Default)。

分析结果:

指标 数量 解析
SlotSharingGroup 2 “default”, “IO”。
Task (算子链) 5 1. Source (p=1)
2. Map (p=4, Rebalance后)
3. Window (p=4, KeyBy后)
4. MainPrint (p=4, 切Group)
5. SideMap+SidePrint (p=2, 侧流)
SubTask (线程) 15 1 + 4 + 4 + 4 + 2 = 15 个线程。
Slot (资源) 8 Default Group: Max(1, 4, 4, 2) = 4 个。
IO Group: Max(4) = 4 个。
总计 4+4=8 个 Slot。

总结

通过这 10 道题,我们可以得出 Flink 资源计算的“黄金法则”:

  1. 关于 Chain:只要“并行度不同”、“SlotSharingGroup 不同”或“发生 Shuffle (KeyBy/Rebalance)”,Chain 一定会断开。
  2. 关于 Slot:先看 Group,再看 Group 内的最大并行度。不同 Group 的 Slot 必须相加,不能混用。
  3. 关于优化:默认情况下 Flink 会尽最大努力合并 Chain 以提升性能。除非有明确的资源隔离需求(如重 I/O 与 计算分离),否则尽量不要把 Job 拆得支离破碎。

希望这篇博文能帮你彻底搞懂 Flink 的资源调度!