数据库进程间通信解决方案之MQ
数据库进程间通信解决方案之MQ
摘要
你是否想过当数据库中的数据发生变化的时候出发某种操作?但因数据无法与其他进程通信(传递信号)让你放弃,而改用每隔一段时间查询一次数据变化的方法?下面的插件可以解决你的问题。
原文出处:http://netkiller.github.io/journal/mysql.plugin.fifo.html
目录
- 1. 背景
- 2. 应用场景
- 3. Mysql plugin
- 4. plugin 的开发与使用
- 5. 插件如何使用
1. 背景
之前我发表过一篇文章 http://netkiller.github.io/journal/mysql.plugin.fifo.html
该文章中提出了通过fifo 管道,实现数据库与其他进程的通信。属于 IPC 机制(同一个OS/服务器内),后我有采用ZeroMQ重新实现了一个 RPC 机制的方案,同时兼容IPC(跨越OS/服务器)
各种缩写的全称 IPC(IPC :Inter-Process Communication 进程间通信),ITC(ITC : Inter Thread Communication 线程间通信)与RPC(RPC: Remote Procedure Calls远程过程调用)。
支持协议
inproc://my_publisher
tcp://server001:5555
ipc:///tmp/feeds/0
2. 应用场景
如果你像处理数据,由于各种原因你不能在程序中实现,你可以使用这个插件。当数据库中的数据发生变化的时候出发某种操作,你可以使用这个插件。
有时候你的项目可能是外包的,项目结束后外包方不会在管你,你有无法改动现有代码,或者根本不敢改。你可以使用这个插件
采用MQ技术对数据库无任何压力,与采用程序处理并无不同,省却了写代码
处理方法,可以采用同步或者异步方式
例 1. 发送短信
发送短信、邮件,只需要查询出相应手机号码,发送到MQ的服务端,服务端接收到手机号码后,放入队列中,多线程程序从队列中领取任务,发送短信。
select zmq_client('tcp://localhost:5555',mobile) from demo where subscribed='Y' ...;
传递多个参数,可以使用符号分隔
select zmq_client('tcp://localhost:5555',concat(name,',',mobile,', news')) from demo;
select zmq_client('tcp://localhost:5555',concat(name,'|',mobile,'|news')) from demo;
json格式
select zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,', template:news}')) from demo;
建议采用异步方式,MQ端接收到任务立即反馈 “成功”信息,因为我们不太关心是否能发送成功,本身就是盲目性的发送,手机号码是否可用我们无从得知,短信或者邮件的发送到达率不是100%,所以当进入队列后,让程序自行处理,将成功或者失败信息记录到日志中即可。
例 2. 处理图片
首先查询出需要处理图片,然后将路径与分辨率传递给MQ另一端的处理程序
select zmq_client('tcp://localhost:5555',concat(image,',800x600}')) from demo;
建议采用异步方式,MQ端接收到任务立即反馈 “成功”信息
例 3. 身份证号码校验
select zmq_client('tcp://localhost:5555',id_number) from demo;
可以采用同步方案,因为MQ款处理几乎不会延迟,直接将处理结构反馈
例 4. 静态化案例
情景模拟,你的项目是你个电商项目,采用外包模式开发,项目已经开发完成。外包放不再负责维护,你现在要做静态化。增加该功能,你要检查多处与商品表相关的造作。
于其改代码,不如程序从外部处理,这样更保险。我们只要写一个程序将动态 URL 下载保存成静态即可,当数据发生变化的时候重新下载覆盖即可
CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_insert` AFTER INSERT ON `demo` FOR EACH ROW BEGIN
select zmq_client('tcp://localhost:5555', NEW.id);
END
CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_update` AFTER UPDATE ON `demo` FOR EACH ROW BEGIN
select zmq_client('tcp://localhost:5555', NEW.id);
END
CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_delete` AFTER DELETE ON `demo` FOR EACH ROW BEGIN
select zmq_client('tcp://localhost:5555', NEW.id);
END
MQ 另一端的服务会下载http://www.example.com/goods.php?cid=111&id=100, 然后生成html页面,http://www.example.com/111/100.html
插入会新建页面,更新会覆盖页面,删除会删除页面
这样无论商品的价格,属性改变,静态化程序都会做出相应的处理。
例 5. 数据同步案例
我们有多个数据库,A 库里面的数据发生变化后,要同步书库到B库,或者处理结果,或者数据转换后写入其他数据库中
方法也是采用触发器或者EVENT处理
3. Mysql plugin
我开发了几个 UDF, 共4个 function
UDF
- zmq_client(sockt,message)
- sockt .成功返回true,失败返回flase.
有了上面的function后你就可以在begin,commit,rollback 直接穿插使用,实现在事物处理期间做你爱做的事。也可以用在触发器与EVENT定时任务中。
4. plugin 的开发与使用
编译UDF你需要安装下面的软件包
sudo apt-get install pkg-config
sudo apt-get install libmysqlclient-dev
sudo apt-get install gcc gcc-c++ make cmake
https://github.com/netkiller/mysql-zmq-plugin
编译udf,最后将so文件复制到 /usr/lib/mysql/plugin/
git clone https://github.com/netkiller/mysql-zmq-plugin.git
cd mysql-zmq-plugin
cmake .
make && make install
装载
create function zmq_client returns string soname 'libzeromq.so';
create function zmq_publish returns string soname 'libzeromq.so';
卸载
drop function zmq_client;
drop function zmq_publish;
确认安装成功
mysql> SELECT * FROM `mysql`.`func` where name like 'zmq%';
+-------------+-----+--------------+----------+
| name | ret | dl | type |
+-------------+-----+--------------+----------+
| zmq_client | 0 | libzeromq.so | function |
| zmq_publish | 0 | libzeromq.so | function |
+-------------+-----+--------------+----------+
2 rows in set (0.00 sec)
5. 插件如何使用
插件有很多种用法,这里仅仅一个例
编译zeromq server 测试程序
cd test
cmake .
make
启动服务进程
./server
发送Hello world!
mysql> select zmq_client('tcp://localhost:5555','Hello world!');
+---------------------------------------------------+
| zmq_client('tcp://localhost:5555','Hello world!') |
+---------------------------------------------------+
| Hello world! OK |
+---------------------------------------------------+
1 row in set (0.01 sec)
查看服务器端是否接收到信息。
$ ./server
Received: Hello world!
我们再将上面的例子使用触发器进一步优化
mysql> select zmq_client('tcp://localhost:5555',mobile) from demo;
+-------------------------------------------+
| zmq_client('tcp://localhost:5555',mobile) |
+-------------------------------------------+
| 13113668891 OK |
| 13113668892 OK |
| 13113668893 OK |
| 13322993040 OK |
| 13588997745 OK |
+-------------------------------------------+
5 rows in set (0.03 sec)
服务器端已经接收到数据库发过来的信息
$ ./server
Received: Hello world!
Received: 13113668891
Received: 13113668892
Received: 13113668893
Received: 13322993040
Received: 13588997745
我们可以拼装json或者序列化数据,发送给远端
mysql> select zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,'}')) from demo;
+------------------------------------------------------------------------------+
| zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,'}')) |
+------------------------------------------------------------------------------+
| {name:neo, tel:13113668891} OK |
| {name:jam, tel:13113668892} OK |
| {name:leo, tel:13113668893} OK |
| {name:jerry, tel:13322993040} OK |
| {name:tom, tel:13588997745} OK |
+------------------------------------------------------------------------------+
5 rows in set (0.03 sec)
返回数据取决于你服务端怎么编写处理程序,你可以返回true/false等等。
触发器以及事务处理,这里就不演示了
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Oracle 技巧篇-快速批量删除当前数据库连接的用户,一键清空所有session会话方法
- Spring Security 中如何细化权限粒度?
- 小书MybatisPlus第4篇-表格分页与下拉分页查询
- 小书MybatisPlus第3篇-自定义SQL
- Nginx + Spring Boot 实现负载均衡
- 小书MybatisPlus第2篇-条件构造器的应用及总结
- 一个案例演示 Spring Security 中粒度超细的权限控制!
- 信息收集之主机发现:nmap
- 文本文件逐行处理–用java8 Stream流的方式
- 使用java8API遍历过滤文件目录及子目录及隐藏文件
- 使用位运算、值交换等方式反转java字符串-共四种方法
- 精讲RestTemplate第2篇-多种底层HTTP客户端类库的切换
- 精讲RestTemplate第1篇-在Spring或非Spring环境下如何使用
- 在图中添加多边形
- 设置坐标轴刻度的位置和样式