续谈ActiveMQ之java如何操作ActiveMQ(springBoot项目)

时间:2022-04-29
本文章向大家介绍续谈ActiveMQ之java如何操作ActiveMQ(springBoot项目),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
引入maven依赖
         <!-- activemq -->
		 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

为了便于管理mq这里统一在xml中配置:

<mq-clients>
	<producer>
		<id>demo.test</id>
		<topic>MQ_TEST</topic>
		<mq.type>1</mq.type>
		<delivery.mode>1</delivery.mode>
		<acknowledge>1</acknowledge>
	</producer>
	<producer>
		<id>demo.test2</id>
		<topic>MQ_TEST2</topic>
		<mq.type>2</mq.type>
		<delivery.mode>1</delivery.mode>
		<acknowledge>1</acknowledge>
	</producer>
	<consumer>
		<id>demo.consumer.test1</id>
		<topic>MQ_TEST</topic>
		<mq.type>1</mq.type>
		<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener</message.listener>
	</consumer>
	<consumer>
		<id>demo.consumer.test2</id>
		<topic>MQ_TEST2</topic>
		<mq.type>2</mq.type>
		<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener2</message.listener>
	</consumer>
</mq-clients>

XMLUtil:用来读取xml

/**
	 * 获取所有节点
	 * @param root 根节点
	 * @param map  记录每个节点及值
	 */
	@SuppressWarnings("unchecked")
	private static void getNode(Element root, LinkedHashMap<String, String> map) {
		List<Element> list = root.elements();
		Iterator<Element> iterator = list.iterator();
		while (iterator.hasNext()) {
			Element element = iterator.next();
			if (element.elements() != null && element.elements().size() > 0) {
				System.out.println("element:"+element.getName());
				getNode(element, map);

			} else {
				map.put(element.getParent().getName() + "." + element.getName(),
				        element.getTextTrim());
			}
		}

	}
	/**
	 * 读XML文件指定节点内容
	 * @param xmlName  xml文件名
	 * @param nodeName 指定节点
	 * @return
	 * @throws Exception
	 */
	public static Map<String, String> reader(String xmlName,String nodeName)throws Exception{
		if(StringUtils.isEmpty(xmlName)){
			throw new NullPointerException("xmlName cannot be null!");
		}
		
		LinkedHashMap<String, String> returnValue = new LinkedHashMap<String, String>();
		InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName);
		SAXReader reader = new SAXReader();
		Document document = reader.read(in);
		Element root = document.getRootElement();
		if(StringUtils.isNotEmpty(nodeName)){
			root = document.getRootElement().element(nodeName);
		}
		//获取节点
		getNode(root, returnValue);
		
		if (returnValue.size()>0) {
			for (String key : returnValue.keySet()) {
				System.out.println("key:" + key + " ,value:" + returnValue.get(key));
			}

		}
		return returnValue;
	}
	
	
	/**
	 * 读XML文件所有内容,并将文件转成对象
	 * @param xmlName 文件名
	 * @param cls
	 * @return
	 * @throws Exception
	 */
	@SuppressWarnings("unchecked")
    public static <T> T readerXmlToBean(String xmlName ,Class<?>...cls)throws Exception{
		if(StringUtils.isEmpty(xmlName)){
			throw new NullPointerException("xmlName cannot be null!");
		}
		InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName);
		JAXBContext context = JAXBContext.newInstance(cls);// 获取上下文对象  
        Unmarshaller unmarshaller = context.createUnmarshaller();
        T t =  (T)unmarshaller.unmarshal(in);
		return t;
	}

Producer:

@XmlRootElement(name="producer")  
public class Producer {
	private String id;
	// 主题
	private String topic;
	// 类型,1-queue,2-topic
	private Integer mqType;
	// 持久化方式 :1-非持久,2-持久化
	private Integer deliveryMode;
	// 签收方式:1-自动签收,2-客户端确认,3-自动批量确认,0-事务提交并确认
	private Integer acknowledge;

//省略get set
}

Consumer:

@XmlRootElement(name = "consumer")
public class Consumer {
	private String id;
	private String topic;
	private Integer mqType;
	private Class<? extends MessageListener> messageListener;
...
}

MessageUtil:mq消息集中处理类,包括发送消息,启动消费监听等

