Date: Wed, 26 Nov 2014 02:05:12 +0000 (UTC)
From: "maji2014 (JIRA)"
To: issues@spark.apache.org
Subject: [jira] [Comment Edited] (SPARK-4314) Exception when textFileStream attempts to read deleted _COPYING_ file

    [ https://issues.apache.org/jira/browse/SPARK-4314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14225589#comment-14225589 ]

maji2014 edited comment on SPARK-4314 at 11/26/14 2:05 AM:
-----------------------------------------------------------

The description of the directory parameter of fileStream, which textFileStream uses, is "HDFS directory to monitor for new file". To reproduce this scenario, upload a file into the monitored HDFS directory with a command such as "hadoop fs -put filename /user/". Do you think "hadoop fs -put filename /user/" is the wrong way to upload? While the upload is in progress, the file is in the intermediate, partly written state "filename._COPYING_", and it is only renamed to "filename" once the upload completes. Spark picks up the intermediate file, and the scenario appears. You can also refer to the conversation in the pull request.

was (Author: maji2014):
The description of the directory parameter of fileStream, which textFileStream uses, is "HDFS directory to monitor for new file". To reproduce this scenario, upload a file into the monitored HDFS directory with a command such as "hadoop fs -put filename /user/". While the upload is in progress, the file is in the intermediate, partly written state "filename._COPYING_", and it is only renamed to "filename" once the upload completes. Spark picks up the intermediate file, and the scenario appears. You can also refer to the conversation in the pull request.

> Exception when textFileStream attempts to read deleted _COPYING_ file
> ---------------------------------------------------------------------
>
>                 Key: SPARK-4314
>                 URL: https://issues.apache.org/jira/browse/SPARK-4314
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: maji2014
>
> [Reproduce]
> 1. Run the HdfsWordCount example, which monitors a directory via "ssc.textFileStream(args(0))" (a minimal driver sketch follows these steps)
> 2. Upload a file to HDFS while the job is running (see [Reason] below for why the upload method matters)
> 3. The following exception is thrown.
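For context, the following is a minimal sketch of an HdfsWordCount-style driver that can reproduce the scenario. Only the "ssc.textFileStream(args(0))" call is taken from the report; the application name, the 5-second batch interval, and the word-count logic are illustrative assumptions.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object HdfsWordCountRepro {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("HdfsWordCountRepro")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // Monitor the HDFS directory passed as the first argument,
        // e.g. hdfs://master:9000/user/spark/
        val lines = ssc.textFileStream(args(0))
        val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

While this job is running, copying a file into the monitored directory with "hadoop fs -put filename /user/" first creates "filename._COPYING_" and renames it when the copy completes; that rename window is when the exception below is raised.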
> [Exception stack]
> 14/11/10 01:21:19 DEBUG Client: IPC Client (842425021) connection to master/192.168.84.142:9000 from ocdc sending #13
> 14/11/10 01:21:19 ERROR JobScheduler: Error generating jobs for time 1415611274000 ms
> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://master:9000/user/spark/200._COPYING_
> at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
> at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
> at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
> at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:125)
> at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:124)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:124)
> at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:83)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
> at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:40)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Exception in thread "main" 14/11/10 01:21:19 DEBUG Client: IPC Client (842425021) connection to master/192.168.84.142:9000 from ocdc got value #13
> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://master:9000/user/spark/200._COPYING_
> at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
> at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
> at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
> at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:125)
> at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:124)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:124)
> at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:83)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
> at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:40)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:291)
> at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 14/11/10 01:21:19 DEBUG ProtobufRpcEngine: Call: getListing took 3ms
>
> [Reason]
> The intermediate file 200._COPYING_ is picked up by the FileInputDStream interface, and the exception is thrown when NewHadoopRDD goes on to read the now non-existent 200._COPYING_ file, because the file has already been renamed to 200 once the upload finished.
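For reference, one way an application can sidestep this race while the issue is open is to use fileStream with a path filter that skips in-flight copies. This is a sketch of a possible workaround, not the fix discussed in the pull request; the object name, the filter predicate, and the batch interval are illustrative assumptions.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch only: FilteredHdfsWordCount and notCopying are illustrative names.
    object FilteredHdfsWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("FilteredHdfsWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))

        // Ignore any path that still carries the HDFS copy-in-progress suffix.
        val notCopying = (path: Path) => !path.getName.endsWith("._COPYING_")

        val lines = ssc
          .fileStream[LongWritable, Text, TextInputFormat](args(0), notCopying, newFilesOnly = true)
          .map { case (_, text) => text.toString }

        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

With the filter in place, a file uploaded via "hadoop fs -put" is only considered once it has been renamed from "filename._COPYING_" to its final name, so the batch never lists a path that disappears before the RDD is computed.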