spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "donhoff_h" <165612...@qq.com>
Subject meet weird exception when studying rdd caching
Date Tue, 21 Apr 2015 02:39:29 GMT
Hi,

I am studying the RDD caching feature and wrote a small program to verify it. I ran the program
in a Spark 1.3.0 environment on a YARN cluster, but I hit a weird exception. It is not always
written to the log; I only see it sometimes, and it does not affect the
output of my program. Could anyone explain why this happens and how to eliminate it?

My program and the exception are listed below. Thanks very much for the help!

*****Program*****
/**
 * Small driver for experimenting with RDD caching: it reads a text file,
 * marks the resulting RDD as cached, and prints every line with a prefix.
 */
object TestSparkCaching01 {

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not read by this program
   */
  def main(args: Array[String]): Unit = {
    // Use Kryo and require every serialized class to be registered up front.
    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrationRequired", "true")
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[Array[MyClass1]]))

    val inFile = "hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt"
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.textFile(inFile)
      rdd.cache()
      rdd.map("Cache String: " + _).foreach(println)
    } finally {
      // Stop the context even when the job fails, so it is always shut down
      // explicitly before the JVM exits.
      sc.stop()
    }
  }
}

*****Exception*****
15/04/21 09:58:25 WARN channel.DefaultChannelPipeline: An exception was thrown by an exception
handler.
java.util.concurrent.RejectedExecutionException: Worker has already been shutdown
                at org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
                at org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
                at org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
                at org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
                at org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
                at org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
                at org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
                at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
                at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
                at org.jboss.netty.channel.Channels.disconnect(Channels.java:781)
                at org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:211)
                at akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:223)
                at akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:222)
                at scala.util.Success.foreach(Try.scala:205)
                at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
                at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
                at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
                at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
                at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
                at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
                at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
                at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
                at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
                at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
                at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
                at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
                at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
                at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
                at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/04/21 09:58:25 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
Mime
View raw message