
A Simple Example of Operating HBase from Spark

2015-06-25 11:46:51   Author: MangoCool   Source: MangoCool

The purpose of this example is to walk through a complete processing flow in which a Spark application fetches data from HBase, processes it, and finally stores the results back into HBase, in order to deepen our understanding of how Spark works. It also serves as a simple building block for our future business logic.

Deployment environment: CentOS 6.6

Deployed software: Hadoop-2.7.0 + HBase-0.98.12.1 + Spark-1.3.1

Dependencies: JDK 1.7 + Scala 2.10.4 (a possible sbt build definition is sketched after this list)

Development environment: IntelliJ IDEA Ultimate 14.1.4 (ideaIU-14.1.4)

Runtime environment: CentOS 6.6
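
For reference, a minimal build.sbt matching these versions might look like the sketch below. The exact HBase artifact names and the "-hadoop2" version suffix are assumptions on my part, so adjust them to whatever your cluster actually provides.

name := "SparkFromHbase"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.3.1" % "provided", // provided by spark-submit at runtime
  "org.apache.hbase" %  "hbase-client" % "0.98.12.1-hadoop2",
  "org.apache.hbase" %  "hbase-common" % "0.98.12.1-hadoop2",
  "org.apache.hbase" %  "hbase-server" % "0.98.12.1-hadoop2"   // TableInputFormat / TableOutputFormat live here
)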

First, go into the HBase shell and create two tables, score and calculate. You can create them however you like, as long as they match the tables that the code operates on!

./hbase shell
hbase(main):005:0> create 'score','course'
hbase(main):005:0> create 'calculate','score'

Here I put six records into the score table:

hbase(main):011:0> put 'score','zhangsan','course:chinese','98'
hbase(main):011:0> put 'score','zhangsan','course:math','80'
hbase(main):011:0> put 'score','zhangsan','course:english','78'
hbase(main):011:0> put 'score','lisi','course:chinese','96'
hbase(main):011:0> put 'score','lisi','course:math','88'
hbase(main):011:0> put 'score','lisi','course:english','76'

Then you can scan the score table to check the data after the puts:

hbase(main):011:0> scan 'score'
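
The scan should list the six cells inserted above, roughly in this form (rows sorted by rowkey; timestamps will differ on your cluster):

ROW                     COLUMN+CELL
 lisi                   column=course:chinese, timestamp=..., value=96
 lisi                   column=course:english, timestamp=..., value=76
 lisi                   column=course:math, timestamp=..., value=88
 zhangsan               column=course:chinese, timestamp=..., value=98
 zhangsan               column=course:english, timestamp=..., value=78
 zhangsan               column=course:math, timestamp=..., value=80
2 row(s) in ... seconds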

The example code is as follows:

import java.io.Serializable
import java.util.logging.Logger
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ListBuffer}

object SparkFromHbase extends Serializable {

  val logger = Logger.getLogger(SparkFromHbase.getClass.getName)

  def main(args: Array[String]) {

    // Only needed when developing locally on Windows (points at the winutils directory); harmless on the CentOS cluster
    System.setProperty("hadoop.home.dir", "E:\\Program Files\\hadoop-2.7.0")

    val sconf = new SparkConf()
      .setMaster("spark://h230:7077")
      .setAppName("SparkFromHbase")
      .set("spark.executor.memory", "1g")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sconf)

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "h230")
    conf.set("hbase.master", "h230:60000")
//    conf.addResource("/home/hadoop/SW/hbase/conf/hbase-site.xml")// can replace the three settings above
    conf.set(TableInputFormat.INPUT_TABLE, "score")
//    conf.set(TableInputFormat.SCAN_ROW_START, "Tag000001");
//    conf.set(TableInputFormat.SCAN_ROW_STOP, "Tag000060");

    // Scan the score table through TableInputFormat
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val count = hBaseRDD.count()
    println("HBase RDD Count:" + count)
    logger.info("HBase RDD Count:" + count)
//    hBaseRDD.cache()

    // Accumulate each student's total score: rowKey -> sum of all course scores
    val map = new HashMap[String, Int]
//    hBaseRDD.saveAsTextFile("hdfs://h230:9000/test/")// save directly to HDFS instead
//    val res = hBaseRDD.collect()
    val res = hBaseRDD.take(count.toInt) // fine for a small demo table; this pulls all rows to the driver
    for (j <- 0 until count.toInt) {
      val rs = res(j)._2
      val kvs = rs.raw
      for (kv <- kvs) {
        val rowKey = new String(kv.getRow())
        val family = new String(kv.getFamily())
        val column = new String(kv.getQualifier())
        val value = new String(kv.getValue())
        // accumulate this student's running total across all course columns
        map.put(rowKey, map.getOrElse(rowKey, 0) + value.toInt)
        println("rowKey:" + rowKey + " cf:" + family + " column:" + column + " value:" + value)
        println("mapContent:"+map)
        logger.info("rowKey:" + rowKey + " cf:" + family + " column:" + column + " value:" + value)
        logger.info("mapContent:"+map)
      }
    }

    // Build (rowKey, average, sum) triples; each student has three course columns, hence the division by 3
    val lists = ListBuffer[(String, Int, Int)]()
    for ((k, v) <- map) {
      lists += ((k, v / 3, v))
    }
    logger.info("lists info: "+lists.toList)
    println("lists info: "+lists.toList)

    // Specify the output format and the output table name
    val jobConf = new JobConf(conf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,"calculate")
    val rawData = lists.toList
    val localData = sc.parallelize(rawData).map(convert)
    localData.saveAsHadoopDataset(jobConf)
  }

  // Convert a (rowKey, avg, sum) triple into a Put on the calculate table's "score" column family
  def convert(triple: (String, Int, Int)) = {
    val p = new Put(Bytes.toBytes(triple._1))
    p.add(Bytes.toBytes("score"), Bytes.toBytes("avg"), Bytes.toBytes(triple._2))
    p.add(Bytes.toBytes("score"), Bytes.toBytes("sum"), Bytes.toBytes(triple._3))
    (new ImmutableBytesWritable, p)
  }
}

Then package it into SparkFromHbase.jar and upload it to the server; here I put it under the spark-1.3.1/lib directory.

Go into the spark-1.3.1/bin directory and run:

./spark-submit --class SparkFromHbase ../lib/SparkFromHbase.jar

Because the AppName and Master are already set in the program, they do not need to be passed on the command line (--master spark://h230:7077 --name SparkFromHbase).
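
If they had not been set in the code, the equivalent submit command would have to pass those flags explicitly, for example:

./spark-submit --class SparkFromHbase --master spark://h230:7077 --name SparkFromHbase ../lib/SparkFromHbase.jar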

Finally, let's scan HBase's calculate table to see whether the results match expectations. For the data above, zhangsan's total should be 98+80+78 = 256 with an average of 256/3 = 85 (integer division), and lisi's total 96+88+76 = 260 with an average of 86.

hbase(main):015:0> scan 'calculate'
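
Note that convert() writes avg and sum with Bytes.toBytes on an Int, so the HBase shell displays them as 4-byte binary values (e.g. \x00\x00\x00\x55 for 85) rather than readable strings. A minimal Scala sketch of how such a value round-trips, using only the HBase Bytes utility:

import org.apache.hadoop.hbase.util.Bytes

// what convert() stores for an average of 85: a 4-byte big-endian int
val raw: Array[Byte] = Bytes.toBytes(85)
// reading the cell back programmatically recovers the number
val decoded: Int = Bytes.toInt(raw)
println(decoded) // 85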

That concludes this article. If anything is wrong or incomplete, corrections are very welcome, and please bear with me!


Reference: http://wuchong.me/blog/2015/04/06/spark-on-hbase-new-api/

Tags: Spark Hbase Scala demo
