上一篇【第72篇】Kafka Connect深度解析——Connector是如何工作的
下一篇【第74篇】Kafka Streams深度解析——时间、状态、窗口的三角关系
摘要
如果你只会用Kafka收发消息,那你只用了Kafka 30%的能力。Kafka Streams——Kafka原生的流处理框架,让你在不部署任何额外集群(不用Flink、不用Spark)的情况下,直接在应用里做实时数据处理。它最大的卖点就是"轻":一个普通Java库,引入依赖就能跑,不需要专门的资源管理器。
本文从Kafka Streams的核心概念讲起,手写一个完整的实时WordCount程序,深入解析KStream与KTable的本质区别、Topology的DAG构建原理、状态存储的本地化与容错机制。最后,我们把Kafka Streams、Flink、Spark Streaming拉到一起做个全面对比——不是"哪个最好",而是"你的场景最适合哪个"。读完这篇,你就能判断流处理需求该不该上Kafka Streams。
一、Kafka Streams到底是什么
Kafka Streams是Kafka官方提供的客户端库(Client Library),而不是一个独立的集群服务。这一点和Flink、Spark Streaming完全不同——后者需要你部署一套独立的计算集群,Kafka Streams只需要在pom.xml里加个依赖,你的Java应用就具备了流处理能力。
【Kafka Streams 的定位】 ┌─────────────────────────────────────────────────────────┐ │ 你的Java应用 │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ Kafka Streams Library (.jar) │ │ │ │ │ │ │ │ ┌──────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │ Source │→ │ Process │→ │ Sink │ │ │ │ │ │ Processor│ │ Processor │ │ Processor │ │ │ │ │ │ (消费) │ │ (处理逻辑) │ │ (写回) │ │ │ │ │ └──────────┘ └────────────┘ └────────────┘ │ │ │ │ ↑ ↑ │ │ │ │ │ ┌────┴────────────────┴───────┐ │ │ │ │ │ │ State Store (RocksDB) │ │ │ │ │ │ └─────────────────────────────┘ │ │ │ │ └────────────────────────────────────────┼────────┘ │ │ │ │ └────────────────────────────────────────────┼──────────────┘ ▲ ▼ ┌────┴─────┐ ┌───┴────┐ │ Kafka │ │ Kafka │ │ (输入Topic)│ │(输出Topic)│ └──────────┘ └────────┘Kafka Streams的核心设计哲学是**“Kafka是唯一依赖”**:
- 输入来自Kafka Topic
- 输出写回Kafka Topic
- 状态存储在本地磁盘(RocksDB),并自动备份到Kafka的Changelog Topic
- 容错靠Kafka的Consumer Group机制——实例挂了,同Group的其他实例接管对应分区
- 扩展靠增加应用实例——多实例自动分配分区,跟Kafka Consumer的Rebalance一样
这带来一个巨大优势:部署运维零额外成本。你的流处理应用就是一个普通的微服务,打包成Docker镜像,丢到K8s里就能跑。
二、KStream vs KTable——流处理的两根支柱
Kafka Streams里有两个最容易让人困惑的概念:KStream和KTable。一句话说清楚:
KStream是"无尽的事件流",KTable是"当前快照的表格"。
KStream(事件流)
KStream是无界的、持续追加的记录流。每条消息都是独立的,不关心"之前的值是什么"。
【KStream:只看增量】 时间轴 → 08:00 用户A登录 ──► 这是一条独立事件 08:01 用户B下单 ──► 这是另一条独立事件 08:02 用户A登录 ──► 又是一条独立事件(用户A登录了两次?存在!) 08:03 用户C注册 ──► 独立事件 KStream特点: - 每条记录都保留 - 不关心重复 - 适合:点击流、日志、操作记录KTable(变更表)
KTable是有界的、可更新的键值表。同一个Key的最新记录会覆盖旧记录——就像一个数据库表,同一个主键只保留最新数据。
【KTable:只看最新状态】 KTable(user_id → 余额) ━━━━━━━━━━━━━━━━━━━━ 事件:A:100 → KTable状态:{A: 100} 事件:B:200 → KTable状态:{A: 100, B: 200} 事件:A:150 → KTable状态:{A: 150, B: 200} ← A:100被覆盖了! 事件:C:300 → KTable状态:{A: 150, B: 200, C: 300} KTable特点: - 同Key只保留最新值 - 自动去重 - 适合:用户余额、库存数量、配置信息两者的核心区别
| 维度 | KStream | KTable |
|---|---|---|
| 数据模型 | 追加式事件流 | 可更新的键值表 |
| Key重复 | 允许,每条都保留 | 同Key只保留最新 |
| 有界性 | 无界,无限增长 | 逻辑有界(Key数量有限) |
| 读取时机 | 数据到来的那一刻 | 数据到来的那一刻 + 查询时 |
| 典型场景 | 点击流、日志、操作审计 | 用户信息、余额、库存 |
| Join行为 | 触发式Join | 查找式Join |
| 对应概念 | INSERT | UPSERT |
三、Topology——流处理的"电路板"
Kafka Streams的流处理逻辑通过Topology(拓扑)来描述——它本质上是一个有向无环图(DAG),由三种基本节点组成:
【Topology的三种处理器节点】 Source Processor ── 从Kafka Topic读取数据,入口节点 Stream Processor ── 处理数据(map/filter/join等),中间节点 Sink Processor ── 将结果写回Kafka Topic,出口节点 ┌─────────┐ ┌──────────┐ ┌──────────┐ ┌─────────┐ │ Source │───→│ filter │───→│ map │───→│ Sink │ │Processor│ │Processor │ │Processor │ │Processor│ │(topic-A)│ │(过滤无效) │ │(格式转换) │ │(topic-B)│ └─────────┘ └──────────┘ └──────────┘ └─────────┘Kafka Streams提供了两种API来构建Topology:
DSL API(推荐,声明式)
StreamsBuilderbuilder=newStreamsBuilder();KStream<String,String>stream=builder.stream("input-topic");stream.filter((k,v)->v!=null).mapValues(v->v.toUpperCase()).to("output-topic");Processor API(底层,命令式)
Topologytopology=newTopology();topology.addSource("Source","input-topic").addProcessor("Process",MyProcessor::new,"Source").addSink("Sink","output-topic","Process");实际项目中90%用DSL API就够了,只有需要精确控制处理时机时才用Processor API。
四、State Store——流处理的"记忆力"
纯流处理(map/filter)不需要状态——来一条处理一条。但一旦涉及聚合(count/sum)、Join、窗口计算,就需要状态存储来记住"之前处理过的数据"。这就是State Store。
状态存储的架构
【State Store 的本地化 + 容错机制】 App Instance 1 (处理分区0) ┌──────────────────────────────────────┐ │ ┌──────────────────────────────┐ │ │ │ RocksDB (本地状态存储) │ │ │ │ word_count: {"hello": 5, │ │ │ │ "world": 3} │ │ │ └──────────┬───────────────────┘ │ │ │ 自动备份 │ │ ▼ │ │ Kafka Changelog Topic │ │ (word-count-store-changelog) │ └──────────────────────────────────────┘ App Instance 2 (处理分区1,App 1挂了后接管) ┌──────────────────────────────────────┐ │ ┌──────────────────────────────┐ │ │ │ RocksDB 恢复中... │ │ │ │ 从Changelog Topic重放 │ │ │ └──────────────────────────────┘ │ └──────────────────────────────────────┘Kafka Streams默认使用RocksDB(嵌入式键值数据库)作为状态存储引擎。RocksDB的数据存在本地磁盘,同时通过Changelog Topic备份到Kafka——这保证了状态不丢失。
当某个实例挂了,同Group的其他实例接管分区时,会自动从Changelog Topic恢复状态。这个机制跟Kafka本身的副本机制一脉相承。
五、实战:实时WordCount完整代码
理论讲完了,直接上代码。这个WordCount程序从Kafka的"raw-text" Topic读取文本,实时统计每个单词的出现次数,结果写入"word-count-output" Topic。
项目依赖(Maven)
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>3.6.0</version></dependency><!-- 日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.9</version></dependency>完整的WordCount程序
importorg.apache.kafka.common.serialization.Serdes;importorg.apache.kafka.streams.KafkaStreams;importorg.apache.kafka.streams.StreamsBuilder;importorg.apache.kafka.streams.StreamsConfig;importorg.apache.kafka.streams.kstream.*;importjava.util.Arrays;importjava.util.Properties;publicclassWordCountApp{publicstaticvoidmain(String[]args){// 1. 配置Propertiesprops=newProperties();props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");// 输入输出的默认序列化器props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());// 每30秒提交一次,平衡性能与数据丢失风险props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,30*1000);// 2. 构建TopologyStreamsBuilderbuilder=newStreamsBuilder();// 从 raw-text topic 读取数据KStream<String,String>textStream=builder.stream("raw-text");// 核心处理逻辑KTable<String,Long>wordCounts=textStream// 第1步:将每行文本拆成单词数组.flatMapValues(text->Arrays.asList(text.toLowerCase().split("\\W+")))// 第2步:过滤掉空单词.filter((key,word)->!word.isEmpty())// 第3步:将单词作为Key,值设为1,并选择新的Key.selectKey((key,word)->word)// 第4步:按单词分组.groupByKey(Grouped.with(Serdes.String(),Serdes.String()))// 第5步:计数聚合 - 这里产生了KTable.count(Materialized.as("word-count-store"));// 将结果写回 KafkawordCounts.toStream().to("word-count-output",Produced.with(Serdes.String(),Serdes.Long()));// 3. 启动KafkaStreamsstreams=newKafkaStreams(builder.build(),props);// 注册优雅关闭钩子Runtime.getRuntime().addShutdownHook(newThread(streams::close));streams.start();System.out.println("WordCount App started!");}}数据流转过程
【WordCount 数据流转全链路】 Kafka Topic: raw-text Kafka Topic: word-count-output ┌──────────────────┐ ┌──────────────────────────┐ │ "hello world" │ │ hello: 1 │ │ "hello kafka" │ │ world: 1 │ │ "world kafka" │ │ kafka: 1 │ │ "hello world" │ ────处理───→ │ hello: 2 │ └──────────────────┘ │ kafka: 2 │ │ world: 2 │ ┌────────────────────────┤ hello: 3 │ │ State Store │ world: 3 │ │ (RocksDB) └──────────────────────────┘ │ │ word_count_store: │ {"hello": 3, │ "world": 3, │ "kafka": 2} └────────────────────────运行验证
# 1. 创建输入输出Topickafka-topics.sh--create--topicraw-text--partitions3--bootstrap-server localhost:9092 kafka-topics.sh--create--topicword-count-output--partitions3--bootstrap-server localhost:9092# 2. 启动WordCount程序java-jarwordcount-app.jar# 3. 发送测试数据kafka-console-producer.sh--topicraw-text --bootstrap-server localhost:9092>hello world>hello kafka streams>hello world# 4. 查看结果kafka-console-consumer.sh--topicword-count-output --from-beginning\--bootstrap-server localhost:9092\--propertyprint.key=true\--propertyvalue.deserializer=org.apache.kafka.common.serialization.LongDeserializer# 输出:# hello 3# world 2# kafka 1# streams 1六、Kafka Streams vs Flink vs Spark Streaming——全面对比
很多同学纠结:"流处理到底用哪个框架?"三种框架的目标客户和设计哲学完全不同:
核心差异
| 对比维度 | Kafka Streams | Apache Flink | Spark Streaming |
|---|---|---|---|
| 部署模式 | Java库,嵌入应用 | 独立集群(JobManager/TaskManager) | 独立集群(需YARN/K8s) |
| 启动成本 | 极低(加依赖即可) | 中等(需部署集群) | 高(需部署集群) |
| 资源管理 | 应用自行管理 | YARN/K8s/Standalone | YARN/K8s/Standalone |
| 状态后端 | RocksDB(本地磁盘) | RocksDB/Heap/Remote | HDFS/本地 |
| 消息语义 | Exactly-once(事务支持) | Exactly-once(Checkpoint) | Exactly-once(WAL) |
| 窗口支持 | 滚动/滑动/会话 | 滚动/滑动/会话/累计 | 滚动/滑动 |
| 事件时间 | 支持(TimestampExtractor) | 完善支持(Watermark) | 支持(但不如Flink强大) |
| SQL支持 | KSQL(kdb) | Flink SQL | Spark SQL |
| 运维复杂度 | 无额外运维 | 需要维护Flink集群 | 需要维护Spark集群 |
| 吞吐量 | 高 | 极高 | 高 |
| 延迟 | 毫秒级 | 毫秒级 | 秒级(微批) |
| 生态整合 | 仅Kafka | 丰富连接器 | 丰富连接器 |
| 学习曲线 | 低(Java DSL) | 中高 | 中高 |
选型决策树
【流处理框架选型决策】 你的数据源是Kafka吗? ├── 是 │ ├── 数据目的地也是Kafka吗? │ │ ├── 是 → 处理逻辑简单?(过滤/转换/聚合) │ │ │ ├── 是 → ✅ Kafka Streams(最佳选择) │ │ │ └── 否 → 需要复杂CEP/事件模式匹配? │ │ │ ├── 是 → ✅ Flink CEP │ │ │ └── 否 → ✅ Kafka Streams 或 Flink 都可以 │ │ └── 否 → 需要写JDBC/HDFS/ES等外部系统? │ │ ├── 是 → ✅ Flink(连接器更丰富) │ │ └── 否 → ✅ Kafka Streams 也可以 │ └── 否 → 数据来自文件/数据库/消息队列? │ └── ✅ Flink / Spark Streaming └── 否 → ✅ Flink / Spark Streaming一句话总结
- Kafka Streams:Kafka原生的轻量流处理,适合"Kafka进Kafka出"的简单ETL/聚合场景
- Flink:全功能流处理引擎,适合复杂事件处理、多数据源混合、超大状态场景
- Spark Streaming:批流一体,适合已有Spark集群、对延迟不敏感的场景
七、Kafka Streams的几大"坑"
新手最容易踩的坑:
坑一:APPLICATION_ID_CONFIG不能随便改
application.id决定了Consumer Group身份,改了就会触发Rebalance,而且状态存储的Changelog Topic也会用新名字重新创建——老数据全丢。
坑二:Key的重要性被低估
Kafka Streams的很多操作依赖Key来确定数据归属哪个分区。如果你用selectKey改变了Key,数据可能被重新分区(Repartition),这会产生额外的中间Topic,影响性能。
坑三:状态存储会"长胖"
RocksDB把状态写在本地磁盘(默认路径/tmp/kafka-streams),状态大了会吃满磁盘。生产环境必须配置state.dir指向大容量磁盘,并定期清理过期状态。
坑四:KTable的"墓碑消息"
删除KTable中的记录时,Kafka Streams会写入一条value为null的"墓碑消息"。如果你的下游消费者没处理null,会导致NullPointerException。
// 正确处理墓碑消息stream.filter((key,value)->value!=null).to("output-topic");本篇小结
本文带你快速上手了Kafka Streams:
- Kafka Streams是嵌入式Java库,不需要独立集群,引入依赖就能跑,是Kafka原生流处理方案
- KStream是无界事件流(保留所有记录),KTable是可更新键值表(只保留最新状态),理解这对概念是使用Kafka Streams的基础
- Topology是DAG图,由Source Processor → Stream Processor → Sink Processor三种节点组成,DSL API 90%够用
- State Store用RocksDB存本地磁盘,通过Changelog Topic自动备份到Kafka,实现状态容错
- 选型决策很简单:Kafka进Kafka出、简单ETL/聚合→Kafka Streams;多数据源、复杂计算→Flink;已有Spark→Spark Streaming
下一篇我们深入Kafka Streams的时间维度——时间窗口、乱序处理、流表Join,这些才是生产环境真正考验人的地方。
上一篇【第72篇】Kafka Connect深度解析——Connector是如何工作的
下一篇【第74篇】Kafka Streams深度解析——时间、状态、窗口的三角关系