private static MqConnectionFactory mqFactory = MqConnectionFactory.INSTANCE;
	private static Connection conn = null;
	private static Session session = null;

	public static void init() {
		try {
			// 获取一个连接
			if (conn == null) {
				conn = mqFactory.getConnection();
			}
			conn.start();
			// 自动提交事务
			if (session == null) {
				/*
				 * Session.AUTO_ACKNOWLEDGE 消息自动签收
				 * Session.CLIENT_ACKNOWLEDGE 客戶端调用acknowledge方法手动签收
				 * Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息
				 * 头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。
				 */
				session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 
	 * @param obj 序列化对象
	 * @param topic
	 * @param isQueue
	 * @throws Exception
	 */
	public static void sendObjectMessage(Serializable obj, String id)
	        throws Exception {
		init();
		Producer p = getProducerById(id);
		MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode());
		producer.send(session.createObjectMessage(obj));
		destroy(producer);
	}

	private static Producer getProducerById(String id) {
		Producer p = MQUtil.getProducerById(id);
		if (p == null) {
			throw new NullPointerException("according to id:" + id + ", not found produer.");
		}
		return p;
	}

	public static void sendTextMessage(String mes, String id)
	        throws Exception {
		init();
		Producer p = getProducerById(id);
		MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode());
		producer.send(session.createTextMessage(mes));
		destroy(producer);
	}

	private static MessageProducer getMessageProducer(Destination destination, Integer deliveryMode)
	        throws Exception {
		MessageProducer producer = session.createProducer(destination);
		/**
		 * PERSISTENT(持久性消息):
		 * 这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。
		 * 可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,
		 * 消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,
		 * 它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
		 * NON_PERSISTENT(非持久性消息):
		 * 保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。
		 * 此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。
		 * 
		 */
		producer.setDeliveryMode(deliveryMode);
		return producer;
	}

	private static Destination getDestination(Producer p) throws Exception {
		return getDestination(p.getMqType(), p.getTopic());
	}

	private static Destination getDestination(Consumer c) throws Exception {
		return getDestination(c.getMqType(), c.getTopic());
	}

	private static Destination getDestination(Integer mqType, String topic) throws Exception {
		Destination destination = null;
		if (ActiveMqType.QUEUE == mqType)
			destination = session.createQueue(topic);
		else if (ActiveMqType.TOPIC == mqType)
			destination = session.createTopic(topic);
		else
			throw new IllegalArgumentException("mqType must be 1 or 2.");
		return destination;
	}
	/**
	 * 启动所有监听
	 * @param c
	 * @throws Exception
	 */
	public static void startConsumer(Consumer c) throws Exception {
		init();
		MessageConsumer consumer = session.createConsumer(MessageUtil.getDestination(c));
		MessageListener listener = c.getMessageListener().newInstance();
		consumer.setMessageListener(listener);
	}

	private static void destroy(MessageProducer producer) throws JMSException {
		if (producer != null) {
			producer.close();
		}
		if (session != null) {
			session.close();
			session = null;
		}
		if (conn != null) {
			conn.close();
			conn = null;
		}
	}

	public static void destroy(MessageConsumer consumer) throws JMSException {
		if (consumer != null) {
			consumer.close();
			consumer = null;
		}
		if (session != null) {
			session.close();
			session = null;
		}
		if (conn != null) {
			conn.close();
			conn = null;
		}
	}

细节不在赘述,具体代码已上传至码云:https://gitee.com/savage_xiao/boot.demo/tree/master

有兴趣可以下载下来看一下,其中有包含其他springboot的研究

测试代码:

public static void main(String[] args) {
		try {
			for(int i = 101; i<200;i++){
				MessageUtil.sendTextMessage("hello world!"+","+(i+1), "demo.test");
			}
        } catch (Exception e) {
	        e.printStackTrace();
        }
	}

-----------------------

public static void main(String[] args) {
		try {
			for(int i = 0; i<100;i++){
				MessageUtil.sendTextMessage("hello world2!"+","+(i+1), "demo.test2");
			}
        } catch (Exception e) {
	        e.printStackTrace();
        }
	}

测试结果:

listener2:hello world2!,1
listener2:hello world2!,2
listener2:hello world2!,3
listener2:hello world2!,4
listener2:hello world2!,5
....
....略
listener2:hello world2!,98
listener2:hello world2!,99
listener2:hello world2!,100
listener:hello world!,102
listener:hello world!,103
...
...略
listener:hello world!,197
listener:hello world!,198
listener:hello world!,199
listener:hello world!,200