博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用activeMQ实现jms
阅读量:5773 次
发布时间:2019-06-18

本文共 10137 字,大约阅读时间需要 33 分钟。

一:jms介绍

         jms说白了就是java message service,是J2EE规范的一部分,跟jdbc差不多,sun只提供了接口,由各个厂商(provider)来进行具体的实现,然后使用者使用他们的jar包进行开发使用即可。
        另外在jms的API中,jms传递消息有两种方式,一种是点对点的Queue,还有一个是发布订阅的Topic方式。区别在于:
        对于Queue模式,一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了10个消息,两个接收者A,B那就是A,B总共会收到10条消息,不重复。
        对于Topic模式,一个发布者发布消息,有两个接收者A,B来订阅,那么发布了10条消息,A,B各收到10条消息。
       关于api的简单基础可以看下:http://www.javaeye.com/topic/64707,简单的参考!
二:ActiveMQ介绍
         activeMQ是apache下的一个开源jms产品,具体参见 apache官方网站;
         Apache ActiveMQ is fast, supports many  Cross Language Clients and Protocols , comes with easy to use  Enterprise Integration Patterns   and many  advanced features   while fully supporting  JMS 1.1   and J2EE 1.4. Apache ActiveMQ is released under the  Apache   2.0 License
三:开始实现代码
       1:使用activeMQ来完成jms的发送,必须要下载activeMQ,然后再本机安装,并且启动activeMQ的服务才行。在官网下载完成之后,运行bin目录下面的activemq.bat,将activeMQ成功启动。
       启动成功之后可以运行:http://localhost:8161/admin/index.jsp  查看一下。
       2:发送端,sender

