本示例编写的目的在于完成Spark应用从Hbase数据库获取数据再经过数据处理最后存回Hbase的一个完整的处理流程,加深理解Spark是如何工作的。也是对我们将来的业务的一个简单构建。
部署环境:Centos6.6
部署软件:Hadoop-2.7.0 + Hbase-0.98.12.1 + Spark-1.3.1
依赖:jdk1.7+scala-2.10.4
开发环境:ideaIU-14.1.4
运行环境:Centos6.6
首先进入hbase shell创建两张表,score表和calculate表,这个可以根据自己的想法创建,只须要记得对应于代码中要操作的表即可!
./hbase shell hbase(main):005:0> create 'score','course' hbase(main):005:0> create 'calculate','score'
这里我put了6条数据到score表:
hbase(main):011:0> put 'score','zhangsan','course:chinese','98' hbase(main):011:0> put 'score','zhangsan','course:math','80' hbase(main):011:0> put 'score','zhangsan','course:english','78' hbase(main):011:0> put 'score','lisi','course:chinese','96' hbase(main):011:0> put 'score','lisi','course:math','88' hbase(main):011:0> put 'score','lisi','course:english','76'
然后可以scan一下score表,看看put后的数据:
hbase(main):011:0> scan 'score'
示例代码如下:
import java.io.Serializable import java.util.logging.Logger import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapred.JobConf import org.apache.spark.{SparkConf, SparkContext} import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, ListBuffer} object SparkFromHbase extends Serializable { val logger = Logger.getLogger(SparkFromHbase.getClass.getName) def main(args: Array[String]) { System.setProperty("hadoop.home.dir" , "E:\\Program Files\\hadoop-2.7.0" ) val sconf = new SparkConf() .setMaster("spark://h230:7077") .setAppName("SparkFromHbase") .set("spark.executor.memory", "1g") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val sc = new SparkContext(sconf) val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.property.clientPort", "2181") conf.set("hbase.zookeeper.quorum", "h230") conf.set("hbase.master", "h230:60000") // conf.addResource("/home/hadoop/SW/hbase/conf/hbase-site.xml")//替代以上三条配置信息 conf.set(TableInputFormat.INPUT_TABLE, "score") // conf.set(TableInputFormat.SCAN_ROW_START, "Tag000001"); // conf.set(TableInputFormat.SCAN_ROW_STOP, "Tag000060"); //Scan操作 val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) val count = hBaseRDD.count() println("HBase RDD Count:" + count) logger.info("HBase RDD Count:" + count) // hBaseRDD.cache() val map = new HashMap[String, Int] // hBaseRDD.saveAsTextFile("hdfs://h230:9000/test/")//直接存hdfs // val res = hBaseRDD.collect() val res = hBaseRDD.take(count.toInt) for (j <- 0 until count.toInt) { val rs = res(j - 0)._2 val kvs = rs.raw for (kv <- kvs) { val rowKey = new String(kv.getRow()) val family = new String(kv.getFamily()) val column = new String(kv.getQualifier()) val value = new String(kv.getValue()) if(map.contains(rowKey)) { val v = map.get(rowKey).get; map.put(rowKey, v+Integer2int(Integer.valueOf(value))) } else { map.put(rowKey, Integer2int(Integer.valueOf(value))) } println("rowKey:" + rowKey + " cf:" + family + " column:" + column + " value:" + value) println("mapContent:"+map) logger.info("rowKey:" + rowKey + " cf:" + family + " column:" + column + " value:" + value) logger.info("mapContent:"+map) } } val lists = ListBuffer[(String, Int, Int)]() for((k, v) <- map) { val list = (k, v/3, v) lists.add(list) } logger.info("lists info: "+lists.toList) println("lists info: "+lists.toList) //指定输出格式和输出表名 val jobConf = new JobConf(conf,this.getClass) jobConf.setOutputFormat(classOf[TableOutputFormat]) jobConf.set(TableOutputFormat.OUTPUT_TABLE,"calculate") val rawData = lists.toList val localData = sc.parallelize(rawData).map(convert) localData.saveAsHadoopDataset(jobConf) } def convert(triple: (String, Int, Int)) = { val p = new Put(Bytes.toBytes(triple._1)) p.add(Bytes.toBytes("score"),Bytes.toBytes("avg"),Bytes.toBytes(triple._2)) p.add(Bytes.toBytes("score"),Bytes.toBytes("sum"),Bytes.toBytes(triple._3)) (new ImmutableBytesWritable, p) } }
然后将其打包生成SparkFromHbase.jar包,上传服务器,这里我放在了spark-1.3.1/lib目录下。
进入spark-1.3.1/bin目录,执行命令:
./spark-submit --class SparkFromHbase ../lib/SparkFromHbase.jar
因为我们在程序中设置了AppName和Master,所以命令中可以不需要设置(--master spark://h230:7077 --name SparkFromHbase)
最后我们查看一下hbase的calculate表,是否跟预期的一样。
hbase(main):015:0> scan 'calculate'
到此本文就已结束,有不对或是不完善地方请多多指正以及多多谅解!
参考文章:http://wuchong.me/blog/2015/04/06/spark-on-hbase-new-api/
分享:
上一篇有关百度一键分享链接到微信朋友圈无法打开网页,报404错误的解决办法,from=timeline&isappinstalled=0,和伪静态规则有关
崇尚极简,热爱技术,喜欢唱歌,热衷旅行,爱好电子产品的一介码农。
联系QQ:58742094
联系电话:
工作邮箱:
当你的才华还撑不起你的野心的时候,你就应该静下心来学习,永不止步!
人生之旅历途甚长,所争决不在一年半月,万不可因此着急失望,招精神之萎葸。
Copyright 2015- 芒果酷(mangocool.com) All rights reserved. 湘ICP备14019394号
免责声明:本网站部分文章转载其他媒体,意在为公众提供免费服务。如有信息侵犯了您的权益,可与本网站联系,本网站将尽快予以撤除。