SocketIO实现消息推送

时间:2020-07-21
本文介绍如何使用 SocketIO 实现消息推送,内容包括使用实例、应用技巧、基本知识点总结和注意事项,具有一定的参考价值,供有需要的朋友参考。

Springboot + SocketIO实现点赞、关注、评论等即时消息推送(Java后台代码)

pom.xml

<!-- Dependency version numbers, defined in one place and referenced below via ${...} -->
<properties>
    <!-- Tools Settings -->
    <hutool.version>5.3.8</hutool.version>
    
    <!-- Netty Setting -->
    <netty-all.version>4.1.50.Final</netty-all.version>
    <netty-socketio.version>1.7.17</netty-socketio.version>
</properties>

<dependencies>
    <!-- Tools Begin -->
    <!-- NOTE(review): no <version> here; presumably managed by a parent POM / BOM - confirm -->
    <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
       <groupId>cn.hutool</groupId>
       <artifactId>hutool-all</artifactId>
       <version>${hutool.version}</version>
    </dependency>
    <!-- Tools End -->

    <!-- SocketIO Begin -->
    <dependency>
        <groupId>com.corundumstudio.socketio</groupId>
        <artifactId>netty-socketio</artifactId>
        <version>${netty-socketio.version}</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>${netty-all.version}</version>
    </dependency>
    <!-- SocketIO End -->
  </dependencies>

application.yml

socketIO:
  # Port the Socket.IO server listens on
  port: 9090
  # Value SocketConfig hands to Configuration.setWorkerThreads, i.e. a worker
  # thread count rather than a connection count
  workCount: 100
  # Allow custom (non Socket.IO) requests
  allowCustomRequests: true
  # Protocol upgrade timeout in milliseconds (default 10 s): time allowed for the
  # HTTP handshake to be upgraded to the ws protocol
  upgradeTimeout: 10000
  # Ping timeout in milliseconds (default 60 s): a timeout event is sent when no
  # heartbeat message is received within this interval
  pingTimeout: 60000
  # Ping interval in milliseconds (default 25 s): how often the client sends a
  # heartbeat message to the server
  pingInterval: 25000
  # Maximum content length of an HTTP interaction
  maxHttpContentLength: 1048576
  # Maximum payload length processed per frame; guards the server against
  # attacks that send oversized data
  maxFramePayloadLength: 1048576

SocketConfig

import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that builds the netty-socketio server from the
 * {@code socketIO.*} application properties.
 *
 * @author bai
 * @since 2020/7/20
 */
@Configuration
public class SocketConfig {

    /** Port the Socket.IO server listens on. */
    @Value("${socketIO.port}")
    private int port;

    /** Number of worker threads handed to {@code setWorkerThreads}. */
    @Value("${socketIO.workCount}")
    private int workCount;

    /** Whether custom (non Socket.IO) requests are allowed. */
    @Value("${socketIO.allowCustomRequests}")
    private boolean allowCustomRequests;

    /** Timeout in milliseconds for upgrading the HTTP handshake to the ws protocol. */
    @Value("${socketIO.upgradeTimeout}")
    private int upgradeTimeout;

    /** Ping timeout in milliseconds. */
    @Value("${socketIO.pingTimeout}")
    private int pingTimeout;

    /** Ping interval in milliseconds. */
    @Value("${socketIO.pingInterval}")
    private int pingInterval;

    /** Maximum payload length processed per frame. */
    @Value("${socketIO.maxFramePayloadLength}")
    private int maxFramePayloadLength;

    /** Maximum content length of an HTTP interaction. */
    @Value("${socketIO.maxHttpContentLength}")
    private int maxHttpContentLength;


    /**
     * Builds the {@link SocketIOServer} bean.
     *
     * <p>The server's shutdown method is {@code stop()}, which Spring does not
     * infer on its own (it only looks for {@code close}/{@code shutdown}), so it
     * is declared explicitly as the destroy method. Without it the server keeps
     * running when the application context is closed.
     *
     * @return {@link SocketIOServer} configured but not yet started
     * @author bai
     * @since 2020/7/20
     */
    @Bean(name = "socketIOServer", destroyMethod = "stop")
    public SocketIOServer socketIoServer() {
        // Fully qualified: this class's own name clashes with the library's SocketConfig.
        com.corundumstudio.socketio.SocketConfig socketConfig = new com.corundumstudio.socketio.SocketConfig();
        // Enable socket address reuse
        socketConfig.setReuseAddress(true);

        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        config.setPort(port);
        config.setSocketConfig(socketConfig);
        // Worker thread count (not a connection limit)
        config.setWorkerThreads(workCount);
        config.setAllowCustomRequests(allowCustomRequests);
        // All three timings below are in milliseconds
        config.setUpgradeTimeout(upgradeTimeout);
        config.setPingTimeout(pingTimeout);
        config.setPingInterval(pingInterval);
        config.setMaxHttpContentLength(maxHttpContentLength);
        // Caps the per-frame payload so oversized data cannot be used to attack the server
        config.setMaxFramePayloadLength(maxFramePayloadLength);
        return new SocketIOServer(config);
    }


