flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re:
Date Mon, 13 Oct 2014 12:13:10 GMT
Not in the 0.6.1 release, no.
With the upcoming 0.7-incubating release, you can set the number of task
slots per Container (-s flag) and this value will be used automatically as
the default DOP.
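For illustration, starting a session with 0.7-incubating might look like this (a hedged sketch: the -s flag is taken from this thread; the -n and -tm flags and their values are assumptions, not confirmed here):

```shell
# Sketch: start a Flink YARN session with 18 containers, 8 task slots each.
# -s (slots per container) is the flag described above, new in 0.7-incubating;
# -n (container count) and -tm (TaskManager memory, MB) are assumed options.
./bin/yarn-session.sh -n 18 -s 8 -tm 4096

# With 18 containers x 8 slots, jobs submitted to this session would then
# default to a DOP of 144 without passing -p explicitly.
```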

On Mon, Oct 13, 2014 at 2:09 PM, Robert Waury <robert.waury@googlemail.com>
wrote:

> Yes, I'm running 0.6.1
>
> Setting DOP manually worked, thanks.
>
> Computation time is now down to around 100 seconds.
>
> Is there a way to let Flink figure out the DOP automatically within a Yarn
> application or do I always have to set it manually?
>
> Cheers,
> Robert
>
>
>
> On Mon, Oct 13, 2014 at 1:23 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Hi!
>>
>> It looks like the job is running with a DOP of one.
>>
>> Can you set the DOP higher? Either directly on the ExecutionEnvironment,
>> or (preferably) through the "-p" parameter on the command line.
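The two options above might look like this (a sketch: only the -p flag itself is from this message; the jar name and the parallelism value 144, i.e. 18 containers x 8 cores, are illustrative placeholders):

```shell
# Sketch: run a job with an explicit degree of parallelism via the CLI.
# 144 (18 containers x 8 cores) and WordCount.jar are made-up examples.
./bin/flink run -p 144 WordCount.jar

# Alternatively, set it on the ExecutionEnvironment inside the program
# (0.6-era Java API; method name assumed):
#   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
#   env.setDegreeOfParallelism(144);
```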
>>
>> You are using 0.6, is that correct? (Looks like it from the logs)
>>
>> Stephan
>>
>>
>> On Mon, Oct 13, 2014 at 1:07 PM, Robert Waury <
>> robert.waury@googlemail.com> wrote:
>>
>>> Hi,
>>>
>>> I performed the YARN setup on a cluster running Apache Hadoop
>>> 2.3.0-cdh5.1.3, as described on the website.
>>>
>>> I could see the allocated containers in the Yarn ResourceManger and
>>> after starting a Flink job via the CLI client it showed up on the Flink
>>> Dashboard.
>>>
>>> The problem is that the job, which runs in about 17 minutes in my local
>>> VM (3 cores, 4 GB RAM, input from local files), now takes about 25 minutes on
>>> the cluster (18 containers with 4 GB and 8 cores each, input from HDFS with
>>> rf=5).
>>>
>>> From the Flink log it seemed that all data was shuffled to a single
>>> machine, even for FlatMap operations.
>>>
>>> log excerpt:
>>>
>>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 5
>>> 10:54:09,589 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>>> 10:54:09,590 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 128
>>>
>>> The job takes two large input files (~9 GB) and, after filtering and
>>> converting them with a FlatMap (selectivity is below 1%), joins each of
>>> them twice with a small data set (< 1 MB). After that, the join results
>>> are joined with each other. The result is about 2.7 GB.
>>>
>>> Any idea what causes this?
>>>
>>> Cheers,
>>> Robert
>>>
>>
>>
>
