From: Stefano Bortoli
Date: Fri, 29 Apr 2016 14:57:32 +0200
Subject: Re: Requesting the next InputSplit failed
To: user@flink.apache.org

We could successfully run the job without issues. Thanks a lot everyone for
the support.

FYI: with Flink we completed in 3h28m the job that was planned to run for
15 days 24/7 relying on our legacy customer approach. :-)

saluti,
Stefano

2016-04-28 14:50 GMT+02:00 Fabian Hueske:

> Yes, assigning more than 0.5GB to a JM is a good idea. 3GB is maybe a bit
> too much, 2GB should be enough.
> Increasing the timeout should not hurt either.
>
> 2016-04-28 14:14 GMT+02:00 Flavio Pompermaier:
>
>> So what do you suggest to try for the next run?
>> I was going to increase the Job Manager heap to 3 GB and maybe change
>> some GC settings.
>> Do you think I should also increase the akka timeout, or other things?
>>
>> On Thu, Apr 28, 2016 at 2:06 PM, Fabian Hueske wrote:
>>
>>> Hmm, 113k splits is quite a lot.
>>> However, the IF uses the DefaultInputSplitAssigner, which is very
>>> lightweight and should handle a large number of splits well.
>>>
>>> 2016-04-28 13:50 GMT+02:00 Flavio Pompermaier:
>>>
>>>> We generate 113k splits because we can't query more than 100k records
>>>> per split (and we have to manage 11 billion records). We tried to run
>>>> the job only once; before running it a 2nd time we would like to
>>>> understand which parameter to tune in order to (at least try to)
>>>> avoid such an error.
>>>>
>>>> Of course I pasted the wrong TM heap size... that is indeed 3Gb
>>>> (taskmanager.heap.mb:512)
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Thu, Apr 28, 2016 at 1:29 PM, Fabian Hueske wrote:
>>>>
>>>>> Is the problem reproducible?
>>>>> Maybe the SplitAssigner gets stuck somehow, but I've never observed
>>>>> something like that.
>>>>>
>>>>> How many splits do you generate?
>>>>>
>>>>> I guess it is not related, but 512MB for a TM is not a lot on
>>>>> machines with 16GB RAM.
>>>>>
>>>>> 2016-04-28 12:12 GMT+02:00 Flavio Pompermaier:
>>>>>
>>>>>> When does this usually happen? Is it because the JobManager has too
>>>>>> few resources (of some type)?
>>>>>>
>>>>>> Our current cluster configuration has 4 machines (with 4 CPUs and
>>>>>> 16 GB of RAM each); one machine runs both a JobManager and a
>>>>>> TaskManager (the other 3 just a TM).
>>>>>>
>>>>>> Our flink-conf.yml on every machine has the following params:
>>>>>>
>>>>>> - jobmanager.heap.mb: 512
>>>>>> - taskmanager.heap.mb: 512
>>>>>> - taskmanager.numberOfTaskSlots: 6
>>>>>> - parallelism.default: 24
>>>>>> - env.java.home: /usr/lib/jvm/java-8-oracle/
>>>>>> - taskmanager.network.numberOfBuffers: 16384
>>>>>>
>>>>>> The job just reads a window of max 100k elements and then writes a
>>>>>> Tuple5 into a CSV on the jobmanager fs with parallelism 1 (in order
>>>>>> to produce a single file). The job dies after 40 minutes and
>>>>>> hundreds of millions of records read.
>>>>>>
>>>>>> Do you see anything suspicious?
>>>>>>
>>>>>> Thanks for the support,
>>>>>> Flavio
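For reference, a minimal sketch of the job shape Flavio describes above: a
source read split by split, with the CSV sink forced to parallelism 1 so a
single output file is produced. MyJdbcInputFormat is a hypothetical stand-in
for the modified connector (assumed to implement ResultTypeQueryable so no
explicit TypeInformation is needed), and the Tuple5 field types are made up.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple5;
    import org.apache.flink.core.fs.FileSystem.WriteMode;

    public class ExportJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // hypothetical input format standing in for the modified JDBC connector
            DataSet<Tuple5<Long, String, String, String, String>> rows =
                    env.createInput(new MyJdbcInputFormat());

            // parallelism 1 on the sink only: the cluster still reads in
            // parallel, but a single CSV file is written
            rows.writeAsCsv("file:///tmp/export.csv", WriteMode.OVERWRITE)
                .setParallelism(1);

            env.execute("JDBC export");
        }
    }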
>>>>>> On Thu, Apr 28, 2016 at 11:54 AM, Fabian Hueske wrote:
>>>>>>
>>>>>>> I checked the input format from your PR, but didn't see anything
>>>>>>> suspicious.
>>>>>>>
>>>>>>> It is definitely OK if the processing of an input split takes more
>>>>>>> than 10 seconds. That should not be the cause.
>>>>>>> It rather looks like the DataSourceTask fails to request a new
>>>>>>> split from the JobManager.
>>>>>>>
>>>>>>> 2016-04-28 9:37 GMT+02:00 Stefano Bortoli:
>>>>>>>
>>>>>>>> Digging the logs, we found this:
>>>>>>>>
>>>>>>>> WARN Remoting - Tried to associate with unreachable remote address
>>>>>>>> [akka.tcp://flink@127.0.0.1:34984]. Address is now gated for 5000
>>>>>>>> ms, all messages to this address will be delivered to dead
>>>>>>>> letters. Reason: Connessione rifiutata ("Connection refused"):
>>>>>>>> /127.0.0.1:34984
>>>>>>>>
>>>>>>>> However, it is not clear why it should refuse a connection to
>>>>>>>> itself after 40 min of run. We'll try to figure out possible
>>>>>>>> environment issues. It's a fresh installation, therefore we may
>>>>>>>> have left out some configurations.
>>>>>>>>
>>>>>>>> saluti,
>>>>>>>> Stefano
>>>>>>>>
>>>>>>>> 2016-04-28 9:22 GMT+02:00 Stefano Bortoli:
>>>>>>>>
>>>>>>>>> I had this type of exception when trying to build and test Flink
>>>>>>>>> on a "small machine". I worked around the test by increasing the
>>>>>>>>> timeout for Akka:
>>>>>>>>>
>>>>>>>>> https://github.com/stefanobortoli/flink/blob/FLINK-1827/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
>>>>>>>>>
>>>>>>>>> It happened only on my machine (a VirtualBox I use for
>>>>>>>>> development), but not on Flavio's. Is it possible that under load
>>>>>>>>> the JobManager slows down a bit too much?
>>>>>>>>>
>>>>>>>>> saluti,
>>>>>>>>> Stefano
>>>>>>>>>
>>>>>>>>> 2016-04-27 17:50 GMT+02:00 Flavio Pompermaier:
>>>>>>>>>
>>>>>>>>>> A precursor of the modified connector (since we started a long
>>>>>>>>>> time ago). However the idea is the same: I compute the
>>>>>>>>>> inputSplits and then I get the data split by split (similarly to
>>>>>>>>>> what happens in FLINK-3750 -
>>>>>>>>>> https://github.com/apache/flink/pull/1941)
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 27, 2016 at 5:38 PM, Chesnay Schepler wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you using your modified connector or the currently
>>>>>>>>>>> available one?
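To make the split-by-split pattern above concrete, here is a rough sketch of
such an input format under illustrative assumptions (a fixed record count, a
100k-records-per-query backend limit, and synthetic rows instead of real JDBC
calls); it is not the API of FLINK-3750 or of the connector discussed here.
Each GenericInputSplit stands for one bounded query range, and the
DefaultInputSplitAssigner Fabian mentions hands them out to the TaskManagers
on request.

    import java.io.IOException;
    import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
    import org.apache.flink.api.common.io.InputFormat;
    import org.apache.flink.api.common.io.statistics.BaseStatistics;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.io.GenericInputSplit;
    import org.apache.flink.core.io.InputSplitAssigner;

    public class RangeSplitInputFormat
            implements InputFormat<Tuple2<Long, String>, GenericInputSplit> {

        private static final long TOTAL_RECORDS = 11_000_000_000L; // "11 billion records"
        private static final long RECORDS_PER_SPLIT = 100_000L;    // backend limit per query

        private transient long cursor;
        private transient long end;

        @Override
        public void configure(Configuration parameters) {}

        @Override
        public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
            return cachedStatistics;
        }

        @Override
        public GenericInputSplit[] createInputSplits(int minNumSplits) {
            // ceil(11e9 / 100k) = 110,000 splits with these numbers; the
            // thread's 113k presumably reflects the real data layout
            int numSplits = (int) ((TOTAL_RECORDS + RECORDS_PER_SPLIT - 1) / RECORDS_PER_SPLIT);
            GenericInputSplit[] splits = new GenericInputSplit[numSplits];
            for (int i = 0; i < numSplits; i++) {
                splits[i] = new GenericInputSplit(i, numSplits);
            }
            return splits;
        }

        @Override
        public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
            // simple first-come first-served assignment, as Fabian describes
            return new DefaultInputSplitAssigner(splits);
        }

        @Override
        public void open(GenericInputSplit split) throws IOException {
            // a real JDBC format would issue one bounded query here, e.g.
            // "... LIMIT 100000 OFFSET <splitNumber * 100000>"
            cursor = (long) split.getSplitNumber() * RECORDS_PER_SPLIT;
            end = Math.min(cursor + RECORDS_PER_SPLIT, TOTAL_RECORDS);
        }

        @Override
        public boolean reachedEnd() {
            return cursor >= end;
        }

        @Override
        public Tuple2<Long, String> nextRecord(Tuple2<Long, String> reuse) {
            // synthetic row; a real format would map the current result-set row
            reuse.f0 = cursor++;
            reuse.f1 = "row-" + reuse.f0;
            return reuse;
        }

        @Override
        public void close() {}
    }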
>>>>>>>>>>>
>>>>>>>>>>> On 27.04.2016 17:35, Flavio Pompermaier wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi to all,
>>>>>>>>>>> I'm running a Flink Job on a JDBC datasource and I obtain the
>>>>>>>>>>> following exception:
>>>>>>>>>>>
>>>>>>>>>>> java.lang.RuntimeException: Requesting the next InputSplit failed.
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:91)
>>>>>>>>>>> at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:342)
>>>>>>>>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:137)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
>>>>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>>>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>>>>> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>>>>> at scala.concurrent.Await.result(package.scala)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:71)
>>>>>>>>>>> ... 4 more
>>>>>>>>>>>
>>>>>>>>>>> What can be the cause? Is it because the whole DataSource
>>>>>>>>>>> reading cannot take more than 10000 milliseconds?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Flavio
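As a closing note on that last question: the [10000 milliseconds] in the
stack trace matches Flink's default Akka ask timeout (akka.ask.timeout,
default "10 s"), which bounds only the split request to the JobManager, not
the time spent processing a split. A flink-conf.yaml sketch with the values
discussed in this thread (illustrative numbers, not verified recommendations)
could look like:

    # 2 GB JM heap per Fabian's suggestion; 3 GB TM heap as Flavio intended
    jobmanager.heap.mb: 2048
    taskmanager.heap.mb: 3072
    taskmanager.numberOfTaskSlots: 6
    parallelism.default: 24
    taskmanager.network.numberOfBuffers: 16384
    # raise the 10 s default that produces the TimeoutException above
    akka.ask.timeout: 60 s

Raising akka.ask.timeout gives a loaded JobManager more headroom to answer
split requests, which is consistent with the thread's eventual successful run.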