    /**
     * Enables annotation support for the SocketIOServer.
     *
     * @param socketServer socketServer
     * @return {@link SpringAnnotationScanner}
     * @author bai
     * @since 2020/7/20
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}

SocketServer

import ch.qos.logback.core.net.server.ServerRunner;
import com.corundumstudio.socketio.SocketIOServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
 * Starts the Socket.IO server once Spring Boot has finished starting up.
 *
 * @author bai
 * @since 2020/7/20
 */
@Component
@Order(1)
public class SocketServer implements CommandLineRunner {

    /**
     * Logger for this class. It was previously created for logback's
     * {@code ServerRunner}, which put every message under the wrong category.
     */
    private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);

    /**
     * Server to start; injected through the constructor.
     */
    private final SocketIOServer socketIOServer;

    @Autowired
    public SocketServer(SocketIOServer server) {
        this.socketIOServer = server;
    }

    /**
     * Starts the Socket.IO server and logs before and after the call.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) {
        logger.info("---------- NettySocket通知服务开始启动 ----------");
        socketIOServer.start();
        logger.info("---------- NettySocket通知服务启动成功 ----------");
    }
}

MessageDTO

import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * Message data transfer object.
 *
 * @author bai
 * @since 2020/7/20
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageDTO implements Serializable {

    /**
     * Explicit serialization version, so the serialized form does not depend on
     * a compiler-generated id that changes whenever the class is edited.
     */
    private static final long serialVersionUID = 1L;

    /**
     * Id of the logged-in user.
     */
    @ApiModelProperty(value = "登录用户Id", name = "userId")
    private String userId;

    /**
     * Id of the user who receives the message.
     */
    @ApiModelProperty(value = "接收消息用户Id", name = "toUserId")
    private String toUserId;

    /**
     * Id of the object that was acted on.
     */
    @ApiModelProperty(value = "被操作对象Id", name = "beOperatedId")
    private String beOperatedId;

    /**
     * Message type.
     */
    @ApiModelProperty(value = "消息类型", name = "msgType")
    private String msgType;
}

MsgTypeEnum

/**
 * Message types.
 *
 * @author bai
 * @since 2020/7/20
 */
public enum MsgTypeEnum {

    /**
     * Follow
     */
    FOLLOW("follow"),

    /**
     * Like
     */
    LIKE("like"),

    /**
     * Comment
     */
    COMMENT("comment");

    /** String form of the type; final because enum constants are shared singletons. */
    private final String value;

    MsgTypeEnum(String type) {
        this.value = type;
    }

    /**
     * @return the string form of this message type
     */
    public String getValue() {
        return value;
    }
}

MsgStatusEnum

/**
 * Message status (presence).
 *
 * @author bai
 * @since 2020/7/20
 */
public enum MsgStatusEnum {

    /**
     * Online
     */
    ONLINE("上线"),

    /**
     * Offline
     */
    OFFLINE("下线");

    /** String form of the status; final because enum constants are shared singletons. */
    private final String value;

    MsgStatusEnum(String type) {
        this.value = type;
    }

    /**
     * @return the string form of this status
     */
    public String getValue() {
        return value;
    }
}

SocketHandler

import cn.hutool.core.util.StrUtil;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.tbsc.pojo.dto.MessageDTO;
import com.tbsc.service.UserService;
import com.tbsc.utils.enums.MsgStatusEnum;
import com.tbsc.utils.enums.MsgTypeEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Socket handler: records which session belongs to which user, broadcasts
 * online/offline notices and relays like / follow / comment notifications.
 *
 * @author bai
 * @since 2020/7/20
 */
@Component
public class SocketHandler {

    /**
     * Event the front end listens on; the name must match the front end.
     */
    private static final String RECEIVE_EVENT = "receiveMsg";

    /**
     * Handshake URL parameter that carries the connecting user's id.
     */
    private static final String USER_ID_PARAM = "userId";

    /**
     * User id to session id of that user's registered connection.
     */
    private final Map<String, UUID> clientMap = new ConcurrentHashMap<>(16);

    /**
     * socketIOServer
     */
    private final SocketIOServer socketIOServer;

    @Autowired
    public SocketHandler(SocketIOServer server) {
        this.socketIOServer = server;
    }

    @Resource
    private UserService userService;

