flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: flink-storm FlinkLocalCluster issue
Date Fri, 26 Feb 2016 10:16:32 GMT
Hi Shuhao,

the configuration you’re providing is only used by the Storm compatibility
layer and not by Flink itself. When you run your job locally, the
LocalFlinkMiniCluster should be started with as many slots as the maximum
degree of parallelism in your topology. You can check this in
FlinkLocalCluster.java:96. When you submit your job to a remote cluster,
you have to define the number of slots in the flink-conf.yaml file.
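
For the remote-cluster case, a minimal flink-conf.yaml sketch might look
like the following. The value 4 is only an assumption, chosen to match the
parallelism suggested by "tokenizer (2/4)" in the error message quoted
below; use the maximum parallelism of your own topology instead.

```yaml
# flink-conf.yaml (sketch; the value 4 is an assumed example)
# Number of task slots offered by each TaskManager.
taskmanager.numberOfTaskSlots: 4
```

The TaskManagers read this file at startup, so they need to be restarted
before a changed value takes effect.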

Cheers,
Till

On Fri, Feb 26, 2016 at 10:34 AM, #ZHANG SHUHAO# <SZHANG026@e.ntu.edu.sg>
wrote:

> Hi everyone,
>
>
>
> I’m a student researcher and have recently been working with Flink.
>
>
>
> I’m trying out the flink-storm example project, version 0.10.2,
> flink-storm-examples, word-count-local.
>
>
>
> However, I got the following error:
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager in the
> configuration. Task to schedule: < Attempt #0 (tokenizer (2/4)) @
> (unassigned) - [SCHEDULED] > with groupID <
> b86aa7eb76c417c63afb577ea46ddd72 > in sharing group < SlotSharingGroup
> [0ea70b6a3927c02f29baf9de8f59575b, b86aa7eb76c417c63afb577ea46ddd72,
> cbc10505364629b45b28e79599c2f6b8] >. Resources available to scheduler:
> Number of instances=1, total number of slots=1, available slots=0
>
>     at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:255)
>     at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
>     at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
>     at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:992)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
>
> I noticed that, by default, the task manager has only one slot. Changing
> the setting in flink-conf does not help, as I want to debug locally through
> FlinkLocalCluster (not submit it locally).
>
>
>
> I have tried the following:
>
>
>
> import backtype.storm.Config;
> import org.apache.flink.configuration.ConfigConstants;
>
> Config config = new Config();
> config.put(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1024);
> cluster.submitTopology(topologyId, config, ft);
>
>
>
>
>
> But it’s not working.
>
>
>
>
>
> Is there any way to work around this?
>
>
>
> Many thanks.
>
>
>
> shuhao zhang (Tony).
>
