flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stefano Bortoli <s.bort...@gmail.com>
Subject Re: Requesting the next InputSplit failed
Date Fri, 29 Apr 2016 12:57:32 GMT
We could successfully run the job without issues. Thanks a lot everyone for
the support.

FYI: with Flink we completed in 3h28m the job that was planned to run for
15 days 24/7 relying on our legacy customer approach. :-)

saluti,
Stefano

2016-04-28 14:50 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> Yes, assigning more than 0.5GB to a JM is a good idea. 3GB is maybe a bit
> too much, 2GB should be enough.
> Increasing the timeout should not hurt either.
>
> 2016-04-28 14:14 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> So what do you suggest to try for the next run?
>> I was going to increase the Job Manager heap to 3 GB and maybe change
>> some gc setting.
>> Do you think I should increase also the akka timeout or other things?
>>
>> On Thu, Apr 28, 2016 at 2:06 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Hmm, 113k splits is quite a lot.
>>> However, the IF uses the DefaultInputSplitAssigner which is very
>>> lightweight and should handle a large number of splits well.
>>>
>>>
>>>
>>> 2016-04-28 13:50 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> We generate 113k splits because we can't query more than 100k or
>>>> records per split (and we have to manage 11 billions of records). We tried
>>>> to run the job only once, before running it the 2nd time we would like to
>>>> understand which parameter to tune in order to (try to at least to) avoid
>>>> such an error.
>>>>
>>>> Of course I pasted the wrong TM heap size...that is indeed 3Gb (
>>>> taskmanager.heap.mb:512)
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Thu, Apr 28, 2016 at 1:29 PM, Fabian Hueske <fhueske@gmail.com>
>>>> wrote:
>>>>
>>>>> Is the problem reproducible?
>>>>> Maybe the SplitAssigner gets stuck somehow, but I've never observed
>>>>> something like that.
>>>>>
>>>>> How many splits do you generate?
>>>>>
>>>>> I guess it is not related, but 512MB for a TM is not a lot on machines
>>>>> with 16GB RAM.
>>>>>
>>>>> 2016-04-28 12:12 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>
>>>>>> When does this usually happens? Is it because the JobManager has
too
>>>>>> few resources (of some type)?
>>>>>>
>>>>>> Our current configuration of the cluster has 4 machines (with 4 CPUs
>>>>>> and 16 GB of RAM) and one machine has both a JobManager and a TaskManger
>>>>>> (the other 3 just a TM).
>>>>>>
>>>>>> Our flink-conf.yml on every machine has the following params:
>>>>>>
>>>>>>    - jobmanager.heap.mb:512
>>>>>>    - taskmanager.heap.mb:512
>>>>>>    - taskmanager.numberOfTaskSlots:6
>>>>>>    - prallelism.default:24
>>>>>>    - env.java.home=/usr/lib/jvm/java-8-oracle/
>>>>>>    - taskmanager.network.numberOfBuffers:16384
>>>>>>
>>>>>> The job just read a window of max 100k elements and then writes a
>>>>>> Tuple5 into a CSV on the jobmanger fs with parallelism 1 (in order
to
>>>>>> produce a single file). The job dies after 40 minutes and hundreds
of
>>>>>> millions of records read.
>>>>>>
>>>>>> Do you see anything sospicious?
>>>>>>
>>>>>> Thanks for the support,
>>>>>> Flavio
>>>>>>
>>>>>> On Thu, Apr 28, 2016 at 11:54 AM, Fabian Hueske <fhueske@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I checked the input format from your PR, but didn't see anything
>>>>>>> suspicious.
>>>>>>>
>>>>>>> It is definitely OK if the processing of an input split tasks
more
>>>>>>> than 10 seconds. That should not be the cause.
>>>>>>> It rather looks like the DataSourceTask fails to request a new
split
>>>>>>> from the JobManager.
>>>>>>>
>>>>>>> 2016-04-28 9:37 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
>>>>>>>
>>>>>>>> Digging the logs, we found this:
>>>>>>>>
>>>>>>>> WARN  Remoting - Tried to associate with unreachable remote
address
>>>>>>>> [akka.tcp://flink@127.0.0.1:34984]. Address is now gated
for 5000
>>>>>>>> ms, all messages to this address will be delivered to dead
letters. Reason:
>>>>>>>> Connessione rifiutata: /127.0.0.1:34984
>>>>>>>>
>>>>>>>> however, it is not clear why it should refuse a connection
to
>>>>>>>> itself after 40min of run. we'll try to figure out possible
environment
>>>>>>>> issues. Its a fresh installation, therefore we may have left
out some
>>>>>>>> configurations.
>>>>>>>>
>>>>>>>> saluti,
>>>>>>>> Stefano
>>>>>>>>
>>>>>>>> 2016-04-28 9:22 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
>>>>>>>>
>>>>>>>>> I had this type of exception when trying to build and
test Flink
>>>>>>>>> on a "small machine". I worked around the test increasing
the timeout for
>>>>>>>>> Akka.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/stefanobortoli/flink/blob/FLINK-1827/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
>>>>>>>>>
>>>>>>>>> it happened only on my machine (a VirtualBox I use for
>>>>>>>>> development), but not on Flavio's. Is it possible that
on load situations
>>>>>>>>> the JobManager slows down a bit too much?
>>>>>>>>>
>>>>>>>>> saluti,
>>>>>>>>> Stefano
>>>>>>>>>
>>>>>>>>> 2016-04-27 17:50 GMT+02:00 Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it>:
>>>>>>>>>
>>>>>>>>>> A precursor of the modified connector (since we started
a long
>>>>>>>>>> time ago). However the idea is the same, I compute
the inputSplits and then
>>>>>>>>>> I get the data split by split (similarly to what
it happens in FLINK-3750 -
>>>>>>>>>> https://github.com/apache/flink/pull/1941 )
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 27, 2016 at 5:38 PM, Chesnay Schepler
<
>>>>>>>>>> chesnay@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you using your modified connector or the
currently available
>>>>>>>>>>> one?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 27.04.2016 17:35, Flavio Pompermaier wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi to all,
>>>>>>>>>>> I'm running a Flink Job on a JDBC datasource
and I obtain the
>>>>>>>>>>> following exception:
>>>>>>>>>>>
>>>>>>>>>>> java.lang.RuntimeException: Requesting the next
InputSplit
>>>>>>>>>>> failed.
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:91)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:342)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:137)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException:
Futures timed
>>>>>>>>>>> out after [10000 milliseconds]
>>>>>>>>>>> at
>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>>>>> at
>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>>>>> at
>>>>>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>>>>> at
>>>>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>>>>> at scala.concurrent.Await.result(package.scala)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:71)
>>>>>>>>>>> ... 4 more
>>>>>>>>>>>
>>>>>>>>>>> What can be the cause? Is it because the whole
DataSource
>>>>>>>>>>> reading has cannot take more than 10000 milliseconds?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Flavio
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>

Mime
View raw message