ActiveMQ简单应用示例

2015-11-07 15:11:18   作者:MangoCool   来源:MangoCool

ActiveMQ作为一个最流行的,能力强劲的开源消息总线。通俗讲就是消息队列。而我作为一个不断前进的攻城狮,怎么可能不与它产生一些火花呢!其实早在两年前就已经接触并使用过,但是一直忘了记录。喜欢并且习惯记录小例子的我,在每次技术使用过程中,都喜欢将最原始的Demo记录下来。方便了自己,也希望能帮助到他人,更重要的是可以回顾自己的成长历程。

依赖:jdk1.7,apache-activemq-5.10.0

开发环境:ideaIU-14.1.4

测试环境:win7

activemq下载地址:http://www.apache.org/dyn/closer.cgi?path=/activemq/5.10.0/apache-activemq-5.10.0-bin.zip

建立maven工程ActiveMQDemo,在pom.xml配置文件添加必要的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ActiveMQDemo</groupId>
    <artifactId>ActiveMQDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-amqp-1-0-client-jms</artifactId>
            <version>0.32</version>
        </dependency>

    </dependencies>

</project>

Listener类:

package com.mangocool.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;

import javax.jms.*;

/**
 * Created by MANGOCOOL on 2015/11/6.
 */
public class Listener {

    public static void main(String []args) throws Exception {

        String user = env("ACTIVEMQ_USER", "admin");
        String password = env("ACTIVEMQ_PASSWORD", "password");
        String destination = arg(args, 0, "topic://event");
        ConnectionFactory factory = 
                new ActiveMQConnectionFactory(user, password, ActiveMQConnection.DEFAULT_BROKER_URL);
        Destination dest = null;
        if(destination.startsWith("topic://"))
        {
            dest = new TopicImpl(destination);
        } else
        {
            dest = new QueueImpl(destination);
        }
        Connection connection = factory.createConnection(user, password);
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(dest);
        System.out.println("Waiting for messages...");
        while(true) {
            Message objMsg = consumer.receive();
            if( objMsg instanceof ObjectMessage ) {
                Task task = (Task)((ObjectMessage) objMsg).getObject();
                if(null == task) {
                    System.out.println("Listener: 任务全部结束!");
                    connection.close();
                    System.exit(1);
                } else {
                    int status = task.getStatus();
                    if( status == 100 ) {
                        System.out.println("获取内容: " + task.getContent());
                        System.out.println(task.getTaskId() + "任务结束!");
                    } else {
                        System.out.println("进度: " + task.getStatus() + "%");
                    }
                }
            }
        }
    }

    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if(rc== null)
            return defaultValue;
        return rc;
    }

    private static String arg(String []args, int index, String defaultValue) {
        if(index < args.length)
            return args[index];
        else
            return defaultValue;
    }
}
Publisher类:
package com.mangocool.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;

import javax.jms.*;

/**
 * Created by MANGOCOOL on 2015/11/6.
 */
public class Publisher {

    public static void main(String []args) throws Exception {

        String user = env("ACTIVEMQ_USER", "admin");
        String password = env("ACTIVEMQ_PASSWORD", "password");
        String destination = arg(args, 0, "topic://event");
        int messages = 5000;
        ConnectionFactory factory = 
                new ActiveMQConnectionFactory(user, password, ActiveMQConnection.DEFAULT_BROKER_URL);
        Destination dest = null;
        if(destination.startsWith("topic://"))
        {
            dest = new TopicImpl(destination);
        } else
        {
            dest = new QueueImpl(destination);
        }
        Connection connection = factory.createConnection(user, password);
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(dest);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        Task task = null;
        for( int i=1; i <= messages; i ++) {
            if( (i % 1000) == 0) {
                task = new Task(i+"");
                System.out.println("开始第" + task.getTaskId() + " 个任务!");
                ObjectMessage objMsg = session.createObjectMessage(task);
                objMsg.setIntProperty("id", i);
                int j=0;
                while (j <100)
                {
                    Thread.sleep(500);
                    j += 10;
                    System.out.println("......");
                    task.setStatus(j);
                    if(j == 100)
                    {
                        task.setContent("我是第 "+task.getTaskId()+" 个任务的内容!");
                    }
                    objMsg.setObject(task);
                    producer.send(objMsg);
                }
                System.out.println("任务 " + task.getTaskId() + " 结束!");
            }
        }
        producer.send(session.createObjectMessage(null));
        System.out.println("Publisher: 任务全部结束!");
        Thread.sleep(1000*3);
        connection.close();
        System.exit(0);
    }

    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if(rc== null)
            return defaultValue;
        return rc;
    }

    private static String arg(String []args, int index, String defaultValue) {
        if(index < args.length)
            return args[index];
        else
            return defaultValue;
    }
}
Task类:
package com.mangocool.activemq;

import java.io.Serializable;

/**
 * Created by MANGOCOOL on 2015/11/6.
 */
public class Task implements Serializable {

    private String taskId = "-1";
    private int status = 0;
    private long timeout = 30;
    private String content = "";

    public Task(String taskId)
    {
        this.taskId = taskId;
    }

    public Task(String taskId, String content)
    {
        this.taskId = taskId;
        this.content = content;
    }

    public Task(String taskId, int status, String content)
    {
        this.taskId = taskId;
        this.status = status;
        this.content = content;
    }

    public Task(String taskId, int status, long timeout, String content)
    {
        this.taskId = taskId;
        this.status = status;
        this.timeout = timeout;
        this.content = content;
    }

    public String getTaskId() {
        return taskId;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    public int getStatus() {
        return status;
    }
    public void setStatus(int status) {
        this.status = status;
    }

    public long getTimeout() {
        return timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public boolean equals(String taskId)
    {
        return this.taskId.equals(taskId);
    }

    public String toString()
    {
        return "";
    }
}

到此就可以测试了,首先启动Listener类,再启动Publisher类,观察控制台的输出就可以看到效果了。

标签: ActiveMQ Demo

分享:

关于我

一个喜欢唱歌,热衷旅行,爱好电子产品的码农。没事,跟三五好友吼上几嗓子,约上几个背着行囊去露营或者宅在家里抱着孩子敲代码。

座右铭:当你的才华还撑不起你的野心的时候,你就应该静下心来学习,永不止步!

            人生之旅历途甚长,所争决不在一年半月,万不可因此着急失望,招精神之萎葸。


Copyright 芒果酷(mangocool.com) All rights reserved. 湘ICP备14019394号

免责声明:本网站部分文章转载其他媒体,意在为公众提供免费服务。如有信息侵犯了您的权益,可与本网站联系,本网站将尽快予以撤除。