beam-commits mailing list archives

From "Stas Levin (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-891) Flake in Spark metrics library?
Date Wed, 09 Nov 2016 10:10:58 GMT

    [ https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650533#comment-15650533 ]

Stas Levin commented on BEAM-891:
---------------------------------

We can indeed try syncing the following block in {{SparkRuntimeContext}}:

{code:java}
metricsSystem.removeSource(aggregatorMetricSource);
metricsSystem.registerSource(aggregatorMetricSource);
{code}

But since other calls to {{MetricsSystem#removeSource}} and {{MetricsSystem#registerSource}}
are not synced (e.g., those made by Spark itself), this kind of synchronisation will only help if the
so-called "race" lies in our Beam code. If the race is with Spark's internal calls, syncing
the above block on our side will not be very helpful.
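
A minimal sketch of what syncing that block could look like. {{StubMetricsSystem}}, {{SyncedRegistration}} and {{LOCK}} are hypothetical names used only for illustration; the stub stands in for Spark's {{MetricsSystem}} and is not the real class. The lock only coordinates callers that go through {{reRegister}}, which is exactly the limitation described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Spark's MetricsSystem; not the real class. */
class StubMetricsSystem {
  // Plain unsynchronized list, playing the role of the buffer seen in the stack trace.
  private final List<String> sources = new ArrayList<>();

  void registerSource(String source) {
    sources.add(source);
  }

  void removeSource(String source) {
    sources.remove(source);
  }

  int sourceCount() {
    return sources.size();
  }
}

class SyncedRegistration {
  // Hypothetical lock shared by all Beam-side callers.
  private static final Object LOCK = new Object();

  /** Removes and re-registers the source as one unit with respect to LOCK. */
  static void reRegister(StubMetricsSystem metricsSystem, String source) {
    synchronized (LOCK) {
      metricsSystem.removeSource(source);
      metricsSystem.registerSource(source);
    }
  }
}
```

Any caller that touches the metrics system without taking {{LOCK}} can still interleave with the two calls inside the block.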

Another option would be to replace the above block with this one (which requires some minor
changes to {{AggregatorMetricSource}}):
{code:java}
// Register only when no source with this name has been registered yet.
if (metricsSystem.getSourcesByName(AggregatorMetricSource.NAME).isEmpty()) {
  metricsSystem.registerSource(aggregatorMetricSource);
}
{code}

This eliminates {{SparkRuntimeContext}}'s call to {{MetricsSystem#removeSource}} altogether.
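
The register-only-if-absent idea can be sketched in isolation as follows. {{NamedSourceRegistry}}, {{RegisterIfAbsent}}, {{registerOnce}} and the {{NAME}} value are hypothetical and exist only for this example; the registry is a stand-in for Spark's {{MetricsSystem}}, not the real class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Spark's MetricsSystem, tracking sources by name. */
class NamedSourceRegistry {
  private final List<String> sourceNames = new ArrayList<>();

  void registerSource(String name) {
    sourceNames.add(name);
  }

  /** Returns every registered source whose name matches. */
  List<String> getSourcesByName(String name) {
    List<String> matches = new ArrayList<>();
    for (String registered : sourceNames) {
      if (registered.equals(name)) {
        matches.add(registered);
      }
    }
    return matches;
  }
}

class RegisterIfAbsent {
  // Hypothetical source name; the real constant would live in AggregatorMetricSource.
  static final String NAME = "AggregatorMetricSource";

  /** Registers the source only if none with that name exists; returns true if it registered. */
  static boolean registerOnce(NamedSourceRegistry registry) {
    if (registry.getSourcesByName(NAME).isEmpty()) {
      registry.registerSource(NAME);
      return true;
    }
    return false;
  }
}
```

Calling {{registerOnce}} repeatedly leaves a single registered source and never removes anything. The check and the registration are still two separate calls, so this narrows the window for a race rather than closing it.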

> Flake in Spark metrics library?
> -------------------------------
>
>                 Key: BEAM-891
>                 URL: https://issues.apache.org/jira/browse/BEAM-891
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Daniel Halperin
>            Assignee: Stas Levin
>
> [~staslev] I think you implemented this functionality originally? Want to take a look? CC [~amitsela]
> Run: https://builds.apache.org/job/beam_PostCommit_RunnableOnService_SparkLocal/org.apache.beam$beam-runners-spark/43/testReport/junit/org.apache.beam.sdk.transforms/FilterTest/testFilterGreaterThan/
> Error:
> {code}
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: 5
> 	at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:169)
> 	at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:77)
> 	at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:53)
> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> 	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
> 	at org.apache.beam.sdk.transforms.FilterTest.testFilterGreaterThan(FilterTest.java:122)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
> 	at org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: 5
> 	at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
> 	at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
> 	at scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
> 	at scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
> 	at scala.collection.IndexedSeqOptimized$class.indexWhere(IndexedSeqOptimized.scala:198)
> 	at scala.collection.mutable.ArrayBuffer.indexWhere(ArrayBuffer.scala:47)
> 	at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:144)
> 	at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
> 	at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:128)
> 	at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
> 	at scala.collection.mutable.BufferLike$class.$minus$eq(BufferLike.scala:126)
> 	at scala.collection.mutable.AbstractBuffer.$minus$eq(Buffer.scala:48)
> 	at org.apache.spark.metrics.MetricsSystem.removeSource(MetricsSystem.scala:159)
> 	at org.apache.beam.runners.spark.translation.SparkRuntimeContext.registerMetrics(SparkRuntimeContext.java:94)
> 	at org.apache.beam.runners.spark.translation.SparkRuntimeContext.<init>(SparkRuntimeContext.java:66)
> 	at org.apache.beam.runners.spark.translation.EvaluationContext.<init>(EvaluationContext.java:73)
> 	at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:146)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
