Lately, while writing Spark programs, I have kept running into this exception: org.apache.spark.SparkException: Task not serializable. It is quite frustrating, and I suspect many of you have hit it too, so I took some time to dig into it and am sharing my findings here in the hope that they help.
Let's start with an example:
package mangocool

import org.apache.spark.{SparkConf, SparkContext}

object TaskNotSerializationTest {
  def main(args: Array[String]) {
    new Test1().runJob
  }
}

object Spark1 {
  val conf = new SparkConf().setMaster("local").setAppName("TaskNotSerializationTest")
  val ctx = new SparkContext(conf)
}

class Test1 {
  val rddList = Spark1.ctx.parallelize(List(1, 2, 3))

  def runJob() = {
    val after = rddList.map(someFunc(_))
    after.collect().map(println(_))
  }

  def someFunc(a: Int) = a + 1
}
The exception thrown when it runs:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.map(RDD.scala:293)
    at mangocool.Test.runJob(Test.scala:17)
    at mangocool.TaskNotSerializationTest$.main(Test.scala:7)
    at mangocool.TaskNotSerializationTest.main(Test.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@507d3e4a)
    - field (class: mangocool.Test, name: conf, type: class org.apache.spark.SparkConf)
    - object (class mangocool.Test, mangocool.Test@78bc38a1)
    - field (class: mangocool.Test$$anonfun$runJob$1, name: $outer, type: class mangocool.Test)
    - object (class mangocool.Test$$anonfun$runJob$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 17 more
When you run any transformation (map, flatMap, filter, and so on), the closure you pass in goes through the following steps:
1. it is serialized on the driver node,
2. shipped to the appropriate nodes in the cluster,
3. deserialized on those nodes,
4. and finally executed there.
Of course, you can also run the job locally; apart from the network transfer, every other step is exactly the same, which makes it convenient to debug before you deploy.
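The failure here happens at step 1: before shipping anything, Spark checks that the closure can actually be serialized, and that check, ClosureCleaner.ensureSerializable, is the frame at the top of the stack trace above. Conceptually it boils down to something like the following minimal sketch using plain JDK serialization (isSerializable is a hypothetical helper for illustration only; Spark actually uses its configured closure serializer):

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import org.apache.spark.SparkConf

object SerializabilityCheck {
  // Attempt to serialize a closure the same way a task payload would be serialized.
  def isSerializable(closure: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(closure)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf() // SparkConf itself is not serializable
    println(isSerializable((a: Int) => a + 1))                         // true: a plain function value
    println(isSerializable((a: Int) => a + conf.get("x", "0").toInt))  // false: this closure captures conf
  }
}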
In this example, someFunc is a method defined on class Test1 and used inside map. someFunc(_) desugars to x => this.someFunc(x), so the closure handed to map holds a reference to the enclosing Test1 instance. A method cannot be serialized on its own, so Spark tries to serialize the whole object in order to run the method on another JVM, and because Test1 is not serializable, the exception above is thrown. There are two ways to fix this:
The first fix: make the class serializable by having it extend java.io.Serializable directly (Test2 below).
package mangocool

import org.apache.spark.{SparkConf, SparkContext}

object TaskNotSerializationTest2 {
  def main(args: Array[String]) {
    new Test2().runJob
  }
}

object Spark2 {
  val conf = new SparkConf().setMaster("local").setAppName("TaskNotSerializationTest")
  val ctx = new SparkContext(conf)
}

class Test2 extends java.io.Serializable {
  val rddList = Spark2.ctx.parallelize(List(1, 2, 3))

  def runJob() = {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1 // still a method, but the enclosing class is now serializable
}
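One thing to keep in mind with this fix: the closure still drags the whole Test2 instance along, so every field of the class must also be serializable, and the entire object is shipped with every task. That is harmless in this tiny example, but it can become costly, or fail again, if the class holds large or non-serializable state.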
The second fix: change the someFunc method into a function value (Test3 below). Note: in Scala, functions are objects, so they can be serialized directly.
package mangocool

import org.apache.spark.{SparkConf, SparkContext}

object TaskNotSerializationTest3 {
  def main(args: Array[String]) {
    new Test3().runJob
  }
}

object Spark3 {
  val conf = new SparkConf().setMaster("local").setAppName("TaskNotSerializationTest")
  val ctx = new SparkContext(conf)
}

class Test3 {
  val rddList = Spark3.ctx.parallelize(List(1, 2, 3))

  def runJob() = {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1 // a function value: in Scala, functions are objects
}
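Both fixes above work for this example. A third approach that is often used is to move someFunc into a standalone object: the closure then references only that object, which executors load from their classpath, so nothing from the enclosing class needs to be serialized at all. A minimal sketch (TaskNotSerializationTest4, Spark4, and Helpers are hypothetical names following the same pattern as the code above):

package mangocool

import org.apache.spark.{SparkConf, SparkContext}

object TaskNotSerializationTest4 {
  def main(args: Array[String]) {
    new Test4().runJob
  }
}

object Spark4 {
  val conf = new SparkConf().setMaster("local").setAppName("TaskNotSerializationTest")
  val ctx = new SparkContext(conf)
}

// someFunc now lives in a standalone object, so the closure captures nothing from Test4.
object Helpers {
  def someFunc(a: Int) = a + 1
}

class Test4 {
  val rddList = Spark4.ctx.parallelize(List(1, 2, 3))

  def runJob() = {
    val after = rddList.map(Helpers.someFunc) // references only the Helpers object
    after.collect().foreach(println)
  }
}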
That resolves the problem. If anything above is wrong or incomplete, corrections are very welcome, and thanks for bearing with me!
Tags: Spark Scala Exception Serializable