一:jms介绍
jms说白了就是java message service,是J2EE规范的一部分,跟jdbc差不多,sun只提供了接口,由各个厂商(provider)来进行具体的实现,然后使用者使用他们的jar包进行开发使用即可。 另外在jms的API中,jms传递消息有两种方式,一种是点对点的Queue,还有一个是发布订阅的Topic方式。区别在于: 对于Queue模式,一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了10个消息,两个接收者A,B那就是A,B总共会收到10条消息,不重复。 对于Topic模式,一个发布者发布消息,有两个接收者A,B来订阅,那么发布了10条消息,A,B各收到10条消息。 关于api的简单基础可以看下:http://www.javaeye.com/topic/64707,简单的参考!二:ActiveMQ介绍 activeMQ是apache下的一个开源jms产品,具体参见 apache官方网站; Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols , comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4. Apache ActiveMQ is released under the Apache 2.0 License三:开始实现代码 1:使用activeMQ来完成jms的发送,必须要下载activeMQ,然后再本机安装,并且启动activeMQ的服务才行。在官网下载完成之后,运行bin目录下面的activemq.bat,将activeMQ成功启动。 启动成功之后可以运行:http://localhost:8161/admin/index.jsp 查看一下。 2:发送端,senderimport javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { private static final int SEND_NUMBER = 5; public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 MessageProducer producer; // TextMessage message; // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("test-queue"); // 得到消息生成者【发送者】 producer = session.createProducer(destination); // 设置不持久化,可以更改 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <=SEND_NUMBER; i++) { TextMessage message = session .createTextMessage("ActiveMq 发送的消息" + i); // 发送消息到目的地方 System.out.println("发送消息:" + i); producer.send(message); } } }
3:接收端,receive
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //test-queue跟sender的保持一致,一个创建一个来接收 destination = session.createQueue("test-queue"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { public void onMessage(Message arg0) { System.out.println("=================="); try { System.out.println("RECEIVE1第一个获得者:" + ((TextMessage) arg0).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); MessageConsumer consumer1 = session.createConsumer(destination); consumer1.setMessageListener(new MessageListener() { public void onMessage(Message arg0) { System.out.println("+++++++++++++++++++"); try { System.out.println("RECEIVE1第二个获得者:" + ((TextMessage) arg0).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (Exception e) { e.printStackTrace(); } //在eclipse里运行的时候,这里不要关闭,这样就可以一直等待服务器发送了,不然就直接结束了。 // } finally { // try { // if (null != connection) // connection.close(); // } catch (Throwable ignore) { // } // } } }
4:发送端,sender 上面的是用Queue的方式来创建,下面再用topic的方式实现同样的功能。
import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTopic; public class TopicTest { public static void main(String[] args) throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( "tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); // 创建一个Topic Topic topic = new ActiveMQTopic("testTopic"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //if the first argument is Boolean.TRUE,can't receive jms // 注册消费者1 MessageConsumer comsumer1 = session.createConsumer(topic); comsumer1.setMessageListener(new MessageListener() { public void onMessage(Message m) { try { System.out.println("Consumer1 get " + ((TextMessage) m).getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // 注册消费者2 MessageConsumer comsumer2 = session.createConsumer(topic); comsumer2.setMessageListener(new MessageListener() { public void onMessage(Message m) { try { System.out.println("Consumer2 get " + ((TextMessage) m).getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // 创建一个生产者,然后发送多个消息。 MessageProducer producer = session.createProducer(topic); for (int i = 0; i < 10; i++) { System.out.println("producer begin produce======="); producer.send(session.createTextMessage("Message:" + i)); } } }
activeMQ密码配置(默认用户名/密码是admin/admin)
ActiveMQ使用的是jetty服务器, 打开conf/jetty.xml文件,找到
将property name为authenticate的属性value="false" 改为"true",
控制台的登录用户名密码保存在conf/jetty-realm.properties文件中,内容如下:## ---------------------------------------------------------------------------## Licensed to the Apache Software Foundation (ASF) under one or more## contributor license agreements. See the NOTICE file distributed with## this work for additional information regarding copyright ownership.## The ASF licenses this file to You under the Apache License, Version 2.0## (the "License"); you may not use this file except in compliance with## the License. You may obtain a copy of the License at## ## http://www.apache.org/licenses/LICENSE-2.0## ## Unless required by applicable law or agreed to in writing, software## distributed under the License is distributed on an "AS IS" BASIS,## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.## See the License for the specific language governing permissions and## limitations under the License.## ---------------------------------------------------------------------------# Defines users that can access the web (console, demo, etc.)# username: password [,rolename ...]admin: admin, admin
值得注意的是 用户名和密码的格式是
用户名 : 密码 ,角色名