flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re:
Date Mon, 13 Oct 2014 12:23:10 GMT
Are you referring to https://issues.apache.org/jira/browse/FLINK-968? As I
said, users can pass the "-s" parameter to set the number of slots per
container, and that number is used by the CliFrontend. (For example, 18
containers with 8 slots each give 144 slots in total, so a job could run
with a parallelism of up to 144.)


I just found out that only the YARN cluster setup page mentions slots at
all. So how slots are used is essentially undocumented, even though it is a
very important concept for properly configuring and running Flink. ( -->
https://issues.apache.org/jira/browse/FLINK-1157)

On Mon, Oct 13, 2014 at 2:13 PM, Stephan Ewen <sewen@apache.org> wrote:

> There is a ticket open for that: configuring the default DOP based on the
> number of containers and slots. It is not implemented yet, though.
>
>
>
> On Mon, Oct 13, 2014 at 2:09 PM, Robert Waury <robert.waury@googlemail.com
> > wrote:
>
>> Yes, I'm running 0.6.1
>>
>> Setting DOP manually worked, thanks.
>>
>> Computation time is now down to around 100 seconds.
>>
>> Is there a way to let Flink figure out the DOP automatically within a
>> YARN application, or do I always have to set it manually?
>>
>> Cheers,
>> Robert
>>
>>
>>
>> On Mon, Oct 13, 2014 at 1:23 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Hi!
>>>
>>> It looks like the job is running with a DOP of one (the "(1/1)" in the
>>> log lines means subtask 1 of 1).
>>>
>>> Can you set the DOP higher? Either directly on the ExecutionEnvironment,
>>> or (preferably) through the "-p" parameter on the command line.
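>>> Something along these lines should do it (a rough sketch; the numbers are
>>> just an example and depend on how many slots your session actually has):
>>>
>>>   import org.apache.flink.api.java.ExecutionEnvironment;
>>>
>>>   public class SetDopExample {
>>>       public static void main(String[] args) throws Exception {
>>>           ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>           // e.g. 18 containers * 8 slots each = 144 parallel tasks
>>>           env.setDegreeOfParallelism(144);
>>>           // ... build and execute your program on env as before ...
>>>       }
>>>   }
>>>
>>> or, without touching the code, pass the parallelism when submitting:
>>>
>>>   bin/flink run -p 144 <your job jar and arguments>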
>>>
>>> You are using 0.6, is that correct? (Looks like it from the logs)
>>>
>>> Stephan
>>>
>>>
>>> On Mon, Oct 13, 2014 at 1:07 PM, Robert Waury <
>>> robert.waury@googlemail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I performed the YARN setup on a cluster running Apache Hadoop
>>>> 2.3.0-cdh5.1.3, as described on the website.
>>>>
>>>> I could see the allocated containers in the YARN ResourceManager, and
>>>> after starting a Flink job via the CLI client it showed up on the Flink
>>>> Dashboard.
>>>>
>>>> The problem is that the job, which runs in about 17 minutes in my local
>>>> VM (3 cores, 4GB RAM, input from local files), now takes about 25 minutes
>>>> on the cluster (18 containers with 4GB and 8 cores each, input from HDFS
>>>> with rf=5).
>>>>
>>>> From the Flink log it seemed that all data was shuffled to a single
>>>> machine, even for FlatMap operations.
>>>>
>>>> log excerpt:
>>>>
>>>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>>>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 5
>>>> 10:54:09,589 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>>>> 10:54:09,590 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 128
>>>>
>>>> The job takes two large input files (~9 GB). After filtering and
>>>> converting them with a FlatMap (selectivity is below 1%), each of them is
>>>> joined twice with a small data set (< 1 MB), and the join results are then
>>>> joined with each other. The result is about 2.7 GB.
>>>>
>>>> Any idea what causes this?
>>>>
>>>> Cheers,
>>>> Robert
>>>>
>>>
>>>
>>
>