import javax.jms.Connection;   import javax.jms.ConnectionFactory;   import javax.jms.DeliveryMode;   import javax.jms.Destination;   import javax.jms.MessageProducer;   import javax.jms.Session;   import javax.jms.TextMessage;     import org.apache.activemq.ActiveMQConnection;   import org.apache.activemq.ActiveMQConnectionFactory;     public class Sender {       private static final int SEND_NUMBER = 5;         public static void main(String[] args) {           // ConnectionFactory :连接工厂,JMS 用它创建连接           ConnectionFactory connectionFactory;           // Connection :JMS 客户端到JMS Provider 的连接           Connection connection = null;           // Session: 一个发送或接收消息的线程           Session session;           // Destination :消息的目的地;消息发送给谁.           Destination destination;           // MessageProducer:消息发送者           MessageProducer producer;           // TextMessage message;           // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar             connectionFactory = new ActiveMQConnectionFactory(                   ActiveMQConnection.DEFAULT_USER,                   ActiveMQConnection.DEFAULT_PASSWORD,                   "tcp://localhost:61616");           try {               // 构造从工厂得到连接对象               connection = connectionFactory.createConnection();               // 启动               connection.start();               // 获取操作连接               session = connection.createSession(Boolean.TRUE,                       Session.AUTO_ACKNOWLEDGE);               // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置               destination = session.createQueue("test-queue");               // 得到消息生成者【发送者】               producer = session.createProducer(destination);               // 设置不持久化,可以更改               producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);               // 构造消息               sendMessage(session, producer);               session.commit();             } catch (Exception e) {               e.printStackTrace();           } finally {               try {                   if (null != connection)                       connection.close();               } catch (Throwable ignore) {               }           }         }       public static void sendMessage(Session session, MessageProducer producer)               throws Exception {           for (int i = 1; i <=SEND_NUMBER; i++) {               TextMessage message = session                       .createTextMessage("ActiveMq 发送的消息" + i);               // 发送消息到目的地方               System.out.println("发送消息:" + i);               producer.send(message);           }       }   }

 3:接收端,receive 

import javax.jms.Connection;   import javax.jms.ConnectionFactory;   import javax.jms.Destination;   import javax.jms.JMSException;   import javax.jms.Message;   import javax.jms.MessageConsumer;   import javax.jms.MessageListener;   import javax.jms.Session;   import javax.jms.TextMessage;     import org.apache.activemq.ActiveMQConnection;   import org.apache.activemq.ActiveMQConnectionFactory;     public class Receiver {       public static void main(String[] args) {             // ConnectionFactory :连接工厂,JMS 用它创建连接           ConnectionFactory connectionFactory;           // Connection :JMS 客户端到JMS Provider 的连接           Connection connection = null;           // Session: 一个发送或接收消息的线程           Session session;           // Destination :消息的目的地;消息发送给谁.           Destination destination;           // 消费者,消息接收者           MessageConsumer consumer;             connectionFactory = new ActiveMQConnectionFactory(                   ActiveMQConnection.DEFAULT_USER,                   ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");           try {               // 构造从工厂得到连接对象               connection = connectionFactory.createConnection();               // 启动               connection.start();               // 获取操作连接               session = connection.createSession(Boolean.FALSE,                       Session.AUTO_ACKNOWLEDGE);               //test-queue跟sender的保持一致,一个创建一个来接收               destination = session.createQueue("test-queue");               consumer = session.createConsumer(destination);               consumer.setMessageListener(new MessageListener() {                   public void onMessage(Message arg0) {                       System.out.println("==================");                       try {                           System.out.println("RECEIVE1第一个获得者:"                                  + ((TextMessage) arg0).getText());                       } catch (JMSException e) {                           // TODO Auto-generated catch block                           e.printStackTrace();                       }                     }               });                 MessageConsumer consumer1 = session.createConsumer(destination);               consumer1.setMessageListener(new MessageListener() {                   public void onMessage(Message arg0) {                       System.out.println("+++++++++++++++++++");                       try {                           System.out.println("RECEIVE1第二个获得者:"                                  + ((TextMessage) arg0).getText());                       } catch (JMSException e) {                           // TODO Auto-generated catch block                           e.printStackTrace();                       }                     }               });           } catch (Exception e) {               e.printStackTrace();           }           //在eclipse里运行的时候,这里不要关闭,这样就可以一直等待服务器发送了,不然就直接结束了。           // } finally {           // try {           // if (null != connection)           // connection.close();           // } catch (Throwable ignore) {           // }           // }         }   }

4:发送端,sender 上面的是用Queue的方式来创建,下面再用topic的方式实现同样的功能。

import javax.jms.Connection;   import javax.jms.JMSException;   import javax.jms.Message;   import javax.jms.MessageConsumer;   import javax.jms.MessageListener;   import javax.jms.MessageProducer;   import javax.jms.Session;   import javax.jms.TextMessage;   import javax.jms.Topic;     import org.apache.activemq.ActiveMQConnectionFactory;   import org.apache.activemq.command.ActiveMQTopic;     public class TopicTest {       public static void main(String[] args) throws Exception {           ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(                   "tcp://localhost:61616");             Connection connection = factory.createConnection();           connection.start();             // 创建一个Topic           Topic topic = new ActiveMQTopic("testTopic");           Session session = connection.createSession(false,                   Session.AUTO_ACKNOWLEDGE); //if the first argument is Boolean.TRUE,can't receive jms            // 注册消费者1           MessageConsumer comsumer1 = session.createConsumer(topic);           comsumer1.setMessageListener(new MessageListener() {               public void onMessage(Message m) {                   try {                       System.out.println("Consumer1 get "                              + ((TextMessage) m).getText());                   } catch (JMSException e) {                       e.printStackTrace();                   }               }           });             // 注册消费者2           MessageConsumer comsumer2 = session.createConsumer(topic);           comsumer2.setMessageListener(new MessageListener() {               public void onMessage(Message m) {                   try {                       System.out.println("Consumer2 get "                              + ((TextMessage) m).getText());                   } catch (JMSException e) {                       e.printStackTrace();                   }               }             });             // 创建一个生产者,然后发送多个消息。           MessageProducer producer = session.createProducer(topic);           for (int i = 0; i < 10; i++) {               System.out.println("producer begin produce=======");               producer.send(session.createTextMessage("Message:" + i));           }       }     }

activeMQ密码配置(默认用户名/密码是admin/admin) 

ActiveMQ使用的是jetty服务器, 打开conf/jetty.xml文件,找到

将property name为authenticate的属性value="false" 改为"true",

控制台的登录用户名密码保存在conf/jetty-realm.properties文件中,内容如下:

## ---------------------------------------------------------------------------## Licensed to the Apache Software Foundation (ASF) under one or more## contributor license agreements.  See the NOTICE file distributed with## this work for additional information regarding copyright ownership.## The ASF licenses this file to You under the Apache License, Version 2.0## (the "License"); you may not use this file except in compliance with## the License.  You may obtain a copy of the License at## ## http://www.apache.org/licenses/LICENSE-2.0## ## Unless required by applicable law or agreed to in writing, software## distributed under the License is distributed on an "AS IS" BASIS,## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.## See the License for the specific language governing permissions and## limitations under the License.## ---------------------------------------------------------------------------# Defines users that can access the web (console, demo, etc.)# username: password [,rolename ...]admin: admin, admin

值得注意的是 用户名和密码的格式是

用户名 : 密码 ,角色名

 

转载地址:http://krxux.baihongyu.com/

你可能感兴趣的文章
大数据环境下的数据质量管理策略
查看>>
C# FileSystemWatcher 在监控文件夹和文件时的用法
查看>>
libgdx 3D Bullet 碰撞检测二
查看>>
AD原理图统一命名
查看>>
Head First--设计模式 单件模式 独一无二的对象
查看>>
程序一
查看>>
记录的重要性
查看>>
团队-象棋游戏-代码设计规范
查看>>
Windows 服务器配置、运行、图文流程(新手必备!) - IIS建站配置一条龙
查看>>
About Me
查看>>
Linux-grep命令
查看>>
一秒去除Win7快捷方式箭头
查看>>
Oracle序列使用:建立、删除
查看>>
html5 渐变按钮练习
查看>>
ES8 (2017)新特性
查看>>
由创建文件想起的。。。
查看>>
数据字典生成工具之旅(2):数据字典生成工具及文档工具作用介绍
查看>>
asp.net(C#)去除html格式
查看>>
构建之法读后感part1
查看>>
Linux内存管理(二)
查看>>