From: Stephan Ewen
To: user@flink.apache.org
Subject: Re: flink-storm FlinkLocalCluster issue
Date: Fri, 26 Feb 2016 14:07:04 +0100

Hi!

On 0.10.x, the Storm compatibility layer does not properly configure the Local Flink Executor to have the right parallelism.

In 1.0 that is fixed. If you try the latest snapshot, or the 1.0-Release-Candidate-1, it should work.

Greetings,
Stephan


On Fri, Feb 26, 2016 at 12:16 PM, #ZHANG SHUHAO# <SZHANG026@e.ntu.edu.sg> wrote:
Hi Till,

Thanks for your reply.
But it appears that it only started with a slot count of 1.
I have traced through the source code of Flink step by step and confirmed it.

I'm using Flink 0.10.2, with the source code downloaded from the Flink website. Nothing has been changed. I simply tried to run the flink-storm word count local example.

It just failed to work.


Sent from my iPhone

On 26 Feb 2016, at 6:16 PM, Till Rohrmann <trohrmann@apache.org> wrote:

Hi Shuhao,

the configuration you're providing is only used for the Storm compatibility layer and not for Flink itself. When you run your job locally, the LocalFlinkMiniCluster should be started with as many slots as the maximum degree of parallelism in your topology. You can check this in FlinkLocalCluster.java:96. When you submit your job to a remote cluster, you have to define the number of slots in the flink-conf.yaml file.
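
As a rough illustration of the local case, here is a minimal sketch. The spout and bolt classes (MyWordSpout, MyTokenizerBolt) are placeholders, and the flink-storm class names are written from memory of the 0.10 API, so treat them as assumptions rather than exact signatures:

import backtype.storm.Config;

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;

public class LocalParallelismSketch {
    public static void main(String[] args) throws Exception {
        // MyWordSpout and MyTokenizerBolt stand in for your own
        // IRichSpout / IRichBolt implementations.
        FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
        builder.setSpout("source", new MyWordSpout(), 1);
        builder.setBolt("tokenizer", new MyTokenizerBolt(), 4)  // largest parallelism hint in the topology
               .shuffleGrouping("source");

        // Locally, the mini cluster behind FlinkLocalCluster should be sized from
        // that largest hint (4 here); the Storm Config passed to submitTopology()
        // does not feed into Flink's own slot configuration.
        FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("word-count-local", new Config(), builder.createTopology());

        // For a remote cluster, the slot count instead comes from flink-conf.yaml,
        // e.g. taskmanager.numberOfTaskSlots: 4
    }
}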

Cheers,
Till


On Fri, Feb 26, 2016 at 10:34 AM, #ZHANG SHUHAO# <SZHANG026@e.ntu.edu.sg> wrote:

Hi everyone,

I'm a student researcher working on Flink recently.

I'm trying out the flink-storm example project, version 0.10.2 (flink-storm-examples, word-count-local).

But I got the following error:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (tokenizer (2/4)) @ (unassigned) - [SCHEDULED] > with groupID < b86aa7eb76c417c63afb577ea46ddd72 > in sharing group < SlotSharingGroup [0ea70b6a3927c02f29baf9de8f59575b, b86aa7eb76c417c63afb577ea46ddd72, cbc10505364629b45b28e79599c2f6b8] >. Resources available to scheduler: Number of instances=1, total number of slots=1, available slots=0

    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:255)
    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
    at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:992)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I notice that by default, the task manager has only one slot. Changing the setting in flink-conf does not help, as I want to debug locally through FlinkLocalCluster (not to submit it locally).

I have tried the following:

import backtype.storm.Config;
import org.apache.flink.configuration.ConfigConstants;

Config config = new Config();
config.put(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1024);
cluster.submitTopology(topologyId, config, ft);

But it's not working.

Is there any way to work around this?
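
Short of upgrading, the only other knob the exception message itself points at is lowering the declared parallelism so the topology fits into the single default slot. A minimal sketch of that, under the same assumptions as the earlier snippet (placeholder spout/bolt classes and 0.10-era flink-storm class names from memory):

import backtype.storm.Config;

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;

public class SingleSlotWorkaroundSketch {
    public static void main(String[] args) throws Exception {
        FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
        builder.setSpout("source", new MyWordSpout(), 1);
        // Declare the bolt with a parallelism hint of 1 (instead of 4) so the job
        // fits into the one slot of the default local task manager.
        builder.setBolt("tokenizer", new MyTokenizerBolt(), 1)
               .shuffleGrouping("source");

        FlinkLocalCluster.getLocalCluster()
                .submitTopology("word-count-local", new Config(), builder.createTopology());
    }
}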

Many thanks.

shuhao zhang (Tony).


