flink-dev mailing list archives

From Ufuk Celebi <...@apache.org>
Subject Re: Static context environment
Date Tue, 04 Nov 2014 00:37:30 GMT
+1 as well and nice catch with the exception.

On 03 Nov 2014, at 14:32, Aljoscha Krettek <aljoscha@apache.org> wrote:

> +1
> 
> Yes, this seems like a very good idea and the Environment is very
> lightweight, so this would not worsen performance.
> 
> On Mon, Nov 3, 2014 at 11:19 PM, Stephan Ewen <sewen@apache.org> wrote:
>> We implement the "context-dependent switching" of the execution
>> environments (cluster / local / test) with static variables in the
>> ExecutionEnvironment.
>> 
>> That means that these environments are potentially shared between multiple
>> threads that run programs (even when the programs run one after the other).
>> 
>> This may lead to exceptions, as we sometimes see in the tests when using
>> forked test execution: a later test in the same JVM may access the same
>> environment object as earlier ones. In particular, we see that half-finished
>> programs may still be associated with the execution environment, so that
>> programs get mixed, producing hard-to-understand cast exceptions (see the
>> trace below).
>> 
>> So far this is only relevant to tests with forked execution, but it may
>> become relevant to users who build different programs at the same time.
>> 
>> I propose to change the static members from environments to environment
>> factories. That way, we can still switch the type of environment depending on
>> the context, as before, and we guarantee that each call to
>> "ExecutionEnvironment.getExecutionEnvironment()" returns a dedicated, fresh
>> environment.
>> 
>> 
>> Running org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
>> java.lang.ClassCastException:
>> org.apache.flink.api.common.operators.base.DeltaIterationBase cannot
>> be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
>>        at org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:39)
>>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>        at java.lang.reflect.Method.invoke(Method.java:483)
>>        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
>>        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>>        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
>>        at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
>>        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
>>        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
>> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.027
>> sec <<< FAILURE! - in
>> org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
>> testCombinable(org.apache.flink.api.scala.operators.translation.DistinctTranslationTest)
>> Time elapsed: 0.024 sec  <<< FAILURE!
>> java.lang.AssertionError:
>> org.apache.flink.api.common.operators.base.DeltaIterationBase cannot
>> be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
>>        at org.junit.Assert.fail(Assert.java:88)
>>        at org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:46)

