apache ActiveMQ之初体验

时间:2019-03-15
本文章向大家介绍apache ActiveMQ之初体验,主要包括apache ActiveMQ之初体验使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
版权声明: https://blog.csdn.net/zdp072/article/details/27237549

一. 开篇语

继上一篇weblogic中使用jms发送和接受消息的文章后, 本文使用apache的一个开源组件ActiveMQ接着探讨JMS的话题, 本篇仅仅是ActiveMQ的一个入门的样例, 希望对您有所帮助.

二. ActiveMQ

1. ActiveMQ简单介绍:

ActiveMQ是Apache的一个能力强劲的开源消息总线, 它全然支持JMS1.1和JavaEE1.4规范的JMS Provider实现.

2. ActiveMQ特性:

1.) 多种语言和协议编写client。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。

应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2.) 全然支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3.) 对Spring的支持,ActiveMQ能够非常easy内嵌到使用Spring的系统里面去,并且也支持Spring2.0的特性
4.) 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測试,当中通过JCA 1.5 resourceadaptors的配置,

     能够让ActiveMQ能够自己主动的部署到不论什么兼容J2EE1.4商业服务器上
5.) 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6.) 支持通过JDBC和journal提供快速的消息持久化
7.) 从设计上保证了高性能的集群,client-服务器,点对点
8.) 支持Ajax
9.) 支持与Axis的整合
10.) 能够非常easy得调用内嵌JMS provider,进行測试

3. 环境准备:

1.) 下载ActiveMQ: 

     http://activemq.apache.org/download.html, 我下载的是apache-activemq-5.2.0

2.) 执行ActiveMQ server

     解压缩下载好的文件, 双击bin/activemq.bat 启动server, ActiveMQ内置了jetty服务器, 默认使用TCP连接port为61616.

     ActiveMQ提供一个用于监控ActiveMQ的admin应用: http://127.0.0.1:8161/admin

3.) 在Eclipse中建立Javaproject, 并导入activemq-all-5.2.0.jar包

4.) 新建两个Java类: 消息生产者MsgSender和消息消费者MsgReceiver

4. 代码測试(P2P):

1.) 消息生产者: MsgSender

/**
 * Message Provider
 */
public class MsgSender {
	
	// ConnectionFactory: use to create JMS connection
	private static ConnectionFactory connectionFactory;

	// Connection: connect message provider and JMS server
	private static Connection connection;

	// Session: a message send or receive thread
	private static Session session;

	// Destination: use to sign the message type
	private static Destination destination;

	// MessageProducer:sender
	private static MessageProducer messageProducer;

	/**
	 * init the JMS object
	 */
	public static void init() throws Exception {
		// use ActiveMQ to to create connection factory.
		connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER, 
				ActiveMQConnection.DEFAULT_PASSWORD, 
				"tcp://localhost:61616");

		// get the connection from connection factory
		connection = connectionFactory.createConnection();
		session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		destination = session.createQueue("myQueue");
		messageProducer = session.createProducer(destination);
		messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

		connection.start();
	}

	/**
	 * send activeMq message
	 */
	public static void sendMessage() throws Exception {
		for (int i = 1; i <= 5; i++) {
			TextMessage message = session.createTextMessage("ActiveMq message " + i);
			System.out.println("send:" + "ActiveMq message " + i);
			messageProducer.send(message);
		}
		session.commit();
	}
	
	/**
	 * release resource
	 */
	public static void release() throws Exception {
		messageProducer.close();
		session.close();
		connection.close();
	}

	/**
	 * main method
	 */
	public static void main(String[] args) throws Exception {
		init();
		sendMessage();
		release();
	}
}
 

2.) 消息消费者: MsgReceiver

/**
 * Message Consumer
 */
public class MsgReceiver {
	// ConnectionFactory: use to create JMS connection
	private static ConnectionFactory connectionFactory;

	// Connection: connect message provider and JMS server
	private static Connection connection;

	// Session: a message send or receive thread
	private static Session session;

	// use to sign the message type
	private static Destination destination;

	// MessageConsumer: receiver
	private static MessageConsumer messageConsumer;

	/**
	 * init the JMS object
	 */
	public static void init() throws Exception {
		// use ActiveMQ to to create connection factory.
		connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER, 
				ActiveMQConnection.DEFAULT_PASSWORD, 
				"tcp://localhost:61616");

		// get the connection from connection factory
		connection = connectionFactory.createConnection();
		session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		destination = session.createQueue("myQueue");
		messageConsumer = session.createConsumer(destination);
		
		connection.start();
	}

	/**
	 * receive activeMq message
	 */
	public static void receiveMessage() throws Exception {
		while (true) {
			TextMessage message = (TextMessage) messageConsumer.receive();
			if (message != null) {
				System.out.println("receive: " + message.getText());
			} else {
				break;
			}
		}
	}

	/**
	 * release resource
	 */
	public static void release() throws Exception {
		messageConsumer.close();
		session.close();
		connection.close();
	}
	
	/**
	 * main method
	 */
	public static void main(String[] args) throws Exception {
		init();
		receiveMessage();
		release();
	}
}
  3.) 源代码下载地址: http://download.csdn.net/detail/zdp072/7422401