spark-issues mailing list archives

From "maji2014 (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-4314) Exception when textFileStream attempts to read deleted _COPYING_ file
Date Wed, 26 Nov 2014 02:05:12 GMT

    [ https://issues.apache.org/jira/browse/SPARK-4314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14225589#comment-14225589
] 

maji2014 edited comment on SPARK-4314 at 11/26/14 2:05 AM:
-----------------------------------------------------------

The description of the directory parameter for textFileStream/fileStream is "HDFS directory to monitor for new
file". To reproduce this scenario, upload a file into the monitored HDFS directory with a command like
"hadoop fs -put filename /user/". Do you think "hadoop fs -put filename /user/" is the wrong way to do this?
While the upload is in progress, the file is in an intermediate, partly written state named "filename._COPYING_",
and it is renamed to "filename" once the upload completes. Spark picks up the intermediate file, and then the
scenario appears. You can also refer to the conversation in the pull request.
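
As a minimal sketch of the reproduction (the monitored directory and object name below are assumptions
taken from the stack trace, not the exact HdfsWordCount example code):

{code:scala}
// Minimal reproduction sketch: a textFileStream job monitoring the HDFS
// directory that "hadoop fs -put" writes into. While the copy is in progress
// the file shows up as "filename._COPYING_" and is renamed when the copy ends.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TextFileStreamRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TextFileStreamRepro")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Directory assumed from the stack trace; in HdfsWordCount it is args(0).
    val lines = ssc.textFileStream("hdfs://master:9000/user/spark/")
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

Running "hadoop fs -put 200 /user/spark/" while this job is active can let a batch pick up
"200._COPYING_" just before the rename, which triggers the exception shown in the issue below.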


was (Author: maji2014):
The description of the directory parameter for textFileStream/fileStream is "HDFS directory to monitor for new
file". To reproduce this scenario, upload a file into the monitored HDFS directory with a command like
"hadoop fs -put filename /user/".
While the upload is in progress, the file is in an intermediate, partly written state named "filename._COPYING_",
and it is renamed to "filename" once the upload completes. Spark picks up the intermediate file, and then the
scenario appears. You can also refer to the conversation in the pull request.

> Exception when textFileStream attempts to read deleted _COPYING_ file
> ---------------------------------------------------------------------
>
>                 Key: SPARK-4314
>                 URL: https://issues.apache.org/jira/browse/SPARK-4314
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: maji2014
>
> [Reproduce]
>  1. Run the HdfsWordCount example, which calls "ssc.textFileStream(args(0))"
>  2. Upload a file to HDFS (see [Reason] below)
>  3. The following exception is thrown.
> [Exception stack]
>  14/11/10 01:21:19 DEBUG Client: IPC Client (842425021) connection to master/192.168.84.142:9000 from ocdc sending #13
>  14/11/10 01:21:19 ERROR JobScheduler: Error generating jobs for time 1415611274000 ms
>  org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://master:9000/user/spark/200._COPYING_
>  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
>  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
>  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
>  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>  at scala.Option.getOrElse(Option.scala:120)
>  at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:125)
>  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:124)
>  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>  at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:124)
>  at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:83)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
>  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>  at scala.collection.immutable.List.foreach(List.scala:318)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:40)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
>  at scala.util.Try$.apply(Try.scala:161)
>  at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
>  at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  Exception in thread "main" 14/11/10 01:21:19 DEBUG Client: IPC Client (842425021) connection to master/192.168.84.142:9000 from ocdc got value #13
>  org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://master:9000/user/spark/200._COPYING_
>  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
>  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
>  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
>  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>  at scala.Option.getOrElse(Option.scala:120)
>  at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:125)
>  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:124)
>  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>  at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:124)
>  at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:83)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
>  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>  at scala.collection.immutable.List.foreach(List.scala:318)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:40)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
>  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
>  at scala.util.Try$.apply(Try.scala:161)
>  at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
>  at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  14/11/10 01:21:19 DEBUG ProtobufRpcEngine: Call: getListing took 3ms
> [Reason]
>  The intermediate file 200._COPYING_ is picked up by FileInputDStream, and the exception is thrown when
NewHadoopRDD tries to read the now non-existent 200._COPYING_ file, because it was renamed to 200 once the
upload finished.
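
A possible defensive workaround (a sketch only, not the change proposed in the pull request; the directory
and filter name are illustrative) is to use the fileStream overload that accepts a path filter and skip the
intermediate _COPYING_ files:

{code:scala}
// Workaround sketch: filter out files that are still being copied so that only
// the renamed, fully written files are picked up by the input stream.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FilteredFileStream {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("FilteredFileStream"), Seconds(2))

    // Ignore the temporary "._COPYING_" files created by "hadoop fs -put".
    def isComplete(path: Path): Boolean = !path.getName.endsWith("_COPYING_")

    val lines = ssc
      .fileStream[LongWritable, Text, TextInputFormat](
        "hdfs://master:9000/user/spark/", isComplete _, newFilesOnly = true)
      .map(_._2.toString)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}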



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

