ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From <ming...@thomsonreuters.com>
Subject IgniteConfiguration object not serializable during spark job
Date Wed, 07 Sep 2016 09:57:04 GMT
Hi Guys,

I am coding a simple application with Ignite and Spark, UT works well, but when make assembly
fat jar and run with Spark, I got this error.

It looks IgniteConfiguration can't serializable, but in IgniteContext, Once variable, this
type value marked as @transient, it should not need to serialize.

Version is 1.7.0
Exception:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
        at org.apache.ignite.spark.IgniteRDD.savePairs(IgniteRDD.scala:226)
        at com.tr.IgniteSparkApps$.main(IgniteSparkApps.scala:38)
        at com.tr.IgniteSparkApps.main(IgniteSparkApps.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.ignite.configuration.IgniteConfiguration
Serialization stack:
        - object not serializable (class: org.apache.ignite.configuration.IgniteConfiguration,
value: IgniteConfiguration [gridName=null, pubPoolSize=64, callbackPoolSize=64, sysPoolSize=64,
mgmtPoolSize=4, igfsPoolSize=32, utilityCachePoolSize=64, utilityCacheKeepAliveTime=10000,
marshCachePoolSize=64, marshCacheKeepAliveTime=10000, p2pPoolSize=2, igniteHome=null, igniteWorkDir=null,
mbeanSrv=null, nodeId=null, marsh=null, marshLocJobs=false, daemon=false, p2pEnabled=false,
netTimeout=5000, sndRetryDelay=1000, sndRetryCnt=3, clockSyncSamples=8, clockSyncFreq=120000,
metricsHistSize=10000, metricsUpdateFreq=2000, metricsExpTime=9223372036854775807, discoSpi=TcpDiscoverySpi
[addrRslvr=null, sockTimeout=5000, ackTimeout=5000, reconCnt=10, maxAckTimeout=600000, forceSrvMode=false,
clientReconnectDisabled=false], segPlc=STOP, segResolveAttempts=2, waitForSegOnStart=true,
allResolversPassReq=true, segChkFreq=10000, commSpi=null, evtSpi=null, colSpi=null, deploySpi=null,
swapSpaceSpi=null, indexingSpi=null, addrRslvr=null, clientMode=true, rebalanceThreadPoolSize=1,
txCfg=org.apache.ignite.configuration.TransactionConfiguration@25b74370, cacheSanityCheckEnabled=true,
discoStartupDelay=60000, deployMode=SHARED, p2pMissedCacheSize=100, locHost=null, timeSrvPortBase=31100,
timeSrvPortRange=100, failureDetectionTimeout=10000, metricsLogFreq=60000, hadoopCfg=null,
connectorCfg=org.apache.ignite.configuration.ConnectorConfiguration@15e8c040, odbcCfg=null,
warmupClos=null, atomicCfg=AtomicConfiguration [seqReserveSize=1000, cacheMode=PARTITIONED,
backups=0], classLdr=null, sslCtxFactory=null, platformCfg=null, binaryCfg=null, lateAffAssignment=true])
        - field (class: com.tr.IgniteSparkApps$$anonfun$1, name: igniteConf$1, type: class
org.apache.ignite.configuration.IgniteConfiguration)
        - object (class com.tr.IgniteSparkApps$$anonfun$1, <function0>)
        - field (class: org.apache.ignite.spark.Once, name: clo, type: interface scala.Function0)
        - object (class org.apache.ignite.spark.Once, org.apache.ignite.spark.Once@7eae55)
        - field (class: org.apache.ignite.spark.IgniteContext, name: cfgClo, type: class org.apache.ignite.spark.Once)
        - object (class org.apache.ignite.spark.IgniteContext, org.apache.ignite.spark.IgniteContext@6f2d3391)
        - field (class: org.apache.ignite.spark.impl.IgniteAbstractRDD, name: ic, type: class
org.apache.ignite.spark.IgniteContext)
        - object (class org.apache.ignite.spark.IgniteRDD, IgniteRDD[0] at RDD at IgniteAbstractRDD.scala:32)
        - field (class: org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1, name: $outer,
type: class org.apache.ignite.spark.IgniteRDD)
        - object (class org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

here is my code:

val sc = new SparkContext(new SparkConf().setAppName("IgniteSparkApp"))

val igniteConf = new IgniteConfiguration
val disco = new TcpDiscoverySpi()
val ipFinder = new TcpDiscoveryMulticastIpFinder
ipFinder.setMulticastGroup("228.10.10.157")
disco.setIpFinder(ipFinder)
igniteConf.setDiscoverySpi(disco)
igniteConf.setClientMode(true)

val igniteContext = new IgniteContext(sc, () => igniteConf)

for (cache <- igniteContext.ignite().cacheNames().asScala) {
    igniteContext.ignite().destroyCache(cache)
    logInfo(s"SUCCESS Destroy:${cache}")
}

val sharedRdd = igniteContext.fromCache[String, Row]("persist_Fundamental")
if (sharedRdd.isEmpty()) {
    import com.tr.hbase.HBaseContext
    val sc: HBaseContext = igniteContext
    val rdd = sc.hbase("Fundamental")
    logInfo(s"start saving RDD:${rdd.getClass.getName}")
    sharedRdd.savePairs(rdd, true)
}

Thanks.
Ming

Mime
View raw message