ActiveMQ作为一个最流行的,能力强劲的开源消息总线。通俗讲就是消息队列。而我作为一个不断前进的攻城狮,怎么可能不与它产生一些火花呢!其实早在两年前就已经接触并使用过,但是一直忘了记录。喜欢并且习惯记录小例子的我,在每次技术使用过程中,都喜欢将最原始的Demo记录下来。方便了自己,也希望能帮助到他人,更重要的是可以回顾自己的成长历程。
依赖:jdk1.7,apache-activemq-5.10.0
开发环境:ideaIU-14.1.4
测试环境:win7
activemq下载地址:http://www.apache.org/dyn/closer.cgi?path=/activemq/5.10.0/apache-activemq-5.10.0-bin.zip
建立maven工程ActiveMQDemo,在pom.xml配置文件添加必要的依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>ActiveMQDemo</groupId> <artifactId>ActiveMQDemo</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-amqp-1-0-client-jms</artifactId> <version>0.32</version> </dependency> </dependencies> </project>
Listener类:
package com.mangocool.activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl; import javax.jms.*; /** * Created by MANGOCOOL on 2015/11/6. */ public class Listener { public static void main(String []args) throws Exception { String user = env("ACTIVEMQ_USER", "admin"); String password = env("ACTIVEMQ_PASSWORD", "password"); String destination = arg(args, 0, "topic://event"); ConnectionFactory factory = new ActiveMQConnectionFactory(user, password, ActiveMQConnection.DEFAULT_BROKER_URL); Destination dest = null; if(destination.startsWith("topic://")) { dest = new TopicImpl(destination); } else { dest = new QueueImpl(destination); } Connection connection = factory.createConnection(user, password); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(dest); System.out.println("Waiting for messages..."); while(true) { Message objMsg = consumer.receive(); if( objMsg instanceof ObjectMessage ) { Task task = (Task)((ObjectMessage) objMsg).getObject(); if(null == task) { System.out.println("Listener: 任务全部结束!"); connection.close(); System.exit(1); } else { int status = task.getStatus(); if( status == 100 ) { System.out.println("获取内容: " + task.getContent()); System.out.println(task.getTaskId() + "任务结束!"); } else { System.out.println("进度: " + task.getStatus() + "%"); } } } } } private static String env(String key, String defaultValue) { String rc = System.getenv(key); if(rc== null) return defaultValue; return rc; } private static String arg(String []args, int index, String defaultValue) { if(index < args.length) return args[index]; else return defaultValue; } }Publisher类:
package com.mangocool.activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl; import javax.jms.*; /** * Created by MANGOCOOL on 2015/11/6. */ public class Publisher { public static void main(String []args) throws Exception { String user = env("ACTIVEMQ_USER", "admin"); String password = env("ACTIVEMQ_PASSWORD", "password"); String destination = arg(args, 0, "topic://event"); int messages = 5000; ConnectionFactory factory = new ActiveMQConnectionFactory(user, password, ActiveMQConnection.DEFAULT_BROKER_URL); Destination dest = null; if(destination.startsWith("topic://")) { dest = new TopicImpl(destination); } else { dest = new QueueImpl(destination); } Connection connection = factory.createConnection(user, password); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(dest); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Task task = null; for( int i=1; i <= messages; i ++) { if( (i % 1000) == 0) { task = new Task(i+""); System.out.println("开始第" + task.getTaskId() + " 个任务!"); ObjectMessage objMsg = session.createObjectMessage(task); objMsg.setIntProperty("id", i); int j=0; while (j <100) { Thread.sleep(500); j += 10; System.out.println("......"); task.setStatus(j); if(j == 100) { task.setContent("我是第 "+task.getTaskId()+" 个任务的内容!"); } objMsg.setObject(task); producer.send(objMsg); } System.out.println("任务 " + task.getTaskId() + " 结束!"); } } producer.send(session.createObjectMessage(null)); System.out.println("Publisher: 任务全部结束!"); Thread.sleep(1000*3); connection.close(); System.exit(0); } private static String env(String key, String defaultValue) { String rc = System.getenv(key); if(rc== null) return defaultValue; return rc; } private static String arg(String []args, int index, String defaultValue) { if(index < args.length) return args[index]; else return defaultValue; } }Task类:
package com.mangocool.activemq; import java.io.Serializable; /** * Created by MANGOCOOL on 2015/11/6. */ public class Task implements Serializable { private String taskId = "-1"; private int status = 0; private long timeout = 30; private String content = ""; public Task(String taskId) { this.taskId = taskId; } public Task(String taskId, String content) { this.taskId = taskId; this.content = content; } public Task(String taskId, int status, String content) { this.taskId = taskId; this.status = status; this.content = content; } public Task(String taskId, int status, long timeout, String content) { this.taskId = taskId; this.status = status; this.timeout = timeout; this.content = content; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public long getTimeout() { return timeout; } public void setTimeout(long timeout) { this.timeout = timeout; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public boolean equals(String taskId) { return this.taskId.equals(taskId); } public String toString() { return ""; } }
到此就可以测试了,首先启动Listener类,再启动Publisher类,观察控制台的输出就可以看到效果了。
分享:
上一篇[Errno 14] PYCURL ERROR 22 - "The requested URL returned error: 404 Not Found"
下一篇错误: 找不到或无法加载主类 Files\apache-activemq-5.10.0\bin\..\conf\login.config
崇尚极简,热爱技术,喜欢唱歌,热衷旅行,爱好电子产品的一介码农。
联系QQ:58742094
联系电话:
工作邮箱:
当你的才华还撑不起你的野心的时候,你就应该静下心来学习,永不止步!
人生之旅历途甚长,所争决不在一年半月,万不可因此着急失望,招精神之萎葸。
Copyright 2015- 芒果酷(mangocool.com) All rights reserved. 湘ICP备14019394号
免责声明:本网站部分文章转载其他媒体,意在为公众提供免费服务。如有信息侵犯了您的权益,可与本网站联系,本网站将尽快予以撤除。