It all started with putting learning into practice! I finally got around to studying Spark SQL. I had been following the official examples and various expert blogs, but after working through them I found they all read from either local files or HDFS. What I wanted was to pull data from HBase and then run the Spark SQL examples over it, and after days of searching I could not find such an example on Google. So I am writing this up for my own future reference, and perhaps it will also help other newcomers to Spark SQL like me. Enough talk, let's get started!
Deployment OS: CentOS 6.6
Deployed software: Hadoop 2.7.0 + HBase 0.98.12.1 + Spark 1.3.1
Dependencies: JDK 1.7 + Scala 2.10.4
Development environment: IntelliJ IDEA 14.1.4 (ideaIU-14.1.4)
Runtime environment: CentOS 6.6 / Windows 7
Example code:
import java.io.Serializable
import java.util.logging.Logger

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object MySparkSql extends Serializable {

  case class Score(name: String, chinese: Int, english: Int, math: Int)

  val logger = Logger.getLogger(MySparkSql.getClass.getName)

  def main(args: Array[String]) {
    val jars: Array[String] = Array("D:\\workspace\\mysparksql_2.10-1.0.jar")
    System.setProperty("hadoop.home.dir", "E:\\Program Files\\hadoop-2.7.0") // required when running under Windows 7
    val sconf = new SparkConf()
      .setMaster("local")
      // .setMaster("spark://h230:7077") // set this for cluster testing; h230 is my hostname, change it to yours
      .setAppName("MySparkSql") // set when running under Windows 7
      .set("spark.executor.memory", "1g")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // .setJars(jars) // for cluster testing: the jars the application depends on
    val sc = new SparkContext(sconf)

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "h230")
    conf.set("hbase.master", "h230:60000")
    // conf.addResource("/home/hadoop/SW/hbase/conf/hbase-site.xml") // alternative to the three settings above
    conf.set(TableInputFormat.INPUT_TABLE, "score")

    // Scan the HBase table as an RDD of (rowkey, Result) pairs
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // Map each row's cells (sorted by qualifier: chinese, english, math) to a Score
    val score = hBaseRDD.map(m => m._2.listCells()).map(c =>
      Score(new String(c.get(0).getRow),
        new String(c.get(0).getValue).toInt,
        new String(c.get(1).getValue).toInt,
        new String(c.get(2).getValue).toInt))
    score.foreach(println)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val scoreSchema = sqlContext.createDataFrame(score)
    scoreSchema.registerTempTable("score")

    var result = sqlContext.sql("SELECT count(0) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT sum(chinese) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT avg(chinese) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT sum(math) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT avg(math) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT sum(english) FROM score")
    result.collect().foreach(println)
    result = sqlContext.sql("SELECT avg(english) FROM score")
    result.collect().foreach(println)
  }
}
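To test on the cluster instead of locally, switch to the commented-out setMaster("spark://h230:7077") line and enable setJars, then package and submit the jar. Roughly (assuming the default sbt output path; adjust hostname and paths to your own setup):

sbt package
bin/spark-submit --class MySparkSql --master spark://h230:7077 target/scala-2.10/mysparksql_2.10-1.0.jar

Keep in mind that a master URL set in code via setMaster takes precedence over the --master flag, so make sure the two agree before submitting.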
My build.sbt configuration:
name := "MySparkSql" version := "1.0" scalaVersion := "2.10.4" autoScalaLibrary := false libraryDependencies ++= Seq("org.apache.hadoop" % "hadoop-main" % "2.7.0", "org.scala-lang" % "scala-library" % "2.10.4", "org.scala-lang" % "scala-reflect" % "2.10.4", "org.scala-lang" % "scala-compiler" % "2.10.4", "org.scala-lang" % "scalap" % "2.10.4", "org.apache.thrift" % "libthrift" % "0.9.2", "org.apache.hbase" % "hbase-server" % "0.98.12.1-hadoop2", "org.apache.hbase" % "hbase-client" % "0.98.12.1-hadoop2", "org.apache.hbase" % "hbase-common" % "0.98.12.1-hadoop2", "org.apache.spark" % "spark-sql_2.10" % "1.3.1", "org.apache.spark" % "spark-core_2.10" % "1.3.1" )关于代码中的数据库表‘score’,在前面的文章中有:http://mangocool.com/1435117759266.html
Final run results:
Score(lisi,96,76,88)
Score(wangwu,98,99,89)
Score(zhangsan,98,78,80)
[3]
[292]
[97.33333333333333]
[257]
[85.66666666666667]
[253]
[84.33333333333333]
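As an aside, since Spark 1.3 the same aggregations can also be expressed through the DataFrame API instead of SQL strings. A minimal sketch against the scoreSchema DataFrame from the code above:

// Equivalent aggregations via the DataFrame API, no temp table required
import org.apache.spark.sql.functions._

scoreSchema.agg(count(lit(0)), sum("chinese"), avg("chinese")).show()
scoreSchema.agg(sum("math"), avg("math"), sum("english"), avg("english")).show()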
That concludes this example! If you run into any problems, feel free to contact me; I look forward to exchanging ideas with you. Learning never ends, so never stop!
Tags: Hadoop HBase Spark SQL Scala