上一篇文章只是简单帮大家梳理一下什么是SSE。这篇文章,则会放上真实Spring框架对SSE的封装了。框架封装了send方法,我们可以通过业务主动去给客户端推送事件。
我本来考虑实现服务器宕机重启后,SSE请求对象保持原有不变,实现前端SSE重连。但是经过实际操作,以及思考后,我发现此方案不能解决此问题。因为响应对象存储在服务端的JUC包下的Map中。我们无法通过Redis存储信息,然后重新获取原来的响应对象。SSE在服务端的响应对象与Session机制类似。也就是无法跨服务使用!所以,我们压根就不用考虑这个问题。这对于SSE来说是个伪需求!
先放上前端代码
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE,服务器主动触发消息</title>
</head>
<body>
<h1>开发者模式,查看控制台、以及网络可以查看相关输出</h1>
消息类型是message消息:
<div id="ssediv">默认消息</div>
<br>
消息类型是diyEventType消息:
<div id="diyssediv">DIY SSE消息</div>
<br>
<br>
<br>
<br>
<div id="diybutton">
<button type="button" onclick="connectSSE()">主动连接SSE服务器!</button>
<button type="button" onclick="closeSSE()">关闭SSE连接!</button>
<button type="button" onclick="diyclick()">点我模拟服务器发送消息!</button>
</div>
</body>
<script>
var sse = new EventSource("http://localhost:8089/sse-plus");
/**
* 默认是没有指定eventTtpe的消息,但eventType就为message。
* 等价于addEventListener("message" ...
*/
sse.onmessage = function (ev) {
console.info("这里只能处理eventType为message的消息")
var elementById = document.getElementById("ssediv");
elementById.innerHTML = ev.data;
}
/**
* 添加指定类型消息处理,eventType是后台自定义的
*/
sse.addEventListener("diyEventType", event => {
console.info("自定义事件" + event.data)
var elementById2 = document.getElementById("diyssediv");
elementById2.innerHTML = event.data;
})
/**
* SSE连接异常
*/
sse.onerror = function () {
alert("服务器已停止!")
}
/**
* SSE连接成功
*/
sse.onopen = function () {
alert("服务器已连接!")
}
// 不要忘记关闭断开连接哦
// sse.close()
</script>
<script>
function diyclick() {
var xmlHttpRequest = new XMLHttpRequest();
xmlHttpRequest.open("get", "http://localhost:8089/sendMessage")
xmlHttpRequest.send()
}
// 关闭SSE
function closeSSE() {
sse.close()
console.info("服务器已关闭!")
}
// 连接SSE
function connectSSE() {
sse.close()
sse = new EventSource("http://localhost:8089/sse-plus");
sse.addEventListener("diyEventType", event => {
console.info("自定义事件" + event.data)
var elementById2 = document.getElementById("diyssediv");
elementById2.innerHTML = event.data;
})
/**
* 默认是没有指定eventTtpe的消息,但eventType就为message。
* 等价于addEventListener("message" ...
*/
sse.onmessage = function (ev) {
console.info("这里只能处理eventType为message的消息")
var elementById = document.getElementById("ssediv");
elementById.innerHTML = ev.data;
}
/**
* SSE连接异常
*/
sse.onerror = function () {
console.info("服务器已停止!")
}
/**
* SSE连接成功
*/
sse.onopen = function () {
console.info("服务器已连接!")
}
}
</script>
</html>
粘贴Java代码
放入导包内容,谨防部分朋友找不到包内容
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.function.Consumer;
private static Integer sendTimes = 0;
private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
@GetMapping(value = "/sse-plus")
@ResponseBody
public SseEmitter SseEmitter(HttpServletResponse response) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常,需在全局异常捕获:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L); // 单位ms,如果你设定了,会自动断开。如果前端有自动重试机制,间歇断开可减少连接被长久占用。
response.setContentType("text/event-stream"); // 指定ContentType,不可变
response.setCharacterEncoding("utf-8"); // 指定响应字符集,不可变,经测试非UTF-8则会中文乱码,但建议指定utf-8
String clientId = UUID.randomUUID().toString();
// 注册回调
// >> 回调1:长链接完成后回调接口(即关闭连接时调用)
sseEmitter.onCompletion(() -> {
sseCache.remove(clientId);
log.info("SSE onCompletion: {}连接关闭时触发", clientId);
});
// >> 回调2:出现异常会调用此方法
sseEmitter.onError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
sseCache.remove(clientId);
log.info("SSE onError:{}出现异常", clientId);
}
});
// >> 回调3:出现连接超时,会调用此方法
sseEmitter.onTimeout(() -> {
sseCache.remove(clientId);
log.error("SSE onTimeout:{}超时了", clientId);
});
sseCache.put(clientId, sseEmitter);
log.info("创建新的sse连接,当前用户:{}", clientId);
try {
sseEmitter.send(SseEmitter.event().id(clientId).name("diyEventType").data("连接成功" + clientId));
} catch (IOException e) {
log.error("SSE: 给客户端发送消息异常,客户端ID:{}", clientId, e);
throw new RuntimeException("给客户端发送消息异常!", e);
}
return sseEmitter;
}
/**
* 长链接完成后回调接口(即关闭连接时调用)
*
* @param clientId 客户端ID
**/
private Runnable completionCallBack(String clientId) {
return () -> {
log.info("结束连接:{}", clientId);
};
}
// 创建一个线程池,用于处理大批量用户掉线的问题
private static ExecutorService executorService = new ThreadPoolExecutor(3,
5,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(50),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()); // 采用阻塞队列。不丢任何消息
/**
* 缺陷:此处发送消息时候,如果用户连接失效,服务器实际无法感知,只能通过再次调用send出现异常时候来判断用户已经断联
* 如果采用重试机制,很容易造成阻塞。如果用户体量很大,建议采用MQ的方式,将消息甩到MQ。剩下由其他服务或线程来处理。
*
* @return
*/
@GetMapping(value = "/sendMessage")
@ResponseBody
public String sendMessage() {
TimeInterval timer = DateUtil.timer();
sendTimes++;
// 这里从缓存中拿到sse对象,调用send方法实现主动推送
sseCache.forEach((clientId, sseEmitter) -> {
try {
// 建议每次发送都以新Obj:此对象是builder,调用一次,相当于append一次!
SseEmitter.SseEventBuilder data = SseEmitter.event()
.reconnectTime(5000)
.name("diyEventType");
// 直接调用对象的send方法就可以,自定义主动推送消息了
data.id(clientId)
.data("宝贝Aa1😊 " + clientId + " : " + sendTimes, MediaType.APPLICATION_JSON)
.data("</br>当前连接用户数: " + sseCache.size())
.data("</br>发送本次耗时:" + timer.interval());
sseEmitter.send(data);
} catch (IOException e) {
executorService.execute(() -> {
// 推送消息失败后,每隔3s推送一次,推送5次。如果不使用线程池,就会导致发消息时,重试机制导致其他用户消息无法处理!
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(3000);
if (sseEmitter == null) {
log.error("消息推送出现异常:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
continue;
}
// 这里data非全局消息,想办法抽离出去即可 sseEmitter.send(data);
} catch (Exception ex) {
log.error("消息推送出现异常:{}的第{}次消息重推失败", clientId, i + 1, ex);
continue;
}
if (i == 4) {
sseCache.remove(clientId);
log.error("由于用户{},消息推送老是失败,则不再尝试推送消息!", clientId);
}
//log.info("消息推送出现异常:{}的第{}次消息重推成功,{}", clientId, i + 1, data);
return;
}
});
}
});
return "消息推送成功!";
}
特殊说明: 上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