beam-commits mailing list archives

From "Arvid Heise (JIRA)" <j...@apache.org>
Subject [jira] [Created] (BEAM-2095) SourceRDD hasNext not idempotent
Date Thu, 27 Apr 2017 12:32:04 GMT
Arvid Heise created BEAM-2095:
---------------------------------

             Summary: SourceRDD hasNext not idempotent
                 Key: BEAM-2095
                 URL: https://issues.apache.org/jira/browse/BEAM-2095
             Project: Beam
          Issue Type: Bug
          Components: runner-spark
    Affects Versions: 0.6.0
            Reporter: Arvid Heise
            Assignee: Amit Sela


When reading an Avro file from HDFS with the new HDFSFileSource, we experience the following exception:

{code}
17/04/27 11:48:38 ERROR executor.Executor: Exception in task 2.0 in stage 1.0 (TID 32)
java.util.NoSuchElementException
	at com.gfk.hyperlane.engine.target_group_evaluation.dataset.HDFSFileSource$HDFSFileReader.getCurrent(HDFSFileSource.java:498)
	at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:142)
	at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:111)
	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
	at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:165)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
	at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
	at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
	at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:105)
	at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:48)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

The error comes from a call to BoundedReader#getCurrent after the reader has been closed.

We logged the following call patterns (compare with the expected reader contract sketched below):

(for data)
  advance
  getCurrent

(when drained)
  advance
  close
  getCurrent
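
For reference, the Beam reader contract allows getCurrent() only immediately after a start() or advance() call that returned true. A minimal sketch of a compliant consumption loop (process() is a placeholder, not Beam API):

{code}
// Sketch of the consumption loop the BoundedReader contract expects.
static <T> void consume(BoundedSource<T> source, PipelineOptions options)
    throws Exception {
  try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
    for (boolean more = reader.start(); more; more = reader.advance()) {
      // Only valid right after a start()/advance() that returned true.
      process(reader.getCurrent());
    }
  }
  // Once advance() has returned false, or the reader is closed, a further
  // getCurrent() may throw NoSuchElementException -- the failure above.
}
{code}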

The issue probably stems from the iterator implementation in SourceRDD:
https://github.com/apache/beam/blob/3101e69c438d5c42577fc7d3476d623f6e551837/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L145

Repeated calls to hasNext result in repeated calls to advance. This silently drops elements and makes successive hasNext calls return different results; in particular, it can produce exactly the failure observed above. A simplified sketch of the problematic pattern follows.
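
For illustration only (this is not the actual SourceRDD code), the non-idempotent pattern looks like:

{code}
// Illustrative sketch, not the actual SourceRDD code.
// Every hasNext() call advances the underlying reader, so calling
// hasNext() twice in a row silently skips an element, and calling it
// on a drained reader leaves getCurrent() with nothing to return.
@Override
public boolean hasNext() {
  try {
    return reader.advance();
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

@Override
public WindowedValue<T> next() {
  return WindowedValue.valueInGlobalWindow(reader.getCurrent());
}
{code}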

The usual solution is to make hasNext() idempotent: have it fetch and cache the next element when the cache is empty, and have next() return the cached element and reset the cache.
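
A minimal sketch of that pattern, assuming an Iterator<T> wrapping a Beam BoundedReader<T> (all field names here are illustrative, not the actual SourceRDD fields):

{code}
// Sketch of an idempotent hasNext() that buffers one element.
private final BoundedSource.BoundedReader<T> reader;
private T cached;
private boolean hasCached;  // separate flag so null elements are handled
private boolean started;    // first call must use start(), not advance()
private boolean drained;

@Override
public boolean hasNext() {
  if (hasCached) {
    return true;            // idempotent: no second advance()
  }
  if (drained) {
    return false;
  }
  try {
    boolean available = started ? reader.advance() : reader.start();
    started = true;
    if (available) {
      cached = reader.getCurrent();
      hasCached = true;
      return true;
    }
    drained = true;
    return false;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

@Override
public T next() {
  if (!hasNext()) {
    throw new NoSuchElementException();
  }
  hasCached = false;        // reset the cache so the next hasNext() advances
  return cached;
}
{code}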



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
