flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re:
Date Tue, 20 Oct 2015 13:38:28 GMT
@Jakob: If you use Flink standalone (not through YARN), one thing to be
aware of is that the relevant change is in the bash scripts that start the
cluster, not in the code. If you upgraded Flink by only copying a newer JAR
file, you missed the update to the bash scripts, and with it the fix for
that issue.
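
A quick way to verify (a sketch; script and variable names differ between
Flink versions) is to check whether the startup scripts in bin/ set the
direct memory limit at all:

    # hypothetical check: look for the direct memory flag in the scripts
    grep -R "MaxDirectMemorySize" bin/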

On Tue, Oct 20, 2015 at 10:39 AM, Maximilian Michels <mxm@apache.org> wrote:

> Hi Jakob,
>
> Your revision number is fairly new and your direct memory
> configuration seems to be correct for your setup. If you have the
> time, you could verify that the memory flags for the JVM are set
> correctly by the startup script. You can see that in the first lines
> of the task manager log. If the direct memory was set to 2GB with the
> default number of network buffers, the JVM should have had enough
> direct memory. Still, we'd like to find out what caused your problem.
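>
> For illustration (flag values are examples only; the exact command line
> depends on your version and configuration), the first lines of the task
> manager log show the JVM startup flags, which should contain something
> like:
>
>     -Xms2048m -Xmx2048m -XX:MaxDirectMemorySize=2048m
>
> If -XX:MaxDirectMemorySize is missing or much smaller than expected,
> that points to outdated startup scripts.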
>
> Are you running on YARN or standalone?
>
> Yes, the usual setup is one task manager per host/VM. The task manager
> will allocate all memory upfront. However, a large part of this memory
> will be self-managed by Flink and not touched much by the GC. By
> default, this is 0.7 of the configured heap memory. You can control
> this ratio with the taskmanager.memory.fraction variable. You can also
> set a fixed managed memory size using taskmanager.memory.size (MB). In
> large memory setups, we have seen a slightly better performance using
> off-heap memory allocation. This can be configured using
> taskmanager.memory.off-heap: true.
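>
> For example, in flink-conf.yaml (illustrative values, not tuned
> recommendations):
>
>     # total task manager heap
>     taskmanager.heap.mb: 20480
>     # managed memory as a fraction of the heap (default 0.7) ...
>     taskmanager.memory.fraction: 0.7
>     # ... or as a fixed size in MB instead of the fraction
>     # taskmanager.memory.size: 10240
>     # allocate Flink's managed memory off-heap
>     taskmanager.memory.off-heap: true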
>
> Please let us know if you experience any further issues.
>
> Best,
> Max
>
> On Mon, Oct 19, 2015 at 10:14 PM, Jakob Ericsson
> <jakob.ericsson@gmail.com> wrote:
> > The revision is "Starting JobManager (Version: 0.10-SNAPSHOT,
> > Rev:c82ebbf, Date:15.10.2015 @ 11:34:01 CEST)"
> >
> > We have a lot of memory left on the machine. I have increased it quite a
> > lot.
> >
> > What are your thoughts on memory configuration?
> > If I understand Flink correctly, you should only have one taskmanager
> > running on each host?
> >
> > For a pretty standard machine with 16 cores and 32-64 GB of memory, this
> > means you will have one Java process running with -Xmx30g or even higher
> > to exhaust all the memory of the machine. This is, at least for the CMS
> > GC, not the most optimal configuration.
> > It might be viable for G1, but we got some really serious Java core dumps
> > when running G1.
> >
> > I looked a bit at the flags that were set on the process, and it seems
> > that Xmx and MaxDirectMemorySize are set to the same value by the shell
> > script. When I got the "java.lang.OutOfMemoryError: Direct buffer
> > memory", I was running with taskmanager.heap.mb: 2048, so the direct
> > memory buffer was set to 2GB.
> >
> > I have restarted the process with G1 again and 20GB as
> > taskmanager.heap.mb. Let's see if it will be stable during the night.
> >
> >
> > On Mon, Oct 19, 2015 at 6:31 PM, Maximilian Michels <mxm@apache.org>
> > wrote:
> >>
> >> You can see the revision number and the build date in the JobManager
> >> log file, e.g. "Starting JobManager (Version: 0.10-SNAPSHOT,
> >> Rev:1b79bc1, Date:18.10.2015 @ 20:15:08 CEST)"
> >>
> >> On Mon, Oct 19, 2015 at 5:53 PM, Maximilian Michels <mxm@apache.org>
> >> wrote:
> >> > When was the last time you updated your 0.10-SNAPSHOT Flink cluster?
> >> > If it has been more than a couple of weeks, then I'd advise you to
> >> > update to the latest snapshot version. There has been an issue with
> >> > the calculation of the off-heap memory limit in the past.
> >> >
> >> > Thanks,
> >> > Max
> >> >
> >> > On Mon, Oct 19, 2015 at 5:26 PM, Gyula Fóra <gyula.fora@gmail.com>
> >> > wrote:
> >> >> It's 0.10-SNAPSHOT
> >> >>
> >> >> Gyula
> >> >>
> >> >> Maximilian Michels <mxm@apache.org> wrote (on Mon, Oct 19, 2015, at
> >> >> 17:13):
> >> >>>
> >> >>> I forgot to ask you: Which version of Flink are you using? 0.9.1 or
> >> >>> 0.10-SNAPSHOT?
> >> >>>
> >> >>> On Mon, Oct 19, 2015 at 5:05 PM, Maximilian Michels <mxm@apache.org>
> >> >>> wrote:
> >> >>> > Hi Jakob,
> >> >>> >
> >> >>> > Thanks. Flink allocates its network memory as direct memory outside
> >> >>> > the normal Java heap. By default, that is 64 MB, but it can grow up
> >> >>> > to 128 MB under heavy network transfer. How much memory does your
> >> >>> > machine have? Could it be that your upper memory bound is lower than
> >> >>> > 2048 + 128 MB?
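> >> >>> >
> >> >>> > As a rough back-of-the-envelope check (overhead figures are
> >> >>> > estimates, not measured values): 2048 MB of heap plus up to 128 MB
> >> >>> > of direct network memory already means the process needs at least
> >> >>> > 2176 MB, before counting thread stacks and other JVM overhead, so a
> >> >>> > machine or container limit of around 2 GB would not be enough.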
> >> >>> >
> >> >>> > Best,
> >> >>> > Max
> >> >>> >
> >> >>> > On Mon, Oct 19, 2015 at 4:32 PM, Jakob Ericsson
> >> >>> > <jakob.ericsson@gmail.com> wrote:
> >> >>> >> Hi,
> >> >>> >>
> >> >>> >> See answers below.
> >> >>> >>
> >> >>> >> /Jakob
> >> >>> >>
> >> >>> >> On Mon, Oct 19, 2015 at 4:03 PM, Maximilian Michels
> >> >>> >> <mxm@apache.org>
> >> >>> >> wrote:
> >> >>> >>>
> >> >>> >>> Hi Jakob,
> >> >>> >>>
> >> >>> >>> Thank you for reporting the bug. Could you please post your
> >> >>> >>> configuration here? In particular, could you please tell us the
> >> >>> >>> value of the following configuration variables:
> >> >>> >>>
> >> >>> >>> taskmanager.heap.mb
> >> >>> >>
> >> >>> >> taskmanager.heap.mb: 2048
> >> >>> >>>
> >> >>> >>> taskmanager.network.numberOfBuffers
> >> >>> >>
> >> >>> >>
> >> >>> >> Default value. Not changed.
> >> >>> >>
> >> >>> >>>
> >> >>> >>> taskmanager.memory.off-heap
> >> >>> >>>
> >> >>> >> Default value. Not changed.
> >> >>> >>
> >> >>> >>>
> >> >>> >>> Are you running the Flink cluster in batch or streaming mode?
> >> >>> >>>
> >> >>> >> Started in streaming mode, running with two nodes in the cluster.
> >> >>> >> Also, I have set "env.java.opts: -XX:+UseConcMarkSweepGC" due to
> >> >>> >> some strange Java core dumps with the G1 GC.
> >> >>> >>
> >> >>> >>>
> >> >>> >>> Direct memory is used by Flink's network layer. My guess is that
> >> >>> >>> you have set taskmanager.heap.mb too low (it constrains the
> >> >>> >>> amount of direct memory at the moment).
> >> >>> >>>
> >> >>> >>> Thank you,
> >> >>> >>> Max
> >> >>> >>>
> >> >>> >>>
> >> >>> >>> On Mon, Oct 19, 2015 at 3:24 PM, Jakob Ericsson
> >> >>> >>> <jakob.ericsson@gmail.com> wrote:
> >> >>> >>> > Hello,
> >> >>> >>> >
> >> >>> >>> > We are running into a strange problem with Direct Memory
> >> >>> >>> > buffers. As far as I know, we are not using any direct memory
> >> >>> >>> > buffers in our own code. This is a pretty trivial streaming
> >> >>> >>> > application, just doing some deduplication and a union of some
> >> >>> >>> > Kafka streams, roughly like the sketch below.
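> >> >>> >>> >
> >> >>> >>> > (A minimal sketch of the topology; the sources, stream types,
> >> >>> >>> > and the DedupFilter logic are placeholders standing in for our
> >> >>> >>> > actual Kafka consumers and key-based deduplication.)
> >> >>> >>> >
> >> >>> >>> > import java.util.HashSet;
> >> >>> >>> > import java.util.Set;
> >> >>> >>> > import org.apache.flink.api.common.functions.FilterFunction;
> >> >>> >>> > import org.apache.flink.streaming.api.datastream.DataStream;
> >> >>> >>> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >> >>> >>> >
> >> >>> >>> > public class FilterAndTransform {
> >> >>> >>> >     public static void main(String[] args) throws Exception {
> >> >>> >>> >         StreamExecutionEnvironment env =
> >> >>> >>> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >> >>> >>> >         // Stand-ins for the Kafka consumer sources in the real job.
> >> >>> >>> >         DataStream<String> streamA = env.fromElements("a", "b", "a");
> >> >>> >>> >         DataStream<String> streamB = env.fromElements("b", "c");
> >> >>> >>> >         // Union the streams and drop records we have already seen.
> >> >>> >>> >         streamA.union(streamB)
> >> >>> >>> >                .filter(new DedupFilter())
> >> >>> >>> >                .print();
> >> >>> >>> >         env.execute("FilterAndTransform");
> >> >>> >>> >     }
> >> >>> >>> >
> >> >>> >>> >     // Naive in-memory dedup for illustration; keeps unbounded state.
> >> >>> >>> >     private static class DedupFilter implements FilterFunction<String> {
> >> >>> >>> >         private final Set<String> seen = new HashSet<>();
> >> >>> >>> >         @Override
> >> >>> >>> >         public boolean filter(String value) {
> >> >>> >>> >             return seen.add(value);
> >> >>> >>> >         }
> >> >>> >>> >     }
> >> >>> >>> > }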
> >> >>> >>> >
> >> >>> >>> > /Jakob
> >> >>> >>> >
> >> >>> >>> >
> >> >>> >>> >
> >> >>> >>> > 2015-10-19 13:27:59,064 INFO  org.apache.flink.runtime.taskmanager.Task - FilterAndTransform -> (Filter, Filter) (3/4) switched to FAILED with exception.
> >> >>> >>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
> >> >>> >>> >         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> >> >>> >>> >         at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> >> >>> >>> >         at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> >> >>> >>> >         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> >> >>> >>> >         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> >> >>> >>> >         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> >> >>> >>> >         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> >> >>> >>> >         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> >> >>> >>> >         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> >> >>> >>> >         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
> >> >>> >>> >         at java.lang.Thread.run(Thread.java:745)
> >> >>> >>> > Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
> >> >>> >>> >         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> >> >>> >>> >         ... 9 more
> >> >>> >>> > Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> >> >>> >>> >         at java.nio.Bits.reserveMemory(Bits.java:658)
> >> >>> >>> >         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> >> >>> >>> >         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> >> >>> >>> >         at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
> >> >>> >>> >         at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
> >> >>> >>> >         at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
> >> >>> >>> >         at io.netty.buffer.PoolArena.reallocate(PoolArena.java:358)
> >> >>> >>> >         at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:111)
> >> >>> >>> >         at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
> >> >>> >>> >         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
> >> >>> >>> >         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
> >> >>> >>> >         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
> >> >>> >>> >         at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
> >> >>> >>> >         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
> >> >>> >>> >         ... 10 more
> >> >>> >>> >
> >> >>> >>
> >> >>> >>
> >
> >
>
