基于HDFS的Spark Sql简单示例

2015-07-18 16:25:39   作者:colobu   来源:鸟窝

一切开始源于学习!终于开始学习Spark Sql了,之前一直没有了解过除了浏览了一下官网,于是就想从大神博客那里找点资料跑跑,实际感受一下。本示例是基于hdfs数据源。数据是前年从网上放出来的2000W的开房记录,嘿嘿!这是屌丝程序员的绝佳数据啊!且xx且珍惜!记录此文章的目的是为了记录自己学习的过程,效仿大神。废话不多说,马上开始!

部署环境:Centos6.6

部署软件:Hadoop-2.7.0 + Hbase-0.98.12.1 + Spark-1.3.1

依赖:jdk1.7+scala-2.10.4

开发环境:ideaIU-14.1.4

运行环境:Centos6.6

示例代码:

import org.apache.spark.{SparkConf, SparkContext}

object SparkSql {

  case class Customer(name: String, gender: String, ctfId: String, birthday: String, address: String)

  def main(args: Array[String]) {

    System.setProperty("hadoop.home.dir" , "E:\\Program Files\\hadoop-2.7.0" )//win7环境下运行须加

    val sconf = new SparkConf()
      .setMaster("spark://h230:7077")//h230是我的hostname,须自行修改
      .setAppName("SparkSql")//应用名称
      .set("spark.executor.memory", "1g")//应用执行时所用内存1g
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//序列化
    val sc = new SparkContext(sconf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val customer = sc.textFile("/input/*.csv").map(_.split(",")).filter(line => line.length > 7).map(a => Customer(a(0), a(5), a(4), a(6), a(7)))
    val customerSchema = sqlContext.createDataFrame(customer)
    customerSchema.registerTempTable("customer")
    def toInt(s: String):Int = {
      try {
        s.toInt
      } catch {
        case e:Exception => 9999
      }
    }
    def myfun(birthday: String) : String = {
      var rt = "未知"
      if (birthday.length == 8) {
        val md = toInt(birthday.substring(4))
        if (md >= 120 & md <= 219)
          rt = "水瓶座"
        else if (md >= 220 & md <= 320)
          rt = "双鱼座"
        else if (md >= 321 & md <= 420)
          rt = "白羊座"
        else if (md >= 421 & md <= 521)
          rt = "金牛座"
        else if (md >= 522 & md <= 621)
          rt = "双子座"
        else if (md >= 622 & md <= 722)
          rt = "巨蟹座"
        else if (md >= 723 & md <= 823)
          rt = "狮子座"
        else if (md >= 824 & md <= 923)
          rt = "处女座"
        else if (md >= 924 & md <= 1023)
          rt = "天秤座"
        else if (md >= 1024 & md <= 1122)
          rt = "天蝎座"
        else if (md >= 1123 & md <= 1222)
          rt = "射手座"
        else if ((md >= 1223 & md <= 1231) | (md >= 101 & md <= 119))
          rt = "摩蝎座"
        else
          rt = "未知"
      }
      rt
    }
    sqlContext.udf.register("constellation",  (x:String) => myfun(x))
    var result = sqlContext.sql("SELECT count(0) FROM customer")
    result.collect().foreach(println)

    result = sqlContext.sql("SELECT constellation(birthday), count(constellation(birthday)) FROM customer group by constellation(birthday)")
    result.collect().foreach(println)
  }
}

我的build.sbt配置:

name := "SparkSql"
 
version := "1.0"
 
scalaVersion := "2.10.4"
 
autoScalaLibrary := false
 
libraryDependencies ++= Seq("org.apache.hadoop" % "hadoop-main" % "2.7.0",
  "org.scala-lang" % "scala-library" % "2.10.4",
  "org.scala-lang" % "scala-reflect" % "2.10.4",
  "org.scala-lang" % "scala-compiler" % "2.10.4",
  "org.scala-lang" % "scalap" % "2.10.4",
  "org.apache.thrift" % "libthrift" % "0.9.2",
  "org.apache.hbase" % "hbase-server" % "0.98.12.1-hadoop2",
  "org.apache.hbase" % "hbase-client" % "0.98.12.1-hadoop2",
  "org.apache.hbase" % "hbase-common" % "0.98.12.1-hadoop2",
  "org.apache.spark" % "spark-sql_2.10" % "1.3.1",
  "org.apache.spark" % "spark-core_2.10" % "1.3.1"
)
关于这篇文章,我唯一的困难就是找不到这2000W的开房记录(好黄躁的!),所以我就自己恶搞了几条,结果太恶心我就不贴了。

提示:如果谁能提供这2000W的开房记录,就此拜谢!QQ邮箱:58742094@qq.com


参考文章:http://colobu.com/2014/12/11/spark-sql-quick-start/

标签: HDFS Hbase Spark Sql Scala

分享:

关于我

一个喜欢唱歌,热衷旅行,爱好电子产品的码农。没事,跟三五好友吼上几嗓子,约上几个背着行囊去露营或者宅在家里抱着孩子敲代码。

座右铭:当你的才华还撑不起你的野心的时候,你就应该静下心来学习,永不止步!

            人生之旅历途甚长,所争决不在一年半月,万不可因此着急失望,招精神之萎葸。


Copyright 芒果酷(mangocool.com) All rights reserved. 湘ICP备14019394号

免责声明:本网站部分文章转载其他媒体,意在为公众提供免费服务。如有信息侵犯了您的权益,可与本网站联系,本网站将尽快予以撤除。