SocketIO实现消息推送
时间:2020-07-21
本文章向大家介绍SocketIO实现消息推送,主要包括SocketIO实现消息推送使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
Springboot + SocketIO实现点赞、关注、评论等即时消息推送(Java后台代码)
pom.xml
<!-- 集中定义依赖版本号 -->
<properties>
<!-- Tools Settings -->
<hutool.version>5.3.8</hutool.version>
<!-- Netty Setting -->
<netty-all.version>4.1.50.Final</netty-all.version>
<netty-socketio.version>1.7.17</netty-socketio.version>
</properties>
<dependencies>
<!-- Tools Begin -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<!-- Tools End -->
<!-- SocketIO Begin -->
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>${netty-socketio.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty-all.version}</version>
</dependency>
<!-- SocketIO End -->
</dependencies>
application.yml
socketIO:
# SocketIO端口
port: 9090
# 连接数大小
workCount: 100
# 允许客户请求
allowCustomRequests: true
# 协议升级超时时间(毫秒),默认10秒,HTTP握手升级为ws协议超时时间
upgradeTimeout: 10000
# Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
pingTimeout: 60000
# Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
pingInterval: 25000
# 设置HTTP交互最大内容长度
maxHttpContentLength: 1048576
# 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
maxFramePayloadLength: 1048576
SocketConfig
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Socket配置
*
* @author bai
* @since 2020/7/20
*/
@Configuration
public class SocketConfig {
@Value("${socketIO.port}")
private Integer port;
@Value("${socketIO.workCount}")
private int workCount;
@Value("${socketIO.allowCustomRequests}")
private boolean allowCustomRequests;
@Value("${socketIO.upgradeTimeout}")
private int upgradeTimeout;
@Value("${socketIO.pingTimeout}")
private int pingTimeout;
@Value("${socketIO.pingInterval}")
private int pingInterval;
@Value("${socketIO.maxFramePayloadLength}")
private int maxFramePayloadLength;
@Value("${socketIO.maxHttpContentLength}")
private int maxHttpContentLength;
/**
* SocketIOServer 配置
*
* @return {@link SocketIOServer}
* @author bai
* @since 2020/7/20
*/
@Bean("socketIOServer")
public SocketIOServer socketIoServer() {
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
// 配置端口
config.setPort(port);
// 开启Socket端口复用
com.corundumstudio.socketio.SocketConfig socketConfig = new com.corundumstudio.socketio.SocketConfig();
socketConfig.setReuseAddress(true);
config.setSocketConfig(socketConfig);
// 连接数大小
config.setWorkerThreads(workCount);
// 允许客户请求
config.setAllowCustomRequests(allowCustomRequests);
// 协议升级超时时间(毫秒),默认10秒,HTTP握手升级为ws协议超时时间
config.setUpgradeTimeout(upgradeTimeout);
// Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
config.setPingTimeout(pingTimeout);
// Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
config.setPingInterval(pingInterval);
// 设置HTTP交互最大内容长度
config.setMaxHttpContentLength(maxHttpContentLength);
// 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
config.setMaxFramePayloadLength(maxFramePayloadLength);
return new SocketIOServer(config);
}
/**
* 开启 SocketIOServer 注解支持
*
* @param socketServer socketServer
* @return {@link SpringAnnotationScanner}
* @author bai
* @since 2020/7/20
*/
@Bean
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
return new SpringAnnotationScanner(socketServer);
}
}
SocketServer
import ch.qos.logback.core.net.server.ServerRunner;
import com.corundumstudio.socketio.SocketIOServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* SpringBoot启动之后执行
*
* @author bai
* @since 2020/7/20
*/
@Component
@Order(1)
public class SocketServer implements CommandLineRunner {
/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(ServerRunner.class);
/**
* socketIOServer
*/
private final SocketIOServer socketIOServer;
@Autowired
public SocketServer(SocketIOServer server) {
this.socketIOServer = server;
}
@Override
public void run(String... args) {
logger.info("---------- NettySocket通知服务开始启动 ----------");
socketIOServer.start();
logger.info("---------- NettySocket通知服务启动成功 ----------");
}
}
MessageDTO
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 消息数据传输对象
*
* @author bai
* @since 2020/7/20
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageDTO implements Serializable {
/**
* 登录用户 Id
*/
@ApiModelProperty(value = "登录用户Id", name = "userId")
private String userId;
/**
* 接收消息用户 Id
*/
@ApiModelProperty(value = "接收消息用户Id", name = "toUserId")
private String toUserId;
/**
* 被操作对象 Id
*/
@ApiModelProperty(value = "被操作对象Id", name = "beOperatedId")
private String beOperatedId;
/**
* 消息类型
*/
@ApiModelProperty(value = "消息类型", name = "msgType")
private String msgType;
}
MsgTypeEnum
/**
* 消息类型
*
* @author bai
* @since 2020/7/20
*/
public enum MsgTypeEnum {
/**
* 关注
*/
FOLLOW("follow"),
/**
* 认同
*/
LIKE("like"),
/**
* 评论
*/
COMMENT("comment");
private String value;
MsgTypeEnum(String type) {
value = type;
}
public String getValue() {
return value;
}
}
MsgStatusEnum
/**
* 消息状态
*
* @author bai
* @since 2020/7/20
*/
public enum MsgStatusEnum {
/**
* 上线
*/
ONLINE("上线"),
/**
* 下线
*/
OFFLINE("下线");
private String value;
MsgStatusEnum(String type) {
value = type;
}
public String getValue() {
return value;
}
}
SocketHandler
import cn.hutool.core.util.StrUtil;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.tbsc.pojo.dto.MessageDTO;
import com.tbsc.service.UserService;
import com.tbsc.utils.enums.MsgStatusEnum;
import com.tbsc.utils.enums.MsgTypeEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* Socket 处理器
*
* @author bai
* @since 2020/7/20
*/
@Component
public class SocketHandler {
/**
* ConcurrentHashMap 保存当前 SocketServer 用户 Id 对应关系
*/
private Map<String, UUID> clientMap = new ConcurrentHashMap<>(16);
/**
* socketIOServer
*/
private final SocketIOServer socketIOServer;
@Autowired
public SocketHandler(SocketIOServer server) {
this.socketIOServer = server;
}
@Resource
private UserService userService;
/**
* 当客户端发起连接时调用
*
* @param socketClient socketClient
* @author bai
* @since 2020/7/20
*/
@OnConnect
public void onConnect(SocketIOClient socketClient) {
String userId = socketClient.getHandshakeData().getSingleUrlParam("userId");
if (StrUtil.isNotBlank(userId)) {
if (userService.queryUserById(userId) != null) {
System.out.println("用户{" + userId + "}开启长连接通知, NettySocketSessionId: {"
+ socketClient.getSessionId().toString() + "},NettySocketRemoteAddress: {"
+ socketClient.getRemoteAddress().toString() + "}");
// 保存
clientMap.put(userId, socketClient.getSessionId());
// 发送上线通知
this.sendNotice(new MessageDTO(userId, null, null, MsgStatusEnum.ONLINE.getValue()));
}
}
}
/**
* 客户端断开连接时调用,刷新客户端信息
*
* @param socketClient socketClient
* @author bai
* @since 2020/7/20
*/
@OnDisconnect
public void onDisConnect(SocketIOClient socketClient) {
String userId = socketClient.getHandshakeData().getSingleUrlParam("userId");
if (StrUtil.isNotBlank(userId)) {
System.out.println("用户{" + userId + "}断开长连接通知, NettySocketSessionId: {"
+ socketClient.getSessionId().toString() + "},NettySocketRemoteAddress: {"
+ socketClient.getRemoteAddress().toString() + "}");
// 移除
clientMap.remove(userId);
// 发送下线通知
this.sendNotice(new MessageDTO(userId, null, null, MsgStatusEnum.OFFLINE.getValue()));
}
}
/**
* 发送上下线通知
*
* @param messageDTO messageDTO
* @author bai
* @since 2020/7/20
*/
private void sendNotice(MessageDTO messageDTO) {
if (messageDTO != null) {
// 全部发送
clientMap.forEach((key, value) -> {
if (value != null) {
socketIOServer.getClient(value).sendEvent("receiveMsg", messageDTO);
}
});
}
}
/**
* sendMsg: 接收前端消息,方法名需与前端一致
* receiveMsg:前端接收后端发送数据的方法,方法名需与前端一致
*
* @param socketClient socketClient
* @param messageDTO messageDTO
* @author bai
* @since 2020/7/20
*/
@OnEvent("sendMsg")
public void sendMsg(SocketIOClient socketClient, MessageDTO messageDTO) {
// 获取前端传入的接收消息用户 Id
String toUserId = messageDTO.getToUserId();
// 客户端 SessionId
UUID sessionId = clientMap.get(toUserId);
// 获取前端传入的消息类型
String msgType = messageDTO.getMsgType();
// 获取前端传入的当前登录用户 Id
String userId = messageDTO.getUserId();
// 获取前端传入的被操作对象 Id
String beOperatedId = messageDTO.getBeOperatedId();
// 如果 SessionId 相同,表示用户在线,发送即时通知,用户每次打开 APP 都会生成新的 SessionId
if (sessionId.equals(socketClient.getSessionId())) {
if (msgType.equals(MsgTypeEnum.LIKE.getValue())) {
socketIOServer.getClient(sessionId).sendEvent("receiveMsg", "你有一条认同消息");
} else if (msgType.equals(MsgTypeEnum.FOLLOW.getValue())) {
socketIOServer.getClient(sessionId).sendEvent("receiveMsg", "你有一条关注消息");
} else if (msgType.equals(MsgTypeEnum.COMMENT.getValue())) {
socketIOServer.getClient(sessionId).sendEvent("receiveMsg", "你有一条评论消息");
} else {
System.out.println("消息类型不匹配");
}
}
}
}
原文地址:https://www.cnblogs.com/cnbai/p/13355048.html
- 百度魅族深度学习大赛初赛冠军作品(图像识别.源码)
- easyUI整合富文本编辑器KindEditor详细教程(附源码)
- 经典Java面试题收集(二)
- 使用sqlt手工创建sql_profile(r4笔记第37天)
- 使用ash分析ORA-01652问题(r4笔记第36天)
- Spring+SpringMVC+MyBatis+easyUI整合进阶篇(八)线上Mysql数据库崩溃事故的原因和处理
- 数据结构01 算法的时间复杂度和空间复杂度
- Docker+SpringBoot+Mybatis+thymeleaf的Java博客系统开源啦
- Spring【依赖注入】就是这么简单
- 数据结构02 线性表之顺序表
- struts2+spring+hibernate整合步骤(2)
- Spring入门这一篇就够了
- Mybatis面试题
- 从业务角度分析奇怪的数据库高负载问题 (r4笔记第35天)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Linux如何查看端口被占用情况?Windows如何查看端口被占用情况?
- centos7安装Docker详细步骤(无坑版教程)
- Java的Object类详解(入门必备)
- Java的包机制以及import、static import的用法详解(看这篇就够了)
- AIM2020 Efficient Super Resolution: Methods and Results
- hive的group by与distinct的区别及性能测试比较
- hive的order by操作
- centos安装mysql-server报错:No package mysql-server available.
- Java的内部类详解(结合代码全面分析)
- jdk8安装及环境变量配置
- 使用MA Anderson御用软件SpliceSeq对TCGA数据库的RNA-seq找可变剪切
- Tomcat9安装配置、服务配置开机自启动以及启动窗口的中文乱码问题解决
- Java的System.exit()详解
- Hadoop伪分布式搭建(hadoop2.x通用)
- R语言tryCatch使用方法:判断Warning和Error