Date: Mon, 3 Nov 2014 23:32:49 +0100
Subject: Re: Static context environment
From: Aljoscha Krettek
To: dev@flink.incubator.apache.org

+1

Yes, this seems like a very good idea and the Environment is very
lightweight, so this would not worsen performance.

On Mon, Nov 3, 2014 at 11:19 PM, Stephan Ewen wrote:
> We implement the "context dependent switching" of the execution
> environments (cluster / local / test) with static variables in the
> ExecutionEnvironment.
>
> That means that these environments are potentially shared between multiple
> threads that run programs (also in case where they run one after the other).
>
> This may lead to exceptions, as we sometimes see in the tests, when using
> forked test execution: The later test in the same JVM may access the same
> environment object as the prior ones. In particular, we see that half
> finished programs may still be associated with the execution environment,
> such that mixes between programs occur, producing hard to understand cast
> exceptions (see trace below)
>
> This is so far only relevant to tests with forked execution, but may become
> relevant to users that build different programs at the same time.
>
> I propose to change the static members from environments to environment
> factories. That way, we can switch type of environment depending on the
> context as before, and we guarantee that each call to
> "ExecutionEnvironment.getEnvironment()" returns a dedicated and fresh
> environment.
>
>
> Running org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
> java.lang.ClassCastException: org.apache.flink.api.common.operators.base.DeltaIterationBase cannot be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
>     at org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:39)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:483)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>     at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
>     at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>     at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
>     at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
>     at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
>     at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.027 sec <<< FAILURE! - in org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
> testCombinable(org.apache.flink.api.scala.operators.translation.DistinctTranslationTest) Time elapsed: 0.024 sec <<< FAILURE!
> java.lang.AssertionError: org.apache.flink.api.common.operators.base.DeltaIterationBase cannot be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:46)
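
For illustration, the proposal quoted above (static environment factories instead of static environments) could look roughly like the sketch below. This is only a minimal sketch, not Flink's actual code: the names SketchEnvironment, SketchEnvironmentFactory, SketchContext and StaticContextDemo are hypothetical, and the operator list merely stands in for whatever per-program state an environment holds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an execution environment (not Flink's class).
class SketchEnvironment {
    private final String kind;                                 // "local", "cluster" or "test"
    private final List<String> operators = new ArrayList<>();  // per-program state

    SketchEnvironment(String kind) { this.kind = kind; }

    String kind() { return kind; }
    void addOperator(String name) { operators.add(name); }
    List<String> operators() { return operators; }
}

// Hypothetical factory type: the static context holds this, not an instance.
interface SketchEnvironmentFactory {
    SketchEnvironment create();
}

final class SketchContext {
    // Default context: every call builds a new local environment.
    private static volatile SketchEnvironmentFactory factory =
            () -> new SketchEnvironment("local");

    private SketchContext() {}

    // Switching the context (cluster / local / test) swaps the factory only.
    static void setFactory(SketchEnvironmentFactory f) { factory = f; }

    // Each call returns a dedicated, fresh environment.
    static SketchEnvironment getEnvironment() { return factory.create(); }
}

class StaticContextDemo {
    public static void main(String[] args) {
        SketchContext.setFactory(() -> new SketchEnvironment("test"));

        SketchEnvironment first = SketchContext.getEnvironment();
        first.addOperator("DeltaIteration");   // a half-finished program

        // A later program in the same JVM does not see the earlier operators.
        SketchEnvironment second = SketchContext.getEnvironment();
        System.out.println(second.kind() + " " + second.operators().size()); // prints "test 0"
    }
}
```

Because only the factory is static, the context switch still happens in one place, while the state of a half-finished program stays confined to the environment object that program obtained.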