flink-user mailing list archives

From "LINZ, Arnaud" <AL...@bouyguestelecom.fr>
Subject RE: Left join with unbalanced dataset
Date Tue, 02 Feb 2016 12:28:55 GMT
Thanks,
I’m using the official 0.10 release. I will try to use the 0.10 snapshot.

FYI, setting the heap cut-off ratio to 0.5 led to the following error:
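
For reference, a minimal sketch of the flink-conf.yaml entry this presumably corresponds to, assuming the ratio was set through Flink's YARN heap cut-off option:

# flink-conf.yaml: fraction of the YARN container memory kept out of
# the JVM heap, left over for off-heap / native allocations
yarn.heap-cutoff-ratio: 0.5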

12:20:17,313 INFO  org.apache.flink.yarn.YarnJobManager                          - Status
of job c55216ab9383fd14e1d287a69a6e0f7e (KUBERA-GEO-BRUT2SEGMENT) changed to FAILING.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at calculeMinArea(TransfoStage2StageOnTaz.java:153))
-> Map (Key Extractor 1)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger
Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer
memory
         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
         at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger
Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer
memory
         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
         at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
         ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception:
java.lang.OutOfMemoryError: Direct buffer memory
         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError:
Direct buffer memory
         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
         at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
         at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
         at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
         at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
         at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
         at java.lang.Thread.run(Thread.java:744)
Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer
memory
         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
         ... 9 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
         at java.nio.Bits.reserveMemory(Bits.java:658)
         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
         at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
         at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
         at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
         at io.netty.buffer.PoolArena.reallocate(PoolArena.java:358)
         at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:111)
         at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
         at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)


From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On behalf of Stephan Ewen
Sent: Tuesday, February 2, 2016 11:30
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

Hi Arnaud!

Which version of Flink are you using? In 0.10.1, the version of the Netty library that we use
changed its behavior and allocates a lot of off-heap memory. My guess would be that this is
the cause. That should be fixed in 1.0-SNAPSHOT, and also in 0.10-SNAPSHOT.

If that turns out to be the cause, the good news is that we started discussing a 0.10.2 maintenance
release that should also have a fix for that.

Greetings,
Stephan


On Tue, Feb 2, 2016 at 11:12 AM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr>
wrote:
Hi,

Changing to an outer join did not change the error; neither did balancing the join with another
dataset, dividing the parallelism level by 2, nor doubling the memory.
Heap size & thread count look OK under JVisualVM, so the problem is elsewhere.

Does Flink use off-heap memory? How can I monitor it?
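
For reference, direct-buffer usage can be inspected from inside the JVM through the standard
java.nio BufferPoolMXBean, independently of Flink. A minimal sketch:

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectMemoryProbe {
    public static void main(String[] args) {
        // The "direct" pool tracks memory allocated via
        // ByteBuffer.allocateDirect(); "mapped" tracks memory-mapped files.
        for (BufferPoolMXBean pool :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s: used=%d bytes, capacity=%d bytes, buffers=%d%n",
                    pool.getName(), pool.getMemoryUsed(),
                    pool.getTotalCapacity(), pool.getCount());
        }
    }
}

The same pools are exported over JMX (java.nio:type=BufferPool), so JVisualVM's MBeans tab
can watch them on a running TaskManager as well.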

Thanks,
Arnaud


10:58:53,384 INFO  org.apache.flink.yarn.YarnJobManager                          - Status
of job 8b2ea62e16b82ccc2242bb5549d434a5 (KUBERA-GEO-BRUT2SEGMENT) changed to FAILING.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106))
-> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining
the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException:
I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
          at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
          at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger
spilling thread' terminated due to an exception: java.io.IOException: I/O channel already
closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
          at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
          at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
          ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception:
java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already
closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
          at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
          at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
          at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
          at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
          at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
          at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

(…)

10:58:54,423 WARN  akka.remote.ReliableDeliverySupervisor                        - Association
with remote system [akka.tcp://flink@172.21.125.13:40286]
has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

10:58:54,470 INFO  org.apache.flink.yarn.YarnJobManager                          - Container
container_e11_1453202008841_2794_01_000025 is completed with diagnostics: Container [pid=14331,containerID=container_e11_1453202008841_2794_01_000025]
is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB physical memory used;
9.1 GB of 16.8 GB virtual memory used. Killing container.

Dump of the process-tree for container_e11_1453202008841_2794_01_000025:

          |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES)
RSSMEM_USAGE(PAGES) FULL_CMD_LINE

          |- 14331 14329 14331 14331 (bash) 0 0 108646400 308 /bin/bash -c /usr/java/default/bin/java
-Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m  -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log
-Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner
--configDir . 1> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.out
2> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.err
--streamingMode batch

          |- 14348 14331 14331 14331 (java) 565583 11395 9636184064 2108473 /usr/java/default/bin/java
-Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log
-Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner
--configDir . --streamingMode batch



Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143



10:58:54,471 INFO  org.apache.flink.yarn.YarnJobManager



From: LINZ, Arnaud
Sent: Monday, February 1, 2016 09:40
To: user@flink.apache.org
Subject: RE: Left join with unbalanced dataset

Hi,
Thanks, I can’t believe I missed the outer join operators… I will try them and keep
you informed.
I use the “official” 0.10 release from the Maven repo. The only off-heap memory I use is
what HDFS I/O uses (codecs, DFSOutputStream threads…), but I don’t have many files open
at once, and doubling the amount of memory did not solve the problem.
Arnaud


From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On behalf of Stephan Ewen
Sent: Sunday, January 31, 2016 20:57
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

Hi!

YARN killing the application seems strange. The memory use that YARN sees should not change
even when one node gets a lot of data.

Can you share what version of Flink (plus commit hash) you are using and whether you use off-heap
memory or not?

Thanks,
Stephan


On Sun, Jan 31, 2016 at 10:47 AM, Till Rohrmann <trohrmann@apache.org>
wrote:
Hi Arnaud,

the unmatched elements of A will only end up on the same worker node if they all share the
same key. Otherwise, they will be evenly spread out across your cluster. However, I would
also recommend that you use Flink's leftOuterJoin.

Cheers,
Till

On Sun, Jan 31, 2016 at 5:27 AM, Chiwan Park <chiwanpark@apache.org>
wrote:
Hi Arnaud,

To join two datasets, the community recommends using the join operation rather than the cogroup
operation. For a left join, you can use the leftOuterJoin method. Flink’s optimizer decides on a
distributed join execution strategy using statistics of the datasets, such as their sizes.
Additionally, you can set a join hint to help the optimizer choose a strategy.

The transformations section [1] of the Flink documentation describes the outer join operations
in detail.
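
A minimal sketch of what that looks like; the tuple types, key positions, and the
REPARTITION_SORT_MERGE hint below are illustrative assumptions, not taken from Arnaud's job:

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class LeftOuterJoinSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Toy stand-ins: a is the big side, b the smaller one.
        DataSet<Tuple2<Long, String>> a = env.fromElements(
                new Tuple2<>(1L, "a1"), new Tuple2<>(2L, "a2"));
        DataSet<Tuple2<Long, String>> b = env.fromElements(
                new Tuple2<>(1L, "b1"));

        DataSet<Tuple2<Long, String>> joined = a
                // The hint is optional; REPARTITION_SORT_MERGE avoids
                // building an in-memory hash table from either input.
                .leftOuterJoin(b, JoinHint.REPARTITION_SORT_MERGE)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Long, String>,
                        Tuple2<Long, String>, Tuple2<Long, String>>() {
                    @Override
                    public Tuple2<Long, String> join(Tuple2<Long, String> left,
                                                     Tuple2<Long, String> right) {
                        // right is null for unmatched records of a; Flink
                        // tuples must not carry null fields, so substitute
                        // a marker value.
                        return new Tuple2<>(left.f0,
                                right == null ? "" : right.f1);
                    }
                });

        joined.print(); // (1,b1) and (2,)
    }
}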

I hope this helps.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations

Regards,
Chiwan Park

> On Jan 30, 2016, at 6:43 PM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr>
wrote:
>
> Hello,
>
> I have a very big dataset A to left join with a dataset B that is half its size. That
is to say, half of A's records will be matched with one record of B, and the other half with
null values.
>
> I used a CoGroup for that, but my batch fails because YARN kills the container due to
memory problems.
>
> I guess that’s because one worker gets half of the A dataset (the unmatched records),
and that’s too much for a single JVM.
>
> Am I right in my diagnosis? Is there a better way to left join unbalanced datasets?
>
> Best regards,
>
> Arnaud
>
>
>
> The integrity of this message cannot be guaranteed on the Internet. The company that
sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized
use or dissemination is prohibited. If you are not the intended recipient of this message,
then please delete it and notify the sender.


