org.apache.spark.SparkException: Task not serializable

2015-08-19 15:51:41   Author: MangoCool   Source: MangoCool

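While writing Spark programs recently, I kept running into this exception: org.apache.spark.SparkException: Task not serializable. It was frustrating, and I suspect many of you have hit it too, so I set aside some time to look into it and am sharing what I found in the hope that it helps.

Let's start with an example: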

package mangocool

import org.apache.spark.{SparkConf, SparkContext}

object TaskNotSerializationTest {
  def main(args: Array[String]): Unit = {
    new Test1().runJob()
  }
}

object Spark1 {
  val conf = new SparkConf().setMaster("local").setAppName("TaskNotSerializationTest")
  val ctx = new SparkContext(conf)
}

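// Test1 is not Serializable, which is what triggers the exception.
class Test1 {
  val rddList = Spark1.ctx.parallelize(List(1, 2, 3))

  def runJob(): Unit = {
    // someFunc is a method of Test1, so this closure captures `this`.
    val after = rddList.map(someFunc(_))
    after.collect().foreach(println)
  }

  def someFunc(a: Int): Int = a + 1
}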

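Running it throws the following exception. Note that the trace names the class mangocool.Test and a conf field, so it appears to have been captured from a slightly different version of the listing above: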

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
	at org.apache.spark.rdd.RDD.map(RDD.scala:293)
	at mangocool.Test.runJob(Test.scala:17)
	at mangocool.TaskNotSerializationTest$.main(Test.scala:7)
	at mangocool.TaskNotSerializationTest.main(Test.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
	- object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@507d3e4a)
	- field (class: mangocool.Test, name: conf, type: class org.apache.spark.SparkConf)
	- object (class mangocool.Test, mangocool.Test@78bc38a1)
	- field (class: mangocool.Test$$anonfun$runJob$1, name: $outer, type: class mangocool.Test)
	- object (class mangocool.Test$$anonfun$runJob$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
	... 17 more

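When you run a transformation (map, flatMap, filter, and so on), the closure you pass to it goes through the following steps:

1. it is serialized on the driver node;

2. it is shipped to the appropriate nodes in the cluster;

3. it is deserialized on those nodes;

4. finally, it is executed on those nodes.

You can of course also run locally. Apart from the absence of network transfer, the process is exactly the same, which makes it convenient to debug before you deploy.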
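The four steps above can be imitated without a cluster. The following is a minimal sketch that uses plain Java serialization (the stack trace shows Spark going through its JavaSerializer for closures) and a single JVM; the object name ClosureRoundTrip and its helper methods are hypothetical and exist only for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper, not part of Spark: imitates the closure life cycle locally.
object ClosureRoundTrip {
  // Step 1: turn the closure into bytes, as the driver would.
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Step 3: rebuild the closure from bytes, as a worker node would.
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def runLocally(data: List[Int]): List[Int] = {
    val addOne: Int => Int = (a: Int) => a + 1
    val bytes = serialize(addOne)                 // step 1: serialize
    // step 2 (network transfer) is skipped: everything stays in this JVM
    val restored = deserialize[Int => Int](bytes) // step 3: deserialize
    data.map(restored)                            // step 4: execute
  }

  def main(args: Array[String]): Unit =
    println(runLocally(List(1, 2, 3)))
}
```

If step 1 cannot serialize something the closure refers to, the job fails before anything is sent, which is why the exception surfaces on the driver.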

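In this example, you define a method in class Test1 and use it inside map. A method is not a standalone object that can be serialized on its own; it belongs to its instance. The closure passed to map therefore holds a reference to the enclosing Test1 instance, and Spark has to serialize that whole instance so that the method can run on another JVM. Test1 is not serializable, which is why the exception is thrown. There are two ways to fix this:

The first is to make the class itself serializable, as Test2 does below by extending java.io.Serializable.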

package mangocool

import org.apache.spark.{SparkConf, SparkContext}

object TaskNotSerializationTest2 {
  def main(args: Array[String]): Unit = {
    new Test2().runJob()
  }
}

object Spark2 {
  val conf = new SparkConf().setMaster("local").setAppName("TaskNotSerializationTest")
  val ctx = new SparkContext(conf)
}

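// Test2 is Serializable, so the instance captured by the closure can be shipped.
class Test2 extends java.io.Serializable {
  val rddList = Spark2.ctx.parallelize(List(1, 2, 3))

  def runJob(): Unit = {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  // Still a method: using it in the closure captures `this`.
  def someFunc(a: Int): Int = a + 1
}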
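The effect of this fix can be checked without Spark. The sketch below again uses plain Java serialization as a stand-in for Spark's closure serialization; all class names (Plain, Marked, Resource, WithField, WithTransientField) are hypothetical. It also illustrates a related caveat that matches the stack trace above: a serializable class still fails if one of its fields is not serializable, and marking that field @transient is one common way around it, assuming the field is not needed on the worker side.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializableFixDemo {
  // Hypothetical stand-in for Test1: the class is not Serializable.
  class Plain {
    def someFunc(a: Int): Int = a + 1
    // Calling someFunc inside the lambda captures `this`.
    def closure: Int => Int = x => someFunc(x)
  }

  // Hypothetical stand-in for Test2: same code, but Serializable.
  class Marked extends Serializable {
    def someFunc(a: Int): Int = a + 1
    def closure: Int => Int = x => someFunc(x)
  }

  // Hypothetical non-serializable helper, used as a field below.
  class Resource

  // Serializable class whose field is not serializable.
  class WithField extends Serializable {
    val resource = new Resource
    def someFunc(a: Int): Int = a + 1
    def closure: Int => Int = x => someFunc(x)
  }

  // Same class, but Java serialization skips the @transient field.
  class WithTransientField extends Serializable {
    @transient val resource = new Resource
    def someFunc(a: Int): Int = a + 1
    def closure: Int => Int = x => someFunc(x)
  }

  // Returns None if obj serializes, otherwise the name of the offending class.
  def failure(obj: AnyRef): Option[String] =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    println(failure(new Plain().closure))              // fails on Plain
    println(failure(new Marked().closure))             // serializes
    println(failure(new WithField().closure))          // fails on Resource
    println(failure(new WithTransientField().closure)) // serializes
  }
}
```

Keep in mind that a @transient field is null after deserialization, so this only suits fields that the closure never touches on the worker.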

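The second is to turn the someFunc method into a function value. Note: in Scala a function is an object in its own right, so it can be serialized by itself without dragging the enclosing instance along, provided its body does not reference members of the enclosing class.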

package mangocool

import org.apache.spark.{SparkConf, SparkContext}

object TaskNotSerializationTest3 {
  def main(args: Array[String]): Unit = {
    new Test3().runJob()
  }
}

object Spark3 {
  val conf = new SparkConf().setMaster("local").setAppName("TaskNotSerializationTest")
  val ctx = new SparkContext(conf)
}

// Test3 is not Serializable, and it no longer needs to be.
class Test3 {
  val rddList = Spark3.ctx.parallelize(List(1, 2, 3))

  def runJob(): Unit = {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  // A function value: an object of its own that does not reference Test3.
  val someFunc = (a: Int) => a + 1
}
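One caveat with this approach is that a function value only stays independent of its class as long as its body does not touch the class's members. The sketch below illustrates the difference with plain Java serialization standing in for Spark; Holder, offset, pure and leaky are hypothetical names chosen for the example.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object FunctionValueDemo {
  // Hypothetical stand-in for Test3: not Serializable.
  class Holder {
    val offset = 1
    // References nothing in Holder, so it serializes on its own.
    val pure = (a: Int) => a + 1
    // Reads the field `offset`, so it captures `this` (the whole Holder).
    val leaky = (a: Int) => a + offset
  }

  // True if obj can be written with Java serialization.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val holder = new Holder
    println(canSerialize(holder.pure))  // the function alone is serialized
    println(canSerialize(holder.leaky)) // Holder would have to be serialized too
  }
}
```

A common way to keep such a function independent is to copy the needed field into a local val first and reference only that local inside the function body.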

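That resolves the problem. If anything here is wrong or incomplete, corrections are welcome, and thank you for your understanding!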

Tags: Spark Scala Exception Serializable
