flink-user mailing list archives

From "LINZ, Arnaud" <AL...@bouyguestelecom.fr>
Subject RE: Left join with unbalanced dataset
Date Wed, 03 Feb 2016 13:49:04 GMT
Hi,
Yes, I’m always a bit reluctant to install a snapshot version "for everyone",
and I was hoping it would suffice…
However, I’ve just recompiled everything and ran with a real 0.10.1 snapshot, and everything
worked at astounding speed with a reasonable amount of memory.
Thanks for the great work and the help, as always,
Arnaud

From: Fabian Hueske [mailto:fhueske@gmail.com]
Sent: Wednesday, February 3, 2016, 10:51
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

Hi Arnaud,
In a previous mail you said:
"Note that I did not rebuild & reinstall flink, I just used a 0.10-SNAPSHOT compiled jar
submitted as a batch job using the "0.10.0" flink installation"
This will not fix the Netty version error. You need to install a new Flink version, or submit
the Flink job to YARN with a new Flink version, to make sure that the correct Netty version
is used.
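For illustration, a minimal sketch of the second option, assuming the 0.10-era CLI options
(-m yarn-cluster, -yn, -ytm) and hypothetical paths and sizes; the point is only that the
freshly built flink-dist is the one that gets shipped to the YARN containers:

  # run the job from the NEW build, so its lib/ (with the fixed Netty) goes to YARN
  cd /path/to/flink-0.10-SNAPSHOT
  ./bin/flink run -m yarn-cluster -yn 48 -ytm 12288 /path/to/your-job.jar

Submitting a snapshot-compiled job jar through the old 0.10.0 installation keeps the old
runtime (and its Netty) on the classpath, which is why the error persists.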
Best, Fabian

2016-02-03 10:44 GMT+01:00 Stephan Ewen <sewen@apache.org>:
Hi!

I think the closed channel is actually an effect of the process kill. Before the exception,
you can see "15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL
15: SIGTERM" in the log, which means that UNIX is killing the process.
I assume that the first thing that happens is that UNIX closes the open file handles, while
the JVM shutdown hooks are still in progress. Hence the exception.

So, the root cause is still the YARN memory killer.

The log comes from release version 0.10.0.
The Netty fix came into Flink after version 0.10.1 - so it is currently only in 0.10-SNAPSHOT
(and will be in 0.10.2 in a couple of days).

Greetings,
Stephan


On Wed, Feb 3, 2016 at 10:11 AM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr> wrote:
Hi,

I see nothing wrong in the log of the killed container (in fact, it’s strange that it fails
with an I/O channel closure before it is killed by YARN), but I’ll post new logs with memory
debugging enabled as a web download within the day.

In the meantime, a log extract:

Container: container_e11_1453202008841_2868_01_000018 on h1r1dn06.bpa.bouyguestelecom.fr_45454
================================================================================================

…
15:04:01,234 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting
YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current
user: datcrypt
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java
HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum
heap size: 6900 MiBytes
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JAVA_HOME:
/usr/java/default
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop
version: 2.6.0
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM Options:
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xms7200m
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xmx7200m
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -XX:MaxDirectMemorySize=7200m
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlogback.configurationFile=file:logback.xml
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog4j.configuration=file:log4j.properties
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program
Arguments:
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --configDir
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     .
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --streamingMode
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------
…
15:04:02,215 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting
TaskManager actor
15:04:02,224 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig
[server address: bt1shlhr/172.21.125.16, server port: 47002, memory
segment size (bytes): 32768, transport type: NIO, number of server threads: 0 (use Netty's
default), number of client threads: 0 (use Netty's default), server connect backlog: 0 (use
Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use
Netty's default)]
15:04:02,226 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages
between TaskManager and JobManager have a max timeout of 100000 milliseconds
…

15:04:02,970 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
1024 MB for network buffer pool (number of memory segments: 32768, bytes per segment: 32768).
15:04:03,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7
of the currently free heap space for Flink managed heap memory (4099 MB).
15:04:06,250 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
uses directory /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8
for spill files.
…
15:04:06,429 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager
data connection information: h1r1dn06.bpa.bouyguestelecom.fr
(dataPort=47002)
15:04:06,430 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager
has 2 task slot(s).
15:04:06,431 INFO  org.apache.flink.yarn.YarnTaskManager                         - Memory
usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB (used/committed/max)]
15:04:06,438 INFO  org.apache.flink.yarn.YarnTaskManager                         - Trying
to register at JobManager akka.tcp://flink@172.21.125.31:36518/user/jobmanager
(attempt 1, timeout: 500 milliseconds)
15:04:06,591 INFO  org.apache.flink.yarn.YarnTaskManager                         - Successful
registration at JobManager (akka.tcp://flink@172.21.125.31:36518/user/jobmanager),
starting network stack and library cache.
…

15:17:22,191 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering
task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output
to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (c9dc588ceb209d98fd08b5144a59adfc)
15:17:22,196 INFO  org.apache.flink.runtime.taskmanager.Task                     - DataSink
(Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95) switched to FINISHED
15:17:22,197 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing
task resources for DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE)
(76/95)
15:17:22,197 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering
task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output
to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (0c1c027e2ca5111e3e54c98b6d7265d7)
15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED
SIGNAL 15: SIGTERM
15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in
task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106))
-> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining
the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException:
I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger
spilling thread' terminated due to an exception: java.io.IOException: I/O channel already
closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception:
java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already
closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in
task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106))
-> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining
the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException:
I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger
spilling thread' terminated due to an exception: java.io.IOException: I/O channel already
closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception:
java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already
closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,617 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce
(GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))
(89/95) switched to FAILED with exception.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106))
-> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining
the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException:
I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger
spilling thread' terminated due to an exception: java.io.IOException: I/O channel already
closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception:
java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already
closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,619 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce
(GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))
(87/95) switched to FAILED with exception.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106))
-> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining
the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException:
I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger
spilling thread' terminated due to an exception: java.io.IOException: I/O channel already
closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception:
java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already
closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing
task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106))
-> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)
15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing
task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106))
-> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)
15:22:47,664 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering
task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce
at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))
(8d6b8b11714a27ac1ca83b39bdee577f)
15:22:47,738 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering
task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce
at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))
(73c6c2f15159dcb134e3899064a30f33)
15:22:47,841 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in
task code:  CHAIN GroupReduce (GroupReduce at calculeMinArea(TransfoStage2StageOnTaz.java:159))
-> Map (Key Extractor 1) (88/95)
com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed.
Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
        at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)
        at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)
        at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
        at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
        at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)
        at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)
        at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)
        at org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)
        at org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)
        at com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)
        at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)
        at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)
        at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
        at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)
        ... 25 more


(...)
______________________
15:22:51,798 INFO  org.apache.flink.yarn.YarnJobManager                          - Container
container_e11_1453202008841_2868_01_000018 is completed with diagnostics: Container [pid=14548,containerID=container_e11_1453202008841_2868_01_000018]
is running beyond physical memory limits. Current usage: 12.1 GB of 12 GB physical memory
used; 13.0 GB of 25.2 GB virtual memory used. Killing container.
Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :
        |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES)
RSSMEM_USAGE(PAGES) FULL_CMD_LINE
        |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c /usr/java/default/bin/java
-Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m  -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
-Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner
--configDir . 1> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out
2> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err
--streamingMode batch
        |- 14558 14548 14548 14548 (java) 631070 15142 13881634816 3163462 /usr/java/default/bin/java
-Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
-Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner
--configDir . --streamingMode batch

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143




From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com]
On behalf of Stephan Ewen
Sent: Tuesday, February 2, 2016, 20:20
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

To make sure this discussion does not go in the wrong direction:

There is no issue here with data size or memory management. The memory management for sorting
and hashing works, and Flink handles the spilling correctly, etc.

The issue here is different:
   - One possible reason is that the network stack (specifically the Netty library) allocates
too much direct (= off-heap) memory for buffering the TCP connections.
   - Another reason could be leaky behavior in Hadoop's HDFS code.
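For reference, a quick back-of-the-envelope check against the container dump quoted earlier in
this message: the container is capped at 12 GB of physical memory, while -Xms/-Xmx7200m plus
-XX:MaxDirectMemorySize=7200m alone already allow 7200 MB + 7200 MB = 14400 MB (about 14.1 GB),
before any further native allocations, so this kind of off-heap growth can push the process
over the YARN limit.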


@Arnaud: We need the full log of the TaskManager that initially experiences the failure;
then we can look into this. Best would be with memory logging activated, as suggested by
Ufuk.
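For completeness, a sketch of how that memory logging is enabled in flink-conf.yaml before
starting the YARN session/job; the key names below are from my recollection of the 0.10-era
configuration and should be double-checked against the 0.10 docs:

  # periodically log heap, direct and mapped memory usage plus GC stats per TaskManager
  taskmanager.debug.memory.startLogThread: true
  # logging interval in milliseconds
  taskmanager.debug.memory.logIntervalMs: 10000

The periodic memory/GC lines in the TaskManager log then make it easier to see whether heap,
direct memory, or something outside the JVM grows right before the container is killed.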

Best,
Stephan


________________________________

The integrity of this message cannot be guaranteed on the Internet. The company that sent
this message cannot therefore be held liable for its content nor attachments. Any unauthorized
use or dissemination is prohibited. If you are not the intended recipient of this message,
then please delete it and notify the sender.

