beam-commits mailing list archives

From "Stas Levin (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (BEAM-891) Flake in Spark metrics library?
Date Wed, 09 Nov 2016 12:29:58 GMT

[ https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650533#comment-15650533 ]

Stas Levin edited comment on BEAM-891 at 11/9/16 12:28 PM:
-----------------------------------------------------------

We can indeed try syncing the following block in {{SparkRuntimeContext}}:

{code:java}
metricsSystem.removeSource(aggregatorMetricSource);
metricsSystem.registerSource(aggregatorMetricSource);
{code}

However, since other calls to {{MetricsSystem#removeSource}} and {{MetricsSystem#registerSource}}
are not synchronized (e.g., those made by Spark itself), this kind of synchronization will only
help if the so-called "race" lies within our Beam code. If the race involves Spark's internal
calls, synchronizing the above block will not be of much help.
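For illustration, a minimal sketch of what that synchronization might look like (using {{metricsSystem}} itself as the lock is an assumption on my part, not something we have settled on):

{code:java}
// Sketch only: serializes Beam's own remove/register calls on a single lock.
// It offers no protection against unsynchronized calls made from within Spark itself.
synchronized (metricsSystem) {
  metricsSystem.removeSource(aggregatorMetricSource);
  metricsSystem.registerSource(aggregatorMetricSource);
}
{code}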

Another option would be to replace the above block with this one (which requires some minor
changes to {{AggregatorMetricSource}}):
{code:java}
if (metricsSystem.getSourcesByName(AggregatorMetricSource.NAME).isEmpty()) {
  metricsSystem.registerSource(aggregatorMetricSource);
}
{code}

This eliminates {{SparkRuntimeContext}}'s call to {{MetricsSystem#removeSource}} altogether.
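For reference, the minor change mentioned above could amount to exposing the source name as a constant, so {{SparkRuntimeContext}} can query {{MetricsSystem#getSourcesByName}} before registering. A rough sketch (the class shape and constant value are illustrative, not Beam's actual code):

{code:java}
// Rough sketch: expose the source name as a constant so callers can check for an
// existing registration by name. The NAME value below is a placeholder.
public class AggregatorMetricSource implements org.apache.spark.metrics.source.Source {

  public static final String NAME = "Beam.Aggregators"; // hypothetical value

  private final com.codahale.metrics.MetricRegistry metricRegistry =
      new com.codahale.metrics.MetricRegistry();

  @Override
  public String sourceName() {
    return NAME;
  }

  @Override
  public com.codahale.metrics.MetricRegistry metricRegistry() {
    return metricRegistry;
  }
}
{code}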


was (Author: staslev):
We can indeed try syncing the following block in {{SparkRuntimeContext}}:

{code:java}
metricsSystem.removeSource(aggregatorMetricSource);
metricsSystem.registerSource(aggregatorMetricSource);
{code}

However, since other calls to {{MetricsSystem#removeSource}} and {{MetricsSystem#registerSource}}
are not synchronized (e.g., those made by Spark itself), this kind of synchronization will only
help if the so-called "race" lies within our Beam code. If the race involves Spark's internal
calls, synchronizing the above block will not be of much help.

Another option would be to replace the above block with this one (which requires some minor
changes to {{AggregatorMetricSource}}):
{code:java}
if(metricsSystem.getSourcesByName(AggregatorMetricSource.NAME).nonEmpty()) {
  metricsSystem.registerSource(aggregatorMetricSource);
}
{code}

This eliminates {{SparkRuntimeContext}}'s call to {{MetricsSystem#removeSource}} altogether.

> Flake in Spark metrics library?
> -------------------------------
>
>                 Key: BEAM-891
>                 URL: https://issues.apache.org/jira/browse/BEAM-891
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Daniel Halperin
>            Assignee: Stas Levin
>
> [~staslev] I think you implemented this functionality originally? Want to take a look? CC [~amitsela]
> Run: https://builds.apache.org/job/beam_PostCommit_RunnableOnService_SparkLocal/org.apache.beam$beam-runners-spark/43/testReport/junit/org.apache.beam.sdk.transforms/FilterTest/testFilterGreaterThan/
> Error:
> {code}
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: 5
> 	at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:169)
> 	at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:77)
> 	at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:53)
> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> 	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
> 	at org.apache.beam.sdk.transforms.FilterTest.testFilterGreaterThan(FilterTest.java:122)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
> 	at org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: 5
> 	at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
> 	at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
> 	at scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
> 	at scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
> 	at scala.collection.IndexedSeqOptimized$class.indexWhere(IndexedSeqOptimized.scala:198)
> 	at scala.collection.mutable.ArrayBuffer.indexWhere(ArrayBuffer.scala:47)
> 	at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:144)
> 	at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
> 	at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:128)
> 	at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
> 	at scala.collection.mutable.BufferLike$class.$minus$eq(BufferLike.scala:126)
> 	at scala.collection.mutable.AbstractBuffer.$minus$eq(Buffer.scala:48)
> 	at org.apache.spark.metrics.MetricsSystem.removeSource(MetricsSystem.scala:159)
> 	at org.apache.beam.runners.spark.translation.SparkRuntimeContext.registerMetrics(SparkRuntimeContext.java:94)
> 	at org.apache.beam.runners.spark.translation.SparkRuntimeContext.<init>(SparkRuntimeContext.java:66)
> 	at org.apache.beam.runners.spark.translation.EvaluationContext.<init>(EvaluationContext.java:73)
> 	at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:146)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
