MangoCool

Kafka简单应用示例

2017-03-24 11:12:13   作者:MangoCool   来源:MangoCool

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消费。

依赖:jdk1.7

开发环境:ideaIU-2016.3.4

测试环境:win10

建立maven工程KafkaDemo,在pom.xml配置文件添加必要的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test.kafka</groupId>
    <artifactId>KafkaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>

        <kafka.version>0.8.2.0</kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    </properties>


    <dependencies>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>${kafka.version}</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>


</project>

KafkaProducer类:

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

/**
 * Created by MANGOCOOL on 2017/03/24.
 */
public class KafkaProducer extends Thread {

    private String topic;

    public KafkaProducer(String topic){
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        Producer producer = createProducer();
        int i=0;
        while(true){
            producer.send(new KeyedMessage<Integer, String>(topic, "生产到: " + i++));
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private Producer createProducer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "hnStorm03:2181,hnStorm04:2181,hnStorm05:2181");//声明zk
        properties.put("serializer.class", StringEncoder.class.getName());
        properties.put("metadata.broker.list", "hnKafka01:9092,hnKafka02:9092");// 声明kafka broker
        return new Producer<>(new ProducerConfig(properties));
    }

    public static void main(String[] args) {
        new KafkaProducer("kafka-demo").start();// 使用kafka集群中创建好的主题 kafka-demo

    }
}
KafkaConsumer类:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Created by MangoCool on 2017/03/24.
 */
public class kafkaConsumer extends Thread{

    private String topic;

    public kafkaConsumer(String topic){
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConsumer();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // 一次从主题中获取一个数据
        Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = 
                 consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
        ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();
        while(iterator.hasNext()){
            String message = new String(iterator.next().message());
            System.out.println("消费到: " + message);
        }
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "hnStorm03:2181,hnStorm04:2181,hnStorm05:2181");//声明zk
        // 必须要使用别的组名称,如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
        properties.put("group.id", "group-test");
        properties.put("zookeeper.connection.timeout.ms", "15000");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }


    public static void main(String[] args) {
        new kafkaConsumer("kafka-demo").start();// 使用kafka集群中创建好的主题 test
    }

}

开始测试,启动KafkaProducer类,再启动KafkaConsumer类,观察控制台的输出就可以看到效果了。

标签: Kafka demo

分享:

上一篇KeeperErrorCode = NoNode for /brokers/topics/test_topic/partitions

下一篇java.lang.IllegalArgumentException: message does not exist

关于我

崇尚极简,热爱技术,喜欢唱歌,热衷旅行,爱好电子产品的一介码农。

座右铭

当你的才华还撑不起你的野心的时候,你就应该静下心来学习,永不止步!

人生之旅历途甚长,所争决不在一年半月,万不可因此着急失望,招精神之萎葸。

Copyright 2015- 芒果酷(mangocool.com) All rights reserved. 湘ICP备14019394号

免责声明:本网站部分文章转载其他媒体,意在为公众提供免费服务。如有信息侵犯了您的权益,可与本网站联系,本网站将尽快予以撤除。