activmq:android平台下使用openwire协议连接activemq服务的问题
如果在android平台连接activemq服务,应该怎么实现呢?大部分网上的文章都是eclipse 提供的paho MQTT client实现。如果你只是用消息发布/订阅功能,那么用paho MQTT client就足够了。
但是MQTT协议只支持消息发布/订阅,所以如果你需要使用生产者/消费者模型,就不能用MQTT协议了。
我就遇到这样的需求,需要在android平台以消费者身份接收消息队列的数据。于是我尝试使用activemq提供的activemq-client
库来连接activemq服务.
// https://mvnrepository.com/artifact/org.apache.activemq/activemq-client
implementation 'org.apache.activemq:activemq-client:5.14.5'
然而在编译时就报错了:
错误: 无法访问Referenceable 找不到javax.naming.Referenceable的类文件
找不到 javax.naming.Referenceable
类,实际就是找不到包名前缀为javax.naming
的所有类,也就是JNDI
库(Java Naming and Directory Interface,Java命名和目录接口),在JDK中这个库是内置的。因为android使用的DVM并不是完整的JVM,所以缺少JNDI
库。所以现在的问题就变成了如何找到android平台可以用的JNDI
库.
在maven中央仓库可以搜索到JNDI
库,但却找不到对应的jar包。
所以指望在maven找到JNDI库是不可能了,我又不死心通过google一通找,
在以下两个网站找到了jndi-1.2.1.jar
,经验证都能在Android平台正常使用
http://www.java2s.com/Code/JarDownload/jndi/jndi-1.2.1.jar.zip
http://treebase.sourceforge.net/maven2/javax/naming/jndi/1.2.1/jndi-1.2.1.jar
使用方法也很简单,将jndi-1.2.1.jar
复制到app/libs
文件夹下。如果你的app/build.gradle
中有定义implementation fileTree(dir: 'libs', include: ['*.jar'])
,jndi-1.2.1.jar
就被自动导入了项目,如果没有这一行就要手工加一行implementation files('libs/jndi-1.2.1.jar')
有了JNDI库,编译不再报错,测试也就正常通过了
发布消息的JUNIT测试
ActivemqPublisherTest.java
package gu.simplemq.activemq;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
/**
* @author guyadong
*
*/
public class ActivemqPublisherTest {
private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
// activemq服务主机地址
private static final String OPENWIRE_HOST = "192.168.10.226";
private static ActiveMQConnectionFactory createFactory(){
Properties props = new Properties();
props.setProperty("brokerURL","tcp://" + OPENWIRE_HOST + ":61616");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setProperties(props);
return factory;
}
@Test
public void test1() throws InterruptedException, JMSException {
ActiveMQConnectionFactory factory = createFactory();
Connection connection = null;
Session session = null;
MessageProducer p1 = null;
MessageProducer p2 = null;
try {
connection = factory.createConnection();
connection.setExceptionListener(new MyExceptionListener());
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
p1 = session.createProducer(session.createTopic("chat1"));
p2 = session.createProducer(session.createTopic("chat2"));
for(int i=0; i<100; ++i){
Date date = new Date();
String str = "OPENWIRE " + date.toString();
TextMessage message = session.createTextMessage(str);
p1.send(message, DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
p2.send(message, DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
logger.info(date.getTime() +" : " +date.toString());
Thread.sleep(2000);
}
} catch (JMSException e) {
e.printStackTrace();
} finally{
if(null != p2){
p2.close();
}
if(null != p1){
p1.close();
}
if(null != session){
session.close();
}
if(null != connection){
connection.close();
}
}
}
private static class MyExceptionListener implements ExceptionListener {
@Override
public void onException(JMSException exception) {
System.out.println("Connection ExceptionListener fired, exiting.");
exception.printStackTrace(System.out);
System.exit(1);
}
}
}
订阅消息的JUNIT测试
ActivemqSubscriberTest.java
package gu.simplemq.activemq;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.Properties;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
/**
* @author guyadong
*
*/
public class ActivemqSubscriberTest {
// activemq服务主机地址
private static final String OPENWIRE_HOST = "192.168.10.226";
private static ActiveMQConnectionFactory createFactory(){
Properties props = new Properties();
props.setProperty("brokerURL","tcp://" + OPENWIRE_HOST + ":61616");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setProperties(props);
return factory;
}
private static Connection conn;
private static Session session;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
ActiveMQConnectionFactory factory = createFactory();
conn = factory.createConnection();
conn.setExceptionListener(new MyExceptionListener());
conn.start();
session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
session.close();
conn.close();
}
public void sub(Session session,String topic,MessageListener listener) throws JMSException {
Topic activeMQTopic = session.createTopic(topic);
MessageConsumer consumer = session.createConsumer(activeMQTopic);
consumer.setMessageListener(listener);
}
private static void waitquit(){
System.out.println("PRESS 'quit' OR 'CTRL-C' to exit");
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
try{
while(!"quit".equalsIgnoreCase(reader.readLine())){
}
System.exit(0);
} catch (IOException e) {
}finally {
}
}
@Test
public void test1() {
try {
sub(session,"chat1", new LogListener());
sub(session,"chat2", new LogListener());
sub(session,"chat3", new LogListener());
waitquit();
} catch (JMSException e) {
e.printStackTrace();
}
}
private static class LogListener implements MessageListener{
private String textOf(Message message) throws JMSException{
if(message instanceof TextMessage){
return ((TextMessage) message).getText();
}
if(message instanceof BytesMessage){
BytesMessage bytesMessage = (BytesMessage)message;
byte[] buf = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(buf);
return new String(buf);
}
throw new IllegalArgumentException(String.format("INVALID message type,%s,%s required",
TextMessage.class.getName(),
BytesMessage.class.getName()));
}
@Override
public void onMessage(Message message) {
try {
logger.info("dest {}:{}",message.getJMSDestination(),textOf(message));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
private static class MyExceptionListener implements ExceptionListener {
@Override
public void onException(JMSException exception) {
System.out.println("Connection ExceptionListener fired, exiting.");
exception.printStackTrace(System.out);
System.exit(1);
}
}
}
完整的测试代码参见码云仓库 ActivemqSubscriberTest.java https://gitee.com/l0km/simplemq/blob/dev/simplemq-android-test/app/src/androidTest/java/gu/simplemq/activemq/ActivemqPublisherTest.java
ActivemqSubscriberTest.java https://gitee.com/l0km/simplemq/blob/dev/simplemq-android-test/app/src/androidTest/java/gu/simplemq/activemq/ActivemqSubscriberTest.java
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- Springboot thymeleaf热部署
- Java初始化List的6种方式
- Java遍历Map对象的四种方式
- 【SpringBoot源码解析】第三章:SpringBoot通过打成war包的方式是如何启动的
- 让你编码嗨到停不下来的8个VSCode插件
- 【SpringBoot源码解析】第四章:SpringBoot是如何自动装配SpringMvc的
- 【SpringBoot源码解析】第二章:SpringBoot是如何通过内置Tomcat启动的
- 技术译文 | How Can ScaleFlux Handle MySQL Workload?
- 技术译文 | MySQL 8 需要多大的 innodb_buffer_pool_instances 值(上)
- 前端登录,这一篇就够了
- 技术译文 | MySQL 8 需要多大的 innodb_buffer_pool_instances 值(下)
- 创建线程到底有多少种方式?
- CANet|拼接注意力网络
- SQL 中判断条件的先后顺序,会引起索引失效么?
- 一个超酷的开源uHand2.0机械手掌项目