Python+java+websocket+SpringMVC实时监控数据库中的表

时间:2022-07-22
本文章向大家介绍Python+java+websocket+SpringMVC实时监控数据库中的表,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Python+java+websocket+SpringMVC实时监控数据库中的表

为什么要对数据库中的表进行实时的监控?

做数据展示你可能会想到ajax,做实时数据展示你可能会想到定时器+ajax。 定时器+ajax可能会造成:如果到一个指定的时间点没有新数据的话会造成一定的资源浪费。 那么就要考虑其他的方式来解决实时的数据推送了。

这里我使用SpringMVC+Websocket配合前端HTML做实时数据展示; 使用Python监控Mysql的日志文件,从而起到了监控数据库的作用。

例1:股票系统

后台有数据了会主动的把数据推向前台,前台进行实时的展示数据。

例2:聊天系统

例3:实时大数据平台

模拟环境

利用一个写好的学生信息管理系统的dome做为数据来源(添加数据),模拟硬件设备实时采集的数据往数据库中新增的数据的过程。也可以直接在数据库中新增数据。

这个dome使用的是SpringBoot+Mybatis+thymeleaf+Mysql,前端使用的是layui。 可以做为一个很好的例子来学习。

前台展示页面

很粗糙

Mysql日志文件

为了方便演示,我是在windows下的mysql,可以直接加上 log=文件路径

代码

SpringMvc+WebSocket

目录结构

BitCoinServer.java文件

package cn.socketConfig;

import cn.websocket.controller.QuartzTask;

import java.io.IOException;
import java.time.LocalTime;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

/**
 * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
 * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
 */
@ServerEndpoint("/ws/bitcoinServer")
public class BitCoinServer {
	
	//与某个客户端的连接会话,需要通过它来给客户端发送数据
	private Session session;

	@OnOpen
	public void onOpen(Session session){
		this.session = session;
		ServerManager.add(this);     
	}
	
	public void sendMessage(String message) throws IOException{
		this.session.getBasicRemote().sendText(message);
	}

	@OnClose
	public void onClose(){
		ServerManager.remove(this);  
	}

	@OnMessage
	public void onMessage(String message, Session session) {
//		if(message!=null){
			System.out.println("来自客户端的消息:" + message+",数据库发生改变了,正在向前台推送数据");
			QuartzTask.Message();
//		}
	}

	@OnError
	public void onError(Session session, Throwable error){
		System.out.println("发生错误");
		error.printStackTrace();
	}

}

ServerManager.java文件

package cn.socketConfig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

public class ServerManager {

	private static Collection<BitCoinServer> servers = Collections.synchronizedCollection(new ArrayList<BitCoinServer>());
	
