flink-user mailing list archives

From "LINZ, Arnaud" <AL...@bouyguestelecom.fr>
Subject RE: Left join with unbalanced dataset
Date Tue, 02 Feb 2016 15:56:49 GMT

Giving the batch an outrageous amount of memory, with a 0.5 heap ratio, makes the batch
succeed.

I've figured out which dataset consumes the most memory: I have a big join that multiplies
the size of the input set before a group reduce.
I would like to optimize my code by reducing the size of the join output at join time.

The outline of the processing is:
DataSet A = (K1, K2, V1) where (K1,K2) is the key. A is huge.
DataSet B = (K1, V2)  where there are multiple values V2 for the same K1 (say 5)

I do something like: A.join(B).on(K1).groupBy(K1,K2).reduce()
As B contains 5 lines for each K1 of A, A.join(B) is 5 times the size of A.
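
To make the blow-up concrete, here is a minimal plain-Python sketch (not Flink API). The sample rows and the helper name naive_join are made up for illustration; it only shows that materializing the join before reducing yields 5 rows per row of A.

```python
from collections import defaultdict

# Hypothetical sample data: A rows are (K1, K2, V1), B rows are (K1, V2).
A = [("k1a", "x", 1), ("k1a", "y", 2), ("k1b", "x", 3)]
# B holds 5 values of V2 for every K1.
B = [(k1, v2) for k1 in ("k1a", "k1b") for v2 in range(5)]


def naive_join(a_rows, b_rows):
    """Materialize every joined row (K1, K2, V1, V2) before any reduction."""
    b_by_k1 = defaultdict(list)
    for k1, v2 in b_rows:
        b_by_k1[k1].append(v2)
    # One output row per (A row, matching B row) pair.
    return [(k1, k2, v1, v2) for k1, k2, v1 in a_rows for v2 in b_by_k1[k1]]


joined = naive_join(A, B)
print(len(A), len(joined))
```

With B holding 5 lines per K1, the joined list is 5 times as long as A, and all of it exists before the group reduce sees a single row.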

Flink does not start the reduce operation until all joined lines have been created (the memory
bottleneck occurs while all the lines are being collected), although in theory it could.
I see no "join group" operator that could do something like "A.groupBy(K1,K2).join(B).on(K1).reduce()".

Is there a way to do this?

The other way I see is to load B into memory on all nodes and use a hash map during the reduction
to get all A.join(B) lines. B is not that small, but I think it will still save RAM.
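
The hash map idea can be sketched in plain Python as follows (again not Flink API). The names build_lookup and reduce_without_materializing, the sample rows, and the fold-style combine function are assumptions made for illustration; the real reduce function may have a different shape. The sketch indexes B by K1 once, then folds the matching V2 values into one result per (K1, K2) while streaming over A, so the joined rows are never stored.

```python
from collections import defaultdict


def build_lookup(b_rows):
    """Index B by K1 so that each worker can probe it locally."""
    lookup = defaultdict(list)
    for k1, v2 in b_rows:
        lookup[k1].append(v2)
    return lookup


def reduce_without_materializing(a_rows, lookup, combine, initial):
    """Fold the V2 values matching each A row into one result per (K1, K2)."""
    results = {}
    for k1, k2, v1 in a_rows:
        matches = lookup.get(k1)
        if not matches:
            continue  # inner-join semantics: an A row with no match in B produces nothing
        acc = results.get((k1, k2), initial)
        for v2 in matches:
            acc = combine(acc, v1, v2)  # the joined row only exists for this one call
        results[(k1, k2)] = acc
    return results


# Hypothetical data and reduction: sum of V1 * V2 per (K1, K2).
A = [("a", "x", 2), ("a", "y", 3), ("c", "z", 7)]
B = [("a", 1), ("a", 10), ("b", 5)]
totals = reduce_without_materializing(
    A, build_lookup(B), lambda acc, v1, v2: acc + v1 * v2, 0
)
print(totals)
```

Memory use is then bounded by the size of B plus one accumulator per (K1, K2), instead of 5 times the size of A. In Flink's DataSet API, making B available on every node would presumably be done with a broadcast set (withBroadcastSet) read from a rich function, at the cost of replicating B to each task.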

Best regards,

-----Original Message-----
From: Ufuk Celebi [mailto:uce@apache.org]
Sent: Tuesday, 2 February 2016 15:27
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

> On 02 Feb 2016, at 15:15, LINZ, Arnaud <ALINZ@bouyguestelecom.fr> wrote:
> Hi,
> Running again with more RAM made the processing go further, but YARN still killed one
> container for memory consumption. I will experiment with various memory parameters.

OK, the killing of the container probably triggered the RemoteTransportException.

Can you tell me how many containers you are using, how much physical memory the machines have,
and how much the containers get?

You can monitor memory usage by setting

taskmanager.debug.memory.startLogThread: true

in the config. This will periodically log the memory consumption to the task manager logs.
Can you try this and check the logs for the memory consumption?

You can also have a look at it in the web frontend under the Task Manager tab.

– Ufuk