    /**
     * Called when a client connects. A connection is registered only when its
     * {@code userId} URL parameter is non-blank and resolves to a user; an
     * online notice is then sent to every registered client.
     *
     * @param socketClient socketClient
     * @author bai
     * @since 2020/7/20
     */
    @OnConnect
    public void onConnect(SocketIOClient socketClient) {
        String userId = socketClient.getHandshakeData().getSingleUrlParam(USER_ID_PARAM);
        boolean knownUser = StrUtil.isNotBlank(userId) && userService.queryUserById(userId) != null;
        if (!knownUser) {
            return;
        }
        printConnectionEvent("开启", userId, socketClient);
        // Register the session for this user
        clientMap.put(userId, socketClient.getSessionId());
        // Send the online notice
        this.sendNotice(new MessageDTO(userId, null, null, MsgStatusEnum.ONLINE.getValue()));
    }


    /**
     * Called when a client disconnects; refreshes the client registry.
     *
     * <p>The entry is removed only if it still points at the disconnecting
     * session, so a late disconnect of an old session cannot evict a newer
     * connection of the same user. The offline notice is sent only when an
     * entry was actually removed.
     *
     * @param socketClient socketClient
     * @author bai
     * @since 2020/7/20
     */
    @OnDisconnect
    public void onDisConnect(SocketIOClient socketClient) {
        String userId = socketClient.getHandshakeData().getSingleUrlParam(USER_ID_PARAM);
        if (StrUtil.isNotBlank(userId)) {
            printConnectionEvent("断开", userId, socketClient);
            // Atomic remove-if-equal: only drop the mapping owned by this session
            if (clientMap.remove(userId, socketClient.getSessionId())) {
                // Send the offline notice
                this.sendNotice(new MessageDTO(userId, null, null, MsgStatusEnum.OFFLINE.getValue()));
            }
        }
    }


    /**
     * Prints the connect/disconnect trace line.
     *
     * @param action       verb placed in the message ("开启" or "断开")
     * @param userId       user id taken from the handshake
     * @param socketClient client whose session id and remote address are printed
     */
    private void printConnectionEvent(String action, String userId, SocketIOClient socketClient) {
        System.out.println("用户{" + userId + "}" + action + "长连接通知, NettySocketSessionId: {"
                + socketClient.getSessionId() + "},NettySocketRemoteAddress: {"
                + socketClient.getRemoteAddress() + "}");
    }


    /**
     * Sends an online/offline notice to every registered client.
     *
     * @param messageDTO messageDTO; nothing is sent when {@code null}
     * @author bai
     * @since 2020/7/20
     */
    private void sendNotice(MessageDTO messageDTO) {
        if (messageDTO == null) {
            return;
        }
        for (UUID sessionId : clientMap.values()) {
            SocketIOClient client = socketIOServer.getClient(sessionId);
            // The server may no longer know a session that is still in the map
            if (client != null) {
                client.sendEvent(RECEIVE_EVENT, messageDTO);
            }
        }
    }


    /**
     * sendMsg:    receives a message from the front end; the name must match the front end
     * receiveMsg: event the front end uses to receive data from the back end; the name must match the front end
     *
     * <p>The notification goes to the user named by {@code toUserId} if that
     * user currently has a registered session; otherwise nothing is sent. The
     * previous check compared the recipient's session id with the sender's,
     * which delivered only when a user messaged themselves and threw a
     * {@code NullPointerException} when the recipient was offline.
     *
     * @param socketClient socketClient (the sender; not used for routing)
     * @param messageDTO messageDTO
     * @author bai
     * @since 2020/7/20
     */
    @OnEvent("sendMsg")
    public void sendMsg(SocketIOClient socketClient, MessageDTO messageDTO) {
        // ConcurrentHashMap rejects null keys, so check before the lookup
        if (messageDTO == null || messageDTO.getToUserId() == null) {
            return;
        }
        UUID sessionId = clientMap.get(messageDTO.getToUserId());
        SocketIOClient recipient = sessionId == null ? null : socketIOServer.getClient(sessionId);
        if (recipient == null) {
            // Recipient is not online: no instant notification
            return;
        }

        String notice = noticeFor(messageDTO.getMsgType());
        if (notice == null) {
            System.out.println("消息类型不匹配");
            return;
        }
        recipient.sendEvent(RECEIVE_EVENT, notice);
    }


    /**
     * Maps a message type to the notification text pushed to the recipient.
     *
     * @param msgType message type sent by the front end, may be {@code null}
     * @return the notification text, or {@code null} when the type is not recognised
     */
    private static String noticeFor(String msgType) {
        // Constant-first equals keeps a null msgType from throwing
        if (MsgTypeEnum.LIKE.getValue().equals(msgType)) {
            return "你有一条认同消息";
        }
        if (MsgTypeEnum.FOLLOW.getValue().equals(msgType)) {
            return "你有一条关注消息";
        }
        if (MsgTypeEnum.COMMENT.getValue().equals(msgType)) {
            return "你有一条评论消息";
        }
        return null;
    }
}

原文地址:https://www.cnblogs.com/cnbai/p/13355048.html