Mailing-List: contact user-help@flink.apache.org; run by ezmlm
Reply-To: user@flink.apache.org
Date: Wed, 14 Oct 2015 18:35:19 +0200
Subject: Re: ExecutionEnvironment setConfiguration API
From: Stephan Ewen
To: user@flink.apache.org
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary=001a11455e92f2bb5d0522132552

--001a11455e92f2bb5d0522132552
Content-Type: text/plain; charset=UTF-8

Hi Flavio!

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment()
by default picks up the number of cores as the parallelism, while the
manually created environments do not do that.
You can still set the parallelism manually:
"env.setParallelism(Runtime.getRuntime().availableProcessors());"

I would not configure the slots for the local execution; they should be
automatically configured based on the max parallelism.

Greetings,
Stephan

On Wed, Oct 14, 2015 at 3:36 PM, Flavio Pompermaier wrote:

> Hi Fabian and Stephan, back to work :)
>
> I finally managed to find the problem of the parallelism encountered by my
> colleague!
> Basically that was introduced by this API change. Before, I was using
> env.setConfiguration() to merge the default params with some custom ones.
> Now, after the API change, I was using the following code:
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> if (env instanceof LocalEnvironment) {
>   Configuration c = new Configuration();
>   c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, FLINK_TEST_TMP_DIR);
>   c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, FLINK_TEST_TMP_DIR);
>   c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 2);
>   c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>   env = ExecutionEnvironment.createLocalEnvironment(c);
> }
>
> However, the first env and the reassigned one don't behave in the same
> manner.
> If I don't reassign env I have parallelism=8, otherwise it's 1 :(
> Am I using the wrong APIs, or does the execution environment no longer
> allow configuring such parameters?
>
> Thanks in advance,
> Flavio
>
>
> On Tue, Oct 6, 2015 at 11:31 AM, Flavio Pompermaier
> wrote:
>
>> That makes sense: what can be configured should be differentiated between
>> local and remote envs (obviously this is a minor issue/improvement)
>>
>> Thanks again,
>> Flavio
>>
>> On Tue, Oct 6, 2015 at 11:25 AM, Stephan Ewen wrote:
>>
>>> We can think about that, but I think it may be quite confusing. The
>>> configurations actually mean something different for local and remote
>>> environments:
>>>
>>> - For the local environment, the config basically describes the entire
>>> Flink cluster setup (for the local execution cluster in the background)
>>> - For the remote environment, the config describes the parameters for
>>> the client that connects to the cluster (akka parameters, optimizer
>>> parameters, ...), but not parameters of the cluster itself (like
>>> taskmanager slots and memory).
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Tue, Oct 6, 2015 at 10:56 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> However, it could be a good idea to also overload
>>>> getExecutionEnvironment() to be able to pass a custom
>>>> configuration. What do you think?
>>>> Otherwise I have to know a priori if I'm working in a local deployment
>>>> or in a remote one, or check if getExecutionEnvironment() returned an
>>>> instance of LocalEnvironment/RemoteEnvironment.
>>>>
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 10:53 AM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> Yes Stephan!
>>>>> I usually work with the master version, at least in development ;)
>>>>> Thanks for the quick support!
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Tue, Oct 6, 2015 at 10:48 AM, Stephan Ewen
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> Are you on the SNAPSHOT master version?
>>>>>>
>>>>>> You can pass the configuration to the constructor of the execution
>>>>>> environment, or create one via
>>>>>> ExecutionEnvironment.createLocalEnvironment(config) or via
>>>>>> createRemoteEnvironment(host, port, configuration, jarFiles);
>>>>>>
>>>>>> The change of the signature was part of an API cleanup for the next
>>>>>> release. Sorry for the inconvenience...
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 10:36 AM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>>> Hi to all,
>>>>>>>
>>>>>>> today my code doesn't compile anymore because ExecutionEnvironment
>>>>>>> doesn't have setConfiguration() anymore. How can I set the following
>>>>>>> parameters in my unit tests?
>>>>>>>
>>>>>>> - ConfigConstants.TASK_MANAGER_TMP_DIR_KEY
>>>>>>> - ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY
>>>>>>> - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

--001a11455e92f2bb5d0522132552
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: quoted-printable
--001a11455e92f2bb5d0522132552--
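
For reference, the advice in the most recent reply (set the parallelism yourself when the local environment is created manually) could be sketched roughly as below. This is only a minimal sketch under assumptions: the class LocalParallelism and its resolve() method are hypothetical helpers that are not part of Flink, and the Flink calls (taken from the messages in this thread) appear only in comments so that the block compiles without Flink on the classpath.

```java
// Hypothetical helper, not part of Flink: picks the parallelism to apply to a
// manually created local environment.
final class LocalParallelism {

    private LocalParallelism() {
        // static helper only, no instances
    }

    /**
     * Returns the requested parallelism if it is positive; otherwise falls
     * back to the number of cores reported by the JVM.
     */
    public static int resolve(int requested) {
        if (requested > 0) {
            return requested;
        }
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        // A non-positive value means "no explicit setting": use the core count.
        int parallelism = resolve(-1);

        // With Flink available, the value would be applied to the manually
        // created environment, using the calls quoted in the thread:
        //
        //   Configuration c = new Configuration();
        //   // ... set tmp dir, blob storage dir, network buffers ...
        //   ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
        //   env.setParallelism(parallelism);
        //
        // The slot count is left unset here, following the advice in the reply.
        System.out.println("parallelism = " + parallelism);
    }
}
```

The fallback to the core count is done in the helper rather than inline, so a unit test can still force a fixed parallelism by passing a positive value.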