注册

Spring Boot整合Kafka+SSE实现实时数据展示

2024年3月10日


知识积累


为什么使用Kafka?


不使用Rabbitmq或者Rocketmq是因为Kafka是Hadoop集群下的组成部分,对于大数据的相关开发适应性好,且当前业务场景下不需要使用死信队列,不过要注意Kafka对于更新时间慢的数据拉取也较慢,因此对与实时性要求高可以选择其他MQ。
使用消息队列是因为该中间件具有实时性,且可以作为广播进行消息分发。


为什么使用SSE?


使用Websocket传输信息的时候,会转成二进制数据,产生一定的时间损耗,SSE直接传输文本,不存在这个问题
由于Websocket是双向的,读取日志的时候,如果有人连接ws日志,会发送大量异常信息,会给使用段和日志段造成问题;SSE是单向的,不需要考虑这个问题,提高了安全性
另外就是SSE支持断线重连;Websocket协议本身并没有提供心跳机制,所以长时间没有数据发送时,会将这个连接断掉,因此需要手写心跳机制进行实现。
此外,由于是长连接的一个实现方式,所以SSE也可以替代Websocket实现扫码登陆(比如通过SSE的超时组件在实现二维码的超时功能,具体实现我可以整理一下)
另外,如果是普通项目,不需要过高的实时性,则不需要使用Websocket,使用SSE即可


代码实现


Java代码


pom.xml引入SSE和Kafka


<!-- SSE,一般springboot开发web应用的都有 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka,最主要的是第一个,剩下两个是测试用的 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency
<groupId>
org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>

application.properties增加Kafka配置信息


# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

配置Kafka信息


@Configuration
public class KafkaProducerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

}

配置controller,通过web方式开启效果


@RestController
@RequestMapping(path = "sse")
public class KafkaSSEController {

private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();

@Resource
private KafkaTemplate<String, String> kafkaTemplate;

@Resource
private SseEmitter sseEmitter;

/**
* @param message
* @apiNote 发送信息到Kafka主题中
*/

@PostMapping("/send")
public void sendMessage(@RequestBody String message) {
kafkaTemplate.send("my-topic", message);
}

/**
* 监听Kafka数据
*
* @param message
*/

@KafkaListener(topics = "my-topic", groupId = "my-group-id")
public void consume(String message) {
System.out.println("Received message: " + message);
//使用接口建立起sse连接后,监听到kafka消息则会发送给对应链接
SseEmitter sseEmitter = sseCache.get(id); if (sseEmitter != null) { sseEmitter.send(content); }
}

/**
* 连接sse服务
*
* @param id
* @return
* @throws IOException
*/

@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter push(@RequestParam("id") String id) throws IOException {
// 超时时间设置为5分钟,用于演示客户端自动重连
SseEmitter sseEmitter = new SseEmitter(5_60_000L);
// 设置前端的重试时间为1s
// send(): 发送数据,如果传入的是一个非SseEventBuilder对象,那么传递参数会被封装到 data 中
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
sseCache.put(id, sseEmitter);
System.out.println("add " + id);
sseEmitter.send("你好", MediaType.APPLICATION_JSON);
SseEmitter.SseEventBuilder data = SseEmitter.event().name("finish").id("6666").data("哈哈");
sseEmitter.send(data);
// onTimeout(): 超时回调触发
sseEmitter.onTimeout(() -> {
System.out.println(id + "超时");
sseCache.remove(id);
});
// onCompletion(): 结束之后的回调触发
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}
/**
* http://127.0.0.1:8080/sse/push?id=7777&content=%E4%BD%A0%E5%93%88aaaaaa
* @param id
* @param content
* @return
* @throws IOException
*/

@ResponseBody
@GetMapping(path = "push")
public String push(String id, String content) throws IOException {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.send(content);
}
return "over";
}

@ResponseBody
@GetMapping(path = "over")
public String over(String id) {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
// complete(): 表示执行完毕,会断开连接
sseEmitter.complete();
sseCache.remove(id);
}
return "over";
}

}

前端方式


<html>
<head>
<script>
console.log('start')
const clientId = "your_client_id_x"; // 设置客户端ID
const eventSource = new EventSource(`http://localhost:9999/v1/sse/subscribe/${clientId}`); // 订阅服务器端的SSE

eventSource.onmessage = event => {
console.log(event.data)
const message = JSON.parse(event.data);
console.log(`Received message from server: ${message}`);
};

// 发送消息给服务器端 可通过 postman 调用,所以下面 sendMessage() 调用被注释掉了
function sendMessage() {
const message = "hello sse";
fetch(`http://localhost:9999/v1/sse/publish/${clientId}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(message)
});
console.log('dddd'+JSON.stringify(message))
}
// sendMessage()
</script>
</head>
</html>

作者:艾迪的技术之路
来源:juejin.cn/post/7356770034180898857

0 个评论

要回复文章请先登录注册