flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Requesting the next InputSplit failed
Date Thu, 28 Apr 2016 12:14:19 GMT
So what do you suggest to try for the next run?
I was going to increase the Job Manager heap to 3 GB and maybe change some
gc setting.
Do you think I should increase also the akka timeout or other things?

On Thu, Apr 28, 2016 at 2:06 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hmm, 113k splits is quite a lot.
> However, the IF uses the DefaultInputSplitAssigner which is very
> lightweight and should handle a large number of splits well.
>
>
>
> 2016-04-28 13:50 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> We generate 113k splits because we can't query more than 100k or records
>> per split (and we have to manage 11 billions of records). We tried to run
>> the job only once, before running it the 2nd time we would like to
>> understand which parameter to tune in order to (try to at least to) avoid
>> such an error.
>>
>> Of course I pasted the wrong TM heap size...that is indeed 3Gb (
>> taskmanager.heap.mb:512)
>>
>> Best,
>> Flavio
>>
>> On Thu, Apr 28, 2016 at 1:29 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Is the problem reproducible?
>>> Maybe the SplitAssigner gets stuck somehow, but I've never observed
>>> something like that.
>>>
>>> How many splits do you generate?
>>>
>>> I guess it is not related, but 512MB for a TM is not a lot on machines
>>> with 16GB RAM.
>>>
>>> 2016-04-28 12:12 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> When does this usually happens? Is it because the JobManager has too
>>>> few resources (of some type)?
>>>>
>>>> Our current configuration of the cluster has 4 machines (with 4 CPUs
>>>> and 16 GB of RAM) and one machine has both a JobManager and a TaskManger
>>>> (the other 3 just a TM).
>>>>
>>>> Our flink-conf.yml on every machine has the following params:
>>>>
>>>>    - jobmanager.heap.mb:512
>>>>    - taskmanager.heap.mb:512
>>>>    - taskmanager.numberOfTaskSlots:6
>>>>    - prallelism.default:24
>>>>    - env.java.home=/usr/lib/jvm/java-8-oracle/
>>>>    - taskmanager.network.numberOfBuffers:16384
>>>>
>>>> The job just read a window of max 100k elements and then writes a
>>>> Tuple5 into a CSV on the jobmanger fs with parallelism 1 (in order to
>>>> produce a single file). The job dies after 40 minutes and hundreds of
>>>> millions of records read.
>>>>
>>>> Do you see anything sospicious?
>>>>
>>>> Thanks for the support,
>>>> Flavio
>>>>
>>>> On Thu, Apr 28, 2016 at 11:54 AM, Fabian Hueske <fhueske@gmail.com>
>>>> wrote:
>>>>
>>>>> I checked the input format from your PR, but didn't see anything
>>>>> suspicious.
>>>>>
>>>>> It is definitely OK if the processing of an input split tasks more
>>>>> than 10 seconds. That should not be the cause.
>>>>> It rather looks like the DataSourceTask fails to request a new split
>>>>> from the JobManager.
>>>>>
>>>>> 2016-04-28 9:37 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
>>>>>
>>>>>> Digging the logs, we found this:
>>>>>>
>>>>>> WARN  Remoting - Tried to associate with unreachable remote address
>>>>>> [akka.tcp://flink@127.0.0.1:34984]. Address is now gated for 5000
>>>>>> ms, all messages to this address will be delivered to dead letters.
Reason:
>>>>>> Connessione rifiutata: /127.0.0.1:34984
>>>>>>
>>>>>> however, it is not clear why it should refuse a connection to itself
>>>>>> after 40min of run. we'll try to figure out possible environment
issues.
>>>>>> Its a fresh installation, therefore we may have left out some
>>>>>> configurations.
>>>>>>
>>>>>> saluti,
>>>>>> Stefano
>>>>>>
>>>>>> 2016-04-28 9:22 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
>>>>>>
>>>>>>> I had this type of exception when trying to build and test Flink
on
>>>>>>> a "small machine". I worked around the test increasing the timeout
for Akka.
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/stefanobortoli/flink/blob/FLINK-1827/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
>>>>>>>
>>>>>>> it happened only on my machine (a VirtualBox I use for development),
>>>>>>> but not on Flavio's. Is it possible that on load situations the
JobManager
>>>>>>> slows down a bit too much?
>>>>>>>
>>>>>>> saluti,
>>>>>>> Stefano
>>>>>>>
>>>>>>> 2016-04-27 17:50 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>
>>>>>>> :
>>>>>>>
>>>>>>>> A precursor of the modified connector (since we started a
long time
>>>>>>>> ago). However the idea is the same, I compute the inputSplits
and then I
>>>>>>>> get the data split by split (similarly to what it happens
in FLINK-3750 -
>>>>>>>> https://github.com/apache/flink/pull/1941 )
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>> On Wed, Apr 27, 2016 at 5:38 PM, Chesnay Schepler <
>>>>>>>> chesnay@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Are you using your modified connector or the currently
available
>>>>>>>>> one?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 27.04.2016 17:35, Flavio Pompermaier wrote:
>>>>>>>>>
>>>>>>>>> Hi to all,
>>>>>>>>> I'm running a Flink Job on a JDBC datasource and I obtain
the
>>>>>>>>> following exception:
>>>>>>>>>
>>>>>>>>> java.lang.RuntimeException: Requesting the next InputSplit
failed.
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:91)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:342)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:137)
>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures
timed
>>>>>>>>> out after [10000 milliseconds]
>>>>>>>>> at
>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>>> at
>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>>> at
>>>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>>> at
>>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>>> at scala.concurrent.Await.result(package.scala)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:71)
>>>>>>>>> ... 4 more
>>>>>>>>>
>>>>>>>>> What can be the cause? Is it because the whole DataSource
reading
>>>>>>>>> has cannot take more than 10000 milliseconds?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>

Mime
View raw message