	public static void broadCast(String msg){
		for (BitCoinServer bitCoinServer : servers) {
			try {
				bitCoinServer.sendMessage(msg);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	
	public static int getTotal(){
		return servers.size();
	}
	public static void add(BitCoinServer server){
		System.out.println("有新连接加入! 当前总连接数是:"+ servers.size());
		servers.add(server);
	}
	public static void remove(BitCoinServer server){
		System.out.println("有连接退出! 当前总连接数是:"+ servers.size());
		servers.remove(server);
	}
	
}

QuartzTask.java文件

package cn.websocket.controller;

import cn.socketConfig.ServerManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.xml.crypto.Data;
import java.time.LocalTime;

//@Component("taskJob")
public class QuartzTask {

    public static void Message() {
        LocalTime time = LocalTime.now();
//			System.out.println("哈哈,我执行了!"+"当前时间=" + time);
        System.out.println("数据库更新数据了"+"当前时间=" + time+",已向前台推送数据!");
        //广播出去
        ServerManager.broadCast("数据库更新数据了"+"当前时间=" + time+",已向前台推送数据!");
        System.out.println("已成功向前台推送了数据");
    }

    /**
     * CRON表达式                含义 
    "0 0 12 * * ?"            每天中午十二点触发 
    "0 15 10 ? * *"            每天早上10:15触发 
    "0 15 10 * * ?"            每天早上10:15触发 
    "0 15 10 * * ? *"        每天早上10:15触发 
    "0 15 10 * * ? 2005"    2005年的每天早上10:15触发 
    "0 * 14 * * ?"            每天从下午2点开始到2点59分每分钟一次触发 
    "0 0/5 14 * * ?"        每天从下午2点开始到2:55分结束每5分钟一次触发 
    "0 0/5 14,18 * * ?"        每天的下午2点至2:55和6点至6点55分两个时间段内每5分钟一次触发 
    "0 0-5 14 * * ?"        每天14:00至14:05每分钟一次触发 
    "0 10,44 14 ? 3 WED"    三月的每周三的14:10和14:44触发 
    "0 15 10 ? * MON-FRI"   每个周一、周二、周三、周四、周五的10:15触发
     */
    
    /**
     * 每天5点触发(清空验证码表t_captcha中的数据)
     */
//    @Scheduled(cron = "0 0/1 1-23 * * ?")
//    public void testTask(){
//        LocalTime time = LocalTime.now();
//        System.out.println("哈哈,我执行了!"+"当前时间=" + time);
//        //广播出去
//        ServerManager.broadCast("数据库更新数据了"+"当前时间=" + time);
//    }
}

index.jsp文件

<%@ page language="java" pageEncoding="UTF-8" %>
<!DOCTYPE html>
<html>
<head>
    <title>用WebSocket实时消息推送,监控mysql数据表变化</title>
</head>
<body>

<div style="width:400px;margin:20px auto;border:1px solid lightgray;padding:20px;text-align:center;">
    实时消息推送,监控mysql数据表变化:
<%--    <span style="color:#FF7519" id="price">10000</span>--%>
<%--    <div style="font-size:0.9em;margin-top:20px">查看的人数越多,价格越高, 当前总共 <span id="total">1</span> 个人在线</div>--%>
    <div style="color:silver;font-size:0.8em;margin-top:20px" id="data">正在等待数据</div>
</div>

</body>

<script type="text/javascript">
    var websocket = null;
    //判断当前浏览器是否支持WebSocket
    if ('WebSocket' in window) {
        websocket = new WebSocket("ws://localhost:8080/SpringMvc_WebSocket_war_exploded/ws/bitcoinServer");

        //连接成功建立的回调方法
        // websocket.onopen = function () {
        //     websocket.send("客户端链接成功");
        // }

        //接收到消息的回调方法
        websocket.onmessage = function (event) {
            setMessageInnerHTML(event.data);
            console.log(event)
        }

        //连接发生错误的回调方法
        // websocket.onerror = function () {
        //     alert("WebSocket连接发生错误");
        // };

        //连接关闭的回调方法
        websocket.onclose = function () {
            alert("WebSocket连接关闭");
        }

        //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
        window.onbeforeunload = function () {
            closeWebSocket();
        }

    } else {
        alert('当前浏览器 Not support websocket')
    }

    //将消息显示在网页上
    function setMessageInnerHTML(data) {
        // var bitcoin = eval("("+innerHTML+")");
        document.getElementById('data').append(data);
        // document.getElementById('total').innerHTML = bitcoin.total;
    }

    //关闭WebSocket连接
    function closeWebSocket() {
        websocket.close();
    }

</script>
</html>

启动程序后可访问: http://localhost:8080/SpringMvc_WebSocket_war_exploded/

出现如下图所示的界面就成功了

Python代码

import time
import test
# import test

url = "G:/soft/phpStudy/PHPTutorial/MySQL/log/mysql_log.txt"  #监控的mysql日志的地址
file = open(url,encoding='UTF-8') #打开日志文件
# ws = test.sendMessage()


while 1:  #一直监控
    where = file.tell()
    line = file.readline()
    if not line:
        time.sleep(1)
        file.seek(where)
    else:
        # print(line)
        # 如果最新的一行包含了insert这个语句,就主动连接websocket服务器,并告知服务器有新数据
        if line.find("INSERT INTO `course`")>=0 or line.find("insert into course")>=0:
            print("数据库有新数据了,正在向socket服务发送请求消息,请求接收数据")
            ws = None
            try:
                ws = test.CG_Client('ws://localhost:8080/SpringMvc_WebSocket_war_exploded/ws/bitcoinServer')
                ws.connect()
                ws.run_forever()
            except KeyboardInterrupt:
                ws.close()
                print("出错了")

启动py文件

新增数据测试

测试第二条:

测试直接在数据库中插入新数据