golang rabbitmq 工具类
时间:2020-05-28
本文章向大家介绍golang rabbitmq 工具类,主要包括golang rabbitmq 工具类使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
package mq import ( "bytes" "errors" "github.com/streadway/amqp" "strings" ) var conn *amqp.Connection var channel *amqp.Channel var exchanges string var topics string var hasMQ bool = false var mqAddr string type Reader interface { Read(msg *string) (err error) } // 初始化 参数格式:amqp://用户名:密码@地址:端口号/host func SetupRMQ(rmqAddr string) (err error) { //用于重连 mqAddr = rmqAddr if channel == nil || conn == nil { conn, err = amqp.Dial(rmqAddr) if err != nil { return err } channel, err = conn.Channel() if err != nil { return err } hasMQ = true } if conn.IsClosed() { conn, err = amqp.Dial(rmqAddr) if err != nil { return err } channel, err = conn.Channel() if err != nil { return err } hasMQ = true } return nil } // 是否已经初始化 func HasMQ() bool { return hasMQ } // 测试连接是否正常 func Ping() (err error) { if !hasMQ || channel == nil { return errors.New("RabbitMQ is not initialize") } err = channel.ExchangeDeclare("ping.ping", "topic", false, true, false, true, nil) if err != nil { return err } msgContent := "ping.ping" err = channel.Publish("ping.ping", "ping.ping", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msgContent), }) if err != nil { return err } err = channel.ExchangeDelete("ping.ping", false, false) return err } // 发布消息 func Publish(exchange, routeKey string, msg string, priority uint8) (err error) { if conn == nil { _ = SetupRMQ(mqAddr) } if conn.IsClosed() { _ = SetupRMQ(mqAddr) } if exchanges == "" || !strings.Contains(exchanges, exchange) { err = channel.ExchangeDeclare(exchange, "topic", true, false, false, true, nil) if err != nil { return err } err = channel.ExchangeDeclare(exchange+"_dlx", "topic", true, false, false, true, nil) if err != nil { return err } exchanges += " " + exchange + " " } err = channel.Publish(exchange, routeKey, false, false, amqp.Publishing{ Priority: priority, DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(msg), }) if err != nil { _ = SetupRMQ(mqAddr) } return err } // 监听接收到的消息 func Receive(exchange, topic string, reader func(msg *string)) (err error) { if exchanges == "" || !strings.Contains(exchanges, exchange) { err = channel.ExchangeDeclare(exchange, "topic", true, false, false, true, nil) if err != nil { return err } exchanges += " " + exchange + " " } if topics == "" || !strings.Contains(topics, topic) { //声明队列为优先级队列 queeuDeclareArgs := make(map[string]interface{}) queeuDeclareArgs["x-max-priority"] = 255 _, err = channel.QueueDeclare(topic, true, false, false, true, queeuDeclareArgs) if err != nil { return err } err = channel.QueueBind(topic, exchange, exchange, true, nil) if err != nil { return err } topics += " " + topic + " " } msgs, err := channel.Consume(topic, "", true, false, false, false, nil) if err != nil { return err } go func() { //fmt.Println(*msgs) for d := range msgs { s := bytesToString(&(d.Body)) reader(s) } }() return nil } // 关闭连接 func Close() { channel.Close() conn.Close() hasMQ = false } func bytesToString(b *[]byte) *string { s := bytes.NewBuffer(*b) r := s.String() return &r }
原文地址:https://www.cnblogs.com/zipon/p/12980937.html
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 使括号有效的最少添加
- 设计模式~观察者模式
- 网页无插件视频流媒体播放器EasyPlayerPro-IOS版如何解决有声音无画面的问题?
- (建议收藏)Java基础知识笔记二(详细)
- Android 手机如何拍摄RAW图
- 「干货」基本数据类型和引用数据类型的区别
- int 和 integer :装箱和拆箱的过程,会用到什么方法,你觉得这个会对性能有影响吗,原因是什么(百度一面)
- 数组:这个循环可以转懵很多人!
- 企业远程办公视频会议系统EasyRTC-SFU下侧边栏边框超限问题如何解决?
- 编写高质量可维护的代码:数据建模
- 新版企业远程办公视频通话系统EasyRTC-SFU,如何解决用户登录信息更新不及时的问题?
- 服务应用突然宕机了?别怕,Dubbo 帮你自动搞定服务隔离!
- 33.Python字符串方法find以及与序列解包的技巧结合
- 代码审计从0到1 —— Centreon One-click To RCE
- 一文带你深扒ClassLoader内核,揭开它的神秘面纱!