From: Ufuk Celebi
Date: Tue, 3 May 2016 11:19:50 +0200
Subject: Re: Insufficient number of network buffers
To: user@flink.apache.org

Hey Tarandeep,

I think the failures are unrelated. Regarding the number of network buffers:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-the-network-buffers

The timeouts might occur because the task managers are pretty loaded. I
would suggest increasing the Akka ask timeouts via

akka.ask.timeout: 100 s

(https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#distributed-coordination-via-akka)

– Ufuk

On Tue, May 3, 2016 at 6:40 AM, Tarandeep Singh wrote:
> Hi,
>
> I have written ETL jobs in Flink (DataSet API). When I execute them in IDE,
> they run and finish fine. When I try to run them on my cluster, I get
> "Insufficient number of network buffers" error.
>
> I have 5 machines in my cluster with 4 cores each. TaskManager is given 3GB
> each. I increased the number of buffers to 5000, but got the same error.
> When I increased it further (say 7500), I get exception listed below.
>
> The DAG or execution plan is pretty big. What is recommended way to run your
> jobs when the DAG becomes huge? Shall I break it into parts by calling
> execute on execution environment in between jobs ?
>
> Thanks,
> Tarandeep
>
> Exception I got after I tried to run with 7500 buffers:
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Update task on instance d4f3f517b33e5fa8a9932fc06a0aef3b @ dev-cluster-slave1 - 4 slots - URL: akka.tcp://flink@172.22.13.39:52046/user/taskmanager failed due to:
>     at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>     at akka.dispatch.OnFailure.internal(Future.scala:228)
>     at akka.dispatch.OnFailure.internal(Future.scala:227)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>     at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>     at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>     at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     ... 2 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@172.22.13.39:52046/user/taskmanager#-1857397999]] after [10000 ms]
>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>     at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>     at java.lang.Thread.run(Thread.java:745)
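
For the buffer sizing discussed in this thread, here is a minimal sketch of the rough guide that, as far as I recall, the Flink 1.0 configuration page gives for network buffers: slots per TaskManager squared, times the number of TaskManagers, times 4. The class and method names are hypothetical, and the key `taskmanager.network.numberOfBuffers` is my understanding of the Flink 1.0 setting, so please check it against the linked docs. The cluster numbers (5 TaskManagers, 4 slots each) come from the message and the stack trace above.

```java
public class NetworkBufferEstimate {

    // Assumed rule of thumb: slotsPerTaskManager^2 * taskManagers * 4.
    // This is only a baseline, not a guarantee for large execution plans.
    static long estimate(int slotsPerTaskManager, int taskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4L;
    }

    public static void main(String[] args) {
        int slotsPerTaskManager = 4; // "4 slots" in the stack trace
        int taskManagers = 5;        // "5 machines in my cluster"

        long buffers = estimate(slotsPerTaskManager, taskManagers);

        // Print candidate flink-conf.yaml entries (key names are assumptions).
        System.out.println("taskmanager.network.numberOfBuffers: " + buffers);
        System.out.println("akka.ask.timeout: 100 s");
    }
}
```

The baseline for this cluster is far below the 5000 buffers that were already insufficient, which suggests the size of the plan, with many shuffles active at once, drives the requirement more than the cluster size does.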