spark-issues mailing list archives

From "maji2014 (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-4314) Exception when textFileStream attempts to read deleted _COPYING_ file
Date Wed, 26 Nov 2014 02:01:13 GMT

     [ https://issues.apache.org/jira/browse/SPARK-4314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

maji2014 updated SPARK-4314:
----------------------------
    Description: 
[Reproduce]
 1. Run the HdfsWordCount example, i.e. "ssc.textFileStream(args(0))" (a minimal driver sketch follows these steps).
 2. Upload a file to HDFS, e.g. with "hdfs dfs -put" (see [Reason] below for why the upload matters).
 3. The exception below is thrown.

[Exception stack]
 14/11/10 01:21:19 DEBUG Client: IPC Client (842425021) connection to master/192.168.84.142:9000 from ocdc sending #13
 14/11/10 01:21:19 ERROR JobScheduler: Error generating jobs for time 1415611274000 ms
 org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://master:9000/user/spark/200._COPYING_
 at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
 at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:125)
 at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:124)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:124)
 at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:83)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
 at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:40)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
 at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
 at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
 at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
 at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Exception in thread "main" 14/11/10 01:21:19 DEBUG Client: IPC Client (842425021) connection to master/192.168.84.142:9000 from ocdc got value #13
 org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://master:9000/user/spark/200._COPYING_
 at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
 at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:125)
 at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:124)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:124)
 at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:83)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
 at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:40)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
 at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
 at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
 at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
 at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
 at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 14/11/10 01:21:19 DEBUG ProtobufRpcEngine: Call: getListing took 3ms

[Reason]
 During upload, "hdfs dfs -put" writes to an intermediate file named 200._COPYING_ and renames it to 200 once the copy completes. FileInputDStream lists the monitored directory while the copy is still in flight and records 200._COPYING_, so an exception is thrown when NewHadoopRDD later computes its partitions and the renamed-away 200._COPYING_ file no longer exists.


> Exception when textFileStream attempts to read deleted _COPYING_ file
> ---------------------------------------------------------------------
>
>                 Key: SPARK-4314
>                 URL: https://issues.apache.org/jira/browse/SPARK-4314
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: maji2014
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

