ActiveMQ: Connecting to an ActiveMQ Broker over OpenWire on Android

Date: 2022-07-24
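How do you connect to an ActiveMQ broker from Android? Most articles online use the Paho MQTT client provided by Eclipse. If all you need is publish/subscribe messaging, the Paho MQTT client is enough. MQTT, however, only supports publish/subscribe, so if you need the producer/consumer model, MQTT will not do. I had exactly that requirement: on Android, receive data from a message queue as a consumer. So I tried connecting to the broker with the activemq-client library that ActiveMQ provides.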

// https://mvnrepository.com/artifact/org.apache.activemq/activemq-client
implementation 'org.apache.activemq:activemq-client:5.14.5'

However, the build failed at compile time:

error: cannot access Referenceable: class file for javax.naming.Referenceable not found

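The compiler cannot find javax.naming.Referenceable. In fact, every class under the javax.naming package is missing, that is, the whole JNDI library (Java Naming and Directory Interface), which is built into the JDK. Android's runtime (the DVM, Dalvik virtual machine) is not a complete JVM, so it does not include JNDI. The problem therefore becomes how to find a JNDI library that works on Android.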
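Before looking for a jar, it can help to confirm which classes the runtime actually lacks. The following is a minimal sketch, not part of the original project: `ClasspathProbe` and `isPresent` are hypothetical names, and the check relies only on `Class.forName`. On a desktop JDK the JNDI class is expected to be found; on Android without an extra jar it is expected to be missing.

```java
/** Hypothetical helper: reports whether a class can be loaded by name. */
class ClasspathProbe {

    /** Returns true if the named class is visible to this class's loader. */
    static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the compile error above
        String jndiClass = "javax.naming.Referenceable";
        System.out.println(jndiClass + " present: " + isPresent(jndiClass));
    }
}
```

Running the same check inside an Android instrumented test, before and after adding the jar, would show whether the library was really packaged into the app.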

The JNDI library is listed in the Maven Central repository, but there is no jar file to go with it.

https://mvnrepository.com/artifact/javax.naming/jndi/1.2.1

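So there was no hope of getting the JNDI library from Maven. Not ready to give up, I searched Google and found jndi-1.2.1.jar on the following two sites. I verified that both copies work on Android:

http://www.java2s.com/Code/JarDownload/jndi/jndi-1.2.1.jar.zip

http://treebase.sourceforge.net/maven2/javax/naming/jndi/1.2.1/jndi-1.2.1.jar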

Using it is simple: copy jndi-1.2.1.jar into the app/libs folder. If your app/build.gradle already contains implementation fileTree(dir: 'libs', include: ['*.jar']), the jar is picked up automatically. If it does not, add the line implementation files('libs/jndi-1.2.1.jar') by hand.

With the JNDI library in place, the build no longer fails and the tests pass.

JUnit test for publishing messages

ActivemqPublisherTest.java

package gu.simplemq.activemq;

import java.util.Date;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

/**
 * @author guyadong
 *
 */
public class ActivemqPublisherTest {
    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    // ActiveMQ broker host address
	private static final String OPENWIRE_HOST = "192.168.10.226";

	private static ActiveMQConnectionFactory createFactory(){
		Properties props = new Properties();
    	props.setProperty("brokerURL","tcp://" + OPENWIRE_HOST + ":61616");
    	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
       factory.setProperties(props);    
       return factory;
	}
	@Test
	public void test1() throws InterruptedException, JMSException {
		ActiveMQConnectionFactory factory = createFactory();
		Connection connection = null;
		Session session = null;
		MessageProducer p1 = null;
		MessageProducer p2 = null;
		try {
			connection = factory.createConnection();
            connection.setExceptionListener(new MyExceptionListener());
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            p1 = session.createProducer(session.createTopic("chat1"));
            p2 = session.createProducer(session.createTopic("chat2"));
			for(int i=0; i<100; ++i){
				Date date = new Date();
				String str = "OPENWIRE "  + date.toString();
                TextMessage message = session.createTextMessage(str);
                p1.send(message, DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
                p2.send(message, DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

				System.out.println(date.getTime() + " : " + date.toString());
				Thread.sleep(2000);
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally{
			if(null != p2){
				p2.close();
			}
			if(null != p1){
				p1.close();
			}
			if(null != session){
				session.close();
			}
			if(null != connection){
				connection.close();
			}
		}

	}

    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}
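The test above hard-codes the broker address inside `createFactory`. A minimal sketch of pulling that into a small helper is shown below, assuming you want to pass the host and port in (for example, a different address per environment). `BrokerProps` and `forTcp` are hypothetical names; the only thing taken from the article is the `brokerURL` property key and the `tcp://host:port` form. The result is what the tests hand to `factory.setProperties(...)`.

```java
import java.util.Properties;

/** Hypothetical helper that builds the properties passed to factory.setProperties(...). */
class BrokerProps {

    /** Builds a Properties object holding an OpenWire TCP broker URL. */
    static Properties forTcp(String host, int port) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        Properties props = new Properties();
        // "brokerURL" is the property key used by the tests in this article
        props.setProperty("brokerURL", "tcp://" + host + ":" + port);
        return props;
    }

    public static void main(String[] args) {
        // Host and port as used in the article's tests
        System.out.println(forTcp("192.168.10.226", 61616).getProperty("brokerURL"));
    }
}
```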

JUnit test for subscribing to messages

ActivemqSubscriberTest.java

package gu.simplemq.activemq;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * @author guyadong
 *
 */
public class ActivemqSubscriberTest {
	// ActiveMQ broker host address
	private static final String OPENWIRE_HOST = "192.168.10.226";

	private static ActiveMQConnectionFactory createFactory(){
		Properties props = new Properties();
    	props.setProperty("brokerURL","tcp://" + OPENWIRE_HOST + ":61616");
    	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
       factory.setProperties(props);    
       return factory;
	}
	private static Connection conn;
	private static Session session;

	@BeforeClass
	public static void setUpBeforeClass() throws Exception {
		ActiveMQConnectionFactory factory = createFactory();
		conn = factory.createConnection();
		conn.setExceptionListener(new MyExceptionListener());
		conn.start();
		session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
	}

	@AfterClass
	public static void tearDownAfterClass() throws Exception {
		session.close();
		conn.close();
	}
	public void sub(Session session,String topic,MessageListener listener) throws JMSException {
		Topic activeMQTopic = session.createTopic(topic);
		MessageConsumer consumer = session.createConsumer(activeMQTopic);
		consumer.setMessageListener(listener);
	}
	private static void waitquit(){
		System.out.println("PRESS 'quit' OR 'CTRL-C' to exit");
		BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); 
		try{
			while(!"quit".equalsIgnoreCase(reader.readLine())){
			}
			System.exit(0);
		} catch (IOException e) {
			// stdin is unreadable: stop waiting and let the test finish
		}
	}
	@Test
	public void test1() {
		try {
			sub(session,"chat1", new LogListener());
			sub(session,"chat2", new LogListener());
			sub(session,"chat3", new LogListener());
			waitquit();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	private static class LogListener implements MessageListener{
		private String textOf(Message message) throws JMSException{
			if(message instanceof TextMessage){
				return ((TextMessage) message).getText();
			}
			if(message instanceof BytesMessage){
				BytesMessage bytesMessage = (BytesMessage)message;
				byte[] buf = new byte[(int) bytesMessage.getBodyLength()];
				bytesMessage.readBytes(buf);
				return new String(buf);
			}
			throw new IllegalArgumentException(String.format("INVALID message type,%s,%s required",
					TextMessage.class.getName(),
					BytesMessage.class.getName()));
		}
		@Override
		public void onMessage(Message message) {
			try {
				System.out.printf("dest %s:%s%n", message.getJMSDestination(), textOf(message));
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}
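One detail in `textOf` is worth a note: `new String(buf)` decodes the `BytesMessage` body with the platform default charset, which may differ between the publishing machine and the Android device. A minimal sketch of decoding with an explicit charset follows, assuming the payload is UTF-8 (the article does not state which encoding the publisher uses). `PayloadText` and `decodeUtf8` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: decodes a message body with an explicit charset. */
class PayloadText {

    /** Decodes the bytes as UTF-8 regardless of the platform default charset. */
    static String decodeUtf8(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulated message body containing a non-ASCII character
        byte[] payload = "OPENWIRE caf\u00e9".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeUtf8(payload));
    }
}
```

In the listener above, this would correspond to replacing `new String(buf)` with `new String(buf, StandardCharsets.UTF_8)`, provided the publisher really encodes as UTF-8.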

For the complete test code, see the Gitee repository:

ActivemqPublisherTest.java https://gitee.com/l0km/simplemq/blob/dev/simplemq-android-test/app/src/androidTest/java/gu/simplemq/activemq/ActivemqPublisherTest.java

ActivemqSubscriberTest.java https://gitee.com/l0km/simplemq/blob/dev/simplemq-android-test/app/src/androidTest/java/gu/simplemq/activemq/ActivemqSubscriberTest.java