Spring Kafka 是 Spring Framework 生态系统中的一个模块,用于简化在 Spring 应用程序中集成 Apache Kafka 的过程,记录 (record) 指 Kafka 消息中的一条记录。
受影响版本中默认未对记录配置 ErrorHandlingDeserializer
,当用户将容器属性 checkDeserExWhenKeyNull
或 checkDeserExWhenValueNull
设置为 true(默认为 false),并且允许不受信任的源发布到 Kafka 主题中时,攻击者可将恶意 payload 注入到 Kafka 主题中,当反序列化记录头时远程执行任意代码。
2.8.1 <= Spring-Kafka <= 2.9.103.0.0 <= Spring-Kafka <= 3.0.9
这一个漏洞所影响的组件其实是 Spring-Kafka,严格意义上来说并不算是 kafka 的漏洞,应该算是 Spring 的漏洞。
先来看一看 SpringBoot 和 Kafka 是怎么完成通讯/消费的
工作流程如下
1、生产者将消息发送到 Kafka 集群中的某个 Broker(也可以是多个)2、Kafka 集群将消息存储在一个或多个分区中,并为每个分区维护一个偏移量3、消费者订阅一个或多个主题,并从 Kafka 集群中读取消息。4、消费者按顺序读取每个分区中的消息,并跟踪每个分区的偏移量。
ErrorHandlingDeserializer:是 Kafka中的一种反序列化器(Deserializer),它可以在反序列化过程中处理异常和错误。
checkDeserExWhenKeyNull && checkDeserExWhenValueNull:是 Kafka 中的一种序列化器(Serializer),它可以在序列化过程中检查键(key/value)是否为 null,并在发现值为 null 时抛出异常。
再简单整理一下漏洞条件
在受到影响的版本中,默认未对记录配置
ErrorHandlingDeserializer
容器属性checkDeserExWhenKeyNull
或checkDeserExWhenValueNull
设置为 true
其中需要我们起一个 Kafka 的服务,用来接收消息,本机上起比较麻烦,可以在 vps 上用 docker 迅速搭建,且需注意,Kafka 要能够接受外连,docker-compose.yml
如下
version'2'
services
zookeeper
image zookeeper
restart always
ports
"2181:2181"
container_name zookeeper
kafka
image wurstmeister/kafka
restart always
ports
"9092:9092"
"9094:9094"
depends_on
zookeeper
environment
KAFKA_ADVERTISED_HOST_NAME124.222.21.138
KAFKA_ZOOKEEPER_CONNECT zookeeper2181
KAFKA_LISTENERS PLAINTEXT //0.0.0.0 9092,SSL //0.0.0.09094
KAFKA_ADVERTISED_LISTENERS PLAINTEXT //124.222.21.138 9092,SSL //124.222.21.1389094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP PLAINTEXT PLAINTEXT,SSL SSL
KAFKA_INTER_BROKER_LISTENER_NAME PLAINTEXT
container_name kafka
Spring Kafka 的生产者和消费者可以通过使用 Spring Kafka 提供的 KafkaTemplate
和 `@KafkaListener
注解来编写。
生产者可以使用 KafkaTemplate
来发送消息到 Kafka 集群:
package com.drunkbaby.springkafkatest.controller;
import com.drunkbaby.springkafkatest.common.KafkaInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutionException;
"/producer") (
public class ProducerController {
private KafkaTemplate<String,String> kafkaTemplate;
"/fireAndForget") (
public String fireAndForget() {
kafkaTemplate.send(KafkaInfo.TOPIC_WELCOME, "fireAndForget:" + LocalDateTime.now());
return "success";
}
}
消费者可以使用 @KafkaListener
注解来监听 Kafka 集群中的消息:
package com.drunkbaby.springkafkatest.consumer;
import com.drunkbaby.springkafkatest.common.KafkaInfo;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
public class Consumer {
topics = KafkaInfo.TOPIC_WELCOME) (
public String consumer2( String message, MessageHeaders headers) {
System.out.println("消费者(注解方式):收到消息==> ");
System.out.println(" message:" + message);
System.out.println(" headers:");
headers.keySet().forEach(key -> System.out.println(" " + key + ":" + headers.get(key)));
return "success";
}
连接成功
访问 http://localhost:8083/producer/sync
发送一条记录
实际影响到的是 Consumer,且 Consumer 要设置 checkDeserExWhenKeyNull
或 checkDeserExWhenValueNull
为 true
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setCheckDeserExWhenValueNull(true);
factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);
payload 参考 https://github.com/Contrast-Security-OSS/Spring-Kafka-POC-CVE-2023-34040
主要是来看反序列化的部分
断点会先走到 org.springframework.kafka.listener.ListenerUtils#getExceptionFromHeader
方法,它这里面会获取到 PoC 中的 KEY_DESERIALIZER_EXCEPTION_HEADER
,并将其作为 headers
往下跟进 byteArrayToDeserializationException()
方法,这里就直接到反序列化的部分了,而在反序列化之前做了一次 resolveClass()
的校验。
而这里的 resolveClass()
校验是一次性的,这就代表我们可以构造其他的 Payload,如 CC 链等,证实是可以打通的
之后便会进入到对应类的 readObject()
方法
https://github.com/spring-projects/spring-kafka/commit/25ac793a78725e2ca4a3a2888a1506a4bfcf0c9d
相当于把这里的 header 头加黑了