beam-commits mailing list archives

From "Amit Sela (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (BEAM-1403) Weird exception about watermarks in Batch processing mode
Date Mon, 06 Feb 2017 12:22:42 GMT

     [ https://issues.apache.org/jira/browse/BEAM-1403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Amit Sela reassigned BEAM-1403:
-------------------------------

    Assignee: Amit Sela  (was: Kenneth Knowles)

> Weird exception about watermarks in Batch processing mode
> ---------------------------------------------------------
>
>                 Key: BEAM-1403
>                 URL: https://issues.apache.org/jira/browse/BEAM-1403
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>    Affects Versions: 0.4.0
>         Environment: Beam 0.4.0 with Spark Runner on Spark 1.6.2 over YARN
>            Reporter: Rico Bergmann
>            Assignee: Amit Sela
>            Priority: Blocker
>
> When executing a complex Beam pipeline with the Spark runner, I get the following error: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
> The pipeline does not make use of any streaming feature of Beam (it's a simple batch processing pipeline). We also do not set or use any timestamps in Beam.
> Running the same pipeline with 0.5.0-SNAPSHOT ran into the same problem, but with a slightly different stack trace.
> Attachment (mail exchange with Amit Sela):
> OK, this is indeed a different stack trace - the problem now is in SparkGroupAlsoByWindow, which did not exist in 0.4.0 and which I hoped would fix any issues you had encountered.
> More questions: is your data timestamped? Is your pipeline aware of the timestamp fields (using DoFn#outputWithTimestamp or a source that defines the timestamp)?
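> For reference, a minimal sketch of what "aware of the timestamp fields" would look like in a DoFn (the MyEvent type and its eventTimeMillis field are made up for illustration):
>
>     import org.apache.beam.sdk.transforms.DoFn;
>     import org.joda.time.Instant;
>
>     // Hypothetical element type carrying its own event time.
>     class MyEvent {
>       long eventTimeMillis;
>     }
>
>     class StampWithEventTime extends DoFn<MyEvent, MyEvent> {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         // Re-emit the element with its own event time instead of the
>         // timestamp assigned by the source.
>         c.outputWithTimestamp(c.element(), new Instant(c.element().eventTimeMillis));
>       }
>     }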
> Looks like this is broken anyway - I don't think there's actually a time-order guarantee when processing a partition. Could you open a ticket, please? Thanks!
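> (For context, the check that throws is a plain monotonicity precondition; a simplified sketch, not the actual runner code:)
>
>     import org.joda.time.Instant;
>
>     class InputWatermark {
>       private Instant current = new Instant(Long.MIN_VALUE / 1000);
>
>       // The watermark may only move forward in time. Advancing it to
>       // end-of-time after the last element and then feeding it
>       // start-of-time again (e.g. if the same timer internals are driven
>       // twice for a partition) reproduces the reported exception.
>       void advance(Instant next) {
>         if (next.isBefore(current)) {
>           throw new IllegalStateException(String.format(
>               "Cannot move input watermark time backwards from %s to %s",
>               current, next));
>         }
>         current = next;
>       }
>     }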
> On Fri, Feb 3, 2017 at 4:13 PM Bergmann, Rico (GfK External) <Rico.Bergmann@ext.gfk.com>
wrote:
>  
> Due to restrictions in my contract I cannot show you the pipeline. But it’s a very complex one that we have been working on for several months already, also with Beam 0.4.0.
>  
> Interesting to note is that we already ran our pipeline successfully with that version. Now, in a series of 30 executions, about 20 get this exception; the others succeed…
>  
>  
> The full stack trace (Beam 0.5.0-SNAPSHOT from 2017-02-03):
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException:
Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>         at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>         at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
>         at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>         at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
> Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards
from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>         at org.apache.beam.runners.core.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>         at org.apache.beam.runners.core.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:189)
>         at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:140)
>         at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:56)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
>         at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
>         at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:97)
>         at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:47)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>  
>  
> From: Amit Sela [mailto:amitsela33@gmail.com]
> Sent: Friday, 3 February 2017 13:10
> To: user@beam.apache.org
> Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"
>  
> Is it the exact same stack trace? Would you mind sharing the stack trace and the pipeline?
> Thanks,
> Amit
>  
> On Fri, Feb 3, 2017, 13:58 Bergmann, Rico (GfK External) <Rico.Bergmann@ext.gfk.com>
wrote:
> Hi!
>  
> Thanks for the insights.
>  
> As you suggested, I tried it with the current Beam 0.5.0-SNAPSHOT, but ran into the same error …
>  
> Any further ideas or suggestions?
>  
> Best,
> Rico.
>  
> From: Amit Sela [mailto:amitsela33@gmail.com]
> Sent: Thursday, 2 February 2017 17:15
> To: user@beam.apache.org
> Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"
>  
> Hi Rico, 
>  
> Batch sort of uses watermarks by noting the "start watermark" at the beginning of time and the "end watermark" at the end of time (this is the "overflow" you see). Or, in a more "Beam" way: the watermark at the beginning is the start of time, and after processing all the elements it jumps to the end of time, because we know there are no more elements left to process.
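> In code, those two "ends of time" are exactly the instants in your error message (a quick sanity check; the constants come from the Beam SDK, and the printed values below are copied from your report):
>
>     import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>
>     public class EndsOfTime {
>       public static void main(String[] args) {
>         // The batch input watermark starts at the minimum and, once all
>         // elements of the bounded input are processed, jumps straight to
>         // the maximum.
>         System.out.println(BoundedWindow.TIMESTAMP_MIN_VALUE); // -290308-12-21T19:59:05.225Z
>         System.out.println(BoundedWindow.TIMESTAMP_MAX_VALUE); // 294247-01-09T04:00:54.775Z
>       }
>     }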
>  
> Could you try 0.5.0-SNAPSHOT, please? There was a large refactor around that area in the Spark runner, and a release is on the way, so 0.5.0 should be available within a few days anyway.
>  
> Thanks,
> Amit 
>  
> On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) <Rico.Bergmann@ext.gfk.com>
wrote:
> Hi @all!
>  
> I’m using Beam 0.4.0 and only the batch processing features of it.
> While executing the pipeline I get an exception: Cannot move input watermark time backwards
from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
> First, since I’m not using the streaming features, I’m wondering about the watermarks (but this may be a Beam-internal thing, I don’t know).
> Second, the timestamps stated in the exception message are really weird and look a bit like an overflow of a long value to me.
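> (Doing the math on that hunch: Long.MAX_VALUE is 9,223,372,036,854,775,807; read as microseconds since the epoch that is about 9.22e12 seconds, roughly 292,277 years, which lands at about the year 294247 — and the negated value lands near the year -290308. So the instants match the extreme representable range of a long.)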
>  
> Does anyone have a clue what the reason for this exception could be? 
>  
> Thanks,
> Rico.
>  
>  
> Full stack trace (Beam 0.4.0):
> 2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster (Logging.scala:logError(95))
- User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException:
Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException:
Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>         at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>         at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
>         at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>         at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
>         at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
> Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards
from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
>         at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>         at org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
>         at org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
>         at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
>         at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
>         at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
>         at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
>         at org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
>         at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
>         at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>  
>  



