flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Static context environment
Date Mon, 03 Nov 2014 22:32:49 GMT
+1

Yes, this seems like a very good idea and the Environment is very
lightweight, so this would not worsen performance.

On Mon, Nov 3, 2014 at 11:19 PM, Stephan Ewen <sewen@apache.org> wrote:
> We implement the "context dependent switching" of the execution
> environments (cluster / local / test) with static variables in the
> ExecutionEnvironment.
>
> That means that these environments are potentially shared between multiple
> threads that run programs (including the case where the programs run one
> after the other).
>
> This may lead to exceptions, as we sometimes see in the tests when using
> forked test execution: a later test in the same JVM may access the same
> environment object as the earlier ones. In particular, we see that
> half-finished programs may still be associated with the execution
> environment, so that programs get mixed up, producing hard-to-understand
> cast exceptions (see trace below).
>
> This is so far only relevant to tests with forked execution, but may become
> relevant to users that build different programs at the same time.
>
> I propose to change the static members from environments to environment
> factories. That way, we can switch the type of environment depending on the
> context as before, and we guarantee that each call to
> "ExecutionEnvironment.getEnvironment()" returns a dedicated and fresh
> environment.
>
>
> Running org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
> java.lang.ClassCastException: org.apache.flink.api.common.operators.base.DeltaIterationBase cannot be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
>         at org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:39)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>         at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>         at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>         at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>         at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>         at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>         at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>         at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>         at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
>         at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>         at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
>         at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
>         at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
>         at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.027 sec <<< FAILURE! - in org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
> testCombinable(org.apache.flink.api.scala.operators.translation.DistinctTranslationTest)  Time elapsed: 0.024 sec  <<< FAILURE!
> java.lang.AssertionError: org.apache.flink.api.common.operators.base.DeltaIterationBase cannot be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
>         at org.junit.Assert.fail(Assert.java:88)
>         at org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:46)
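
The proposal quoted above (static environment factories instead of static
environment instances) could look roughly like the sketch below. It is
illustrative only and is not Flink's code: SketchEnvironment, Factory,
setContextFactory, and the operator list are hypothetical stand-ins, and the
"local" default is an assumption. The point it shows is that the static
member holds a factory, so every getEnvironment() call returns a fresh object
and a half-finished program in one environment cannot leak into the next.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an execution environment; NOT Flink's class.
class SketchEnvironment {

    // Hypothetical factory type: one implementation per context
    // (cluster / local / test).
    interface Factory {
        SketchEnvironment create();
    }

    // The static member is a factory, not a shared environment instance.
    private static volatile Factory contextFactory = null;

    private final String kind;
    // Per-instance program state; never shared between environments.
    private final List<String> operators = new ArrayList<>();

    SketchEnvironment(String kind) {
        this.kind = kind;
    }

    // Called by the surrounding context (e.g. a test harness) to switch
    // the type of environment that will be handed out.
    static void setContextFactory(Factory factory) {
        contextFactory = factory;
    }

    // Every call creates a dedicated, fresh environment.
    // Falling back to "local" is an assumption made for this sketch.
    static SketchEnvironment getEnvironment() {
        Factory factory = contextFactory;
        return factory != null ? factory.create() : new SketchEnvironment("local");
    }

    void addOperator(String name) {
        operators.add(name);
    }

    List<String> operators() {
        return operators;
    }

    String kind() {
        return kind;
    }
}

public class FactorySketch {
    public static void main(String[] args) {
        // A test context installs a factory once.
        SketchEnvironment.setContextFactory(() -> new SketchEnvironment("test"));

        // First program is left half-finished.
        SketchEnvironment first = SketchEnvironment.getEnvironment();
        first.addOperator("DeltaIteration");

        // Second program gets its own environment and sees none of that state.
        SketchEnvironment second = SketchEnvironment.getEnvironment();
        second.addOperator("GroupReduce");

        System.out.println("distinct instances: " + (first != second));
        System.out.println("second program operators: " + second.operators());
    }
}
```

Creating an environment per call costs one small allocation, which fits the
remark in the reply that the Environment is lightweight.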
